In [None]:
import numpy as np 
import pandas as pd
from datetime import timedelta, datetime as dt 
from fbprophet import Prophet
import itertools

In [None]:
def mean_absolute_percentage_error(y_true, y_pred):
    """Return the mean absolute percentage error (MAPE) of a forecast, in percent.

    MAPE is ``mean(|y_true - y_pred| / |y_true|) * 100``: every error is
    measured relative to the *actual* value, not the predicted one.

    Parameters
    ----------
    y_true : array-like
        Observed values.
    y_pred : array-like
        Predicted values, matched to ``y_true`` by position.

    Returns
    -------
    float
        MAPE in percent. Observations whose actual value is 0 are skipped
        because their percentage error is undefined; ``nan`` is returned when
        no observation is left to score.
    """
    # Plain arrays so that pandas inputs are compared by position, not by index label.
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)

    # A zero actual would turn the whole score into inf/nan, so leave those points out.
    scorable = y_true != 0
    if not scorable.any():
        return float('nan')

    relative_error = (y_true[scorable] - y_pred[scorable]) / y_true[scorable]
    return np.mean(np.abs(relative_error)) * 100

In [None]:
def _grid_search_holiday_country(language):
    """Return the country code handed to Prophet's holiday calendar for a contact language."""
    if language == 'fr':
        return 'FRA'
    if language != 'en':
        return language.upper()
    # English contacts have no single country: fall back to TAR (common European holidays).
    return 'TAR'


def _grid_search_train_test(forecast, language, contacts, data_dict, split_date):
    """Build the (train, test) frames of daily 'real_contacts' for one forecast/language pair."""
    if 'live' in forecast:
        channel = ['chat','call']
    elif 'email' in forecast:
        channel = ['email']
    else:
        raise ValueError(f"Unknown forecast type {forecast!r}: expected a name containing 'live' or 'email'")

    #filter for language and channels
    df = contacts[(contacts['channel'].isin(channel)) & (contacts['contact_language']==language)]

    #combine the selected channels into one daily total
    df = df.resample('D').sum()

    #merge contacts with extra data source
    df = df.merge(right=data_dict[language].reset_index()[['period','pct_total']],on='period',how='left')

    #deduct % contacts from abnormal period from incoming contacts to get real contacts in normal conditions
    df['real_contacts'] = df['incoming']*np.where(df['pct_total'].notnull(),(1-df['pct_total']),1)

    in_train = df['period'] < split_date
    in_test = df['period'] > split_date

    #live forecasts are modelled separately for saturdays and for weekdays (sundays are never used)
    if 'live' in forecast:
        if 'saturday' in forecast:
            wanted_days = df['period'].dt.dayofweek.isin([5])
        else:
            wanted_days = ~df['period'].dt.dayofweek.isin([6,5])
        in_train = in_train & wanted_days
        in_test = in_test & wanted_days

    columns = ['period','real_contacts']
    return df[in_train][columns], df[in_test][columns]


def grid_search(forecasts, languages, contacts, data_dict):
    """Grid-search Prophet hyperparameters for every forecast/language pair.

    Each pair is trained on the data older than 60 days and scored (MAPE) on
    the most recent 60 days, once per combination of seasonality mode,
    changepoint prior scale, yearly seasonality order and monthly Fourier
    order (2 * 13 * 10 * 6 = 1560 model fits per pair).

    Parameters
    ----------
    forecasts : iterable of str
        Forecast names. A name containing 'live' uses the chat and call
        channels (only saturdays if it also contains 'saturday', otherwise
        only monday-friday); a name containing 'email' uses the email channel.
    languages : iterable of str
        Contact language codes; also used to choose the holiday calendar.
    contacts : pandas.DataFrame
        Contacts with 'channel', 'contact_language' and 'incoming' columns.
        Presumably indexed by a datetime index named 'period' -- confirm
        against the data source.
    data_dict : dict
        Maps each language to a frame that exposes 'period' and 'pct_total'
        after ``reset_index()``.

    Returns
    -------
    dict
        ``{'best_params': {forecast: {language: params}},
        'best_scores': {forecast: {language: mape}}}`` where ``params`` has
        the keys 'seasonality_mode', 'changepoint_prior_scale',
        'yearly_seasonality', 'holiday' and 'fourier'.

    Raises
    ------
    ValueError
        If a forecast name contains neither 'live' nor 'email'.
    """
    grid_search_results = {
        'best_params': {forecast: {language: None for language in languages} for forecast in forecasts},
        'best_scores': {forecast: {language: None for language in languages} for forecast in forecasts},
    }
    split_date = (dt.utcnow().date() - timedelta(days=60)).strftime('%d-%b-%Y')
    modes = ['additive','multiplicative']
    changepoints = [0.001,0.01,0.1,1,2,5,8,10,12,15,17,20,25]
    yearly_seasonality = [1,2,5,10,12,15,17,20,23,25]
    fouriers = [15,12,10,5,3,1]

    #every combination to test; identical for all forecasts, so built only once
    combinations = list(itertools.product(modes, changepoints, yearly_seasonality, fouriers))

    for forecast in forecasts:
        for language in languages:
            #the data depends only on forecast and language: prepare it once, not once per combination
            df_train, df_test = _grid_search_train_test(forecast, language, contacts, data_dict, split_date)
            train = df_train.reset_index().rename(columns={'period':'ds', 'real_contacts':'y'})
            test = df_test.reset_index().rename(columns={'period':'ds'})
            holiday = _grid_search_holiday_country(language)

            for mode, changepoint, yearly, fourier in combinations:
                #initiate model with weekly/monthly/yearly seasonality ON
                model = Prophet(
                                seasonality_mode=mode,
                                changepoint_prior_scale=changepoint,
                                yearly_seasonality=yearly,
                                weekly_seasonality=True)
                model.add_seasonality(name='monthly', period=30.5, fourier_order=fourier)
                model.add_country_holidays(country_name=holiday)

                #fit on the train data and predict the held-out period
                model.fit(train)
                df_grid = model.predict(df=test)

                #get the mean absolute percentage error for this combination
                MAPE = mean_absolute_percentage_error(y_true=df_test['real_contacts'],y_pred=df_grid['yhat'])

                #keep score AND parameters together: the very first combination must be recorded too,
                #otherwise best_params stays None whenever that first combination is the best one.
                #A NaN best score is replaceable because every comparison against NaN is False.
                best_score = grid_search_results['best_scores'][forecast][language]
                if best_score is None or np.isnan(best_score) or MAPE < best_score:
                    grid_search_results['best_params'][forecast][language] = {
                                                        'seasonality_mode': mode,
                                                        'changepoint_prior_scale': changepoint,
                                                        'yearly_seasonality': yearly,
                                                        'holiday': holiday,
                                                        'fourier': fourier
                                                      }
                    grid_search_results['best_scores'][forecast][language] = MAPE
    return grid_search_results

In [None]:
def get_forecast(grid_search_results, forecasts, contacts, data_dict, languages=None):
    """Fit the tuned Prophet models on all history and forecast the next 365 days.

    Parameters
    ----------
    grid_search_results : dict
        Output of ``grid_search``; ``['best_params'][forecast][language]`` must
        hold the keys 'seasonality_mode', 'changepoint_prior_scale',
        'yearly_seasonality' and 'fourier'.
    forecasts : iterable of str
        Forecast names. A name containing 'live' uses the chat and call
        channels (only saturdays if it also contains 'saturday', otherwise
        only monday-friday); a name containing 'email' uses the email channel.
    contacts : pandas.DataFrame
        Contacts with 'channel', 'contact_language' and 'incoming' columns.
        Presumably indexed by a datetime index named 'period' -- confirm
        against the data source.
    data_dict : dict
        Maps each language to a frame that exposes 'period' and 'pct_total'
        after ``reset_index()``.
    languages : iterable of str, optional
        Languages to forecast. Defaults to the languages present in
        ``grid_search_results['best_params'][forecast]``, so the function no
        longer depends on a notebook-level ``languages`` variable.

    Returns
    -------
    dict
        ``{forecast: {language: DataFrame}}`` with Prophet's prediction frame
        plus 'language' and 'channel' columns.

    Raises
    ------
    ValueError
        If a forecast name contains neither 'live' nor 'email', or if the grid
        search holds no parameters for a requested forecast/language pair.
    """
    split_date = dt.utcnow().date().strftime('%d-%b-%Y')
    df_fcst = {}
    for forecast in forecasts:
        params_by_language = grid_search_results['best_params'][forecast]
        forecast_languages = list(params_by_language) if languages is None else list(languages)
        df_fcst[forecast] = {language: None for language in forecast_languages}

        #apply different transformations depending on channel
        is_live = 'live' in forecast
        if is_live:
            channel = ['chat','call']
        elif 'email' in forecast:
            channel = ['email']
        else:
            raise ValueError(f"Unknown forecast type {forecast!r}: expected a name containing 'live' or 'email'")
        saturday_only = is_live and 'saturday' in forecast

        for language in forecast_languages:
            #get all parameters loaded from the grid search results for this language
            params = params_by_language[language]
            if params is None:
                raise ValueError(f"No grid search parameters available for forecast {forecast!r}, language {language!r}")

            #daily contacts for the channel(s), corrected for the abnormal-period share
            df = contacts[(contacts['channel'].isin(channel)) & (contacts['contact_language']==language)]
            df = df.resample('D').sum()
            df = df.merge(right=data_dict[language].reset_index()[['period','pct_total']],on='period',how='left')
            df['real_contacts'] = df['incoming']*np.where(df['pct_total'].notnull(),(1-df['pct_total']),1)

            #train on everything before today; live forecasts split saturdays from weekdays
            in_train = df['period'] < split_date
            if saturday_only:
                in_train = in_train & df['period'].dt.dayofweek.isin([5])
            elif is_live:
                in_train = in_train & ~df['period'].dt.dayofweek.isin([6,5])
            df_train = df[in_train][['period','real_contacts']]

            #initiate model with all parameters from grid search results
            model = Prophet(
                    seasonality_mode=params['seasonality_mode'],
                    changepoint_prior_scale=params['changepoint_prior_scale'],
                    yearly_seasonality=params['yearly_seasonality'],
                    weekly_seasonality=True)
            model.add_seasonality(name='monthly', period=30.5, fourier_order=params['fourier'])

            #input holidays from particular country if available or TAR (common european holidays) if not available
            if language == 'fr':
                model.add_country_holidays(country_name='FRA')
            elif language != 'en':
                model.add_country_holidays(country_name=language.upper())
            else:
                model.add_country_holidays(country_name='TAR')

            #fit model with train data
            model.fit(df_train.reset_index().rename(columns={'period':'ds', 'real_contacts':'y'}))

            #create DF with future dates to forecast
            future = model.make_future_dataframe(freq='D',periods=365,include_history=False)

            #separate weekdays from saturday live forecast
            if saturday_only:
                future = future[future['ds'].dt.dayofweek.isin([5])]
            elif is_live:
                future = future[~future['ds'].dt.dayofweek.isin([6,5])]

            #make prediction DF and add language and channel columns
            #(live forecasts combine chat and call but are labelled with the first channel, 'chat')
            prediction = model.predict(future)
            prediction['language'] = language
            prediction['channel'] = channel[0]
            df_fcst[forecast][language] = prediction
    return df_fcst



In [None]:
def load_forecast_to_csv(df_fcst, languages, forecasts, path='365 forecast.csv'):
    """Write every forecast into one CSV file.

    Parameters
    ----------
    df_fcst : dict
        ``{forecast: {language: DataFrame}}``; each frame must contain the
        columns 'ds', 'language', 'channel', 'yhat', 'yhat_upper' and
        'yhat_lower'.
    languages : iterable of str
        Languages to export; rows are grouped by language first.
    forecasts : iterable of str
        Forecast names to export for each language.
    path : str, optional
        Destination file. Defaults to '365 forecast.csv' in the working
        directory, the location that used to be hard-coded.
    """
    export_columns = ['ds','language','channel','yhat','yhat_upper','yhat_lower']
    frames = [df_fcst[forecast][language][export_columns]
              for language in languages for forecast in forecasts]
    pd.concat(frames, axis=0).to_csv(path)

In [None]:
if __name__ == '__main__':
    # What to forecast, and for which contact languages.
    forecasts = ['daily_live','daily_live_saturday','daily_email']
    languages = ['de','fr','en','es','it']

    # Placeholder inputs: swap both reads for frames pulled from the database,
    # keeping the date as a parsed index.
    contacts = pd.read_csv('contacts.csv', index_col=0, parse_dates=[0])
    data_rate = pd.read_csv('data.csv', index_col=0, parse_dates=[0])

    # One frame per language, restricted to the rows labelled 'data'.
    data_dict = {
        language: data_rate[(data_rate['contact_language']==language) & (data_rate['label']=='data')]
        for language in languages
    }

    # Tune the hyperparameters, then produce the final forecast with the winners.
    grid_search_results = grid_search(forecasts, languages, contacts, data_dict)
    df_fcst = get_forecast(grid_search_results, forecasts, contacts, data_dict)
    
    