In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import round, col, floor


In [2]:
spark = SparkSession.builder.master("local").\
                                    appName("Word count"). \
                                    config("spark.driver.bindAddress","localhost").\
                                    config("spark.ui.port","4040").\
                                    getOrCreate() 

тип события, значение из списка (факт посещения(visit), клик по визуальному элементу страницы(click), скролл(scroll), перед на другую страницу(move)).

·   page_id – id текущей страницы. Тип - последовательность чисел фиксированной длины.

·   tag – каждая страница с новостью размечается редакцией специальными тегами, которые отражают тематику конкретной новости со страницы. Возможный список тематик: политика, спорт, медицина и т.д.

·   sign – наличие у пользователя личного кабинета. Значения – True/False.*/

In [3]:
data = [(12345, 1667627426, "click", 101, 'Sport', False),
        (12346, 1667627427, "visit", 102, 'Медицина', False),
        (12347, 8667627428, "click", 103, 'Sport', True),
        (12346, 1667627429, "click", 104, 'Политика', False),
        (12349, 4667627430, "click", 103, 'Sport', True),
        ]
column =  ['id','timestamp','type', 'page_id','tag', 'sign'] 
comment = ''''''

df = spark.createDataFrame(data, column)

         

In [4]:
#·   Вывести топ-5 самых активных посетителей сайта
df.groupBy('id').count().orderBy('count', ascending=False).show(5)

+-----+-----+
|   id|count|
+-----+-----+
|12346|    2|
|12349|    1|
|12347|    1|
|12345|    1|
+-----+-----+



In [5]:
#·   Посчитать процент посетителей, у которых есть ЛК
df.groupBy('sign').count().withColumn('percent', round(col('count')/df.count()*100, 2)).show()




+-----+-----+-------+
| sign|count|percent|
+-----+-----+-------+
| true|    2|   40.0|
|false|    3|   60.0|
+-----+-----+-------+



In [6]:
# Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице
df.filter(df.type == 'click').groupBy('page_id').count().orderBy('count', ascending=False).show(5)

+-------+-----+
|page_id|count|
+-------+-----+
|    103|    2|
|    104|    1|
|    101|    1|
+-------+-----+



In [7]:
# Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)
df.withColumn('time', floor(df.timestamp % 86400 / 14400)).show()
#df.withColumn('time', df.timestamp % 86400 / 14400).groupBy('time').count().show()
#df.withColumn('time', round(df.timestamp % 86400 / 14400)).groupBy('time').count().show()

+-----+----------+-----+-------+--------+-----+----+
|   id| timestamp| type|page_id|     tag| sign|time|
+-----+----------+-----+-------+--------+-----+----+
|12345|1667627426|click|    101|   Sport|false|   1|
|12346|1667627427|visit|    102|Медицина|false|   1|
|12347|8667627428|click|    103|   Sport| true|   4|
|12346|1667627429|click|    104|Политика|false|   1|
|12349|4667627430|click|    103|   Sport| true|   2|
+-----+----------+-----+-------+--------+-----+----+



In [8]:
#Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.
df.withColumn('time', floor(df.timestamp % 86400 / 14400)).groupBy('time').count().orderBy('count', ascending=False).show(1)

+----+-----+
|time|count|
+----+-----+
|   1|    3|
+----+-----+
only showing top 1 row



'''·   Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта со следующим списком атрибутов

1.       Id – уникальный идентификатор личного кабинета

2.       User_id – уникальный идентификатор посетителя

3.       ФИО посетителя

4.    Дату рождения посетителя 

5.       Дата создания ЛК'''

In [9]:

column2 =  ['id','user_id','fio', 'birthday','date_create']
data2 = [(101, 12345, "Иванов Иван Иванович", '1990-01-01', '2020-01-01'),
        (102, 12346, "Петров Петр Петрович", '1990-01-02', '2020-01-02'),
        (103, 12347, "Сидоров Сидор Сидорович", '1990-01-03', '2020-01-03'),
        (104, 12348, "Алексеев Алексей Алексеевич", '1990-01-04', '2020-01-04'),
        (105, 12349, "Андреев Андрей Андреевич", '1990-01-05', '2020-01-05')]
df2 = spark.createDataFrame(data2, column2)


In [10]:
#·   Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.
df.join(df2, df.id == df2.user_id).filter(df.tag == 'Sport').select('fio').show()

+--------------------+
|                 fio|
+--------------------+
|Иванов Иван Иванович|
|Сидоров Сидор Сид...|
|Андреев Андрей Ан...|
+--------------------+



In [11]:
#·   Выведите 10% ЛК, у которых максимальная разница между датой создания ЛК и датой последнего посещения.
df2.join(df, df2.id == df.page_id).filter(df.sign == True).withColumn('diff', df2.date_create - df.timestamp).orderBy('diff', ascending=False).show(1)

+---+-------+--------------------+----------+-----------+-----+----------+-----+-------+-----+----+----+
| id|user_id|                 fio|  birthday|date_create|   id| timestamp| type|page_id|  tag|sign|diff|
+---+-------+--------------------+----------+-----------+-----+----------+-----+-------+-----+----+----+
|103|  12347|Сидоров Сидор Сид...|1990-01-03| 2020-01-03|12347|8667627428|click|    103|Sport|true|null|
+---+-------+--------------------+----------+-----------+-----+----------+-----+-------+-----+----+----+
only showing top 1 row



In [12]:
#·   Вывести топ-5 страниц, которые чаще всего посещают мужчины и топ-5 страниц, которые посещают чаще женщины.
# -  у нас никаких полов нет, 

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 60031)


e.       Создайте в Postgres таблицы аналогичной структуры и выполните следующие задания с помощью Spark.

·    Создайте витрину данных в Postgres со следующим содержанием

1.       Id посетителя

2.       Возраст посетителя

3.       Пол посетителя (постарайтесь описать логику вычисления пола в отдельной пользовательской функции)

4.       Любимая тематика новостей

5.       Любимый временной диапазон посещений

6.       Id личного кабинета

7.       Разница в днях между созданием ЛК и датой последнего посещения. (-1 если ЛК нет)

8.       Общее кол-во посещений сайта

9.       Средняя длина сессии(сессией считаем временной промежуток, который охватывает последовательность событий, которые происходили подряд с разницей не более 5 минут).

10.   Среднее кол-во активностей в рамках одной сессии

In [4]:
#·    Создайте витрину данных в Postgres со следующим содержанием
# 1.       Id посетителя



