In [1]:
pip install prophet

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from prophet import Prophet

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression

Importing plotly failed. Interactive plots will not work.


In [3]:
SPARK_MASTER_IP = '172.18.0.2' 
spark = SparkSession.builder.appName("pyspark-taxi-forecasting") \
    .master(f"spark://{SPARK_MASTER_IP}:7077") \
    .config("spark.executor.cores", 3) \
    .config('spark.local.dir', 'spark_tmp/') \
    .config("spark.task.cpus", 3) \
    .getOrCreate()

spark = SparkSession.builder.appName("Introduction to Spark").getOrCreate()

In [4]:
spark

# Обучение модели и предсказания

In [5]:
all_hours = spark.read.csv("all_hours_short_v2.csv", header = True, inferSchema = True)

In [6]:
test = spark.read.csv("y_true_2023-07-31_23-00_UTC0.csv", header = True, inferSchema = True)
test = test.withColumn('ds', test['hours']) \
    .withColumn('pickup_community_area', test['Pickup Community Area']) \
    .withColumn('y', test['trips_count'])

In [7]:
column_list = ["pickup_community_area"]
  

Windowspec = Window.partitionBy(["pickup_community_area"]).orderBy(all_hours.hour_cons.desc())
  
all_hours_lagged = all_hours.withColumn(
    'med_cost_lagged1', lag(all_hours['cost_median'], -1).over(Windowspec)).withColumn(
    'med_miles_lagged1', lag(all_hours['miles_median'], -1).over(Windowspec)).withColumn(
    'med_seconds_lagged1', lag(all_hours['seconds_median'], -1).over(Windowspec)).withColumn(
    'trips_count_lagged1', lag(all_hours['trips_count'], -1).over(Windowspec)).withColumn(
    'trips_count_lagged2', lag(all_hours['trips_count'], -2).over(Windowspec)).withColumn(
    'trips_count_lagged3', lag(all_hours['trips_count'], -3).over(Windowspec)).withColumn(
    'trips_count_lagged12', lag(all_hours['trips_count'], -12).over(Windowspec)).withColumn(
    'trips_count_lagged24', lag(all_hours['trips_count'], -24).over(Windowspec)).withColumn(
    'trips_count_lagged_week', lag(all_hours['trips_count'], -24*7).over(Windowspec))

all_hours_lagged = all_hours_lagged.withColumn(
    'rolling_average_on3hours', avg(all_hours_lagged['trips_count_lagged1'],).over(Windowspec.rowsBetween(-3, -0))) \
    .withColumn('rolling_average_on24hours', avg(all_hours_lagged['trips_count_lagged1'],).over(Windowspec.rowsBetween(-24, -0))) \
    .na.drop('any').cache()


# Проведем предсказания с помощью модели Prophet

## Разделим данные для Prophet

In [8]:
train_proph = all_hours_lagged.filter((all_hours_lagged.hour_cons < '2023-07-31 23:00:00')) \
    .selectExpr( 'pickup_community_area',
    'hour_cons as ds', 'trips_count as y')
# Partition the data dfsp_partitionned
train_proph.createOrReplaceTempView("pickup_community_area")
sql = "select * from pickup_community_area"
train_proph = (spark.sql(sql)\
   .repartition(spark.sparkContext.defaultParallelism, 
   ['pickup_community_area'])).cache()
train_proph.explain()

== Physical Plan ==
InMemoryTableScan [pickup_community_area#18, ds#357, y#358]
   +- InMemoryRelation [pickup_community_area#18, ds#357, y#358], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- Exchange hashpartitioning(pickup_community_area#18, 9), REPARTITION_BY_NUM, [plan_id=115]
            +- *(1) Project [pickup_community_area#18, hour_cons#19 AS ds#357, trips_count#21 AS y#358]
               +- *(1) Filter (isnotnull(hour_cons#19) AND (hour_cons#19 < 2023-07-31 23:00:00))
                  +- InMemoryTableScan [hour_cons#19, pickup_community_area#18, trips_count#21], [isnotnull(hour_cons#19), (hour_cons#19 < 2023-07-31 23:00:00)]
                        +- InMemoryRelation [_c0#17, pickup_community_area#18, hour_cons#19, taxi_countdist#20, trips_count#21, cost_median#22, miles_median#23, seconds_median#24, med_cost_lagged1#75, med_miles_lagged1#86, med_seconds_lagged1#97, trips_count_lagged1#109, trips_count_lagged2#122, trips_count_lagged3#136, trips_count_lag

In [9]:
test_proph = all_hours_lagged.filter(all_hours_lagged.hour_cons == '2023-07-31 23:00:00') \
    .selectExpr( 'pickup_community_area',
    'hour_cons as ds', 'trips_count as y') 
    #.withColumn("y",col("y").cast(DoubleType()))

## Проведем обучение и предсказание модели через функцию Pandas_udf

In [10]:
# Define a schema
schema = StructType([ \
                     StructField('pickup_community_area', IntegerType()), 
                     StructField('ds', TimestampType()),
                     StructField('y', FloatType()),
                     StructField('yhat', DoubleType()),
                     StructField('daily', DoubleType()),
                     StructField('weekly', DoubleType())
                    ])

In [11]:
# define the Pandas UDF
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(store_pd):  # instantiate the model and set parameters
  model = Prophet(
      interval_width=0.95,
      growth='linear',
      n_changepoints = 150,
      daily_seasonality=True,
      weekly_seasonality=True,
      yearly_seasonality=True,
      seasonality_mode='additive'
  )  # fit the model to historical data
  model.fit(store_pd)  # Create a data frame that lists 90 dates starting from Jan 1 2018
  future = model.make_future_dataframe(
      periods=1, #test_proph.filter(test_proph.pickup_community_area==community_num).count(),
      freq='h',
      include_history=True
   )  # Out of sample prediction
  prediction = model.predict(future)  # Create a data frame that contains store, item, y, and yhat
  f_pd = prediction[['ds', 'yhat', 'daily', 'weekly']]
  st_pd = store_pd[['ds', 'pickup_community_area', 'y']]
  result_pd = f_pd.join(st_pd.set_index('ds'), on='ds', how='left')  # fill store and item
  result_pd['pickup_community_area'] = store_pd['pickup_community_area'].iloc[0]
  return result_pd[['pickup_community_area', 'ds', 'y', 'yhat',
                    'daily', 'weekly']]# Apply the function to all store-items
# Print the results - calculate the time to run
results = train_proph.groupby(['pickup_community_area']).apply(apply_model).cache()




# Создадим признаки задерки и скользящего среднего

In [12]:
column_list = ["pickup_community_area"]
  

Windowspec = Window.partitionBy(["pickup_community_area"]).orderBy(all_hours.hour_cons.desc())
  
all_hours_lagged = all_hours.withColumn(
    'med_cost_lagged1', lag(all_hours['cost_median'], -1).over(Windowspec)).withColumn(
    'med_miles_lagged1', lag(all_hours['miles_median'], -1).over(Windowspec)).withColumn(
    'med_seconds_lagged1', lag(all_hours['seconds_median'], -1).over(Windowspec)).withColumn(
    'trips_count_lagged1', lag(all_hours['trips_count'], -1).over(Windowspec)).withColumn(
    'trips_count_lagged2', lag(all_hours['trips_count'], -2).over(Windowspec)).withColumn(
    'trips_count_lagged3', lag(all_hours['trips_count'], -3).over(Windowspec)).withColumn(
    'trips_count_lagged12', lag(all_hours['trips_count'], -12).over(Windowspec)).withColumn(
    'trips_count_lagged24', lag(all_hours['trips_count'], -24).over(Windowspec)).withColumn(
    'trips_count_lagged_week', lag(all_hours['trips_count'], -24*7).over(Windowspec))

all_hours_lagged = all_hours_lagged.withColumn(
    'rolling_average_on3hours', avg(all_hours_lagged['trips_count_lagged1'],).over(Windowspec.rowsBetween(-3, -0))) \
    .withColumn('rolling_average_on24hours', avg(all_hours_lagged['trips_count_lagged1'],).over(Windowspec.rowsBetween(-24, -0))) \
    .na.drop('any').cache()


# Проведем обучение и окончательные предсказания с помощью линейной регрессии

## Выделим тренировочную и тестовую выборки

In [13]:
train_lr = all_hours_lagged.filter((all_hours_lagged.hour_cons < '2023-07-31 23:00:00')) \
    .selectExpr( 'pickup_community_area',
    'med_cost_lagged1','med_miles_lagged1','med_seconds_lagged1',
    'trips_count_lagged1', 'trips_count_lagged2', 'trips_count_lagged3', 
    'trips_count_lagged12', 'trips_count_lagged24','trips_count_lagged_week',
    'rolling_average_on3hours',
    'rolling_average_on24hours',
    'hour_cons as ds', 'trips_count as y') \
        .join(results['pickup_community_area', 'ds', 'daily', 'weekly'], on=['ds','pickup_community_area'] , how='inner'
             ).cache()

In [14]:
featureCols = [
    'med_cost_lagged1','med_miles_lagged1','med_seconds_lagged1',
    'trips_count_lagged1', 'trips_count_lagged2', 'trips_count_lagged3', 
    'trips_count_lagged12', 'trips_count_lagged24','trips_count_lagged_week',
    'rolling_average_on3hours',
    'rolling_average_on24hours',
     'daily', 'weekly'
]

In [15]:
test_lr = all_hours_lagged.filter(all_hours_lagged.hour_cons == '2023-07-31 23:00:00') \
    .selectExpr('pickup_community_area',
    'med_cost_lagged1','med_miles_lagged1','med_seconds_lagged1',
    'trips_count_lagged1', 'trips_count_lagged2', 'trips_count_lagged3', 
    'trips_count_lagged12', 'trips_count_lagged24','trips_count_lagged_week',
    'rolling_average_on3hours',
    'rolling_average_on24hours',
    'hour_cons as ds', 'trips_count as y') \
        .join(results['pickup_community_area', 'ds', 'daily', 'weekly'], on=['ds','pickup_community_area'] , how='inner'
             ).cache()

## Создадим pipeline и evaluator

In [16]:
# положить фичи в вектор
assembler = VectorAssembler(inputCols=featureCols, outputCol="features") 
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled", withStd = True)
lr = (LinearRegression(featuresCol='features_scaled', labelCol="y", predictionCol='y_pred')) 

pipeline = Pipeline(stages=[assembler, standardScaler, lr])

In [17]:
evaluator = RegressionEvaluator(predictionCol="y_pred", labelCol='y', metricName='mae')

# Реализуем регрессию для каждого региона через функцию

In [18]:
def modelsandpredictions(func_community):
        
    temp_train = train_lr.filter(train_lr.pickup_community_area == func_community)
    temp_test = test_lr.filter(test_lr.pickup_community_area == func_community)
    
    temp_lr = pipeline.fit(temp_train)
    temp_pred = temp_lr.transform(temp_test)
    temp_predandtrue = temp_pred.select(
                                        'pickup_community_area',
                                        'ds',
                                        "y_pred"
                                        ).withColumn("y_pred",F.round(temp_pred["y_pred"],0)) 
    temp_predandtrue = temp_predandtrue.withColumn('y_pred', F.when((F.col("y_pred") <= 0), 0)\
        .otherwise(temp_predandtrue.y_pred)).cache()
    #temp_lr = lr.fit(temp_train.union(temp_test))
    #temp_lr.write().overwrite().save("/models/lr{0}".format(i))
    return temp_predandtrue

In [19]:
preds_schema = StructType([ \
    StructField("pickup_community_area",IntegerType (),True), \
    StructField("ds",TimestampType(),True), \
    StructField("y_pred",FloatType(),True) \
  ])

In [None]:
%%time
test_preds = spark.createDataFrame([],preds_schema)
for i in range(78):
    test_preds = test_preds.union(modelsandpredictions(i))

In [None]:
trueandpreds = test.join(test_preds, on=['ds','pickup_community_area'], how='inner') \
    .select(
        test.ds,
        test.pickup_community_area,
        test.y,
        test_preds.y_pred).cache()

## Посмотрим финальные предсказания и оценим их

In [None]:
%%time
trueandpreds.sort(trueandpreds.pickup_community_area.desc()).show(80)
print("LinearRegression MAE: {0}".format(evaluator.evaluate(trueandpreds)))
print('MAPE:',
    trueandpreds.select(avg((100*abs((trueandpreds.y - trueandpreds.y_pred) / trueandpreds.y)))).collect())

    Окончательные ошибки отдельных Линейных регрессий для каждого района составили
    МАЕ: 2.1025641025641026
    МАРЕ: 65.79573756314875

    Возможно, тестовые данные были обработаны иначе. Необходимо определить и скоординировать в частности, работу с пропусками и заполнение района 0 в тестовых и тренировочных данных.

1,93 когда заполнил пропуски в community area на 0

1.9230769230769231 - когда не заполнял пропуски в длительности, расстоянии и стоимости и при отсечке по квантилям 0,99