<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Импорты-и-создание-сессии" data-toc-modified-id="Импорты-и-создание-сессии-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Импорты и создание сессии</a></span></li><li><span><a href="#Изучение-данных" data-toc-modified-id="Изучение-данных-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Изучение данных</a></span></li><li><span><a href="#DecisionTreeRegressor" data-toc-modified-id="DecisionTreeRegressor-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>DecisionTreeRegressor</a></span></li><li><span><a href="#RandomForestRegressor" data-toc-modified-id="RandomForestRegressor-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>RandomForestRegressor</a></span></li><li><span><a href="#GBTRegressor" data-toc-modified-id="GBTRegressor-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>GBTRegressor</a></span></li><li><span><a href="#Подготовка-snippets" data-toc-modified-id="Подготовка-snippets-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Подготовка snippets</a></span></li></ul></div>

<footer id="footer"></footer>

<p align='center'>Ml Engeenering</p>
<p align="center"><img src="https://drive.google.com/uc?id=1X5HPpSb2Bk2QRXZzZy-Xp_vfwMyKF8ly" width=500 border="0"></a></p>



Нам необходимо провести исследование, выбрать лучший тип модели и реализовать распределенную модель, для этого, вы можете задействовать все модели регрессии (`DecisionTreeRegressor`, `RandomForestRegressor`, `GBTRegressor`), подобрать оптимальные гиперпараметры, сравнить результаты и выбрать лучшую для дальнейшего применения.



<div style="float:left;margin:0 10px 10px 0" markdown="1">
    <a href="#footer"><img src='https://img.shields.io/badge/К содержанию-&#x21A9-blue'></a>
</div>

___


## Импорты и создание сессии

In [1]:
import warnings
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import (DecisionTreeRegressor,
                                       RandomForestRegressor,
                                       GBTRegressor)
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import (ParamGridBuilder, 
                               TrainValidationSplit)
SEED=42
warnings.filterwarnings("ignore")

Создадим спарк-сессию и приступим к работе

In [2]:
spark = SparkSession.builder.appName("ML_project").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/05 21:24:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

<div style="float:left;margin:0 10px 10px 0" markdown="1">
    <a href="#footer"><img src='https://img.shields.io/badge/К содержанию-&#x21A9-blue'></a>
</div>

___


## Изучение данных

Загрузим наши выборки и посмотрим первые строки. Поймем, что за тип данных у нас в работе и какие потребуетются преобразование в дальнейшем

In [4]:
train = spark.read.parquet("train.parquet", header=True)
test = spark.read.parquet("test.parquet", header=True)

                                                                                

In [5]:
train.show(5)

[Stage 2:>                                                          (0 + 1) / 1]                                                                                

+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|ad_id|target_audience_count|has_video|is_cpm|is_cpc|         ad_cost|day_count|              ctr|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|    1|     10707.2440058622|        1|     1|     0|201.829292651124|       15|0.431740082807281|
|    5|     10643.3872649482|        1|     1|     0|192.577221699704|       15|0.809264519216201|
|    6|     11418.7085911347|        1|     1|     0|204.104562956739|       11|0.909738306804039|
|    7|     10109.3278687796|        1|     1|     0|194.255798599684|       12|0.941221039774456|
|    8|     10665.1119991977|        1|     1|     0|202.658042557742|       14|0.986790019690954|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
only showing top 5 rows



Структура данных, которую мы будем использовать:

| Имя признака  | Тип признака   | Описание  |
|:---:|:---:|:---:|
|  ad_id |  integer |  id рекламного объявления | 
|  target_audience_count	 | decimal  | размер аудитории, на которую таргетируется объявление  |
| has_video  | integer  |  1 если есть видео, иначе 0 |
|  is_cpm |  integer |	1 если тип объявления CPM, иначе 0   | 
|  is_cpc |   integer|   1 если тип объявления CPC, иначе 0| 
|  ad_cost |   double|  стоимость объявления в рублях | 
|  day_count |  integer |  Число дней, которое показывалась реклама | 
|  ctr |  double |  Отношение числа кликов к числу просмотров | 



Типы данных нас устраивают. Первое что нам необходимо сделать - привести данные к векторному виду, с которым работает `sparkML`. Делить выборку на валидацию и обучение не будем - сделаем это в процессе подбора параметров. Можно исключить признак `ad_id` из вектора. Также добавим стандартизацию для нашего вектора


Теперь приступим к построению пайплайна. Определим шаги:


1. Нам необходимо преобразовать в вектор наши фреймы. 
2. Создать модель. 
3. Получить предсказания

Но перед этим, мы попробуем модели на тесте и проведем подбор гиперпараметров - поэтому наш пайплайн будет пока включать только шаг `1`

<div style="float:left;margin:0 10px 10px 0" markdown="1">
    <a href="#footer"><img src='https://img.shields.io/badge/К содержанию-&#x21A9-blue'></a>
</div>

___


## DecisionTreeRegressor

Составим пайплайн и попробуем подобрать параметры, затем и получим предсказания на тесте

In [6]:
vector = VectorAssembler(inputCols=train.columns[1:-1], outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withStd=True, withMean=False)
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol="ctr", metricName="rmse")

dt_model = DecisionTreeRegressor(labelCol="ctr", featuresCol="scaled_features", seed=SEED)
pipeline = Pipeline(stages=[vector, scaler, dt_model])

In [7]:
params = (ParamGridBuilder()
          .addGrid(dt_model.maxDepth, [2, 3, 4, 5])
          .addGrid(dt_model.minInfoGain, [0.1, 0.2, 0.4])
          .build()
         )

tvs = TrainValidationSplit(estimator=pipeline,
                          estimatorParamMaps=params,
                          evaluator=evaluator,
                          trainRatio=0.8)

In [8]:
model_tree = tvs.fit(train)

                                                                                

In [9]:
model_tree.validationMetrics

[0.49793226083037,
 0.49793226083037,
 0.6964096428567128,
 0.3156563505343809,
 0.3156563505343809,
 0.6964096428567128,
 0.3156563505343809,
 0.3156563505343809,
 0.6964096428567128,
 0.3156563505343809,
 0.3156563505343809,
 0.6964096428567128]

In [10]:
model_best = model_tree.bestModel
pred_test = model_best.transform(test)
# считаем rmse
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol="ctr", metricName="rmse")
rmse = evaluator.evaluate(pred_test)


print("RMSE на тесте:", round(rmse, 3))

RMSE на тесте: 0.315


<div style="float:left;margin:0 10px 10px 0" markdown="1">
    <a href="#footer"><img src='https://img.shields.io/badge/К содержанию-&#x21A9-blue'></a>
</div>

___


## RandomForestRegressor

In [11]:
rf_model = RandomForestRegressor(labelCol="ctr", featuresCol="scaled_features", seed=SEED)
pipeline = Pipeline(stages=[vector, scaler, rf_model])

In [12]:
params = (ParamGridBuilder()
          .addGrid(rf_model.maxDepth, [2, 3, 4, 5])
          .addGrid(rf_model.numTrees, [5, 10, 12])
          .build()
         )

tvs = TrainValidationSplit(estimator=pipeline,
                          estimatorParamMaps=params,
                          evaluator=evaluator,
                          trainRatio=0.8)

In [13]:
model_forest = tvs.fit(train)

                                                                                

In [14]:
# считаем rmse
model_best = model_forest.bestModel
preds = model_best.transform(test)
# считаем rmse
rmse = evaluator.evaluate(preds)

print("RMSE на валидации:", round(rmse, 3))

RMSE на валидации: 0.319


На данной задаче дерево работает лучше, чем лес - попробуем модель с градиентным бустингом и тогда, выбрав лучшую - приступим к построению пайплайна

<div style="float:left;margin:0 10px 10px 0" markdown="1">
    <a href="#footer"><img src='https://img.shields.io/badge/К содержанию-&#x21A9-blue'></a>
</div>

___


## GBTRegressor

In [15]:
gbt_model = GBTRegressor(labelCol="ctr", featuresCol="scaled_features", seed=SEED)
pipeline = Pipeline(stages=[vector, scaler, gbt_model])

In [16]:
params = (ParamGridBuilder()
          .addGrid(gbt_model.maxDepth, [2, 3, 4, 5])
          .addGrid(gbt_model.maxIter, [15, 20, 25])
          .build()
         )

tvs = TrainValidationSplit(estimator=pipeline,
                          estimatorParamMaps=params,
                          evaluator=evaluator,
                          trainRatio=0.8)

In [17]:
model_gbt = tvs.fit(train)

22/06/05 21:25:37 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/06/05 21:25:37 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
                                                                                

In [18]:
# считаем rmse
model_best = model_gbt.bestModel
pred_test = model_best.transform(test)
# считаем rmse
rmse = evaluator.evaluate(pred_test)

print("RMSE на валидации:", round(rmse, 3))

RMSE на валидации: 0.256


Таким образом - лучшая модель - модель на основе Градиентного бустинга - остановимся на ней. Сформируем итоговый пайплайн на базе этой модели и подготовим необходимые скрипты. Проверим, что все работает как надо. Если все нормально - сохраняем pipeline в виде модели

Все работает как задумано. Теперь мы можем переходить к подготовке скриптов. Напишем сниппеты, которые лягут в основу и затем соберем скрипты

<div style="float:left;margin:0 10px 10px 0" markdown="1">
    <a href="#footer"><img src='https://img.shields.io/badge/К содержанию-&#x21A9-blue'></a>
</div>

___


## Подготовка snippets

Нам необходимо подготовить два скрипта - один на обучение, подбор гиперапараметров и выбор лучшей модели, второй на получения предсказаний

In [19]:
from datetime import datetime
from tqdm import tqdm
from pathlib import Path
from typing import List
import numpy as np
import logging
SEED = 42
MODEL_PATH = 'spark_ml_model'


log_classes = {
    'init': 'INIT',
    'metric': 'METRIC',
    'model': 'MODEL',
    'process': 'PROCESSING',
    'complete': 'COMPLETED'
}

def process(spark:SparkSession, train_data: Path, test_data: Path) -> None:
    """основной скрипт по валидации и получению параметров на тестовой выборке
       создает три модели, подбирает параметры и получает окончательные результаты на тестовой выборке
       сохраняет лучшую модель"""
    # читаем train и test
    train = spark.read.parquet(train_data, header=True)
    test = spark.read.parquet(test_data, header=True)
    
    _log(log_classes['init'], 'Train and test are loaded')
    # создаем эвалуатора для подсчета RMSE
    evaluator = RegressionEvaluator(
        predictionCol='prediction', labelCol="ctr", metricName="rmse")
    _log(log_classes['init'], 'Evaluator created. Metric is RMSE')
    # добавляем необходимые трансформации
    vector = VectorAssembler(
        inputCols=train.columns[1:-1], outputCol="features")
    _log(log_classes['init'], f'Features for processing are {train.columns[1:-1]}')
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withStd=True, withMean=False)
    # получаем список моделей и параметров
    models = get_models()
    _log(log_classes['init'], 'Models are loaded')
    metrics = []
    best_models = {}
    # перебираем ключи в словаре моделей и производим валидацию и сбор результатов
    for key in models.keys():
        model = models[key]["model"]
        model_name = model.__class__.__name__
        _log(log_classes['process'], f'Working with {model_name}')
        params = models[key]["params"]
        # создаем пайплайн и tvs
        pipeline = Pipeline(stages=[vector, scaler, model])
        tvs = TrainValidationSplit(estimator=pipeline,
                                   estimatorParamMaps=params,
                                   evaluator=evaluator,
                                   trainRatio=0.8)
        _log(log_classes['process'], 'Starting parametrs tuning')
        # собираем метрики
        fit_model = tvs.fit(train)
        best_model = fit_model.bestModel
        best_metric = evaluator.evaluate(best_model.transform(test))
        _log(log_classes['metric'], f'The best metric on test for model {model_name} is {best_metric}')
        metrics.append(best_metric)
        best_models[key] = best_model
    # выбираем лучшую модель
    choose_model = get_best(metrics, best_models)
    _log(log_classes['model'], f'The best model is {model_name}')
    _log(log_classes['metric'], f'The best metric on test is {min(metrics)}')
    choose_model.write().overwrite().save(MODEL_PATH)
    _log(log_classes['complete'], f'Model is saved. Path is {MODEL_PATH}')
    spark.stop()
    _log(log_classes['complete'], 'Spark session is stoped. }')
    
def get_best(metrics: List[float], best_model:PipelineModel) -> PipelineModel:
    """отбираем лучшую модель"""
    idx = np.argmax(metrics)
    keys_list = list(best_model)
    key = keys_list[idx]
    return best_model[key]


def get_models():
    """создаем модели и фиксируем их сетку параметров для валидации"""
    dt = DecisionTreeRegressor(
        labelCol='ctr', featuresCol="scaled_features", seed=SEED)
    rf = RandomForestRegressor(
        labelCol='ctr', featuresCol="scaled_features", seed=SEED)
    gbt = GBTRegressor(labelCol='ctr', featuresCol="scaled_features", seed=SEED)

    models_info = {
        'dt': {'model': dt,
               'params': ParamGridBuilder()
                 .addGrid(dt.maxDepth, [2, 3, 4, 5])
                 .addGrid(dt.minInfoGain, [0.1, 0.2, 0.4])
                 .build()},
        'rf': {'model': rf,
               'params': ParamGridBuilder()
                 .addGrid(rf.maxDepth, [2, 3, 4, 5])
                 .addGrid(rf.numTrees, [5, 10, 12])
                 .build()},
        'gbt': {'model': gbt,
                'params': ParamGridBuilder()
                  .addGrid(gbt.maxDepth, [2, 3, 4, 5])
                  .addGrid(gbt.maxIter, [15, 20, 25])
                  .build()}
    }
    return models_info

def _log( cat: str, info: str) -> None:
    """дял логирования результатов"""
    record = f'{datetime.now()} {cat} {info}'
    print(record)
        

In [20]:
process(spark, "train.parquet", "test.parquet")

2022-06-05 21:27:09.807132 INIT Train and test are loaded
2022-06-05 21:27:09.811926 INIT Evaluator created. Metric is RMSE
2022-06-05 21:27:09.816217 INIT Features for processing are ['target_audience_count', 'has_video', 'is_cpm', 'is_cpc', 'ad_cost', 'day_count']
2022-06-05 21:27:09.826906 INIT Models are loaded
2022-06-05 21:27:09.826918 PROCESSING Working with DecisionTreeRegressor
2022-06-05 21:27:09.827046 PROCESSING Starting parametrs tuning
2022-06-05 21:27:29.870218 METRIC The best metric on test for model DecisionTreeRegressor is 0.31487318624194816
2022-06-05 21:27:29.870307 PROCESSING Working with RandomForestRegressor
2022-06-05 21:27:29.870613 PROCESSING Starting parametrs tuning
2022-06-05 21:27:56.511255 METRIC The best metric on test for model RandomForestRegressor is 0.3186750437140377
2022-06-05 21:27:56.511333 PROCESSING Working with GBTRegressor
2022-06-05 21:27:56.511535 PROCESSING Starting parametrs tuning


[Stage 3951:>                                                       (0 + 2) / 2]                                                                                

2022-06-05 21:29:26.053088 METRIC The best metric on test for model GBTRegressor is 0.25587559375275926
2022-06-05 21:29:26.053211 MODEL The best model is GBTRegressor
2022-06-05 21:29:26.053218 METRIC The best metric on test is 0.25587559375275926
2022-06-05 21:29:29.855091 COMPLETED Model is saved. Path is spark_ml_model
2022-06-05 21:29:30.140378 COMPLETED Spark session is stoped. }


Здесь все работает. Подготовим второй скрипт - для получения предсказаний и сохранения результатов

In [21]:
from datetime import datetime
from tqdm import tqdm
from pathlib import Path
from typing import List
import numpy as np
import logging


MODEL_PATH = 'spark_ml_model'

log_classes = {
    'init': 'INIT',
    'metric': 'METRIC',
    'model': 'MODEL',
    'process': 'PROCESSING',
    'complete': 'COMPLETED'
}


def process(spark: SparkSession, input_file: Path, output_file: Path):
    # input_file - путь к файлу с данными для которых нужно предсказать ctr
    # output_file - путь по которому нужно сохранить файл с результатами [ads_id, prediction]
    inputs = spark.read.parquet(input_file)
    _log(log_classes['init'], 'Assets are loaded')
    # грузим модель
    model = PipelineModel.load(MODEL_PATH)
    _log(log_classes['process'], 'Getting predicions')
    # прогоняем инпуты и получаем предсказания
    outputs = model.transform(inputs)
    # сохраняем в csv
    interes = outputs.select('ad_id', 'prediction')
    _log(log_classes['process'], 'Example for preds data')
    interes.show(1)
    # сжимаем до одной партиции
    _log(log_classes['process'], 'Saving predicions')
    interes.coalesce(1) \
           .write.format("com.databricks.spark.csv") \
           .option("header", "true") \
           .save(output_file) \
    # останавливаем сессию
    spark.stop()
    _log(log_classes['complete'], 'Spark session is stoped')


def _log(cat: str, info: str) -> None:
    """для логирования результатов"""
    record = f'{datetime.now()} {cat} {info}'
    print(record)

In [23]:
spark = SparkSession.builder.appName("ML_project").getOrCreate()
process(spark, "test.parquet", "outputs.csv")

2022-06-05 21:30:21.904164 INIT Assets are loaded
2022-06-05 21:30:24.986072 PROCESSING Getting predicions
2022-06-05 21:30:25.134014 PROCESSING Example for preds data
+-----+------------------+
|ad_id|        prediction|
+-----+------------------+
|    2|3.4507970810654127|
+-----+------------------+
only showing top 1 row

2022-06-05 21:30:25.371469 PROCESSING Saving predicions
2022-06-05 21:30:26.582905 COMPLETED Spark session is stoped


Проверим, что все работает как задумано

In [24]:
spark = SparkSession.builder.appName("ML_project").getOrCreate()
df = spark.read.csv("outputs.csv", header=True)

In [25]:
df.show(5)

+-----+------------------+
|ad_id|        prediction|
+-----+------------------+
|    2|3.4507970810654127|
|    3|3.3604116301025093|
|    4|3.4629504926752888|
|   10|3.4559802785927745|
|   13|3.3604116301025093|
+-----+------------------+
only showing top 5 rows



In [26]:
df.columns

['ad_id', 'prediction']

Все отработало как необходимо - можно приступить к подготовке самих скриптов

<div style="float:left;margin:0 10px 10px 0" markdown="1">
    <a href="#footer"><img src='https://img.shields.io/badge/К содержанию-&#x21A9-blue'></a>
</div>

___
