<h1 style="text-align:center;font-size:200%;">Лабораторная работа №2</h1>
<h2 style="text-align:center;">Машинное обучение на больших данных</h2>

### Цели работы:
1. Познакомиться с базовыми алгоритмами машинного обучения;
2. Познакомиться с реализацией машинного обучения в библиотеке Spark ML;
3. Получить навыки разработки программного обеспечения для анализа данных с 
использованием` pyspark`.

### Постановка задачи
Выполнить анализ датасета с помощью двух алгоритмов машинного обучения в соответствии с индивидуальным вариантомм: 
|Индивидуальный вариант|Задача регрессии|Задача бинарной классификации|
|:--------------------:|:--------------:|:---------------------------:|
|          0           |LinearRegression|         RandomForest        |

Выполнить обучение и валидацию модели, рассчитать значения метрик классификации и 
регрессии. Выполнить подбор гиперпараметров моделей по сетке.


Вариант №2: "Датасет авиабилетов из Expedia".

Источник: https://www.kaggle.com/datasets/dilwong/flightprices

## Содержание <a class="anchor" id="content"></a>
1. [Инициализация PySpark фреймворка и загрузка данных в датафрейм pyspark](#1);
2. [Подготовка данных для модели классификации](#2);
3. [Определение пайплайна случайного леса](#3);
4. [Генерация предсказаний модели бинарной классификации](#4);
5. [Оценка модели бинарной классификации](#5);
6. [Настройка параметров бинарной классификации](#6).
7. [Подготовка данных для модели регрессии](#7);
8. [Определение пайплайна линейной регрессии](#8);
9. [Генерация предсказаний модели регрессии](#9);
10. [Оценка модели регрессии](#10);
11. [Настройка параметров регрессии](#11).

### **1. Инициализация PySpark фреймворка и загрузка данных в датафрейм pyspark** <a class="anchor" id="1"></a>

In [102]:
import numpy as np
import pandas as pd
import os

print(os.listdir("../data"))

['itineraries_mini.csv', 'itineraries_mini_fixed.csv']


### Импорт библиотек Spark SQL и Spark ML

In [103]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()

### Загрузка исходных данные
Данные представляют из себя обработанный в рамках 1 лабораторной работы датасет, содержащий информацию о ценах на полеты в одну сторону самолетами согласно Expedia на период с 16.04.2022 до 05.10.2022.

In [104]:
csv = spark.read.csv('../data/itineraries_mini_fixed.csv', inferSchema=True, header=True)
csv.show(10)

+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-------------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segmen

### **2. Подготовка данных для модели классификации** <a class="anchor" id="2"></a>

### Подготовка данных для модели классификации (модель обучения дерева решений)
Выберем подмножество столбцов для использования в качестве признаков и создадим логическое поле метки с именем *label* со значениями 1 или 0. В частности, **1** для рейса, который пролетел более 1400 миль, **0** для рейса, который пролетел менее 1400 миль.

In [105]:
data = csv.select(
    'startingAirport', 'destinationAirport', 'elapsedDays',
    ((col('isBasicEconomy')).cast('Int')),
    ((col('isRefundable')).cast('Int')),
    ((col('isNonStop')).cast('Int')),
    'baseFare', 'totalFare',
    ((col('totalTravelDistance') > 1400).cast('Int').alias('label')))
data.show(10)

+---------------+------------------+-----------+--------------+------------+---------+--------+---------+-----+
|startingAirport|destinationAirport|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|label|
+---------------+------------------+-----------+--------------+------------+---------+--------+---------+-----+
|            ORD|               LAX|          0|             0|           0|        0|  322.79|    370.6|    1|
|            ATL|               DEN|          0|             0|           0|        0|  252.09|    293.1|    1|
|            BOS|               DEN|          0|             0|           0|        0|  278.14|    322.6|    1|
|            CLT|               DTW|          0|             0|           0|        0|  410.23|    467.6|    0|
|            IAD|               DFW|          0|             0|           0|        0|  252.09|    294.6|    0|
|            BOS|               EWR|          0|             0|           0|        0|  245.58|    287.6

### Разделим данные
Используем 70% данных для обучения, а 30% оставим для тестирования. В данных тестирования столбец *label* переименован в *trueLabel*, поэтому можно использовать его позже для сравнения прогнозируемых меток с известными фактическими значениями.

In [106]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed('label', 'trueLabel')
train_rows = train.count()
test_rows = test.count()
print(f'Training Rows: {train_rows} Testing Rows: {test_rows}')

Training Rows: 632303 Testing Rows: 270590


Проверим полученную обучающую выборку на баланс распределения классов.

In [107]:
train.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1|311564|
|    0|320739|
+-----+------+



Как видно из результатов, классы распределены примерно поровну, что означает, что сэмплирование проводить нет необходимости.

### **3. Определение пайплайна случайного леса** <a class="anchor" id="3"></a>

### Определим пайплайн

Пайплайн состоит из ряда этапов преобразования и оценки, которые обычно подготавливают DataFrame для моделирования, а затем обучают прогнозную модель. В этом случае создадим пайплайн из восьми этапов:
* **StringIndexer**, преобразующий строковые значения в индексы для категориальных функций;
* **OneHotEncoder**, сопоставляющий столбец индексов категорий со столбцом двоичных векторов;
* **VectorAssembler**, объединяющий категориальные функции в один вектор;
* **VectorIndexer**, создающий индексы для вектора категориальных признаков;
* **VectorAssembler**, создающий вектор непрерывных числовых объектов;
* **MinMaxScaler**, нормализующий непрерывные числовые функции;
* **VectorAssembler**, создающий вектор категориальных и непрерывных функций;
* **RandomForestClassifier**, обучающая модель классификации на основе алгоритма случайного леса.

In [108]:
strIdx = StringIndexer(inputCols = ['startingAirport', 'destinationAirport'], outputCols = ['startingAirportIdx', 'destinationAirportIdx'])
oneHotEnc = OneHotEncoder(inputCols=['startingAirportIdx', 'destinationAirportIdx'], outputCols=['startingAirportEnc', 'destinationAirportEnc'])
catVect = VectorAssembler(inputCols=['startingAirportEnc', 'destinationAirportEnc', 'isBasicEconomy', 'isRefundable', 'isNonStop'], outputCol='catFeatures')
# catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol='idxCatFeatures')
numVect = VectorAssembler(inputCols=['baseFare', 'totalFare', 'elapsedDays'], outputCol='numFeatures')
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol='normFeatures')
# featVect = VectorAssembler(inputCols=['idxCatFeatures', 'normFeatures'], outputCol='features')
featVect = VectorAssembler(inputCols=['catFeatures', 'normFeatures'], outputCol='features')
rf = RandomForestClassifier(labelCol='label', featuresCol='features', numTrees=10, maxDepth=8, maxBins=16)
# pipeline = Pipeline(stages=[strIdx, oneHotEnc, catVect, catIdx, numVect, minMax, featVect, rf])
pipeline = Pipeline(stages=[strIdx, oneHotEnc, catVect, numVect, minMax, featVect, rf])

### Запустим пайплайн для обучения модели
Запустим пайплайн в качестве оценщика данных обучения, чтобы обучить модель.

In [109]:
pipelineModel = pipeline.fit(train)

### **4. Генерация предсказаний модели бинарной классификации** <a class="anchor" id="4"></a>

### Создание прогнозов для меток
Преобразуем тестовые данные со всеми этапами и обученной моделью в пайплайне, чтобы генерировать прогнозы меток.

In [110]:
prediction = pipelineModel.transform(test)
predicted = prediction.select('features', 'prediction', 'trueLabel')
predicted.show(100, truncate=False)

+---------------------------------------------------------------------+----------+---------+
|features                                                             |prediction|trueLabel|
+---------------------------------------------------------------------+----------+---------+
|(36,[8,18,33,34],[1.0,1.0,0.02277136637306785,0.025314632382846036]) |0.0       |0        |
|(36,[8,18,33,34],[1.0,1.0,0.02277136637306785,0.025314632382846036]) |0.0       |0        |
|(36,[8,18,33,34],[1.0,1.0,0.02277136637306785,0.025314632382846036]) |0.0       |0        |
|(36,[8,18,33,34],[1.0,1.0,0.026724475575432425,0.028845067558407517])|0.0       |0        |
|(36,[8,18,33,34],[1.0,1.0,0.026724475575432425,0.028845067558407517])|0.0       |0        |
|(36,[8,18,33,34],[1.0,1.0,0.026724475575432425,0.028845067558407517])|0.0       |0        |
|(36,[8,18,33,34],[1.0,1.0,0.026724475575432425,0.029268719779474893])|0.0       |0        |
|(36,[8,18,33,34],[1.0,1.0,0.026724475575432425,0.029268719779474893])

Глядя на результаты, некоторые значения trueLabel 1 прогнозируются как 0. Оценим модель.

### **5. Оценка модели бинарной классификации** <a class="anchor" id="5"></a>

## Оценка модели классификации
Рассчитаем *Confusion Matrix* и *Область под ROC* (рабочую характеристику приемника), чтобы оценить модель.

### Вычисление Confusion Matrix
Классификаторы обычно оцениваются путем создания *Confusion Matrix*, которая указывает количество:
- True Positives
- True Negatives
- False Positives
- False Negatives

На основе этих основных показателей можно рассчитать другие показатели оценки, такие как *precision*, *recall* и *F1*.

In [111]:
def show_metrics(df):
    tp = float(df.filter('prediction == 1.0 AND truelabel == 1').count())
    fp = float(df.filter('prediction == 1.0 AND truelabel == 0').count())
    tn = float(df.filter('prediction == 0.0 AND truelabel == 0').count())
    fn = float(df.filter('prediction == 0.0 AND truelabel == 1').count())

    pr, re, f1 = 0.0, 0.0, 0.0

    tpPlusFp = tp + fp
    tpPlusFn = tp + fn

    if tpPlusFp != 0:
        pr = tp / (tp + fp)
    
    if tpPlusFn != 0:
        re = tp / (tp + fn)

    prPlusRe = pr + re
    
    if prPlusRe != 0:
        f1 = 2 * pr * re / prPlusRe
    
    metrics = spark.createDataFrame([
            ('TP', tp),
            ('FP', fp),
            ('TN', tn),
            ('FN', fn),
            ('Precision', pr),
            ('Recall', re),
            ('F1', f1)
        ],
        ['metric', 'value']
    )
    
    metrics.show()

In [112]:
show_metrics(predicted)

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          109901.0|
|       FP|           15896.0|
|       TN|          121062.0|
|       FN|           23731.0|
|Precision|0.8736376861133414|
|   Recall|0.8224152897509579|
|       F1|0.8472530056393078|
+---------+------------------+



*Precision* и *Recall* выглядят хорошо.

### Обзор области под ROC
Другой способ оценить производительность модели классификации — измерить площадь под кривой ROC (рабочей характеристики приемника) модели. Библиотека spark.ml включает класс **BinaryClassificationEvaluator**, который можно использовать для этого. Кривая ROC показывает уровни истинного положительного и ложноположительного результатов, построенные для различных пороговых значений.

In [113]:
def calculate_aur(df, labelCol, rawPredictionCol):
    evaluator = BinaryClassificationEvaluator(labelCol=labelCol, rawPredictionCol=rawPredictionCol, metricName='areaUnderROC')
    return evaluator.evaluate(df)

In [114]:
aur = calculate_aur(prediction, 'trueLabel', 'rawPrediction')
print(f'AUR = {aur}')

AUR = 0.9272509177931482


Итак, AUR показывает, что с моделью все в порядке. Посмотрим глубже.

### Просмотр необработанного прогноза и вероятности
Прогноз основан на количественной оценке степени уверенности модели в правильном ответе прогноза, которая описывает помеченную точку функции случайного леса. Это необработанное предсказание затем преобразуется в предсказанную метку 0 или 1 на основе вектора вероятности, который указывает достоверность для каждого возможного значения метки (в данном случае 0 и 1). В качестве прогноза выбирается значение с наибольшей достоверностью.

In [115]:
prediction.select('rawPrediction', 'probability', 'prediction', 'trueLabel').show(100, truncate=False)

+--------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                         |probability                             |prediction|trueLabel|
+--------------------------------------+----------------------------------------+----------+---------+
|[7.302173517098633,2.6978264829013665]|[0.7302173517098633,0.26978264829013665]|0.0       |0        |
|[7.302173517098633,2.6978264829013665]|[0.7302173517098633,0.26978264829013665]|0.0       |0        |
|[7.302173517098633,2.6978264829013665]|[0.7302173517098633,0.26978264829013665]|0.0       |0        |
|[7.302173517098633,2.6978264829013665]|[0.7302173517098633,0.26978264829013665]|0.0       |0        |
|[7.302173517098633,2.6978264829013665]|[0.7302173517098633,0.26978264829013665]|0.0       |0        |
|[7.302173517098633,2.6978264829013665]|[0.7302173517098633,0.26978264829013665]|0.0       |0        |
|[7.302173517098633,2.6978264829013665]|[0.7302173517098633,0.26978264829

Обратим внимание, что результаты включают строки, в которых вероятность 0 (первое значение вектора вероятности) лишь немного выше вероятности 1 (второе значение вектора вероятности). Порог дискриминации по умолчанию (граница, которая решает, будет ли вероятность прогнозироваться как 1 или 0) установлен на 0,5; поэтому всегда используется прогноз с наибольшей вероятностью, независимо от того, насколько он близок к порогу.

И из приведенных выше результатов мы видим, что для тех *trueLabel* единиц, которые мы предсказали как 0, у многих из них вероятность 1 лишь немного меньше порога 0,5.

### **6. Настройка параметров бинарной классификации** <a class="anchor" id="6"></a>

## Настройка параметров
Чтобы найти наиболее эффективные параметры, мы можем использовать класс **CrossValidator** для оценки каждой комбинации параметров, определенных в **ParameterGrid**, по нескольким *сгибам* данных, разделенных на наборы данных для обучения и проверки. Обратим внимание, что это может занять много времени, поскольку каждая комбинация параметров проверяется несколько раз.

### Изменим порог дискриминации
Показатель AUC указывает на достаточно хорошую модель, но показатели производительности, по-видимому, указывают на то, что она прогнозирует большое количество *ложноотрицательных (False Negative)* меток (т. е. она прогнозирует 0, когда истинная метка равна 1), что приводит к низкому значению *recall* . Мы можем улучшить это, снизив порог. И наоборот, иногда нам может потребоваться устранить большое количество *ложноположительных (False Positive)* значений путем повышения порога.

В этом случае позволим **CrossValidator** найти лучшую максимальную глубину дерева от 2, 5 и 10, максимальное количество интервалов для дискретизации непрерывных функций от 8, 16 до 32 и количество деревьев от 5, 10 до 20.

In [116]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [2, 5, 10]) \
    .addGrid(rf.maxBins, [8, 16, 32]) \
    .addGrid(rf.numTrees, [5, 10, 20]) \
    .build()

cv = CrossValidator(
    estimator=pipeline,
    evaluator=BinaryClassificationEvaluator(),
    estimatorParamMaps=paramGrid,
    numFolds=2
)

model = cv.fit(train)

In [117]:
newPrediction = model.transform(test)
newPredicted = newPrediction.select('features', 'prediction', 'trueLabel')
newPredicted.show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0|        0|
|(36,[8,18,33,34],...|       0.0

In [118]:
# Пересчитаем Confusion Matrix
show_metrics(newPredicted)

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          112346.0|
|       FP|           14318.0|
|       TN|          122640.0|
|       FN|           21286.0|
|Precision|0.8869607781216446|
|   Recall|0.8407118055555556|
|       F1|0.8632172603497557|
+---------+------------------+



In [119]:
# Пересчитаем область под ROC
aur2 = calculate_aur(newPrediction, 'trueLabel', 'rawPrediction')
print(f'AUR = {aur2}')

AUR = 0.9429608607187636


Выглядит довольно хорошо! Новая модель улучшает показатель *recall* с 0,82 до 0,85, показатель *F1* с 0,84 до 0,87 без ущерба для других показателей.

## Сохранение модели

Сохраним лучшую модель для дальнейшего использования.

In [120]:
# model.save('./model')

### **7. Подготовка данных для модели регрессии** <a class="anchor" id="7"></a>

Проделаем тоже самое, но с только в рамках задачи с регрессией. Будем прогнозировать пройденное расстояние за рейс.

In [121]:
data = csv.select(
    'startingAirport', 'destinationAirport', 'elapsedDays',
    ((col('isBasicEconomy')).cast('Int')),
    ((col('isRefundable')).cast('Int')),
    ((col('isNonStop')).cast('Int')),
    'baseFare', 'totalFare',
    ((col('totalTravelDistance')).alias('label')))
data.show(10)

+---------------+------------------+-----------+--------------+------------+---------+--------+---------+-----+
|startingAirport|destinationAirport|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|label|
+---------------+------------------+-----------+--------------+------------+---------+--------+---------+-----+
|            ORD|               LAX|          0|             0|           0|        0|  322.79|    370.6| 2186|
|            ATL|               DEN|          0|             0|           0|        0|  252.09|    293.1| 1574|
|            BOS|               DEN|          0|             0|           0|        0|  278.14|    322.6| 1806|
|            CLT|               DTW|          0|             0|           0|        0|  410.23|    467.6| 1157|
|            IAD|               DFW|          0|             0|           0|        0|  252.09|    294.6| 1266|
|            BOS|               EWR|          0|             0|           0|        0|  245.58|    287.6

### **8. Определение пайплайна линейной регрессии** <a class="anchor" id="8"></a>

Возвращаясь к построению предыдущего пайплайна, заменим *RandomForestClassifier* на *LinearRegression*:
* **LinearRegression**, обучающая модель линейной регрессии.

In [122]:
strIdx = StringIndexer(inputCols = ['startingAirport', 'destinationAirport'], outputCols = ['startingAirportIdx', 'destinationAirportIdx'])
oneHotEnc = OneHotEncoder(inputCols=['startingAirportIdx', 'destinationAirportIdx'], outputCols=['startingAirportEnc', 'destinationAirportEnc'])
catVect = VectorAssembler(inputCols = ['startingAirportEnc', 'destinationAirportEnc', 'isBasicEconomy', 'isRefundable', 'isNonStop'], outputCol='catFeatures')
# catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = 'idxCatFeatures')
numVect = VectorAssembler(inputCols = ['baseFare', 'totalFare', 'elapsedDays'], outputCol='numFeatures')
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol='normFeatures')
# featVect = VectorAssembler(inputCols=['idxCatFeatures', 'normFeatures'], outputCol='features')
featVect = VectorAssembler(inputCols=['catFeatures', 'normFeatures'], outputCol='features')
lr = LinearRegression(labelCol='label', featuresCol='features')
# pipeline = Pipeline(stages=[strIdx, oneHotEnc, catVect, catIdx, numVect, minMax, featVect, lr])
pipeline = Pipeline(stages=[strIdx, oneHotEnc, catVect, numVect, minMax, featVect, lr])

In [123]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed('label', 'trueLabel')
train_rows = train.count()
test_rows = test.count()
print(f'Training Rows: {train_rows} Testing Rows: {test_rows}')

Training Rows: 631847 Testing Rows: 271046


In [124]:
pipelineModel = pipeline.fit(train)

### **9. Генерация предсказаний модели регрессии** <a class="anchor" id="9"></a>

In [125]:
linearPrediction = pipelineModel.transform(test)
linearPredicted = linearPrediction.select('features', 'prediction', 'trueLabel')
linearPredicted.show(100, truncate=False)

+---------------------------------------------------------------------+------------------+---------+
|features                                                             |prediction        |trueLabel|
+---------------------------------------------------------------------+------------------+---------+
|(36,[8,18,33,34],[1.0,1.0,0.019177943337894684,0.021955831214155584])|938.6597722177378 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |9

### **10.Оценка модели регрессии** <a class="anchor" id="10"></a>

Оценим качество полученной модели регрессии при помощи таких метрик, как *RMSE* и *R2*, а также выведем *std*.

In [126]:
def evaluate_model(prediction, labelCol, predictionCol, metricName):
    evaluator = RegressionEvaluator(labelCol=labelCol, predictionCol=predictionCol, metricName=metricName)
    return evaluator.evaluate(prediction)

In [127]:
rmse = evaluate_model(linearPrediction, 'trueLabel', 'prediction', 'rmse')
print(f'Metric "RMSE" on test data: {rmse:.3f}')

Metric "RMSE" on test data: 533.759


In [128]:
data.select(stddev('label')).show()

+-----------------+
|    stddev(label)|
+-----------------+
|831.9128474823857|
+-----------------+



In [129]:
r2 = evaluate_model(linearPrediction, 'trueLabel', 'prediction', 'r2')
print(f'Metric "R2" on test data: {r2:.3f}')

Metric "R2" on test data: 0.588


Полученная модель регрессии по результатам вышеопределенных метрик показывает хорошие результаты.

### **11. Настройка параметров регрессии** <a class="anchor" id="11"></a>

Показатель *RMSE* и *R2* указывает на достаточно хорошую модель, но попробуем улучшить это, позволив **CrossValidator** найти лучший регуляризационный параметр от 0.0, 0.3 и 0.5 и максимальное число итераций от 50, 100 до 150.

In [130]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.3, 0.5]) \
    .addGrid(lr.maxIter, [50, 100, 150]) \
    .build()

linearCv = CrossValidator(
    estimator=pipeline, 
    evaluator=RegressionEvaluator(), 
    estimatorParamMaps=paramGrid, 
    numFolds=2
)

model = linearCv.fit(train)

In [131]:
newLinearPrediction = model.transform(test)
newLinearPredicted = newLinearPrediction.select('features', 'prediction', 'trueLabel')
newLinearPredicted.show(100, truncate=False)

+---------------------------------------------------------------------+------------------+---------+
|features                                                             |prediction        |trueLabel|
+---------------------------------------------------------------------+------------------+---------+
|(36,[8,18,33,34],[1.0,1.0,0.019177943337894684,0.021955831214155584])|938.6597722177378 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |947      |
|(36,[8,18,33,34],[1.0,1.0,0.02534083422026253,0.028104843640742144]) |962.5942884189001 |9

In [132]:
rmse = evaluate_model(newLinearPrediction, 'trueLabel', 'prediction', 'rmse')
print(f'Metric "RMSE" on test data: {rmse:.3f}')

Metric "RMSE" on test data: 533.759


In [133]:
r2 = evaluate_model(newLinearPrediction, 'trueLabel', 'prediction', 'r2')
print(f'Metric "R2" on test data: {r2:.3f}')

Metric "R2" on test data: 0.588


Полученные результаты метрик *RMSE* и *R2* остались прежними, что означает, что прежние значения регуляризационного параметра и максимального количества итераций оказались лучшими.

### Выводы

В ходе лабораторной работы были ознакомлены с алгоритмами машинного обучения, познакомились с реализацией машинного обучения в библиотеке Spark ML и получили навыки разработки программного обеспечения для анализа данных с
использованием pyspark. 