In [165]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import current_timestamp
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import to_timestamp,col
from pyspark.sql.functions import when
import datetime

In [2]:
spark = SparkSession.builder.master("local").\
                            appName("Word Count").\
                            config("spark.driver.bindAddress","localhost").\
                            config("spark.ui.port", "4040").\
                            getOrCreate()

22/11/30 19:13:00 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
schema = T.StructType([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("timestamp", T.IntegerType(), True),
    T.StructField("type", T.StringType(), True),
    T.StructField("page_id", T.IntegerType(), True),
    T.StructField("tag", T.StringType(), True),
    T.StructField("sign", T.BooleanType(), True) 
])

In [182]:
data = [(12345, 1667627426, "click", 101, 'Sport', True),
       (12345, 1667629826, "visit", 204, 'Politic', True),
       (12378, 1646741426, "move", 504, 'Medicine', False),
       (13547, 1646741426, 'scroll', 103, 'Econimics', True),
       (13547, 1698741426, 'scroll', 403, 'Sport', True),
       (12888, 1691741426, "click", 101, 'Medicine', False),
       (12146, 1697829826, "move", 414, 'Politic', False),
       (13888, 1664741426, 'scroll', 503, 'Econimics', True),
       (13888, 1691745926, "click", 101, 'Medicine', True),
       (12888, 1691741316, "move", 601, 'Autonews', True),
       (12887, 1691641316, "click", 601, 'Sport', True),
       (13547, 1698742426, 'scroll', 504, 'Politic', True),
       (12146, 1697829416, "visit", 104, 'Science', False),
       (12114, 1697835416, "move", 204, 'Sport', True),
       (12114, 1694435416, "scroll", 104, 'Sport', True)       
        ]

df = spark.createDataFrame(data=data, schema = schema)

In [183]:
df.show()

+-----+----------+------+-------+---------+-----+
|   id| timestamp|  type|page_id|      tag| sign|
+-----+----------+------+-------+---------+-----+
|12345|1667627426| click|    101|    Sport| true|
|12345|1667629826| visit|    204|  Politic| true|
|12378|1646741426|  move|    504| Medicine|false|
|13547|1646741426|scroll|    103|Econimics| true|
|13547|1698741426|scroll|    403|    Sport| true|
|12888|1691741426| click|    101| Medicine|false|
|12146|1697829826|  move|    414|  Politic|false|
|13888|1664741426|scroll|    503|Econimics| true|
|13888|1691745926| click|    101| Medicine| true|
|12888|1691741316|  move|    601| Autonews| true|
|12887|1691641316| click|    601|    Sport| true|
|13547|1698742426|scroll|    504|  Politic| true|
|12146|1697829416| visit|    104|  Science|false|
|12114|1697835416|  move|    204|    Sport| true|
|12114|1694435416|scroll|    104|    Sport| true|
+-----+----------+------+-------+---------+-----+



In [184]:
df_new = df.withColumn("timestamp_2", from_unixtime(col("timestamp")))

In [185]:
df_new.show()

+-----+----------+------+-------+---------+-----+-------------------+
|   id| timestamp|  type|page_id|      tag| sign|        timestamp_2|
+-----+----------+------+-------+---------+-----+-------------------+
|12345|1667627426| click|    101|    Sport| true|2022-11-05 10:50:26|
|12345|1667629826| visit|    204|  Politic| true|2022-11-05 11:30:26|
|12378|1646741426|  move|    504| Medicine|false|2022-03-08 17:10:26|
|13547|1646741426|scroll|    103|Econimics| true|2022-03-08 17:10:26|
|13547|1698741426|scroll|    403|    Sport| true|2023-10-31 13:37:06|
|12888|1691741426| click|    101| Medicine|false|2023-08-11 13:10:26|
|12146|1697829826|  move|    414|  Politic|false|2023-10-21 00:23:46|
|13888|1664741426|scroll|    503|Econimics| true|2022-10-03 01:10:26|
|13888|1691745926| click|    101| Medicine| true|2023-08-11 14:25:26|
|12888|1691741316|  move|    601| Autonews| true|2023-08-11 13:08:36|
|12887|1691641316| click|    601|    Sport| true|2023-08-10 09:21:56|
|13547|1698742426|sc

Вывести топ-5 самых активных посетителей сайта

In [186]:
(df_new
    .groupby(df_new.id)
    .count()
    .orderBy('count', ascending=False)
    .show(5))

+-----+-----+
|   id|count|
+-----+-----+
|13547|    3|
|12345|    2|
|12888|    2|
|12146|    2|
|13888|    2|
+-----+-----+
only showing top 5 rows



Посчитать процент посетителей, у которых есть ЛК

In [187]:
sign_true = (df_new
                .filter(col('sign')== True)
                .count())
sign_total = df_new.count()
print(f"Доля зарегистрированных пользователей: {sign_true/sign_total * 100}%")

Доля зарегистрированных пользователей: 73.33333333333333%


In [188]:
sign_total

15

Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице

In [189]:
(df_new
    .groupby(df_new.page_id)
    .count()
    .orderBy('count', ascending=False)
    .show(5))

+-------+-----+
|page_id|count|
+-------+-----+
|    101|    3|
|    601|    2|
|    504|    2|
|    104|    2|
|    204|    2|
+-------+-----+
only showing top 5 rows



Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

In [190]:
df_new = df_new.withColumn('hour', F.hour('timestamp_2'))
df_new.show()

+-----+----------+------+-------+---------+-----+-------------------+----+
|   id| timestamp|  type|page_id|      tag| sign|        timestamp_2|hour|
+-----+----------+------+-------+---------+-----+-------------------+----+
|12345|1667627426| click|    101|    Sport| true|2022-11-05 10:50:26|  10|
|12345|1667629826| visit|    204|  Politic| true|2022-11-05 11:30:26|  11|
|12378|1646741426|  move|    504| Medicine|false|2022-03-08 17:10:26|  17|
|13547|1646741426|scroll|    103|Econimics| true|2022-03-08 17:10:26|  17|
|13547|1698741426|scroll|    403|    Sport| true|2023-10-31 13:37:06|  13|
|12888|1691741426| click|    101| Medicine|false|2023-08-11 13:10:26|  13|
|12146|1697829826|  move|    414|  Politic|false|2023-10-21 00:23:46|   0|
|13888|1664741426|scroll|    503|Econimics| true|2022-10-03 01:10:26|   1|
|13888|1691745926| click|    101| Medicine| true|2023-08-11 14:25:26|  14|
|12888|1691741316|  move|    601| Autonews| true|2023-08-11 13:08:36|  13|
|12887|1691641316| click|

In [191]:
df_new = df_new.withColumn("time_interval",
                          when((col('hour') >= 0) & (col('hour') < 4), '0-4')
                          .when((col('hour') >= 4) & (col('hour') < 8), '4-8')
                          .when((col('hour') >= 8) & (col('hour') < 12), '8-12')
                          .when((col('hour') >= 12) & (col('hour') < 16), '12-16')
                          .when((col('hour') >= 16) & (col('hour') < 20), '16-20')
                          .when((col('hour') >= 20) & (col('hour') < 24), '20-24'))   


In [192]:
df_new.show()

+-----+----------+------+-------+---------+-----+-------------------+----+-------------+
|   id| timestamp|  type|page_id|      tag| sign|        timestamp_2|hour|time_interval|
+-----+----------+------+-------+---------+-----+-------------------+----+-------------+
|12345|1667627426| click|    101|    Sport| true|2022-11-05 10:50:26|  10|         8-12|
|12345|1667629826| visit|    204|  Politic| true|2022-11-05 11:30:26|  11|         8-12|
|12378|1646741426|  move|    504| Medicine|false|2022-03-08 17:10:26|  17|        16-20|
|13547|1646741426|scroll|    103|Econimics| true|2022-03-08 17:10:26|  17|        16-20|
|13547|1698741426|scroll|    403|    Sport| true|2023-10-31 13:37:06|  13|        12-16|
|12888|1691741426| click|    101| Medicine|false|2023-08-11 13:10:26|  13|        12-16|
|12146|1697829826|  move|    414|  Politic|false|2023-10-21 00:23:46|   0|          0-4|
|13888|1664741426|scroll|    503|Econimics| true|2022-10-03 01:10:26|   1|          0-4|
|13888|1691745926| cl

Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.

In [193]:
(df_new
    .groupby(df_new.time_interval)
    .count()
    .orderBy('count', ascending=False)
    .show(1))

+-------------+-----+
|time_interval|count|
+-------------+-----+
|        12-16|    5|
+-------------+-----+
only showing top 1 row



Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта со следующим списком атрибутов

In [194]:
schema_account = T.StructType([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("User_id", T.IntegerType(), True),
    T.StructField("lastname", T.StringType(), True),
    T.StructField("firstname", T.StringType(), True),
    T.StructField("middlename", T.StringType(), True),
    T.StructField("dob", T.DateType(), True),
    T.StructField("date_reg", T.DateType(), True)
])

In [195]:
data_account = [(55671, 12345, 'Иванов', 'Иван', 'Иванович', datetime.datetime.strptime('1990-11-01', "%Y-%m-%d"), datetime.datetime.strptime('2017-01-12', "%Y-%m-%d")), 
                (55987, 13547, 'Петров', 'Артем', 'Янович', datetime.datetime.strptime('1992-10-11', "%Y-%m-%d"), datetime.datetime.strptime('2018-06-12', "%Y-%m-%d")),
                (56987, 13888, 'Клюева', 'Валерия', 'Петровна', datetime.datetime.strptime('1993-12-05', "%Y-%m-%d"), datetime.datetime.strptime('2019-02-22', "%Y-%m-%d")),
                (56077, 12887, 'Иванова', 'Елена', 'Алексеевна', datetime.datetime.strptime('1968-10-25', "%Y-%m-%d"), datetime.datetime.strptime('2019-01-01', "%Y-%m-%d")),
                (57004, 12114, 'Дуров', 'Олег', 'Викторович', datetime.datetime.strptime('1985-04-11', "%Y-%m-%d"), datetime.datetime.strptime('2020-04-26', "%Y-%m-%d"))
               ]
df_account = spark.createDataFrame(data=data_account, schema = schema_account)

In [196]:
df_account.show()

+-----+-------+--------+---------+----------+----------+----------+
|   id|User_id|lastname|firstname|middlename|       dob|  date_reg|
+-----+-------+--------+---------+----------+----------+----------+
|55671|  12345|  Иванов|     Иван|  Иванович|1990-11-01|2017-01-12|
|55987|  13547|  Петров|    Артем|    Янович|1992-10-11|2018-06-12|
|56987|  13888|  Клюева|  Валерия|  Петровна|1993-12-05|2019-02-22|
|56077|  12887| Иванова|    Елена|Алексеевна|1968-10-25|2019-01-01|
|57004|  12114|   Дуров|     Олег|Викторович|1985-04-11|2020-04-26|
+-----+-------+--------+---------+----------+----------+----------+



Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.

In [206]:
(df.join(df_account, df.id == df_account.User_id, 'inner')
    .select(df_account.lastname)
    .filter(df.tag == "Sport")
    .distinct()
    .collect())

[Row(lastname='Петров'),
 Row(lastname='Иванова'),
 Row(lastname='Дуров'),
 Row(lastname='Иванов')]