In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

from pyspark.sql.functions import udf

import datetime

In [4]:
spark = SparkSession.builder.master("local") \
                    .appName("Home_work_py") \
                    .config("spark.driver.bindAddress", "localhost") \
                    .config("spark.ui.port", "4040") \
                    .getOrCreate()

In [5]:
schema_web = T.StructType([
                T.StructField("id", T.StringType(), True),
                T.StructField("timestamp", T.LongType(), True),
                T.StructField("type", T.StringType(), True),
                T.StructField("page_id", T.IntegerType(), True),
                T.StructField("tag", T.StringType(), True),
                T.StructField("sign", T.BooleanType(), True)])

In [6]:
data_web =[(1, 1667627126, "visit", 101, 'Sport', False),
           (1, 1667627286, "scroll", 101, 'Sport', False),
           (1, 1667627300, "click", 101, 'Sport', False),
           (1, 1667627505, "visit", 102, 'Politics', False),
           (1, 1667627565, "click", 102, 'Politics', False),
           (1, 1667627586, "visit", 103, 'Sport', False),
           (2, 1667728001, "visit", 104, 'Politics', True),
           (2, 1667728101, "scroll", 104, 'Politics', True),
           (2, 1667728151, "click", 104, 'Politics', True),
           (2, 1667728200, "visit", 105, 'Business', True),
           (2, 1667728226, "click", 105, 'Business', True),
           (2, 1667728317, "visit", 106, 'Business', True),
           (2, 1667728359, "scroll", 106, 'Business', True),
           (3, 1667828422, "visit", 101, 'Sport', False),
           (3, 1667828486, "scroll", 101, 'Sport', False),
           (4, 1667828505, "visit", 106, 'Business', False),
           (5, 1667828511, "visit", 101, 'Sport', True),
           (5, 1667828901, "click", 101, 'Sport', True),
           (5, 1667828926, "visit", 102, 'Politics', True),
           (5, 1667828976, "click", 102, 'Politics', True),
           (6, 1667728317, "visit", 106, 'Business', True),
           (6, 1667728359, "scroll", 106, 'Business', True),
           (6, 1667828422, "visit", 101, 'Sport', False),]


df_web = spark.createDataFrame(data = data_web, schema = schema_web)

In [7]:
df_web.show(25)

                                                                                

+---+----------+------+-------+--------+-----+
| id| timestamp|  type|page_id|     tag| sign|
+---+----------+------+-------+--------+-----+
|  1|1667627126| visit|    101|   Sport|false|
|  1|1667627286|scroll|    101|   Sport|false|
|  1|1667627300| click|    101|   Sport|false|
|  1|1667627505| visit|    102|Politics|false|
|  1|1667627565| click|    102|Politics|false|
|  1|1667627586| visit|    103|   Sport|false|
|  2|1667728001| visit|    104|Politics| true|
|  2|1667728101|scroll|    104|Politics| true|
|  2|1667728151| click|    104|Politics| true|
|  2|1667728200| visit|    105|Business| true|
|  2|1667728226| click|    105|Business| true|
|  2|1667728317| visit|    106|Business| true|
|  2|1667728359|scroll|    106|Business| true|
|  3|1667828422| visit|    101|   Sport|false|
|  3|1667828486|scroll|    101|   Sport|false|
|  4|1667828505| visit|    106|Business|false|
|  5|1667828511| visit|    101|   Sport| true|
|  5|1667828901| click|    101|   Sport| true|
|  5|16678289

In [8]:
#Вывести топ-5 самых активных посетителей сайта
df_web.groupby('id')\
      .count()\
      .orderBy('count', ascending = False)\
      .show()



+---+-----+
| id|count|
+---+-----+
|  2|    7|
|  1|    6|
|  5|    4|
|  6|    3|
|  3|    2|
|  4|    1|
+---+-----+



                                                                                

In [9]:
df_web.groupby('id')\
      .agg(F.count('*').alias('event_cnt'))\
      .orderBy('event_cnt', ascending = False)\
      .show(5)



+---+---------+
| id|event_cnt|
+---+---------+
|  2|        7|
|  1|        6|
|  5|        4|
|  6|        3|
|  3|        2|
+---+---------+
only showing top 5 rows



                                                                                

In [10]:
# Посчитать процент посетителей, у которых есть ЛК
df_web.select(F.floor((F.count(F.when(df_web.sign == 'true', True)) * 100 / F.count('*'))).alias('percent_sign_user'))\
      .show()

+-----------------+
|percent_sign_user|
+-----------------+
|               56|
+-----------------+



In [11]:
# Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице
df_web.groupby('page_id')\
      .agg(F.count(F.when(df_web.type =='click', True)).alias("type_cnt"))\
      .orderBy('type_cnt', ascending = False)\
      .select('page_id')\
      .show(5)



+-------+
|page_id|
+-------+
|    101|
|    102|
|    104|
|    105|
|    103|
+-------+
only showing top 5 rows



                                                                                

In [12]:
# Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12, 12-16, 16-20, 20-24 и т.д.)
df_web = df_web.select(*[i for i in df_web.columns if i != 'timestamp'],
                       F.from_unixtime('timestamp').alias('event_time'))\
               .withColumn('range', F.floor(F.hour('event_time') / F.lit(4)))\
               .withColumn('time_range', F.expr("case when range = 0 then '0 - 4'" +
                                                     "when range = 1 then '4 - 8'" +
                                                     "when range = 2 then '8 - 12'" +
                                                     "when range = 3 then '12 - 16'" +
                                                     "when range = 4 then '16 - 20'" +
                                                     "when range = 5 then '20 - 24'" +
                                                     "else 'Unknown' end"))\
               .drop('range')

In [13]:
df_web.show(25)

+---+------+-------+--------+-----+-------------------+----------+
| id|  type|page_id|     tag| sign|         event_time|time_range|
+---+------+-------+--------+-----+-------------------+----------+
|  1| visit|    101|   Sport|false|2022-11-05 05:45:26|     4 - 8|
|  1|scroll|    101|   Sport|false|2022-11-05 05:48:06|     4 - 8|
|  1| click|    101|   Sport|false|2022-11-05 05:48:20|     4 - 8|
|  1| visit|    102|Politics|false|2022-11-05 05:51:45|     4 - 8|
|  1| click|    102|Politics|false|2022-11-05 05:52:45|     4 - 8|
|  1| visit|    103|   Sport|false|2022-11-05 05:53:06|     4 - 8|
|  2| visit|    104|Politics| true|2022-11-06 09:46:41|    8 - 12|
|  2|scroll|    104|Politics| true|2022-11-06 09:48:21|    8 - 12|
|  2| click|    104|Politics| true|2022-11-06 09:49:11|    8 - 12|
|  2| visit|    105|Business| true|2022-11-06 09:50:00|    8 - 12|
|  2| click|    105|Business| true|2022-11-06 09:50:26|    8 - 12|
|  2| visit|    106|Business| true|2022-11-06 09:51:57|    8 -

In [14]:
#  Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.
df_web.groupby('time_range')\
      .agg(F.count('*').alias('range_cnt'))\
      .orderBy('range_cnt', ascending = False)\
      .select('time_range')\
      .show()



+----------+
|time_range|
+----------+
|    8 - 12|
|   12 - 16|
|     4 - 8|
+----------+



                                                                                

In [15]:
schema_lk = T.StructType([
                T.StructField("id", T.StringType(), True),
                T.StructField("user_id", T.IntegerType(), True),
                T.StructField("fio", T.StringType(), True),
                T.StructField("dob", T.DateType(), True),
                T.StructField("doc", T.DateType(), True)])

In [16]:
data_lk = [
    (101, 2, "Иванов Иван Иванович", datetime.datetime(1990, 7, 5), datetime.datetime(2016, 8, 1)),
    (102, 5, "Александрова Александра Александровна", datetime.datetime(1995, 1, 22), datetime.datetime(2017, 10, 7)),
    (103, 1, "Николаева Людмила Петрововна", datetime.datetime(1991, 5, 2), datetime.datetime(2017, 1, 17)),
    (104, 3, "Петров Петр Петрович", datetime.datetime(1998, 8, 13), datetime.datetime(2013, 5, 27)),
    (105, 4, "Сидоров Иван Петрович", datetime.datetime(1994, 3, 10), datetime.datetime(2018, 11, 9)),
    (106, 6, "Степанов Степан Степаныч", datetime.datetime(1989, 6, 11), datetime.datetime(2020, 9, 19))]

df_lk = spark.createDataFrame(data = data_lk, schema = schema_lk)

In [17]:
df_lk.show()

+---+-------+--------------------+----------+----------+
| id|user_id|                 fio|       dob|       doc|
+---+-------+--------------------+----------+----------+
|101|      2|Иванов Иван Иванович|1990-07-05|2016-08-01|
|102|      5|Александрова Алек...|1995-01-22|2017-10-07|
|103|      1|Николаева Людмила...|1991-05-02|2017-01-17|
|104|      3|Петров Петр Петрович|1998-08-13|2013-05-27|
|105|      4|Сидоров Иван Петр...|1994-03-10|2018-11-09|
|106|      6|Степанов Степан С...|1989-06-11|2020-09-19|
+---+-------+--------------------+----------+----------+



In [18]:
# Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт
df_web_sport = df_lk.alias('lk').join(df_web.alias('web'),
                                        on = [F.col('lk.user_id') == F.col('web.id')],
                                        how = 'left')\
                                .filter(df_web.tag == 'Sport')\
                                .groupby('fio').count()\
                                .select('fio')\
                                .show()

                                                                                

+--------------------+
|                 fio|
+--------------------+
|Александрова Алек...|
|Петров Петр Петрович|
|Николаева Людмила...|
|Степанов Степан С...|
+--------------------+



In [21]:
# Выведите 10% ЛК, у которых максимальная разница между датой создания ЛК и датой последнего посещения
df_t = df_lk.alias('lk').join(df_web.alias('web'),
                                        on = [F.col('lk.user_id') == F.col('web.id')],
                                        how = 'left')\
                        .withColumn('date_diff', (F.datediff('web.event_time', 'lk.doc')))\
                        .groupby('lk.id')\
                        .agg({'event_time' : 'max',
                              'date_diff' : 'max'})\
                        .orderBy('max(date_diff)',ascending = False)\
                        .select('id')\
                        .limit(int(round((df_t.count() * 0.1), 0))) # Вывод 10% ЛК

                                                                                

In [23]:
df_t.show() 



+---+
| id|
+---+
|104|
+---+



                                                                                

In [24]:
# Вывести топ-5 страниц, которые чаще всего посещают мужчины и топ-5 страниц, которые посещают чаще женщины
@udf(T.StringType())
def calc_gender(fio):
    surname, name, middlename = fio.split(' ')
    if (((surname[-2:] == "ов") or (surname[-2:] == "ев")) and \
        ((middlename[-2:] == "ич") or (middlename[-2:] == "ыч"))):
        return 'm'
    else:
        return 'w'

In [25]:
df_p = df_lk.alias('lk').join(df_web.alias('web'),
                                        on = [F.col('lk.user_id') == F.col('web.id')],
                                        how = 'left')\
                        .withColumn('gender', calc_gender(F.col('fio')))

In [26]:
df_p.show(25)

                                                                                

+---+-------+--------------------+----------+----------+---+------+-------+--------+-----+-------------------+----------+------+
| id|user_id|                 fio|       dob|       doc| id|  type|page_id|     tag| sign|         event_time|time_range|gender|
+---+-------+--------------------+----------+----------+---+------+-------+--------+-----+-------------------+----------+------+
|103|      1|Николаева Людмила...|1991-05-02|2017-01-17|  1| visit|    101|   Sport|false|2022-11-05 05:45:26|     4 - 8|     w|
|103|      1|Николаева Людмила...|1991-05-02|2017-01-17|  1|scroll|    101|   Sport|false|2022-11-05 05:48:06|     4 - 8|     w|
|103|      1|Николаева Людмила...|1991-05-02|2017-01-17|  1| click|    101|   Sport|false|2022-11-05 05:48:20|     4 - 8|     w|
|103|      1|Николаева Людмила...|1991-05-02|2017-01-17|  1| visit|    102|Politics|false|2022-11-05 05:51:45|     4 - 8|     w|
|103|      1|Николаева Людмила...|1991-05-02|2017-01-17|  1| click|    102|Politics|false|2022-11

In [1245]:
df_p.filter(df_p.gender == 'm')\
    .groupby('page_id').count()\
    .orderBy('count' , ascending = False)\
    .select('page_id')\
    .show(5)



+-------+
|page_id|
+-------+
|    106|
|    101|
|    104|
|    105|
+-------+



                                                                                

In [1243]:
df_p.filter(df_p.gender == 'w')\
    .groupby('page_id').count()\
    .orderBy('count' ,ascending = False)\
    .select('page_id')\
    .show(5)



+-------+
|page_id|
+-------+
|    101|
|    102|
|    103|
+-------+



                                                                                

In [1264]:
df_p_m = df_p.select("page_id", "gender")\
             .groupby("page_id", "gender")\
             .count()\
             .filter(df_p.gender == "m")\
             .orderBy('count' ,ascending = False)\
             .limit(5)

In [1265]:
df_p_w = df_p.select("page_id", "gender")\
             .groupby("page_id", "gender")\
             .count()\
             .filter(df_p.gender == "w")\
             .orderBy('count' ,ascending = False)\
             .limit(5)

In [1266]:
df_p_m.union(df_p_w)\
      .select('page_id', 'gender')\
      .show()



+-------+------+
|page_id|gender|
+-------+------+
|    106|     m|
|    101|     m|
|    104|     m|
|    105|     m|
|    101|     w|
|    102|     w|
|    103|     w|
+-------+------+



                                                                                