## 3.3 Практика Spark

### Импорты

In [35]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
import datetime

### Connection

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        master("spark://spark-master:7077").\
        appName("Task_3.3").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

22/12/02 11:17:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### a. Создание схемы будущего фрейма данных

In [3]:
schema = T.StructType([
                T.StructField("id", T.IntegerType(), True),
                T.StructField("timestamp", T.LongType(), True),
                T.StructField("type", T.StringType(), True),
                T.StructField("page_id", T.IntegerType(), True),
                T.StructField("tag", T.StringType(), True),
                T.StructField("sign", T.BooleanType(), True)])

### b. Создание датафрейма с описанной выше схемой данных

In [4]:
data = [
    (1, int(datetime.datetime(2022, 6, 7, 12, 34, 55).timestamp()), "visit", 100, 'Sport', False),
    (1, int(datetime.datetime(2022, 6, 7, 12, 35, 45).timestamp()), "click", 100, 'Tech', False),
    (1, int(datetime.datetime(2022, 6, 7, 12, 36, 35).timestamp()), "click", 100, 'Tech', False),
    (1, int(datetime.datetime(2022, 6, 7, 12, 37, 25).timestamp()), "move", 101, 'Tech', False),
    (1, int(datetime.datetime(2022, 6, 7, 12, 38, 15).timestamp()), "move", 102, 'Politics', False),
    (1, int(datetime.datetime(2022, 6, 8, 12, 39, 10).timestamp()), "visit", 102, 'Sport', False),
    (1, int(datetime.datetime(2022, 6, 8, 12, 39, 1).timestamp()), "move", 110, 'Politics', False),
    (2, int(datetime.datetime(2022, 6, 5, 10, 15, 5).timestamp()), "visit", 103, 'Sport', True),
    (2, int(datetime.datetime(2022, 6, 5, 10, 15, 23).timestamp()), "move", 109, 'Politics', True),
    (3, int(datetime.datetime(2022, 7, 5, 22, 14, 55).timestamp()), "visit", 103, 'Sport', True),
    (3, int(datetime.datetime(2022, 7, 5, 22, 15, 5).timestamp()), "click", 119, 'Politics', True),
    (3, int(datetime.datetime(2022, 7, 5, 22, 16, 15).timestamp()), "scroll", 112, 'Sport', True),
    (3, int(datetime.datetime(2022, 7, 5, 22, 17, 5).timestamp()), "scroll", 112, 'Tech', True),
    (4, int(datetime.datetime(2019, 1, 3, 17, 41, 10).timestamp()), "visit", 10, 'Tech', False),
    (4, int(datetime.datetime(2019, 1, 3, 17, 41, 50).timestamp()), "scroll", 11, 'Tech', False),
    (5, int(datetime.datetime(2010, 10, 12, 21, 13, 5).timestamp()), "visit", 1, 'Sport', True),
    (5, int(datetime.datetime(2010, 10, 12, 21, 13, 10).timestamp()), "scroll", 2, 'Tech', True),
    (5, int(datetime.datetime(2010, 11, 12, 21, 13, 15).timestamp()), "visit", 2, 'Politics', True),
    (5, int(datetime.datetime(2010, 11, 12, 21, 13, 20).timestamp()), "scroll", 3, 'Politics', True),
    (6, int(datetime.datetime(2017, 3, 15, 12, 0, 0).timestamp()), "visit", 56, 'Tech', True),
    (6, int(datetime.datetime(2017, 3, 15, 12, 1, 0).timestamp()), "scroll", 56, 'Tech', True),
    (6, int(datetime.datetime(2017, 3, 15, 12, 21, 0).timestamp()), "click", 56, 'Tech', True),
    (6, int(datetime.datetime(2017, 3, 16, 12, 0, 0).timestamp()), "visit", 57, 'Tech', True),
    (6, int(datetime.datetime(2017, 3, 16, 12, 1, 0).timestamp()), "click", 57, 'Tech', True),
    (6, int(datetime.datetime(2017, 3, 16, 12, 1, 30).timestamp()), "click", 57, 'Tech', True),
    (6, int(datetime.datetime(2017, 3, 15, 12, 2, 0).timestamp()), "click", 57, 'Tech', True),
]

df = spark.createDataFrame(data=data, schema = schema)

df.select(*[i for i in df.columns if i != "timestamp"],
    F.from_unixtime("timestamp").alias("time")).show(15)

[Stage 1:>                                                          (0 + 1) / 1]

+---+------+-------+--------+-----+-------------------+
| id|  type|page_id|     tag| sign|               time|
+---+------+-------+--------+-----+-------------------+
|  1| visit|    100|   Sport|false|2022-06-07 12:34:55|
|  1| click|    100|    Tech|false|2022-06-07 12:35:45|
|  1| click|    100|    Tech|false|2022-06-07 12:36:35|
|  1|  move|    101|    Tech|false|2022-06-07 12:37:25|
|  1|  move|    102|Politics|false|2022-06-07 12:38:15|
|  1| visit|    102|   Sport|false|2022-06-08 12:39:10|
|  1|  move|    110|Politics|false|2022-06-08 12:39:01|
|  2| visit|    103|   Sport| true|2022-06-05 10:15:05|
|  2|  move|    109|Politics| true|2022-06-05 10:15:23|
|  3| visit|    103|   Sport| true|2022-07-05 22:14:55|
|  3| click|    119|Politics| true|2022-07-05 22:15:05|
|  3|scroll|    112|   Sport| true|2022-07-05 22:16:15|
|  3|scroll|    112|    Tech| true|2022-07-05 22:17:05|
|  4| visit|     10|    Tech|false|2019-01-03 17:41:10|
|  4|scroll|     11|    Tech|false|2019-01-03 17

                                                                                

### d.1 Вывод топ-5 самых активных посетителей сайта

In [5]:
df.groupby("id")\
    .count()\
    .orderBy(F.desc("count"))\
    .show(5)



+---+-----+
| id|count|
+---+-----+
|  6|    7|
|  1|    7|
|  3|    4|
|  5|    4|
|  4|    2|
+---+-----+
only showing top 5 rows



                                                                                

### d.2 Подсчет процента посетителей, у которых есть ЛК

In [6]:
users_in = df.filter(df.sign == True)\
                .groupby("id")\
                .count()\
                .distinct()\
                .count()
users_out = df.filter(df.sign == False)\
                .groupby("id")\
                .count()\
                .distinct()\
                .count()
total_users = df.groupby("id")\
                .count()\
                .distinct()\
                .count()
print(f'Percentage of visitors with a personal account: {int(users_in*100./total_users)}%')
print(f'Percentage of visitors without a personal account: {int(users_out*100./total_users)}%')

                                                                                

Percentage of visitors with a personal account: 66%
Percentage of visitors without a personal account: 33%


                                                                                

### d.3 Вывод топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице

In [7]:
df.filter(df.type == "click")\
    .groupby("page_id")\
    .count()\
    .orderBy("count", ascending = False)\
    .show(5)

+-------+-----+
|page_id|count|
+-------+-----+
|     57|    3|
|    100|    2|
|    119|    1|
|     56|    1|
+-------+-----+



### d.4 Добавление столбца к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

In [8]:
df_with_range = df.select(*[i for i in df.columns])\
                .withColumn("time_range", F.floor(F.hour(F.from_unixtime("timestamp"))/4))
df_with_range.show()

+---+----------+------+-------+--------+-----+----------+
| id| timestamp|  type|page_id|     tag| sign|time_range|
+---+----------+------+-------+--------+-----+----------+
|  1|1654605295| visit|    100|   Sport|false|         3|
|  1|1654605345| click|    100|    Tech|false|         3|
|  1|1654605395| click|    100|    Tech|false|         3|
|  1|1654605445|  move|    101|    Tech|false|         3|
|  1|1654605495|  move|    102|Politics|false|         3|
|  1|1654691950| visit|    102|   Sport|false|         3|
|  1|1654691941|  move|    110|Politics|false|         3|
|  2|1654424105| visit|    103|   Sport| true|         2|
|  2|1654424123|  move|    109|Politics| true|         2|
|  3|1657059295| visit|    103|   Sport| true|         5|
|  3|1657059305| click|    119|Politics| true|         5|
|  3|1657059375|scroll|    112|   Sport| true|         5|
|  3|1657059425|scroll|    112|    Tech| true|         5|
|  4|1546537270| visit|     10|    Tech|false|         4|
|  4|154653731

### d.5 Вывод временного промежутка на основе предыдущего задания, в течение которого было больше всего активностей на сайте

In [9]:
df_tmp = df_with_range.groupby("time_range")\
             .count()\
             .withColumnRenamed("count","max_count")\
             .orderBy("max_count", ascending = False)
df_tmp.show()

# Простой вариант
print(df_tmp.head()[0])

# Сложный вариант
df_tmp.select("time_range").filter(df_tmp.max_count == df_tmp.agg({'max_count': 'max'}).head()[0]).show()

+----------+---------+
|time_range|max_count|
+----------+---------+
|         3|       14|
|         5|        8|
|         2|        2|
|         4|        2|
+----------+---------+

3
+----------+
|time_range|
+----------+
|         3|
+----------+



### d.6 Создание второго фрейма данных, который содержит информацию о ЛК посетителя сайта со следующим списком атрибутов:
1. Id – уникальный идентификатор личного кабинета
2. User_id – уникальный идентификатор посетителя
3. ФИО посетителя
4. Дату рождения посетителя 
5. Дата создания ЛК

#### Создание схемы

In [10]:
schema_lk = T.StructType([
                T.StructField("user_id", T.IntegerType(), True),
                T.StructField("id", T.IntegerType(), True),
                T.StructField("fio", T.StringType(), True),
                T.StructField("birthday_date", T.DateType(), True),
                T.StructField("registration_date", T.DateType(), True)])

#### Создание датафрейма

In [29]:
data_lk = [
            (1, None, "Мышкин Кондратий Семёнович", datetime.datetime(1994, 1, 12), datetime.datetime(2022, 6, 6)),
            (2, 1, "Боброва Стефания Тимофеевна", datetime.datetime(2002, 2, 14), datetime.datetime(2022, 6, 4)),
            (3, 2, "Мишин Станислав Русланович", datetime.datetime(1985, 3, 16), datetime.datetime(2022, 7, 4)),
            (4, None, "Данилова Юлиана Романовна", datetime.datetime(1999, 4, 18), datetime.datetime(2019, 1, 2)),
            (5, 3, "Аксёнов Иван Борисович", datetime.datetime(2005, 5, 20), datetime.datetime(2010, 10, 11)),
            (6, 4, "Васильева Ева Анатольевна", datetime.datetime(1990, 6, 22), datetime.datetime(2017, 3, 14)),
]

df_lk = spark.createDataFrame(data = data_lk, schema = schema_lk)

df_lk.show(truncate=False)

+-------+----+---------------------------+-------------+-----------------+
|user_id|id  |fio                        |birthday_date|registration_date|
+-------+----+---------------------------+-------------+-----------------+
|1      |null|Мышкин Кондратий Семёнович |1994-01-12   |2022-06-06       |
|2      |1   |Боброва Стефания Тимофеевна|2002-02-14   |2022-06-04       |
|3      |2   |Мишин Станислав Русланович |1985-03-16   |2022-07-04       |
|4      |null|Данилова Юлиана Романовна  |1999-04-18   |2019-01-02       |
|5      |3   |Аксёнов Иван Борисович     |2005-05-20   |2010-10-11       |
|6      |4   |Васильева Ева Анатольевна  |1990-06-22   |2017-03-14       |
+-------+----+---------------------------+-------------+-----------------+



### d.7 Вывод фамилий посетителей, которые читали хотя бы одну новость про спорт

In [42]:
@udf(returnType=T.StringType())
def get_surname(fio: str):
    return fio.split(" ")[0]

df_lk.join(\
           df.filter(df.tag == 'Sport')
           .groupby('id')
           .count(), df.id == df_lk.user_id, how='inner'
          )\
        .select(get_surname('fio')
                .alias("surname"))\
        .show(truncate=False)



+-------+
|surname|
+-------+
|Мышкин |
|Мишин  |
|Аксёнов|
|Боброва|
+-------+



                                                                                