In [12]:
# Import other modules not related to PySpark
import os
import sys
import pandas as pd
from pandas import DataFrame
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
import math
from IPython.core.interactiveshell import InteractiveShell
from datetime import *
import statistics as stats
# Display the value of every expression statement in a cell (not only the last one) without explicitly calling 'print'
InteractiveShell.ast_node_interactivity = "all" 
%matplotlib inline

In [13]:
# Import PySpark related modules
import pyspark
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import lit, desc, col, size, array_contains\
, isnan, udf, hour, array_min, array_max, countDistinct
from pyspark.sql.types import *

# Memory size string used for both spark.executor.memory and spark.driver.memory.
MAX_MEMORY = '15G'

# Spark configuration consumed by init_spark(): master is "local[*]", and the
# (key, value) pairs below are applied in the listed order.
conf = pyspark.SparkConf().setMaster("local[*]").setAll([
    ('spark.executor.heartbeatInterval', 10000),
    ('spark.network.timeout', 10000),
    ("spark.core.connection.ack.wait.timeout", "3600"),
    ("spark.executor.memory", MAX_MEMORY),
    ("spark.driver.memory", MAX_MEMORY),
])
def init_spark():
    """Return a SparkSession named "Pyspark guide".

    The session is built from the module-level ``conf`` and obtained through
    ``getOrCreate()``.
    """
    builder = SparkSession.builder.appName("Pyspark guide").config(conf=conf)
    return builder.getOrCreate()

spark = init_spark()

# Path to the source CSV file.
csv_file_path = "archive/1.csv"

# Read the CSV into a Spark DataFrame: ';'-separated, first row is the header,
# column types are inferred from the data.
df = spark.read.csv(csv_file_path, header=True, sep=';', inferSchema=True)

# Columns kept for the analysis: identifiers, the two measured values, and the
# load_00..load_09 / pv_00..pv_09 columns.
# Deliberately NOT named `col`: that name would shadow
# pyspark.sql.functions.col imported at the top of this cell and break any
# later `col(...)` call.
input_columns = (
    ['timestamp', 'site_id', 'period_id', 'actual_consumption', 'actual_pv']
    + [f'load_{i:02d}' for i in range(10)]
    + [f'pv_{i:02d}' for i in range(10)]
)

df = df.select(*input_columns)

# Preview the first 10 rows (still a Spark DataFrame until toPandas() is called).
limited_data_pandas = df.limit(10)

limited_data_pandas.toPandas()

Unnamed: 0,timestamp,site_id,period_id,actual_consumption,actual_pv,load_00,load_01,load_02,load_03,load_04,...,pv_00,pv_01,pv_02,pv_03,pv_04,pv_05,pv_06,pv_07,pv_08,pv_09
0,2014-07-19 18:45:00,1,0,51.625703,22.712489,52.816828,53.501688,54.079161,52.683472,52.590445,...,18.321836,13.912749,10.946568,9.243136,6.962653,5.466993,4.565274,4.082976,3.851306,3.638564
1,2014-07-19 19:30:00,1,0,52.281257,6.618605,51.452796,51.676287,51.329882,51.690879,51.538671,...,6.339899,4.295642,3.016987,2.314616,2.015446,1.952004,1.893801,1.840635,1.791986,1.747604
2,2014-07-19 20:00:00,1,0,50.719565,1.452209,51.313898,52.199835,52.340547,51.844138,52.661063,...,0.936193,0.403129,0.25949,0.338924,0.411971,0.479376,0.541489,0.598855,0.651823,0.700478
3,2014-07-19 20:15:00,1,0,51.901162,0.580877,51.950475,51.624345,50.867434,51.538997,51.331161,...,0.219761,0.091042,0.184182,0.26982,0.348792,0.421529,0.488656,0.550591,0.607483,0.659699
4,2014-07-19 21:00:00,1,0,51.250007,0.0,52.21882,52.176852,51.745211,51.809854,51.83571,...,0.143507,0.232756,0.314935,0.390735,0.460637,0.524848,0.583788,0.637915,0.687616,0.733272
5,2014-07-19 22:45:00,1,0,51.790326,0.0,51.857548,51.89193,51.912236,52.005984,51.929889,...,0.170069,0.257836,0.338463,0.412528,0.480568,0.543071,0.600489,0.653234,0.701688,0.7462
6,2014-07-19 23:00:00,1,0,52.460697,0.0,52.347502,52.28546,52.333257,52.231521,52.220741,...,0.170069,0.257836,0.338463,0.412528,0.480568,0.543071,0.600489,0.653234,0.701688,0.7462
7,2014-07-19 23:15:00,1,0,51.831117,0.0,52.024522,52.214861,52.192664,52.226268,52.920358,...,0.170069,0.257836,0.338463,0.412528,0.480568,0.543071,0.600489,0.653234,0.701688,0.7462
8,2014-07-19 23:45:00,1,0,52.030533,0.0,52.20284,52.344978,53.099632,52.468551,52.493247,...,0.170069,0.257836,0.338463,0.412528,0.480568,0.543071,0.600489,0.653234,0.701688,0.7462
9,2014-07-20 00:00:00,1,0,51.754934,0.0,51.995633,52.805283,52.204891,52.246713,52.270049,...,0.170069,0.257836,0.338463,0.412528,0.480568,0.543071,0.600489,0.653234,0.701688,0.7462


In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, expr
from pyspark.sql.types import IntegerType

# Works on the DataFrame `df` produced by the loading cell; replace
# 'timestamp' below with the actual name of the time column if it differs.
df = df.withColumn("timestamp", col("timestamp").cast("timestamp"))

# DAY flag: 1 when the hour of `timestamp` falls in [8, 20), otherwise 0,
# stored as an integer column.
hour_of_day = expr("hour(timestamp)")
is_daytime = (hour_of_day >= 8) & (hour_of_day < 20)
df = df.withColumn("DAY", when(is_daytime, 1).otherwise(0).cast(IntegerType()))

# Show the first few rows to check the result.
limited_data_pandas = df.limit(10)

limited_data_pandas.toPandas()


Unnamed: 0,timestamp,site_id,period_id,actual_consumption,actual_pv,load_00,load_01,load_02,load_03,load_04,...,pv_01,pv_02,pv_03,pv_04,pv_05,pv_06,pv_07,pv_08,pv_09,DAY
0,2014-07-19 18:45:00,1,0,51.625703,22.712489,52.816828,53.501688,54.079161,52.683472,52.590445,...,13.912749,10.946568,9.243136,6.962653,5.466993,4.565274,4.082976,3.851306,3.638564,1
1,2014-07-19 19:30:00,1,0,52.281257,6.618605,51.452796,51.676287,51.329882,51.690879,51.538671,...,4.295642,3.016987,2.314616,2.015446,1.952004,1.893801,1.840635,1.791986,1.747604,1
2,2014-07-19 20:00:00,1,0,50.719565,1.452209,51.313898,52.199835,52.340547,51.844138,52.661063,...,0.403129,0.25949,0.338924,0.411971,0.479376,0.541489,0.598855,0.651823,0.700478,0
3,2014-07-19 20:15:00,1,0,51.901162,0.580877,51.950475,51.624345,50.867434,51.538997,51.331161,...,0.091042,0.184182,0.26982,0.348792,0.421529,0.488656,0.550591,0.607483,0.659699,0
4,2014-07-19 21:00:00,1,0,51.250007,0.0,52.21882,52.176852,51.745211,51.809854,51.83571,...,0.232756,0.314935,0.390735,0.460637,0.524848,0.583788,0.637915,0.687616,0.733272,0
5,2014-07-19 22:45:00,1,0,51.790326,0.0,51.857548,51.89193,51.912236,52.005984,51.929889,...,0.257836,0.338463,0.412528,0.480568,0.543071,0.600489,0.653234,0.701688,0.7462,0
6,2014-07-19 23:00:00,1,0,52.460697,0.0,52.347502,52.28546,52.333257,52.231521,52.220741,...,0.257836,0.338463,0.412528,0.480568,0.543071,0.600489,0.653234,0.701688,0.7462,0
7,2014-07-19 23:15:00,1,0,51.831117,0.0,52.024522,52.214861,52.192664,52.226268,52.920358,...,0.257836,0.338463,0.412528,0.480568,0.543071,0.600489,0.653234,0.701688,0.7462,0
8,2014-07-19 23:45:00,1,0,52.030533,0.0,52.20284,52.344978,53.099632,52.468551,52.493247,...,0.257836,0.338463,0.412528,0.480568,0.543071,0.600489,0.653234,0.701688,0.7462,0
9,2014-07-20 00:00:00,1,0,51.754934,0.0,51.995633,52.805283,52.204891,52.246713,52.270049,...,0.257836,0.338463,0.412528,0.480568,0.543071,0.600489,0.653234,0.701688,0.7462,0


<h2 style="text-align:center;font-size:200%;">Gradient Boosting Machine</h2>

In [15]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Keep only the needed columns: the target first, then the predictors.
selected_columns = ['actual_consumption', 'load_00', 'load_01', 'load_02']
data_subset = df.select(selected_columns)

# Assemble every column after the first (the target) into one "features" vector.
assembler = VectorAssembler(inputCols=selected_columns[1:], outputCol="features")
data_subset = assembler.transform(data_subset)

# The estimator and evaluators below read the target from a column named "label".
data_subset = data_subset.withColumnRenamed("actual_consumption", "label")

# 80/20 train/test split with a fixed seed.
(training_data, test_data) = data_subset.randomSplit([0.8, 0.2], seed=42)

# Gradient-boosted trees regressor.
gbt = GBTRegressor(featuresCol="features", labelCol="label", seed=42)

# Hyper-parameter grid: 2 depths x 2 iteration counts = 4 candidates.
param_grid = (ParamGridBuilder()
              .addGrid(gbt.maxDepth, [5, 10])
              .addGrid(gbt.maxIter, [10, 20])
              .build())

# MSE evaluator, used both for model selection and for the first test metric.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mse")

# 3-fold cross-validation. The explicit seed pins the fold assignment so the
# selected parameters and reported metrics do not change from run to run.
cross_val = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)

# Fit on the training split with cross-validation.
cv_model = cross_val.fit(training_data)

# Predict on the held-out test split.
predictions = cv_model.transform(test_data)

# Model quality - MSE
mse = evaluator.evaluate(predictions)
print(f"Mean Squared Error: {mse}")

# Model quality - MAE
mae_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
mae = mae_evaluator.evaluate(predictions)
print(f"Mean Absolute Error: {mae}")

# Model quality - RMSE
rmse_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = rmse_evaluator.evaluate(predictions)
print(f"Root Mean Squared Error: {rmse}")

# Model quality - R^2
r2_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)
print(f"R^2: {r2}")

# Print the parameters of the best model found by cross-validation.
best_params = cv_model.bestModel.extractParamMap()
print("Best Parameters:")
for param, value in best_params.items():
    print(f"{param.name}: {value}")


Mean Squared Error: 7.06849842334063
Mean Absolute Error: 1.5590172143607572
Root Mean Squared Error: 2.6586647820552014
R^2: 0.9761968549552151
Best Parameters:
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: all
featuresCol: features
impurity: variance
labelCol: label
leafCol: 
lossType: squared
maxBins: 32
maxDepth: 5
maxIter: 20
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
predictionCol: prediction
seed: 42
stepSize: 0.1
subsamplingRate: 1.0
validationTol: 0.01


<h2 style="text-align:center;font-size:200%;">Logistic Regression</h2>

In [16]:
from pyspark.sql import SparkSession
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F
from pyspark.ml.linalg import DenseVector
from pyspark.mllib.evaluation import MulticlassMetrics

# Keep only the needed columns: the target (DAY) first, then the predictors.
selected_columns = ['DAY', 'actual_consumption', 'load_00', 'load_01', 'load_02']
data_subset = df.select(selected_columns)

# Assemble every column after the first (the target) into one "features" vector.
assembler = VectorAssembler(inputCols=selected_columns[1:], outputCol="features")
data_subset = assembler.transform(data_subset)

# The estimator and evaluator below read the target from a column named "label".
data_subset = data_subset.withColumnRenamed("DAY", "label")

# 80/20 train/test split with a fixed seed.
(training_data, test_data) = data_subset.randomSplit([0.8, 0.2], seed=42)

# Logistic regression model.
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Fit on the training split.
lr_model = lr.fit(training_data)

# Predict on the held-out test split.
predictions = lr_model.transform(test_data)

# Model quality: area under the ROC curve.
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
area_under_roc = evaluator.evaluate(predictions)

print(f"Area under ROC curve: {area_under_roc}")


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Confusion-matrix cells from a single aggregation over `predictions`
# (instead of one filter + count job per cell). Keys are (label, prediction).
# row["count"] is used on purpose: row.count would resolve to the tuple method.
confusion_counts = {
    (int(row["label"]), int(row["prediction"])): row["count"]
    for row in predictions.groupBy("label", "prediction").count().collect()
}
true_positives = confusion_counts.get((1, 1), 0)
true_negatives = confusion_counts.get((0, 0), 0)
false_positives = confusion_counts.get((0, 1), 0)
false_negatives = confusion_counts.get((1, 0), 0)

# Accuracy
accuracy = _safe_ratio(
    true_positives + true_negatives,
    true_positives + true_negatives + false_positives + false_negatives,
)
print(f"Accuracy: {accuracy}")

# Precision (0.0 when nothing was predicted positive)
precision = _safe_ratio(true_positives, true_positives + false_positives)
print(f"Precision: {precision}")

# Recall (0.0 when the test split has no positive labels)
recall = _safe_ratio(true_positives, true_positives + false_negatives)
print(f"Recall: {recall}")

# F1 score (0.0 when precision and recall are both zero)
f1_score = _safe_ratio(2 * (precision * recall), precision + recall)
print(f"F1 Score: {f1_score}")

# ROC-AUC for this binary classifier was already computed above as area_under_roc.

# Print the confusion matrix.
print("Confusion Matrix:")
print(f"True Positives: {true_positives}")
print(f"True Negatives: {true_negatives}")
print(f"False Positives: {false_positives}")
print(f"False Negatives: {false_negatives}")

Area under ROC curve: 0.8241304729265069
Accuracy: 0.8106200456075578
Precision: 0.9207335107956226
Recall: 0.6783612987578993
F1 Score: 0.7811794228356336
Confusion Matrix:
True Positives: 3113
True Negatives: 4352
False Positives: 268
False Negatives: 1476
