In [1]:
import numpy as np
import pandas as pd
import os

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.regression import LinearRegression

from pyspark.ml.classification import GBTClassifier

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [3]:
dataframe = spark.read.csv('filtered_data/filtered_data.csv', inferSchema=True, header=True, sep=';')
dataframe.limit(5).toPandas()

Unnamed: 0,timestamp,site_id,period_id,actual_consumption,actual_pv,load_00,load_15,load_30,load_45,load_60,load_75,load_90,pv_00,pv_15,pv_30,pv_45,pv_60,pv_75,pv_90
0,2014-07-19 18:45:00,1,0,51.625703,22.712489,52.816828,53.389783,53.389783,53.389783,53.391791,53.389783,53.398708,18.321836,2.681188,1.646203,1.357879,76.186577,101.239714,53.792765
1,2014-07-19 19:30:00,1,0,52.281257,6.618605,51.452796,53.389783,53.391368,53.389783,53.407345,53.43707,53.406378,6.339899,1.547058,1.33026,4.771444,77.650275,97.718869,34.159983
2,2014-07-19 20:00:00,1,0,50.719565,1.452209,51.313898,53.389783,53.393106,53.389783,53.393111,53.389783,53.398708,0.936193,0.919032,1.155305,8.527937,100.038989,94.236474,24.634119
3,2014-07-19 20:15:00,1,0,51.901162,0.580877,51.950475,53.389783,53.393106,53.460944,53.826278,53.389783,54.098676,0.219761,0.894574,1.148492,15.937884,98.785139,96.410975,19.146747
4,2014-07-19 21:00:00,1,0,51.250007,0.0,52.21882,53.389783,53.396435,54.021914,53.843979,53.905183,54.122371,0.143507,0.938702,1.160785,35.972993,97.065151,84.195736,5.62244


# Решим задачу регрессии

In [4]:
data = dataframe.select(
    "timestamp", 
    "site_id", 
    "period_id", 
    "actual_consumption", 
    "actual_pv", 
    "load_00", 
    ((col("load_90")).alias("label"))).withColumn("timestamp", unix_timestamp("timestamp").cast(DoubleType()))
data.show(10)

+-----------+-------+---------+------------------+------------------+------------------+-----------------+
|  timestamp|site_id|period_id|actual_consumption|         actual_pv|           load_00|            label|
+-----------+-------+---------+------------------+------------------+------------------+-----------------+
|1.4057955E9|      1|        0| 51.62570299494799| 22.71248932566911| 52.81682785868848|53.39870789573359|
|1.4057982E9|      1|        0| 52.28125674965801| 6.618605013254157|51.452796259410526|53.40637826612612|
|   1.4058E9|      1|        0| 50.71956514220455|1.4522094578011435| 51.31389786752856|53.39870789573359|
|1.4058009E9|      1|        0| 51.90116154382357|0.5808771932574699| 51.95047496345374|54.09867645977295|
|1.4058036E9|      1|        0| 51.25000680775122|               0.0|  52.2188201830341|54.12237134751928|
|1.4058099E9|      1|        0| 51.79032626969339|               0.0| 51.85754836350091|53.39870789573359|
|1.4058108E9|      1|        0|52.460

Используем 70% данных для обучения, а 30% оставим для тестирования. В данных тестирования столбец label переименован в rightLabel, поэтому можно использовать его позже для сравнения прогнозируемых меток с известными фактическими значениями.

In [5]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "rightLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 23377  Testing Rows: 10128


In [6]:
# Создание столбца признаков для задачи регрессии
catVect = VectorAssembler(inputCols=["timestamp", "site_id", "period_id"], outputCol="reg_catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="reg_idxCatFeatures")
numVect = VectorAssembler(inputCols=["actual_consumption", "actual_pv", "load_00"], outputCol="reg_numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol='reg_normFeatures')
featVect = VectorAssembler(inputCols=["reg_idxCatFeatures", "reg_numFeatures", "reg_normFeatures"], outputCol="reg_features")

# Создание модели LinearRegression
lr = LinearRegression(labelCol="label", featuresCol="reg_features")

# Создание и выполнение пайплайна для задачи регрессии
reg_pipeline = Pipeline(stages=[catVect, catIdx, numVect, minMax, featVect, lr])
reg_model = reg_pipeline.fit(train)
reg_prediction = reg_model.transform(test)

# Вывод результатов
reg_prediction.select("reg_features", "prediction", "rightLabel").show(10, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------+------------------+-----------------+
|reg_features                                                                                                                            |prediction        |rightLabel       |
+----------------------------------------------------------------------------------------------------------------------------------------+------------------+-----------------+
|[1.4057955E9,0.0,0.0,51.62570299494799,22.71248932566911,52.81682785868848,0.23079175531723833,0.29912919798128706,0.1944704199932404]  |59.725990341183305|53.39870789573359|
|[1.4057982E9,0.0,0.0,52.28125674965801,6.618605013254157,51.452796259410526,0.23453010227346452,0.08716869300328517,0.18247084493937699]|58.406070584566805|53.40637826612612|
|[1.4058036E9,0.0,0.0,51.25000680775122,0.0,52.2188201830341,0.22864931819780782,0.0,0.18920966357777347]               

In [7]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="rightLabel", predictionCol="prediction", metricName="rmse")
rmse = reg_evaluator.evaluate(reg_prediction)
print("RMSE = ", rmse)

RMSE =  11.494353810756488


In [8]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="rightLabel", predictionCol="prediction", metricName="r2")
r2 = reg_evaluator.evaluate(reg_prediction)
print("R² = ", r2)

R² =  0.4099043652184545


In [9]:
data.select(stddev('label')).show()

+------------------+
|     stddev(label)|
+------------------+
|15.024492674635598|
+------------------+



CrossValidator разделяет обучающий набор данных на заданное количество складок (фолдов), обучает модель на части данных и оценивает ее на оставшихся данных. Этот процесс повторяется для каждой комбинации гиперпараметров, и выбирается модель с наилучшей средней производительностью на всех фолдах.

In [10]:
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.01, 0.1, 1.0])
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
              .addGrid(lr.maxIter, [10, 50, 100])
              .build())

cv = CrossValidator(estimator=reg_pipeline,
                            estimatorParamMaps=param_grid,
                            evaluator=RegressionEvaluator(),
                            numFolds=3) 

# Обучение и подбор гиперпараметров
cv_model = cv.fit(train)

best_cv_model = cv_model.bestModel

# Оценка производительности на тестовом наборе
cv_prediction = best_cv_model.transform(test)

In [11]:
reg_evaluator = RegressionEvaluator(labelCol="rightLabel", predictionCol="prediction", metricName="rmse")
rmse = reg_evaluator.evaluate(cv_prediction)
print("RMSE = ", rmse)

RMSE =  11.493872223032922


In [12]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="rightLabel", predictionCol="prediction", metricName="r2")
r2 = reg_evaluator.evaluate(cv_prediction)
print("R² = ", r2)

R² =  0.4099538115579022


# Решим задачу бинарной классификации

### Подготовка данных

Выберем подмножество столбцов для использования в качестве признаков и создадим логическое поле метки с именем label со значениями 1 или 0. В частности, 1 для показателей нагрузок более 75 в load_90 и 0 для нагрузок менее 75.

In [13]:
data = dataframe.select(
    "timestamp", 
    "site_id", 
    "period_id", 
    "actual_consumption", 
    "actual_pv", 
    "load_00", 
    ((col("load_90") > 58).cast("Int").alias("label"))).withColumn("timestamp", unix_timestamp("timestamp").cast(DoubleType()))
data.show(10)

+-----------+-------+---------+------------------+------------------+------------------+-----+
|  timestamp|site_id|period_id|actual_consumption|         actual_pv|           load_00|label|
+-----------+-------+---------+------------------+------------------+------------------+-----+
|1.4057955E9|      1|        0| 51.62570299494799| 22.71248932566911| 52.81682785868848|    0|
|1.4057982E9|      1|        0| 52.28125674965801| 6.618605013254157|51.452796259410526|    0|
|   1.4058E9|      1|        0| 50.71956514220455|1.4522094578011435| 51.31389786752856|    0|
|1.4058009E9|      1|        0| 51.90116154382357|0.5808771932574699| 51.95047496345374|    0|
|1.4058036E9|      1|        0| 51.25000680775122|               0.0|  52.2188201830341|    0|
|1.4058099E9|      1|        0| 51.79032626969339|               0.0| 51.85754836350091|    0|
|1.4058108E9|      1|        0|52.460696718996665|               0.0|52.347502377808425|    0|
|1.4058117E9|      1|        0|  51.8311171697882|

In [14]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "rightLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 23396  Testing Rows: 10109


### Вычисление отношения между классами

In [15]:
positive_count = train.filter(col("label") == 1).count()
negative_count = train.filter(col("label") == 0).count()
balance_ratio = positive_count / negative_count
print("Positive to Negative Class Ratio:", balance_ratio)

Positive to Negative Class Ratio: 0.9377174093092596


In [16]:
catVect = VectorAssembler(inputCols = ["timestamp", "site_id", "period_id"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ["actual_consumption", "actual_pv", "load_00"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")

gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10, maxDepth=5, stepSize=0.1)

pipeline = Pipeline(stages=[catVect, catIdx, numVect, minMax, featVect, gbt])
model = pipeline.fit(train)
prediction = model.transform(test)

predicted = prediction.select("features", "prediction", "rightLabel")
predicted.show(10, truncate=False)

+--------------------------------------------------------------------------------+----------+----------+
|features                                                                        |prediction|rightLabel|
+--------------------------------------------------------------------------------+----------+----------+
|[1.4058108E9,0.0,0.0,0.3496619637349427,0.0,0.22958901750264155]                |0.0       |0         |
|[1.4058135E9,0.0,0.0,0.3460205966676067,0.0,0.22805399569532128]                |0.0       |0         |
|[1.4058144E9,0.0,0.0,0.34368763695538185,0.0,0.2258553092861994]                |0.0       |0         |
|[1.4058432E9,0.0,0.0,0.3502983947382051,0.22915654459842116,0.2329009423290168] |0.0       |0         |
|[1.4058756E9,0.0,0.0,0.3550939700532131,0.7209437233742669,0.24170177494060768] |1.0       |1         |
|[1.4058765E9,0.0,0.0,0.3484632391697555,0.5984913956788418,0.2269658695545072]  |0.0       |1         |
|[1.405881E9,0.0,0.0,0.34813656048988945,0.405064994813

In [17]:
tp = float(predicted.filter("prediction == 1.0 AND rightLabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND rightLabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND rightLabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND rightLabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|            3800.0|
|       FP|            1233.0|
|       TN|            4061.0|
|       FN|            1015.0|
|Precision|0.7550168885356646|
|   Recall|0.7892004153686397|
|       F1|0.7717303005686434|
+---------+------------------+



In [18]:
evaluator = BinaryClassificationEvaluator(labelCol="rightLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

AUR =  0.847648388955776


### Подбор гиперпараметров

In [19]:
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 5, 10])
             .addGrid(gbt.maxBins, [20, 32])
             .addGrid(gbt.maxIter, [10, 20])
             .build())

cv = CrossValidator(estimator=pipeline, 
                    evaluator=BinaryClassificationEvaluator(), 
                    estimatorParamMaps=paramGrid, 
                    numFolds=2)

model = cv.fit(train)

In [20]:
newPrediction = model.transform(test)
newPredicted = prediction.select("features", "prediction", "rightLabel")
newPredicted.show()

+--------------------+----------+----------+
|            features|prediction|rightLabel|
+--------------------+----------+----------+
|[1.4058108E9,0.0,...|       0.0|         0|
|[1.4058135E9,0.0,...|       0.0|         0|
|[1.4058144E9,0.0,...|       0.0|         0|
|[1.4058432E9,0.0,...|       0.0|         0|
|[1.4058756E9,0.0,...|       1.0|         1|
|[1.4058765E9,0.0,...|       0.0|         1|
|[1.405881E9,0.0,0...|       0.0|         1|
|[1.4058837E9,0.0,...|       0.0|         1|
|[1.4058981E9,0.0,...|       0.0|         0|
|[1.4059008E9,0.0,...|       0.0|         0|
|[1.4059062E9,0.0,...|       0.0|         0|
|[1.4059161E9,0.0,...|       0.0|         0|
|[1.4059314E9,0.0,...|       1.0|         1|
|[1.405935E9,0.0,1...|       1.0|         1|
|[1.4059818E9,0.0,...|       0.0|         0|
|[1.4059872E9,0.0,...|       0.0|         0|
|[1.4059908E9,0.0,...|       0.0|         0|
|[1.4060052E9,0.0,...|       0.0|         0|
|[1.4060538E9,0.0,...|       1.0|         1|
|[1.406061

In [25]:
tp2 = float(newPrediction.filter("prediction == 1.0 AND rightLabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND rightLabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND rightLabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND rightLabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|            3838.0|
|       FP|            1244.0|
|       TN|            4050.0|
|       FN|             977.0|
|Precision|0.7552144824872098|
|   Recall| 0.797092419522326|
|       F1|0.7755885621905628|
+---------+------------------+



In [26]:
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator.evaluate(prediction)
print("AUR2 = ", aur2)

AUR2 =  0.8476483889557761
