In [66]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [43]:
customSchema_logs = T.StructType([
                T.StructField("id", T.StringType(), True),
                T.StructField("timestamp", T.LongType(), True),
                T.StructField("type", T.StringType(), True),
                T.StructField("page_id", T.IntegerType(), True),
                T.StructField("tag", T.StringType(), True),
                T.StructField("sign", T.BooleanType(), True)])

In [49]:
data_logs =[(1, 1637840253, "visit", 1, 'Sport', False),
           (1, 1623427256, "move", 1, 'Sport', False),
           (1, 1629471642, "click", 1, 'Sport', False),
           (1, 1616562267, "visit", 2, 'Medicin', False),
           (1, 1638941987, "click", 2, 'Politics', False),
           (1, 1619160500, "move", 3, 'Sport', False),
           (2, 1634943260, "visit", 4, 'Politics', True),
           (2, 1626103180, "scroll", 4, 'Medicin', True),
           (2, 1618119021, "click", 4, 'Politics', True),
           (2, 1626071935, "visit", 5, 'Business', True),
           (2, 1620385526, "click", 5, 'Medicin', True),
           (2, 1636887856, "visit", 6, 'Medicin', True),
           (2, 1612165677, "move", 6, 'Business', True),
           (3, 1626960036, "visit", 1, 'Sport', False),
           (3, 1638007179, "scroll", 1, 'Sport', False),
           (3, 1617209174, "visit", 1, 'Sport', False),
           (3, 1629474919, "scroll", 1, 'Sport', False),
           (3, 1626960036, "visit", 1, 'Sport', False),
           (3, 1629863841, "scroll", 1, 'Sport', False),
           (4, 1611609527, "visit", 6, 'Business', False),
           (5, 1631865594, "move", 1, 'Sport', True),
           (5, 1636070942, "move", 1, 'Sport', True),
           (5, 1640307636, "visit", 2, 'Medicin', True),
           (5, 1618866891, "click", 2, 'Politics', True)]

df_logs = spark.createDataFrame(data = data_logs, schema = customSchema_logs)

In [50]:
df_logs.show(5,False)

                                                                                

+---+----------+-----+-------+--------+-----+
|id |timestamp |type |page_id|tag     |sign |
+---+----------+-----+-------+--------+-----+
|1  |1637840253|visit|1      |Sport   |false|
|1  |1623427256|move |1      |Sport   |false|
|1  |1629471642|click|1      |Sport   |false|
|1  |1616562267|visit|2      |Medicin |false|
|1  |1638941987|click|2      |Politics|false|
+---+----------+-----+-------+--------+-----+
only showing top 5 rows



### Вывести топ-5 самых активных посетителей сайта

In [51]:
df_logs.groupby("id").agg(F.count("*").alias("event_cnt"))\
      .orderBy("event_cnt", ascending = False)\
      .show()

[Stage 47:>                                                         (0 + 1) / 1]

+---+---------+
| id|event_cnt|
+---+---------+
|  2|        7|
|  3|        6|
|  1|        6|
|  5|        4|
|  4|        1|
+---+---------+



                                                                                

### Посчитать процент посетителей, у которых есть ЛК

In [63]:
total_count = df_logs.count()
df_web_sign = df_logs.filter((F.col('sign')=='True')).count()
df_true_percent = df_web_sign*100/total_count

print("ALL COUNT: " + str(total_count))
print("RECORD TRUE: " + str(df_web_sign))
print("RECORD TRUE PERCENT: " + str(df_true_percent) + "%")



ALL COUNT: 24
RECORD TRUE: 11
RECORD TRUE PERCENT: 45.833333333333336%


                                                                                

### Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице

In [68]:
windowSpec  = Window.partitionBy("page_id")

df_click = df_logs.filter((F.col('type')=='click')).groupby("page_id").agg(F.count("*").alias("event_cnt"))\
      .orderBy("event_cnt", ascending = False)\
      .show()


[Stage 125:>                                                        (0 + 1) / 1]

+-------+---------+
|page_id|event_cnt|
+-------+---------+
|      2|        2|
|      4|        1|
|      5|        1|
|      1|        1|
+-------+---------+



                                                                                

### Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

In [69]:
df_logs = df_logs.select(*[i for i in df_logs.columns if i != "timestamp"],
                    F.from_unixtime("timestamp").alias("event_time"))

In [74]:
df_period = df_logs.withColumn("period", F.floor(F.hour("event_time") / F.lit(4)))
df_period.show(10)

                                                                                

+---+------+-------+--------+-----+-------------------+------+
| id|  type|page_id|     tag| sign|         event_time|period|
+---+------+-------+--------+-----+-------------------+------+
|  1| visit|      1|   Sport|false|2021-11-25 14:37:33|     3|
|  1|  move|      1|   Sport|false|2021-06-11 19:00:56|     4|
|  1| click|      1|   Sport|false|2021-08-20 18:00:42|     4|
|  1| visit|      2| Medicin|false|2021-03-24 08:04:27|     2|
|  1| click|      2|Politics|false|2021-12-08 08:39:47|     2|
|  1|  move|      3|   Sport|false|2021-04-23 09:48:20|     2|
|  2| visit|      4|Politics| true|2021-10-23 01:54:20|     0|
|  2|scroll|      4| Medicin| true|2021-07-12 18:19:40|     4|
|  2| click|      4|Politics| true|2021-04-11 08:30:21|     2|
|  2| visit|      5|Business| true|2021-07-12 09:38:55|     2|
+---+------+-------+--------+-----+-------------------+------+
only showing top 10 rows



###  Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.

In [76]:
df_period.groupby("period")\
      .agg(F.count("*").alias("period_count"))\
      .orderBy("period_count", ascending = False)\
      .limit(1).show()



+------+------------+
|period|period_count|
+------+------------+
|     2|           7|
+------+------------+



                                                                                

### Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта

In [77]:
customSchema_lk = T.StructType([
                T.StructField("id", T.StringType(), True),
                T.StructField("user_id", T.IntegerType(), True),
                T.StructField("fio", T.StringType(), True),
                T.StructField("dob", T.DateType(), True),
                T.StructField("doc", T.DateType(), True)])

In [80]:
from datetime import datetime
data_lk = [
    (1, 3, "Иванов Иван Иванович", datetime(2000, 6, 5), datetime(2016, 8, 1)),
    (2, 2, "Петров Петр Петрович", datetime(1996, 1, 12), datetime(2017, 10, 7)),
    (3, 4, "Кириллов Кирилл Киррилович", datetime(1992, 4, 25), datetime(2019, 11, 8)),
    (4, 5, "Михайлов Михаил Михайлович", datetime(1989, 11, 22), datetime(2020, 12, 17)),
    (5, 6, "Сидоров Сидор Сидорович", datetime(1654, 3, 13), datetime(2021, 6, 2)),
    (6, 1, "Николаев Николай Николаевич", datetime(1987, 7, 7), datetime(2016, 10, 15)),
]

df_lk = spark.createDataFrame(data = data_lk, schema = customSchema_lk)

### Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.

In [83]:
df_Stg = df_lk.alias("lk").join(df_logs.alias("web"),
                                    on = [F.col("lk.user_id") == F.col("web.id")],
                                    how = "left")

df_sport = df_Stg.select("fio", "tag")\
                      .filter((F.col('tag')=='Sport'))\
                      .select("fio")\
                      .dropDuplicates(["fio"])
df_sport.show(truncate=False)



+---------------------------+
|fio                        |
+---------------------------+
|Николаев Николай Николаевич|
|Иванов Иван Иванович       |
|Михайлов Михаил Михайлович |
+---------------------------+



                                                                                