In [3]:
import pyspark
from pyspark.sql.types import *
from typing import List
from pyspark.sql import SQLContext
from pyspark.context import SparkContext    
    

In [29]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

import pyspark.sql.types as T
import pyspark.sql.functions as F

from datetime import datetime

In [30]:
spark = SparkSession.\
        builder.\
        appName("Task3.3").\
        config("spark.driver.bindAddress", "localhost").\
        config("spark.executor.memory", "8024m").\
        config("spark.ui.port", "4040").\
        getOrCreate()   
print('Запущен Spark версии', spark.version)

Запущен Spark версии 3.3.1


In [31]:
# создаем схему для дата фрейма
schema = StructType([StructField('id', IntegerType(), True),
              StructField('timestamp', LongType(), True),
              StructField('type', StringType(), True),
              StructField('page_id', IntegerType(), True),
              StructField('tag', StringType(), True),
              StructField('sign', BooleanType(), True)                    
             ])

In [32]:
# список
data = [(1, 1627314601, "click", 1,"politic", True),
        (2, 1627314602, "scroll",2,"medicina", False),
        (1, 1627314603, "move",3,"sport",True),
        (1, 1627314604, "click",4,"sport",False),
        (2, 1627314605, "click", 5,"politic", True),
        (2, 1627314606, "scroll",6,"medicina", False),
        (1, 1627314607, "move",7,"sport",True),
        (3, 1627314608, "click",8,"sport",False),
        (3, 1627314609, "scroll",9,"medicina", False),
        (1, 1627314610, "move",10,"sport",True),
        (2, 16273146011, "click",11,"sport",True),
        (4, 16273146012, "click", 12,"politic", True)]
         

In [33]:
# создаем дата фрейм
df = spark.createDataFrame(data = data,schema = schema)
# print(df.schema)

In [34]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- type: string (nullable = true)
 |-- page_id: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- sign: boolean (nullable = true)



In [35]:
df.show(5)

+---+----------+------+-------+--------+-----+
| id| timestamp|  type|page_id|     tag| sign|
+---+----------+------+-------+--------+-----+
|  1|1627314601| click|      1| politic| true|
|  2|1627314602|scroll|      2|medicina|false|
|  1|1627314603|  move|      3|   sport| true|
|  1|1627314604| click|      4|   sport|false|
|  2|1627314605| click|      5| politic| true|
+---+----------+------+-------+--------+-----+
only showing top 5 rows



In [36]:
 #Вывести топ-5 самых активных посетителей сайта
df.groupby("id").count().orderBy("count",ascending=False).show(5)
    

+---+-----+
| id|count|
+---+-----+
|  1|    5|
|  2|    4|
|  3|    2|
|  4|    1|
+---+-----+



In [37]:
# Посчитать процент посетителей, у которых есть ЛК
percent = 100/(df.count()/df.filter(df.sign==True).count())
print(percent,'%')

58.333333333333336 %


In [38]:
# Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице
df_click = df.filter(df.type=="click")
df_click.groupBy("page_id").count().orderBy("count",ascending=False).show(5)

+-------+-----+
|page_id|count|
+-------+-----+
|      1|    1|
|      4|    1|
|      5|    1|
|      8|    1|
|     11|    1|
+-------+-----+
only showing top 5 rows



In [39]:
#добавлеям новый столбец с нормальными датами
df = df.select(*[i for i in df.columns if i != "timestamp"],
                 F.from_unixtime("timestamp").alias("event_time"))

In [40]:
df.show(5)

+---+------+-------+--------+-----+-------------------+
| id|  type|page_id|     tag| sign|         event_time|
+---+------+-------+--------+-----+-------------------+
|  1| click|      1| politic| true|2021-07-26 22:50:01|
|  2|scroll|      2|medicina|false|2021-07-26 22:50:02|
|  1|  move|      3|   sport| true|2021-07-26 22:50:03|
|  1| click|      4|   sport|false|2021-07-26 22:50:04|
|  2| click|      5| politic| true|2021-07-26 22:50:05|
+---+------+-------+--------+-----+-------------------+
only showing top 5 rows



In [42]:
#Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток 
#с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)
dfTime = df.withColumn("new",F.floor(F.hour("event_time") / F.lit(4)))
dfTime.show(12)

+---+------+-------+--------+-----+-------------------+---+
| id|  type|page_id|     tag| sign|         event_time|new|
+---+------+-------+--------+-----+-------------------+---+
|  1| click|      1| politic| true|2021-07-26 22:50:01|  5|
|  2|scroll|      2|medicina|false|2021-07-26 22:50:02|  5|
|  1|  move|      3|   sport| true|2021-07-26 22:50:03|  5|
|  1| click|      4|   sport|false|2021-07-26 22:50:04|  5|
|  2| click|      5| politic| true|2021-07-26 22:50:05|  5|
|  2|scroll|      6|medicina|false|2021-07-26 22:50:06|  5|
|  1|  move|      7|   sport| true|2021-07-26 22:50:07|  5|
|  3| click|      8|   sport|false|2021-07-26 22:50:08|  5|
|  3|scroll|      9|medicina|false|2021-07-26 22:50:09|  5|
|  1|  move|     10|   sport| true|2021-07-26 22:50:10|  5|
|  2| click|     11|   sport| true|2485-09-03 21:20:11|  5|
|  4| click|     12| politic| true|2485-09-03 21:20:12|  5|
+---+------+-------+--------+-----+-------------------+---+



In [44]:
# Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.
# cгруппируем результаты таблицы предыдущего задания по значениям столбца new
df2 = dfTime.groupby("new")\
    .agg(F.count("*").alias("event_count"))\
    .orderBy("event_count",ascending=False)
df2.registerTempTable("df_table")
spark.sql("SELECT new FROM df_table WHERE event_count = (SELECT MAX(event_count) FROM df_table)").show()



+---+
|new|
+---+
|  5|
+---+



In [None]:
#Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта со следующим списком атрибутов
# 1. Id – уникальный идентификатор личного кабинета
# 2. User_id – уникальный идентификатор посетителя
# 3. ФИО посетителя
# 4. Дату рождения посетителя 
# 5. Дата создания ЛК



In [45]:
 #cтруктура таблицы личного кабинета
esquemaAreaPersonal = T.StructType([
    T.StructField("id", T.IntegerType(),True),
    T.StructField("user_id", T.IntegerType(),True),
    T.StructField("fio", T.StringType(),True),
    T.StructField("dbd", T.DateType(),True),
    T.StructField("dpa", T.DateType(),True),
])

In [48]:
# заполним данными
from datetime import datetime
data_ap = [
    (101,1,"Иванов Иван Иванович",datetime.strptime("2002-06-06", "%Y-%m-%d"),datetime.strptime("2021-07-26", "%Y-%m-%d")),
    (102,2,"Петров Петр Петович",datetime.strptime("2003-07-07", "%Y-%m-%d"),datetime.strptime("2021-07-26", "%Y-%m-%d")),
    (105,3,"Сидоров Сидр Сидорович",datetime.strptime("2004-08-08", "%Y-%m-%d"),datetime.strptime("2021-11-23", "%Y-%m-%d"))
]
#Создаем дата фрейм
df_ap = spark.createDataFrame(data = data_ap, schema = esquemaAreaPersonal)

In [49]:
# таблица по личным кабинетам
df_ap.show()

+---+-------+--------------------+----------+----------+
| id|user_id|                 fio|       dbd|       dpa|
+---+-------+--------------------+----------+----------+
|101|      1|Иванов Иван Иванович|2002-06-06|2021-07-26|
|102|      2| Петров Петр Петович|2003-07-07|2021-07-26|
|105|      3|Сидоров Сидр Сидо...|2004-08-08|2021-11-23|
+---+-------+--------------------+----------+----------+



In [50]:
 #объединенная таблица
df_main = df_ap.alias("lk").join(dfTime.alias("web"),
                               on = [F.col("lk.user_id")==F.col("web.id")],
                               how = "left")

In [51]:
df_main.show()

+---+-------+--------------------+----------+----------+---+------+-------+--------+-----+-------------------+---+
| id|user_id|                 fio|       dbd|       dpa| id|  type|page_id|     tag| sign|         event_time|new|
+---+-------+--------------------+----------+----------+---+------+-------+--------+-----+-------------------+---+
|101|      1|Иванов Иван Иванович|2002-06-06|2021-07-26|  1|  move|     10|   sport| true|2021-07-26 22:50:10|  5|
|101|      1|Иванов Иван Иванович|2002-06-06|2021-07-26|  1|  move|      7|   sport| true|2021-07-26 22:50:07|  5|
|101|      1|Иванов Иван Иванович|2002-06-06|2021-07-26|  1| click|      4|   sport|false|2021-07-26 22:50:04|  5|
|101|      1|Иванов Иван Иванович|2002-06-06|2021-07-26|  1|  move|      3|   sport| true|2021-07-26 22:50:03|  5|
|101|      1|Иванов Иван Иванович|2002-06-06|2021-07-26|  1| click|      1| politic| true|2021-07-26 22:50:01|  5|
|102|      2| Петров Петр Петович|2003-07-07|2021-07-26|  2| click|     11|   sp

In [53]:
#Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.
df_main.registerTempTable("df_table")
spark.sql("SELECT DISTINCT fio FROM df_table WHERE tag = 'sport' AND type = 'move'").show()

+--------------------+
|                 fio|
+--------------------+
|Иванов Иван Иванович|
+--------------------+

