
<h2 style="text-align:center;font-size:200%;;">Лабораторная работа №2 </h2>
<h3  style="text-align:center;"><span class="label label-success">Машинное обучение на больших данных</span>


## Цель и задачи работы:
1. Познакомиться с базовыми алгоритмами машинного обучения;
2. Познакомиться с реализацией машинного обучения в библиотеке Spark ML.
3. Получить навыки разработки программного обеспечения для анализа данных с
использованием pyspark.

## Выполнение работы
### 1. Импорт необходимых библиотек и датасета

In [129]:
import numpy as np 
import pandas as pd
import os
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import seaborn as sns

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()

print(os.listdir("./data"))

['.ipynb_checkpoints', 'endomondoHR.json', 'flights.csv', 'lab2.ipynb', 'new_data.csv']


### Загрузка данных
Мною был использован датасет из предыдущей работы

In [77]:
csv = spark.read.csv('data/new_data.csv', inferSchema=True, header=True)
csv.show()

+-------+---------+------------------+------------------+------------------+-------------------+
|site_id|period_id|actual_consumption|         actual_pv|           load_01|              pv_01|
+-------+---------+------------------+------------------+------------------+-------------------+
|     14|        0|7.4411982744932175|               0.0|  6.72386622388764|0.02844860772623234|
|     14|        0|7.4411982744932175|               0.0| 7.139108644895107|0.02844860772623234|
|     14|        0| 5.580898705869913|               0.0| 6.919577543883818|0.02844860772623234|
|     14|        0|7.4411982744932175|               0.0| 7.229174824131117|0.02844860772623234|
|     14|        0| 5.580898705869913|               0.0|  7.00607084262522|0.02844860772623234|
|     14|        0|7.4411982744932175|               0.0| 6.946253004954323|0.02844860772623234|
|     14|        0|7.4411982744932175|               0.0|7.8689995679947415|0.02844860772623234|
|     14|        0|7.441198274

## Выполнение анализа датасета с помощью:
### 1. Задача регресии (LinearRegression)
Линейная регрессия — это метод анализа данных, который предсказывает ценность неизвестных данных с помощью другого связанного и известного значения данных. Он математически моделирует неизвестную или зависимую переменную и известную или независимую переменную в виде линейного уравнения.


### Подготовка данных

In [79]:
data = csv.select("site_id", "period_id", "actual_consumption", "actual_pv",
                     "pv_01",((col("load_01") > 10).cast("Int").alias("label")))
data.show(10)

+-------+---------+------------------+-----------------+-------------------+-----+
|site_id|period_id|actual_consumption|        actual_pv|              pv_01|label|
+-------+---------+------------------+-----------------+-------------------+-----+
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0| 5.580898705869913|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0| 5.580898705869913|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|   

### Разделение данных
70% данных будут использованы для обучения, а 30% - для тестирования. В данных тестирования переименуем столбец меток в trueLabel, чтобы использовать его позже для сравнения прогнозируемых меток с известными фактическими значениями.

In [80]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 53834  Testing Rows: 23112


### Создание пайплайна для обучения модели
Пайплайн состоит из ряда этапов преобразования и оценки, которые обычно подготавливают DataFrame для моделирования, а затем обучают прогнозную модель. 

In [103]:
catVect = VectorAssembler(inputCols=[ "site_id", "period_id"], outputCol="reg_catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="reg_idxCatFeatures")
numVect = VectorAssembler(inputCols=["actual_consumption", "actual_pv", "pv_01"], outputCol="reg_numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol='reg_normFeatures')
featVect = VectorAssembler(inputCols=["reg_idxCatFeatures", "reg_normFeatures"], outputCol="features")
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
reg_pipeline = Pipeline(stages=[catVect, catIdx, numVect, minMax, featVect, lr])

### Обучение пайплайна и получение предсказаний

In [104]:
reg_model = reg_pipeline.fit(train)

### Создание прогнозов

In [112]:
reg_predictions = reg_model.transform(test)
reg_predicted = reg_predictions.select("features", "prediction", "trueLabel")
reg_predicted.show(10, truncate=False)

+---------------------------------------------------------------------+----------+---------+
|features                                                             |prediction|trueLabel|
+---------------------------------------------------------------------+----------+---------+
|(5,[2,4],[0.0857142857142857,0.16386691372286882])                   |0.0       |0        |
|(5,[2,4],[0.0857142857142857,0.16605913054494645])                   |0.0       |0        |
|(5,[2,4],[0.0857142857142857,0.1807438470882614])                    |0.0       |0        |
|(5,[2,4],[0.0857142857142857,0.19123321019504805])                   |0.0       |0        |
|(5,[2,4],[0.0857142857142857,0.20773066396307965])                   |0.0       |0        |
|(5,[2,4],[0.0857142857142857,0.22157196508454452])                   |0.0       |0        |
|[0.0,0.0,0.0857142857142857,0.006791452845966323,0.1602882272974758] |0.0       |0        |
|[0.0,0.0,0.0857142857142857,0.014605862485948884,0.20024982690032406]

### Расчитываем метрики для регресии

In [118]:
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = reg_evaluator.evaluate(reg_predicted)
print("RMSE = ", rmse)

reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="r2")
r2 = reg_evaluator.evaluate(reg_predicted)
print("R² = ", r2)

RMSE =  0.4376097004334626
R² =  0.20444522053609115


In [119]:
data.select(stddev('label')).show()

+------------------+
|     stddev(label)|
+------------------+
|0.4896732691791974|
+------------------+



## 2. Задачи бинарной классификации
С использованием алгоритма Gradient Boosting Machine. Градиентный бустинг — это популярный алгоритм бустинга в машинном обучении, используемый для задач классификации и регрессии. Бустинг — это один из методов ансамблевого обучения, который обучает модель последовательно, и каждая новая модель пытается исправить предыдущую. Он объединяет несколько слабых учеников в сильных. Существует два самых популярных алгоритма бустинга, т.е.

In [125]:
data = csv.select("site_id", "period_id", "actual_consumption", "actual_pv",
                     "pv_01",((col("load_01") > 10).cast("Int").alias("label")))
data.show(10)

+-------+---------+------------------+-----------------+-------------------+-----+
|site_id|period_id|actual_consumption|        actual_pv|              pv_01|label|
+-------+---------+------------------+-----------------+-------------------+-----+
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0| 5.580898705869913|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0| 5.580898705869913|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|     14|        0|7.4411982744932175|              0.0|0.02844860772623234|    0|
|   

In [126]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 54002  Testing Rows: 22944


In [134]:
catVect = VectorAssembler(inputCols=[ "site_id", "period_id"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="idxCatFeatures")
numVect = VectorAssembler(inputCols=["actual_consumption", "actual_pv", "pv_01"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol='normFeatures')
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
pipeline = Pipeline(stages=[catVect, catIdx, numVect, minMax, featVect, gbt])

In [135]:
piplineModel = pipeline.fit(train)

In [138]:
prediction = piplineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(10, truncate=False)

+----------------------------------------------------------------------+----------+---------+
|features                                                              |prediction|trueLabel|
+----------------------------------------------------------------------+----------+---------+
|(5,[2,4],[0.0857142857142857,0.16531738409356708])                    |0.0       |0        |
|(5,[2,4],[0.0857142857142857,0.20773066396307965])                    |0.0       |0        |
|(5,[2,4],[0.0857142857142857,0.22157196508454452])                    |0.0       |0        |
|(5,[2,4],[0.0857142857142857,0.3589852964288024])                     |0.0       |0        |
|[0.0,0.0,0.0857142857142857,0.0015848728481653574,0.16827708451285758]|0.0       |0        |
|[0.0,0.0,0.0857142857142857,0.005350349520991073,0.15600175970521876] |0.0       |0        |
|[0.0,0.0,0.0857142857142857,0.007504402217784269,0.22677710970009074] |0.0       |0        |
|[0.0,0.0,0.0857142857142857,0.020939228076570865,0.13872772

### Оценка прогноза

In [139]:
tp = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|            8015.0|
|       FP|             927.0|
|       TN|           12811.0|
|       FN|            1191.0|
|Precision|0.8963319167971371|
|   Recall|0.8706278514012601|
|       F1|0.8832929248402027|
+---------+------------------+



### Оценка модели с использованием метрики AUR

In [146]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

#Прогноз основан на необработанной оценке прогноза, которая описывает помеченную точку логистической функции. 
# Это необработанное предсказание затем преобразуется в предсказанную метку 0 или 1 на основе вектора вероятности, 
# который указывает достоверность для каждого возможного значения метки (в данном случае 0 и 1).
# В качестве прогноза выбирается значение с наибольшей достоверностью.
prediction.select("rawPrediction", "probability", "prediction", "trueLabel").show(10, truncate=False)

AUR =  0.9628242341460674
+----------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                           |probability                             |prediction|trueLabel|
+----------------------------------------+----------------------------------------+----------+---------+
|[1.306153843463416,-1.306153843463416]  |[0.9316494932734865,0.06835050672651355]|0.0       |0        |
|[1.2736315143022174,-1.2736315143022174]|[0.9273894271507156,0.0726105728492844] |0.0       |0        |
|[1.2736315143022174,-1.2736315143022174]|[0.9273894271507156,0.0726105728492844] |0.0       |0        |
|[1.2736315143022174,-1.2736315143022174]|[0.9273894271507156,0.0726105728492844] |0.0       |0        |
|[1.306153843463416,-1.306153843463416]  |[0.9316494932734865,0.06835050672651355]|0.0       |0        |
|[1.3186016396853792,-1.3186016396853792]|[0.9332178774803361,0.06678212251966387]|0.0       |0        |
|[1.2880779539592164,-1.28807

### Подбор гиперпарамтеров
Чтобы найти наиболее эффективные параметры, мы можем использовать класс CrossValidator для оценки каждой комбинации параметров, определенных в ParameterGrid, по нескольким сгибам данных, разделенных на наборы данных для обучения и проверки.

In [143]:
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1]).addGrid(lr.maxIter, [10, 5]).addGrid(lr.threshold, 
                                                                                            [0.4, 0.3]).build()
cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, 
                    numFolds=2)

model = cv.fit(train)

In [147]:
newPrediction = model.transform(test)
newPredicted = prediction.select("features", "prediction", "trueLabel")
newPredicted.show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|(5,[2,4],[0.08571...|       0.0|        0|
|(5,[2,4],[0.08571...|       0.0|        0|
|(5,[2,4],[0.08571...|       0.0|        0|
|(5,[2,4],[0.08571...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0|        0|
|[0.0,0.0,0.085714...|       0.0

## Оцениваем прогноз после изменения

In [149]:
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|            8015.0|
|       FP|             927.0|
|       TN|           12811.0|
|       FN|            1191.0|
|Precision|0.8963319167971371|
|   Recall|0.8706278514012601|
|       F1|0.8832929248402027|
+---------+------------------+



In [150]:
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator.evaluate(prediction)
print( "AUR2 = ", aur2)

AUR2 =  0.9628242341460675
