# **Demand Forecasting Using PySpark**

## Objective:
- Generate a large number of fine-grained forecasts at the store-item level efficiently by leveraging distributed computing power (which, by the way, I don't have).
- We'll use the **Store Item Demand Forecasting Challenge** dataset from Kaggle:<br>
https://www.kaggle.com/c/demand-forecasting-kernels-only

In [1]:
!pip install -q findspark
!pip install -q pyspark

### **Create a spark session and import the required libraries**

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('darkgrid')

# fbprophet was renamed to prophet in v1.0; on newer installs use `from prophet import Prophet`
from fbprophet import Prophet
from fbprophet.plot import plot_plotly

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("demand_forecast").getOrCreate()

import warnings
warnings.filterwarnings('ignore')

import logging
# silence informational messages from py4j and fbprophet
logging.getLogger('py4j').setLevel(logging.ERROR)
logging.getLogger('fbprophet').setLevel(logging.ERROR)

In [3]:
from pyspark.sql.types import *

# structure of the training data set
train_schema = StructType([
  StructField('date', DateType()),
  StructField('store', IntegerType()),
  StructField('item', IntegerType()),
  StructField('sales', IntegerType())
  ])

# read the training file into a dataframe
train = spark.read.csv(
  '../input/demand-forecasting-kernels-only/train.csv', 
  header=True, schema = train_schema)

In [4]:
train.show(5)

### **Print the schema**

In [5]:
train.printSchema()

In [6]:
# make the dataframe queryable as a temporary view
train.createOrReplaceTempView('train')

### **Data Exploration**
**Exploring general trends and seasonality**

In [7]:
yearly_sales = spark.sql('''
SELECT
  year(date) as year, 
  sum(sales) as sales
FROM train
GROUP BY year(date)
ORDER BY year;
''').toPandas()

plt.plot(yearly_sales['year'], yearly_sales['sales'])
plt.title('Total sales per year');

In [8]:
monthly_sales = spark.sql('''
SELECT 
  TRUNC(date, 'MM') as month,
  SUM(sales) as sales
FROM train
GROUP BY TRUNC(date, 'MM')
ORDER BY month;''').toPandas()

plt.plot(monthly_sales['month'],monthly_sales['sales']);

When the data is aggregated at the weekday level, a pronounced weekly seasonal pattern emerges: a peak on Sunday (weekday 0), a sharp drop on Monday (weekday 1), and then a steady pickup over the week back to the Sunday high. This pattern appears stable across the five years of observations.

In [9]:
weekly_sales = spark.sql('''
SELECT
  YEAR(date) as year,
  DAYOFWEEK(date) - 1 as weekday,  -- DAYOFWEEK returns 1 = Sunday ... 7 = Saturday, so 0 = Sunday
  AVG(sales) as sales
FROM (
  SELECT 
    date,
    SUM(sales) as sales
  FROM train
  GROUP BY date
 ) x
GROUP BY YEAR(date), DAYOFWEEK(date)
ORDER BY year, weekday;
''').toPandas()
sns.lineplot(data=weekly_sales, x='weekday', y='sales', hue='year');
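The weekday numbering used in the narrative (Sunday = 0, Monday = 1, ..., Saturday = 6) is easy to get wrong. Below is a minimal pandas sketch, on a small synthetic frame rather than the Kaggle data, that reproduces the two-step aggregation of the query (total sales per date, then the average per year and weekday) with that numbering. In Spark SQL, `DAYOFWEEK(date) - 1` yields the same numbering, since `DAYOFWEEK` returns 1 for Sunday through 7 for Saturday. The helper name `weekday_index` is hypothetical.

```python
import datetime as dt

import pandas as pd


def weekday_index(d: dt.date) -> int:
    """Sunday=0, Monday=1, ..., Saturday=6."""
    # isoweekday() returns Monday=1 ... Sunday=7, so modulo 7 maps Sunday to 0
    return d.isoweekday() % 7


# synthetic data (illustrative only): two store-item rows per day for two weeks
dates = pd.date_range('2017-01-01', periods=14, freq='D')
rows = pd.DataFrame({
    'date': list(dates) * 2,
    'sales': list(range(14)) * 2,
})

# step 1: total sales per date (the inner query)
daily = rows.groupby('date', as_index=False)['sales'].sum()

# step 2: average daily total per year and weekday (the outer query)
daily['year'] = daily['date'].dt.year
# pandas dayofweek is Monday=0 ... Sunday=6; shift it to Sunday=0 ... Saturday=6
daily['weekday'] = (daily['date'].dt.dayofweek + 1) % 7
weekly = daily.groupby(['year', 'weekday'], as_index=False)['sales'].mean()
print(weekly)
```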

### **Build a Forecast**
* To illustrate, we will first build a forecasting model with fbprophet for a single store-item combination (store 1, item 1).
* Based on our review of the data, we set the overall growth pattern to linear and enable weekly and yearly seasonality. We also set the seasonality mode to multiplicative, as the seasonal pattern appears to grow with overall sales.<br>
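To see why multiplicative mode suits a seasonal pattern that grows with sales, here is a small NumPy sketch with made-up numbers (not fitted values). In additive mode a seasonal term adds a fixed offset to the trend; in multiplicative mode it acts as a fraction of the trend, so its absolute swing widens as the trend grows. Prophet combines components roughly as `trend * (1 + multiplicative_terms) + additive_terms`; the sketch keeps only a linear trend and a single weekly term.

```python
import numpy as np

t = np.arange(730)                           # two years of daily time steps
trend = 100 + 0.1 * t                        # hypothetical linear growth
season = np.sin(2 * np.pi * t / 7)           # a pure weekly cycle in [-1, 1]

additive = trend + 20 * season               # fixed swing of +/- 20 units
multiplicative = trend * (1 + 0.2 * season)  # swing of +/- 20% of the trend


def weekly_swing(series: np.ndarray, start: int) -> float:
    """Peak-to-trough size of the seasonal effect within one week."""
    window = slice(start, start + 7)
    effect = series[window] - trend[window]
    return float(effect.max() - effect.min())


# compare the first week with the last week of the two-year span
for name, series in [('additive', additive), ('multiplicative', multiplicative)]:
    print(name, round(weekly_swing(series, 0), 1), round(weekly_swing(series, 723), 1))
```

The additive swing stays the same in both weeks, while the multiplicative swing grows in proportion to the trend, which is the behavior the exploration plots suggest.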

In [10]:
# select the daily history of store 1 / item 1, renamed to the ds (date) and y (value) columns Prophet expects
sql_statement = '''
  SELECT
    CAST(date as date) as ds,
    sales as y
  FROM train
  WHERE store=1 AND item=1
  ORDER BY ds
  '''

# assemble dataset in Pandas dataframe
history_pd = spark.sql(sql_statement).toPandas()

# drop any missing records
history_pd = history_pd.dropna()
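Prophet expects its input as a frame with a `ds` (date) column and a `y` (value) column. If you want to prototype without a Spark session, the same single-series extraction can be sketched in plain pandas. `to_prophet_frame` is a hypothetical helper, and the input is assumed to have the Kaggle columns `date`, `store`, `item` and `sales`.

```python
import pandas as pd


def to_prophet_frame(df: pd.DataFrame, store: int, item: int) -> pd.DataFrame:
    """Select one store-item series and rename it to Prophet's ds/y convention."""
    mask = (df['store'] == store) & (df['item'] == item)
    out = (
        df.loc[mask, ['date', 'sales']]
        .rename(columns={'date': 'ds', 'sales': 'y'})
        .dropna()                                      # drop missing records
        .assign(ds=lambda d: pd.to_datetime(d['ds']))  # datetime64 dates sort chronologically
        .sort_values('ds')
        .reset_index(drop=True)
    )
    return out


# tiny synthetic example (illustrative values only)
raw = pd.DataFrame({
    'date': ['2017-01-02', '2017-01-01', '2017-01-01', '2017-01-03'],
    'store': [1, 1, 2, 1],
    'item': [1, 1, 1, 1],
    'sales': [15.0, 13.0, 40.0, None],
})
print(to_prophet_frame(raw, store=1, item=1))
```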

In [11]:
# set model parameters
model = Prophet(
  interval_width=0.95,
  growth='linear',
  daily_seasonality=False,
  weekly_seasonality=True,
  yearly_seasonality=True,
  seasonality_mode='multiplicative'
  )

# fit the model to historical data
model.fit(history_pd)

In [12]:
# build a dataset covering all historical dates plus the 90 days after the last available date
future_pd = model.make_future_dataframe(
  periods=90, 
  freq='d', 
  include_history=True
  )

# predict over the dataset
forecast_pd = model.predict(future_pd)
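`make_future_dataframe` returns a frame with a single `ds` column; with `include_history=True` it holds every historical date followed by the forecast horizon. The pandas sketch below approximates that behavior for daily data so the shape of `future_pd` is easy to reason about. `make_future_dates` is a hypothetical name, not Prophet's implementation.

```python
import pandas as pd


def make_future_dates(history_dates, periods: int, include_history: bool = True) -> pd.DataFrame:
    """Approximate a daily make_future_dataframe: optional history plus `periods` later days."""
    history = pd.Series(pd.to_datetime(history_dates))
    last_date = history.max()
    # generate periods + 1 dates starting at the last date, then keep only the later ones
    horizon = pd.date_range(start=last_date, periods=periods + 1, freq='D')
    horizon = pd.Series(horizon[horizon > last_date][:periods])
    if include_history:
        horizon = pd.concat([history, horizon], ignore_index=True)
    return pd.DataFrame({'ds': horizon})


# usage: one month of history plus a 90-day horizon
dec_2017 = pd.date_range('2017-12-01', '2017-12-31', freq='D')
future = make_future_dates(dec_2017, periods=90)
print(len(future), future['ds'].iloc[-1].date())
```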

In [13]:
trends_fig = model.plot_components(forecast_pd);

In [14]:
predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales')

# adjust figure to display dates from last year + the 90 day forecast
xlim = predict_fig.axes[0].get_xlim()
new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0)
predict_fig.axes[0].set_xlim(new_xlim);

In [15]:
from sklearn.metrics import mean_squared_error, mean_absolute_error
from math import sqrt

# get historical actuals & predictions for comparison (the training data ends on 2017-12-31);
# ds coming from Spark can hold datetime.date values, so convert it before comparing with a Timestamp
cutoff = pd.Timestamp('2018-01-01')
actuals_pd = history_pd[ pd.to_datetime(history_pd['ds']) < cutoff ]['y']
predicted_pd = forecast_pd[ forecast_pd['ds'] < cutoff ]['yhat']

# calculate evaluation metrics
mae = mean_absolute_error(actuals_pd, predicted_pd)
mse = mean_squared_error(actuals_pd, predicted_pd)
rmse = sqrt(mse)

# print metrics to the screen
print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
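For reference, MAE is the mean absolute error, MSE the mean squared error, and RMSE the square root of the MSE, which puts it back in the same units as sales. A minimal NumPy sketch of the three formulas, checked on a hand-computable example (the helper name `regression_metrics` is hypothetical):

```python
import numpy as np


def regression_metrics(actual, predicted):
    """Return (MAE, MSE, RMSE) for two equal-length sequences."""
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    errors = actual - predicted
    mae = float(np.mean(np.abs(errors)))   # average size of the error
    mse = float(np.mean(errors ** 2))      # penalizes large errors more heavily
    rmse = float(np.sqrt(mse))             # back in the units of the data
    return mae, mse, rmse


# errors are 1, 0 and -2: MAE = 3/3 = 1, MSE = (1 + 0 + 4)/3 = 5/3
example = regression_metrics([3, 5, 2], [2, 5, 4])
print(example)
```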

### **Scaling Model Training & Forecasting**

* We will start by assembling sales data at the store-item-date level of granularity.<br>

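* To train a model and generate a forecast for every combination, we will use a grouped-map Pandas user-defined function (UDF). Spark calls this function once per store-item combination, passing that group's rows as a Pandas DataFrame, and combines the DataFrames it returns into a single Spark DataFrame. (On Spark 3.x, the `PandasUDFType.GROUPED_MAP` style used here is deprecated in favor of `groupBy(...).applyInPandas(func, schema)`.)<br>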
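Conceptually, a grouped-map UDF is split-apply-combine: split the rows by the grouping key, call the function once per group, and stack the returned frames. The sketch below mimics that contract on a single machine with plain pandas, which can be handy for debugging a per-group function before distributing it. `apply_per_group` and `mean_forecast` are hypothetical names, the mean "forecast" only stands in for a Prophet model, and unlike Spark this runs the groups one after another.

```python
import pandas as pd


def mean_forecast(group: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for a per-group model: predict the group's mean sales."""
    out = group[['ds', 'store', 'item', 'y']].copy()
    out['yhat'] = group['y'].mean()
    return out


def apply_per_group(df: pd.DataFrame, func, keys=('store', 'item')) -> pd.DataFrame:
    """Split by key, call func on each sub-frame, and concatenate the results."""
    parts = [func(group) for _, group in df.groupby(list(keys))]
    return pd.concat(parts, ignore_index=True)


# synthetic history for two store-item combinations
sample_history = pd.DataFrame({
    'ds': pd.to_datetime(['2017-12-30', '2017-12-31'] * 2),
    'store': [1, 1, 1, 1],
    'item': [1, 1, 2, 2],
    'y': [10.0, 20.0, 3.0, 5.0],
})
per_group = apply_per_group(sample_history, mean_forecast)
print(per_group)
```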

In [16]:
sql_statement = '''
  SELECT
    store,
    item,
    CAST(date as date) as ds,
    SUM(sales) as y
  FROM train
  GROUP BY store, item, ds
  ORDER BY store, item, ds
  '''

store_item_history = (
  spark
    .sql(sql_statement)
#     .repartition(2, ['store', 'item'])
  ).cache()


In [17]:
result_schema =StructType([
  StructField('ds',DateType()),
  StructField('store',IntegerType()),
  StructField('item',IntegerType()),
  StructField('y',FloatType()),
  StructField('yhat',FloatType()),
  StructField('yhat_upper',FloatType()),
  StructField('yhat_lower',FloatType())
  ])

In [18]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def forecast_store_item( history_pd ):
    # TRAIN MODEL AS BEFORE
    # --------------------------------------
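    # remove missing values (more likely at day-store-item level)
    history_pd = history_pd.dropna()
    # convert ds to datetime64 so it matches the forecast's ds when the two are joined below
    history_pd['ds'] = pd.to_datetime(history_pd['ds'])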

    # configure the model
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative')

    # train the model
    model.fit(history_pd)
    # --------------------------------------

    # BUILD FORECAST AS BEFORE
    # --------------------------------------
    # make predictions
    future_pd = model.make_future_dataframe(
        periods=90, 
        freq='d', 
        include_history=True)
    forecast_pd = model.predict( future_pd )  
    # --------------------------------------

    # ASSEMBLE EXPECTED RESULT SET
    # --------------------------------------
    # get relevant fields from forecast
    f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')

    # get relevant fields from history
    h_pd = history_pd[['ds','store','item','y']].set_index('ds')

    # join history and forecast
    results_pd = f_pd.join( h_pd, how='left' )
    results_pd.reset_index(level=0, inplace=True)

    # get store & item from incoming data set
    results_pd['store'] = history_pd['store'].iloc[0]
    results_pd['item'] = history_pd['item'].iloc[0]
    # --------------------------------------

    # return expected dataset
    return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]  
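The left join near the end of `forecast_store_item` keeps every date in the forecast (history plus the 90-day horizon) and attaches the actual `y` where one exists, so `y` is missing for horizon dates. A small pandas illustration with synthetic values follows. The join matches index values, so both `ds` indexes should share a dtype (here both datetime64); an index of `datetime.date` objects may fail to match a datetime64 one.

```python
import pandas as pd

# forecast rows: two historical dates plus two horizon dates (synthetic values)
toy_forecast = pd.DataFrame({
    'ds': pd.date_range('2017-12-30', periods=4, freq='D'),
    'yhat': [10.0, 11.0, 12.0, 13.0],
}).set_index('ds')

# history rows: actuals exist only up to 2017-12-31
toy_history = pd.DataFrame({
    'ds': pd.to_datetime(['2017-12-30', '2017-12-31']),
    'y': [9.0, 12.0],
}).set_index('ds')

# left join keeps every forecast date; y becomes NaN where there is no actual
toy_joined = toy_forecast.join(toy_history, how='left').reset_index()
print(toy_joined)
```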

In [19]:
from pyspark.sql.functions import current_date

results = (
  store_item_history
    .groupBy('store', 'item')
    .apply(forecast_store_item)
    .withColumn('training_date', current_date() )
    )

results.createOrReplaceTempView('new_forecasts')

In [20]:
forecast = spark.sql('''
select 
  ds as date,
  store,
  item,
  y as sales,
  yhat as sales_predicted,
  yhat_upper as sales_predicted_upper,
  yhat_lower as sales_predicted_lower,
  training_date
from new_forecasts;''')

### **Print the Forecast Schema**

In [21]:
forecast.printSchema()

### **Evaluating the Forecasts**

In [22]:
# schema of expected result set
eval_schema =StructType([
  StructField('training_date', DateType()),
  StructField('store', IntegerType()),
  StructField('item', IntegerType()),
  StructField('mae', FloatType()),
  StructField('mse', FloatType()),
  StructField('rmse', FloatType())
  ])

In [23]:
# define udf to calculate metrics
@pandas_udf( eval_schema, PandasUDFType.GROUPED_MAP )
def evaluate_forecast( evaluation_pd ):

    # get the training date, store & item of the incoming group
    training_date = evaluation_pd['training_date'].iloc[0]
    store = evaluation_pd['store'].iloc[0]
    item = evaluation_pd['item'].iloc[0]

    # calculate evaluation metrics
    mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] )
    mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] )
    rmse = sqrt( mse )

    # assemble result set
    results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]}
    return pd.DataFrame.from_dict( results )

In [24]:
# calculate metrics
results = (
  spark
    .table('new_forecasts')
    .filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data
    .select('training_date', 'store', 'item', 'y', 'yhat')
    .groupBy('training_date', 'store', 'item')
    .apply(evaluate_forecast)
    )
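The filter on `ds < '2018-01-01'` matters because horizon rows carry no actual `y`, so including them would feed missing values into the metrics. A single-machine pandas sketch of the same per-store-item evaluation, useful for spot-checking a handful of groups (the function name `evaluate_locally` and the toy values are hypothetical):

```python
import numpy as np
import pandas as pd


def evaluate_locally(forecasts: pd.DataFrame, cutoff: str = '2018-01-01') -> pd.DataFrame:
    """Compute MAE, MSE and RMSE per store-item on rows before `cutoff`."""
    scored = forecasts[forecasts['ds'] < pd.Timestamp(cutoff)]  # drop horizon rows (y is NaN)

    def metrics(group: pd.DataFrame) -> pd.Series:
        errors = group['y'] - group['yhat']
        mse = float(np.mean(errors ** 2))
        return pd.Series({'mae': float(np.mean(np.abs(errors))), 'mse': mse, 'rmse': float(np.sqrt(mse))})

    return scored.groupby(['store', 'item'])[['y', 'yhat']].apply(metrics).reset_index()


# synthetic forecasts: the 2018 row belongs to the horizon and has no actual
toy_forecasts = pd.DataFrame({
    'ds': pd.to_datetime(['2017-12-29', '2017-12-30', '2017-12-31', '2018-01-01',
                          '2017-12-30', '2017-12-31']),
    'store': [1, 1, 1, 1, 1, 1],
    'item': [1, 1, 1, 1, 2, 2],
    'y': [3.0, 5.0, 2.0, np.nan, 1.0, 1.0],
    'yhat': [2.0, 5.0, 4.0, 6.0, 1.0, 1.0],
})
toy_scores = evaluate_locally(toy_forecasts)
print(toy_scores)
```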

In [25]:
results.printSchema()

### **Print the Forecast Evaluation Metrics**

In [None]:
results.show()

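**Note:**<br>
The previous cell is computationally expensive: because Spark evaluates lazily, calling `results.show()` is what actually trains one Prophet model per store-item combination. If, like me, you don't have a cluster at hand, run it on fewer store-item combinations.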
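One way to reduce the workload is to filter the store-item history before the `groupBy`, for example `store_item_history.filter('store <= 2 AND item <= 5')` in Spark, which leaves 10 models to fit instead of 500 (the Kaggle data covers 10 stores and 50 items, numbered from 1). The same predicate is sketched below in pandas so the count can be checked locally; `limit_combinations` is a hypothetical helper.

```python
import itertools

import pandas as pd


def limit_combinations(df: pd.DataFrame, max_store: int, max_item: int) -> pd.DataFrame:
    """Keep rows whose store and item IDs fall within the given limits."""
    return df[(df['store'] <= max_store) & (df['item'] <= max_item)]


# synthetic grid mirroring the Kaggle layout: stores 1-10 x items 1-50, one row each
grid = pd.DataFrame(list(itertools.product(range(1, 11), range(1, 51))), columns=['store', 'item'])
subset = limit_combinations(grid, max_store=2, max_item=5)
print(len(grid), len(subset[['store', 'item']].drop_duplicates()))
```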