# Подготовка данных к обучению моделей

## Импорт библиотек и задание констант

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
TRAIN_SIZE = 0.8
TEST_SIZE = 0.2

TRAIN_PATH = 'train'
TEST_PATH = 'test'

In [3]:
spark = SparkSession.builder.appName('PySparkJobNotebook').getOrCreate()
spark

## Загрузка датафрейма

Посмотрим на данные, которые у нас есть.

In [5]:
df = spark.read.parquet('clickstream.parquet')
df.show(5)

+----------+-------------------+-----+--------+------+---------------+-----------------+------------+-------+---------+---------------------+
|      date|               time|event|platform| ad_id|client_union_id|compaign_union_id|ad_cost_type|ad_cost|has_video|target_audience_count|
+----------+-------------------+-----+--------+------+---------------+-----------------+------------+-------+---------+---------------------+
|2019-04-01|2019-04-01 00:00:48| view| android| 45061|          34734|            45061|         CPM|  200.6|        0|              1955269|
|2019-04-01|2019-04-01 00:00:48| view|     web|121288|         121288|           121288|         CPM|  187.4|        0|               232011|
|2019-04-01|2019-04-01 00:01:03| view| android|102737|         102535|           102564|         CPC|   60.7|        0|                 4410|
|2019-04-01|2019-04-01 00:01:03| view| android|107564|         106914|           107564|         CPM|  217.3|        0|                62711|
|2019-

In [8]:
df.groupby('date').count().orderBy('date').show()

+----------+------+
|      date| count|
+----------+------+
|2019-04-01| 22073|
|2019-04-02| 47117|
|2019-04-03| 59483|
|2019-04-04|275735|
|2019-04-05|519707|
|2019-04-06| 75885|
+----------+------+



## Схема требуемой таблицы

* `ad_id` — id рекламного объявления
* `target_audience_count` — размер аудитории, на которую таргетируется объявление
* `has_video` — 1, если есть видео, иначе 0
* `is_cpm` — 1, если тип объявления CPM, иначе 0
* `is_cpc` — 1, если тип объявления CPC, иначе 0
* `ad_cost` — стоимость объявления в рублях
* `day_count` — число дней, которое показывалась реклама
* `ctr` — отношение числа кликов к числу просмотров

## Признаки is_cpm и is_cpc

In [5]:
ndf = df.withColumn('is_cpm', F.when(F.col('ad_cost_type') == 'CPM', 1).otherwise(0))
ndf[['ad_id', 'ad_cost_type', 'is_cpm']].show(5)

+------+------------+------+
| ad_id|ad_cost_type|is_cpm|
+------+------------+------+
| 45061|         CPM|     1|
|121288|         CPM|     1|
|102737|         CPC|     0|
|107564|         CPM|     1|
|  4922|         CPC|     0|
+------+------------+------+
only showing top 5 rows



In [6]:
ndf = ndf.withColumn('is_cpc', F.when(F.col('ad_cost_type') == 'CPC', 1).otherwise(0))
ndf[['ad_id', 'ad_cost_type', 'is_cpm', 'is_cpc']].show(5)

+------+------------+------+------+
| ad_id|ad_cost_type|is_cpm|is_cpc|
+------+------------+------+------+
| 45061|         CPM|     1|     0|
|121288|         CPM|     1|     0|
|102737|         CPC|     0|     1|
|107564|         CPM|     1|     0|
|  4922|         CPC|     0|     1|
+------+------------+------+------+
only showing top 5 rows



## Вспомогательные признаки is_view и is_click

Чтобы посчитать CTR объявления, нам нужно найти число его просмотров и сколько кликов по нему было сделано. Для этого создадим булевы столбцы `is_view` и `is_click`. Тогда, когда мы сгруппируем данные по id объявлений, значения в этих столбцах можно будет сложить и получить нужные количество просмотров и кликов.

In [7]:
ndf = (ndf.withColumn('is_view', F.when(F.col('event') == 'view', 1).otherwise(0))
          .withColumn('is_click', F.when(F.col('event') == 'click', 1).otherwise(0)))

ndf[['ad_id', 'event', 'is_view', 'is_click']].show(10)

+------+-----+-------+--------+
| ad_id|event|is_view|is_click|
+------+-----+-------+--------+
| 45061| view|      1|       0|
|121288| view|      1|       0|
|102737| view|      1|       0|
|107564| view|      1|       0|
|  4922| view|      1|       0|
| 10325| view|      1|       0|
| 41458| view|      1|       0|
| 45831| view|      1|       0|
|101985| view|      1|       0|
| 16589| view|      1|       0|
+------+-----+-------+--------+
only showing top 10 rows



## Группировка по объявлениям

In [8]:
features = (ndf.groupBy('ad_id').agg(
    F.max(F.col('target_audience_count')).alias('target_audience_count'),
    F.max(F.col('has_video')).alias('has_video'),
    F.max(F.col('is_cpm')).alias('is_cpm'),
    F.max(F.col('is_cpc')).alias('is_cpc'),
    F.max(F.col('ad_cost')).alias('ad_cost'),
    F.sum(F.col('is_view')).alias('views_count'), # Суммируя, получим число просмотров.
    F.sum(F.col('is_click')).alias('clicks_count'), # Суммируя, получим число кликов.
    F.countDistinct(F.col('date')).alias('days_count')))

features.show(5)

+------+---------------------+---------+------+------+-------+-----------+------------+----------+
| ad_id|target_audience_count|has_video|is_cpm|is_cpc|ad_cost|views_count|clicks_count|days_count|
+------+---------------------+---------+------+------+-------+-----------+------------+----------+
| 40515|                11533|        0|     1|     0|  214.8|        140|           4|         2|
| 33412|                 7195|        0|     1|     0|  214.1|         35|           0|         2|
| 47217|                 7121|        0|     1|     0|  225.6|         22|           1|         2|
| 33602|              3277386|        0|     1|     0|  187.8|        480|           9|         2|
|119169|                35019|        0|     1|     0|  202.6|        636|           5|         2|
+------+---------------------+---------+------+------+-------+-----------+------------+----------+
only showing top 5 rows



## Расчет CTR

In [9]:
features = features.withColumn('ctr', F.col('clicks_count') / F.col('views_count'))
features.show(5)

+------+---------------------+---------+------+------+-------+-----------+------------+----------+--------------------+
| ad_id|target_audience_count|has_video|is_cpm|is_cpc|ad_cost|views_count|clicks_count|days_count|                 ctr|
+------+---------------------+---------+------+------+-------+-----------+------------+----------+--------------------+
| 40515|                11533|        0|     1|     0|  214.8|        140|           4|         2| 0.02857142857142857|
| 33412|                 7195|        0|     1|     0|  214.1|         35|           0|         2|                 0.0|
| 47217|                 7121|        0|     1|     0|  225.6|         22|           1|         2|0.045454545454545456|
| 33602|              3277386|        0|     1|     0|  187.8|        480|           9|         2|             0.01875|
|119169|                35019|        0|     1|     0|  202.6|        636|           5|         2|0.007861635220125786|
+------+---------------------+---------+

Избавимся от лишних столбцов `views_count` и `clicks_count`

In [10]:
features = features.drop('views_count', 'clicks_count')
features.show(5)

+------+---------------------+---------+------+------+-------+----------+--------------------+
| ad_id|target_audience_count|has_video|is_cpm|is_cpc|ad_cost|days_count|                 ctr|
+------+---------------------+---------+------+------+-------+----------+--------------------+
| 40515|                11533|        0|     1|     0|  214.8|         2| 0.02857142857142857|
| 33412|                 7195|        0|     1|     0|  214.1|         2|                 0.0|
| 47217|                 7121|        0|     1|     0|  225.6|         2|0.045454545454545456|
| 33602|              3277386|        0|     1|     0|  187.8|         2|             0.01875|
|119169|                35019|        0|     1|     0|  202.6|         2|0.007861635220125786|
+------+---------------------+---------+------+------+-------+----------+--------------------+
only showing top 5 rows



## Разбивка на трейн- и тест-выборки и выгрузка датафреймов

Разбиваем данные на обучающую и тестовую выборки в заранее заданном соотношении.

In [11]:
train_df, test_df = features.randomSplit([TRAIN_SIZE, TEST_SIZE])

Сохраняем получившиеся выборки в соответсвующие папки.

In [12]:
train_df.coalesce(1).write.parquet(TRAIN_PATH)
test_df.coalesce(1).write.parquet(TEST_PATH)