### Импорт библиотек

In [126]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

import time
import names
import random
import datetime

### Анализ данных clickstream пользователей новостного Интернет-портала.

Создайте схему будущего фрейма данных. Схема должна включать следующие атрибуты:

* `id` -  уникальный идентификатор посетителя сайта. Тип – последовательность чисел фиксированной длины. Данное поле не является первичным ключом.
* `timestamp` – дата и время события в формате unix timestamp.
* `type` – тип события, значение из списка (факт посещения(visit), клик по визуальному элементу страницы(click), скролл(scroll), перед на другую страницу(move)).
* `page_id` – id текущей страницы. Тип - последовательность чисел фиксированной длины.
* `tag` – каждая страница с новостью размечается редакцией специальными тегами, которые отражают тематику конкретной новости со страницы. Возможный список тематик: политика, спорт, медицина и т.д.
* `sign` – наличие у пользователя личного кабинета. Значения – True/False

In [127]:
# Create schema.
schema = (
    T.StructType()
    .add('id', T.IntegerType(), False)
    .add('timestamp', T.IntegerType(), False)
    .add('type', T.StringType(), False)
    .add('page_id', T.IntegerType(), False)
    .add('tag', T.StringType(), False)
    .add('sign', T.BooleanType(), False)
)

Создайте датафрейм с описанной выше схемой данных.

In [128]:
# Create Spark session.
spark = (
    SparkSession.builder
    .master('local')
    .appName('Wprd count')
    .config('spark.driver.bindAdress', 'localhost')
    .config('spark.ui.port', '4040')
    .getOrCreate()
    )

In [129]:
# Create empty DataFrame
df = spark.createDataFrame([], schema)

Наполните датафрейм данными.

In [130]:
def generate_users(min_id, max_id, user_count):
    """Creates new users"""
    uid = random.sample(range(min_id, max_id), user_count)
    result = [(x, random.choice([True, False])) for x in uid]
    return result

def generate_pages(min_id, max_id, page_count):
    """Creates new users"""
    pid = random.sample(range(min_id, max_id), page_count)
    result = [(x, random.choice(['Sport', 'Science', 'Politics', 'Economics', 'Crime', 'Education', 'Entertainment'])) for x in pid]
    return result

def randomDate(start, end):
    """Generate random date"""
    frmt = '%Y-%m-%d %H:%M:%S'
    stime = time.mktime(time.strptime(start, frmt))
    etime = time.mktime(time.strptime(end, frmt))
    ptime = stime + random.random() * (etime - stime)
    dt = datetime.datetime.fromtimestamp(time.mktime(time.localtime(ptime)))
    return dt

def generate_user_clicks(user, max_axtions, pages):
    """Generate click stream data"""
    uid, sign = user
    user_actions_count = random.randint(1, max_axtions)
    click_stream = []
    for i in range(user_actions_count):
        ts = int(
            datetime.datetime.timestamp(
                randomDate("2022-01-01 00:00:00", "2022-12-31 23:59:59")
            )
        )
        tp = random.choice(['visit', 'click', 'scroll', 'move'])
        pid = random.choice(pages)[0]
        tg = random.choice(pages)[1]
        click_stream.append((uid, ts, tp, pid, tg, sign))
    return click_stream

def generate_data(users, max_axtions, min_pid, max_pid, page_count):
    """Generate overall data"""
    result = []
    for user in users:
        user_click_stream = generate_user_clicks(user, max_axtions, generate_pages(min_pid, max_pid, page_count))
        result.extend(user_click_stream)
    return result

In [131]:
# Set random state.
random.seed(43)

In [142]:
# Generate data.
users = generate_users(10000, 99999, 200)
data = generate_data(users, max_axtions=500, min_pid=1, max_pid=1000, page_count=100)
append_data = spark.createDataFrame(data, schema=schema)
# Append generated data.
df = df.union(append_data)

 Решение задач:

In [143]:
# Вывести топ-5 самых активных посетителей сайта
df.filter(df.type == "visit").groupBy('id').count().orderBy(F.col('count').desc()).show(5)

+-----+-----+
|   id|count|
+-----+-----+
|42046|  146|
|98851|  141|
|27545|  134|
|77381|  131|
|98064|  130|
+-----+-----+
only showing top 5 rows



In [144]:
# Посчитать процент посетителей, у которых есть ЛК
100 * df.filter(df.sign == True).agg(F.countDistinct('id')).first()[0]/df.agg(F.countDistinct('id')).first()[0]

53.5

In [145]:
# Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице
df.filter(df.type == "click").groupBy('page_id').count().orderBy(F.col('count').desc()).show(5)

+-------+-----+
|page_id|count|
+-------+-----+
|    318|   46|
|    211|   46|
|    440|   45|
|    453|   45|
|    189|   44|
+-------+-----+
only showing top 5 rows



In [146]:
# Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)
df_hour = df.withColumn(
    "hour", (
        F.hour(
            F.from_unixtime(
                F.col("timestamp").cast(T.StringType())
            )
        )/4
    )
)
df_bin = df_hour.withColumn(
    "time_bin",
    F.when((F.col("hour") == 6) | (F.col("hour") < 1), F.lit("0-4"))
    .when((F.col("hour") <= 1) | (F.col("hour") < 2), F.lit("4-8"))
    .when((F.col("hour") <= 2) | (F.col("hour") < 3), F.lit("8-12"))
    .when((F.col("hour") <= 3) | (F.col("hour") < 4), F.lit("12-16"))
    .when((F.col("hour") <= 4) | (F.col("hour") < 5), F.lit("16-20"))
    .when((F.col("hour") <= 5) | (F.col("hour") < 6), F.lit("20-24"))
).drop(F.col('hour'))

In [147]:
df_bin.show(5)

+-----+----------+------+-------+---------+-----+--------+
|   id| timestamp|  type|page_id|      tag| sign|time_bin|
+-----+----------+------+-------+---------+-----+--------+
|15053|1666832744| click|    857|    Crime|false|     4-8|
|15053|1643897679|  move|    785| Politics|false|   16-20|
|15053|1641397629|scroll|    460| Politics|false|   16-20|
|15053|1646056198| click|    614|Education|false|   16-20|
|15053|1659499727|  move|    366|Economics|false|     4-8|
+-----+----------+------+-------+---------+-----+--------+
only showing top 5 rows



In [148]:
# Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.
df_bin.groupBy(F.col('time_bin')).count().orderBy(F.col('count').desc()).show(5)

+--------+-----+
|time_bin|count|
+--------+-----+
|     4-8|16692|
|     0-4|16651|
|   12-16|16566|
|   16-20|16470|
|   20-24|16422|
+--------+-----+
only showing top 5 rows



Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта со следующим списком атрибутов
1. Id – уникальный идентификатор личного кабинета
2. User_id – уникальный идентификатор посетителя
3. ФИО посетителя
4. Дату рождения посетителя
5. Дата создания ЛК

In [164]:
# Create schema.
schema2 = (
    T.StructType()
    .add('id', T.IntegerType(), False)
    .add('user_id', T.IntegerType(), False)
    .add('name', T.StringType(), False)
    .add('dob', T.IntegerType(), False)
    .add('dosu', T.IntegerType(), False)
)

In [160]:
def generate_users_info(users, min_id):
    """Creates new users"""
    lkid = random.sample(range(min_id, min_id + len(users)), len(users))
    uid = [x[0] for x in users]
    name = [names.get_full_name() for x in users]
    dob = [int(
            datetime.datetime.timestamp(
                randomDate("1960-01-01 00:00:00", "2008-12-31 23:59:59")
            )
        ) for x in users]
    dosu = [int(
            datetime.datetime.timestamp(
                randomDate("2010-01-01 00:00:00", "2021-12-31 23:59:59")
            )
        )for x in users]
    result = list(zip(lkid, uid, name, dob, dosu))
    return result

In [162]:
# Generate inforamtion on users.
data2 = generate_users_info(users, min_id=100)

In [165]:
# Create DataFrame.
df2 = spark.createDataFrame(data2, schema2)

In [173]:
# Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.
df.filter(df.tag == 'Sport').join(df2, df.id == df2.user_id, 'inner').select('name').distinct().show()

+----------------+
|            name|
+----------------+
|    George Levan|
|    Erika Hannah|
|  Frank Sterling|
|  William Shulda|
|  Walter Wallace|
|    Edwin Boller|
|    Angela Blake|
|  Beatrice James|
|    Mildred Rose|
| Blanche Brendal|
|  Kimberly Singh|
|  Elizabeth Huff|
|   Rita Sullivan|
| Charles Collins|
|   Lillie Mccall|
|Marcus Hutchison|
|       Joe Neary|
|   Angela Torres|
|     Lori Chavez|
|   Elena Corrado|
+----------------+
only showing top 20 rows

