Выполните анализ Вами выбранного датасета с помощью двух алгоритмов машинного
обучения в соответствии с индивидуальным вариантом:

Вариант: 1  
Задача регрессии: RandomForest  
Задача бинарной классификации: LogisticRegression

Датасет: 5. Датасет исторических данных по фотоэлектричеству и нагрузке.

In [148]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, IntegerType, DoubleType
import pyspark.sql.functions as F

In [149]:
MAX_MEMORY = '20G'
# Initialize a spark session.
conf = pyspark.SparkConf().setMaster("local[*]") \
        .set('spark.executor.heartbeatInterval', 10000) \
        .set('spark.network.timeout', 10000) \
        .set("spark.core.connection.ack.wait.timeout", "3600") \
        .set("spark.executor.memory", MAX_MEMORY) \
        .set("spark.driver.memory", MAX_MEMORY)
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Pyspark guide") \
        .config(conf=conf) \
        .getOrCreate()
    return spark

spark = init_spark()

filename_data1 = '1.csv'
filename_data2 = '2.csv'

df1 = spark.read.csv(filename_data1, header=True, inferSchema=True, sep=';')
df2 = spark.read.csv(filename_data2, header=True, inferSchema=True, sep=';')

df = df1.union(df2)

print('Data frame type: ' + str(type(df)))

Data frame type: <class 'pyspark.sql.dataframe.DataFrame'>


In [150]:
# Define column names and data types
columns = [
    ("timestamp", TimestampType()),
    ("site_id", IntegerType()),
    ("period_id", IntegerType()),
    ("actual_consumption", DoubleType()),
    ("actual_pv", DoubleType()),
    ("load_00", DoubleType()),
    ("load_01", DoubleType()),
    ("load_02", DoubleType()),
    ("load_03", DoubleType()),
    ("load_04", DoubleType()),
    ("load_05", DoubleType()),
    ("pv_00", DoubleType()),
    ("pv_01", DoubleType()),
    ("pv_02", DoubleType()),
    ("pv_03", DoubleType()),
    ("pv_04", DoubleType()),
    ("pv_05", DoubleType())
]

# Apply the schema to the PySpark DataFrame
for col, data_type in columns:
    df = df.withColumn(col, df['`{}`'.format(col)].cast(data_type))

# Select specific columns from the DataFrame
selected_columns = ["timestamp", "site_id", "period_id", "actual_consumption", "actual_pv",
                    "load_00", "load_01", "load_02", "load_03", "load_04", "load_05",
                    "pv_00", "pv_01", "pv_02", "pv_03", "pv_04", "pv_05"]

selected_df = df.select(*selected_columns)

# Define conditions for filtering zeros and negative numbers
conditions = (
    (F.col("actual_consumption") > 0) &
    (F.col("actual_pv") > 0) &
    (F.col("load_00") > 0) &
    (F.col("load_01") > 0) &
    (F.col("load_02") > 0) &
    (F.col("load_03") > 0) &
    (F.col("load_04") > 0) &
    (F.col("load_05") > 0) &
    (F.col("pv_00") > 0) &
    (F.col("pv_01") > 0) &
    (F.col("pv_02") > 0) &
    (F.col("pv_03") > 0) &
    (F.col("pv_04") > 0) &
    (F.col("pv_05") > 0)
)

# Apply the filter to the DataFrame
filtered_df = selected_df.filter(conditions)

# Specify the columns where you want to handle outliers (assuming these are numeric columns)
numerical_columns = ["actual_consumption", "actual_pv", "load_00", "load_01", "load_02", "load_03", "load_04", "load_05",
                     "pv_00", "pv_01", "pv_02", "pv_03", "pv_04", "pv_05"]

# Define the lower and upper limits based on 3 standard deviations from the mean
std_dev_limits = {col_name: 3 * filtered_df.agg({col_name: "stddev"}).collect()[0][0] for col_name in numerical_columns}
mean_values = {col_name: filtered_df.agg({col_name: "mean"}).collect()[0][0] for col_name in numerical_columns}

# Filter the data, keeping only the rows where values are within 3 standard deviations from the mean
for col_name in numerical_columns:
    lower_limit = mean_values[col_name] - std_dev_limits[col_name]
    upper_limit = mean_values[col_name] + std_dev_limits[col_name]
    filtered_df = filtered_df.filter((filtered_df[col_name] >= lower_limit) & (filtered_df[col_name] <= upper_limit))
    
    
# Show the resulting DataFrame
filtered_df.show()

+-------------------+-------+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+
|          timestamp|site_id|period_id|actual_consumption|         actual_pv|           load_00|           load_01|           load_02|           load_03|           load_04|           load_05|              pv_00|              pv_01|              pv_02|             pv_03|             pv_04|              pv_05|
+-------------------+-------+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+
|2014-07-19 18:45:00|      1|        0| 51.62570299494799| 22.71248932

In [151]:
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Определите категориальные признаки
categorical_features = ["site_id", "period_id"]

# Инициализируйте StringIndexer для индексации категориальных признаков
indexers = [StringIndexer(inputCol=column, outputCol=f"{column}_index", handleInvalid="skip") for column in categorical_features]

# Инициализируйте OneHotEncoder для кодирования категориальных признаков
encoder_rf = OneHotEncoder(inputCols=[f"{col}_index" for col in categorical_features],
                           outputCols=[f"{col}_encoded" for col in categorical_features])

# Примените VectorAssembler для объединения всех признаков, включая закодированные
feature_columns_rf = ["site_id_encoded", "period_id_encoded", "load_00", "load_01", "load_02", "load_03", "load_04", "load_05",
                      "pv_00", "pv_01", "pv_02", "pv_03", "pv_04", "pv_05"]
vector_assembler_rf = VectorAssembler(inputCols=feature_columns_rf, outputCol="features_new")

# Разделите данные на обучающий и тестовый наборы для RandomForestRegressor
(train_data_rf, test_data_rf) = filtered_df.randomSplit([0.8, 0.2], seed=123)

# Инициализируйте модель случайного леса
rf = RandomForestRegressor(featuresCol="features_new", labelCol="actual_consumption")

# Создайте конвейер для последовательного выполнения шагов
pipeline_rf = Pipeline(stages=indexers + [encoder_rf, vector_assembler_rf, rf])

# Параметры для подбора
param_grid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Оценщик для оценки качества модели (RMSE)
evaluator_rf = RegressionEvaluator(labelCol="actual_consumption", predictionCol="prediction", metricName="rmse")

# Кросс-валидация для RandomForestRegressor
crossval_rf = CrossValidator(estimator=pipeline_rf,
                             estimatorParamMaps=param_grid_rf,
                             evaluator=evaluator_rf,
                             numFolds=3)

# Обучение модели
model_cv_rf = crossval_rf.fit(train_data_rf)
predictions_cv_rf = model_cv_rf.transform(test_data_rf)

# Оценка модели с использованием метрик регрессии
rmse_cv_rf = evaluator_rf.evaluate(predictions_cv_rf)
print("Random Forest (CV) - Root Mean Squared Error (RMSE) on test data = %g" % rmse_cv_rf)


Random Forest (CV) - Root Mean Squared Error (RMSE) on test data = 2.20308


In [152]:
import pandas as pd

# Преобразуйте результаты предсказаний в DataFrame
predictions_df = predictions_cv_rf.select("actual_consumption", "prediction").limit(20).toPandas()

# Выведите первые 20 строк DataFrame
print(predictions_df)


    actual_consumption  prediction
0            50.719565   53.313102
1            53.116910   53.030801
2            52.925833   53.056896
3            52.271559   53.279963
4            52.951707   53.080376
5            52.280497   52.554910
6            90.469777   89.799751
7            90.177203   90.878125
8            88.147575   88.605385
9            89.307277   88.605385
10           89.937456   89.786369
11           71.385435   73.930008
12           69.325334   63.047479
13           90.764232   91.795022
14           87.251615   89.419231
15           93.231641   91.444040
16           91.196495   93.051722
17           92.045886   91.188915
18           89.515550   85.497058
19           91.700063   92.797852


In [153]:
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, mean, when
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

# Рассчитаем среднее значение actual_consumption
mean_actual_consumption = filtered_df.select(mean("actual_consumption")).collect()[0][0]

# Создадим новый столбец "label" на основе условия (> среднего значения)
filtered_df = filtered_df.withColumn("label", when(col("actual_consumption") > mean_actual_consumption, 1).otherwise(0))

# Определим категориальные признаки
categorical_features = ["site_id", "period_id"]

# Инициализируем StringIndexer для индексации категориальных признаков
indexers = [StringIndexer(inputCol=column, outputCol=f"{column}_index", handleInvalid="skip") for column in categorical_features]

# Инициализируем OneHotEncoder для кодирования категориальных признаков
encoder = OneHotEncoder(inputCols=[f"{col}_index" for col in categorical_features],
                        outputCols=[f"{col}_encoded" for col in categorical_features])

# Применяем VectorAssembler для объединения всех признаков, включая закодированные
feature_columns = ["site_id_encoded", "period_id_encoded", "load_00", "load_01", "load_02", "load_03", "load_04", "load_05",
                   "pv_00", "pv_01", "pv_02", "pv_03", "pv_04", "pv_05"]
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Применяем OneHotEncoder и VectorAssembler
assembled_df = Pipeline(stages=indexers + [encoder, vector_assembler]).fit(filtered_df).transform(filtered_df).select("features", "label")

# Разделяем данные на обучающий и тестовый наборы
(train_data, test_data) = assembled_df.randomSplit([0.8, 0.2], seed=123)

# Инициализируем модель Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Создаем конвейер для последовательного выполнения шагов
pipeline = Pipeline(stages=[lr])

# Задаем сетку параметров для поиска
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.3, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Инициализируем оценщик бинарной классификации
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Инициализируем кросс-валидатор с 5 фолдами
cross_validator = CrossValidator(estimator=pipeline,
                                 estimatorParamMaps=param_grid,
                                 evaluator=evaluator,
                                 numFolds=5)

# Обучаем модель на обучающем наборе с использованием кросс-валидации и подбора параметров
cv_model = cross_validator.fit(train_data)

# Проводим оценку модели на тестовом наборе
predictions = cv_model.transform(test_data)

# Вычисляем значение площади под ROC-кривой
area_under_roc = evaluator.evaluate(predictions)
print("Area under ROC = %g" % area_under_roc)


Area under ROC = 0.999475


In [154]:
# Преобразуйте результаты предсказаний в Pandas DataFrame
predictions_df = predictions.select("label", "prediction", "probability").limit(20).toPandas()

# Выведите первые несколько строк DataFrame
print(predictions_df.head(20))


    label  prediction                                probability
0       0         0.0  [0.5304489898308216, 0.46955101016917844]
1       0         0.0  [0.5312680479654897, 0.46873195203451035]
2       0         0.0  [0.5352670278134856, 0.46473297218651444]
3       0         0.0   [0.5357014566416268, 0.4642985433583732]
4       0         0.0   [0.537338935244245, 0.46266106475575497]
5       0         0.0   [0.5351485605838534, 0.4648514394161466]
6       0         0.0   [0.5354243449122519, 0.4645756550877481]
7       0         0.0     [0.534155499134751, 0.465844500865249]
8       0         0.0   [0.531868299944354, 0.46813170005564597]
9       0         0.0  [0.5333508030575281, 0.46664919694247187]
10      0         0.0     [0.530624068336641, 0.469375931663359]
11      0         0.0   [0.5301406586422228, 0.4698593413577772]
12      0         0.0  [0.5346317098207232, 0.46536829017927683]
13      0         0.0   [0.5304265188566696, 0.4695734811433304]
14      0         0.0  [0

In [155]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Оценка модели с использованием метрик
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 Score: {:.4f}".format(f1_score))

F1 Score: 0.9769


F1 Score равный 0.9554 является довольно высоким показателем и может свидетельствовать о хорошей производительности модели на задаче классификации. F1 Score - это метрика, которая учитывает как точность (precision), так и полноту (recall) модели. Значение 0.9554 говорит о том, что модель успешно сбалансировала компромисс между точностью и полнотой при классификации.

In [156]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Преобразуем DataFrame с предсказаниями в RDD
prediction_and_label = predictions.select("prediction", "label").rdd.map(lambda x: (float(x[0]), float(x[1])))

# Инициализируем объект MulticlassMetrics
metrics = MulticlassMetrics(prediction_and_label)

# Оценка Accuracy
accuracy = metrics.accuracy
print(f"Accuracy: {accuracy}")

# Оценка Precision для класса 1
precision = metrics.precision(1.0)
print(f"Precision for class 1: {precision}")




Accuracy: 0.9769388134647573
Precision for class 1: 0.9587140439932318


Accuracy: 0.956 (95.6%) - это общая точность модели, то есть доля правильных предсказаний по всем классам.
Precision for class 1: 0.935 (93.5%) - это precision (точность) для положительного класса. Это показывает, какую долю из предсказанных как положительные объектов на самом деле являются положительными.
Общий уровень точности высок, и точность в предсказании положительного класса тоже хороша, но, конечно, оценка зависит от контекста задачи.

In [157]:
# Останавливаем сессию Spark
spark.stop()