### Import the required packages and functions

The code cell below imports the required packages and functions. 

In [2]:
import pandas as pd
import logging
from prophet import Prophet
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
from pyspark.sql.functions import current_date

StatementMeta(, 69ff7e87-4f6c-4858-9025-33edbcec1052, 6, Finished, Available)

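The code cell below raises logger levels to ERROR to suppress informational log messages (such as those from Prophet) that would otherwise clutter the notebook output.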

In [3]:
# Raise these loggers to ERROR so their INFO chatter does not clutter the
# notebook: py4j (the Python<->JVM bridge), prophet, and cmdstanpy (the Stan
# backend Prophet uses for fitting). Note that this only configures logging in
# the driver's Python process; Prophet code that runs inside Spark executors
# (e.g. via applyInPandas) logs to the executor logs instead.
for logger_name in ('py4j', 'prophet', 'cmdstanpy'):
    logging.getLogger(logger_name).setLevel(logging.ERROR)

StatementMeta(, 69ff7e87-4f6c-4858-9025-33edbcec1052, 7, Finished, Available)

### Get the data
Get the data that needs to be forecasted at the appropriate level of granularity. We are forecasting products at the store level, so we group the ***forecast_data*** table by ***storekey***, ***productkey***, and ***date***, then sum ***sales*** at that level.

In [4]:
# One row per store, product and day, with the columns Prophet expects:
# `ds` (the date) and `y` (total sales for that day).
# There is deliberately no ORDER BY: the repartition below shuffles the rows,
# which discards any sort order, and Prophet sorts each series by `ds` itself.
# Grouping by the expression (not the `ds` alias) avoids relying on
# Spark's alias-in-GROUP-BY setting.
sql_statement = '''
  SELECT
    storekey,
    productkey,
    CAST(date AS date) AS ds,
    SUM(sales) AS y
  FROM forecast_data
  GROUP BY storekey, productkey, CAST(date AS date)
  '''

# Hash-partition by store/product so each series sits in one partition before
# the per-group forecast, and cache the result for reuse.
# spark.sparkContext is used instead of the notebook-injected `sc` global.
store_item_history = (
  spark
    .sql( sql_statement )
    .repartition(spark.sparkContext.defaultParallelism, ['storekey', 'productkey'])
  ).cache()

StatementMeta(, 69ff7e87-4f6c-4858-9025-33edbcec1052, 8, Finished, Available)

### Provide structure for the output
The code below defines the schema (column names and data types) of the data frame produced by the forecast.

In [5]:
# Output schema for the per-group forecast: the date and the two group keys,
# followed by the actual value `y` and every Prophet prediction/component
# column, all stored as floats.
result_schema = StructType(
  [
    StructField('ds', DateType()),
    StructField('storekey', IntegerType()),
    StructField('productkey', IntegerType()),
  ]
  + [
    StructField(metric_name, FloatType())
    for metric_name in (
      'y',
      'yhat', 'yhat_upper', 'yhat_lower',
      'trend', 'trend_lower', 'trend_upper',
      'multiplicative_terms', 'multiplicative_terms_lower', 'multiplicative_terms_upper',
      'weekly', 'weekly_lower', 'weekly_upper',
      'yearly', 'yearly_lower', 'yearly_upper',
      'additive_terms', 'additive_terms_lower', 'additive_terms_upper',
    )
  ]
)

StatementMeta(, 69ff7e87-4f6c-4858-9025-33edbcec1052, 9, Finished, Available)

### Create the ***forecast_store_item()*** function to perform the forecasts

The ***forecast_store_item()*** function below produces a forecast for each store/product group of the data frame passed to it, using the ***Prophet*** package in the following steps:

1. Builds a ***Prophet*** model object with a 95% uncertainty interval, linear growth, and multiplicative weekly and yearly seasonality (daily seasonality is disabled).
1. Fits the model to the data passed to it. In this case, that is the sales history for a single store and product.
1. Creates a data frame that covers the historical dates plus the next 365 days, and uses the fitted model to predict every date in it.
1. The prediction data frame from ***step 3*** contains the predicted value (***yhat***), its lower and upper bounds, and the trend and seasonality components. ***f_pd*** keeps only the columns we need from it, indexed by ***ds***.
1. ***h_pd*** holds the actual sales (***y***) from ***history_pd***, indexed by ***ds***.
1. A data frame is created with a left outer join of ***f_pd*** and ***h_pd***, so future dates have no actual sales value.
1. The ***storekey*** and ***productkey*** values from the incoming data are written onto every row.
1. The function returns the result with the columns expected by the output schema.
 

In [6]:
def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame:
    """Fit a Prophet model to one store/product series and forecast 365 days ahead.

    Called once per (storekey, productkey) group by ``applyInPandas``. Keep the
    signature to exactly one parameter: PySpark inspects the function's arity
    and would pass the grouping key as the first argument if it took two.

    Parameters
    ----------
    history_pd : pd.DataFrame
        The rows of a single store/product group, with columns ``storekey``,
        ``productkey``, ``ds`` (date) and ``y`` (daily sales).

    Returns
    -------
    pd.DataFrame
        One row per date, covering the fitted history plus 365 future days.
        Columns: ``ds``, the group keys, the actual ``y`` (missing for future
        dates), the prediction ``yhat`` with its bounds, and the
        trend/seasonality components. Names match ``result_schema``. If fewer
        than two rows remain after dropping missing values, an empty frame
        with the same columns is returned instead of raising.
    """
    # Returned columns, in order. applyInPandas matches them to the output
    # schema by name, so the names must stay in sync with result_schema.
    output_columns = [
        'ds', 'storekey', 'productkey', 'y',
        'yhat_lower', 'yhat_upper',
        'trend', 'trend_lower', 'trend_upper',
        'multiplicative_terms', 'multiplicative_terms_lower', 'multiplicative_terms_upper',
        'weekly', 'weekly_lower', 'weekly_upper',
        'yearly', 'yearly_lower', 'yearly_upper',
        'additive_terms', 'additive_terms_lower', 'additive_terms_upper',
        'yhat',
    ]
    # Columns taken from Prophet's prediction frame: everything except the
    # group keys and the observed value `y`.
    forecast_columns = [
        col for col in output_columns if col not in ('storekey', 'productkey', 'y')
    ]

    # TRAIN MODEL
    # --------------------------------------
    # remove missing values (more likely at day-store-item level)
    history_pd = history_pd.dropna()

    # Prophet refuses to fit fewer than two non-NaN rows. Return no rows for
    # this group instead of raising, which would abort the whole Spark job.
    if len(history_pd) < 2:
        return pd.DataFrame(columns=output_columns)

    # Spark's Arrow conversion hands DateType over as datetime.date objects.
    # Convert to datetime64 so the join with Prophet's datetime64 `ds` below
    # does not depend on pandas' implicit index type promotion.
    history_pd = history_pd.assign(ds=pd.to_datetime(history_pd['ds']))

    # Every row in the group shares the same keys; take them from the first row.
    store_key = history_pd['storekey'].iloc[0]
    product_key = history_pd['productkey'].iloc[0]

    # configure the model
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )

    # train the model
    model.fit( history_pd )
    # --------------------------------------

    # BUILD FORECAST
    # --------------------------------------
    # predict the fitted history plus the next 365 days
    future_pd = model.make_future_dataframe(
        periods=365,
        freq='d',
        include_history=True
    )
    forecast_pd = model.predict( future_pd )
    # --------------------------------------

    # ASSEMBLE EXPECTED RESULT SET
    # --------------------------------------
    f_pd = forecast_pd[forecast_columns].set_index('ds')
    h_pd = history_pd[['ds', 'y']].set_index('ds')

    # Left join keeps every forecast date; future dates get no actual `y`.
    results_pd = f_pd.join( h_pd, how='left' ).reset_index()

    results_pd['storekey'] = store_key
    results_pd['productkey'] = product_key
    # --------------------------------------

    return results_pd[output_columns]

StatementMeta(, 69ff7e87-4f6c-4858-9025-33edbcec1052, 10, Finished, Available)

### Apply the forecast to each store and item combination

The code below groups the ***store_item_history*** data frame by ***storekey*** and ***productkey*** and applies ***forecast_store_item*** to each group, using the schema in the ***result_schema*** variable to type the output. It then adds a new column named ***training_date*** and uses the ***current_date()*** function to set it to the current date. The resulting data frame is assigned to a variable named ***results***. Finally, the ***createOrReplaceTempView()*** method of the Spark data frame registers it as a temporary view named ***new_forecasts***.

In [7]:
# Run forecast_store_item once per store/product group, stamp every output row
# with the date the models were trained, and expose the forecasts to the SQL
# cells below as the `new_forecasts` temporary view.
results = store_item_history.groupBy(
    'storekey', 'productkey'
).applyInPandas(
    forecast_store_item, schema=result_schema
).withColumn(
    'training_date', current_date()
)

results.createOrReplaceTempView('new_forecasts')

StatementMeta(, 69ff7e87-4f6c-4858-9025-33edbcec1052, 11, Finished, Available)

### Create the table structure for the ***sales*** table that will hold the forecasts

The cell below uses the ***%%sql*** magic to run a statement that drops the ***sales*** table if it already exists.

In [8]:
%%sql
-- Remove any sales table left over from a previous run so that the
-- CREATE TABLE in the next cell starts from a clean slate.
DROP TABLE IF EXISTS
  sales;

StatementMeta(, 69ff7e87-4f6c-4858-9025-33edbcec1052, 12, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

The cell below uses the ***%%sql*** magic to create the ***sales*** table.

In [9]:
%%sql

-- Delta table holding, per date / store / product, the actual sales, the
-- Prophet prediction with its bounds, the trend and seasonality components,
-- and a 0/1 Forecasted flag. Partitioned by Date.
CREATE TABLE sales (
  `Date`                      DATE,
  StoreKey                    INT,
  ProductKey                  INT,
  Sales                       FLOAT,
  Predicted_Sales             FLOAT,
  Predicted_Sales_Lower       FLOAT,
  Predicted_Sales_Upper       FLOAT,
  Trend                       FLOAT,
  Trend_Lower                 FLOAT,
  Trend_Upper                 FLOAT,
  multiplicative_terms        FLOAT,
  multiplicative_terms_lower  FLOAT,
  multiplicative_terms_upper  FLOAT,
  Weekly_Seasonality          FLOAT,
  Weekly_Seasonality_Lower    FLOAT,
  Weekly_Seasonality_Upper    FLOAT,
  Yearly_Seasonality          FLOAT,
  Yearly_Seasonality_Lower    FLOAT,
  Yearly_Seasonality_Upper    FLOAT,
  additive_terms              FLOAT,
  additive_terms_lower        FLOAT,
  additive_terms_upper        FLOAT,
  Forecasted                  INT
)
USING DELTA
PARTITIONED BY (`Date`);

StatementMeta(, 69ff7e87-4f6c-4858-9025-33edbcec1052, 13, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

### Insert the forecasted data held in the ***new_forecasts*** view into the ***sales*** table

The cell below uses the ***%%sql*** magic to insert the data from the ***new_forecasts*** view into the ***sales*** table.

In [10]:
%%sql

-- Copy the forecasts from the new_forecasts view into the sales table.
-- INSERT matches the SELECT list to the column list by POSITION, so the two
-- lists below must stay aligned line for line.
INSERT INTO sales(
  `Date`,
  StoreKey,
  ProductKey,
  Sales,
  Predicted_Sales,
  Predicted_Sales_Lower,
  Predicted_Sales_Upper,
  Trend,
  Trend_Lower,
  Trend_Upper,
  multiplicative_terms,
  multiplicative_terms_lower,
  multiplicative_terms_upper,
  Weekly_Seasonality,
  Weekly_Seasonality_Lower,
  Weekly_Seasonality_Upper,
  Yearly_Seasonality,
  Yearly_Seasonality_Lower,
  Yearly_Seasonality_Upper,
  additive_terms,
  additive_terms_lower,
  additive_terms_upper,
  Forecasted
)

SELECT
  ds,
  storekey,
  productkey,
  y,
  yhat,
  yhat_lower,  -- -> Predicted_Sales_Lower
  yhat_upper,  -- -> Predicted_Sales_Upper
  trend,
  trend_lower,
  trend_upper,
  multiplicative_terms,
  multiplicative_terms_lower,
  multiplicative_terms_upper,
  weekly,
  weekly_lower,
  weekly_upper,
  yearly,
  yearly_lower,
  yearly_upper,
  additive_terms,
  additive_terms_lower,
  additive_terms_upper,
  -- The forecast left-joins actual sales onto the predicted dates, so only
  -- dates beyond the observed history have no actual value. Flag those rows
  -- rather than hard-coding the forecast year.
  CASE WHEN y IS NULL THEN 1 ELSE 0 END AS Forecasted
FROM new_forecasts

StatementMeta(, 69ff7e87-4f6c-4858-9025-33edbcec1052, 14, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>