## Лабораторная работа №2 по дисциплине "Системы обработки больших данных"
### Подключаем библиотеки и отфильтрованный датасет из ЛР № 1
#### Постановка задачи:
**Цель и задачи работы:**
1. Познакомиться с базовыми алгоритмами машинного обучения;

2. Познакомиться с реализацией машинного обучения в библиотеке Spark ML.

3. Получить навыки разработки программного обеспечения для анализа данных с использованием pyspark.

**Необходимо выполнить анализ обработанного датасета с помощью двух алгоритмов машинного обучения:**

*Задача регрессии* - RandomForest

*Задача бинарной классификации* - LogisticRegression

**При анализе датасета предпочтительно использовать признаки, показавшие наилучшую корреляцию при выполнении разведочного анализа. Для задачи классификации использовать бинарный признак.
Необходимо выполнить обучение и валидацию модели, рассчитайте значения метрик классификации и регрессии. Выполните подбор гиперпараметров моделей по сетке.**

### Задача регрессии - RandomForest

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator


spark = SparkSession.builder.master("local[*]").getOrCreate()
filename_data = 'processed_work/data/sampled.csv'
df = spark.read.csv(filename_data, inferSchema=True, header=True)
df.show(10)
df.printSchema()

+---------------+------------------+-----------+--------------+---------+--------+---------+--------------+-------------------+----------------------------+
|startingAirport|destinationAirport|elapsedDays|isBasicEconomy|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|totalFlightDurationInSeconds|
+---------------+------------------+-----------+--------------+---------+--------+---------+--------------+-------------------+----------------------------+
|            ATL|               BOS|        0.0|         false|     true|  367.98|   410.18|           1.0|              947.0|                      9840.0|
|            ATL|               DEN|        0.0|         false|    false|  262.33|   305.61|           7.0|             1375.0|                     16800.0|
|            ATL|               DFW|        0.0|         false|    false|  174.88|    211.6|           2.0|             1399.0|                     17160.0|
|            ATL|               MIA|        0.0|         f

##### Будем использовать только числовые признаки показавшие наилучшую корреляцию

In [2]:
data = df.select("totalFlightDurationInSeconds", "totalTravelDistance", "baseFare", "totalFare")
data.show(10)

+----------------------------+-------------------+--------+---------+
|totalFlightDurationInSeconds|totalTravelDistance|baseFare|totalFare|
+----------------------------+-------------------+--------+---------+
|                      9840.0|              947.0|  367.98|   410.18|
|                     16800.0|             1375.0|  262.33|   305.61|
|                     17160.0|             1399.0|  174.88|    211.6|
|                     20040.0|             1866.0|  503.51|   564.87|
|                     25680.0|             2393.0|  528.37|    596.1|
|                      7320.0|              667.0|  636.28|    698.6|
|                     21900.0|             2161.0|  496.74|    557.6|
|                      8940.0|              728.0|  734.88|    804.6|
|                     22200.0|             2054.0|   301.4|   347.61|
|                     10620.0|              862.0|  185.58|    214.1|
+----------------------------+-------------------+--------+---------+
only showing top 10 

#### Разделим данные на обучающую и тестовую выборку

##### Будем использовать 75% данных для обучения и 25% для тестирования.

In [3]:
splits = data.randomSplit([0.75, 0.25])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Тренировочные строки:", train_rows, " Тестовые сроки:", test_rows)

Тренировочные строки: 561774  Тестовые сроки: 187691


#### Определим конвейер
##### Конвейер состоит из серии этапов преобразования и оценки, которые обычно подготавливают фрейм данных для моделирования, а затем обучают прогнозирующую модель. В этом случае создадем конвейер с двумя этапами:

1. **VectorAssembler:** Создает вектор непрерывных числовых признаков.

2. **RandomForest:** Обучает модель регрессии.

In [4]:
numVect = VectorAssembler(inputCols = ["totalFlightDurationInSeconds", "baseFare", "totalFare"], outputCol="features")

rf = RandomForestRegressor(featuresCol = 'features', labelCol='totalTravelDistance')

pipeline = Pipeline(stages=[numVect, rf])

##### Запускаем конвейер

In [5]:
piplineModel = pipeline.fit(train)

##### Создаем предсказанные значения

In [6]:
prediction = piplineModel.transform(test)
predicted = prediction.select("features", "prediction", "totalTravelDistance")
predicted.show(20)

+--------------------+-----------------+-------------------+
|            features|       prediction|totalTravelDistance|
+--------------------+-----------------+-------------------+
|[2760.0,185.12,21...|701.3266018343759|               97.0|
|[2880.0,147.91,17...| 635.289249234229|               97.0|
|[2880.0,157.21,18...|618.3293170203896|               97.0|
|[2880.0,157.21,18...|618.3293170203896|               97.0|
|[2880.0,157.21,18...|618.3293170203896|               97.0|
|[2880.0,199.07,22...|699.1229732911586|               97.0|
|[2880.0,226.98,25...| 724.254770711324|               97.0|
|[2880.0,385.12,42...|1010.290848164777|               97.0|
|[2940.0,282.79,31...|929.2455577626982|               97.0|
|[3120.0,166.51,19...|618.3293170203896|               97.0|
|[3840.0,171.16,19...|644.0572175363257|              228.0|
|[4020.0,171.16,19...|644.0572175363257|              228.0|
|[4020.0,189.77,21...|699.1229732911586|              228.0|
| [4080.0,45.58,63.6]|38

##### Метрики для оценки модели

In [7]:
rfEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="totalTravelDistance", metricName="rmse")
# RMSE
rmse = rfEvaluator.evaluate(prediction)
print(f"The RMSE for the RandomForest regression model is {rmse:0.2f}")
# MSE
mse = rfEvaluator.setMetricName("mse").evaluate(prediction)
print(f"The MSE for the RandomForest regression model is {mse:0.2f}")
# R2
r2 = rfEvaluator.setMetricName("r2").evaluate(prediction)
print(f"The R2 for the RandomForest regression model is {r2:0.2f}")
# MAE
mae = rfEvaluator.setMetricName("mae").evaluate(prediction)
print(f"The MAE for the RandomForest regression model is {mae:0.2f}")

The RMSE for the RandomForest regression model is 301.71
The MSE for the RandomForest regression model is 91030.46
The R2 for the RandomForest regression model is 0.87
The MAE for the RandomForest regression model is 233.95


##### Создадим параметрическую сетку для настройки модели

In [8]:
rfparam_grid = (ParamGridBuilder()
    .addGrid(rf.maxDepth, [2, 5, 10])
    .addGrid(rf.maxBins, [5, 10, 20])
    .addGrid(rf.numTrees, [5, 20, 50])
    .build())

##### Создадим кросс-валидатор

In [14]:
rfcv = CrossValidator(estimator=pipeline, \
                    estimatorParamMaps=rfparam_grid, \
                    evaluator=RegressionEvaluator(
                                predictionCol="prediction", \
                                labelCol="totalTravelDistance", \
                                metricName="rmse"), \
                    numFolds=5)

##### Запускаем конвейер с поиском оптимальных параметров

In [15]:
rfcvModel = rfcv.fit(train)

##### Создаем предсказанные значения

In [16]:
rfPredictions = rfcvModel.transform(test)

##### Метрики для оценки модели

In [18]:
# RMSE
rmse = rfEvaluator.evaluate(rfPredictions)
print(f"The RMSE for the RandomForest regression model is {rmse:0.2f}")
# MSE
mse = rfEvaluator.setMetricName("mse").evaluate(rfPredictions)
print(f"The MSE for the RandomForest regression model is {mse:0.2f}")
# R2
r2 = rfEvaluator.setMetricName("r2").evaluate(rfPredictions)
print(f"The R2 for the RandomForest regression model is {r2:0.2f}")
# MAE
mae = rfEvaluator.setMetricName("mae").evaluate(rfPredictions)
print(f"The MAE for the RandomForest regression model is {mae:0.2f}")

The RMSE for the RandomForest regression model is 186.48
The MSE for the RandomForest regression model is 60709.41
The R2 for the RandomForest regression model is 0.92
The MAE for the RandomForest regression model is 186.48


### Задача классификации - LogisticRegression
##### Используем бинарный признак isNonStop т.к. он более сбалансирован в отличии от isBasicEconomy
Однако сначала добавим колонку label с преобразованными значениями и узнаем их соотношение.

In [19]:
df = df.withColumn("label", when(col("isNonStop") == True, 1).otherwise(0))

In [20]:
label_counts = df.groupBy("label").count()
label_counts.show()

+-----+------+
|label| count|
+-----+------+
|    1|211175|
|    0|538290|
+-----+------+



Соотношение несбалансированное, поэтому применим oversampling.

In [22]:
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
# Подсчет количества элементов с меткой 1 и 0
count_label_1 = df.filter(col("label") == 1).count()
count_label_0 = df.filter(col("label") == 0).count()

df_label_1 = df.filter(col("label") == 1)

# Определение коэффициента oversampling
oversample_factor = int(count_label_0 / count_label_1)

# Применение oversampling только к данным с label = 1
df_oversampled_label_1 = df_label_1.withColumn("dummy", F.explode(F.array([F.lit(x) for x in range(oversample_factor)]))).drop("dummy")

# Объединение исходного DataFrame с увеличенными данными с label = 1
df_oversampled = df.union(df_oversampled_label_1)


In [23]:
label_counts = df_oversampled.groupBy("label").count()
label_counts.show()

+-----+------+
|label| count|
+-----+------+
|    1|633525|
|    0|538290|
+-----+------+



Теперь строк с меткой 1 стало намного больше. Можем приступать к дальнейшим действиям.

#### Разделим данные на обучающую и тестовую выборку
##### Будем использовать 75% данных для обучения и 25% для тестирования.

In [24]:
data = df_oversampled.select("elapsedDays", "baseFare", "totalFare", "seatsRemaining", "totalTravelDistance", "totalFlightDurationInSeconds", "label")
data.show(10)

+-----------+--------+---------+--------------+-------------------+----------------------------+-----+
|elapsedDays|baseFare|totalFare|seatsRemaining|totalTravelDistance|totalFlightDurationInSeconds|label|
+-----------+--------+---------+--------------+-------------------+----------------------------+-----+
|        0.0|  367.98|   410.18|           1.0|              947.0|                      9840.0|    1|
|        0.0|  262.33|   305.61|           7.0|             1375.0|                     16800.0|    0|
|        0.0|  174.88|    211.6|           2.0|             1399.0|                     17160.0|    0|
|        0.0|  503.51|   564.87|           5.0|             1866.0|                     20040.0|    0|
|        0.0|  528.37|    596.1|           2.0|             2393.0|                     25680.0|    0|
|        0.0|  636.28|    698.6|           3.0|              667.0|                      7320.0|    1|
|        0.0|  496.74|    557.6|           2.0|             2161.0|      

In [25]:
splits = data.randomSplit([0.75, 0.25])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Тренировочные строки:", train_rows, " Тестовые сроки:", test_rows)

Тренировочные строки: 880009  Тестовые сроки: 291806


#### Определим конвейер
##### Конвейер состоит из серии этапов преобразования и оценки, которые обычно подготавливают фрейм данных для моделирования, а затем обучают прогнозирующую модель. В этом случае вы создадите конвейер с тремя этапами:
1. **VectorAssembler:** Создает вектор непрерывных числовых признаков.

2. **MinMaxScaler:** Нормализует непрерывные числовые характеристики.

3. **Logistic Regression:** Обучает модель классификации.

In [26]:
numVect = VectorAssembler(inputCols = ["elapsedDays", "baseFare", "totalFare", "seatsRemaining", "totalTravelDistance", "totalFlightDurationInSeconds"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="features")

lr = LogisticRegression(labelCol="label", 
                        featuresCol="features", 
                        maxIter=3,
                        regParam=0.3)

pipeline = Pipeline(stages=[numVect, minMax, lr])

##### Запускаем конвейер

In [27]:
lrModel = pipeline.fit(train)

##### Создаем предсказываемые значения

In [28]:
prediction = lrModel.transform(test)
predicted = prediction.select("features", "prediction", "label")
predicted.show(20)

+--------------------+----------+-----+
|            features|prediction|label|
+--------------------+----------+-----+
|[0.0,0.0,0.0,0.4,...|       1.0|    1|
|[0.0,0.0123483017...|       1.0|    1|
|[0.0,0.0123483017...|       1.0|    1|
|[0.0,0.0148179621...|       1.0|    1|
|[0.0,0.0148179621...|       1.0|    1|
|[0.0,0.0148179621...|       1.0|    1|
|[0.0,0.0148179621...|       1.0|    1|
|[0.0,0.0148179621...|       1.0|    1|
|[0.0,0.0180710093...|       1.0|    1|
|[0.0,0.0180710093...|       1.0|    1|
|[0.0,0.0180710093...|       1.0|    1|
|[0.0,0.0185357304...|       1.0|    1|
|[0.0,0.0185357304...|       1.0|    1|
|[0.0,0.0205406697...|       1.0|    1|
|[0.0,0.0242451602...|       1.0|    1|
|[0.0,0.0242451602...|       1.0|    1|
|[0.0,0.0242451602...|       1.0|    1|
|[0.0,0.0242451602...|       1.0|    1|
|[0.0,0.0242451602...|       1.0|    1|
|[0.0,0.0242451602...|       1.0|    1|
+--------------------+----------+-----+
only showing top 20 rows



##### Метрики оценки модели классификации

In [29]:
tp = float(predicted.filter("prediction == 1.0 AND label == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND label == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND label == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND label == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          128538.0|
|       FP|           52597.0|
|       TN|           81387.0|
|       FN|           29284.0|
|Precision|0.7096254175062798|
|   Recall|0.8144491895933393|
|       F1|0.7584324855365134|
+---------+------------------+



##### Оценка качества модели бинарной классификации с использованием метрики AUR

In [30]:
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

AUR =  0.8346604801319116


##### Настройка модели классификации с использованием кросс-валидации:

In [31]:
paramGrid = ParamGridBuilder().addGrid(lr.regParam, np.arange(0, 1, 0.05)).addGrid(lr.maxIter, [5, 10]).build()
lrcv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, 
                    numFolds=2)

##### Запускаем конвейер

In [32]:
lrcvModel = lrcv.fit(train)

##### Создаем предсказанные значения

In [33]:
newPrediction = lrcvModel.transform(test)

##### Метрики оценки обновленной модели классификации

In [34]:
tp2 = float(newPrediction.filter("prediction == 1.0 AND label == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND label == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND label == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND label == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          151831.0|
|       FP|            5661.0|
|       TN|          128323.0|
|       FN|            5991.0|
|Precision|0.9640553170954715|
|   Recall|0.9620395128689283|
|       F1|0.9630463601362451|
+---------+------------------+



##### Оценка качества модели бинарной классификации с использованием метрики AUR после кросс-валидации

In [35]:
newEvaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
newAur = newEvaluator.evaluate(newPrediction)
print( "AUR2 = ", newAur)

AUR2 =  0.959894099639623
