
# Работа с данными сервиса "Яндекс Книги" в PySpark

- Автор: Логинов Павел Александрович
- Дата: 07.11.2025

## Суть проекта

В этом проекте вам понадобится поработать с данными сервиса Яндекс Книги, который предоставляет доступ к контенту разных форматов, включая текст, аудио и не только. Руководство сервиса хочет лучше понимать поведение пользователей: какие типы контента они выбирают, как долго его слушают или читают, а также в какие дни недели и через какие платформы (мобильное приложение, веб-версия) это происходит. Эти инсайты позволят улучшить систему рекомендаций и принимать стратегические решения по развитию продукта. Для этого вам понадобится обработать и проанализировать реальные пользовательские данные с помощью PySpark, построить агрегаты и сделать бизнес-выводы на их основе.

Затем вы решите задачи отдела аналитики: нужно проанализировать, как различается суммарное время потребления контента в выходные и будние дни, а также выяснить, для какого типа контента наблюдается такая разница — для взрослого или невзрослого. Дополнительно вас просят преобразовать набор данных и записать его в ClickHouse, чтобы отдел аналитики смог проводить свой анализ на очищенных данных.

## Описание данных

1) Таблица `bookmate.audition` содержит данные об активности пользователей и включает столбцы:

- `audition_id` — уникальный идентификатор сессии чтения или прослушивания;
- `puid` — идентификатор пользователя;
- `usage_platform_ru` — название платформы, с помощью которой пользователь взаимодействует с контентом;
- `msk_business_dt_str` — дата и время события (строка, часовой пояс — МСК);
- `app_version` — версия приложения;
- `adult_content_flg` — значение, которое показывает, был ли контент для взрослых ( True или False );
- `hours` — длительность сессии чтения или прослушивания в часах;
- `hours_sessions_long` — длительность длинных сессий в часах;
- `kids_content_flg` — значение, которое показывает, был ли это детский контент ( True или False );
- `main_content_id` — идентификатор основного контента;
- `usage_geo_id` — идентификатор географического местоположения пользователя.

2) Таблица `bookmate.content` содержит данные об контенте и включает столбцы:

- `main_content_id` — идентификатор основного контента;
- `main_author_id` — идентификатор основного автора контента;
- `main_content_type` — тип контента: аудио, текст или другой;
- `main_content_name` — название контента;
- `main_content_duration_hours` — длительность контента в часах;
- `published_topic_title_list` — список жанров или тем контента.

## Шаг 1. Загрузка данных и знакомство с ними

Начнём с загрузки необходимых библиотек для работы фреймворка `PySpark`

In [1]:
# Загружаем необходимые библиотеки

from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import to_date, dayofweek
from pyspark.sql import functions as F

Прочитаем обе таблицы с помощью `Spark`

In [2]:
# Создаём Spark-сессию

spark = SparkSession.builder \
    .appName("Create DataFrame and Read from S3") \
    .config("fs.s3a.endpoint", "storage.yandexcloud.net") \
    .getOrCreate()

# Считываем данные из S3

audition_df = (spark.read.
		option("header", False)         
		.option("inferSchema", True)   
		.csv("/home/jovyan/work/audition.csv")
)

content_df = (spark.read.
		option("header", False)        
		.option("inferSchema", True)   
		.csv("/home/jovyan/work/content.csv")
)

25/11/08 10:49:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

Познакомимся с таблицами и сравним их структуру с описанием данных - выведем по 10 строк каждой таблицы

In [3]:
# Выводим первые 10 строк

audition_df.show(10)

content_df.show(10)

                                                                                

+-----+---+--------------------+---------------+----------+----------+-----+------------------+------------------+-----+--------+--------------------+---------+
|  _c0|_c1|                 _c2|            _c3|       _c4|       _c5|  _c6|               _c7|               _c8|  _c9|    _c10|                _c11|     _c12|
+-----+---+--------------------+---------------+----------+----------+-----+------------------+------------------+-----+--------+--------------------+---------+
|  162|  0|68296628-f9d6-11e...|        Станция|2024-11-26|      null|false|0.0377777777777777|0.0377777777777777| true|oCURrBKV|              Алматы|Казахстан|
|  213|  1|682966dc-f9d6-11e...|        Станция|2024-11-26|      null|false| 8.333333333333E-4|               0.0| true|qOL0JJL5|              Москва|   Россия|
|   63|  2|682966dc-f9d6-11e...|        Станция|2024-11-26|      null|false|0.0044444444444444|               0.0| true|ndM5nzgT|             Иркутск|   Россия|
|    2|  4|68296704-f9d6-11e...|  

Структура данных из двух таблиц соответствуют заявленному описанию данных. Однако стоит обратить внимание на то, что столбцы датафреймов не имеют названий. Перед тем как исправить это - познакомимся со схемами данных

In [4]:
# Выводим схемы данных

audition_df.printSchema()

content_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: boolean (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: boolean (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



Из-за отсутствия названий столбцов мы не можем проверить соответствие типов данных с характером данных - поэтому добавим добавим корректные названия столбцов

In [5]:
# Переименуем столбцы

audition_df = (audition_df.withColumnRenamed("_c0", "audition_id")
                          .withColumnRenamed("_c2", "puid")
                          .withColumnRenamed("_c3", "usage_platform_ru")
                          .withColumnRenamed("_c4", "msk_business_dt_str")
                          .withColumnRenamed("_c5", "app_version")
                          .withColumnRenamed("_c6", "adult_content_flg")
                          .withColumnRenamed("_c7", "hours")
                          .withColumnRenamed("_c8", "hours_sessions_long")
                          .withColumnRenamed("_c9", "kids_content_flg")
                          .withColumnRenamed("_c10", "main_content_id")
                          .withColumnRenamed("_c11", "usage_geo_id_name")
                          .withColumnRenamed("_c12", "usage_country_name"))
                   
content_df = (content_df.withColumnRenamed("_c0", "main_content_id")
                          .withColumnRenamed("_c1", "main_content_type")
                          .withColumnRenamed("_c2", "main_content_name")
                          .withColumnRenamed("_c3", "main_content_duration_hours")
                          .withColumnRenamed("_c4", "published_topic_title_list")
                          .withColumnRenamed("_c5", "main_author_name"))

# Выводим первые 10 строк

audition_df.show(10)

content_df.show(10)

+-----------+---+--------------------+-----------------+-------------------+-----------+-----------------+------------------+-------------------+----------------+---------------+--------------------+------------------+
|audition_id|_c1|                puid|usage_platform_ru|msk_business_dt_str|app_version|adult_content_flg|             hours|hours_sessions_long|kids_content_flg|main_content_id|   usage_geo_id_name|usage_country_name|
+-----------+---+--------------------+-----------------+-------------------+-----------+-----------------+------------------+-------------------+----------------+---------------+--------------------+------------------+
|        162|  0|68296628-f9d6-11e...|          Станция|         2024-11-26|       null|            false|0.0377777777777777| 0.0377777777777777|            true|       oCURrBKV|              Алматы|         Казахстан|
|        213|  1|682966dc-f9d6-11e...|          Станция|         2024-11-26|       null|            false| 8.333333333333E-4

Теперь каждый столбец имеет своё корректное названием. Обратим внимание, что столбец `_c1` в датафрейме `audition_df` не включает в себя значимую информацию - поэтому мы его удалим

In [6]:
# Удаляем ненужный столбец

audition_df = audition_df.drop("_c1") 

После выполненных преобразований мы можем полноценно познакомиться со схемами данных

In [7]:
# Выводим схемы данных

audition_df.printSchema()

content_df.printSchema()

root
 |-- audition_id: integer (nullable = true)
 |-- puid: string (nullable = true)
 |-- usage_platform_ru: string (nullable = true)
 |-- msk_business_dt_str: string (nullable = true)
 |-- app_version: string (nullable = true)
 |-- adult_content_flg: boolean (nullable = true)
 |-- hours: double (nullable = true)
 |-- hours_sessions_long: double (nullable = true)
 |-- kids_content_flg: boolean (nullable = true)
 |-- main_content_id: string (nullable = true)
 |-- usage_geo_id_name: string (nullable = true)
 |-- usage_country_name: string (nullable = true)

root
 |-- main_content_id: string (nullable = true)
 |-- main_content_type: string (nullable = true)
 |-- main_content_name: string (nullable = true)
 |-- main_content_duration_hours: double (nullable = true)
 |-- published_topic_title_list: string (nullable = true)
 |-- main_author_name: string (nullable = true)



Отметим, что каждый из столбцов может принимать нулевое значение

Несоответствие характера и типа данных наблюдается в столбце `msk_business_dt_str` (таблица `audition_df`) — дата и время события

Информация, представленная в формате даты и времени события, должна иметь тип данных `DateType`, а не строковый `StringType`

Изменим тип данных в этом столбце

In [8]:
# Изменяем тип данных

audition_df = (audition_df
    .withColumn("msk_business_dt_str",
                to_date("msk_business_dt_str", "yyyy-MM-dd")
    )
)

# Выводим первые 5 строк столбца msk_business_dt_str и его тип данных

audition_df.select("msk_business_dt_str").show(5)

audition_df.select("msk_business_dt_str").printSchema()

+-------------------+
|msk_business_dt_str|
+-------------------+
|         2024-11-26|
|         2024-11-26|
|         2024-11-26|
|         2024-11-26|
|         2024-11-26|
+-------------------+
only showing top 5 rows

root
 |-- msk_business_dt_str: date (nullable = true)



Теперь все столбцы двух таблиц имеют подходящий тип данных, который соответствует характеру заявленных данных

Проверим количество строк в каждой из таблиц

In [9]:
# Изучаем объём

print(f'Количество строк в таблице audition_df: {audition_df.count()}')
print()
print(f'Количество строк в таблице content_df: {content_df.count()}')

                                                                                

Количество строк в таблице audition_df: 1002896

Количество строк в таблице content_df: 31668


Мы видим, что количество строк в датафрейме `audition_df` превышает количество строк в датафрейме `content_df` более, чем в **30** раз. Но эта разница - обоснована, поскольку в первом датафрейме содержится информация о каждом действии каждого пользователя за определённый промежуток времени, что говорит о большом количестве различных событий. А во второй таблице содержится информация только о контенте в формате **"1 произведение - 1 строка"**. К тому же, информация о контенте изменяется крайне редко, поскольку произведения искусства появляются гораздо реже, чем события прослушивания или прочтения, количество сильно увеличивается по мере роста популярности сервиса "Яндекс Книги"

**Прожуточные выводы:**
    
- Созданы два датафрейма - `audition_df` и `content_df`
- Изучена структура данных датафреймов `audition_df` и `content_df`
- Столбцам присвоены корректные названия в соответствии с заявленным описанием данных
- Удалён 1 лишний столбец
- Изменён тип данных у 1 столбца в датафрейме `audition_df`
- Приведено обоснование существенной разницы в количестве строк между датафреймами `audition_df` и `content_df`

Таким образом, мы делаем вывод о том, что **Шаг 1. Знакомство с данными** - завершён. Можно переходить к **Шаг 2. Трансформация и преобразование таблиц**

## Шаг 2. Трансформация и преобразование таблиц

На основе таблицы `audition_df` создадим новый столбец `minutes_sessions_long`, в котором значение `hours_sessions_long` будет умножено на 60 для расчётов в минутах. Заменим тип данных этого столбца на `int`. Выведем первые десять строк полученной таблицы

In [10]:
# Создаем новый столбец minutes_sessions_long и преобразуем тип данных

df = audition_df.withColumn("minutes_sessions_long", (F.col("hours_sessions_long") * 60).cast("int"))
    
# Выводим первые 10 строк

df.show(10)

+-----------+--------------------+-----------------+-------------------+-----------+-----------------+------------------+-------------------+----------------+---------------+--------------------+------------------+---------------------+
|audition_id|                puid|usage_platform_ru|msk_business_dt_str|app_version|adult_content_flg|             hours|hours_sessions_long|kids_content_flg|main_content_id|   usage_geo_id_name|usage_country_name|minutes_sessions_long|
+-----------+--------------------+-----------------+-------------------+-----------+-----------------+------------------+-------------------+----------------+---------------+--------------------+------------------+---------------------+
|        162|68296628-f9d6-11e...|          Станция|         2024-11-26|       null|            false|0.0377777777777777| 0.0377777777777777|            true|       oCURrBKV|              Алматы|         Казахстан|                    2|
|        213|682966dc-f9d6-11e...|          Станция|

Таким образом, мы получаем датафрейм `df`, который содержит информацию о том, в какой день `msk_business_dt_str`, сколько часов `hours_sessions_long` и минут `minutes_sessions_long` длилось прослушивание/прочтение пользователем какого-либо контента, а также показывает - был контент для взрослых или нет (`adult_content_flg`)

К предыдущему запросу добавим новый столбец `is_weekend`, который покажет, был ли этот день рабочим (`False` — рабочий, `True` — выходной). Выведем первые десять строк полученной таблицы

In [11]:
# Добавляем столбец is_weekend

df = df.withColumn("is_weekend", F.dayofweek(F.col("msk_business_dt_str")).isin([1, 7]))

# Выводим первые 10 строк

df.show(10)

+-----------+--------------------+-----------------+-------------------+-----------+-----------------+------------------+-------------------+----------------+---------------+--------------------+------------------+---------------------+----------+
|audition_id|                puid|usage_platform_ru|msk_business_dt_str|app_version|adult_content_flg|             hours|hours_sessions_long|kids_content_flg|main_content_id|   usage_geo_id_name|usage_country_name|minutes_sessions_long|is_weekend|
+-----------+--------------------+-----------------+-------------------+-----------+-----------------+------------------+-------------------+----------------+---------------+--------------------+------------------+---------------------+----------+
|        162|68296628-f9d6-11e...|          Станция|         2024-11-26|       null|            false|0.0377777777777777| 0.0377777777777777|            true|       oCURrBKV|              Алматы|         Казахстан|                    2|     false|
|       

Таким образом, мы получаем датафрейм `df`, который содержит все данные из датафрейма `audition_df` и информацию о том, сколько минут `minutes_sessions_long` длилось прослушивание/прочтение пользователем какого-либо контента

Далее рассчитаем суммы значений `minutes_sessions_long` для выходных и будних дней. Назовём этот столбец `total_minutes`. Отсортируем по `is_weekend` и сравним значения.

In [12]:
# Группируем по is_weekend и считаем сумму minutes_sessions_long

df_grouped_1 = (df
    .groupBy("is_weekend")
    .agg(F.sum("minutes_sessions_long").alias("total_minutes"))
    .orderBy("is_weekend"))

# Выводим результат

df_grouped_1.show()



+----------+-------------+
|is_weekend|total_minutes|
+----------+-------------+
|     false|     17995153|
|      true|      6598993|
+----------+-------------+



                                                                                

Таким образом, мы получаем датафрейм `df_grouped_1`, который содержит информацию о суммарной длительности сессий для будних дней (`False`) и выходных дней (`True`). На основе этого датафрейма мы можем сделать вывод о том, что суммарная длительность сессий в будние дни почти в **2.7** раза превышает суммурную длительность сессий в выходные дни

Теперь проделаем аналогичный анализ отдельно для взрослого и невзрослого контента на основе столбца `adult_content_flg`. Избавимся от пропусков в итоговой таблице. В результате должна получиться таблица с четырьмя строками. Отсортируем её сначала по возрастному рейтингу, а затем по выходным. Проанализируем полученные результаты

In [13]:
# Избавимся от пропусков в итоговой таблице

df = df.filter(F.col("adult_content_flg").isNotNull())

# Группируем по adult_content_flg и is_weekend и считаем сумму minutes_sessions_long

df_grouped_2 = (df
    .groupBy("adult_content_flg", "is_weekend")
    .agg(F.sum("minutes_sessions_long").alias("total_minutes"))
    .orderBy("adult_content_flg", "is_weekend"))

# Выводим результат

df_grouped_2.show()



+-----------------+----------+-------------+
|adult_content_flg|is_weekend|total_minutes|
+-----------------+----------+-------------+
|            false|     false|      3369978|
|            false|      true|      1401035|
|             true|     false|     14625175|
|             true|      true|      5197958|
+-----------------+----------+-------------+



                                                                                

Таким образом, мы получаем датафрейм `df_grouped_2`, который содержит информацию о суммарной длительности сессий для 4 сочетаний:

- невзрослый контент (`False`) и будний день (`False`)
- невзрослый контент (`False`) и выходной день (`True`)
- взрослый контент (`True`) и будний день (`False`)
- взрослый контент (`True`) и выходной день (`True`)
    
На основе датафрейма `df_grouped_2` мы можем сделать вывод о том, что: 

- Максимальная суммарная длительность сессий принадлежит сочетанию `взрослый контент и будний день` - **14625175** минут. 
- Минимальная суммарная длительность сессий принадлежит сочетанию `невзрослый контент и выходной день` - **1401035** минут.

`Невзрослый контент в будний день` просматривается **3369978** минуты, а `взрослый контент в выходной день` просматривается **5197958** минуты

**Промежуточный вывод:**

- В наибольшей степени пользователи предпочитают прослушивание/прочтение `взрослого контента по будням дням`
- В наименьшей степени пользователи предпочитают прослушивание/прочтение `невзрослого контента по выходным дням`

Таким образом, мы делаем вывод о том, что **Шаг 2. Трансформация и преобразование таблиц** - завершён. Можно переходить к **Шаг 3. Соединение таблиц**

## Шаг 3. Соединение таблиц

Объединим таблицы `audition_df` и `content_df` по столбцу `main_content_id`. Посмотрим, сколько строк получилось после объединения. Убедимся, что количество строк соответствует ожиданиям

In [14]:
# Объединяем таблицы по main_content_id

merged_df = audition_df.join(
    content_df, 
    on="main_content_id", 
    how="left"  
)

# Считаем количество строк после объединения
 
print(f'Количество строк после объединения: {merged_df.count()}')

# Сравниваем с исходными таблицами

print(f'Количество строк в таблице audition_df: {audition_df.count()}')
print(f'Количество строк в таблице content_df: {content_df.count()}')

                                                                                

Количество строк после объединения: 1002896


                                                                                

Количество строк в таблице audition_df: 1002896
Количество строк в таблице content_df: 31668


Количество строк после объединения и в исходном датафрейме `audition_df` - совпадает

Удалим все лишние столбцы из объединённой таблицы, которые не нужны для дальнейшего анализа: `main_author_id`, `app_version`, `usage_geo_id`. Выведем первые десять строк

In [15]:
# Удаляем лишние столбцы - оставляем только нужные для анализа

merged_df = merged_df.drop("main_author_id", "app_version", "usage_geo_id")

# Выводим первые 10 строк

merged_df.show(10)

# Выводим схему данных

merged_df.printSchema()

                                                                                

+---------------+-----------+--------------------+-----------------+-------------------+-----------------+------------------+-------------------+----------------+--------------------+------------------+-----------------+--------------------+---------------------------+--------------------------+--------------------+
|main_content_id|audition_id|                puid|usage_platform_ru|msk_business_dt_str|adult_content_flg|             hours|hours_sessions_long|kids_content_flg|   usage_geo_id_name|usage_country_name|main_content_type|   main_content_name|main_content_duration_hours|published_topic_title_list|    main_author_name|
+---------------+-----------+--------------------+-----------------+-------------------+-----------------+------------------+-------------------+----------------+--------------------+------------------+-----------------+--------------------+---------------------------+--------------------------+--------------------+
|       oCURrBKV|        162|68296628-f9d6-11e

На основе схемы данных мы видим, что лишние столбцы - удалены

Посчитаем количество уникальных пользователей `puid` в объединённой таблице. Сравним это с количеством пользователей в изначальной таблице `audition_df`. Объясним разницу, если она есть

In [16]:
# Количество уникальных пользователей в объединенной таблице

users_merged_df = merged_df.select(F.countDistinct("puid")).first()[0]
print(f"Количество уникальных пользователей в объединенной таблице: {users_merged_df}")

# Количество уникальных пользователей в исходной таблице audition_df

users_audition_df = audition_df.select(F.countDistinct("puid")).first()[0]
print(f"Количество уникальных пользователей в audition_df: {users_audition_df}")

                                                                                

Количество уникальных пользователей в объединенной таблице: 31063




Количество уникальных пользователей в audition_df: 31063


                                                                                

Количество уникальных польователей в объединённой и изначальной таблицах - совпадают, поскольку при присоединении датафреймов `audition_df` и `content_df` мы использовали левый тип присоединения `left`, который сохранил данные обо всех действиях пользователей и добавил информацию об имеющемся контенте

Используя `collect()`, выведем на экран все уникальные значения поля `main_content_type`.

In [17]:
# Выведем все уникальные значения main_content_type

unique_main_content_type = (merged_df
    .select("main_content_type")
    .distinct()
    .collect()
)

clean_list = [row.main_content_type for row in unique_main_content_type]

print(clean_list)   

                                                                                

['Audiobook', None, 'Book', 'Comicbook']


Исходя из применения метода `collect()`, мы можем увидеть, что в наших данных присутствует три типа контента:
    
- `Audiobook` - аудиокнига
- `Book` - книга
- `Comicbook` - комикс

**Промежуточный вывод:**
    
Мы создали датафрейм `merged_df`:
    
- Объединён по столбцу `main_content_id`
- Количество строк и уникальных пользователей в данном датафрейме и исходном датафрейме `audition_df` - совпадает
- Удалены лишние столбцы:
    - `main_author_id` - идентификатор основного автора контента
    - `app_version` - версия приложения
    - `usage_geo_id` - идентификатор географического местоположения пользователя

Также определили, что в в наших данных присутствует три типа контента - аудиокниги (`Audiobook`), книги (`Book`) и комиксы (`Comicbook`)

Дополнительно нас попросили преобразовать набор данных и записать его в ClickHouse, чтобы отдел аналитики смог проводить свой анализ на очищенных данных

In [None]:
# Записываем набор данных в ClickHouse

merged_df.write \
   .format("parquet") \
   .save("/mnt/data/joined_data")

merged_df.write \
   .format("jdbc") \
   .option("url", "jdbc:clickhouse://analytics.bookmate.io:9000") \
   .option("dbtable", "analytics.bookmate_sessions") \
   .option("user", "etl_user") \
   .option("password", "ETL_2025!") \
   .save()

merged_df.write \
   .mode("overwrite") \
   .save("/mnt/data/final_output")

## Итоговый вывод

На основе анализа данных сервиса "Яндекс Книги", мы делаем вывод о том, что в наибольшей степени пользователи предпочитают прослушивание/прочтение `взрослого контента по будням дням`, а в наименьшей - предпочитают прослушивание/прочтение `невзрослого контента по выходным дням`

Такая тенденция объясняется тем, что взрослые пользователи сервиса "Яндекс Книги" предпочитают слушать или читать контент по пути на учёбу/работу или возвращаясь домой, а юные пользователи не могут себе позволить этого, поскольку по будним дням они находятся в детском саду/школе, а дома выполняют домашнее задание - значит, слушать или читать контент они могут только по выходным. Поэтому суммарная длительность сессии для взрослого контента превышает суммарную длительность сессии для детского контента в **4.1** раза