# Cross validation: Facebook's Prophet
---

Facebook Prophet with New Year holiday.

This notebook conducts cross validation of the method using a rolling forecast origin method.

**The notebook outputs:**
* MASE, RMSE and sMAPE at 7 day intervals from 7 to 84 days.
* 80 and 95% prediction intervals between 7 and 84 days.

These are saved into the folder `results/model_selection/stage2/`

---

# Imports

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

#for parallel running of CV
from joblib import Parallel, delayed

#error measures
from forecast_tools.metrics import (mean_absolute_scaled_error, 
                                    root_mean_squared_error,
                                    symmetric_mean_absolute_percentage_error)

#models
from statsmodels.tsa.arima.model import ARIMA
from fbprophet import Prophet

import warnings
warnings.filterwarnings('ignore')

In [2]:
#to select exceptionally busy days as covariates.
from amb_forecast.feature_engineering import (regular_busy_calender_days)

In [3]:
#custom ensemble class
from amb_forecast.ensemble import (Ensemble, UnweightedVote)

# Data Input

The constants `TOP_LEVEL`, `STAGE`, `REGION`,`TRUST` and `METHOD` are used to control data selection and the directory for outputting results.  

> Output files are written to `f'{TOP_LEVEL}/{STAGE}/{REGION}-{METHOD}_{metric}.csv'`, where `metric` is one of smape, rmse, mase, coverage_80 or coverage_95. Note: `REGION` is also used to select the correct column from the input dataframe.

In [4]:
# South West sub-regions. Each name is used both as a column of the input
# dataframe and as the prefix of the result files; CV for the sub-regions
# is run in parallel.
regions = ['BNSSG',
           'Cornwall',
           'Devon',
           'Dorset',
           'Gloucestershire',
           'Somerset',
           'Wiltshire']

# Results are written to f'{TOP_LEVEL}/{STAGE}/{region}-{METHOD}_{metric}.csv'
TOP_LEVEL = '../../../results/model_selection'
STAGE = 'stage2'
TRUST = 'south_west'  
METHOD = 'fbp'  # fbp-arima: ensemble; fbp: prophet only

# Input file, read from '../../../data/'
FILE_NAME = 'Daily_Responses_5_Years_2019_full.csv'

# London data is tab delimited '\t', the rest are comma ','
# NOTE(review): DELIMITER is not passed to pre_process_daily_data in the
# call made in this notebook, so the default sep=',' is what is used.
DELIMITER = '\t'

# Methods to include in the ensemble (keep consistent with METHOD above)
INCLUDE_ARIMA = False
INCLUDE_FBP = True

# Observations on or after this date form the held-out test set.
TEST_SPLIT_DATE = '2019-01-01'

# Second subdivide: observations before this date are training data,
# the remainder (up to TEST_SPLIT_DATE) is validation data.
VAL_SPLIT_DATE = '2017-07-01'

# Discard data after 2020 due to coronavirus;
# this is the subject of a separate study.
DISCARD_DATE = '2020-01-01'

In [5]:
#read in path
path = f'../../../data/{FILE_NAME}'
print(path)

../../../data/Daily_Responses_5_Years_2019_full.csv


In [6]:
def pre_process_daily_data(path, index_col, by_col,
                           values, dayfirst=False, sep=','):
    '''
    Read daily data stored in long format and pivot it to wide format,
    giving a single column for each region's time series.

    Parameters:
    -------
    path - str or file-like - location of the delimited text file
    index_col - str - name of the date column (becomes the index)
    by_col - str - name of the column holding the series/region label
    values - str - name of the column holding the observed values
    dayfirst - bool, optional (default=False) - passed to pandas.read_csv
    sep - str, optional (default=',') - field delimiter

    Returns:
    --------
    pandas.DataFrame - one row per date, one (lower-cased) column per
    distinct value of `by_col`.  Duplicate date/label pairs are summed
    and the index frequency is set to daily ('D').
    '''
    raw = pd.read_csv(path,
                      sep=sep,
                      index_col=index_col,
                      parse_dates=True,
                      dayfirst=dayfirst)

    # work with lower-case labels throughout
    raw.columns = [str.lower(label) for label in raw.columns]
    raw.index.name = str(raw.index.name).lower()

    wide = raw.pivot_table(values=values.lower(),
                           index=[index_col.lower()],
                           columns=[by_col.lower()],
                           aggfunc=np.sum)

    # declare the series as daily; pandas validates the dates against this
    wide.index.freq = 'D'
    return wide

In [7]:
# Load the long-format CSV and pivot it to one column per 'ORA' value.
# `sep` is not supplied here, so the function default (',') is used.
clean = pre_process_daily_data(path, 'Actual_dt', 'ORA', 'Actual_Value', 
                               dayfirst=False)
clean.head()

ora,BNSSG,Cornwall,Devon,Dorset,Gloucestershire,OOA,Somerset,Trust,Wiltshire
actual_dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2013-12-30,415.0,220.0,502.0,336.0,129.0,,183.0,2042.0,255.0
2013-12-31,420.0,236.0,468.0,302.0,128.0,,180.0,1996.0,260.0
2014-01-01,549.0,341.0,566.0,392.0,157.0,,213.0,2570.0,351.0
2014-01-02,450.0,218.0,499.0,301.0,115.0,,167.0,2013.0,258.0
2014-01-03,419.0,229.0,503.0,304.0,135.0,,195.0,2056.0,269.0


## Train Test Split

In [8]:
def ts_train_test_split(data, split_date):
    '''
    Partition a time series into the observations before `split_date`
    and the observations on or after it.

    Parameters:
    -------
    data - pd.DataFrame - time series data.  Index expected as DatetimeIndex
    split_date - the date on which to split the time series; rows dated
                 exactly `split_date` go to the second partition

    Returns:
    --------
    tuple (len=2)
    0. pandas.DataFrame - training dataset (index < split_date)
    1. pandas.DataFrame - test dataset (index >= split_date)
    '''
    is_before = data.index < split_date
    is_on_or_after = data.index >= split_date
    return data.loc[is_before], data.loc[is_on_or_after]

In [9]:
# hold out everything on/after TEST_SPLIT_DATE as the test set
train, test = ts_train_test_split(clean, split_date=TEST_SPLIT_DATE)

# exclude data after 2020 due to coronavirus (kept separately in `discard`).
test, discard = ts_train_test_split(test, split_date=DISCARD_DATE)

# remaining data is split again into training and validation sets
train, val = ts_train_test_split(train, split_date=VAL_SPLIT_DATE)

In [10]:
#amount of training data
train.shape

(1279, 9)

In [11]:
#amount of validation data
val.shape

(549, 9)

# New Year's Day

In [12]:
exceptional = regular_busy_calender_days(train[regions[0]], quantile=0.99)

In [13]:
# Holiday table with a 'holiday' label column and a 'ds' date column.
# 20 yearly dates are generated with freq='YS' (year start), beginning at
# the first entry of `exceptional`.
# NOTE(review): presumably exceptional[0] is a 1st-of-January date -- confirm
# against regular_busy_calender_days.
new_year = pd.DataFrame({
                         'holiday': 'new_year',
                         'ds': pd.date_range(start=exceptional[0], 
                                             periods=20, 
                                             freq='YS')
                        })

In [14]:
new_year.head()

Unnamed: 0,holiday,ds
0,new_year,2013-01-01
1,new_year,2014-01-01
2,new_year,2015-01-01
3,new_year,2016-01-01
4,new_year,2017-01-01


# Wrapper classes for Prophet and statsmodels ARIMA

Adapter/wrapper classes to enable usage within `Ensemble` class and work with cross validation.

In [15]:
class FbProphetWrapper:
    '''
    Facade for a Prophet model so that it can be used within Ensemble
    together with estimators from other packages.

    The wrapper exposes `fit`, `predict` and the read-only properties
    `fittedvalues` and `resid`.

    Parameters:
    ----------
    training_index: pd.DatetimeIndex
        Dates of the initial training observations.  If `fit` is later
        called with more (or fewer) observations the index is extended
        (or truncated) from its first date.

    holidays: pd.DataFrame, optional (default=None)
        Passed unchanged to Prophet as `holidays`.

    interval_width: float, optional (default=0.8)
        Passed unchanged to Prophet as `interval_width`; this fixes the
        width of the prediction intervals returned by `predict`.

    mcmc_samples: int, optional (default=0)
        Passed unchanged to Prophet as `mcmc_samples`.

    changepoint_prior_scale: float, optional (default=0.05)
        Passed unchanged to Prophet as `changepoint_prior_scale`.
    '''
    def __init__(self, training_index, holidays=None, interval_width=0.8,
                 mcmc_samples=0, changepoint_prior_scale=0.05):
        self._training_index = training_index
        self._holidays = holidays
        self._interval_width = interval_width
        self._mcmc_samples = mcmc_samples
        self._cp_prior_scale = changepoint_prior_scale

    @staticmethod
    def _as_1d(train):
        '''
        Training data as a 1-D numpy vector (first column if 2-D).
        Works for numpy arrays, pandas Series and pandas DataFrames.
        '''
        y = np.asarray(train)
        if y.ndim > 1:
            y = y[:, 0]
        return y

    def _sync_training_index(self, n_obs):
        '''
        Make the stored training index exactly `n_obs` dates long
        (same start date and frequency) and return it.
        '''
        index = self._training_index
        if n_obs > len(index):
            # fall back to the inferred frequency if none was set explicitly
            freq = index.freq if index.freq is not None else index.inferred_freq
            index = pd.date_range(start=index[0], periods=n_obs, freq=freq)
        elif n_obs < len(index):
            index = index[:n_obs]
        self._training_index = index
        return index

    def _get_resids(self):
        # The last `_h` rows of the stored forecast are out-of-sample.
        fitted = self._forecast['yhat'].iloc[:-self._h]
        # Subtract positionally: the training data may carry a date index
        # while the forecast carries a RangeIndex, and pandas label alignment
        # would otherwise produce NaN for every entry.
        return self._as_1d(self._train) - fitted

    def _get_preds(self):
        return self._forecast['yhat'].iloc[:-self._h].to_numpy()

    def fit(self, train):
        '''
        Fit a new Prophet model to the training data.

        Params:
        ------
        train: array-like
            Training observations (1-D, or 2-D in which case the first
            column is used).
        '''
        self._model = Prophet(holidays=self._holidays,
                              interval_width=self._interval_width,
                              mcmc_samples=self._mcmc_samples,
                              changepoint_prior_scale=self._cp_prior_scale,
                              daily_seasonality=False)

        self._model.fit(self._pre_process_training(train))
        self._t = len(train)
        self._train = train
        # populates _forecast and _h so that fittedvalues / resid are
        # available straight after fitting
        self.predict(len(train))

    def _pre_process_training(self, train):
        '''
        Convert the training data to the two column ('ds', 'y') dataframe
        that is passed to Prophet.fit.
        '''
        y_train = self._as_1d(train)
        index = self._sync_training_index(len(y_train))
        return pd.DataFrame({'ds': index, 'y': y_train})

    def predict(self, h, return_conf_int=False, alpha=0.2):
        '''
        forecast h steps ahead.

        Params:
        ------
        h: int
            h-step forecast.  If an np.ndarray or pd.DataFrame is passed
            its length is used as the horizon.

        return_conf_int: bool, optional (default=False)
            also return the prediction interval

        alpha: float, optional (default=0.2)
            Accepted for interface compatibility only; it is not used.
            The interval width is the `interval_width` given to the
            constructor.

        Returns:
        -------
        np.array
            If return_conf_int = False returns preds only

        np.array, np.array
            If return_conf_int = True returns tuple of preds, pred_ints
            where pred_ints has columns (lower, upper)
        '''
        if isinstance(h, (np.ndarray, pd.DataFrame)):
            h = len(h)

        self._h = h
        future = self._model.make_future_dataframe(periods=h)
        self._forecast = self._model.predict(future)

        preds = self._forecast['yhat'].iloc[-h:].to_numpy()
        if not return_conf_int:
            return preds

        bounds = self._forecast[['yhat_lower', 'yhat_upper']]
        return preds, bounds.iloc[-h:].to_numpy()

    fittedvalues = property(_get_preds)
    resid = property(_get_resids)

In [16]:
class ARIMAWrapper(object):
    '''
    Facade for statsmodels ARIMA so that it can be used within Ensemble
    together with estimators from other packages.

    Parameters:
    ----------
    order: tuple
        Passed unchanged to ARIMA as `order`.

    seasonal_order: tuple
        Passed unchanged to ARIMA as `seasonal_order`.

    training_index: pd.DatetimeIndex
        Dates of the initial training observations.  If `fit` is later
        called with more (or fewer) observations the index is extended
        (or truncated) from its first date.

    holidays: list-like of dates, optional (default=None)
        If given, a 0/1 dummy variable marking these dates is supplied
        to ARIMA as the exogenous regressor.
    '''
    def __init__(self, order, seasonal_order, training_index, holidays=None):
        self._order = order
        self._seasonal_order = seasonal_order
        self._training_index = training_index
        self._holidays = holidays

    def _get_resids(self):
        return self._fitted.resid

    def _get_preds(self):
        return self._fitted.fittedvalues

    def _encode_holidays(self, holidays, idx):
        '''
        Single column ('holiday') dataframe indexed by `idx`: 1 where the
        date is in `holidays`, otherwise 0.
        '''
        flags = idx.isin(holidays).astype(int)
        return pd.DataFrame({'holiday': flags}, index=idx)

    def _sync_training_index(self, n_obs):
        '''
        Make the stored training index exactly `n_obs` dates long
        (same start date and frequency) and return it.
        '''
        index = self._training_index
        if n_obs > len(index):
            # fall back to the inferred frequency if none was set explicitly
            freq = index.freq if index.freq is not None else index.inferred_freq
            index = pd.date_range(start=index[0], periods=n_obs, freq=freq)
        elif n_obs < len(index):
            index = index[:n_obs]
        self._training_index = index
        return index

    def fit(self, y_train):
        '''
        Fit a new ARIMA model to the training data.

        Params:
        ------
        y_train: array-like
            Training observations.
        '''
        # `predict` builds the forecast dates from the end of this index,
        # so it must always match the length of the training data.
        index = self._sync_training_index(len(y_train))

        holiday_train = None
        if self._holidays is not None:
            holiday_train = self._encode_holidays(self._holidays, index)

        self._model = ARIMA(endog=y_train,
                            exog=holiday_train,
                            order=self._order,
                            seasonal_order=self._seasonal_order,
                            enforce_stationarity=False)

        #could be modified to catch LU decomp error?
        self._fitted = self._model.fit()
        # length of the data actually fitted (not of a module-level variable)
        self._t = len(y_train)

    def predict(self, horizon, return_conf_int=False, alpha=0.2):
        '''
        forecast `horizon` steps ahead.

        Params:
        ------
        horizon: int
            number of steps to forecast

        return_conf_int: bool, optional (default=False)
            return 1 - alpha PI

        alpha: float, optional (default=0.2)
            return 1 - alpha PI

        Returns:
        -------
        np.array
            If return_conf_int = False returns preds only

        np.array, np.array
            If return_conf_int = True returns tuple of preds, pred_ints
            where pred_ints has columns (lower, upper)
        '''
        #+1 to date range then trim off the first value
        #(the first value is the last training date)
        f_idx = pd.date_range(start=self._training_index[-1],
                              periods=horizon + 1,
                              freq=self._training_index.freq)[1:]

        #encode holidays if included.
        exog_holiday = None
        if self._holidays is not None:
            exog_holiday = self._encode_holidays(self._holidays, f_idx)

        forecast = self._fitted.get_forecast(horizon, exog=exog_holiday)
        # the 'mean' column does not depend on alpha, so one frame serves
        # both the point forecast and the interval
        frame = forecast.summary_frame(alpha=alpha)
        mean_forecast = frame['mean'].to_numpy()

        if not return_conf_int:
            return mean_forecast

        pi = frame[['mean_ci_lower', 'mean_ci_upper']].to_numpy()
        return mean_forecast, pi

    fittedvalues = property(_get_preds)
    resid = property(_get_resids)

# Example of fitting the ensemble
1. Regression with New Year Holiday and Auto ARIMA errors
2. FBProphet with new years day holiday.

The code below demonstrates how to fit the ensemble. In this notebook the ensemble contains only the Prophet model (`METHOD = 'fbp'`).

In [17]:
# Prophet wrapper using the training dates and the New Year holiday table
# (interval_width is left at the wrapper default of 0.8)
model_1 = FbProphetWrapper(training_index=train.index, 
                           holidays=new_year)

In [18]:
# single-member ensemble: Prophet only, combined with UnweightedVote
estimators = {'fbp': model_1}
ens = Ensemble(estimators, UnweightedVote())

In [19]:
#fit to training data in chosen region
ens.fit(train[regions[0]])

In [20]:
#predict 7 days ahead
H = 7
ens_preds = ens.predict(horizon=H)

In [21]:
#view predictions
ens_preds

array([521.68570355, 509.71188428, 481.68412198, 460.80217661,
       459.26660331, 465.93804988, 477.92087287])

In [22]:
#with prediction intervals
ens_preds, pi = ens.predict(horizon=H, return_conf_int=True)

In [23]:
ens_preds

array([521.68570355, 509.71188428, 481.68412198, 460.80217661,
       459.26660331, 465.93804988, 477.92087287])

In [24]:
pi

array([[491.28619579, 555.90432881],
       [478.18449613, 542.03855281],
       [447.43024087, 514.50529019],
       [429.34934024, 492.13412941],
       [425.79561608, 494.12547608],
       [433.67033282, 497.93074704],
       [448.28731408, 513.3702458 ]])

# Cross validation functions

`time_series_cv` implements rolling forecast origin cross validation for time series.  
It does not calculate forecast error; instead it returns three lists - the predictions, the actuals and the prediction intervals (in that order) - that can be passed to any forecast error function. (This is for efficiency and allows additional metrics to be calculated if needed.)

In [25]:
def time_series_cv(model, train, val, horizons, alpha=0.2, step=1):
    '''
    Time series cross validation across multiple horizons for a single model.

    Incrementally adds additional training data to the model and tests
    across a provided list of forecast horizons. Note that function tests a
    model only against complete validation sets.  E.g. if horizon = 15 and 
    len(val) = 12 then no testing is done.  In the case of multiple horizons
    e.g. [7, 14, 28] then the function will use the maximum forecast horizon
    to calculate the number of iterations i.e if len(val) = 365 and step = 1
    then no. iterations = len(val) - max(horizon) + 1 = 365 - 28 + 1 = 338.

    Progress (the split number) is printed to stdout.

    Parameters:
    --------
    model - forecasting model with `fit(y)` and
            `predict(horizon=, return_conf_int=, alpha=)` methods

    train - np.array or pd.Series - vector of training data

    val - np.array or pd.Series - vector of validation data

    horizons - list of ints, forecast horizon e.g. [7, 14, 28] days
    
    alpha - float, optional (default=0.2)
        1 - alpha prediction interval specification

    step -- int, optional (default=1)
            step taken in cross validation 
            e.g. 1 in next cross validation training data includes next point 
            from the validation set.
            e.g. 7 in the next cross validation training data includes next 7 points
            
    Returns:
    -------
    list, list, list
        - cv_preds, cv_actuals, cv_pis
        Each list has one entry per split; each entry is itself a list
        with one item per horizon (the first h predictions, the matching
        h actual values and the first h prediction intervals).
    '''
    #point forecasts
    cv_preds = []
    #ground truth observations
    cv_actuals = []
    #prediction intervals
    cv_pis = []

    #longest horizon decides how many complete folds fit into val
    max_h = max(horizons)

    #positional slicing that works for both pandas objects and numpy arrays
    val_pos = val.iloc if hasattr(val, 'iloc') else val

    print('split => ', end="")
    origins = range(0, len(val) - max_h + 1, step)
    for split, i in enumerate(origins, start=1):
        print(f'{split}, ', end="")

        #training data grows by the validation points before the origin
        train_cv = np.concatenate([train, val_pos[:i]], axis=0)
        model.fit(train_cv)

        #predict the maximum horizon once, then take sub horizons from it
        preds, pis = model.predict(horizon=max_h,
                                   return_conf_int=True,
                                   alpha=alpha)

        cv_preds.append([preds[:h] for h in horizons])
        cv_actuals.append([val_pos[i:i + h] for h in horizons])
        cv_pis.append([pis[:h] for h in horizons])

    print('done.\n')
    return cv_preds, cv_actuals, cv_pis

## Custom functions for calculating CV scores for point predictions and coverage.

These functions have been written to work with the output of `time_series_cv`

In [26]:
def split_cv_error(cv_preds, cv_test, error_func):
    '''
    Forecast error for each horizon within a single split
    
    Params:
    -----
    cv_preds: sequence of array-like
        Predictions for the split; one item per forecast horizon.
    
    cv_test: sequence of array-like
        Actual ground truth observations; one item per forecast horizon,
        matching `cv_preds`.
        
    error_func: object
        function with signature (y_true, y_preds)
        
    Returns:
    -------
        np.ndarray
            one error value per forecast horizon
    '''
    return np.array([error_func(y_true, y_pred)
                     for y_pred, y_true in zip(cv_preds, cv_test)])


def forecast_errors_cv(cv_preds, cv_test, error_func):
    '''
    Forecast errors by split and forecast horizon
    
    Params:
    ------
    cv_preds: sequence
        One item per split.  Each item is a sequence holding, for every
        forecast horizon, the array of predictions (of size h).
        
    cv_test: sequence
        Same layout as `cv_preds` but holding the actual observations.
        
    error_func: object
        function with signature (y_true, y_preds)
        
    Returns:
    -------
    np.ndarray
        2D array with one row per split and one column per horizon.
    '''
    # The per-horizon arrays have different lengths, so the inputs are
    # iterated as nested sequences rather than being converted with
    # np.array (which raises for ragged input in recent numpy versions).
    return np.array([split_cv_error(split_preds, split_test, error_func)
                     for split_preds, split_test in zip(cv_preds, cv_test)])

def split_coverage(cv_test, cv_intervals):
    '''
    Prediction interval coverage for each horizon within a single split
    
    Params:
    -----
    cv_test: sequence of array-like
        Actual ground truth observations; one item (of size h) per
        forecast horizon.
        
    cv_intervals: sequence of array-like
        Prediction intervals; one item of shape (h, 2) per forecast
        horizon with columns (lower, upper).
        
    Returns:
    -------
        np.ndarray
            For each horizon the proportion of observations lying
            strictly between the lower and upper bounds.
    '''
    coverage = []
    for actual, interval in zip(cv_test, cv_intervals):
        actual = np.asarray(actual)
        bounds = np.asarray(interval).T
        lower, upper = bounds[0], bounds[1]

        #an observation equal to a bound is counted as not covered
        inside = (actual > lower) & (actual < upper)
        coverage.append(np.count_nonzero(inside) / len(actual))

    return np.array(coverage)


def prediction_int_coverage_cv(cv_test, cv_intervals):
    '''
    Prediction interval coverage by split and forecast horizon
    
    Params:
    ------
    cv_test: sequence
        One item per split.  Each item is a sequence holding, for every
        forecast horizon, the array of actual observations (of size h).
        
    cv_intervals: sequence
        Same layout as `cv_test` but holding (h, 2) arrays of
        (lower, upper) prediction interval bounds.
        
    Returns:
    -------
    np.ndarray
        2D array with one row per split and one column per horizon.
    '''
    # The per-horizon arrays have different lengths, so the inputs are
    # iterated as nested sequences rather than being converted with
    # np.array (which raises for ragged input in recent numpy versions).
    return np.array([split_coverage(split_test, split_intervals)
                     for split_test, split_intervals
                     in zip(cv_test, cv_intervals)])

In [27]:
def split_cv_error_scaled(cv_preds, cv_test, y_train):
    '''
    Mean absolute scaled error for each horizon within a single split
    
    Params:
    -----
    cv_preds: sequence of array-like
        Predictions for the split; one item per forecast horizon.
    
    cv_test: sequence of array-like
        Actual ground truth observations; one item per forecast horizon.
        
    y_train: array-like
        Training data passed to mean_absolute_scaled_error (with period=7)
        
    Returns:
    -------
        np.ndarray
            one MASE value per forecast horizon
    '''
    return np.array([mean_absolute_scaled_error(y_true, y_pred,
                                                y_train, period=7)
                     for y_pred, y_true in zip(cv_preds, cv_test)])


def forecast_errors_cv_scaled(cv_preds, cv_test, y_train):
    '''
    Mean absolute scaled errors by split and forecast horizon
    
    Params:
    ------
    cv_preds: sequence
        One item per split.  Each item is a sequence holding, for every
        forecast horizon, the array of predictions (of size h).
        
    cv_test: sequence
        Same layout as `cv_preds` but holding the actual observations.
        
    y_train: array-like
        Training data passed to mean_absolute_scaled_error
        
    Returns:
    -------
    np.ndarray
        2D array with one row per split and one column per horizon.
    '''
    # The per-horizon arrays have different lengths, so the inputs are
    # iterated as nested sequences rather than being converted with
    # np.array (which raises for ragged input in recent numpy versions).
    return np.array([split_cv_error_scaled(split_preds, split_test, y_train)
                     for split_preds, split_test in zip(cv_preds, cv_test)])

In [28]:
def get_ensemble(fb_interval=0.8, include_arima=True,
                 include_prophet=True):
    '''
    Create ensemble model. Simple average.

    Note: the estimators are built from the module level variables
    `train` (its index supplies the training dates) and `new_year`
    (the holiday table).  Only the requested estimators are constructed.
    
    Parameters:
    ----------
    fb_interval: float, optional (default=0.8)
        interval width passed to the Prophet wrapper
        
    include_arima: bool, optional (default=True)
        Ensemble model include Regression with ARIMA errors (new years day)
        
    include_prophet: bool, optional (default=True)
        Ensemble model includes Prophet model
        
    Returns:
    ------
    Ensemble
    
    '''
    estimators = {}

    if include_arima:
        estimators['arima'] = ARIMAWrapper(order=(1, 1, 3),
                                           seasonal_order=(1, 0, 1, 7),
                                           training_index=train.index,
                                           holidays=new_year['ds'].tolist())

    if include_prophet:
        estimators['fbp'] = FbProphetWrapper(training_index=train.index,
                                             holidays=new_year,
                                             interval_width=fb_interval)

    return Ensemble(estimators, UnweightedVote())
    

# Run cross validation
This is run twice: once each for 80% and 95% prediction intervals.

> note this version of the notebook runs ALL subregions specified in the `regions` list.

In [29]:
def single_region_cv(train, val, region, horizons, alpha, step=7, output_pf=True,
                     output_cov=True):
    '''
    Rolling origin forecast time series cross validation of a single geo region.
    
    Params:
    -------
    train - pd.Series
        Initial training data
        
    val: pd.Series
        Validation data (gradually used up as CV executes)
    
    region: str
        The name of the region (used in the output file names)
        
    horizons: list
        A list of integer time steps to predict ahead.
    
    alpha: float
        1 - alpha prediction interval to generate
    
    step: int, optional (default=7)
        stride of cross validation.  I.e. spacing between folds
        
    output_pf: bool, optional (default=True)
        write point forecast results (smape, rmse, mase) to file
        
    output_cov: bool, optional (default=True)
        write pi coverage to file
    
    Returns:
    -------
    dict
        str: pd.DataFrame
        where str is the name of the metric: 'smape', 'rmse', 'mase' and
        'coverage_XX' where XX is the interval percentage (e.g. 80).
        Each dataframe has one row per CV split and one column per horizon.
    '''
    import os

    #get the model
    model = get_ensemble(fb_interval=1-alpha,
                         include_arima=INCLUDE_ARIMA, 
                         include_prophet=INCLUDE_FBP)

    #cv
    cv_preds, cv_test, cv_intervals = time_series_cv(model, train, val,
                                                     horizons, alpha=alpha,
                                                     step=step)

    #smape
    cv_errors = forecast_errors_cv(cv_preds, cv_test, 
                                   symmetric_mean_absolute_percentage_error)
    df_smape = pd.DataFrame(cv_errors, columns=horizons)

    #rmse
    cv_errors = forecast_errors_cv(cv_preds, cv_test, root_mean_squared_error)
    df_rmse = pd.DataFrame(cv_errors, columns=horizons)

    #mase
    cv_errors = forecast_errors_cv_scaled(cv_preds, cv_test, train)
    df_mase = pd.DataFrame(cv_errors, columns=horizons)

    #coverage. round (not truncate) so that floating point error in
    #(1 - alpha) * 100 cannot give e.g. 57 for a 58% interval
    cov = int(round((1 - alpha) * 100))
    cv_coverage = prediction_int_coverage_cv(cv_test, cv_intervals)
    df_cov = pd.DataFrame(cv_coverage, columns=horizons)

    #write to file.
    out_dir = f'{TOP_LEVEL}/{STAGE}'
    if output_pf or output_cov:
        #make sure the results folder exists before writing
        os.makedirs(out_dir, exist_ok=True)

    if output_pf:
        df_smape.to_csv(f'{out_dir}/{region}-{METHOD}_smape.csv')
        df_rmse.to_csv(f'{out_dir}/{region}-{METHOD}_rmse.csv')
        df_mase.to_csv(f'{out_dir}/{region}-{METHOD}_mase.csv')

    if output_cov:
        df_cov.to_csv(f'{out_dir}/{region}-{METHOD}_coverage_{cov}.csv')

    return {'smape': df_smape,
            'rmse': df_rmse,
            'mase': df_mase,
            f'coverage_{cov}': df_cov}
    

In [30]:
def multi_region_cv(regions, train, val, horizons, alpha, step=7, n_jobs=-1,
                    output_pf=True, output_cov=True):
    '''
    Run `single_region_cv` for several regions in parallel
    (rolling origin forecast time series cross validation).
    
    Params:
    -------
    regions: list of str
        Region names; each must be a column of `train` and `val`
        
    train - pd.DataFrame
        Initial training data, one column per region
        
    val: pd.DataFrame
        Validation data, one column per region
        (gradually used up as CV executes)
        
    horizons: list
        A list of integer time steps to predict ahead.
        
    alpha: float
        1 - alpha prediction interval to generate
    
    step: int, optional (default=7)
        stride of cross validation.  I.e. spacing between folds
        
    n_jobs: int, optional (default=-1)
        passed to joblib.Parallel
        
    output_pf: bool, optional (default=True)
        write point forecast results to file
        
    output_cov: bool, optional (default=True)
        write pi coverage to file

    Returns:
    -------
    list of results ordered by region.  Each list item is a dict:
        str: pd.DataFrame
        where str is the name of the metric.
    '''
    #one delayed CV task per region
    tasks = (delayed(single_region_cv)(train[name], val[name], name,
                                       horizons, alpha, step, output_pf,
                                       output_cov)
             for name in regions)

    #parallel execution
    return Parallel(n_jobs=n_jobs)(tasks)

# Script to run sub-region level TSCV in parallel.

In [31]:
# forecast horizons in days: every 7 days from 7 to 84
horizons = [7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84]

print('Running TSCV: with 80% prediction intervals')

#with 80% prediction intervals
#(output_pf / output_cov are left at their defaults, so the point forecast
# and coverage results are written to file)
results_80 = multi_region_cv(regions, train, val, horizons, alpha=0.2, step=7, 
                             n_jobs=-1)

Running TSCV: with 80% prediction intervals


In [32]:
print('Running TSCV: with 95% prediction intervals')

#with 95% prediction intervals
#NOTE(review): output_pf is left at its default (True) here as well, and the
#smape/rmse/mase file names do not include alpha, so this run writes those
#files again over the ones from the 80% run.
results_95 = multi_region_cv(regions, train, val, horizons, alpha=0.05, step=7, 
                             n_jobs=-1)

print('Analysis complete')

Running TSCV: with 95% prediction intervals
Analysis complete
