# Forecasting with Facebook's Prophet

Snowpark API Reference
https://docs.snowflake.com/developer-guide/snowpark/reference/python/index.html

This notebook will walk us through an example flow of a Data Scientist experimenting locally but offloading processing to Snowflake and storing the final results in Snowflake

The goal of this notebook is to explore how to leverage [Facebook's Prophet](https://facebook.github.io/prophet/) Python package to create powerful forecasts completely within Snowpark.

<b>Forecasting</b> - whether it's demand, sales, supply, or transportation - is a business planning method that helps analysts and planners better anticipate what is going to happen in their business. Depending on the complexity of the business operations, legacy forecasting can be within a handful of spreadsheets, in an ERP planning software (like SAP APO), or as complex as a machine learning model. 

For time-series analyses, several years of historical data is generally required to ensure high accuracy, depending on your situation. If you are forecasting hourly sales at a fast-food restaurant, you may need several months of hourly data to meet requirements. If you are forecasting at a weekly level for store sales, you may need several years of data to appropriately reflect any type of annual seasonality.

Another critical consideration in forecasting is the level of granularity. For example, you may need a sales forecast at both a regional and a store level. Often times, there needs to be a balance between the best level of detail to make decisions off of and how what historical values were regularly measured. As a rule of thumb, it's more accurate to have several specific forecasts (one for each store) and then aggregate up to the bigger picture (sales for all stores) if need be. Creating the underlying forecast at the store level and then aggregating up to the regional level can deliver the best results.

This notebook is leveraging the [Store-Item Demand Forecasting Challenge](https://www.kaggle.com/c/demand-forecasting-kernels-only/overview) Kaggle dataset. As a friendly reminder, this dataset is extremely clean, which is not usually the case for real-world forecasting. This dataset only contains historical sales data. While historical trends are generally the strongest indicators of what is to come, other factors, like pricing, promotions, distribution, or macroeconomic factors can influence sales.

<b>Why Facebook Prophet?</b>
Facebook Prophet is a powerful and easy to use time-series package, and you can use it in Snowpark! It does require a lot of historical and stationary data, but it does a great job combining long-term and short-term trends and picks up on seasonality. 

In [None]:
# Leveraging Snowpark's Pandas implementation to push processing directly to Snowflake
import snowflake.connector as sc
import snowflake.connector.pandas_tools as pt
from snowflake.connector.pandas_tools import write_pandas
from snowflake.snowpark import Session
from utils import snowpark_utils

session = snowpark_utils.get_snowpark_session()
session.use_database("snowpark")
session.use_schema("forecast")
session.use_warehouse("data_science")


In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Read in the sales data from local.
import pandas as pd

sales = pd.read_csv('data/train.csv')
sales.columns = ['DATE','STORE','ITEM','SALES']

## Move processing to Snowflake by creating a Snowpark DataFrame

In [None]:
from snowflake.snowpark.functions import concat, lit

#load pandas dataframe as a snowpark temporary table
sales_spdf = session.create_dataframe(sales)
#alternatively could have used a table already within Snowflake
#sales_spdf = session.table("sales")

# Adjust datatypes
sales_spdf = sales_spdf.withColumn('DATE', sales_spdf.DATE.cast("date"))\
                       .withColumn('STORE', sales_spdf.STORE.cast("string"))\
                       .withColumn('ITEM', sales_spdf.ITEM.cast("string"))

# Create a field for the combination of store-item
sales_spdf = sales_spdf.withColumn('STORE_ITEM', concat(sales_spdf.STORE, lit("_"), sales_spdf.ITEM))

In [None]:
sales_spdf.show()

### Exploratory Data Analysis

Before jumping into generating a forecast, let's get familiar with the data. Let's answer some of the following questions:
* How many products are there?
* How many stores are there?
* How many combinations are there?
* How many years of historical data is there? 
* Is the data stationary?

In [None]:
print('There are '+str(sales_spdf.select('STORE').distinct().count())+' unique stores')
print('There are '+str(sales_spdf.select('ITEM').distinct().count())+' unique products')
print('There are '+str(sales_spdf.select('STORE_ITEM').distinct().count())+' unique combinations for store-product')

In [None]:
#A quick overview
sales_spdf.describe().show()

In [None]:
from snowflake.snowpark.functions import when, count, col

#Check for nulls
sales_spdf.select([count(when(col(c).isNull(),c)).alias(c) for c in sales_spdf.columns]).show()

# Turns out that there are no null columns. Great!

In [None]:
historical_size = sales_spdf.select('DATE', 'STORE', 'ITEM').dropDuplicates()\
                            .groupBy('STORE','ITEM').agg(count('*').alias('NUM_DAYS'))

historical_size = historical_size.withColumn('NUM_WEEKS', historical_size.NUM_DAYS / 52)\
                                 .withColumn('NUM_YEARS', historical_size.NUM_DAYS / 365)

In [None]:
historical_size.select('NUM_WEEKS', 'NUM_YEARS').describe().show()

So far, the data looks very good! The range of sales and general statistics looks pretty reasonable. We have 5 years of historical data available. There are no null values. Now, let's more on and check to see if the data is overall stationary with some graphs

In [None]:
from snowflake.snowpark.functions import sum
import matplotlib.pyplot as plt

# Plot total daily sales
daily_sales = sales_spdf.select('DATE', 'SALES')
daily_sales = daily_sales.groupBy('DATE').agg(sum('SALES').alias('SALES'))\
                         .sort(col('DATE').asc())

plt.figure(figsize=(14,8)) 
plt.plot(daily_sales.to_pandas()['DATE'], daily_sales.to_pandas()['SALES'])
plt.title('Sales by Date')
plt.xlabel('Date')
plt.ylabel('Sales')
plt.show()

In [None]:
store = '1'

# Plot daily sales by store
daily_sales_store = sales_spdf.select('DATE', 'STORE', 'SALES')
daily_sales_store = daily_sales_store.groupBy('DATE','STORE').agg(sum('SALES').alias('SALES'))\
                         .sort(col('DATE').asc())

filtered_daily_sales_store = daily_sales_store.filter(col('STORE')==store)
plt.figure(figsize=(14,8)) 
plt.plot(filtered_daily_sales_store.to_pandas()['DATE'], filtered_daily_sales_store.to_pandas()['SALES'])
plt.title('Store '+store+' Sales by Date')
plt.xlabel('Date')
plt.ylabel('Sales')
plt.show()

In [None]:
item = '15'

# Plot daily sales - item
daily_sales_item = sales_spdf.select('DATE', 'ITEM', 'SALES')
daily_sales_item = daily_sales_item.groupBy('DATE', 'ITEM').agg(sum('SALES').alias('SALES'))\
                         .sort(col('DATE').asc())

filtered_daily_sales_item = daily_sales_item.filter(col('ITEM')==item)
plt.figure(figsize=(14,8)) 
plt.plot(filtered_daily_sales_item.to_pandas()['DATE'], filtered_daily_sales_item.to_pandas()['SALES'])
plt.title('Item '+item+' Sales by Date')
plt.xlabel('Date')
plt.ylabel('Sales')
plt.show()

In [None]:
store_item = '3_44'

# Plot daily sales - item
daily_sales_store_item = sales_spdf.select('DATE', 'STORE', 'ITEM', 'STORE_ITEM', 'SALES')
daily_sales_store_item = daily_sales_store_item.groupBy('DATE', 'STORE', 'ITEM', 'STORE_ITEM').agg(sum('SALES').alias('SALES'))\
                         .sort(col('DATE').asc())

filtered_daily_sales_store_item = daily_sales_store_item.filter(col('STORE_ITEM')==store_item)
plt.figure(figsize=(14,8)) 
plt.plot(filtered_daily_sales_store_item.to_pandas()['DATE'], filtered_daily_sales_store_item.to_pandas()['SALES'])
plt.title('Store-Item '+store_item+' Sales by Date')
plt.xlabel('Date')
plt.ylabel('Sales')
plt.show()

All of the data looks to be stationary, fantastic! Because we have the data to support it, we will build a forecast at the day-store-product level for a 90 day horizon.

## Model building

Since we have ample amount of data for this example, we can forecast at the lowest level of granularity: day-store-item. Even though each combination follows pretty similar patterns, we will still generate a unique forecast for each store-item combination. That's 500 forecasts! Don't worry, it's extremely easy and runs a lot faster than you would expect. 


Let's dive into it!

In [None]:
from snowflake.snowpark.functions import max
from datetime import timedelta  
"""
Because we're going to set a 90 day forecasting horizon, we will determine the split date as 90 days back from the 
max date. Again, we are only doing this to ensure we have labeled data to determine how well our training set does.
You can think of the first 4 years and 9 months are the training set. The testing or holdout set is the last 90 days.
"""

print('The max date is: '+str(sales_spdf.select(max('DATE') ).collect()[0][0]))
print('90 days for testing starts on: '+str(sales_spdf.select(max('DATE') ).collect()[0][0]-timedelta(90)))

In [None]:
# Prepare the dataframe for modeling 
sales_sub = sales_spdf.select('DATE', 'STORE_ITEM', 'SALES')

# Because the testing dataset doesn't have labels, let's leave out the last 90 days for the testing set
train = sales_sub.filter(col('DATE')<='2017-10-02')

# Combining sales and date columns
train = train.withColumn("DATE_SALES", concat(col('DATE'), lit("_"), col('SALES')))

In [None]:
train.show()

In [None]:
#now we will make and register a udf for the model training and inference
import pandas as pd
from prophet import Prophet

session.add_packages("pandas", "prophet")

In [None]:
from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import ArrayType, StringType

@udf(name='prophet_fit', input_types=[ArrayType(StringType())], return_type=ArrayType(StringType()), is_permanent=False, replace=True)
def prophet_fit(ds_y: list) -> list:
    
    #splitting column into dates and values
    df = pd.DataFrame({'ds_y':ds_y}).ds_y.str.split('_',expand=True)
    df.columns = ['ds', 'y']
    df.ds = pd.to_datetime(df.ds)
    df.y = df.y.astype(int)
    
    
    # Enable daily sesanality since we are dealing with daily data
    m = Prophet(daily_seasonality=True)
    # Prophet has a built-in feature to easily add US holidays, so we will add that as a regressor
    m.add_country_holidays(country_name='US')
    
    # Now we fit the model
    m.fit(df)
    
    # This next step created a future facing data frame for 90 occurances (periods) at the daily level (freq)
    future = m.make_future_dataframe(periods=90, freq='d')
    
    forecast = m.predict(future)
    
    
    return forecast.ds.astype(str)+"_"+forecast.yhat.astype(str)+"_"+forecast.yhat_lower.astype(str)+"_"+forecast.yhat_upper.astype(str)

In [None]:
from snowflake.snowpark.functions import array_agg

#Aggregate training data by store-item
train_rotated = train.groupBy('STORE_ITEM').agg([array_agg(col('DATE_SALES')).alias("ALL_DATE_SALES")])

#This requires more compute power to perform the fit, lets scale up processing power
session.sql("alter warehouse data_science set warehouse_size=medium").collect()

#Run forecast
forecasts = train_rotated.withColumn('FORECAST', prophet_fit('ALL_DATE_SALES')).select('STORE_ITEM','FORECAST').cache_result()
#Takes 3min 40s on an XS
#Takes 1min 14s on a M

#Don't need the extra compute power anymore lets scale back down
session.sql("alter warehouse data_science set warehouse_size=xsmall").collect()

In [None]:
from snowflake.snowpark.functions import split
from snowflake.snowpark.functions import abs
#Separate forecast values out
flat_forecasts = forecasts.join_table_function("flatten", col("FORECAST")).select('STORE_ITEM','VALUE').withColumn('FORECASTS', split(col("VALUE"), lit("_")))

final_forecasts = flat_forecasts.select('STORE_ITEM', flat_forecasts['FORECASTS'][0], flat_forecasts['FORECASTS'][1], flat_forecasts['FORECASTS'][2], flat_forecasts['FORECASTS'][3])\
                                .toDF(["STORE_ITEM","DATE","YHAT","YHAT_LOWER","YHAT_UPPER"])\
                                .withColumn("DATE", col('DATE').cast("date"))\
                                .withColumn("YHAT", col('YHAT').cast("float"))\
                                .withColumn("YHAT_LOWER", col('YHAT_LOWER').cast("float"))\
                                .withColumn("YHAT_UPPER", col('YHAT_UPPER').cast("float"))

In [None]:
#Join back the actual sales values
final_forecasts = final_forecasts.join(sales_sub, using_columns=['STORE_ITEM', 'DATE'], join_type='left')

#Calculate forecast errors
final_forecasts = final_forecasts.withColumn("ERROR", col('SALES')-col('YHAT'))\
                                 .withColumn("ABS_ERROR", abs(col('ERROR')))

In [None]:
#Add a column that marks whether the row would be in train or test
final_forecasts = final_forecasts.withColumn('EVAL_SET', when(col('DATE') <= "2017-10-02", "TRAIN").otherwise("TEST"))

In [None]:
final_forecasts.show()

In [None]:
final_forecasts.write.mode("overwrite").save_as_table("OUTPUT_FORECASTS")

## Let's take a look at forecast accuracy

Most business define forecast accuracy as a weighted mean average percent error (MAPE).
* Forecast Accuracy = 1 - ( ABS(Predicted - Actuals) / Actuals )

We will use this approach to evaluate our forecast.

In [None]:
# Read in the forecast sales data from Snowflake 
results = session.table("OUTPUT_FORECASTS")
results.show()

In [None]:
results = results.withColumn('STORE', split(col("STORE_ITEM"), lit("_"))[0].cast("string"))\
                 .withColumn('ITEM', split(col("STORE_ITEM"), lit("_"))[1].cast("string"))

results.show()

In [None]:
# Aggregate results at an overall level (actuals and predicted)
results_agg = results.select('DATE','SALES','YHAT').groupBy('DATE').agg([sum('YHAT').alias('YHAT'), sum('SALES').alias('SALES')]).sort(col('DATE').asc())
results_agg.show()

In [None]:
fig,ax = plt.subplots(figsize=(14,8))
ax.plot(results_agg.to_pandas()['DATE'], results_agg.to_pandas()['SALES'])
ax.set_xlabel("Date")
ax.set_ylabel("Sales")
ax.plot(results_agg.to_pandas()['DATE'], results_agg.to_pandas()['YHAT'])
plt.legend(["Actual", "Predicted"])
plt.show()

In [None]:
eval_df = results.filter(col('EVAL_SET')=='TEST')
eval_df.show()

In [None]:
results_agg = eval_df.select('DATE','SALES','YHAT').groupBy('DATE').agg([sum('YHAT').alias('YHAT'), sum('SALES').alias('SALES')]).sort(col('DATE').asc())

fig,ax = plt.subplots(figsize=(14,8))
ax.plot(results_agg.to_pandas()['DATE'], results_agg.to_pandas()['SALES'])
ax.set_xlabel("Date")
ax.set_ylabel("Sales")
ax.plot(results_agg.to_pandas()['DATE'], results_agg.to_pandas()['YHAT'])
plt.legend(["Actual", "Predicted"])
plt.show()

In [None]:
train = results.filter(col('EVAL_SET')=='TRAIN')
print('The forecast accuracy for the training set is: '+str(1-(train.agg(sum('ABS_ERROR')).collect()[0][0]/train.agg(sum('SALES')).collect()[0][0])))

test = results.filter(col('EVAL_SET')=='TEST')
print('The forecast accuracy for the testing set is: '+str(1-(test.agg(sum('ABS_ERROR')).collect()[0][0]/test.agg(sum('SALES')).collect()[0][0])))

In [None]:
store_item_accuracy = eval_df.select('STORE', 'ITEM', 'SALES', 'YHAT', 'ABS_ERROR')\
                             .groupBy(['STORE', 'ITEM']).agg([sum('ABS_ERROR').alias('ABS_ERROR'), sum('SALES').alias('SALES')])\
                             .withColumn('OVERALL_ACCURACY', lit(1)-(col('ABS_ERROR') / col('SALES')) )
store_item_accuracy.show()

In [None]:
# Top 10 store-item combinations by forecast accuracy
store_item_accuracy.sort(col('OVERALL_ACCURACY').desc()).show(10)

In [None]:
# Last 10 store-item combinations by forecast accuracy
store_item_accuracy.sort(col('OVERALL_ACCURACY').asc()).show(10)

In [None]:
# Close session results in Snowflake cleannig up all the temp session tables
session.close()

Go into Snowflake query-history and see what's being done behind the scenes