# 1. Инициализация PySpark фреймворка

In [1]:
import numpy as np
import pandas as pd
import os

Импорт библиотек Spark SQL и Spark ML

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor

spark = SparkSession.builder.master("local[*]").getOrCreate()

Загрузка исходных данных

In [3]:
dataframe = spark.read.csv('filtered_data/filtered_data.csv', inferSchema=True, header=True, sep=';')
dataframe.limit(5).toPandas()

Unnamed: 0,timestamp,site_id,period_id,actual_consumption,actual_pv,load_00,load_19,load_38,load_57,load_76,load_95,pv_00,pv_19,pv_38,pv_57,pv_76,pv_95
0,2016-03-19 10:30:00,16,34,159.990084,1.912269,143.861074,123.932534,118.152543,134.324776,134.830698,137.857285,3.958022,146.204244,35.181755,3.695826,3.697584,9.838237
1,2016-03-19 10:45:00,16,34,119.992563,8.295386,134.50729,123.893909,121.15145,135.891911,135.030675,133.859616,28.042454,137.577075,22.077018,3.700424,3.698064,12.065509
2,2016-03-19 12:30:00,16,34,111.993059,99.026948,133.860285,121.596185,131.914748,136.694796,133.149629,132.059423,104.301363,134.664941,4.161038,3.699189,3.697935,140.617869
3,2016-03-19 21:15:00,16,34,127.992067,50.036455,125.280619,137.628109,135.918246,137.457092,111.513904,124.255575,40.708166,7.077827,4.050567,75.12606,236.982773,57.04749
4,2016-03-19 22:15:00,16,34,135.991572,3.100612,135.799826,136.082864,136.820771,133.138293,113.043872,133.516198,2.768181,3.600765,3.687663,145.890814,231.163563,11.48168


# Задача регрессии

### Подготовка данных

Выберем подмножество столбцов для использования в качестве признаков и создадим логическое поле метки с именем label со значениями 1 или 0. В качестве метки выбран столбец load_95

In [4]:
data = dataframe.select(
    "timestamp", 
    "site_id", 
    "period_id", 
    "actual_consumption", 
    "actual_pv", 
    "load_00", 
    ((col("load_95")).alias("label"))).withColumn("timestamp", unix_timestamp("timestamp").cast(DoubleType()))
data.show(10)

+-----------+-------+---------+------------------+------------------+------------------+------------------+
|  timestamp|site_id|period_id|actual_consumption|         actual_pv|           load_00|             label|
+-----------+-------+---------+------------------+------------------+------------------+------------------+
|1.4583726E9|     16|       34|159.99008431763994|1.9122687392575448|143.86107411849176|137.85728490103136|
|1.4583735E9|     16|       34|119.99256323822993| 8.295385807660148|134.50728978179805|133.85961632355335|
|1.4583798E9|     16|       34|111.99305902234796| 99.02694764955635|133.86028507820527|132.05942315683393|
|1.4584113E9|     16|       34|127.99206745411195| 50.03645545014334| 125.2806185386988|124.25557530010748|
|1.4584149E9|     16|       34|  135.991571669994| 3.100611978437967|135.79982553848714| 133.5161980204173|
|1.4584185E9|     16|       34|119.99256323822993|               0.0|135.60065681063034|133.37563961079266|
|1.4584221E9|     16|       

## Разделим данные

Используем 70% данных для обучения, а 30% оставим для тестирования. В данных тестирования столбец *binary_load_00l переименован  в trueLale*l, поэтому можно использовать его позже для сравнения прогнозируемых меток с известными фактическими значениями.

In [5]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 430396  Testing Rows: 184946


Задача регрессии

In [6]:
# Создание столбца признаков для задачи регрессии

# Объединение входных столбцов в один выходной
catVect = VectorAssembler(inputCols=["timestamp", "site_id", "period_id"], outputCol="reg_catFeatures")
# Определение категориальных признаков и преобразование их в индексы
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="reg_idxCatFeatures")
# Объединение указанных числовых столбцов в один выходной столбец
numVect = VectorAssembler(inputCols=["actual_consumption", "actual_pv", "load_00"], outputCol="reg_numFeatures")
# Масштабирование числовых признаков в диапазоне от 0 до 1
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol='reg_normFeatures')
# Объединение категориальных и числовых признаков в один общий вектор
featVect = VectorAssembler(inputCols=["reg_idxCatFeatures", "reg_numFeatures", "reg_normFeatures"], outputCol="reg_features")

# Создание модели RandomForestRegressor
rf = RandomForestRegressor(labelCol="label", featuresCol="reg_features", numTrees=10)

# Создание и выполнение пайплайна для задачи регрессии
reg_pipeline = Pipeline(stages=[catVect, catIdx, numVect, minMax, featVect, rf])
reg_model = reg_pipeline.fit(train)
reg_prediction = reg_model.transform(test)

# Вывод результатов
reg_prediction.select("reg_features", "prediction", "trueLabel").show(10, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------+------------------+------------------+
|reg_features                                                                                                                              |prediction        |trueLabel         |
+------------------------------------------------------------------------------------------------------------------------------------------+------------------+------------------+
|[1.4374368E9,0.0,0.0,159.99008431763994,0.0,156.92691490700227,0.5105613981784742,0.0,0.5938496399923633]                                 |153.1892031608797 |138.49239879171867|
|[1.4374377E9,0.0,0.0,175.98909274940388,0.0,179.53710182006398,0.5617332120276427,0.0,0.6631072869841385]                                 |170.45150035245928|154.04011823642662|
|[1.4374395E9,0.0,0.0,159.99008431763994,0.0,153.58133339759013,0.5105613981784742,0.0,0.5836017325313585

In [7]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = reg_evaluator.evaluate(reg_prediction)
print("RMSE = ", rmse)

RMSE =  32.961393119968015


In [8]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="r2")
r2 = reg_evaluator.evaluate(reg_prediction)
print("R² = ", r2)

R² =  0.6841142106752328


In [9]:
data.select(stddev('label')).show()

+-----------------+
|    stddev(label)|
+-----------------+
|58.60305528763398|
+-----------------+



CrossValidator разделяет обучающий набор данных на заданное количество складок (фолдов), обучает модель на части данных и оценивает ее на оставшихся данных. Этот процесс повторяется для каждой комбинации гиперпараметров, и выбирается модель с наилучшей средней производительностью на всех фолдах.

In [10]:
param_grid = (ParamGridBuilder()
              .addGrid(rf.maxDepth, [2, 5, 10])
              .addGrid(rf.numTrees, [5, 10, 20])
              .build())

cv = CrossValidator(estimator=reg_pipeline,
                            estimatorParamMaps=param_grid,
                            evaluator=RegressionEvaluator(),
                            numFolds=3) 

# Обучение и подбор гиперпараметров
cv_model = cv.fit(train)

best_cv_model = cv_model.bestModel

# Оценка производительности на тестовом наборе
cv_prediction = best_cv_model.transform(test)

In [11]:
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = reg_evaluator.evaluate(cv_prediction)
print("RMSE = ", rmse)

RMSE =  28.759282550299112


In [12]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="r2")
r2 = reg_evaluator.evaluate(cv_prediction)
print("R² = ", r2)

R² =  0.7595221578936747


# Задача бинарной классификации

## Подготовка данных

Выберем подмножество столбцов для использования в качестве признаков и создадим логическое поле метки с именем label со значениями 1 или 0. В частности, *1* для показателей нагрузок более 123.5 в *load_95* и *0* для нагрузок менее 123.5.

In [67]:
data = dataframe.select(
    "timestamp", 
    "site_id", 
    "period_id", 
    "actual_consumption", 
    "actual_pv", 
    "load_00", 
    ((col("load_95") > 123.2).cast("Int").alias("label"))).withColumn("timestamp", unix_timestamp("timestamp").cast(DoubleType()))
data.show(10)

+-----------+-------+---------+------------------+------------------+------------------+-----+
|  timestamp|site_id|period_id|actual_consumption|         actual_pv|           load_00|label|
+-----------+-------+---------+------------------+------------------+------------------+-----+
|1.4583726E9|     16|       34|159.99008431763994|1.9122687392575448|143.86107411849176|    1|
|1.4583735E9|     16|       34|119.99256323822993| 8.295385807660148|134.50728978179805|    1|
|1.4583798E9|     16|       34|111.99305902234796| 99.02694764955635|133.86028507820527|    1|
|1.4584113E9|     16|       34|127.99206745411195| 50.03645545014334| 125.2806185386988|    1|
|1.4584149E9|     16|       34|  135.991571669994| 3.100611978437967|135.79982553848714|    1|
|1.4584185E9|     16|       34|119.99256323822993|               0.0|135.60065681063034|    1|
|1.4584221E9|     16|       34|119.99256323822993|               0.0|142.30728908853442|    1|
|1.4584239E9|     16|       34|151.99058010175796|

## Разделим данные

Используем 70% данных для обучения, а 30% оставим для тестирования. В данных тестирования столбе ц*binary_load_00l переименован в* trueLablel, поэтому можно использовать его позже для сравнения прогнозируемых меток с известными фактическими значениями.

In [68]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 430704  Testing Rows: 184638


## Вычисление отношения между классами

In [69]:
positive_count = train.filter(col("label") == 1).count()
negative_count = train.filter(col("label") == 0).count()
balance_ratio = positive_count / negative_count
print("Positive to Negative Class Ratio:", balance_ratio)

Positive to Negative Class Ratio: 0.9948588975809028


Если balance_ratio близко к 1, это свидетельствует о балансе. Если значительно больше или меньше 1, это может указывать на дисбаланс.

In [70]:
catVect = VectorAssembler(inputCols = ["timestamp", "site_id", "period_id"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ["actual_consumption", "actual_pv", "load_00"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
pipeline = Pipeline(stages=[catVect, catIdx, numVect, minMax, featVect, lr])

In [71]:
pipelineModel = pipeline.fit(train)

In [72]:
prediction = pipelineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(100, truncate=False)

+---------------------------------------------------------------------------------+----------+---------+
|features                                                                         |prediction|trueLabel|
+---------------------------------------------------------------------------------+----------+---------+
|[1.4374377E9,0.0,0.0,0.5617332120276427,0.0,0.6631072869841385]                  |1.0       |1        |
|[1.4374386E9,0.0,0.0,0.5105613981784742,0.0,0.6229895374027743]                  |1.0       |1        |
|[1.4374395E9,0.0,0.0,0.5105613981784742,0.0,0.5836017325313585]                  |1.0       |1        |
|[1.4374422E9,0.0,0.0,0.5617332120276427,0.0,0.5798990762851773]                  |1.0       |1        |
|[1.4374449E9,0.0,0.0,0.5361473051030586,0.0,0.5512339444661548]                  |1.0       |1        |
|[1.4374476E9,0.0,0.0,0.5105613981784742,0.0,0.5158912745314218]                  |1.0       |1        |
|[1.4374494E9,0.0,0.0,0.45938958432930577,0.0,0.5578308

In [73]:
tp = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           73737.0|
|       FP|            9481.0|
|       TN|           82765.0|
|       FN|           18655.0|
|Precision|0.8860703213247134|
|   Recall|0.7980885790977574|
|       F1|0.8397813336370367|
+---------+------------------+



In [74]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUROC = ", aur)

AUROC =  0.9045940960093065


## Подбор гиперпараметров

In [75]:
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.3, 0.1])
             .addGrid(lr.maxIter, [10, 5])
             .addGrid(lr.threshold,[0.4, 0.3])
             .build())

cv = CrossValidator(estimator=pipeline, 
                    evaluator=BinaryClassificationEvaluator(), 
                    estimatorParamMaps=paramGrid, 
                    numFolds=2)

model = cv.fit(train)

In [76]:
newPrediction = model.transform(test)
newPredicted = prediction.select("features", "prediction", "trueLabel")
newPredicted.show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[1.4374377E9,0.0,...|       1.0|        1|
|[1.4374386E9,0.0,...|       1.0|        1|
|[1.4374395E9,0.0,...|       1.0|        1|
|[1.4374422E9,0.0,...|       1.0|        1|
|[1.4374449E9,0.0,...|       1.0|        1|
|[1.4374476E9,0.0,...|       1.0|        1|
|[1.4374494E9,0.0,...|       1.0|        1|
|[1.4374503E9,0.0,...|       1.0|        1|
|[1.4374602E9,0.0,...|       1.0|        1|
|[1.437462E9,0.0,0...|       1.0|        1|
|[1.4374629E9,0.0,...|       1.0|        1|
|[1.4374638E9,0.0,...|       1.0|        1|
|[1.4374674E9,0.0,...|       1.0|        1|
|[1.4375016E9,0.0,...|       1.0|        1|
|[1.4375052E9,0.0,...|       1.0|        1|
|[1.4375079E9,0.0,...|       1.0|        1|
|[1.4375133E9,0.0,...|       1.0|        1|
|[1.4375169E9,0.0,...|       1.0|        1|
|[1.4375187E9,0.0,...|       1.0|        1|
|[1.437525E9,0.0,0...|       1.0

In [77]:
# Recalculate confusion matrix
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           84018.0|
|       FP|           23624.0|
|       TN|           68622.0|
|       FN|            8374.0|
|Precision|0.7805317626948589|
|   Recall|0.9093644471382804|
|       F1|0.8400371936770749|
+---------+------------------+



In [78]:
# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator.evaluate(prediction)
print("AUR2 = ", aur2)

AUR2 =  0.9045964860123676
