#Demand Forecasting

The objective of this notebook is to illustrate how we might generate a large number of fine-grained forecasts (here, daily COVID-19 case counts at the state-county level) in an efficient manner leveraging the distributed computational power of Databricks.  For this exercise, we will make use of an increasingly popular library for demand forecasting, [FBProphet](https://facebook.github.io/prophet/), which we will load into the notebook session associated with a cluster running Databricks 6.0 or higher:

In [2]:
#from pyspark.sql.functions import expr
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark.sql.functions as f


In [3]:
# load fbprophet library
dbutils.library.installPyPI('FBProphet', version='0.5') # find latest version of fbprophet here: https://pypi.org/project/fbprophet/
dbutils.library.installPyPI('holidays','0.9.12') # this line is in response to this issue with fbprophet 0.5: https://github.com/facebook/prophet/issues/1293

dbutils.library.restartPython()

## Examine the COVID-19 Data

For our training dataset, we use COVID-19 case data, available [here](https://docs.google.com/spreadsheets/d/14quQPFErG-hlpsrNgYcX85vW7JMMK5X2vNZrafRcH8c/edit#gid=684361619) or [here](https://bit.ly/2yN71PJ).

In [5]:
%fs ls /FileStore/tables/FileStore/tables

Read the Data from Databricks UI

In [7]:
IS_TABLE_PERMANANT = False

In [8]:
# File location and type
file_location = "/FileStore/tables/COVID_19_Cases___COVID_19_Confirmed-80092.xls"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
confirmed_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)



In [9]:
confirmed_df.take(3)

In [10]:
# File location and type of The Deaths
file_location_deaths = "/FileStore/tables/COVID_19_Cases___COVID_19_Deaths-1e48f.csv"

# The applied options are for CSV files. For other file types, these will be ignored.
deaths_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location_deaths)



In [11]:
deaths_df.take(3)

In [12]:
# create an updated_date column in a standard timestamp format
# the 'Date' column holds strings such as 2/8/2020 (MM/dd/yyyy), which become 2020-02-08T00:00:00
confirmed_df = confirmed_df.withColumn("updated_date", f.from_unixtime(unix_timestamp(col('Date'),"MM/dd/yyyy"))) \
                          .withColumn("updated_date", col("updated_date").cast("timestamp"))

In [13]:

deaths_df = deaths_df.withColumn("updated_date", from_unixtime(unix_timestamp(col('Date'),"MM/dd/yyyy"))) \
                      .withColumn("updated_date", col("updated_date").cast("timestamp"))
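
As a rough illustration of what this conversion does to each value, here is a plain-Python sketch that parses the same month/day/year strings. The helper name `parse_us_date` is hypothetical and is not part of the notebook; the Spark cells above remain the actual implementation.

```python
from datetime import datetime

def parse_us_date(text):
    # Parse a month/day/year string such as "2/8/2020" into a datetime at midnight
    return datetime.strptime(text, "%m/%d/%Y")

# Sample values taken from the 'Date' column shown in this notebook
for raw in ["2/8/2020", "4/1/2020"]:
    print(raw, "->", parse_us_date(raw).isoformat())
```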

In [14]:
# Create a view or table: enable the table name in %sql
confirmed_tbl = "c19_confirmed"
confirmed_df.createOrReplaceTempView(confirmed_tbl)

deaths_tbl = "c19_deaths"
deaths_df.createOrReplaceTempView(deaths_tbl)

In [15]:
# Optionally save the data as permanent tables
if IS_TABLE_PERMANANT:
  confirmed_tbl = "c19_confirmed"
  confirmed_df.write.format("parquet").saveAsTable(confirmed_tbl)
  deaths_tbl = "c19_deaths"
  deaths_df.write.format("parquet").saveAsTable(deaths_tbl)
  

When performing demand forecasting, we are often interested in general trends and seasonality.  Let's start our exploration by examining the structure of the data and the monthly trend in confirmed cases:

In [17]:
confirmed_df.describe()

In [18]:
%sql
DESCRIBE c19_confirmed

col_name,data_type,comment
Case_Type,string,
Cases,int,
Difference,int,
Date,string,
Country_Region,string,
Province_State,string,
Admin2,string,
FIPS,int,
Lat,double,
Long,double,


In [19]:
ch_confirmed = spark.sql("SELECT * FROM c19_confirmed WHERE Country_Region=='US' AND Province_State=='California' AND Admin2=='Los Angeles'")
ch_confirmed.show()

In [20]:
# Render the filtered DataFrame as a table
display(ch_confirmed)

Case_Type,Cases,Difference,Date,Country_Region,Province_State,Admin2,FIPS,Lat,Long,updated_date
Confirmed,1,0,2/8/2020,US,California,Los Angeles,6037,34.30828379,-118.2282411,2020-02-08T00:00:00.000+0000
Confirmed,3518,499,4/1/2020,US,California,Los Angeles,6037,34.30828379,-118.2282411,2020-04-01T00:00:00.000+0000
Confirmed,144,50,3/17/2020,US,California,Los Angeles,6037,34.30828379,-118.2282411,2020-03-17T00:00:00.000+0000
Confirmed,1,0,2/27/2020,US,California,Los Angeles,6037,34.30828379,-118.2282411,2020-02-27T00:00:00.000+0000
Confirmed,1465,236,3/27/2020,US,California,Los Angeles,6037,34.30828379,-118.2282411,2020-03-27T00:00:00.000+0000
Confirmed,14,0,3/8/2020,US,California,Los Angeles,6037,34.30828379,-118.2282411,2020-03-08T00:00:00.000+0000
Confirmed,1,0,2/16/2020,US,California,Los Angeles,6037,34.30828379,-118.2282411,2020-02-16T00:00:00.000+0000
Confirmed,812,150,3/25/2020,US,California,Los Angeles,6037,34.30828379,-118.2282411,2020-03-25T00:00:00.000+0000
Confirmed,1,0,2/3/2020,US,California,Los Angeles,6037,34.30828379,-118.2282411,2020-02-03T00:00:00.000+0000
Confirmed,1,1,1/26/2020,US,California,Los Angeles,6037,34.30828379,-118.2282411,2020-01-26T00:00:00.000+0000


In [21]:
%sql

SELECT
  month(updated_date) as month, 
  sum(cases) as cases
FROM c19_confirmed
GROUP BY month(updated_date)
ORDER BY month;

month,cases
1,38534
2,1671539
3,8892729
4,14850668


It's very clear from the data that there is a generally upward trend in confirmed cases across regions. Note that `Cases` appears to be a running total, so these monthly sums overstate the actual counts, although the direction of the trend is unaffected. If we had better knowledge of the populations behind these figures, we might wish to identify whether there is a maximum growth capacity we'd expect to approach over the life of our forecast.  But without that knowledge and by just quickly eyeballing this dataset, we will assume that if our goal is to make a forecast a few days or months out, we might expect continued linear growth over that time span.
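
To make the difference between the two columns concrete, here is a minimal sketch on made-up numbers. It assumes, based on the sample rows shown above, that `Cases` is a cumulative count and `Difference` is the daily increase; the values below are hypothetical.

```python
# Hypothetical cumulative case counts for one county on consecutive days
cumulative = [1, 1, 3, 6, 10]

# Daily increase: the first day's increase is the count itself,
# then each later value is the day-over-day change
daily_new = [cumulative[0]] + [b - a for a, b in zip(cumulative, cumulative[1:])]

total_from_increases = sum(daily_new)    # equals the latest cumulative count
total_from_cumulative = sum(cumulative)  # counts earlier cases repeatedly

print(daily_new, total_from_increases, total_from_cumulative)
```

Summing the daily increases recovers the latest running total, while summing the running totals counts the same cases many times over.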

Now let's examine seasonality.  If we aggregate the daily increases (`Difference`) by month, the monthly totals grow along with the overall growth in cases, although with only a few months of observations a yearly seasonal pattern cannot really be established:

In [23]:
%sql

SELECT 
  TRUNC(updated_date, 'MM') as month,
  SUM(Difference) as Difference
  --SUM(cases) as cases
FROM c19_confirmed
GROUP BY TRUNC(updated_date, 'MM')
ORDER BY month;

month,Difference
2020-01-01,9927
2020-02-01,76040
2020-03-01,771368
2020-04-01,914022


Aggregating the data at a weekday level, a weekly pattern is observed with a low on Sunday (weekday 0) and a generally steady rise over the week to a high on Friday and Saturday (weekdays 5 and 6).  With less than a full year of observations, we cannot yet say how stable this pattern is:

In [25]:
%sql

SELECT
  YEAR(updated_date) as year,
  CAST(DATE_FORMAT(updated_date, 'u') as Integer) % 7 as weekday,
  --CONCAT(DATE_FORMAT(date, 'u'), '-', DATE_FORMAT(date, 'EEEE')) as weekday,
  AVG(Difference) as Difference
  --AVG(cases) as cases
FROM (
  SELECT 
    updated_date,
    SUM(Difference) as Difference
    --SUM(cases) as cases
  FROM c19_confirmed
  GROUP BY updated_date
 ) x
GROUP BY year, CAST(DATE_FORMAT(updated_date, 'u') as Integer) --, CONCAT(DATE_FORMAT(date, 'u'), '-', DATE_FORMAT(date, 'EEEE'))
ORDER BY year, weekday;

year,weekday,Difference
2020,0,17712.636363636364
2020,1,18650.272727272728
2020,2,20943.18181818182
2020,3,20437.75
2020,4,23480.166666666668
2020,5,25571.33333333333
2020,6,25593.25


Now that we are oriented to the basic patterns within our data, let's explore how we might build a forecast.

###Build a Forecast

Before attempting to generate forecasts for individual combinations of states and counties (`Province_State` and `Admin2`), it might be helpful to build a single forecast for no other reason than to orient ourselves to the use of FBProphet.

Our first step is to assemble the historical dataset on which we will train the model:

In [28]:
# query to select daily new cases (y) by date (ds); rows are not aggregated, so a date can appear once per region
sql_statement = '''
  SELECT
    CAST(updated_date as date) as ds,
    Difference as y
  FROM c19_confirmed
  ORDER BY ds
  '''

# assemble dataset in Pandas dataframe
history_pd = spark.sql(sql_statement).toPandas()

# drop any missing records
history_pd = history_pd.dropna()

Now, we will import the fbprophet library, but because it can be a bit verbose when in use, we will need to fine-tune the logging settings in our environment:

In [30]:
from fbprophet import Prophet
import logging

# suppress informational py4j messages emitted while fbprophet is in use
logging.getLogger('py4j').setLevel(logging.ERROR)

Based on our review of the data, it looks like we should set our overall growth pattern to linear and enable the evaluation of weekly and yearly seasonal patterns, keeping in mind that a yearly component cannot be estimated reliably from only a few months of history. We might also wish to set our seasonality mode to multiplicative as the seasonal pattern seems to grow with overall growth in cases:

In [32]:
# set model parameters
model = Prophet(
  interval_width=0.95,
  growth='linear',
  daily_seasonality=False,
  weekly_seasonality=True,
  yearly_seasonality=True,
  seasonality_mode='multiplicative'
  )

# fit the model to historical data
model.fit(history_pd)

Now that we have a trained model, let's use it to build a 90-day forecast:

In [34]:
# define a dataset including both historical dates & 90-days beyond the last available date
future_pd = model.make_future_dataframe(
  periods=90, 
  freq='d', 
  include_history=True
  )

# predict over the dataset
forecast_pd = model.predict(future_pd)

display(forecast_pd)

ds,trend,yhat_lower,yhat_upper,trend_lower,trend_upper,multiplicative_terms,multiplicative_terms_lower,multiplicative_terms_upper,weekly,weekly_lower,weekly_upper,yearly,yearly_lower,yearly_upper,additive_terms,additive_terms_lower,additive_terms_upper,yhat
2020-01-22T00:00:00.000+0000,46.286606033248816,-555.681046173045,596.4335219186041,46.286606033248816,46.286606033248816,-0.0009983379541099516,-0.0009983379541099516,-0.0009983379541099516,-0.00017047168362966097,-0.00017047168362966097,-0.00017047168362966097,-0.0008278662704802905,-0.0008278662704802905,-0.0008278662704802905,0.0,0.0,0.0,46.24039635767889
2020-01-22T00:00:00.000+0000,46.286606033248816,-500.19851145011967,608.5398360434951,46.286606033248816,46.286606033248816,-0.0009983379541099516,-0.0009983379541099516,-0.0009983379541099516,-0.00017047168362966097,-0.00017047168362966097,-0.00017047168362966097,-0.0008278662704802905,-0.0008278662704802905,-0.0008278662704802905,0.0,0.0,0.0,46.24039635767889
2020-01-22T00:00:00.000+0000,46.286606033248816,-601.602737750509,610.9592674279314,46.286606033248816,46.286606033248816,-0.0009983379541099516,-0.0009983379541099516,-0.0009983379541099516,-0.00017047168362966097,-0.00017047168362966097,-0.00017047168362966097,-0.0008278662704802905,-0.0008278662704802905,-0.0008278662704802905,0.0,0.0,0.0,46.24039635767889
2020-01-22T00:00:00.000+0000,46.286606033248816,-528.6352708133325,617.2248287804575,46.286606033248816,46.286606033248816,-0.0009983379541099516,-0.0009983379541099516,-0.0009983379541099516,-0.00017047168362966097,-0.00017047168362966097,-0.00017047168362966097,-0.0008278662704802905,-0.0008278662704802905,-0.0008278662704802905,0.0,0.0,0.0,46.24039635767889
2020-01-22T00:00:00.000+0000,46.286606033248816,-522.0357672835239,604.9835998399212,46.286606033248816,46.286606033248816,-0.0009983379541099516,-0.0009983379541099516,-0.0009983379541099516,-0.00017047168362966097,-0.00017047168362966097,-0.00017047168362966097,-0.0008278662704802905,-0.0008278662704802905,-0.0008278662704802905,0.0,0.0,0.0,46.24039635767889
2020-01-22T00:00:00.000+0000,46.286606033248816,-533.6018042293964,605.6656364545192,46.286606033248816,46.286606033248816,-0.0009983379541099516,-0.0009983379541099516,-0.0009983379541099516,-0.00017047168362966097,-0.00017047168362966097,-0.00017047168362966097,-0.0008278662704802905,-0.0008278662704802905,-0.0008278662704802905,0.0,0.0,0.0,46.24039635767889
2020-01-22T00:00:00.000+0000,46.286606033248816,-494.155058586252,603.7197557018075,46.286606033248816,46.286606033248816,-0.0009983379541099516,-0.0009983379541099516,-0.0009983379541099516,-0.00017047168362966097,-0.00017047168362966097,-0.00017047168362966097,-0.0008278662704802905,-0.0008278662704802905,-0.0008278662704802905,0.0,0.0,0.0,46.24039635767889
2020-01-22T00:00:00.000+0000,46.286606033248816,-537.1856359871485,623.774831502696,46.286606033248816,46.286606033248816,-0.0009983379541099516,-0.0009983379541099516,-0.0009983379541099516,-0.00017047168362966097,-0.00017047168362966097,-0.00017047168362966097,-0.0008278662704802905,-0.0008278662704802905,-0.0008278662704802905,0.0,0.0,0.0,46.24039635767889
2020-01-22T00:00:00.000+0000,46.286606033248816,-537.4659833456318,607.1979657769598,46.286606033248816,46.286606033248816,-0.0009983379541099516,-0.0009983379541099516,-0.0009983379541099516,-0.00017047168362966097,-0.00017047168362966097,-0.00017047168362966097,-0.0008278662704802905,-0.0008278662704802905,-0.0008278662704802905,0.0,0.0,0.0,46.24039635767889
2020-01-22T00:00:00.000+0000,46.286606033248816,-514.3836385126652,656.1691766136546,46.286606033248816,46.286606033248816,-0.0009983379541099516,-0.0009983379541099516,-0.0009983379541099516,-0.00017047168362966097,-0.00017047168362966097,-0.00017047168362966097,-0.0008278662704802905,-0.0008278662704802905,-0.0008278662704802905,0.0,0.0,0.0,46.24039635767889
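
The columns in this output can be read as pieces of the prediction. With multiplicative seasonality, the values above appear consistent with `yhat = trend * (1 + multiplicative_terms) + additive_terms`, where `multiplicative_terms` is the sum of the `weekly` and `yearly` columns. The sketch below checks this against the first row of the output; the function name `compose_yhat` is hypothetical, and the relationship is inferred from the displayed numbers rather than taken from the library's source.

```python
def compose_yhat(trend, multiplicative_terms, additive_terms=0.0):
    # Seasonal effects scale the trend; additive effects are added afterwards
    return trend * (1.0 + multiplicative_terms) + additive_terms

# Values copied from the first row of the forecast output above
trend = 46.286606033248816
weekly = -0.00017047168362966097
yearly = -0.0008278662704802905
multiplicative_terms = -0.0009983379541099516
reported_yhat = 46.24039635767889

yhat = compose_yhat(trend, multiplicative_terms)
print(abs(yhat - reported_yhat) < 1e-6)
print(abs((weekly + yearly) - multiplicative_terms) < 1e-12)
```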


How did our model perform? Here we can see the general and seasonal trends in our model presented as graphs:

In [36]:
trends_fig = model.plot_components(forecast_pd)
display(trends_fig)

And here, we can see how our actual and predicted data line up as well as a forecast for the future, though we will limit our graph to the last year of historical data just to keep it readable:

In [38]:
predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='Difference')

# adjust figure to display dates from last year + the 90 day forecast
xlim = predict_fig.axes[0].get_xlim()
new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0)
predict_fig.axes[0].set_xlim(new_xlim)

display(predict_fig)

**NOTE** This visualization is a bit busy. Bartosz Mikulski provides [an excellent breakdown](https://www.mikulskibartosz.name/prophet-plot-explained/) of it that is well worth checking out.  In a nutshell, the black dots represent our actuals with the darker blue line representing our predictions and the lighter blue band representing our (95%) uncertainty interval.

Visual inspection is useful, but a better way to evaluate the forecast is to calculate Mean Absolute Error, Mean Squared Error and Root Mean Squared Error values for the predicted relative to the actual values in our set:

In [41]:
from sklearn.metrics import mean_squared_error, mean_absolute_error
from math import sqrt
from datetime import date

# get historical actuals & predictions for comparison
actuals_pd = history_pd[ history_pd['ds'] < date(2020, 4, 12) ]['y']
predicted_pd = forecast_pd[ forecast_pd['ds'] < date(2020, 4, 12) ]['yhat']

# calculate evaluation metrics
mae = mean_absolute_error(actuals_pd, predicted_pd)
mse = mean_squared_error(actuals_pd, predicted_pd)
rmse = sqrt(mse)

# print metrics to the screen
print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
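
For reference, the three metrics reduce to simple averages over the prediction errors. Here is a minimal pure-Python sketch on made-up numbers; the function names are hypothetical stand-ins for the scikit-learn functions used above.

```python
from math import sqrt

def mae(actual, predicted):
    # Mean of the absolute errors
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

def mse(actual, predicted):
    # Mean of the squared errors; penalizes large misses more heavily
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)

def rmse(actual, predicted):
    # Square root of MSE, expressed in the same units as the data
    return sqrt(mse(actual, predicted))

actual = [3.0, 5.0, 2.0]      # made-up actuals
predicted = [2.0, 5.0, 4.0]   # made-up predictions

print(mae(actual, predicted), mse(actual, predicted), rmse(actual, predicted))
```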

FBProphet provides [additional means](https://facebook.github.io/prophet/docs/diagnostics.html) for evaluating how your forecasts hold up over time. You're strongly encouraged to consider using these and other techniques when building your forecast models, but we'll skip this here to focus on the scaling challenge.

###Scaling Model Training & Forecasting

With the mechanics under our belt, let's now tackle our original goal of building numerous, fine-grained models & forecasts for individual state and county combinations.  We will start by assembling case data at the state-county-date level of granularity:

**NOTE**: The data in this data set should already be at this level of granularity, so the query below selects the rows directly and leaves the explicit `GROUP BY` commented out.

In [44]:
sql_statement = '''
  SELECT
    Province_State,
    Admin2,
    CAST(updated_date as date) as ds,
    Difference as y
  FROM c19_confirmed
  '''
#   GROUP BY Province_State, Admin2, ds
#  ORDER BY Province_State, Admin2, ds

store_item_history = (
  spark
    .sql( sql_statement )
    .repartition(sc.defaultParallelism, ['Province_State', 'Admin2'])
  ).cache()
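
Repartitioning on the grouping columns is meant to place all rows for a given state-county pair in the same partition. The sketch below illustrates the idea of hash partitioning in plain Python. It uses CRC32 as a stand-in hash, which is an assumption made for illustration only (Spark uses its own hash function internally), and the helper name `partition_for` is hypothetical.

```python
import zlib

def partition_for(key, num_partitions):
    # Stand-in hash (CRC32) over the joined key fields, reduced to a partition index
    encoded = "|".join(key).encode("utf-8")
    return zlib.crc32(encoded) % num_partitions

rows = [
    ("California", "Los Angeles", "2020-03-25"),
    ("California", "Los Angeles", "2020-03-27"),
    ("Washington", "King", "2020-03-25"),  # made-up row for illustration
]

# Rows sharing a (state, county) key always receive the same partition index
assignments = [partition_for((state, county), 8) for state, county, _ in rows]
print(assignments)
```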

With our data assembled at the state-county-date level, we need to consider how we will pass our data to FBProphet. If our goal is to build a model for each state and county combination, we will need to pass in a state-county subset from the dataset we just assembled, train a model on that subset, and receive a state-county forecast back. We'd expect that forecast to be returned as a dataset with a structure like this, where we retain the state and county identifiers for which the forecast was assembled and limit the output to just the relevant subset of fields generated by the Prophet model:

In [46]:
from pyspark.sql.types import *

result_schema =StructType([
  StructField('ds',DateType()),
  StructField('Province_State',StringType()),
  StructField('Admin2',StringType()),
  StructField('y',FloatType()),
  StructField('yhat',FloatType()),
  StructField('yhat_upper',FloatType()),
  StructField('yhat_lower',FloatType())
  ])

To train the model and generate a forecast we will leverage a Pandas user-defined function (UDF).  We will define this function to receive a subset of data organized around a state and county combination.  It will return a forecast in the format identified in the previous cell:

In [48]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def forecast_store_item( history_pd ):
  
  # TRAIN MODEL AS BEFORE
  # --------------------------------------
  # remove missing values (more likely at day-state-county level)
  history_pd = history_pd.dropna()
  
  # configure the model
  model = Prophet(
    interval_width=0.95,
    growth='linear',
    daily_seasonality=False,
    weekly_seasonality=True,
    yearly_seasonality=True,
    seasonality_mode='multiplicative'
    )
  
  # train the model
  model.fit( history_pd )
  # --------------------------------------
  
  # BUILD FORECAST AS BEFORE
  # --------------------------------------
  # make predictions
  future_pd = model.make_future_dataframe(
    periods=90, 
    freq='d', 
    include_history=True
    )
  forecast_pd = model.predict( future_pd )  
  # --------------------------------------
  
  # ASSEMBLE EXPECTED RESULT SET
  # --------------------------------------
  # get relevant fields from forecast
  f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')
  
  # get relevant fields from history
  h_pd = history_pd[['ds','Province_State','Admin2','y']].set_index('ds')
  
  # join history and forecast
  results_pd = f_pd.join( h_pd, how='left' )
  results_pd.reset_index(level=0, inplace=True)
  
  # get state & county from incoming data set
  results_pd['Province_State'] = history_pd['Province_State'].iloc[0]
  results_pd['Admin2'] = history_pd['Admin2'].iloc[0]
  # --------------------------------------
  
  # return expected dataset
  return results_pd[ ['ds', 'Province_State', 'Admin2', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]  

There's a lot taking place within our UDF, but if you compare the first two blocks of code within which the model is being trained and a forecast is being built to the cells in the previous portion of this notebook, you'll see the code is pretty much the same as before. It's only in the assembly of the required result set that truly new code is being introduced and it consists of fairly standard Pandas dataframe manipulations.
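
The grouped-map pattern itself can be sketched without Spark: split the rows by key, apply a function to each group, and concatenate the results. In the sketch below, `grouped_map` and `forecast_group` are hypothetical names, and the per-group "model" is simply the group mean standing in for Prophet. The two Los Angeles values come from the sample rows shown earlier; the third row is made up.

```python
from collections import defaultdict

def forecast_group(rows):
    # Hypothetical stand-in for the Prophet model: predict the group's mean for every row
    mean_y = sum(r["y"] for r in rows) / len(rows)
    return [dict(r, yhat=mean_y) for r in rows]

def grouped_map(rows, key_fields, func):
    # Split: collect rows by their key
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in key_fields)].append(row)
    # Apply and combine: run func on each group and concatenate the outputs
    results = []
    for group_rows in groups.values():
        results.extend(func(group_rows))
    return results

history = [
    {"Province_State": "California", "Admin2": "Los Angeles", "ds": "2020-03-25", "y": 150},
    {"Province_State": "California", "Admin2": "Los Angeles", "ds": "2020-03-27", "y": 236},
    {"Province_State": "Washington", "Admin2": "King", "ds": "2020-03-25", "y": 40},  # made-up row
]

forecasts = grouped_map(history, ["Province_State", "Admin2"], forecast_group)
for row in forecasts:
    print(row)
```

In Spark, the same three steps are distributed across the cluster, so each group's model can be trained in parallel on a different worker.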

Now let's call our UDF to build our forecasts.  We do this by grouping our historical dataset around state and county.  We then apply our UDF to each group and tack on today's date as our *training_date* for data management purposes:

In [51]:
from pyspark.sql.functions import current_date

results = (
  store_item_history
    .groupBy('Province_State', 'Admin2')
    .apply(forecast_store_item)
    .withColumn('training_date', current_date() )
    )

results.createOrReplaceTempView('new_forecasts')

We will likely want to report on our forecasts, so let's save them to a queryable table structure:

In [53]:
%sql
-- create forecast table
create table if not exists forecasts (
  date date,
  Province_State string,
  Admin2 string,
  sales float,
  sales_predicted float,
  sales_predicted_upper float,
  sales_predicted_lower float,
  training_date date
  )
using delta
partitioned by (training_date);



In [54]:
%sql
-- load data to it
insert into forecasts
select 
  ds as date,
  Province_State,
  Admin2,
  y as sales,
  yhat as sales_predicted,
  yhat_upper as sales_predicted_upper,
  yhat_lower as sales_predicted_lower,
  training_date
from new_forecasts;

But how good (or bad) is each forecast?  Using the UDF technique, we can generate evaluation metrics for each state-county forecast as follows:

In [56]:
import pandas as pd

# schema of expected result set
eval_schema =StructType([
  StructField('training_date', DateType()),
  StructField('Province_State', StringType()),
  StructField('Admin2', StringType()),
  StructField('mae', FloatType()),
  StructField('mse', FloatType()),
  StructField('rmse', FloatType())
  ])

# define udf to calculate metrics
@pandas_udf( eval_schema, PandasUDFType.GROUPED_MAP )
def evaluate_forecast( evaluation_pd ):
  
  # get training date, state & county from incoming data set
  training_date = evaluation_pd['training_date'].iloc[0]
  province_state = evaluation_pd['Province_State'].iloc[0]
  admin2 = evaluation_pd['Admin2'].iloc[0]
  
  # calculate evaluation metrics
  mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] )
  mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] )
  rmse = sqrt( mse )
  
  # assemble result set as a single-row dataframe
  results = {'training_date':[training_date], 'Province_State':[province_state], 'Admin2':[admin2], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]}
  return pd.DataFrame.from_dict( results )

# calculate metrics
results = (
  spark
    .table('new_forecasts')
    .filter('ds < \'2020-04-12\'') # limit evaluation to periods where we have historical data (same cutoff as the single-model evaluation)
    .select('training_date', 'Province_State', 'Admin2', 'y', 'yhat')
    .groupBy('training_date', 'Province_State', 'Admin2')
    .apply(evaluate_forecast)
    )
results.createOrReplaceTempView('new_forecast_evals')

Once again, we will likely want to report the metrics for each forecast, so we persist these to a queryable table:

In [58]:
%sql

create table if not exists forecast_evals (
  Province_State string,
  Admin2 string,
  mae float,
  mse float,
  rmse float,
  training_date date
  )
using delta
partitioned by (training_date);

insert into forecast_evals
select
  Province_State,
  Admin2,
  mae,
  mse,
  rmse,
  training_date
from new_forecast_evals;

We now have constructed a forecast for each state-county combination and generated basic evaluation metrics for each.  To see this forecast data, we can issue a simple query (limited here to Los Angeles County, California; the `sales_*` columns hold daily new-case values):

In [60]:
%sql

SELECT
  Province_State,
  Admin2,
  date,
  sales_predicted,
  sales_predicted_upper,
  sales_predicted_lower
FROM forecasts a
WHERE Province_State = 'California' AND
      Admin2 = 'Los Angeles' AND
      date >= '2020-04-12' AND
      training_date=current_date()
ORDER BY date

Province_State,Admin2,date,sales_predicted,sales_predicted_upper,sales_predicted_lower


And for each of these, we can retrieve measures to help us assess the reliability of each forecast:

In [62]:
%sql

SELECT
  Province_State,
  Admin2,
  mae,
  mse,
  rmse
FROM forecast_evals a
WHERE Province_State = 'California' AND
      training_date=current_date()
ORDER BY Admin2

Province_State,Admin2,mae,mse,rmse
