In [171]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
import datetime
import time
import math
from pyspark.sql.window import Window

In [156]:
spark = SparkSession.builder.master("local") \
                    .appName("Homework") \
                    .config("spark.jars", "~/spark-3.3.1-bin-hadoop3/jars/postgresql-42.5.0.jar") \
                    .config("spark.driver.bindAddress", "localhost") \
                    .config("spark.ui.port", "4040") \
                    .getOrCreate()

In [157]:
schema_web = T.StructType([
                T.StructField("id", T.IntegerType(), True),
                T.StructField("timestamp", T.LongType(), True),    
                T.StructField("type", T.StringType(), True),
                T.StructField("page_id", T.IntegerType(), True),                
                T.StructField("tag", T.ArrayType(T.StringType()), True),
                T.StructField("sign", T.BooleanType(), True)
])

schema_lk = T.StructType([
                T.StructField("id", T.IntegerType(), False),
                T.StructField("user_id", T.IntegerType(), False),    
                T.StructField("full_name", T.StringType(), True),
                T.StructField("birth_date", T.DateType(), True),
                T.StructField("create_date", T.DateType(), True)
])

In [158]:
# 2000-01-01...

data_web = [
    (1, 946683757, 'visit', 1, ['политика', 'спорт'], True),
    (1, 946687357, 'click', 2, ['спорт'], True),
    (3, 946784557, 'scroll', 3, ['медицина'], True),
    (4, 946975357, 'move', 3, ['медицина'], False)
]

data_lk = [
    (1, 1, 'Логинов Алексей Алексеевич', datetime.datetime(1990, 7, 5), datetime.datetime.strptime('1999-04-01', "%Y-%m-%d").date()),
    (2, 2, 'Калугин Максим Львович', datetime.datetime(1981, 3, 14),datetime.datetime.strptime('1999-05-19', "%Y-%m-%d").date()),
    (3, 3, 'Кудрявцева Варвара Денисовна', datetime.datetime(1982, 8, 15),datetime.datetime.strptime('1999-09-05', "%Y-%m-%d").date()),
    (4, 4, 'Титова Василиса Сергеевна', datetime.datetime(1985, 4, 25),datetime.datetime.strptime('1999-12-01', "%Y-%m-%d").date()),
    (5, 5, 'Смирнова Екатерина Львовна', datetime.datetime(1987, 4, 13),datetime.datetime.strptime('1999-01-01', "%Y-%m-%d").date())
]

#columns = ["id","timestamp","type","page_id","tag","sign"]
df_web = spark.createDataFrame(data = data_web, schema = schema_web)
df_lk = spark.createDataFrame(data = data_lk, schema = schema_lk)

In [159]:
from SlavicNames.parse_fio import gender_parse

def get_gender(name):
    return gender_parse(name).get('gender')

getGenderUDF = F.udf(lambda h: get_gender(h))

df_lk = df_lk.withColumn('gender', getGenderUDF(F.col('full_name')))

In [160]:
df_web = df_web.select(*[i for i in df_web.columns if i != "timestamp"],
                    F.from_unixtime("timestamp").alias("event_time"))

In [161]:
df_web.printSchema()
df_lk.printSchema()

root
 |-- id: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- page_id: integer (nullable = true)
 |-- tag: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sign: boolean (nullable = true)
 |-- event_time: string (nullable = true)

root
 |-- id: integer (nullable = false)
 |-- user_id: integer (nullable = false)
 |-- full_name: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- create_date: date (nullable = true)
 |-- gender: string (nullable = true)



In [162]:
# Вывести топ-5 самых активных посетителей сайта

# tmp = df_web.groupBy("id") \
#         .count() \
#         .orderBy(F.col("count").desc())

tmp = df_web.groupBy("id") \
        .agg(F.count("*").alias("event_count")) \
        .orderBy("event_count", ascending = False)
    
tmp.show(5)

+---+-----------+
| id|event_count|
+---+-----------+
|  1|          2|
|  3|          1|
|  4|          1|
+---+-----------+



In [163]:
# Посчитать процент посетителей, у которых есть ЛК

df_web.filter(df_web.sign == True).count() / df_web.count() * 100

75.0

In [164]:
# Вывести топ-5 страниц сайта по показателю среднего кол-ва кликов на данной странице

tmp.groupBy("id") \
    .agg(F.avg("event_count").alias("avg")) \
    .orderBy(F.col("avg").desc()) \
    .show(5)

+---+---+
| id|avg|
+---+---+
|  1|2.0|
|  3|1.0|
|  4|1.0|
+---+---+



In [165]:
# Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

# через udf
@F.udf(T.StringType())
def get_hour_window(hour):
    step = 4
    for x in range(0, 24, step):
        if hour < x:
            return f'{x-step}-{x}'
    return "20-0"

df_web = df_web.withColumn("hour", F.hour(F.col("event_time"))) \
        .withColumn("hour_window", get_hour_window(F.col("hour"))) \
        .select(*[i for i in df_web.columns if i != "hour"], "hour_window")

df_web.show()

# без udf можно типа F.when(F.col("hour").between(0, 4), "0-4").otherwise("")


+---+------+-------+-----------------+-----+-------------------+-----------+
| id|  type|page_id|              tag| sign|         event_time|hour_window|
+---+------+-------+-----------------+-----+-------------------+-----------+
|  1| visit|      1|[политика, спорт]| true|2000-01-01 02:42:37|        0-4|
|  1| click|      2|          [спорт]| true|2000-01-01 03:42:37|        0-4|
|  3|scroll|      3|       [медицина]| true|2000-01-02 06:42:37|        4-8|
|  4|  move|      3|       [медицина]|false|2000-01-04 11:42:37|       8-12|
+---+------+-------+-----------------+-----+-------------------+-----------+



In [166]:
# Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.

df_web \
    .groupBy('hour_window') \
    .agg(F.count('hour_window').alias("cnt")) \
    .orderBy(F.col("cnt").desc()) \
    .show(1)

+-----------+---+
|hour_window|cnt|
+-----------+---+
|        0-4|  2|
+-----------+---+
only showing top 1 row



In [167]:
# Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.

(
    df_web
    .filter(F.array_contains(F.col("tag"), "спорт"))
    .join(df_lk, df_web.id == df_lk.user_id, 'inner')
    .select(df_lk.full_name)
    .distinct()
    .show(truncate = False)
)

+--------------------------+
|full_name                 |
+--------------------------+
|Логинов Алексей Алексеевич|
+--------------------------+



In [168]:
# Выведите 10% ЛК, у которых максимальная разница между датой создания ЛК и датой последнего посещения.

df_all = df_lk.alias("lk").join(df_web.alias("web"),
                                    on = [F.col("lk.user_id") == F.col("web.id")],
                                    how = "inner")

total = df_all.count()
 
(
    df_all
    .filter(F.col('type') == 'visit')
    .withColumn('diff', F.datediff(F.col("event_time"), F.col("create_date")))
    .select('diff', F.col("event_time").alias('visit_date'), "create_date")
    .orderBy(F.col("diff").desc())
    .show(math.ceil(total * 10 / 100))
)


+----+-------------------+-----------+
|diff|         visit_date|create_date|
+----+-------------------+-----------+
| 275|2000-01-01 02:42:37| 1999-04-01|
+----+-------------------+-----------+



In [178]:
# Вывести топ-5 страниц, которые чаще всего посещают мужчины и топ-5 страниц, которые посещают чаще женщины.

(
    df_all
        .select("page_id", "gender")
        .groupBy("page_id", "gender")
        .count()
        .withColumn("max_cnt", F.max(F.col("count")).over(Window.partitionBy("gender")))
        .filter("count = max_cnt")
        .dropDuplicates(["gender"])
        .select("gender", "page_id")
        .show(100)
)

+------+-------+
|gender|page_id|
+------+-------+
|     f|      3|
|     m|      2|
+------+-------+



In [133]:
# df.show(), df2.show()

In [None]:
url = "jdbc:postgresql://localhost:5432/postgres"
creds = {"user": "postgres", "password": "myPassword"}

df_web = spark.read.jdbc(url, "de_sprint.actions", properties = creds)
df_lk = spark.read.jdbc(url, "de_sprint.profiles", properties = creds)

df_web.printSchema()
df_lk.printSchema()

In [None]:
# Создайте витрину данных в Postgres со следующим содержанием
#  1. Id посетителя
#  2. Возраст посетителя
#  3. Пол посетителя (постарайтесь описать логику вычисления пола в отдельной пользовательской функции)
#  4. Любимая тематика новостей
#  5. Любимый временной диапазон посещений
#  6. Id личного кабинета
#  7. Разница в днях между созданием ЛК и датой последнего посещения. (-1 если ЛК нет)
#  8. Общее кол-во посещений сайта
#  9. Средняя длина сессии(сессией считаем временной промежуток, который охватывает последовательность событий, которые происходили подряд с разницей не более 5 минут).
# 10. Среднее кол-во активностей в рамках одной сессии

df_web.select("id").distinct().show()