In [None]:
# this is the Python code for a forecast solution I built at BDT
# BDT sends several kinds of outreach letters to various outreach groups, inviting them to call the BDT call center 
# for help with submitting benefit applications.

# This code generates a time series forecast for each outreach line (a combination of state / letter # / outreach group / attempt / target benefit)
# the goal is to predict the number of calls/application submissions that the call center will see on a given week 
# for computational efficiency, some forecasting functions are handled in stored procedures in BigQuery (not included in this notebook)

# this code has been prepared for converting the notebook into a cloud function, so all the helper functions come first, and the main functions
# (fcst_main, seq_validation) are at the bottom

import pandas as pd
from google.cloud import bigquery
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import numpy as np
from scipy import stats
from datetime import datetime, time, timedelta
from pandas.tseries.holiday import USFederalHolidayCalendar
from statsmodels.tsa.exponential_smoothing.ets import ETSModel
import warnings
warnings.filterwarnings('ignore')
from dateutil.relativedelta import relativedelta, FR

bigquery_client=bigquery.Client()

In [None]:
# function initialize_forecast
# Initialize the forecast with the specified parameters.
# Args:
#   live (int, optional): Flag to indicate if the forecast will be live. Default is 0 (not live).
#   visible (int, optional): Flag to indicate if the (historic) forecast should be visible in the dashboard. Default is 0 (not visible).
#   ftype (str, optional): Type of forecast: is this a response forecast (R) or a submission forecast (S). Default is 'R' (e.g., 'R' for regular).
#   refdate (str, optional): Reference date for the forecast: i.e. let's pretend today is this date and we don't know anything that has happened since. 
#     Used for generating historical forecasts for validation. Default is an empty string, means today's date will be used.
#   params (dict, optional): parameters for the forecast. Include only those parameters where you want to overrride the default parameter value.
#                Default is an empty dictionary.
#
# Returns:
#   fcst_id (uuid): the identifier for the forecast
#   refdate (date): reference date adjusted to previous Friday
#   params (list): values for all parameters (default or overridden)

def initialize_forecast(live = 0, visible = 0, ftype = 'R', refdate = '',params = {}):
    if (refdate != ''):
        rdatecode = "DATE_TRUNC('"+refdate +"', WEEK(FRIDAY))"
    else:
        rdatecode = "DATE_TRUNC(DATE_SUB(CURRENT_DATE(), INTERVAL 4 DAY), WEEK(FRIDAY))"

    fcstframe = bigquery_client.query("SELECT GENERATE_UUID() as id, " + rdatecode + " as rdate").to_dataframe()
    fcst_id = fcstframe["id"][0]
    rdate = fcstframe["rdate"][0]
    
    if (live == 1):
        fcstupdate = bigquery_client.query("""UPDATE lab_forecast.fcst SET live = 0 WHERE live = 1""")    
        fcstupdate.result()
            
    fcstinsert = bigquery_client.query("""INSERT INTO lab_forecast.fcst (generated_at, reference_date, fcst_type, fcst_uuid, live, visible)
                                         VALUES (CURRENT_TIMESTAMP(), """ + rdatecode + """,'""" + ftype + """','""" + fcst_id+"""',
                                                 """+str(live)+""", """+str(visible)+""")""")    
    fcstinsert.result()
    
    myparams = bigquery_client.query("""SELECT parameter_uuid, parameter_name, default_value as par_value, min_value, max_value 
                                     FROM lab_forecast.fcst_parameter_list""").to_dataframe()
    for key,value in params.items():
        if (myparams.loc[myparams["parameter_name"]==key,"min_value"].values[0] > value):
            value = myparams.loc[myparams["parameter_name"]==key,"min_value"].values[0]
            print(key + " value provided is too low. Resetting to " + str(value))
        if (myparams.loc[myparams["parameter_name"]==key,"max_value"].values[0] < value):
            value = myparams.loc[myparams["parameter_name"]==key,"max_value"].values[0]
            print(key + " value provided is too high. Resetting to " + str(value))    
        myparams.loc[myparams["parameter_name"]==key,"par_value"] = value
    
    myparams = myparams.set_index("parameter_name")
    myparams = myparams[["parameter_uuid","par_value"]]
    myparams["fcst_uuid"] = fcst_id    
        
    ptable = bigquery_client.dataset("lab_forecast").table("fcst_parameters")
    rows_to_insert = myparams.to_dict(orient='records')       
    errors = bigquery_client.insert_rows(bigquery_client.get_table(ptable), rows_to_insert) 
    
    if (errors):
        print(errors)
    
    return fcst_id, rdate, myparams.to_dict().get("par_value")

In [None]:
# function create_lines: find a list of forecast lines (state/letter/outreach group/attempt/benefit) that could be contributing to 
# the call/application volume during the forecast period [refdate, refdate + forecast_horizon]
# because response often takes time, this could include any letter sent during the previous response_weeks weeks as well as letters
# that are planned for the future

# the function calls a stored procedure that precomputes certain values based on how much individual history there is for each line

# lines that have been in operation less than own_history_weeks_threshold_for_response weeks will be forecasted using a combination of own data and 
# and pooled data from similar lines

# similarly, lines where fewer than own_history_letters_threshold_for_delay letters have received a response will have the delay probability
# distribution calculated based on a combination of own and pooled data

# Args:
#  fcst_id (uuid): unique identifier for the forecast
#  params (list): list of parameter values
# Returns:
#  lines_needed (dataframe): list of lines and their properties

def create_lines(fcst_id, params):
    create_query = bigquery_client.query("""CALL lab_forecast.create_lines('""" + fcst_id + """',""" + str(round(params["response_weeks"])) + """,
                                    """ + str(round(params["forecast_horizon"])) + """,
                                    """ + str(round(params["offset_weeks"])) + """,""" + str(round(params["cutoff_weeks"])) + """,
                                    """ + str(round(params["own_history_weeks_threshold_for_response"])) + """,
                                    """ + str(round(params["own_history_letters_threshold_for_delay"])) + """)""")
    create_query.result()
    
    lines_needed = bigquery_client.query("""SELECT a.*, b.state, LEAST(b.mail_attempt,2) as attempt, b.target_benefit as benefit
                                            FROM lab_forecast.fcst_fcst_lines a,
                                                 lab_forecast.fcst_lines b
                                            WHERE a.line_uuid = b.line_uuid and fcst_uuid = '""" + fcst_id + """'
                                            ORDER BY a.line_uuid""").to_dataframe() 
    return lines_needed

In [24]:
# function plot_response_rates: generates a time series plot of adjusted historical response rates
# Args:
#  fcst_id (uuid): unique identifier for the forecast

def plot_response_rates(fcst_id):
    hrates = bigquery_client.query("""SELECT * FROM lab_forecast.fcst_hist_rates WHERE rate is not null and fcst_uuid = '""" + fcst_id + """' 
                               order by line_uuid, rate_date""").to_dataframe()
    itable = pd.pivot_table(hrates, values='rate', index=['rate_date'],
                        columns=['line_uuid'], aggfunc=np.sum)
    itable.plot()

In [25]:
# function get_delay_query: builds a SQL query for retrieving the response delay historical distribution
# Args:
#  fcst_id (uuid): unique identifier for the forecast
#  response_weeks (int): threshold for the number of response weeks to include.

def get_delay_query(fcst_id, response_weeks):
    query = """SELECT 
    TO_HEX(MD5(concat(m.state, '##', m.letter_number, '##', m.mail_outreach_group, '##',m.mail_attempt, '##',m.mail_target_benefit))) as line_id,
    m.state, LEAST(m.mail_attempt,2) as attempt, m.mail_target_benefit as benefit,
    cast(DATE_ADD(DATE_TRUNC(m.mail_scheduled_at, WEEK), INTERVAL 5 DAY) as date) as mailwk,  
    m.mailing_uuid, cast(date_sub(date_TRUNC(pc.created_at, WEEK),INTERVAL 2 DAY) as date) as callwk     
        FROM 
        lab_forecast.fcst f,
        `bdt-data-sci-prod.model_benefit_applications.f_mailing` m,
        `bdt-data-sci-prod.model_benefit_applications.f_phone_call` pc
        WHERE
        f.fcst_uuid = '"""+fcst_id+"""'
        and pc.last_touch_mailing_uuid = m.mailing_uuid
        and cast(pc.created_at as date) <= f.reference_date
        and cast(DATE_ADD(DATE_TRUNC(m.mail_scheduled_at, WEEK), INTERVAL 5 DAY) as date) >= 
                       DATE_SUB(f.reference_date, INTERVAL  """+str(response_weeks)+""" WEEK)"""

    return query    

In [26]:
# function get_delay_frequencies: computes the probability distribution of response delay
# using a combination of individual and pooled data
# Args:
#  fcst_id (uuid): unique identifier for the forecast
#  params (list): forecast parameters
#  lines_needed (dataframe): list of lines and their parameters
#  compute_agg_rates (boolean): whether the aggregated rates should be recomputed

def get_delay_frequencies(fcst_id, params, lines_needed, compute_agg_rates = True):
    cutoff_weeks = round(params["cutoff_weeks"])
    response_weeks = round(params["response_weeks"])
    own_history_letters = round(params["own_history_letters_threshold_for_delay"])
    delay_query = get_delay_query(fcst_id, response_weeks)
    delay_df = bigquery_client.query(delay_query).to_dataframe()
   
    delay_df['period'] = np.maximum(np.minimum((delay_df['callwk']-delay_df['mailwk']).dt.days/7,cutoff_weeks+1),0)
          
    result_all = pd.DataFrame()
    if (compute_agg_rates):         
        result_all = get_delay_frequency_table(delay_df, cutoff_weeks)      
       
        agg_lines = delay_df.groupby(["state","benefit","attempt"], as_index=False)["mailing_uuid"].count()
        for s in range(len(agg_lines.index)):    
            delay_df_sub = delay_df.loc[(delay_df["state"]==agg_lines["state"][s]) & (delay_df["benefit"]==agg_lines["benefit"][s]) & 
                      (delay_df["attempt"]==agg_lines["attempt"][s]),]
            result_sub = get_delay_frequency_table(delay_df_sub, cutoff_weeks)
            
            if (np.sum(result_sub["mailing_uuid"]) >= own_history_letters):
                result_sub["state"] = agg_lines["state"][s]
                result_sub["benefit"] = agg_lines["benefit"][s]
                result_sub["attempt"] = agg_lines["attempt"][s]
                result_all = result_all.append(result_sub, ignore_index = True)
                
        result_all["updated_at"] = datetime.now()
        result_all = result_all.drop(columns = ["mailing_uuid"])
        
        aggtable = bigquery_client.dataset("lab_forecast").table("fcst_default_delay_distribution")
        
        # insert in two steps
        rows_to_insert = result_all[result_all['state'].isna()].dropna(axis=1).to_dict(orient='records')       
        errors = bigquery_client.insert_rows(bigquery_client.get_table(aggtable), rows_to_insert)
        if errors != []:
            print(errors)

        rows_to_insert = result_all.dropna().to_dict(orient='records')       
        errors = bigquery_client.insert_rows(bigquery_client.get_table(aggtable), rows_to_insert)
        if errors != []:
            print(errors)

        
    else:
        result_all = bigquery_client.query("""SELECT * FROM lab_forecast.fcst_default_delay_distribution
                                               WHERE updated_at = 
                                               (select max(updated_at) from lab_forecast.fcst_default_delay_distribution)""").to_dataframe()
    
    linetable = bigquery_client.dataset("lab_forecast").table("fcst_line_delay_distribution")
    result_all_lines = pd.DataFrame()
    
    for s in range(len(lines_needed.index)):
        line_id = lines_needed["line_uuid"][s]
        weight = lines_needed["delay_weight"][s]
        delay_df_line = delay_df.loc[(delay_df["line_id"]==line_id),]
        #print(delay_df_line)
        result_line = pd.DataFrame()
        if (delay_df_line.empty == False and weight > 0):
            result_line = get_delay_frequency_table(delay_df_line, cutoff_weeks)
            #print(result_line)
            #print(line_id," has ",np.sum(result_line["mailing_uuid"])," calls.")
        #else:
        #    print(line_id," has no calls.")
        if (result_line.empty or weight < 1):
            #print(lines_needed["line_uuid"][s]," needs help.")
            myprof = find_matching_profile(lines_needed["state"][s], lines_needed["benefit"][s], lines_needed["attempt"][s], result_all)
            #print(myprof)
            if (weight > 0):
                mergeprof = pd.merge(myprof,result_line, how = 'outer',on = 'period', suffixes=('_x', '_y'))
                mergeprof["raw_value"] = (1-weight)*mergeprof["raw_value_x"].fillna(0) + weight*mergeprof["raw_value_y"].fillna(0)
                mergeprof["adj_value"] = (1-weight)*mergeprof["adj_value_x"].fillna(0) + weight*mergeprof["adj_value_y"].fillna(0)
                #print(mergeprof)
                result_line = mergeprof
            else:
                result_line = myprof 
 
        result_line = result_line.loc[:, ['period', 'raw_value', 'adj_value']]    
        result_line["line_uuid"] = line_id
        
        if (result_all_lines.empty):
            result_all_lines = result_line
        else:
            result_all_lines = result_all_lines.append(result_line, ignore_index = True)
    result_all_lines["fcst_uuid"] = fcst_id
    rows_to_insert = result_all_lines.to_dict(orient='records')       
    errors = bigquery_client.insert_rows(bigquery_client.get_table(linetable), rows_to_insert)
    if errors != []:
        print(errors)  
    
    return result_all_lines, result_all

# function find_matching_profile: find the closest match for a forecast line with insufficient individual delay data

def find_matching_profile(state, benefit, attempt, result_all):
    myprof = result_all.loc[(result_all["state"]==state) & (result_all["benefit"]==benefit) & (result_all["attempt"]==attempt),]
    if (myprof.empty):
        #print('Using overall profile')
        myprof = result_all.loc[pd.isnull(result_all["state"]),]
    #else:
    #    print('Using state-benefit-attempt profile')
    return myprof

# function get_delay_frequency_table: performs aggregation and deals with any delays longer than cutoff_weeks

def get_delay_frequency_table(delay_df, cutoff_weeks):    
    byweek=delay_df.groupby("period", as_index=False)["mailing_uuid"].count()
    byweek["raw_value"] = byweek["mailing_uuid"]/np.sum(byweek["mailing_uuid"])
    byweek["adj_value"] = byweek["raw_value"]
    
    if byweek.loc[byweek["period"]==cutoff_weeks+1].empty == False:
        byweek.loc[byweek["period"]==cutoff_weeks+1,"adj_value"] = 0
        multiplier = 1/(1 - byweek.loc[byweek["period"]==cutoff_weeks+1,"raw_value"].iloc[0])  
        byweek["adj_value"] = byweek["adj_value"]*multiplier
    return byweek


In [27]:
# function plot_delay_profiles: plot the computed adjusted delay distributions, individual and pooled

def plot_delay_profiles(dflines, dfagg):
    itable = pd.pivot_table(dflines, values='adj_value', index=['period'],
                            columns=['line_uuid'], aggfunc=np.sum)
    itable.plot()
    dfagg["id"] = dfagg["state"] + " + " + dfagg["benefit"] + " + " + dfagg["attempt"].apply(str)
    itable = pd.pivot_table(dfagg, values='adj_value', index=['period'],
                            columns=['id'], aggfunc=np.sum)
    itable.plot()

In [28]:
# function compute_hist_rates: calls a SQL procedure that computes the adjusted historical response rates

def compute_hist_rates(fcst_id, params):
    create_query = bigquery_client.query("""CALL lab_forecast.compute_hist_rates('""" + fcst_id + """',""" + str(round(params["response_weeks"])) + """,
                                    """ + str(round(params["forecast_horizon"])) + """,
                                    """ + str(round(params["offset_weeks"])) + """)""")
    create_query.result()

In [29]:
# these functions perform different smoothing operations on the historical response rates
# to come up with an initial "future rate" and corresponding confidence interval
# forecast parameters determine which smoothing function will be used

# obtain a response rate by performing exponential smoothing on historical weekly response rates
def get_smoothed_rate(resp_ts, alpha):
    smooth_rate = resp_ts["rate"][0]
    for s in range(len(resp_ts.index)-1):
        smooth_rate = resp_ts["rate"][s+1]*(alpha) + smooth_rate*(1.0- alpha)
    return smooth_rate

def get_smoothed_rate2(resp_ts, offset_weeks, forecast_horizon, drate, a):
    model = ETSModel(endog=resp_ts["rate"], error="add", initialization_method="known", initial_level=resp_ts["rate"][0], 
                     trend=None, damped_trend=False, seasonal=None, seasonal_periods=1)
    mfit = model.fit(smoothing_level=a, optimized=False)
    point_forecast = mfit.forecast(offset_weeks+forecast_horizon-1)
    ci = mfit.get_prediction(start = point_forecast.index[0],
                                end = point_forecast.index[-1])
    ci_lim = ci.pred_int(alpha = .05) #confidence interval
    preds = pd.concat([point_forecast, ci_lim], axis = 1).reset_index(drop=True)
    preds.columns = ['yhat', 'yhat_lower', 'yhat_upper']
    #print(preds)
    return preds

# obtain a response rate by averaging historical weekly response rates
def get_average_rate(resp_ts):
    return sum(resp_ts["rate"]*resp_ts["letters"])/sum(resp_ts["letters"])

# obtain a reponse rate series and CI by using the Holt-Winters method
def get_rate_with_trend_and_ci(resp_ts, offset_weeks, forecast_horizon, drate, tr = None, a = 0):
    err = "add"
    if (err == "mul"):
        resp_ts["rate"] = np.maximum(resp_ts["rate"],0.001)
        ivalue = 1
    else:
        ivalue = 0
    if (len(resp_ts.index) < 10):
        #print('initializing with ',drate)
        model = ETSModel(endog=resp_ts["rate"],  error = err, trend = tr,
                         initialization_method="known", initial_level=drate, initial_trend=ivalue, initial_seasonal=ivalue)
    else:
        model = ETSModel(endog=resp_ts["rate"],  error = err, trend = tr)
    if (tr == None):
        with model.fix_params({'smoothing_level': a}):
            mfit = model.fit()
        #mfit = model.fit_constrained({'smoothing_level': a})
    else:    
        mfit = model.fit()
    
    point_forecast = mfit.forecast(offset_weeks+forecast_horizon-1)
    ci = mfit.get_prediction(start = point_forecast.index[0],
                                end = point_forecast.index[-1])
    ci_lim = ci.pred_int(alpha = .05) #confidence interval
    preds = pd.concat([point_forecast, ci_lim], axis = 1).reset_index(drop=True)
    preds.columns = ['yhat', 'yhat_lower', 'yhat_upper']
    #print(preds)
    return preds

In [30]:
# function compute_response_rates: uses one of the smoothing functions to come up with a response rate
# pools it with an aggregated rate if there is insufficient history to generate a reliable rate

def compute_response_rates(fcst_id, params, lines_needed):
    
    rate_type = round(params["smoothing_type"])
    offset_weeks = round(params["offset_weeks"])
    forecast_horizon = round(params["forecast_horizon"])
    alpha = params["alpha"]
    
    resp_ts =  bigquery_client.query("""SELECT * FROM lab_forecast.fcst_hist_rates WHERE fcst_uuid = '""" + fcst_id + """'
                                        ORDER BY line_uuid, rate_date""").to_dataframe()
    df = pd.DataFrame(columns=['fcst_uuid', 'line_uuid', 'rate','rate_lower_bound','rate_upper_bound','rate_trend'])
    for s in range(len(lines_needed.index)):
        line = lines_needed["line_uuid"][s]
        if (lines_needed["rate_weight"][s] > 0):
            resp_sub = resp_ts.loc[(resp_ts["line_uuid"]==line),["rate_date","letters","rate"]]
            resp_sub = resp_sub.dropna().reset_index(drop=True)
            #print(resp_sub)
            if (rate_type in [0,1]):
                if rate_type == 0:
                    rate = get_average_rate(resp_sub)
                else:
                    rate = get_smoothed_rate(resp_sub, alpha)   
                rate = lines_needed["rate_weight"][s]*rate + (1.0 - lines_needed["rate_weight"][s])*lines_needed["default_rate"][s]    
                result = [fcst_id, line, rate, rate, rate, 0]
            else:
                if rate_type == 2:
                    preds = get_rate_with_trend_and_ci(resp_sub, offset_weeks, forecast_horizon, resp_sub["rate"][0],None,alpha)
                else:    
                    preds = get_rate_with_trend_and_ci(resp_sub, offset_weeks, forecast_horizon, lines_needed["default_rate"][s],"add")
                result = [fcst_id,line] + preds.loc[offset_weeks-1,].values.flatten().tolist() + [preds["yhat"][offset_weeks-1] - preds["yhat"][offset_weeks-2]]
        else:
             result = [fcst_id, line, lines_needed["default_rate"][s], 0, 2*lines_needed["default_rate"][s], 0]
        df.loc[len(df.index)] = result    
    #print(df)
    linetable = bigquery_client.dataset("lab_forecast").table("fcst_rates")
    rows_to_insert = df.to_dict(orient='records')       
    errors = bigquery_client.insert_rows(bigquery_client.get_table(linetable), rows_to_insert)
    if (errors):
        print(errors)

        
    

In [31]:
# function generate_fcst_volumes: generates the predicted number of calls from each line for each week in the forecast horizon

def generate_fcst_volumes(fcst_id, rdate, lines_needed, params):
    forecast_horizon = round(params["forecast_horizon"])
    weekdays = round(params["weekdays"])
    
    query = """SELECT f.fcst_uuid, h.line_uuid,f.day as week, 
    SUM(h.letters*d.adj_value*ifnull(h.rate, GREATEST(r.rate - r.rate_trend*DATE_DIFF(f.reference_date,h.rate_date, WEEK),0))) volume,
    SUM(h.letters*d.adj_value*ifnull(h.rate, GREATEST(r.rate_lower_bound - r.rate_trend*DATE_DIFF(f.reference_date,h.rate_date, WEEK),0))) volume_lower_bound,
    SUM(h.letters*d.adj_value*ifnull(h.rate, GREATEST(r.rate_upper_bound - r.rate_trend*DATE_DIFF(f.reference_date,h.rate_date, WEEK),0))) volume_upper_bound 
    FROM `bdt-data-sci-prod.lab_forecast.fcst_hist_rates` h, `bdt-data-sci-prod.lab_forecast.fcst_line_delay_distribution` d,
    `bdt-data-sci-prod.lab_forecast.fcst_rates` r,
    (SELECT fcst_uuid, reference_date,day
     FROM (SELECT fcst_uuid, reference_date
      FROM lab_forecast.fcst
     ) t JOIN
     UNNEST(GENERATE_DATE_ARRAY(t.reference_date, DATE_ADD(t.reference_date,INTERVAL """ + str(forecast_horizon-1) + """ WEEK), INTERVAL 1 WEEK)) day
     WHERE t.fcst_uuid = '""" + fcst_id + """') f
    WHERE r.fcst_uuid = f.fcst_uuid and h.fcst_uuid = f.fcst_uuid and h.line_uuid = r.line_uuid 
    and d.fcst_uuid = f.fcst_uuid and d.line_uuid = h.line_uuid
    and d.period = DATE_DIFF(f.day,h.rate_date, WEEK)
    group by 1,2,3"""
    
    if (weekdays == 0):
        
        iquery = bigquery_client.query("""INSERT INTO `bdt-data-sci-prod.lab_forecast.fcst_values` 
                                       (fcst_uuid, line_uuid, fcst_period, volume, volume_lower_bound, volume_upper_bound)""" + query)
        iquery.result()
        
        volumes = bigquery_client.query(query + " order by 1,2,3").to_dataframe()
        volumes["fcst_period"] = volumes["week"].astype(str)
        return volumes
   
    else:
        calls = get_actual_calls(fcst_id, -2,-1)
        daysopen = get_days_open(fcst_id, rdate, -2, forecast_horizon)
        volumes = bigquery_client.query(query + " order by 1,2,3").to_dataframe()
        
        volumes["week"] = volumes["week"].astype(str)
        mergeframe = pd.merge(lines_needed.loc[:,"line_uuid"], 
                              daysopen, how = "cross")
        mergeframe = pd.merge(mergeframe,calls, how = 'left',on = ['week','line_uuid'])
        mergeframe = pd.merge(mergeframe,volumes, how = 'left',on = ['week','line_uuid'])
        mergeframe = mergeframe.fillna(0)
        mergeframe["volume_diff"] = 0.0 
        mergeframe["fcst_uuid"] = fcst_id
        mergeframe["fcst_period"] = mergeframe["week"]
        leftover = 0.0
        for s in range(len(mergeframe)):
            weekno = mergeframe["weekno"][s]
            if (weekno == 0):
                leftover = 0.0
            if (weekno < 2):
                if (mergeframe["daysopen"][s] < 5):
                    leftover = leftover + mergeframe["numcalls"][s]*(5.0/mergeframe["daysopen"][s]-1.0)
                else:
                    leftover = 0.0
            else:
                if (mergeframe["daysopen"][s] == 5):
                    if (leftover > 0):
                        mergeframe["volume_diff"][s] = leftover
                        leftover = 0.0
                else:
                    diff = mergeframe["volume"][s]*(5.0/mergeframe["daysopen"][s]-1.0)
                    mergeframe["volume_diff"][s] = - diff
                    leftover = leftover + diff
            #print(leftover)  
        mergeframe["volume"] = mergeframe["volume"] + mergeframe["volume_diff"]
        mergeframe["volume_lower_bound"] = np.maximum(mergeframe["volume_lower_bound"] + mergeframe["volume_diff"],0)
        mergeframe["volume_upper_bound"] = mergeframe["volume_upper_bound"] + mergeframe["volume_diff"]
        #display(mergeframe)
        
        mergeframe = mergeframe.loc[mergeframe["volume"] != 0.0, ['fcst_uuid', 'line_uuid', 'fcst_period','volume','volume_lower_bound','volume_upper_bound']]
        vtable = bigquery_client.dataset("lab_forecast").table("fcst_values")
        rows_to_insert = mergeframe.to_dict(orient='records')       
        errors = bigquery_client.insert_rows(bigquery_client.get_table(vtable), rows_to_insert)
        if (errors):
            print(errors)
        return mergeframe    

        

In [32]:
# function get_actual_calls: gets the actual number of calls received each week for each line
# for the purposes of displaying historical values and validation of historical forecasts

def get_actual_calls(fcst_id, start_week, end_week):   
    calls = bigquery_client.query("""
    SELECT TO_HEX(MD5(concat(m.state, '##', m.letter_number, '##', m.mail_outreach_group, '##',m.mail_attempt, '##',m.mail_target_benefit))) as line_uuid, 
           cast(date_sub(date_TRUNC(pc.created_at, WEEK),INTERVAL 2 DAY) as date) as week, count(*) as numcalls 
    FROM 
    lab_forecast.fcst f, 
    `bdt-data-sci-prod.model_benefit_applications.f_mailing` m,
    `bdt-data-sci-prod.model_benefit_applications.f_phone_call` pc
    WHERE pc.last_touch_mailing_uuid = m.mailing_uuid
    and f.fcst_uuid = '""" + fcst_id + """' 
    and cast(date_sub(date_TRUNC(pc.created_at, WEEK),INTERVAL 2 DAY) as date) >= DATE_ADD(f.reference_date,INTERVAL """ + str(start_week) + """ WEEK)     
    and cast(date_sub(date_TRUNC(pc.created_at, WEEK),INTERVAL 2 DAY) as date) <= DATE_ADD(f.reference_date,INTERVAL """ + str(end_week) + """ WEEK)     
    GROUP BY 1,2
    ORDER BY 1,2""").to_dataframe()
    calls["week"] = calls["week"].astype(str)
    return calls
    

In [33]:
# function generate_weeks: generates a frame containing the list of weeks for the specified horizon (historical as well as forecast horizon)
# for each week, the number of days the call center was/will be open is computed (this information is used to adjust response rates)

def generate_weeks(sdate,weeks):
    startdate = np.datetime64(sdate)+np.timedelta64(2, "D")
    enddate = startdate+np.timedelta64(7*weeks, "D")
    dates = np.arange(np.datetime64(startdate), enddate, np.timedelta64(7, "D"))

    cal = USFederalHolidayCalendar()
    holidays = cal.holidays(start=startdate, end=enddate+np.timedelta64(7, "D")).to_pydatetime()
    holidays = np.append(holidays,np.datetime64('2022-11-25'))
    holidays = np.append(holidays,np.datetime64('2023-06-19'))   
    #print(holidays)
    
    df = pd.DataFrame(dates, columns = ['week'])
    df["daysopen"] = 5
    #df["weekyr"] = np.array([xi.strftime("%V") for xi in df["week"]])
    
    for h in holidays:
        for i in range(len(df.index)):
            if (df["week"][i]<=h and df["week"][i]+np.timedelta64(7, "D") > h):
                df["daysopen"][i] = df["daysopen"][i]-1
    df["week"] = df["week"] - np.timedelta64(2, "D")
    df["week"] = df["week"].astype(str)
    df["daysopen"] = df["daysopen"].astype(float)
    df["weekno"] = df.index
    return df

In [34]:
def get_days_open(fcst_id, rdate, start_week, end_week):   
    sdate = np.datetime64(rdate)+np.timedelta64(7*start_week, "D")
    weeks = end_week - start_week
    return generate_weeks(sdate,weeks)

In [35]:
# compute the symmetric mean absolute error (SMAPE) between two vectors
def smape(A, F):
    tmp = 2 * np.abs(F - A) / (np.abs(A) + np.abs(F))
    tmp.fillna(0)
    #len_ = np.count_nonzero(~np.isnan(tmp))
    #if len_ == 0 and np.nansum(tmp) == 0: # Deals with a special case
    #    return 1
    return (1 / len(A)) * np.sum(tmp)

# compute  vector of error mesures including MSE, SMAPE, difference in sums, percentage difference in sums
def compute_measures(actual, predicted):
    #pr = np.array(predicted_raw)
    #ac = np.array(actual_raw)
    #goodi = np.logical_and(~np.isnan(pr), ~np.isnan(ac))
    #predicted = pr[goodi]
    #actual = ac[goodi]
    if (len(actual)>0):
        return [mean_squared_error(actual,predicted),
                         smape(actual,predicted),
                         np.sum(predicted) - np.sum(actual),
                         (np.sum(predicted) - np.sum(actual))/np.sum(actual)] 
    else:
        return [np.nan,np.nan,np.nan,np.nan]

In [36]:
# function seq_visualize: generate a graph visualizing a sequential validation scheme

def seq_visualize(start_slice, stepsize_weeks, response_weeks, horizon_weeks, slices):
    delta = timedelta(days=7*stepsize_weeks)
    u = datetime.strptime(start_slice,"%Y-%m-%d")
    dates = []
    dates_s = []
    dates_e = []
    horizons = []
    for i in range(slices):
        dates.append(u)
        dates_s.append(u-timedelta(days=7*response_weeks))
        dend = min(u+timedelta(days=7*horizon_weeks), datetime.combine(datetime.now(), time.min) - timedelta(days=4) + relativedelta(weekday=FR(-1)))
        dates_e.append(dend)
        horizons.append(int((dend - u).days/7))
        u = u + delta
    
    # Fixing random state for reproducibility
    plt.rcdefaults()
    plt.rcParams["figure.figsize"] = (12,5)
    fig, ax = plt.subplots()
    y_pos = np.arange(slices)+1
    ax.barh(y_pos, dates_e, color = 'r')
    ax.barh(y_pos, dates, align='center')
    ax.barh(y_pos, dates_s, align='center', color = 'w')
    ax.invert_yaxis()  # labels read top-to-bottom
    ax.set_title('Sequential Validation Scheme: '+str(slices)+' '+str(horizon_weeks)+'-week slices starting from '+start_slice+' using '+str(response_weeks)+' weeks of data')
    ax.xaxis_date()
    ax.set_xlim(dates[0]-timedelta(days=7*(response_weeks+1)),datetime.now())
    plt.show()
    return [date_obj.strftime('%Y-%m-%d') for date_obj in dates], horizons


In [37]:
# function seq_validation: perform parameter tuning and sequential validation
# this function cycles through all possible combinations of parameters within the supplied parameter list
# generating a set of historical forecasts for each parameter set according to the specified validation scheme
# and computing error measures that are displayed to the user in a convenient table

def seq_validation(start_slice, stepsize_weeks, slices, paramlist, r=26, horizon_weeks=1):
    pdict_list = make_param_dict_list(paramlist, horizon_weeks)
    statscols = ['MSE','SMAPE','SUM','PSUM']
    statscombine = pd.DataFrame(columns = ['Params'] + statscols)
    ds, hs = seq_visualize(start_slice, stepsize_weeks, r, horizon_weeks, slices)
    j = 0
    for ps in pdict_list:
        j = j + 1
        stats = pd.DataFrame(columns = ['Step'] + statscols)
        for i in range(slices):
            fcst_id, rdate, params, lines, df = fcst_main(0,0,'R',ds[i], ps)
            if (hs[i]>0):
                calls = get_actual_calls(fcst_id, 0,hs[i]-1)
                daysopen = get_days_open(fcst_id, rdate, 0, hs[i])
                mergeframe = pd.merge(lines.loc[:,"line_uuid"], daysopen, how = "cross")
                mergeframe = pd.merge(mergeframe,calls, how = 'left',on = ['week','line_uuid'])
                mergeframe = pd.merge(mergeframe,df, how = 'left',left_on = ['week','line_uuid'],right_on = ['fcst_period','line_uuid'])
                mergeframe = mergeframe.fillna(0)
                mergeframe.to_csv("accuracy"+str(j)+".csv")
                err = compute_measures(mergeframe["numcalls"],mergeframe["volume"])
                stats.loc[len(stats.index)] = [ds[i]]+err                          
        display(stats)
        stats["SUM"] = stats["SUM"].abs()
        stats["PSUM"] = stats["PSUM"].abs()
        statscombine.loc[len(statscombine.index)] = [str(ps)] + list(stats.mean(axis = 0)) 
    #display(statscombine)
    return statscombine

# takes a dictionary containing lists of parameter values to try
# returns a list of dictionaries containing every possible combination of parameters

def make_param_dict_list(paramlist_dict, horizon_weeks):
    empty_dict = {"forecast_horizon":horizon_weeks}
    new_dictlist = [empty_dict]
    prev_dictlist = [empty_dict]
    for key,valuelist in paramlist_dict.items():
        #print(key, valuelist)
        new_dictlist = []
        for d in prev_dictlist:
            for v in valuelist:
                newd = d.copy()
                newd[key] = v
                new_dictlist.append(newd)
        prev_dictlist = new_dictlist.copy()
        #print(prev_dictlist)
    return new_dictlist    



In [38]:
# function fcst_main: generate the forecast 
# Args:
#   live (int, optional): Flag to indicate if the forecast will be live. Default is 0 (not live).
#   visible (int, optional): Flag to indicate if the (historic) forecast should be visible in the dashboard. Default is 0 (not visible).
#   ftype (str, optional): Type of forecast: is this a response forecast (R) or a submission forecast (S). Default is 'R' (e.g., 'R' for regular).
#   refdate (str, optional): Reference date for the forecast: i.e. let's pretend today is this date and we don't know anything that has happened since. 
#     Used for generating historical forecasts for validation. Default is an empty string, means today's date will be used.
#   params (dict, optional): parameters for the forecast. Include only those parameters where you want to overrride the default parameter value.
#                Default is an empty dictionary.
#
# Returns:
#   fcst_id (uuid): unique identifier for the forecast
#   rdate (date): reference date adjusted to previous Friday
#   params (list): values for all parameters (default or overridden)
#   lines (dataframe): list of forecast lines with their properties
#   df (dataframe): actual and predicted call volume time series 

def fcst_main(live = 0, visible = 0, ftype = 'R', refdate = '',ps = {}):
    fcst_id, rdate, params = initialize_forecast(live, visible, ftype, refdate, ps)
    lines = create_lines(fcst_id, params)
    dflines,dfagg = get_delay_frequencies(fcst_id, params, lines)
    compute_hist_rates(fcst_id, params)
    compute_response_rates(fcst_id, params, lines)
    df = generate_fcst_volumes(fcst_id, rdate, lines, params)
    return fcst_id, rdate, params, lines, df

In [39]:
# THIS IS THE "MAIN" FUNCTION #
# THIS IS WHAT NEEDS TO RUN EVERY TUESDAY #

fcst_id, rdate, params, lines, df = fcst_main(0,0) # FOR TESTING, USE (0,0) IN PRODUCTION, USE (1,1)

# THIS IS THE "MAIN" FUNCTION #