In [1]:
# Import other modules not related to PySpark
import os
import sys
import pandas as pd
from pandas import DataFrame
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
import math
from IPython.core.interactiveshell import InteractiveShell
from datetime import *
import statistics as stats
# This helps auto print out the items without explixitly using 'print'
InteractiveShell.ast_node_interactivity = "all" 
%matplotlib inline

In [2]:
# Import PySpark related modules
import pyspark
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import lit, desc, col, size, array_contains\
, isnan, udf, hour, array_min, array_max, countDistinct
from pyspark.sql.types import *

MAX_MEMORY = '15G'
# Initialize a spark session.
conf = pyspark.SparkConf().setMaster("local[*]") \
        .set('spark.executor.heartbeatInterval', 10000) \
        .set('spark.network.timeout', 10000) \
        .set("spark.core.connection.ack.wait.timeout", "3600") \
        .set("spark.executor.memory", MAX_MEMORY) \
        .set("spark.driver.memory", MAX_MEMORY)
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Pyspark guide") \
        .config(conf=conf) \
        .getOrCreate()
    return spark

spark = init_spark()
# Укажите путь к CSV файлу
csv_file_path = "itineraries.csv"

# Прочитайте CSV файл в DataFrame
df = spark.read.csv(csv_file_path, header=True, sep=',', inferSchema=True)

drop_columns = ['legId', 'searchDate', 'flightDate', 'startingAirport', 'destinationAirport', 'fareBasisCode', 'segmentsCabinCode', 'segmentsDepartureTimeEpochSeconds', 'segmentsDepartureTimeRaw', 'segmentsArrivalTimeEpochSeconds',
                'segmentsArrivalTimeRaw', 'segmentsArrivalAirportCode', 'segmentsDepartureAirportCode', 'segmentsAirlineName', 'segmentsAirlineCode', 'segmentsEquipmentDescription',
               'segmentsDurationInSeconds', 'segmentsDistance', 'segmentsCabinCode']

df = df.select([col for col in df.columns if col not in drop_columns])

limited_data_pandas = df.limit(10)

limited_data_pandas.toPandas()

Unnamed: 0,travelDuration,elapsedDays,isBasicEconomy,isRefundable,isNonStop,baseFare,totalFare,seatsRemaining,totalTravelDistance
0,PT2H29M,0,False,False,True,217.67,248.6,9,947
1,PT2H30M,0,False,False,True,217.67,248.6,4,947
2,PT2H30M,0,False,False,True,217.67,248.6,9,947
3,PT2H32M,0,False,False,True,217.67,248.6,8,947
4,PT2H34M,0,False,False,True,217.67,248.6,9,947
5,PT2H38M,0,False,False,True,217.67,248.6,7,947
6,PT4H12M,0,False,False,False,213.02,251.1,3,956
7,PT5H18M,0,False,False,False,213.02,251.1,3,956
8,PT5H32M,0,False,False,False,213.02,251.1,7,956
9,PT6H38M,0,False,False,False,213.02,251.1,7,956


In [3]:
from pyspark.sql.functions import col, regexp_extract
# Функция для преобразования формата "PT2H29M" в минуты и записи в новую колонку
def convert_duration_to_minutes(df, column):
    # Извлекаем часы и минуты из строки
    hours = regexp_extract(col(column), r'PT(\d+)H', 1).cast('int')
    minutes = regexp_extract(col(column), r'(\d+)M', 1).cast('int')
    
    # Выполняем необходимые вычисления
    minutes_in_total = hours * 60 + minutes
    
    # Добавляем новую колонку
    df = df.withColumn(f"{column}_inMinutes", minutes_in_total)
    
    return df

# Применяем функцию для указанной колонки "travelDuration"
df = convert_duration_to_minutes(df, "travelDuration")

df = df.drop("travelDuration")

# Печатаем DataFrame с новой колонкой
limited_data_pandas = df.limit(10)

limited_data_pandas.toPandas()

Unnamed: 0,elapsedDays,isBasicEconomy,isRefundable,isNonStop,baseFare,totalFare,seatsRemaining,totalTravelDistance,travelDuration_inMinutes
0,0,False,False,True,217.67,248.6,9,947,149
1,0,False,False,True,217.67,248.6,4,947,150
2,0,False,False,True,217.67,248.6,9,947,150
3,0,False,False,True,217.67,248.6,8,947,152
4,0,False,False,True,217.67,248.6,9,947,154
5,0,False,False,True,217.67,248.6,7,947,158
6,0,False,False,False,213.02,251.1,3,956,252
7,0,False,False,False,213.02,251.1,3,956,318
8,0,False,False,False,213.02,251.1,7,956,332
9,0,False,False,False,213.02,251.1,7,956,398


In [4]:
from pyspark.sql.functions import col, when
# Преобразование колонки elapsedDays в булевый тип
df_transformed = df.withColumn(
    "elapsedDays_bool",
    when(col("elapsedDays") == 0, False).otherwise(True)
)

df = df_transformed
df = df.drop("elapsedDays")

# Печатаем DataFrame с новой колонкой
limited_data_pandas = df.limit(10)

limited_data_pandas.toPandas()

Unnamed: 0,isBasicEconomy,isRefundable,isNonStop,baseFare,totalFare,seatsRemaining,totalTravelDistance,travelDuration_inMinutes,elapsedDays_bool
0,False,False,True,217.67,248.6,9,947,149,False
1,False,False,True,217.67,248.6,4,947,150,False
2,False,False,True,217.67,248.6,9,947,150,False
3,False,False,True,217.67,248.6,8,947,152,False
4,False,False,True,217.67,248.6,9,947,154,False
5,False,False,True,217.67,248.6,7,947,158,False
6,False,False,False,213.02,251.1,3,956,252,False
7,False,False,False,213.02,251.1,3,956,318,False
8,False,False,False,213.02,251.1,7,956,332,False
9,False,False,False,213.02,251.1,7,956,398,False


In [5]:
# Удаляем строки с пустыми значениями
df = df.na.drop()

for col_name in df.columns:
    count_missing = df.filter(col(col_name).isNull()).count()
    print(f"Количество пустых значений в колонке '{col_name}': {count_missing}")

Количество пустых значений в колонке 'isBasicEconomy': 0
Количество пустых значений в колонке 'isRefundable': 0
Количество пустых значений в колонке 'isNonStop': 0
Количество пустых значений в колонке 'baseFare': 0
Количество пустых значений в колонке 'totalFare': 0
Количество пустых значений в колонке 'seatsRemaining': 0
Количество пустых значений в колонке 'totalTravelDistance': 0
Количество пустых значений в колонке 'travelDuration_inMinutes': 0
Количество пустых значений в колонке 'elapsedDays_bool': 0


# LinearRegression: Предсказание стоимости билета

In [6]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import Normalizer

# Подготовка данных для модели
feature_columns = ['isBasicEconomy', 'isNonStop', 'baseFare', 'totalTravelDistance', 'travelDuration_inMinutes', 'elapsedDays_bool']

# Создадим столбец features, который объединяет все признаки в один вектор
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
regression_data = assembler.transform(df).select("features", "totalFare")

# Разделение данных на тренировочный и тестовый наборы
train_data, test_data = regression_data.randomSplit([0.8, 0.2], seed=42)

normalizer_link = Normalizer(inputCol='features', outputCol='features', p=1.0)

# Создание модели линейной регрессии
lr = LinearRegression(featuresCol="features", labelCol="totalFare", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Создание пайплайна
pipeline = Pipeline(stages=[lr])

# Создание сетки параметров для поиска по сетке
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Оценщик для регрессии
mse_evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="mse")
# mae_evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="mae")
# rmse_evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="rmse")
# r2_evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="r2")

# Создание кросс-валидатора
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=mse_evaluator,
                          numFolds=3)  # Количество складываний для кросс-валидации

# Обучение модели
cv_model = crossval.fit(train_data)

# Получение прогнозов на тестовых данных
predictions = cv_model.transform(test_data)

# Вывод предсказаний
predictions.select("totalFare", "prediction").show()

# Оценка модели
mse_evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="mse")
mse = mse_evaluator.evaluate(predictions)
print(f"Mean Squared Error: {mse}")

mae_evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="mae")
mae = mae_evaluator.evaluate(predictions)
print(f"Mean Absolute Error: {mae}")

rmse_evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="rmse")
rmse = rmse_evaluator.evaluate(predictions)
print(f"Root Mean Squared Error: {rmse}")

r2_evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)
print(f"R^2: {r2}")

# Получение лучших параметров
best_model = cv_model.bestModel
best_params = best_model.stages[0].extractParamMap()
print("Best Parameters:")
for key, value in best_params.items():
    print(f"{key.name}: {value}")

+---------+-----------------+
|totalFare|       prediction|
+---------+-----------------+
|    64.38|65.25740276866291|
|    69.98|69.78572289366353|
|    69.98|69.78572289366353|
|    64.38|62.96647222008511|
|    64.38|64.11165543403133|
|    69.98|69.42839137662435|
|    64.38|63.35657719775198|
|    64.38|64.95736282224823|
|    73.98|73.65968973210133|
|    73.98|73.65968973210133|
|    73.98|74.06099206590272|
|    73.98|74.42581236935857|
|    73.98|74.79063267281438|
|    73.98|74.95480180936951|
|    79.58|77.96456931288009|
|    73.98|75.58833778984186|
|    74.37| 73.5320028058544|
|    74.37|74.86184036405189|
|    78.47|78.45340519038682|
|    79.97|78.71707450678036|
+---------+-----------------+
only showing top 20 rows

Mean Squared Error: 1.969412365468528
Mean Absolute Error: 0.9693204953329867
Root Mean Squared Error: 1.4033575330144945
R^2: 0.9999554084172494
Best Parameters:
aggregationDepth: 2
elasticNetParam: 0.0
epsilon: 1.35
featuresCol: features
fitIntercept: 

- R^2 - коэффициент детерминации
- aggregationDepth (Глубина агрегации): Глубина агрегации используется для управления глубиной агрегации в реализации регуляризации. Это может повлиять на производительность и распределение вычислений в распределенных вычислительных средах.
- 
elasticNetParam (Параметр Elastic Net): Этот параметр контролирует смешивание L1 (Lasso) и L2 (Ridge) регуляризации. Значение 0.0 означает использование только L2 (Ridge), а 1.0 - только L1 (Lassoрия остановки оптимизации.

- epsilon (Эпсилон): Эпсилон является параметром оптимизации и используется для определения, когда остановить итерацию оптимизации.
- featuresCol (Колонка признаков): Название колонки, содержащей вектор признаков.
- fitIntercept (Оценка интерсепта): Если установлен в True, то модель оценивает интерсепт (свободный член).
- labelCol (Колонка меток): Название колонки, содержащей целевую переменную.

- loss (Функция потерь): Функция потерь, используемая при оптимизации. В данном случае, squaredError означает квадратичную функцию потерь.

- maxBlockSizeInMB (Максимальный размер блока в мегабайтах): Максимальный размер блока для вычислений в распределенной среде.

- maxIter (Максимальное количество итераций): Максимальное количество итераций оптимизации.

- predictionCol (Колонка предсказания): Название колонки, в которой будут сохранены предсказанные значения.

- regParam (Параметр регуляризации): Коэффициент регуляризации, который управляет силой регуляризации. Значение 0.01 указывает на относительно небольшую силу регуляризации.

- solver (Метод решения): Метод оптимизации для нахождения весов модели. В данном случае, auto означает автоматический выбор между "l-bfgs" и "normal".

- standardization (Стандартизация): Если установлен в True, то признаки будут стандартизированы перед обучением модели.

- tol (Точность): Параметр точности, используемый для определения критерия остановки оптимизации.

# GradientBoostingMachine: Определение прямых рейсов

In [7]:
from pyspark.sql.functions import when

# Используйте функцию when для создания новой колонки
df = df.withColumn("isNonStop_numeric", when(df["isNonStop"] == "true", 1).otherwise(0))

# Удалите оригинальную колонку isNonStop, если это необходимо
df = df.drop("isNonStop")

# Переименуйте новую колонку, чтобы сделать ее более информативной
df = df.withColumnRenamed("isNonStop_numeric", "isNonStop")

# Выведите результат
df.show()

+--------------+------------+--------+---------+--------------+-------------------+------------------------+----------------+---------+
|isBasicEconomy|isRefundable|baseFare|totalFare|seatsRemaining|totalTravelDistance|travelDuration_inMinutes|elapsedDays_bool|isNonStop|
+--------------+------------+--------+---------+--------------+-------------------+------------------------+----------------+---------+
|         false|       false|  217.67|    248.6|             9|                947|                     149|           false|        1|
|         false|       false|  217.67|    248.6|             4|                947|                     150|           false|        1|
|         false|       false|  217.67|    248.6|             9|                947|                     150|           false|        1|
|         false|       false|  217.67|    248.6|             8|                947|                     152|           false|        1|
|         false|       false|  217.67|    248.6|

In [8]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# # Используйте метод sample для создания случайной подвыборки
# sampled_df = df.sample(withReplacement=False, fraction=0.3, seed=42)

# Разделение данных на тренировочный и тестовый наборы
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Создание ассемблера для объединения признаков в одну колонку "features"
feature_columns = ['baseFare', 'totalFare', 'totalTravelDistance', 'travelDuration_inMinutes', 'elapsedDays_bool']

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Создание модели GBTClassifier
gbt = GBTClassifier(featuresCol="features", labelCol="isNonStop", seed=42)

# Создание пайплайна
pipeline = Pipeline(stages=[assembler, gbt])

# Создание сетки параметров
param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [3, 5]) \
    .addGrid(gbt.maxIter, [5, 10]) \
    .build()

# Оценщик производительности
evaluator = BinaryClassificationEvaluator(labelCol="isNonStop", metricName="areaUnderROC")

# Кросс-валидация с подбором параметров
cross_validator = CrossValidator(estimator=pipeline,
                                 estimatorParamMaps=param_grid,
                                 evaluator=evaluator,
                                 numFolds=3)  # Выберите количество складок по своему усмотрению

# Обучение модели с подбором параметров
cv_model = cross_validator.fit(train_data)

# Прогнозирование на тестовых данных
predictions = cv_model.transform(test_data)

# Оценка производительности модели
area_under_roc = evaluator.evaluate(predictions)
print(f"Area Under ROC: {area_under_roc}")

# Получение лучших параметров
best_params = cv_model.bestModel.stages[-1].extractParamMap()
print("Best Parameters:")
for param, value in best_params.items():
    print(f"{param.name}: {value}")


Area Under ROC: 0.9999528230216939
Best Parameters:
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: all
featuresCol: features
impurity: variance
labelCol: isNonStop
leafCol: 
lossType: logistic
maxBins: 32
maxDepth: 5
maxIter: 10
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
seed: 42
stepSize: 0.1
subsamplingRate: 1.0
validationTol: 0.01


-cacheNodeIds: Определяет, кэшировать ли идентификаторы узлов для ускорения обучения. В данном случае, установлено в False.

-checkpointInterval: Интервал для проверки точек во время обучения.

-featureSubsetStrategy: Стратегия выбора подмножества признаков. Здесь установлено в all, что означает использование всех признаков.

-impurity: Мера неопределенности для построения дерева решений. В данном случае, используется variance для задачи регрессии.

-labelCol: Колонка с метками класса (ваш целевой признак). Здесь установлено в isNonStop.

-maxBins: Максимальное количество корзин, используемых при построении дерева.

-maxDepth: Максимальная глубина каждого дерева в ансамбле.

-maxIter: Максимальное количество итераций (деревьев) в градиентном бустинге.

-minInfoGain: Минимальный прирост информации, необходимый для разделения узла.

-minInstancesPerNode: Минимальное количество экземпляров, требуемых для создания узла.

-minWeightFractionPerNode: Минимальная доля общего веса обучающих экземпляров, необходимая для создания узла.

-seed: Зерно для инициализации случайных чисел для воспроизводимости.

-stepSize: Размер шага (learning rate) для обновления весов при обучении.

-subsamplingRate: Доля данных, используемых для обучения каждого дерева.

-validationTol: Параметр для остановки обучения, если значение не улучшается достаточно быстро.лучшается достаточно быстро.