In [186]:
# импорт библиотек,модулей

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import pyspark.sql.types as T
import pyspark.sql.functions as F
from datetime import datetime
conf = SparkConf()
conf.setMaster("local").setAppName('work')
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [187]:
# cоздаем схему будущего фрейма данных
    
sxema = T.StructType([
    T.StructField("id", T.IntegerType(),False),
    T.StructField("timestamp", T.LongType(),True),
    T.StructField("type", T.StringType(),True),
    T.StructField("page_id", T.IntegerType(),True),
    T.StructField("tag", T.StringType(),True),
    T.StructField("sign", T.BooleanType(),True)
    ])

In [188]:
# наполняем датафрейм данными

data     =[(1, 1637627426, "click", 101, 'Sport', False),
           (2, 1637621465, "visit", 105, 'Science', True),
           (2, 1637621650, "scroll", 102, 'Sport', True),
           (4, 1637621257, "visit", 101, 'Culture', False),
           (2, 1637621345, "click", 106, 'Politics', True),
           (1, 1637622346, "click", 103, 'Sport', True),
           (1, 1637625623, "visit", 106, 'Culture', True),
           (4, 1637621874, "click", 102, 'Politics', False),
           (4, 1637623475, "click", 101, 'Culture', True),
           (3, 1638625692, "visit", 104, 'Science', True),
           (3, 1638621793, "click", 102, 'Business', False),
           (3, 1637622463, "visit", 105, 'Science', True),
           (1, 1637623426, "scroll", 101, 'Business', False),
           (4, 1637621334, "visit", 105, 'Sport', True),
           (1, 1637822645, "visit", 101, 'Sport', False),
           (4, 1637626321, "visit", 105, 'Business', False),
           (1, 1637627325, "scroll", 102, 'Sport', True),
           (2, 1637621434, "click", 106, 'Sport', True),
           (4, 1637627353, "visit", 101, 'Science', True),
           (4, 1637629348, "visit", 105, 'Politics', True),
           (5, 1637664479, "scroll", 101, 'Politics', True),
           (5, 1638668932, "visit", 101, 'Politics', True),
           (1, 1637822645, "visit", 104, 'Sport', False),
           (5, 1637621543, "click", 104, 'Business', False),
           (2, 1637628293, "click", 102, 'Science', True),
           (3, 1637624182, "scroll", 102, 'Culture', True),
           (2, 1637629356, "visit", 104, 'Science', False),
           (4, 1637622248, "click", 105, 'Politics', False),
           (5, 1637673457, "scroll", 106, 'Business', False),
           (5, 1638602456, "scroll", 102, 'Business', True)
          ]

df = spark.createDataFrame(data=data,schema=sxema)

In [189]:
df = df.select(*[i for i in df.columns if i != "timestamp"], 
               F.from_unixtime("timestamp").alias("event_time"))

In [190]:
 df.show(50)

+---+------+-------+--------+-----+-------------------+
| id|  type|page_id|     tag| sign|         event_time|
+---+------+-------+--------+-----+-------------------+
|  1| click|    101|   Sport|false|2021-11-23 00:30:26|
|  2| visit|    105| Science| true|2021-11-22 22:51:05|
|  2|scroll|    102|   Sport| true|2021-11-22 22:54:10|
|  4| visit|    101| Culture|false|2021-11-22 22:47:37|
|  2| click|    106|Politics| true|2021-11-22 22:49:05|
|  1| click|    103|   Sport| true|2021-11-22 23:05:46|
|  1| visit|    106| Culture| true|2021-11-23 00:00:23|
|  4| click|    102|Politics|false|2021-11-22 22:57:54|
|  4| click|    101| Culture| true|2021-11-22 23:24:35|
|  3| visit|    104| Science| true|2021-12-04 13:48:12|
|  3| click|    102|Business|false|2021-12-04 12:43:13|
|  3| visit|    105| Science| true|2021-11-22 23:07:43|
|  1|scroll|    101|Business|false|2021-11-22 23:23:46|
|  4| visit|    105|   Sport| true|2021-11-22 22:48:54|
|  1| visit|    101|   Sport|false|2021-11-25 06

In [191]:
# выводим топ-5 самых активных посетителей сайта

df.groupby("id").count().orderBy("count",ascending=False).show(5)

+---+-----+
| id|count|
+---+-----+
|  4|    8|
|  1|    7|
|  2|    6|
|  5|    5|
|  3|    4|
+---+-----+



In [192]:
# посчитаем процент посетителей, у которых есть ЛК

percent = 100/(df.count()/df.filter(df.sign==True).count())
print(percent,'%')

60.0 %


In [193]:
#  выводим топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице

df_click = df.filter(df.type=="click")
df_click.groupBy("page_id").count().orderBy("count",ascending=False).show(5)

+-------+-----+
|page_id|count|
+-------+-----+
|    102|    3|
|    101|    2|
|    106|    2|
|    103|    1|
|    105|    1|
+-------+-----+
only showing top 5 rows



In [194]:
# добавим столбец к фрейму данных со значением временного диапазона 

dfTime = df.withColumn("new",F.floor(F.hour("event_time") / F.lit(4)))
dfTime.show(50)

+---+------+-------+--------+-----+-------------------+---+
| id|  type|page_id|     tag| sign|         event_time|new|
+---+------+-------+--------+-----+-------------------+---+
|  1| click|    101|   Sport|false|2021-11-23 00:30:26|  0|
|  2| visit|    105| Science| true|2021-11-22 22:51:05|  5|
|  2|scroll|    102|   Sport| true|2021-11-22 22:54:10|  5|
|  4| visit|    101| Culture|false|2021-11-22 22:47:37|  5|
|  2| click|    106|Politics| true|2021-11-22 22:49:05|  5|
|  1| click|    103|   Sport| true|2021-11-22 23:05:46|  5|
|  1| visit|    106| Culture| true|2021-11-23 00:00:23|  0|
|  4| click|    102|Politics|false|2021-11-22 22:57:54|  5|
|  4| click|    101| Culture| true|2021-11-22 23:24:35|  5|
|  3| visit|    104| Science| true|2021-12-04 13:48:12|  3|
|  3| click|    102|Business|false|2021-12-04 12:43:13|  3|
|  3| visit|    105| Science| true|2021-11-22 23:07:43|  5|
|  1|scroll|    101|Business|false|2021-11-22 23:23:46|  5|
|  4| visit|    105|   Sport| true|2021-

In [195]:
# выводим временной промежуток в течение которого было больше всего активностей на сайте

df2 = dfTime.groupby("new").agg(F.count("*").alias("event_count")).orderBy("event_count",ascending=False)

In [196]:
df2.registerTempTable("df_table")
spark.sql("SELECT new FROM df_table WHERE event_count = (SELECT MAX(event_count) FROM df_table)").show()

+---+
|new|
+---+
|  5|
+---+



In [197]:
# cоздаем схему второго фрейма данных с информацией личного кабинета

LKsxema = T.StructType([
    T.StructField("id", T.IntegerType(),True),
    T.StructField("user_id", T.IntegerType(),True),
    T.StructField("fio", T.StringType(),True),
    T.StructField("bday", T.DateType(),True),
    T.StructField("crday", T.DateType(),True),
])

from datetime import datetime

In [198]:
# заполним данными второй фрейм

data_2 = [
    (101,1,"Иванов Иван Иванович",datetime.strptime("1999-09-18", "%Y-%m-%d"),datetime.strptime("2022-06-02", "%Y-%m-%d")),
    (102,4,"Симонова Надежда Юрьевна",datetime.strptime("1995-09-22", "%Y-%m-%d"),datetime.strptime("2022-12-30", "%Y-%m-%d")),
    (105,5,"Петрович Олег Генадьевич",datetime.strptime("2007-10-16", "%Y-%m-%d"),datetime.strptime("2023-01-03", "%Y-%m-%d")),
    (103,2,"Терешкова Надежда Борисовна",datetime.strptime("2010-11-08", "%Y-%m-%d"),datetime.strptime("2021-05-19", "%Y-%m-%d")),
    (104,3,"Пушкин Анатолий Аркадьевич",datetime.strptime("2020-07-07", "%Y-%m-%d"),datetime.strptime("2020-09-23", "%Y-%m-%d"))
]

df_2 = spark.createDataFrame(data = data_2, schema = LKsxema)
df_2.show()

+---+-------+--------------------+----------+----------+
| id|user_id|                 fio|      bday|     crday|
+---+-------+--------------------+----------+----------+
|101|      1|Иванов Иван Иванович|1999-09-18|2022-06-02|
|102|      4|Симонова Надежда ...|1995-09-22|2022-12-30|
|105|      5|Петрович Олег Ген...|2007-10-16|2023-01-03|
|103|      2|Терешкова Надежда...|2010-11-08|2021-05-19|
|104|      3|Пушкин Анатолий А...|2020-07-07|2020-09-23|
+---+-------+--------------------+----------+----------+



In [199]:
# объединим таблицы

df_all = df_2.alias("lk").join(dfTime.alias("web"),on = [F.col("lk.user_id")==F.col("web.id")],how = "left")
df_all.show(100)

+---+-------+--------------------+----------+----------+---+------+-------+--------+-----+-------------------+---+
| id|user_id|                 fio|      bday|     crday| id|  type|page_id|     tag| sign|         event_time|new|
+---+-------+--------------------+----------+----------+---+------+-------+--------+-----+-------------------+---+
|101|      1|Иванов Иван Иванович|1999-09-18|2022-06-02|  1| visit|    104|   Sport|false|2021-11-25 06:44:05|  1|
|101|      1|Иванов Иван Иванович|1999-09-18|2022-06-02|  1|scroll|    102|   Sport| true|2021-11-23 00:28:45|  0|
|101|      1|Иванов Иван Иванович|1999-09-18|2022-06-02|  1| visit|    101|   Sport|false|2021-11-25 06:44:05|  1|
|101|      1|Иванов Иван Иванович|1999-09-18|2022-06-02|  1|scroll|    101|Business|false|2021-11-22 23:23:46|  5|
|101|      1|Иванов Иван Иванович|1999-09-18|2022-06-02|  1| visit|    106| Culture| true|2021-11-23 00:00:23|  0|
|101|      1|Иванов Иван Иванович|1999-09-18|2022-06-02|  1| click|    103|   Sp

In [200]:
# выводим фамилии посетителей, которые читали хотя бы одну новость про спорт

df_all.registerTempTable("df_table")
spark.sql("SELECT DISTINCT fio FROM df_table WHERE tag = 'Sport' AND type = 'visit'").show()

+--------------------+
|                 fio|
+--------------------+
|Симонова Надежда ...|
|Иванов Иван Иванович|
+--------------------+

