#Установка Spark в Colab


In [258]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


#Импорт библиотек и подключение SparkSession

In [259]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
import datetime
import time
import random
spark = SparkSession.builder.master('local[*]').getOrCreate()

#Схема

In [260]:
schema = T.StructType([
                       T.StructField('user_id', T.IntegerType(), True),
                       T.StructField('timestamp', T.LongType(), True),
                       T.StructField('type', T.StringType(), True),
                       T.StructField('page_id', T.IntegerType(), True),
                       T.StructField('tag', T.ArrayType(T.StringType(), True), True),
                       T.StructField('sign', T.BooleanType(), True),
                       ])

#Генератор данных

In [261]:
def generate_data(count):
    gen_list = []
    id_list = []
    page_id = []
    tags = ['Sport', 'Music', 'News', 'Cripto', 'Economic', 'Games', 'Money',
            'Work']
    action_type = ['visit', 'click', 'scroll', 'move']
    for i in range(1,int(count/3)):
        id_list.append([])
        id_list[i-1].append(i)
        id_list[i-1].append(random.choice([True, False]))

    for i in range(0, count * 2):
        inside_list = []
        temp_numb = random.randint(1, count * 2)

        page_id.append([])
        page_id[i].append(temp_numb)
        for j in range (0, random.randint(1, 3)):
            temp_tag = random.choice(tags)
            inside_list.append(temp_tag)
        page_id[i].append(inside_list)

    for i in range(0,count + 1):
        temp_id = random.choice(id_list)
        temp_page = random.choice(page_id)
        gen_list.append([])
        gen_list[i].append(temp_id[0])
        gen_list[i].append(random.randint(946674000, int(time.time())))
        gen_list[i].append(random.choice(action_type))
        gen_list[i].append(temp_page[0])
        gen_list[i].append(temp_page[1])
        gen_list[i].append(temp_id[1])
    return gen_list
count = 300
data = generate_data(count)
print(data)

[[33, 1564230822, 'move', 191, ['Work', 'Cripto'], False], [38, 1015003862, 'visit', 45, ['Music', 'News', 'Sport'], True], [64, 1521337113, 'visit', 297, ['Work'], False], [66, 1543118715, 'scroll', 283, ['Games'], False], [95, 1552363200, 'scroll', 350, ['Sport'], True], [2, 969123720, 'scroll', 84, ['Sport', 'Economic'], True], [66, 1092994118, 'click', 440, ['Money', 'Money'], False], [43, 1314531544, 'move', 451, ['Music', 'Work', 'Games'], True], [42, 1560796885, 'click', 532, ['Work', 'Cripto'], False], [39, 1657844684, 'scroll', 561, ['News', 'Sport'], False], [36, 1492187767, 'move', 514, ['News'], True], [19, 990392287, 'scroll', 417, ['Cripto', 'Sport', 'Cripto'], True], [10, 1182145411, 'scroll', 365, ['Games', 'Economic', 'News'], False], [53, 1325051453, 'click', 302, ['Economic', 'Economic'], False], [49, 1252944201, 'click', 38, ['News'], False], [49, 1556018211, 'scroll', 356, ['Economic'], False], [19, 1449526728, 'move', 189, ['News', 'Money', 'Games'], True], [55, 1

# Объявление DataFrame


In [262]:
df = spark.createDataFrame(data,schema=schema)
df = df.select(*[i for i in df.columns if i != 'timestamp'],
          F.from_unixtime('timestamp').alias('date_time'))
df.show()

+-------+------+-------+--------------------+-----+-------------------+
|user_id|  type|page_id|                 tag| sign|          date_time|
+-------+------+-------+--------------------+-----+-------------------+
|     33|  move|    191|      [Work, Cripto]|false|2019-07-27 12:33:42|
|     38| visit|     45|[Music, News, Sport]| true|2002-03-01 17:31:02|
|     64| visit|    297|              [Work]|false|2018-03-18 01:38:33|
|     66|scroll|    283|             [Games]|false|2018-11-25 04:05:15|
|     95|scroll|    350|             [Sport]| true|2019-03-12 04:00:00|
|      2|scroll|     84|   [Sport, Economic]| true|2000-09-16 17:02:00|
|     66| click|    440|      [Money, Money]|false|2004-08-20 09:28:38|
|     43|  move|    451|[Music, Work, Games]| true|2011-08-28 11:39:04|
|     42| click|    532|      [Work, Cripto]|false|2019-06-17 18:41:25|
|     39|scroll|    561|       [News, Sport]|false|2022-07-15 00:24:44|
|     36|  move|    514|              [News]| true|2017-04-14 16

# Топ 5 самых активных пользователей сайта

In [263]:
df.groupBy('user_id').agg({'type':'count'}).orderBy('count(type)', ascending = False).show(5)

+-------+-----------+
|user_id|count(type)|
+-------+-----------+
|     66|          9|
|     53|          6|
|     19|          6|
|     47|          6|
|     87|          6|
+-------+-----------+
only showing top 5 rows



# Процент зарегистрированных пользователей

In [264]:
registed_users = df.select('*').where(df['sign']=='true').groupBy('user_id', 'sign').count().distinct().count()
all_users = df.groupBy('user_id').count().distinct().count()
print(f'Quantity of registered users = {registed_users}')
print(f'Quantity of all users = {all_users}')
percent_registred_users = registed_users / all_users * 100
print(f'Percent registred users equals {int(percent_registred_users)}%')

Quantity of registered users = 54
Quantity of all users = 93
Percent registred users equals 58%


# Топ 5 страниц по кликам

In [265]:
df.select('*').where(df['type']=='click').groupBy('page_id').count().orderBy('count', ascending = False).show(5)

+-------+-----+
|page_id|count|
+-------+-----+
|    530|    2|
|    351|    2|
|    538|    2|
|    510|    2|
|    597|    1|
+-------+-----+
only showing top 5 rows



# Добавляем столбец с временными диапозонами 0-4, 4-8, 8-12 и т.д.

In [266]:
df_hours = df.select('*', F.floor(F.hour('date_time')/4).alias('time_range'))
df_hours.show()

+-------+------+-------+--------------------+-----+-------------------+----------+
|user_id|  type|page_id|                 tag| sign|          date_time|time_range|
+-------+------+-------+--------------------+-----+-------------------+----------+
|     33|  move|    191|      [Work, Cripto]|false|2019-07-27 12:33:42|         3|
|     38| visit|     45|[Music, News, Sport]| true|2002-03-01 17:31:02|         4|
|     64| visit|    297|              [Work]|false|2018-03-18 01:38:33|         0|
|     66|scroll|    283|             [Games]|false|2018-11-25 04:05:15|         1|
|     95|scroll|    350|             [Sport]| true|2019-03-12 04:00:00|         1|
|      2|scroll|     84|   [Sport, Economic]| true|2000-09-16 17:02:00|         4|
|     66| click|    440|      [Money, Money]|false|2004-08-20 09:28:38|         2|
|     43|  move|    451|[Music, Work, Games]| true|2011-08-28 11:39:04|         2|
|     42| click|    532|      [Work, Cripto]|false|2019-06-17 18:41:25|         4|
|   

#Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.

In [267]:
@udf(T.StringType())
def conversion(value):
  for i, v in dict_range.items():
    if int(value) == i:
      return v

df_time_range = df_hours.groupBy('time_range').count().orderBy('count', ascending = False)
dict_range = {0 : '0-4',
              1 : '4-8',
              2 : '8-12',
              3 : '12-16',
              4 : '16-20',
              5 : '20-0'}
df_time_range.select( conversion('time_range').alias('time_range'),'count').show(1)


+----------+-----+
|time_range|count|
+----------+-----+
|       4-8|   59|
+----------+-----+
only showing top 1 row



#Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта со следующим списком атрибутов:
1.       Id – уникальный идентификатор личного кабинета

2.       User_id – уникальный идентификатор посетителя

3.       ФИО посетителя

4.    Дату рождения посетителя 

5.       Дата создания ЛК

In [268]:
#Генератор данных пользователя
def generate_user_list(count):
  users = []
  sex = ['male', 'female']
  #Диапозон дат рождения и создания аккаунта в формате unix timestamp
  range_birthday = [315532800, 1072915200]
  range_create_account = [1262304000, 1669161600]
  #счётчик ID аккаунта
  accounts = 0
  last_name_female = []
  first_name_female = []
  patronymic_female = []

  last_name_male = []
  first_name_male = []
  patronymic_male = []

  #Список ФИО сгенерированных на https://randomus.ru/

  fio_female = "Попова Мария Даниэльевна,Панова Александра Марковна,\
Сорокина Есения Матвеевна,Токарева Анна Филипповна,\
Григорьева Алёна Ивановна,Сидорова София Александровна,\
Демина Ксения Данииловна,Николаева Татьяна Артёмовна,\
Андреева Арина Александровна,Овчинникова Мария Тимофеевна,\
Захарова Ева Павловна,Кондратьева Анастасия Егоровна,\
Леонова Эмилия Олеговна,Данилова София Львовна,\
Ушакова Диана Артёмовна,Михеева Валерия Кирилловна,\
Смирнова Елена Кирилловна,Казакова Ева Кирилловна,\
Смирнова Екатерина Сергеевна,Сергеева Злата Леонидовна"
  fio_male = "Абрамов Матвей Маркович,Трофимов Александр Артёмович,\
Соловьев Роман Львович,Тарасов Максим Викторович,\
Голубев Захар Максимович,Кудрявцев Альберт Васильевич,\
Степанов Вячеслав Викторович,Некрасов Мирон Артемьевич,\
Белоусов Леонид Львович,Иванов Артём Леонович,\
Фомин Данил Маркович,Корчагин Иван Даниилович,Давыдов Ярослав Павлович,\
Иванов Фёдор Макарович,Аникин Алексей Игоревич,Сухарев Тимофей Давидович,\
Лопатин Арсен Львович,Панфилов Матвей Николаевич,\
Алексеев Александр Алексеевич,Бирюков Андрей Анатольевич"
  
  #Разделение ФИО на Фамилию, Имя и Отчество
  temp_arr = fio_male.split(',')
  for i in temp_arr:
    first_name_male.append(i.split(' ')[0])
    last_name_male.append(i.split(' ')[1])
    patronymic_male.append(i.split(' ')[2])
    
  temp_arr = fio_female.split(',')
  for i in temp_arr:
    first_name_female.append(i.split(' ')[0])
    last_name_female.append(i.split(' ')[1])
    patronymic_female.append(i.split(' ')[2])

  #Генерация данных пользователя
  for i in range(1, int(count/3)):
    
    users.append([])
    tmp_sex = random.choice(sex)
    users[i-1].append(i)

    # Находим в сгенерированных ранее данных id пользователя,
    # если у него есть личный кабинет увеличиваем номер аккаунта на 1
    # и добавляем его к нашему пользователю
    for j, v in enumerate(data):
      if v[0] == i:
        if v[5] == True:
          accounts += 1
          tmp_account = accounts
        elif v[5] == False:
          tmp_account = None
        break
    users[i-1].append(tmp_account)
    
    
    if (tmp_sex == 'female'):
      fio = f'{random.choice(first_name_female)} {random.choice(last_name_female)} {random.choice(patronymic_female)}'
    elif (tmp_sex == 'male'):
      fio = f'{random.choice(first_name_male)} {random.choice(last_name_male)} {random.choice(patronymic_male)}'

    users[i-1].append(fio)
    birthday = random.randint(range_birthday[0], range_birthday[1])
    birthday = datetime.datetime.fromtimestamp(birthday).strftime("%Y-%m-%d")
    users[i-1].append(birthday)
    date_create_account = random.randint(range_create_account[0], range_create_account[1])
    date_create_account = datetime.datetime.fromtimestamp(date_create_account ).strftime("%Y-%m-%d")
    users[i-1].append(date_create_account)
  return users

schema_users = T.StructType([
                       T.StructField('user_id', T.IntegerType(), True),
                       T.StructField('account_id', T.IntegerType(), True),
                       T.StructField('full_name', T.StringType(), True),
                       T.StructField('birthday', T.StringType(), True),
                       T.StructField('date_create_account', T.StringType(), True),
                       ])



data_users = generate_user_list(count)
print(data_users)
df_users = spark.createDataFrame(data_users, schema = schema_users)
df_users.show()

[[1, None, 'Смирнова Валерия Кирилловна', '1980-03-30', '2017-08-06'], [2, 1, 'Бирюков Леонид Леонович', '1990-06-22', '2012-03-31'], [3, 2, 'Андреева Анастасия Леонидовна', '2002-08-21', '2021-10-13'], [4, None, 'Демина Александра Павловна', '1983-04-27', '2011-05-31'], [5, None, 'Белоусов Вячеслав Артемьевич', '1991-06-18', '2016-05-03'], [6, None, 'Соловьев Вячеслав Викторович', '1985-10-14', '2021-03-22'], [7, 3, 'Ушакова Александра Львовна', '2001-07-25', '2014-05-02'], [8, None, 'Давыдов Данил Даниилович', '1988-01-16', '2021-08-05'], [9, 4, 'Данилова Александра Александровна', '2003-03-14', '2018-12-25'], [10, None, 'Давыдов Мирон Николаевич', '1982-01-27', '2015-09-23'], [11, 5, 'Корчагин Максим Артемьевич', '1989-10-11', '2015-02-09'], [12, 6, 'Сорокина Валерия Александровна', '1984-07-20', '2020-06-29'], [13, None, 'Степанов Артём Львович', '1993-08-19', '2017-12-31'], [14, 7, 'Фомин Фёдор Леонович', '2002-10-07', '2021-09-05'], [15, None, 'Белоусов Арсен Алексеевич', '1996-0

#Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.

In [287]:
df.join(df_users, df.user_id == df_users.user_id).where(F.array_contains('tag', 'Sport')).groupBy(df.user_id, 'full_name').count().orderBy(df.user_id).show(100)

+-------+--------------------+-----+
|user_id|           full_name|count|
+-------+--------------------+-----+
|      2|Бирюков Леонид Ле...|    2|
|      3|Андреева Анастаси...|    1|
|      7|Ушакова Александр...|    3|
|      8|Давыдов Данил Дан...|    1|
|     10|Давыдов Мирон Ник...|    1|
|     11|Корчагин Максим А...|    1|
|     13|Степанов Артём Ль...|    1|
|     14|Фомин Фёдор Леонович|    3|
|     16|Некрасов Леонид Л...|    1|
|     18|Сергеева Елена Ле...|    1|
|     19|Корчагин Мирон Ль...|    3|
|     20|Корчагин Иван Льв...|    2|
|     21|Бирюков Данил Вас...|    1|
|     22|Иванов Мирон Викт...|    1|
|     23|Сорокина Анна Арт...|    1|
|     24|Михеева Алёна Лео...|    2|
|     26|Алексеев Данил Ль...|    1|
|     28|Кондратьева Алекс...|    2|
|     29|Степанов Роман Ни...|    1|
|     32|Бирюков Вячеслав ...|    1|
|     34|Демина Злата Павл...|    1|
|     37|Михеева Ксения Да...|    2|
|     38|Токарева София Ар...|    1|
|     39|Кондратьева Ксени...|    1|
|