# Тетрадка с ML моделями
**Установка основных библиотек и spark сессии**

In [1]:
!pip install pyspark



In [2]:
import os 
import pandas as pd
import numpy as np

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
from pyspark.ml.linalg import VectorUDT
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.regression import DecisionTreeRegressor

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
import seaborn as sns
import matplotlib.pyplot as plt
#visual
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 480)

from matplotlib import rcParams 
sns.set(context='notebook', style='whitegrid', rc={'figure.figsize': (18,4)})
rcParams['figure.figsize'] = 18,4

%matplotlib inline
%config InlineBackend.figure_format = 'retina'
#format
from pyspark.sql.functions import (to_timestamp, date_trunc, month, 
                                   year, dayofweek, avg, hour, 
                                   dayofmonth, lag, expr,
                                   sin, cos, lit)

from pyspark.sql.window import Window
# setting random seed for notebook reproducability
rnd_seed=23
np.random.seed=rnd_seed 
np.random.set_state=rnd_seed

In [3]:
SPARK_MASTER_IP = '172.18.0.2'

#spark = SparkSession.builder.appName("pyspark-taxi-forecasting") \
#    .master(f"spark://{SPARK_MASTER_IP}:7077") \
#    .config("spark.executor.cores", 1) \
#    .config("spark.task.cpus", 1) \
#    .getOrCreate()



spark = SparkSession.builder.appName("pyspark-taxi-forecasting") \
        .config("spark.executor.cores", 1) \
        .config("spark.task.cpus", 1) \
        .master("local[*]") \
        .config('spark.driver.bindAddress','localhost') \
        .config('spark.ui.port', 4040) \
        .getOrCreate()

In [4]:
spark.getActiveSession()

In [5]:
sqlContext = SQLContext(spark.sparkContext)



# Основные функции

In [6]:
#группируем по 1 часу и районам
def tables_area(df):
    df.createOrReplaceTempView("table")

    tables_dict = {}
    
    for i in range(1, 78):
        name = i
        tab = spark.sql(f"""
            select datetime, 
                   sum(count_1h) as count_1h
            from (select
                        date_format(datetime, "yyyy-MM-dd HH:00:00") as datetime,
                        count(*) as count_1h
                              from (select id, datetime from table where area = {i}) as ar
                    GROUP BY datetime
                    ORDER BY datetime) as t
            GROUP BY datetime
            ORDER BY datetime
                        """)
        # Сохраняем таблицу в словаре с использованием имени как ключа
        tables_dict[name] = tab
    
    return tables_dict  # Возвращаем словарь с таблицами

Думаю, на этом моменте нужно уделить по больше внимания. ДАнная функция генерит временные признаки, лаг количества заказов и среднее кол-во заказов за определенный диапазон. 

Так же эта функция поддерживает как словарь, так и обычную таблицу. + Дописал отдельный исход действий для дополнительной тестовой таблицы(третья тестовая таблица). Правда не совсем понял из каких значений делать для нее лаги.


In [7]:
#сознадие признаков

def features_area(data, max_lag, rolling_mean):
    
    if isinstance(data, dict):
    
        for i in range(1, 78):
            #год
            #sdf = sdf.withColumn("Year", year("datetime"))#спорно
            #месяц
            data[i] = data[i].withColumn("month", month("datetime"))
            #день
            data[i] = data[i].withColumn("day", dayofmonth("datetime"))
            #день недели
            data[i] = data[i].withColumn("day_week", dayofweek("datetime"))
            #час 
            data[i] = data[i].withColumn("hour", hour("datetime"))

            #lag
            for m in range(1, max_lag+1):
                name = 'lag_' + str(m)
                # Определяем окно (порядок сортировки) для использования lag
                window_spec = Window.orderBy("datetime")
                data[i] = data[i].withColumn(name, lag("count_1h", m).over(window_spec))

            # rolling mean
            window_spec = Window.orderBy("datetime")
            data[i] = data[i].withColumn("rolling_mean", avg("count_1h").over(window_spec.rowsBetween(-rolling_mean-1, -1)))

            #удаление пропусков
            data[i] = data[i].na.drop()
        
        return data

    elif "area" in sdf_test.columns:
        #месяц
        data = data.withColumn("month", month("datetime"))
            #день
        data = data.withColumn("day", dayofmonth("datetime"))
            #день недели
        data = data.withColumn("day_week", dayofweek("datetime"))
            #час 
        data = data.withColumn("hour", hour("datetime"))
        
        #пустой лаг
        for i in range(1, max_lag+1):
            lag_col_name = f"lag_{i}"
            data = data.withColumn(lag_col_name, lit(0))
        #mean как и таргет
        data = data.withColumn("rolling_mean", data.count_1h)
        return data
    
    else:
        #год
        #sdf = sdf.withColumn("Year", year("datetime"))#спорно
        #месяц
        data = data.withColumn("month", month("datetime"))
        #день
        data = data.withColumn("day", dayofmonth("datetime"))
        #день недели
        data = data.withColumn("day_week", dayofweek("datetime"))
        #час 
        data = data.withColumn("hour", hour("datetime"))

        #lag
        for m in range(1, max_lag+1):
            name = 'lag_' + str(m)
            # Определяем окно (порядок сортировки) для использования lag
            window_spec = Window.orderBy("datetime")
            data = data.withColumn(name, lag("count_1h", m).over(window_spec))

            # rolling mean
            window_spec = Window.orderBy("datetime")
            data = data.withColumn("rolling_mean", avg("count_1h").over(window_spec.rowsBetween(-rolling_mean-1, -1)))

            #удаление пропусков
            data = data.na.drop()
        return data

 Данная функция создана для разбиения данных на выборки и не происходило перемешивание. 

Тренировочная выборка составила 80 %, валидационная и тренировочная по 10 %

Поддержи типвает словарь и обычную таблу.
% 


In [8]:
#разделение на 3 выборки 
def split(data):
    #размеры 
    train_ratio = 0.8
    validation_ratio = 0.1
    test_ratio = 0.1

    #создание 3 новых словарей 
    df_train = {}
    df_validation = {}
    df_test = {}
    if isinstance(data, dict):
        for i in range(1, 78):
        
            # Рассчитайте индексы для разделения данных
            total_count = data[i].count()
            train_split_index = int(total_count * train_ratio)
            validation_split_index = int(total_count * (train_ratio + validation_ratio))

            # Разделите данные на тренировочную, валидационную и тестовую выборки
            df_train_1 = data[i].limit(train_split_index)
            df_validation_1 = data[i].limit(validation_split_index).subtract(df_train_1)
            df_test_1 = data[i].subtract(df_train_1).subtract(df_validation_1)

            #добавление в словарь
            df_train[i] = df_train_1
            df_validation[i] = df_validation_1
            df_test[i] = df_test_1
        return df_train,df_validation,df_test

    else:
        # Рассчитайте индексы для разделения данных
        total_count = data.count()
        train_split_index = int(total_count * train_ratio)
        validation_split_index = int(total_count * (train_ratio + validation_ratio))

        # Разделите данные на тренировочную, валидационную и тестовую выборки
        df_train_1 = data.limit(train_split_index)
        df_validation_1 = data.limit(validation_split_index).subtract(df_train_1)
        df_test_1 = data.subtract(df_train_1).subtract(df_validation_1)

        #добавление в словарь
        df_train = df_train_1
        df_validation = df_validation_1
        df_test = df_test_1
        return df_train,df_validation,df_test

Функция МАПЕ (Mean Absolute Percentage Error) рассчитывается вручную тк библиотеки подходящей не нашел. Она измеряет среднюю абсолютную процентную ошибку между фактическими и прогнозируемыми значениями.

In [9]:
#ф-ция оценки МАПЕ
def mape(df, label_col, prediction_col):  #target_pred, 
    
    # Добавьте новую колонку с абсолютной разницей между предсказаниями и фактическими значениями
    target_pred_mape = df.withColumn("abs_error", expr(f"abs({label_col} - {prediction_col})"))

    # Рассчитываем абсолютную процентную ошибку (MAPE) для каждой строки
    df_with_mape = target_pred_mape.withColumn("mape", (expr("abs_error") / col(label_col)) * 100)

    # Рассчитываем среднюю MAPE
    mape = df_with_mape.selectExpr("avg(mape) as average_mape").collect()[0][0]

    return  mape

Кодирую временные признаки отдельно чтобы избежать перепада между 23 и 0 часами. Если кодировать обычным методом, то модель, которая будет обучатся на этих признаках будет воспринимать 23 ч. как большую переменную, а 0 ч. как меньшую. Поэтому все временные данные кодирую в виде sin и cos

In [10]:
#кодирование временных признаков
def encode(data, col_nam, max_val):
    nam_sim = col_nam + '_sin'
    nam_cos = col_nam + '_cos'
    if isinstance(data, dict):
        for i in range(1, 78):
                
            data[i] = data[i].withColumn(nam_sim, sin(2 * 3.14159 * col(col_nam) / max_val))
            feature_time_new.append(nam_sim)
        
            data[i] = data[i].withColumn(nam_cos, cos(2 * 3.14159 * col(col_nam) / max_val))
            feature_time_new.append(nam_cos)
        return data

    else:
        data = data.withColumn(nam_sim, sin(2 * 3.14159 * col(col_nam) / max_val))
        feature_time_new.append(nam_sim)
        
        data = data.withColumn(nam_cos, cos(2 * 3.14159 * col(col_nam) / max_val))
        feature_time_new.append(nam_cos)
        return data

# общий запрос запуска

In [11]:
taxi_22 = "fails/Taxi_Trips_-_2022.csv"
taxi_23 = "fails/Taxi_Trips_-_2023.csv"

sdf_22 = spark.read.csv(path=taxi_22, header=True, inferSchema=True)
#выбор столбцов {id аменить на другую фичу, нужна для caunt()}
sdf_22 = sdf_22.select(["Trip ID", "Trip Start Timestamp", 'Pickup Community Area'])

sdf_23 = spark.read.csv(path=taxi_23, header=True, inferSchema=True)
#выбор столбцов {id аменить на другую фичу, нужна для caunt()}
sdf_23 = sdf_23.select(['Trip ID', 'Trip Start Timestamp', 'Pickup Community Area'])

sdf = sdf_22.union(sdf_23)
sdf = sdf.withColumnRenamed("Trip Start Timestamp", "datetime")
sdf = sdf.withColumnRenamed("Pickup Community Area", "area")
sdf = sdf.withColumnRenamed("Trip ID", "id")
#изменение формата в столбце с датой и временем начала поездки  
timestamp_format = "MM/dd/yyyy hh:mm:ss a"

sdf = sdf.withColumn("datetime", to_timestamp("datetime", timestamp_format))

sdf = sdf.na.drop()


    #вырезаем данные которые нужно будет потом предсказать на лучшей модели
lim = '2023-07-31 23:00'
sdf = sdf.filter(F.to_timestamp(F.col('datetime'), 'MM/dd/yyyy hh:mm:ss a')<lim)

sdf.cache()
#группировка по 1 часу
table_areas = tables_area(sdf)
#mime seria
time = sdf.select('datetime').groupby(date_trunc('hour', 'datetime')).count()\
       .withColumnRenamed("date_trunc(hour, datetime)", "datetime")\
       .orderBy('datetime')\
       .select('datetime')
#заполнение недостающих часов 0
for i in range(1, 78):
    table_areas[i] = table_areas[i]\
                     .join(time, on='datetime', how='right_outer')\
                     .orderBy('datetime')\
                     .fillna(0)
    table_areas[i].cache()
                     #.write.csv(f'area_1_hours/{i}_area.csv')

DataFrame[id: string, datetime: timestamp, area: int]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

DataFrame[datetime: timestamp, count_1h: bigint]

# Обработка данных перед обучением 
Создаю признаки, кодирую временные и все остальные созданные признаки стандартизирую. Все фичи соединяю в единый вектор и выгружаю созданную таблицу, чтобы при перезапуске ядра не возвращается к этому процессу повторно. 

P.S. если кто-то будет это запускать когда-,то то нужно выбрать папку для выгрузки табл)ц.


In [12]:
#генерация признаков
features_area(table_areas, 4, 3);

In [13]:
#сохраняю всю таблицу
#for i in range(1, 78):
#    table_areas[i].write.parquet(f'arima/{i}_feature_area.parquet');

In [14]:
#кодирование временных признаков 
feature_time_new = []
feature_all = table_areas[1].drop('datetime','count_1h',
                                   'month', 'day', 
                                   'day_week', 'hour'
                                   ).columns
#Кодируем месяцы
encode(table_areas, 'month', 12);

#Кодируем часы
encode(table_areas, 'hour', 24);

#Кодируем дни
encode(table_areas, 'day', 31);

#Кодируем день недели
encode(table_areas, 'day_week', 7);

#сохранение уникальных временных признаков
feature_time_new = list(set(feature_time_new))
feature_time_new.append('features_scaled');

In [15]:
#стандартизация всех остальных признаков
for i in range(1, 78):
    #создание вектора
    assembler = VectorAssembler(inputCols=feature_all, outputCol="features_no_time")
    table_areas[i] = assembler.transform(table_areas[i])

    # Инициализируем `standardScaler`
    standardScaler = StandardScaler(inputCol='features_no_time', outputCol="features_scaled")
    # Обучим 
    table_areas[i] = standardScaler.fit(table_areas[i]).transform(table_areas[i])

    #создание общего вектора 
    assembler2 = VectorAssembler(inputCols=feature_time_new, outputCol="features_all")
    table_areas[i] = assembler2.transform(table_areas[i])

    table_areas[i] = table_areas[i].select('datetime', "count_1h", "features_all")
    
    #сохраняю таблицу с временем, заказами и вектором признаков
    #table_areas[i].write.parquet(f'area_1_hours/{i}_area.parquet');

In [16]:
#table_areas[2].printSchema()

**Завершаю спарк сессию, и перезапускаю ее заново с обновлением ядра. Чтобы выгрузить все ненужное из кэша и освободить память.**

Загружаю таблицы в новый словарь и все готово к обучению моделей.

In [17]:
#завершаем спарк сессию 
spark.stop()

In [18]:
#запускаем новую
spark = SparkSession.builder.appName("pyspark-taxi-forecasting") \
        .config("spark.executor.cores", 1) \
        .config("spark.task.cpus", 1) \
        .master("local[*]") \
        .config('spark.driver.bindAddress','localhost') \
        .config('spark.ui.port', 4040) \
        .getOrCreate()

In [19]:
schema = StructType([
    StructField("datetime", TimestampType(), nullable=False),
    StructField("count_1h", LongType(), nullable=False),
    StructField("features_all", VectorUDT(), nullable=False)
])

In [20]:
#загрузки районо с диска 
dikt = {}
for i in range(1, 78):
    path_to_parquet_files = f'area_1_hours/{i}_area.parquet'
    dikt[i] = spark.read.schema(schema).parquet(path_to_parquet_files)
    dikt[i].cache();

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

DataFrame[datetime: timestamp, count_1h: bigint, features_all: vector]

# Модель линейной регрессии 
обучаю модель для каждого района по отдельности пытаюсь добиться самой лучшей метрик на валидационной выборкеи, а потом только делаю предсказание на тестовой выборк.


In [21]:
df_train, df_validation, df_test = split(dikt)

#задание параметров модели
#предикт и таргет
label_col = 'count_1h'
prediction_col = 'pred_lr'
features = 'features_all'
#оценка качества модели
mae_all_val = []
mae_all_test = []

mape_all_val = []
mape_all_test = []

models_lr = {}

# Инициилизируем модель
lr = (LinearRegression(featuresCol=features,
                       labelCol=label_col, 
                       predictionCol=prediction_col, 
                       standardization=False,
                       maxIter=100, 
                       regParam=0.001)) 

for i in range(1, 78):

    # Обучиаем модель на данных
    models_lr[i] = lr.fit(df_train[i])

    #предсказание
    df_validation[i] = models_lr[i].transform(df_validation[i])
    #MAE
    evaluator = RegressionEvaluator(predictionCol=prediction_col, labelCol=label_col, metricName='mae')
    mae_val = format(evaluator.evaluate(df_validation[i]))
    mae_all_val.append(float(mae_val))
    #MAPE
    result_val = mape(df_validation[i], label_col, prediction_col)
    mape_all_val.append(result_val)

In [22]:
'Среднее МАЕ по всем районам:', sum(mae_all_val)/len(mae_all_val)
'Среднее МАРЕ по всем районам:', sum(mape_all_val)/len(mape_all_val)

('Среднее МАЕ по всем районам:', 2.398187916830095)

('Среднее МАРЕ по всем районам:', 54.696354021268846)

In [23]:
#предсказание test 
for i in range(1, 78):
     
    df_test[i] = models_lr[i].transform(df_test[i])
    #MAE
    evaluator = RegressionEvaluator(predictionCol=prediction_col, labelCol=label_col, metricName='mae')
    mae_test = format(evaluator.evaluate(df_test[i]))
    mae_all_test.append(float(mae_test))
    #MAPE
    result_test = mape(df_validation[i], label_col, prediction_col)
    mape_all_test.append(result_test)

In [24]:
'Среднее МАЕ по всем районам на тестостовой выборкуе:', sum(mae_all_test)/len(mae_all_test)
'Среднее МАРЕ по всем районам на тестостовой выборкуе:', sum(mape_all_test)/len(mape_all_test)

('Среднее МАЕ по всем районам на тестостовой выборкуе:', 2.3826978859408636)

('Среднее МАРЕ по всем районам на тестостовой выборкуе:', 54.696354021268846)

# Модель Линейной грегрессии на 1 
обучал тестово для одного района, просто решил оставить на память) 

In [25]:
#table_areas[7].show(1, truncate=False)

In [26]:
#генерация признаков
#table_areas[7] = features_area(table_areas[7], 4, 6);

In [27]:
#кодирование временных признаков 
#feature_time_new = []
#feature_all = table_areas[7].drop('datetime','count_1h',
#                                   'month', 'day', 
#                                   'day_week', 'hour'
#                                   ).columns
#Кодируем месяцы
#table_areas[7] = encode(table_areas[7], 'month', 12);

#Кодируем часы
#table_areas[7] = encode(table_areas[7], 'hour', 24);

#Кодируем дни
#table_areas[7] =encode(table_areas[7], 'day', 31);

#Кодируем день недели
#table_areas[7] =encode(table_areas[7], 'day_week', 7);

#сохранение уникальных временных признаков
#feature_time_new = list(set(feature_time_new))
#feature_time_new.append('features_scaled');

In [28]:
#1 район 
#создание вектора 
#assembler = VectorAssembler(inputCols=feature_all, outputCol="features_no_time")
#table_areas[7] = assembler.transform(table_areas[7])
  # Инициализируем `standardScaler`
#standardScaler = StandardScaler(inputCol='features_no_time', outputCol="features_scaled")
    # Обучим 
#table_areas[7] = standardScaler.fit(table_areas[7]).transform(table_areas[7])
    #создание общего вектора 
#assembler2 = VectorAssembler(inputCols=feature_time_new, outputCol="features_all")
#table_areas[7] = assembler2.transform(table_areas[7])

#table_areas[7] = table_areas[7].select('datetime', "count_1h", "features_all")
    
    #сохраняю таблицу с временем, заказами и вектором признаков
#table_areas[6].write.parquet(f'area_1_hours/{i}_area.parquet');

In [29]:
#df_train, df_validation, df_test = split(table_areas[7])

# Инициилизируем модель
#задание параметров модели
#предикт и таргет
#label_col = 'count_1h'
#prediction_col = 'pred_lr'
#features = 'features_all'
#оценка качества модели
#mae_all_val = []
#mae_all_test = []
#mape_all_val = []
#mape_all_test = []

#models_lr = {}

# Инициилизируем модель
#lr = (LinearRegression(featuresCol=features,
#                       labelCol=label_col, 
#                       predictionCol=prediction_col, 
#                       standardization=False,
#                       maxIter=100, regParam=0.001))
# Обучиаем модель на данных
#models_lr = lr.fit(df_train)

    #предсказание
#df_validation = models_lr.transform(df_validation)
    #MAE
#evaluator = RegressionEvaluator(predictionCol=prediction_col, labelCol=label_col, metricName='mae')
#mae_val = format(evaluator.evaluate(df_validation))
#mae_all_val.append(float(mae_val))
    #MAPE
#result_val = mape(df_validation, label_col, prediction_col)
#mape_all_val.append(result_val)

#mae_all_val
#mape_all_val

# Модель Деревья решений (Decision Trees) 

## Для 1 района 

In [30]:
#генерация признаков
#table_areas[7] = features_area(table_areas[7], 8, 3);


#кодирование временных признаков 
#feature_time_new = []
#feature_all = table_areas[7].drop('datetime','count_1h',
                                   #'month', 'day', 
#                                   'day_week', 
#                                  'hour'
#                                   ).columns
#Кодируем месяцы
#table_areas[7] = encode(table_areas[7], 'month', 12);

#Кодируем часы
#table_areas[7] = encode(table_areas[7], 'hour', 24);

#Кодируем дни
#table_areas[7] =encode(table_areas[7], 'day', 31);

#Кодируем день недели
#table_areas[7] =encode(table_areas[7], 'day_week', 7);

#сохранение уникальных временных признаков
#feature_time_new = list(set(feature_time_new))
#feature_time_new.append('features_scaled');
#1 район 
#создание вектора 
#assembler = VectorAssembler(inputCols=feature_all, outputCol="features_no_time")
##table_areas[7] = assembler.transform(table_areas[7])
  # Инициализируем `standardScaler`
#standardScaler = StandardScaler(inputCol='features_no_time', outputCol="features_scaled")
    # Обучим 
#table_areas[7] = standardScaler.fit(table_areas[7]).transform(table_areas[7])
    #создание общего вектора 
#assembler2 = VectorAssembler(inputCols=feature_time_new, outputCol="features_all")
#table_areas[7] = assembler2.transform(table_areas[7])

#table_areas[7] = table_areas[7].select('datetime', "count_1h", "features_all")
    
    #сохраняю таблицу с временем, заказами и вектором признаков
#table_areas[6].write.parquet(f'area_1_hours/{i}_area.parquet');

#df_train, df_validation, df_test = split(table_areas[7])

In [31]:
#df_train, df_validation, df_test = split(table_areas[7])

# Инициилизируем модель
#задание параметров модели
#предикт и таргет
#label_col = 'count_1h'
#prediction_col = 'pred_tree'
#features = 'features_all'
#оценка качества модели
#mae_all_val = []
#mae_all_test = []
#mape_all_val = []
#mape_all_test = []

#models_lr = {}

# Инициилизируем модель
#tree = (DecisionTreeRegressor(featuresCol=features,
#                              labelCol=label_col, 
#                              predictionCol=prediction_col, 
                              
#                              maxBins=32,
#                              maxDepth=10, 
#                              minInstancesPerNode=10))
# Обучиаем модель на данных
#models_tree = tree.fit(df_train)

    #предсказание
#df_validation = models_tree.transform(df_validation)
    #MAE
#evaluator = RegressionEvaluator(predictionCol=prediction_col, labelCol=label_col, metricName='mae')
#mae_val = format(evaluator.evaluate(df_validation))
#mae_all_val.append(float(mae_val))
    #MAPE
#result_val = mape(df_validation, label_col, prediction_col)
#mape_all_val.append(result_val)

#mae_all_val
#mape_all_val

In [32]:
    #предсказание test
#df_test = models_tree.transform(df_test)
    #MAE
#evaluator = RegressionEvaluator(predictionCol=prediction_col, labelCol=label_col, metricName='mae')
#mae_test = format(evaluator.evaluate(df_test))
#mae_all_test.append(float(mae_test))
    #MAPE
#result_test = mape(df_test, label_col, prediction_col)
#mape_all_val.append(result_test)

#mae_all_val
#mape_all_val

## Для всех районов

In [33]:
df_train, df_validation, df_test = split(dikt)

#задание параметров модели
#предикт и таргет
label_col = 'count_1h'
prediction_col = 'pred_tree'
features = 'features_all'

#оценка качества модели
mae_all_val = []
mae_all_test = []

mape_all_val = []
mape_all_test = []

models_tree = {}

In [34]:
# Инициилизируем модель
tree = (DecisionTreeRegressor(featuresCol=features,
                              labelCol=label_col, 
                              predictionCol=prediction_col, 
                              
                              maxBins=25,
                              maxDepth=10, 
                              minInstancesPerNode=5
                             ))
# Обучиаем модели
for i in range(1, 78):
    
    models_tree[i] = tree.fit(df_train[i])

    #предсказание
    df_validation[i] = models_tree[i].transform(df_validation[i])
    #MAE
    evaluator = RegressionEvaluator(predictionCol=prediction_col, labelCol=label_col, metricName='mae')
    mae_val = format(evaluator.evaluate(df_validation[i]))
    mae_all_val.append(float(mae_val))
    #MAPE
    result_val = mape(df_validation[i], label_col, prediction_col)
    mape_all_val.append(result_val)

In [35]:
'Среднее МАЕ по всем районам:', sum(mae_all_val)/len(mae_all_val)
'Среднее МАРЕ по всем районам:', sum(mape_all_val)/len(mape_all_val)

('Среднее МАЕ по всем районам:', 2.310819470266529)

('Среднее МАРЕ по всем районам:', 57.26886464778424)

In [36]:
#предсказание test 
for i in range(1, 78):
     
    df_test[i] = models_tree[i].transform(df_test[i])
    #MAE
    evaluator = RegressionEvaluator(predictionCol=prediction_col, labelCol=label_col, metricName='mae')
    mae_test = format(evaluator.evaluate(df_test[i]))
    mae_all_test.append(float(mae_test))
    #MAPE
    result_test = mape(df_test[i], label_col, prediction_col)
    mape_all_test.append(result_test)

In [37]:
'Среднее МАЕ по всем районам на тестостовой выборкуе:', sum(mae_all_test)/len(mae_all_test)
'Среднее МАРЕ по всем районам на тестостовой выборкуе:', sum(mape_all_test)/len(mape_all_test)

('Среднее МАЕ по всем районам на тестостовой выборкуе:', 2.2638927524316474)

('Среднее МАРЕ по всем районам на тестостовой выборкуе:', 58.814657460122625)

**Вывод:** Можно сделать вывод что обученная модель не переобучилась и допускает ошибку для каждого района на 2 заказа, что достаточно неплохо. Если рассматривать в процентах, ошибка уже будет выглядеть более критично. Данную модель я буду использовать для предказания отдного часа лля каждого района. 

# Проверка модели на тестовом дата сете
Определенное колитов махинаций, а если быть точным не 1 десяток позволили нам ее привести к такому же типу, как и обучаемые данные.


In [38]:
tati_test = 'fails/y_true_2022-12-31_23-00_UTC0.csv'

sdf_test = spark.read.csv(path=tati_test, header=True, inferSchema=True)
sdf_test = sdf_test.withColumnRenamed("Pickup Community Area", "area")
sdf_test = sdf_test.withColumnRenamed("trips_count", "count_1h")
sdf_test = sdf_test.withColumnRenamed("hours", "datetime")

In [39]:
timestamp_format = "MM/dd/yyyy hh:mm:ss a"
sdf_test = sdf_test.withColumn("datetime", to_timestamp("datetime", timestamp_format))

In [40]:
#генерация признаков
sdf_test = features_area(sdf_test, 4, 3);

In [41]:
#кодирование временных признаков

#Кодируем месяцы
sdf_test = encode(sdf_test, 'month', 12);

#Кодируем часы
sdf_test = encode(sdf_test, 'hour', 24);

#Кодируем дни
sdf_test = encode(sdf_test, 'day', 31);

#Кодируем день недели
sdf_test = encode(sdf_test, 'day_week', 7);

In [42]:
feature_time_new = sdf_test.drop('area', 'datetime', 'count_1h', 'month',
                                 'day', 'day_week', 'hour').columns

In [43]:
#создание общего вектора фичей
assembler = VectorAssembler(inputCols=feature_time_new, outputCol="features_all")
sdf_test = assembler.transform(sdf_test)

sdf_test = sdf_test.select('area', 'datetime', "count_1h", "features_all")

In [44]:
area_dict = {}
for q in range(1, 78):
    area_dict[q] = sdf_test.filter(col('area') == q)

In [45]:
area_dict[77].show()

+----+-------------------+--------+--------------------+
|area|           datetime|count_1h|        features_all|
+----+-------------------+--------+--------------------+
|  77|2022-12-31 23:00:00|       8|[0.0,0.0,0.0,0.0,...|
+----+-------------------+--------+--------------------+



In [46]:
test_1_h = {}
test_1_h_mae = []
test_1_h_mape = []

for i in range(1, 78):
    area_dict[i] = models_tree[i].transform(area_dict[i])
        
        #MAE
    evaluator = RegressionEvaluator(predictionCol=prediction_col, labelCol=label_col, metricName='mae')
    mae_test = format(evaluator.evaluate(area_dict[i]))
    test_1_h_mae.append(float(mae_test))

In [47]:
'Среднее МАЕ для предсказания одного часа:', sum(test_1_h_mae)/len(test_1_h_mae)
#'Среднее МАРЕ по всем районам на тестостовой выборкуе:', sum(mape_all_test)/len(mape_all_test)

('Среднее МАЕ для предсказания одного часа:', 7.365798046720732)

**Вывод:** 
Результат для предсказания тестовой модели показал не очень радужные результаты. Думалось все будет лучше) 
Скорее всего это связано с тем, что лаги в данном временном диапазоне == 0. Но побороть эту проблему я не смог, думаю, что скрывается недочет именно тут.
Параллельно пытался обучить ARIMA, она показала неплохой показатель на 1 районе, но пришлось перевести таблицу в формат pandas. Конечно, я хотел переписать ее на запросах от спарка, но оказалось это слишком сложно для меня поэтому от нее я отказался. 


# **Общий вывод о проделанной работе:**
Если мы дошли до этого момента, то код нигде не упал и все работает на костылях уже славно. Это мой девиз этого проекта)) 

 1.	Объединил 2 и рассматривал их как единый временный период.
2.	Провел предобработку данных, различными методами сгруппировал
3.	Отобразил графически сезонность спроса. 
4.	Обучил модель Деревья решений и сделал предсказания со средней погрешностью 2.26 заказа. 
5.	Сделал тестовое предсказание 1 часа. Средняя погрешность составила 7.36 заказа по всем районам.