<h2 style="text-align:center;font-size:200%;">Лабораторная работа 2</h2>

# **1. Подготовка данных** <a class="anchor" id="1"></a>

Импорт библиотек и создание Spark-сессии.


In [84]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

# Local Spark session using all available cores; getOrCreate() returns the
# already-active session if one exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [85]:
# Load the daily energy dataset: column names are taken from the header row
# and column types are inferred from the data.
csv = spark.read.csv(
    'data/archive/clear_daily_dataset.csv',
    header=True,
    inferSchema=True,
)
csv.show(n=10)

+-------------------+-------------------+------------------+-------------------+------------------+----------+
|      energy_median|        energy_mean|        energy_max|         energy_std|        energy_sum|energy_min|
+-------------------+-------------------+------------------+-------------------+------------------+----------+
|             0.1415|0.29616666875000003|         1.1160001| 0.2814713178628203|14.216000100000002|     0.031|
|             0.1015|          0.1898125|             0.685| 0.1884046862418033|             9.111|     0.064|
|              0.114| 0.2189791666666666|0.6759999999999999|0.20291927853038208|10.510999999999996|     0.065|
|              0.191|0.32597916666666665|0.7879999999999999| 0.2592049619947409|15.646999999999998|     0.066|
|0.21800000000000005|             0.3575|             1.077|0.28759657027517305|             17.16|     0.066|
|             0.1305| 0.2350833333333333|             0.705| 0.2220696491599295|            11.284|     0.066|
|

In [86]:
# Per-column summary statistics (count, mean, stddev, min, max), converted to
# pandas so the notebook renders them as a table (last expression of the cell).
print("Data frame describe:")
csv.describe().toPandas()

Data frame describe:


Unnamed: 0,summary,energy_median,energy_mean,energy_max,energy_std,energy_sum,energy_min
0,count,3079165.0,3079165.0,3079165.0,3079165.0,3079165.0,3079165.0
1,mean,0.1188058879598836,0.163190087908238,0.6950190911996289,0.1402995982633613,7.825446524206482,0.0428914760982337
2,stddev,0.0745428747636724,0.0943245219633481,0.4651897588350241,0.0978564628190887,4.52540514635374,0.0321893980785503
3,min,0.0,0.0,0.0,0.0,0.0,0.0
4,max,0.377,0.508968753125,2.303,0.4696022608472193,24.4100001,0.147


# **2. Обучение модели** <a class="anchor" id="2"></a>

Подготовка данных и создание бинарного признака `binary_sum`: он равен 1, если значение `energy_sum` больше среднего значения `energy_sum` по всему набору данных, и 0 в противном случае.


In [22]:
# Dataset-wide mean of energy_sum; serves as the threshold for the binary target.
mean_sum = csv.agg(mean('energy_sum')).first()[0]
mean_sum

7.825446524206482

In [23]:
# Reorder the numeric columns and append the binary target `binary_sum`:
# 1 when energy_sum is strictly above mean_sum, 0 otherwise (null if
# energy_sum is null).
data = (
    csv.select("energy_median", "energy_mean", "energy_std",
               "energy_max", "energy_sum", "energy_min")
       .withColumn("binary_sum", (col("energy_sum") > mean_sum).cast("Int"))
)
data.show(10)

+-------------------+-------------------+-------------------+------------------+------------------+----------+----------+
|      energy_median|        energy_mean|         energy_std|        energy_max|        energy_sum|energy_min|binary_sum|
+-------------------+-------------------+-------------------+------------------+------------------+----------+----------+
|             0.1415|0.29616666875000003| 0.2814713178628203|         1.1160001|14.216000100000002|     0.031|         1|
|             0.1015|          0.1898125| 0.1884046862418033|             0.685|             9.111|     0.064|         1|
|              0.114| 0.2189791666666666|0.20291927853038208|0.6759999999999999|10.510999999999996|     0.065|         1|
|              0.191|0.32597916666666665| 0.2592049619947409|0.7879999999999999|15.646999999999998|     0.066|         1|
|0.21800000000000005|             0.3575|0.28759657027517305|             1.077|             17.16|     0.066|         1|
|             0.1305| 0.

Разделение данных на обучающую и тестовую выборки в соотношении 70/30.

In [24]:
# Split into training (~70%) and test (~30%) sets; randomSplit proportions are
# approximate. Without a seed every re-run yields a different split (and thus
# different metrics and a different cross-validation result); a fixed seed
# makes the experiment reproducible.
SPLIT_SEED = 42
splits = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train, test = splits
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 2156142  Testing Rows: 923023


## Обучение
* Задача регрессии (целевая переменная `energy_sum`) — линейная регрессия, `LinearRegression`.
* Задача классификации (целевая переменная `binary_sum`) — случайный лес, `RandomForestClassifier`.

In [72]:
# Preprocessing shared by both pipelines: assemble the five numeric columns
# into one vector, then rescale each component with MinMaxScaler (default
# target range [0, 1]).
# NOTE(review): energy_mean is used as a feature while energy_sum (and
# binary_sum, derived from it) is the target. If energy_sum is the sum of the
# same readings energy_mean averages over, this is target leakage and would
# explain the near-perfect scores later on — confirm before drawing conclusions.
numVect = VectorAssembler(
    inputCols=["energy_median", "energy_mean", "energy_std", "energy_max", "energy_min"],
    outputCol="numFeatures",
)
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol="features")

# Hyperparameters are arbitrary starting values (the regression model is
# tuned later with cross-validation).
lr = LinearRegression(
    featuresCol="features",
    labelCol="energy_sum",
    maxIter=1,
    regParam=0.5,
    elasticNetParam=0.5,
)
bc = RandomForestClassifier(
    featuresCol="features",
    labelCol="binary_sum",
    numTrees=50,
    maxDepth=10,
)

# Both pipelines share the same (unfitted) preprocessing stages.
regressionPipeline = Pipeline(stages=[numVect, minMax, lr])
classifierPipeline = Pipeline(stages=[numVect, minMax, bc])

# Fit each pipeline on the training split.
regressionModel = regressionPipeline.fit(train)
classifierModel = classifierPipeline.fit(train)

## Предсказания по задаче регрессии

In [73]:
# Score the held-out test split with the fitted regression pipeline and show
# the scaled features next to the predicted and true energy_sum.
regressionPrediction = regressionModel.transform(test)
regressionPredicted = regressionPrediction.select(["features", "prediction", "energy_sum"])
regressionPredicted.show(n=10, truncate=False)

+---------+-----------------+----------+
|features |prediction       |energy_sum|
+---------+-----------------+----------+
|(5,[],[])|6.323107510194627|0.0       |
|(5,[],[])|6.323107510194627|0.0       |
|(5,[],[])|6.323107510194627|0.0       |
|(5,[],[])|6.323107510194627|0.0       |
|(5,[],[])|6.323107510194627|0.0       |
|(5,[],[])|6.323107510194627|0.0       |
|(5,[],[])|6.323107510194627|0.0       |
|(5,[],[])|6.323107510194627|0.0       |
|(5,[],[])|6.323107510194627|0.0       |
|(5,[],[])|6.323107510194627|0.0       |
+---------+-----------------+----------+
only showing top 10 rows



## Предсказания по задаче классификации

In [74]:
# Score the held-out test split with the fitted random-forest pipeline and
# show the scaled features next to the predicted and true binary_sum.
classifierPrediction = classifierModel.transform(test)
classifierPredicted = classifierPrediction.select(["features", "prediction", "binary_sum"])
classifierPredicted.show(n=10, truncate=False)

+---------+----------+----------+
|features |prediction|binary_sum|
+---------+----------+----------+
|(5,[],[])|0.0       |0         |
|(5,[],[])|0.0       |0         |
|(5,[],[])|0.0       |0         |
|(5,[],[])|0.0       |0         |
|(5,[],[])|0.0       |0         |
|(5,[],[])|0.0       |0         |
|(5,[],[])|0.0       |0         |
|(5,[],[])|0.0       |0         |
|(5,[],[])|0.0       |0         |
|(5,[],[])|0.0       |0         |
+---------+----------+----------+
only showing top 10 rows



## Проверка результатов классификации

In [67]:
# Confusion-matrix counts for the random-forest classifier on the test split.
# One groupBy pass replaces four separate filter().count() actions, each of
# which launched its own Spark job over the full prediction lineage.
# Rows with a null binary_sum match none of the keys, as with the filters.
confusion = {
    (row["prediction"], row["binary_sum"]): row["count"]
    for row in classifierPredicted.groupBy("prediction", "binary_sum").count().collect()
}
tp = float(confusion.get((1.0, 1), 0))
fp = float(confusion.get((1.0, 0), 0))
tn = float(confusion.get((0.0, 0), 0))
fn = float(confusion.get((0.0, 1), 0))

# Guard each ratio: with no positive predictions (tp + fp == 0) or no positive
# labels (tp + fn == 0) the plain divisions raise ZeroDivisionError. Undefined
# metrics are reported as 0.0; all values stay float so createDataFrame infers
# a single type for the "value" column.
pr = tp / (tp + fp) if tp + fp else 0.0
re = tp / (tp + fn) if tp + fn else 0.0
f1 = 2 * pr * re / (pr + re) if pr + re else 0.0

metrics = spark.createDataFrame([
    ("TP", tp),
    ("FP", fp),
    ("TN", tn),
    ("FN", fn),
    ("Precision", pr),
    ("Recall", re),
    ("F1", f1)],
    ["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          398996.0|
|       FP|            1999.0|
|       TN|          522028.0|
|       FN|               0.0|
|Precision|0.9950149004351675|
|   Recall|               1.0|
|       F1|0.9975012218887461|
+---------+------------------+



In [44]:
# ROC AUC of the random forest, computed from its rawPrediction scores against
# the true binary_sum labels of the test split.
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    labelCol="binary_sum",
    rawPredictionCol="rawPrediction",
)
aur = evaluator.evaluate(classifierPrediction)
print("AUR = ", aur)

AUR =  0.999613264978563


## Проверка результатов регрессии
Метрики:
* MAE — средняя абсолютная ошибка: меньше — лучше
* RMSE — корень из среднеквадратичной ошибки: меньше — лучше
* R² — коэффициент детерминации: больше — лучше

In [92]:

# Evaluate the baseline regression model on the test split with MAE, RMSE and
# R2. The evaluators stay as named globals: regressionEvaluatorMAE is also the
# selection metric of the cross-validation cell.
# Labels: RMSE is the *root* of the mean squared error, and "Средняя" is
# spelled with a Cyrillic "С".
regressionEvaluatorMAE = RegressionEvaluator(labelCol="energy_sum", predictionCol="prediction", metricName="mae")
mae = regressionEvaluatorMAE.evaluate(regressionPrediction)
print("MAE(Средняя абсолютная ошибка) = %g" % mae)

regressionEvaluatorRMSE = RegressionEvaluator(labelCol="energy_sum", predictionCol="prediction", metricName="rmse")
rmse = regressionEvaluatorRMSE.evaluate(regressionPrediction)
print("RMSE(Корень из среднеквадратичной ошибки) = %g" % rmse)

regressionEvaluatorR2 = RegressionEvaluator(labelCol="energy_sum", predictionCol="prediction", metricName="r2")
r2 = regressionEvaluatorR2.evaluate(regressionPrediction)
print("R2(Коэффициент детерминации) = %g" % r2)

MAE(Cредняя абсолютная ошибка) = 2.97397
RMSE(Среднеквадратичная ошибка) = 3.70871
R2(Коэффициент детерминации) = 0.327094


## Кросс-валидация
Подбор гиперпараметров и обучение новой модели

In [76]:
# Grid search over LinearRegression hyperparameters with 2-fold
# cross-validation on the training split; the evaluator is MAE, so the
# parameter map with the lowest MAE is selected.
# Grid size: 4 x 5 x 5 = 100 parameter maps, x 2 folds = 200 pipeline fits.
# The parentheses are required for the multi-line method chain: without them
# each `.addGrid` line is parsed as a separate statement (SyntaxError).
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [2, 3, 4, 6])
    .addGrid(lr.regParam, [0.0, 0.2, 0.4, 0.6, 1])
    .addGrid(lr.elasticNetParam, [0.0, 0.2, 0.5, 0.8, 1])
    .build()
)
cv = CrossValidator(
    estimator=regressionPipeline,
    evaluator=regressionEvaluatorMAE,
    estimatorParamMaps=paramGrid,
    numFolds=2,
    seed=42,  # fixed fold assignment so the selected model is reproducible
)

newRegressionModel = cv.fit(train)

In [91]:
# Evaluate the cross-validated regression model on the same held-out test split.
regressionNewPrediction = newRegressionModel.transform(test)

# bestModel is the pipeline refit on the whole training split with the winning
# parameter map; stage index 2 is the fitted linear regression (after the
# VectorAssembler and MinMaxScaler stages).
regressionBestModel = newRegressionModel.bestModel.stages[2]
print("Модель линейной регрессии с параметрами MaxIter = %s, RegParam = %s, ElasticNet = %s" %
      (regressionBestModel.getMaxIter(),
       regressionBestModel.getRegParam(),
       regressionBestModel.getElasticNetParam()))

# Same three metrics as for the baseline model. RMSE is labelled as the root
# of the mean squared error, and "Средняя" uses a Cyrillic "С".
regressionNewEvaluatorMAE = RegressionEvaluator(labelCol="energy_sum", predictionCol="prediction", metricName="mae")
mae = regressionNewEvaluatorMAE.evaluate(regressionNewPrediction)
print("MAE(Средняя абсолютная ошибка) = %g" % mae)

regressionNewEvaluatorRMSE = RegressionEvaluator(labelCol="energy_sum", predictionCol="prediction", metricName="rmse")
rmse = regressionNewEvaluatorRMSE.evaluate(regressionNewPrediction)
print("RMSE(Корень из среднеквадратичной ошибки) = %g" % rmse)

regressionNewEvaluatorR2 = RegressionEvaluator(labelCol="energy_sum", predictionCol="prediction", metricName="r2")
r2 = regressionNewEvaluatorR2.evaluate(regressionNewPrediction)
print("R2(Коэффициент детерминации) = %g" % r2)

Модель линейной регрессии с параметрами MaxIter = 3, RegParam = 0.0, ElasticNet = 0.2
MAE(Cредняя абсолютная ошибка) = 0.0157025
RMSE(Среднеквадратичная ошибка) = 0.183588
R2(Коэффициент детерминации) = 0.998351


## Сохранение моделей

In [78]:
# Persist the cross-validated model. write().overwrite() keeps the cell
# re-runnable: a plain .save() fails when the target directory already exists.
newRegressionModel.write().overwrite().save("./newRegressionModel")

In [79]:
# Persist the random-forest pipeline model. write().overwrite() keeps the cell
# re-runnable: a plain .save() fails when the target directory already exists.
classifierModel.write().overwrite().save("./classifierModel")