In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

# Start (or reuse) a local Spark session; "local[*]" uses one worker thread per available core
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("MLExample")
    .getOrCreate()
)

In [2]:
# Load the flight records: the first row supplies column names, Spark infers column types
csv = spark.read.csv(
    'flights.csv',
    header=True,
    inferSchema=True,
)

In [3]:
csv.show(n=20)  # preview the first 20 rows of the raw data

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

In [4]:
# Binary target: 1 when the arrival delay exceeds 15 minutes, otherwise 0
is_late = col("ArrDelay") > 15
data = csv.withColumn("binary_feature", is_late.cast("Int"))
data.show()

+----------+---------+-------+---------------+-------------+--------+--------+--------------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|binary_feature|
+----------+---------+-------+---------------+-------------+--------+--------+--------------+
|        19|        5|     DL|          11433|        13303|      -3|       1|             0|
|        19|        5|     DL|          14869|        12478|       0|      -8|             0|
|        19|        5|     DL|          14057|        14869|      -4|     -15|             0|
|        19|        5|     DL|          15016|        11433|      28|      24|             1|
|        19|        5|     DL|          11193|        12892|      -6|     -11|             0|
|        19|        5|     DL|          10397|        15016|      -1|     -19|             0|
|        19|        5|     DL|          15016|        10397|       0|      -1|             0|
|        19|        5|     DL|          10397|        14869|

In [5]:
# Feature columns shared by both tasks, plus the two targets
selected_features = [
    "DayofMonth", "DayOfWeek", "Carrier",
    "OriginAirportID", "DestAirportID", "DepDelay",
]
label_column = "ArrDelay"  # regression target
binary_label_column = "binary_feature"  # binary-classification target

# Regression dataset: keep only the features and the continuous target.
# This rebuilds `data` from `csv`, so the binary_feature column added earlier is not carried over.
data = csv.select(*selected_features, label_column)

In [6]:
data.show(n=20)  # preview the regression dataset

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

In [7]:
# Pipeline stages for the regression task.
# Encode the categorical carrier code as a numeric index. handleInvalid="keep" maps a carrier
# that was not seen while fitting (e.g. one present only in the test split or in a CV
# validation fold) to an extra index instead of raising an error at transform time.
carrier_indexer = StringIndexer(inputCol="Carrier", outputCol="CarrierIdx", handleInvalid="keep")
# NOTE(review): OriginAirportID / DestAirportID are numeric codes passed in as continuous
# values; consider indexing them as categories if that matters for the model.
assembler = VectorAssembler(
    inputCols=["CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay"],
    outputCol="features",
)
# Fixed seed so the forest's random bootstrap/feature sampling is reproducible between runs
regressor = RandomForestRegressor(labelCol=label_column, featuresCol="features", seed=42)

In [8]:
# Chain carrier indexing -> feature assembly -> random forest into a single estimator
pipeline_regression = Pipeline(stages=[
    carrier_indexer,
    assembler,
    regressor,
])

In [9]:
# Split into 70% training / 30% test. The fixed seed makes the split, and therefore
# every metric reported below, reproducible on Restart & Run All.
train_reg, test_reg = data.randomSplit([0.7, 0.3], seed=42)

In [10]:
# Fit every pipeline stage (indexer, assembler, forest) on the training split
model_regression = pipeline_regression.fit(dataset=train_reg)

In [11]:
# Apply the fitted pipeline to the held-out rows; adds a "prediction" column
predictions_regression = model_regression.transform(dataset=test_reg)

In [12]:
# Score the held-out predictions with root mean squared error (lower is better)
evaluator_regression = RegressionEvaluator(metricName="rmse", labelCol=label_column)
rmse = evaluator_regression.evaluate(dataset=predictions_regression)
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 22.106883607125628


In [13]:
# Binary classification task (LogisticRegression)

In [29]:
# Column names for the classification section (same values as the regression section)
selected_features = [
    "DayofMonth", "DayOfWeek", "Carrier",
    "OriginAirportID", "DestAirportID", "DepDelay",
]
label_column = "ArrDelay"  # regression target
binary_label_column = "binary_feature"  # binary-classification target

# Reload the flights and add the binary target: 1 if ArrDelay > 15 minutes, else 0
csv = spark.read.csv('flights.csv', header=True, inferSchema=True)
data = csv.withColumn("binary_feature", (col("ArrDelay") > 15).cast("Int"))
data.show()

+----------+---------+-------+---------------+-------------+--------+--------+--------------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|binary_feature|
+----------+---------+-------+---------------+-------------+--------+--------+--------------+
|        19|        5|     DL|          11433|        13303|      -3|       1|             0|
|        19|        5|     DL|          14869|        12478|       0|      -8|             0|
|        19|        5|     DL|          14057|        14869|      -4|     -15|             0|
|        19|        5|     DL|          15016|        11433|      28|      24|             1|
|        19|        5|     DL|          11193|        12892|      -6|     -11|             0|
|        19|        5|     DL|          10397|        15016|      -1|     -19|             0|
|        19|        5|     DL|          15016|        10397|       0|      -1|             0|
|        19|        5|     DL|          10397|        14869|

In [30]:
# Pipeline stages for the classification task.
# Encode the carrier code as a numeric index; "keep" sends carriers unseen while fitting
# (e.g. present only in the test split or a CV validation fold) to an extra index instead of raising.
carrier_indexer = StringIndexer(inputCol="Carrier", outputCol="CarrierIdx", handleInvalid="keep")
assembler = VectorAssembler(
    inputCols=["CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay"],
    outputCol="features",
)
# StringIndexer casts the integer 0/1 label to the strings "0"/"1". With the default
# frequencyDesc order, index 0.0 would go to whichever class is more frequent, so if late
# flights ever outnumbered on-time ones the labels (and the AUC) would silently invert.
# alphabetAsc pins "0" -> 0.0 and "1" -> 1.0 regardless of class balance.
binary_indexer = StringIndexer(
    inputCol=binary_label_column,
    outputCol="label",
    stringOrderType="alphabetAsc",
)
classifier = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10, regParam=0.3)

In [31]:
# Chain carrier indexing -> feature assembly -> label indexing -> logistic regression
pipeline_classification = Pipeline(stages=[
    carrier_indexer,
    assembler,
    binary_indexer,
    classifier,
])

In [32]:
# Split into 70% training / 30% test; the fixed seed makes the split and the AUC reproducible
train_clf, test_clf = data.randomSplit([0.7, 0.3], seed=42)

In [33]:
# Re-declares the column names with the same values used earlier in the notebook
label_column, binary_label_column = "ArrDelay", "binary_feature"
selected_features = [
    "DayofMonth", "DayOfWeek", "Carrier",
    "OriginAirportID", "DestAirportID", "DepDelay",
]

In [34]:
# Fit all classification stages on the training split
model_classification = pipeline_classification.fit(dataset=train_clf)

In [35]:
# Apply the fitted classifier pipeline to the held-out rows
predictions_classification = model_classification.transform(dataset=test_clf)

In [36]:
# Area under the ROC curve, computed from the raw (pre-threshold) model scores
evaluator_classification = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="rawPrediction",
    labelCol="label",
)
auc = evaluator_classification.evaluate(dataset=predictions_classification)
print(f"Area Under ROC (AUC): {auc}")

Area Under ROC (AUC): 0.923177067443397


In [37]:
# Hyperparameter tuning

In [38]:
# RandomForestRegressor grid: 2 depths x 2 forest sizes = 4 candidate models
paramGrid_regression = (
    ParamGridBuilder()
    .addGrid(regressor.maxDepth, [5, 10])
    .addGrid(regressor.numTrees, [10, 20])
    .build()
)

In [39]:
# LogisticRegression grid: 2 regularization strengths x 2 iteration caps = 4 candidates.
# `threshold` is deliberately not tuned: it only changes the hard "prediction" column,
# not "rawPrediction", so areaUnderROC (the CV metric) is identical for every threshold
# and each extra threshold value only multiplied the number of CV fits.
paramGrid_classification = (
    ParamGridBuilder()
    .addGrid(classifier.regParam, [0.3, 0.1])
    .addGrid(classifier.maxIter, [10, 5])
    .build()
)

In [40]:
# 2-fold cross-validation over the RF grid, selecting by RMSE (lower is better).
# The seed fixes the random fold assignment so the chosen model is reproducible.
cv_regression = CrossValidator(
    estimator=pipeline_regression,
    evaluator=evaluator_regression,
    estimatorParamMaps=paramGrid_regression,
    numFolds=2,
    seed=42,
)
model_cv_regression = cv_regression.fit(train_reg)

In [41]:
# 2-fold cross-validation over the LR grid, selecting by areaUnderROC (higher is better).
# The seed fixes the random fold assignment so the chosen model is reproducible.
cv_classification = CrossValidator(
    estimator=pipeline_classification,
    evaluator=evaluator_classification,
    estimatorParamMaps=paramGrid_classification,
    numFolds=2,
    seed=42,
)
model_cv_classification = cv_classification.fit(train_clf)

In [42]:
# The winning pipeline model from each cross-validation run
best_model_regression, best_model_classification = (
    model_cv_regression.bestModel,
    model_cv_classification.bestModel,
)

In [43]:
# Score the tuned models on the same held-out splits used for the baseline models
predictions_best_regression = best_model_regression.transform(dataset=test_reg)
predictions_best_classification = best_model_classification.transform(dataset=test_clf)

In [44]:
# Compare the tuned models against the baseline RMSE / AUC reported above
rmse_best = evaluator_regression.evaluate(dataset=predictions_best_regression)
auc_best = evaluator_classification.evaluate(dataset=predictions_best_classification)

for summary_line in (
    f"Best Model - Regression - RMSE: {rmse_best}",
    f"Best Model - Classification - AUC: {auc_best}",
):
    print(summary_line)

Best Model - Regression - RMSE: 19.90802904610459
Best Model - Classification - AUC: 0.9235247423054463
