In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
import datetime as DT
import random

spark = SparkSession.builder\
        .master("local")\
        .appName('PySpark')\
        .config('spark.driver.bindAddress','localhost')\
        .config('spark.ui.port','4040')\
        .getOrCreate()

Создайте схему будущего фрейма данных. Схема должна включать следующие атрибуты:

·   id -  уникальный идентификатор посетителя сайта. Тип – последовательность чисел фиксированной длины. Данное поле не является первичным ключом.

·   timestamp – дата и время события в формате unix timestamp.

·   type – тип события, значение из списка (факт посещения(visit), клик по визуальному элементу страницы(click), скролл(scroll), перед на другую страницу(move)).

·   page_id – id текущей страницы. Тип - последовательность чисел фиксированной длины.

·   tag – каждая страница с новостью размечается редакцией специальными тегами, которые отражают тематику конкретной новости со страницы. Возможный список тематик: политика, спорт, медицина и т.д.

·   sign – наличие у пользователя личного кабинета. Значения – True/False.

In [2]:
schema = T.StructType ([
    T.StructField('id', T.IntegerType(), True),
    T.StructField('timestamp',T.LongType(), True),
    T.StructField('type', T.StringType(), True),
    T.StructField('page_id', T.IntegerType(), True),
    T.StructField('tag', T.StringType(), True),
    T.StructField('sign', T.BooleanType(), True)
    ])

Создайте датафрейм с описанной выше схемой данных.Наполните датафрейм данными.

In [3]:
type_list = ['visit', 'click', 'scroll', 'move']
tag_list = ['politics', 'sports', 'medicine', 'technology', 'science', 'philosophy', 'religion', 'art', 'travel']
boolean_list = [True, False]

In [4]:
iters = 500
vals = [
    {"id" : int(random.randrange(1000, 1100)),
    "timestamp" : round(DT.datetime.timestamp(DT.datetime.now() - DT.timedelta(random.random()))),
    "type" : str(random.choice(type_list)),
    "page_id" : int(random.randrange(100000, 100075)),
    "tag" : str(random.choice(tag_list)),
    "sign" : bool(random.choice(boolean_list))
    }
    for _ in range(iters)
]

In [5]:
df = spark.createDataFrame(data= vals, schema= schema)

In [6]:
df.show(3)

+----+----------+------+-------+-------+-----+
|  id| timestamp|  type|page_id|    tag| sign|
+----+----------+------+-------+-------+-----+
|1031|1669481379|  move| 100033|science| true|
|1025|1669513563|scroll| 100060| travel| true|
|1066|1669514543|scroll| 100073|science|false|
+----+----------+------+-------+-------+-----+
only showing top 3 rows



Вывести топ-5 самых активных посетителей сайта

In [7]:
df.groupBy("id")\
    .count()\
    .orderBy("count", ascending=False)\
    .show(5)


+----+-----+
|  id|count|
+----+-----+
|1024|   11|
|1019|   10|
|1081|    9|
|1089|    9|
|1078|    9|
+----+-----+
only showing top 5 rows



Посчитать процент посетителей, у которых есть ЛК

In [8]:
df.filter(df.sign == True).count() / df.count() * 100

50.8

Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице

In [9]:
df.filter(df.type == 'click')\
    .groupBy("page_id")\
    .count()\
    .orderBy("count", ascending=False)\
    .show(5)

+-------+-----+
|page_id|count|
+-------+-----+
| 100074|    6|
| 100028|    4|
| 100060|    4|
| 100070|    4|
| 100027|    4|
+-------+-----+
only showing top 5 rows



Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

In [10]:
df.withColumn('time_range', F.when(F.hour(F.from_unixtime('timestamp')) < 4, "0-4")
                                .when((F.hour(F.from_unixtime('timestamp')) >= 4) & 
                                        (F.hour(F.from_unixtime('timestamp')) < 8), '4-8')
                                .when((F.hour(F.from_unixtime('timestamp')) >= 8) & 
                                        (F.hour(F.from_unixtime('timestamp')) < 12), '8-12')
                                .when((F.hour(F.from_unixtime('timestamp')) >= 12) & 
                                        (F.hour(F.from_unixtime('timestamp')) < 16), '12-16')
                                .when((F.hour(F.from_unixtime('timestamp')) >= 16) & 
                                        (F.hour(F.from_unixtime('timestamp')) < 20), '16-20')
                                .when((F.hour(F.from_unixtime('timestamp')) >= 20) & 
                                        (F.hour(F.from_unixtime('timestamp')) < 24), '20-24')
).show(10)


+----+----------+------+-------+----------+-----+----------+
|  id| timestamp|  type|page_id|       tag| sign|time_range|
+----+----------+------+-------+----------+-----+----------+
|1031|1669481379|  move| 100033|   science| true|     16-20|
|1025|1669513563|scroll| 100060|    travel| true|       4-8|
|1066|1669514543|scroll| 100073|   science|false|       4-8|
|1024|1669515721|scroll| 100057|  politics|false|       4-8|
|1076|1669536078|  move| 100042|  medicine| true|      8-12|
|1015|1669456361| visit| 100061|    sports| true|     12-16|
|1034|1669477124| visit| 100039|  politics|false|     16-20|
|1076|1669463652|  move| 100061|       art|false|     12-16|
|1076|1669505936|  move| 100060|technology|false|       0-4|
|1076|1669468936| visit| 100034|philosophy|false|     16-20|
+----+----------+------+-------+----------+-----+----------+
only showing top 10 rows



Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.

In [11]:
df.withColumn('time_range', F.when(F.hour(F.from_unixtime('timestamp')) < 4, "0-4")
                                .when((F.hour(F.from_unixtime('timestamp')) >= 4) & 
                                        (F.hour(F.from_unixtime('timestamp')) < 8), '4-8')
                                .when((F.hour(F.from_unixtime('timestamp')) >= 8) & 
                                        (F.hour(F.from_unixtime('timestamp')) < 12), '8-12')
                                .when((F.hour(F.from_unixtime('timestamp')) >= 12) & 
                                        (F.hour(F.from_unixtime('timestamp')) < 16), '12-16')
                                .when((F.hour(F.from_unixtime('timestamp')) >= 16) & 
                                        (F.hour(F.from_unixtime('timestamp')) < 20), '16-20')
                                .when((F.hour(F.from_unixtime('timestamp')) >= 20) & 
                                        (F.hour(F.from_unixtime('timestamp')) < 24), '20-24')
).groupBy("time_range").count().orderBy("count", ascending=False).show()


+----------+-----+
|time_range|count|
+----------+-----+
|     20-24|   95|
|       4-8|   91|
|       0-4|   88|
|     12-16|   79|
|     16-20|   74|
|      8-12|   73|
+----------+-----+



Cоздайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта со следующим списком атрибутов

1.       Id – уникальный идентификатор личного кабинета

2.       User_id – уникальный идентификатор посетителя

3.       ФИО посетителя

4.       Дату рождения посетителя 

5.       Дата создания ЛК

In [12]:
schema_lk = T.StructType ([
    T.StructField('id', T.IntegerType(), False),
    T.StructField('user_id',T.IntegerType(), False),
    T.StructField('last_name', T.StringType(), True),
    T.StructField('name', T.StringType(), True),
    T.StructField('patronymic', T.StringType(), True),
    T.StructField('birth_date', T.StringType(), True),
    T.StructField('registration_date', T.StringType(), True)
    ])

In [13]:
user_id = df.select('id').filter(df.sign == True).rdd.map(lambda row : row[0]).collect()
name = ['Василий','Аркадий','Антон','Вадим','Александр','Алексей','Варлам','Золтан','Егор']
last_name = ['Гагарин','Иванов','Шишкин','Шишов','Юдин','Баранов','Волков','Казаков','Карасёв']
patronymic = ['Антонович','Геннадиевич','Ефремиевич','Климович','Макарович','Олегович','Парфентьевич','Созонович','Фридрихович']
delta =  DT.timedelta (days = 12300)

In [14]:
iters_lk = len(user_id)
vals_lk = [
    {"id" : int(random.randrange(1000, 1100)),
    "last_name" : str(random.choice(last_name)),
    "name" : str(random.choice(name)),
    "patronymic" : str(random.choice(patronymic)),
    "birth_date" : DT.datetime.strftime((DT.datetime.now() - DT.timedelta(days = (random.randrange(11000, 12100)))), "%Y/%m/%d"),
    "registration_date" : DT.datetime.strftime((DT.datetime.now() - DT.timedelta(days = (random.randrange(500, 1100)))), "%Y/%m/%d")
    }
    for _ in range(iters_lk)
]

i=0
for l in vals_lk:
    l.update({"user_id" : user_id[i]})
    i=i+1

In [15]:
df_lk = spark.createDataFrame(data= vals_lk, schema= schema_lk)
df_lk.show(5)

+----+-------+---------+---------+-----------+----------+-----------------+
|  id|user_id|last_name|     name| patronymic|birth_date|registration_date|
+----+-------+---------+---------+-----------+----------+-----------------+
|1075|   1031|   Волков|Александр|Геннадиевич|1991/12/30|       2021/06/02|
|1095|   1025|     Юдин|  Василий|Геннадиевич|1990/04/22|       2020/12/18|
|1074|   1076|  Гагарин|  Алексей|  Антонович|1992/09/02|       2020/05/07|
|1093|   1015|  Гагарин|  Аркадий|Геннадиевич|1991/06/30|       2020/07/02|
|1060|   1019|   Шишкин|   Варлам|Геннадиевич|1989/12/22|       2019/12/21|
+----+-------+---------+---------+-----------+----------+-----------------+
only showing top 5 rows



Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.

In [18]:
df_lk.join(df, df_lk.user_id == df.id, 'inner')\
    .select('last_name','name','patronymic')\
    .filter(df.tag == 'sports')\
    .show(5)

+---------+---------+------------+
|last_name|     name|  patronymic|
+---------+---------+------------+
|    Шишов|   Варлам| Геннадиевич|
|   Шишкин|     Егор|Парфентьевич|
|  Баранов|Александр| Фридрихович|
|    Шишов|  Алексей| Геннадиевич|
|  Карасёв|   Золтан|    Климович|
+---------+---------+------------+
only showing top 5 rows

