# Multiple Regression Python Code Package

In [None]:
#Purpose: The code below is the proposed model for the dependent variables based on the final candidate variables from 
# stage 3 - Candidate models.  The code will generate the full model diagnostic tests, backtesting results and graphing 
# the predicted with actuals in an excel output workbook.  This code comes up with the strongest candidate models 

In [2]:
#Set Up Libraries
import pandas as pd
import statsmodels.api as sm
import statsmodels.stats.api as sms
import numpy as np
import statsmodels.api as linear_model
#from statsmodels.tsa.arima.model import ARIMA
from statsmodels.stats.outliers_influence import variance_inflation_factor

import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import scipy.stats as stats
from statsmodels.tsa.stattools import adfuller, kpss
import matplotlib.dates as mdates
from statsmodels.stats.stattools import durbin_watson
import seaborn as sns
import xlsxwriter
from sklearn.metrics import mean_squared_error
from math import sqrt
from numpy import mean

In [3]:
#import history dataset#
Hist_Data =pd.read_csv('Agency_MBS_Data.csv')
Hist_Data

Unnamed: 0,month,Unemployment rate,3-month Treasury rate,5-year Treasury yield,10-year Treasury yield,House Price Index (Level),30Y Current Coupon UMBS/FNMA Yield,30Y10Y_MBSOAS_TY
0,7/1/2009,9.3,0.2,2.3,3.7,139.2,4.4,0.0
1,10/1/2009,9.6,0.2,2.5,3.8,139.7,4.3,0.0
2,1/1/2010,9.9,0.1,2.3,3.7,140.2,4.4,0.0
3,4/1/2010,9.8,0.1,2.4,3.9,140.2,4.4,0.0
4,7/1/2010,9.6,0.1,2.3,3.6,139.3,3.5,0.0
5,10/1/2010,9.5,0.2,1.6,2.9,136.7,3.4,0.0
6,1/1/2011,9.5,0.1,1.5,3.0,135.4,4.2,18.0813
7,4/1/2011,9.0,0.1,2.1,3.5,134.1,4.1,21.4788
8,7/1/2011,9.1,0.0,1.8,3.3,133.7,3.8,32.6153
9,10/1/2011,9.0,0.0,1.1,2.5,134.3,3.3,37.5719


In [4]:
#Scenario 
dt_base =pd.read_csv('Agency_MBS_Data_Base.csv')
dt_stress =pd.read_csv('Agency_MBS_Data_Stress.csv')

dt_base

Unnamed: 0,month,Unemployment rate,3-month Treasury rate,5-year Treasury yield,10-year Treasury yield,House Price Index (Level),30Y10Y_MBSOAS_TY
0,1/1/2020,3.6,1.6,1.6,1.8,215.2,10.1294
1,4/1/2020,3.8,1.1,1.2,1.4,217.8,28.0109
2,7/1/2020,13.0,0.1,0.4,0.7,219.9,-25.4451
3,10/1/2020,8.8,0.1,0.3,0.6,226.9,13.2766
4,1/1/2021,6.8,0.1,0.4,0.9,235.3,-1.7474
5,4/1/2021,6.2,0.1,0.6,1.4,243.0,-33.2744
6,7/1/2021,5.9,0.0,0.8,1.6,254.6,-15.0169
7,10/1/2021,5.1,0.0,0.8,1.4,266.3,-20.6574
8,1/1/2022,4.2,0.1,1.2,1.6,277.3,13.285
9,4/1/2022,3.8,0.3,1.9,2.0,290.3,34.0362


##Set up Functions##

In [5]:
 # transform Raw Data 
    # indep = list of variable IDs 
    # indep_trans = the transformation used for each variable 
    # model_dat = data containing the history 
    # hist_start = start date of the history to use
    # hist_end = last date of history to include in the transformation yyyy-mm
    
def fcst_data_prep(indep, indep_trans, model_dat, hist_start, hist_end):
    
    # independent variable transformation

    base_drivers =model_dat['month']
    base_drivers =pd.DataFrame(base_drivers)
    
    for k in range(len(indep)):
        x = indep[k]
        
        exog_trans = model_dat[['month',x]] 
        
        if len(base_drivers)>0:
        
            if 'QoQ' in indep_trans:          
                exog_trans[x+'_QoQ'] =  exog_trans[x]/exog_trans[x].shift()-1            
                
            if 'YoY' in indep_trans:         
                exog_trans[x+'_YoY'] = exog_trans[x]/exog_trans[x].shift(4)-1
                               
            if 'diff'in indep_trans:
                exog_trans[x+'_diff'] = exog_trans[x]-exog_trans[x].shift()                
               
            if  'log' in indep_trans:
                exog_trans[x+'_log'] = np.log(np.array(exog_trans[x], dtype = float))
                
            if  'log_diff' in indep_trans:
                exog_trans[x+'_logDiff'] = np.log(np.array(exog_trans[x]/exog_trans[x].shift()), dtype = float)
            
            base_drivers = pd.merge(base_drivers, exog_trans, how ='left',on='month')
    
        else: print(x + "is missing history") 
           # base_drivers
        
        Date_Index = base_drivers[(base_drivers['month'] >= hist_start) & (base_drivers['month'] <= hist_end) ].index
        base_drivers.drop(Date_Index, inplace=True)
  
    return base_drivers

In [6]:
# Model Fitting Options & Diagnostics #
# There are three options to model the data :  OLS with Constant,  OLS with HAC Errors,  & ARIMA 

def fit(hist_data, type, independent_var, dependent_var, order,backtest=None):
    
    #DataFrame for ADF & KPSS#
    St = {'Variable_ID':[], 'ADF Statistic:':[], 'p-value:':[], 'Critical Values:':[],'Result':[] }
    Stationary_ADF1 = pd.DataFrame(data=St) 

    St1 = {'Variable_ID':[], 'KPSS Statistic:':[], 'p-value:':[], 'Critical Values:':[],'Result':[] }
    Stationary_KPSS1 = pd.DataFrame(data=St1)
    
    if type == 'OLS':
        Y = hist_data[dependent_var]
        X = hist_data[independent_var]
        X = sm.add_constant(X) 
        #VIF testing
        VIF = pd.Series([variance_inflation_factor(X.values, i) for i in range(X.shape[1])], index=X.columns )
        #Stationarity Testing 
        Ind_var = independent_var
        for r in range(len(Ind_var)):
            print(r)
            var = Ind_var[r]
            x = hist_data[var] 
            print(var)
            Stationary_ADF = test_stationary(var,x,'ADF', backtest) 
            Stationary_ADF1 = pd.concat([Stationary_ADF,Stationary_ADF1], ignore_index=True) 
            Stationary_KPSS = test_stationary(var,x,'KPSS', backtest) 
            Stationary_KPSS1 = pd.concat([Stationary_KPSS,Stationary_KPSS1], ignore_index=True) 
            
        model = sm.OLS(Y, X)
        results = model.fit()
        
    elif type == 'OLS_HAC':
        Y = hist_data[dependent_var]
        X = hist_data[independent_var]
        X = sm.add_constant(X)
        #VIF testing
        VIF = pd.Series([variance_inflation_factor(X.values, i) for i in range(X.shape[1])], index=X.columns ) 
        #Stationarity Testing 
        Ind_var = independent_var
        for r in range(len(Ind_var)):
            var = Ind_var[r]
            x = hist_data[var] 
            Stationary_ADF = test_stationary(var,x,'ADF', backtest) 
            Stationary_ADF1 = pd.concat([Stationary_ADF,Stationary_ADF1], ignore_index=True) 
            Stationary_KPSS = test_stationary(var,x,'KPSS', backtest) 
            Stationary_KPSS1 = pd.concat([Stationary_KPSS,Stationary_KPSS1], ignore_index=True) 
            
        model = sm.OLS(Y, X)
        results = model.fit(cov_type='HAC', cov_kwds={'maxlags': 10})
        
    elif type == 'ARIMA':
        endog = hist_data[dependent_var]
        exog = hist_data[independent_var]
        X = sm.add_constant(exog)
        #VIF testing
        VIF = pd.Series([variance_inflation_factor(X.values, i) for i in range(X.shape[1])], index=X.columns )
        #Stationarity Testing 
        Ind_var = independent_var
        for r in range(len(Ind_var)):
            var = Ind_var[r]
            x = hist_data[var] 
            Stationary_ADF = test_stationary(var,x,'ADF', backtest) 
            Stationary_ADF1 = pd.concat([Stationary_ADF,Stationary_ADF1], ignore_index=True) 
            Stationary_KPSS = test_stationary(var,x,'KPSS', backtest) 
            Stationary_KPSS1 = pd.concat([Stationary_KPSS,Stationary_KPSS1], ignore_index=True)
            
        model = sm.tsa.arima.ARIMA(endog, exog, order=order)
        results = model.fit()
   # else:
      #  pass
    return results, VIF, Stationary_ADF1, Stationary_KPSS1

#Once a Model Summary is generated,  the additional insample diagnostic testing can be pulled out for analysis#
# Output is defined below: 
# R2, MAE, RMSE : Model Goodness of Fit 
# Durbin Watsons: For Autocorrelation
# Breuschpagan_P & GoldFeld_Quandt: For Heteroscatisticy 
def mape(actual, pred): 
    actual, pred = np.array(actual), np.array(pred)
    return np.mean(np.abs((actual - pred) / actual)) * 100

def model_diagnostic(results, type):
    model_diagnostic = pd.DataFrame(columns = ['r2', 'mae', 'rmse', 'dw', 'Goldfeld_Quandt_p', 'breuschpagan_p'])
    if type in ['OLS', 'OLS_HAC']:
        metrics_map = {
            'r2': results.rsquared,
            'mae': sum(abs(results.resid))/len(results.resid),
            'rmse': np.sqrt(results.mse_resid),
            'dw': durbin_watson(results.resid),
            'Goldfeld_Quandt_p': sms.het_goldfeldquandt(results.resid, results.model.exog)[1],
            'breuschpagan_p': sms.het_breuschpagan(results.resid, results.model.exog)[1]
        }
    elif type in ['ARIMA']:
        metrics_map = {
            'r2': 1 - results.sse / sum((results.data.endog - np.mean(results.data.endog)) ** 2),
            'mae': results.mae,
            'rmse': np.sqrt(results.mse),
            'dw': durbin_watson(results.resid),
            'Goldfeld_Quandt_p': sms.het_goldfeldquandt(results.resid, results.model.exog)[1],
            'breuschpagan_p': sms.het_breuschpagan(results.resid, results.model.exog)[1]
        }
    else:
        pass
    return model_diagnostic.append(metrics_map, ignore_index=True)


def model_analysis(results, type):
    if type in ['OLS','OLS_HAC']:
        analysis_df = results.summary()
    elif type in ['ARIMA']:
        analysis_df = results.summary()

    else:
        pass
    return analysis_df

In [7]:
##Stationary Tests of Data
    # var_id - the variable ID being used in the testing 
    # test_series - the data series to use to test. It only accept a single data series without any NA values 
    # test_type - Defining what kind of test to use for stationarity testing. 
    
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.stattools import kpss

def kpss_test(var_id,series, **kw):    
    statistic, p_value, n_lags, critical_values = kpss(series, **kw)
    # Format Output
    for key, value in critical_values.items():
        #print(f'   {key} : {value}')
        print(f'{var_id} KPSS Result: The series is {"not " if p_value <.05 else ""}stationary')
        if p_value <.05: result='not stationary' 
        else: result='stationary' 
    
    St1 = {'Variable_ID':[var_id], 'KPSS Statistic:':[statistic], 'p-value:':[p_value], 'Critical Values:':[critical_values],'Result':[result] }
    test_summary = pd.DataFrame(data=St1)
    
    return test_summary

def test_stationary(var_id, test_series, test_type, backtest):

    if backtest==None:    
    # ADF test
        if test_type == 'ADF':
            ADF_result = adfuller(test_series, autolag='AIC')
            for key, value in ADF_result[4].items():   
                print(f'{var_id} ADF Result: The series is {"not " if ADF_result[1] > 0.05 else ""}stationary')
                if ADF_result[1] > 0.05: result_1='not stationary' 
                else: result_1='stationary' 
    
            St = {'Variable_ID':[var_id], 'ADF Statistic:':[ADF_result[0]], 'p-value:':[ADF_result[1]], 'Critical Values:':[ADF_result[4]],'Result':[result_1] }
            test_summary = pd.DataFrame(data=St)
    
    # KPSS test
    
        if test_type == 'KPSS':  
            test_summary = kpss_test(var_id,test_series) 
    else:
        test_summary= pd.DataFrame(data=["backtesting"])
    
    return test_summary

In [8]:
#Model Plotting Features#

#Plot the Autocorrelation and Residiuals 
#results: The Fitted Model results 
#dependent_var: The dependent variable in the Fitted Model 

def residual_plot(results, dependent_var, pdf):
    # residual acf, pacf, qq-plot
    fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(10, 4), dpi=80)
    plot_acf(results.resid, ax=ax1, lags=9)
    plot_pacf(results.resid, ax=ax2, lags=9)
    sm.qqplot(results.resid, stats.norm, fit=True, line="45", ax=ax3)
    ax3.set_title('Q-Q plot')
    fig.suptitle(dependent_var)
    plt.savefig(f"Residual_{pdf}")
    plt.close(fig)
    plt.cla()
    plt.clf()


def forecast(results, hist_data, type):
    if type in ['OLS', 'OLS_HAC']:
        hist_data = sm.add_constant(hist_data)
        ypred = results.predict(hist_data)
    elif type in ["ARIMA"]:
        ypred1 = results.predict(exog=hist_data[:-9]).to_list()
        ypred2 = results.forecast(exog=hist_data.tail(9), steps=9).to_list()
        ypred =ypred1+ypred2
        #ypred =ypred[0].squeeze(axis = 0)
        print(ypred)
    else:
        pass
    return ypred

def forecast_scenario(results, hist_data, type):
    if type in ['OLS', 'OLS_HAC']:
        hist_data = sm.add_constant(hist_data)
        ypred = results.predict(hist_data)
    elif type in ["ARIMA"]:
        ypred = results.forecast(exog=hist_data, steps=17).to_list()
    else:
        pass
    return ypred

######################Backtesting plot with Out of Time /Out of Sample############################################ 

#hist_data : Full data with the neccessary transformation 
#type: the type of model being used - OLS, OLS_HAC, ARMIA
#independent_var:  the list of transformed independent variables needed for the model 
#dependent_var: The transformed dependent variable.  
#dependent_var_level: the level depdendent variable. 
#testingQ: this number of quarters to be removed for testing Out of Sample 
#tran_type: The transformation type of the dependent variable - diff, YoY 
#order: if the model type is ARMIA, set the residual order (1,0,0)
#VM:  If the model is forecasting Velocity of Money, the VM = True to do the additional transformation needed 
#NGDP: if the VM=True, the NGDP field needs to be set with the NGDP to be used in the transformation 
#Money_Supply:  If the VM = True, the Money_Supply also needs to be set with the money supply.  
#This is used for plotting actuals vs predicted

def backtesting_plot(hist_data, type, independent_var, dependent_var_level, dependent_var, pdf, testingQ, tran_type, order , 
                     Special_transformation=False , NGDP=False ,Money_Supply =False):

    # 9 quarter backtesting
    hist_data = hist_data.dropna()
    insample_hist = hist_data[:-testingQ]
    
    #call the fit function to come up with the insample estimates#
    results, VIF, Stationary_ADF1 , Stationary_KPSS1 = fit(insample_hist, type, independent_var, dependent_var, order, backtest=True)
    
    insample_analysis_df = model_analysis(results, type)
    insample_diagnostic_df = model_diagnostic(results, type)
        
    hist_data["month"] = pd.to_datetime(hist_data['month'].astype('str'))   
    forecast_hist = forecast(results, hist_data[independent_var], type)

    cutoff_date = hist_data['month'].iloc[-testingQ]
    cutoff_date2 = hist_data['month'].iloc[-(testingQ+2)]
    hist_data['prediction']=forecast_hist
    forecast_hist =pd.DataFrame(forecast_hist)
    hist_data2 =hist_data
    
    hist_data_OutofSample = hist_data[hist_data['month'] > cutoff_date2]   
    backtest_untransformed_prediction = hist_data[['month','prediction', dependent_var]]
    
    # transformback For Any Special Variable. THis needs to be updated to allow for any unique transformation.  It is now setup 
    # for Money SUpply.  But it can be updated to reflect any other variable transformation by adjusting the below code below#
    
    if Special_transformation == True: 
        
        hist_data_OutofSample['level_prediction_full'] = untransformation(hist_data_OutofSample['prediction'],hist_data_OutofSample,dependent_var_level,tran_type)
        hist_data2['level_prediction'] = untransformation_onestep(forecast_hist,hist_data2,dependent_var_level,tran_type)
              
        hist_data_OutofSample['prediction_MS'] = hist_data_OutofSample[NGDP]/hist_data_OutofSample['level_prediction_full'] 
        hist_data2['prediction_MS'] = hist_data2[NGDP]/hist_data2['level_prediction'] 
        
        MS_RMSE = sqrt(mean_squared_error(hist_data_OutofSample[Money_Supply], hist_data_OutofSample['prediction_MS']))
        Y_Actual_Average = mean(hist_data_OutofSample[Money_Supply]) 
        MS_RMSE_N = (MS_RMSE/Y_Actual_Average) * 100
       
        
        MS_MAPE = mape(hist_data_OutofSample[Money_Supply], hist_data_OutofSample['prediction_MS'])
        
        backtest_transformed_prediction = hist_data_OutofSample[['month','level_prediction_full','prediction_MS',NGDP,Money_Supply]]
        backtest_transformed_prediction['MS_RMSE']=MS_RMSE
        backtest_transformed_prediction['MS_MAPE']=MS_MAPE
        
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot('month', Money_Supply, data=hist_data, label='Actual', color='black')
        ax.plot('month', 'prediction_MS', data=hist_data[:-testingQ], label='In-sample Fitted', color='red')
        ax.plot('month', 'prediction_MS', data=hist_data_OutofSample, label='Out-sample Fitted', color='blue', linestyle='dashed')
        plt.axvline(x=pd.to_datetime(cutoff_date), color='lightgray')
        ax.set_title('Fitted vs Actual')
        ax.set_ylabel(Money_Supply)
        ax.set_xlabel('month')
        ax.legend()
        plt.savefig(f"MS_Backtesting_{pdf}")
        plt.close(fig)
        plt.cla()
        plt.clf()
    
    # No Special Transformation.  THis is a plain vanilla regression backtesting # 
    
    else:       
        
        hist_data_OutofSample['level_prediction_full'] = untransformation(hist_data_OutofSample['prediction'],hist_data_OutofSample,dependent_var_level,tran_type)
        hist_data2['level_prediction'] = untransformation_onestep(forecast_hist,hist_data2,dependent_var_level,tran_type)
              
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot('month', dependent_var, data=hist_data, label='Actual_Untransformed', color='black')
        ax.plot('month', 'prediction', data=hist_data[:-testingQ], label='In-sample Fitted Untransformed', color='red')
        ax.plot('month', 'prediction', data=hist_data_OutofSample, label='Out-sample Fitted Untransformed', color='blue', linestyle='dashed')
        plt.axvline(x=pd.to_datetime(cutoff_date), color='lightgray')
        ax.set_title('Predicted Fitted vs Actual')
        ax.set_ylabel(dependent_var)
        ax.set_xlabel('month')
        ax.legend()
        plt.savefig(f"Unstransformed_Backtesting_{pdf}")
        plt.close(fig)
        plt.cla()
        plt.clf()
        
        Actuals = hist_data_OutofSample[dependent_var_level]
        Predicted = hist_data_OutofSample['level_prediction_full']
                
        MS_MAPE = mape(Actuals[-9:],Predicted[-9:])       
        MS_RMSE = sqrt(mean_squared_error(Actuals[-9:],Predicted[-9:])) 
        Y_Actual_Average = mean(Actuals[-9:])         
        MS_RMSE_N = (MS_RMSE/Y_Actual_Average) * 100
        
        backtest_transformed_prediction = hist_data_OutofSample[['month',dependent_var_level,'level_prediction_full']]
        backtest_transformed_prediction['MS_RMSE_N']=MS_RMSE_N
        backtest_transformed_prediction['MS_MAPE']=MS_MAPE
        
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot('month', dependent_var_level, data=hist_data, label='Actual_Transformed', color='black')
        ax.plot('month', 'level_prediction', data=hist_data2[:-testingQ], label='In-sample Fitted Transformed', color='red')
        ax.plot('month', 'level_prediction_full', data=hist_data_OutofSample, label='Out-sample Fitted Transformed', color='blue', linestyle='dashed')
        plt.axvline(x=pd.to_datetime(cutoff_date), color='lightgray')
        ax.set_title('Level Fitted vs Actual')
        ax.set_ylabel(dependent_var_level)
        ax.set_xlabel('month')
        ax.legend()
        plt.savefig(f"Transformed_Backtesting_{pdf}")
        plt.close(fig)
        plt.cla()
        plt.clf()
        
    return insample_analysis_df, insample_diagnostic_df, backtest_untransformed_prediction, backtest_transformed_prediction


#################################Transfromation of Dependent Variables ##################################################

#Untransformation_Based on the prior PRedicted Level 

#sce_forecast - the data of the predicted values. this is the value that is the transformed form  
#model_dat :  the data set of the original form.  The function will pull the latest actual T0 and transforms the predicted back to the level 
#depvar: The variable name of the level form in the model_dat dataframe. 
#type:  The form of the transformation to convert the predicted back to the level 

def untransformation(sce_forecast,model_dat, depvar, type):
    
    if type == 'YoY':
        sce_forecast_trans = []
        for i in range(len(sce_forecast)):
            if i == 0:
                tn = model_dat[depvar].iloc[0]
            if i == 1:
                tn = model_dat[depvar].iloc[1]
            if i == 2:
                tn = model_dat[depvar].iloc[2]
            if i == 3:
                tn = model_dat[depvar].iloc[3]
            if i > 3:
                tn = sce_forecast_trans[i-4]*(sce_forecast.iloc[i]+1)
            sce_forecast_trans = np.append(sce_forecast_trans, tn)
    
    # transform QoQ forecast back to level
    if type == 'QoQ':
        sce_forecast_trans = []
        for i in range(len(sce_forecast)):
            if i == 0:
                tn = model_dat[depvar].iloc[0]
               
            if i > 0:
                tn = model_dat[depvar].iloc[i-1]*(sce_forecast.iloc[i]+1)
                
            sce_forecast_trans = np.append(sce_forecast_trans, tn)
    
    # transform diff forecast back to level
    if type == 'diff':
        sce_forecast_trans = []
        for i in range(len(sce_forecast)):
            if i == 0:
                tn = model_dat[depvar].iloc[0]
            if i > 0:
                tn = sce_forecast_trans[i-1] + sce_forecast.iloc[i]
            sce_forecast_trans = np.append(sce_forecast_trans, tn)
            
    # transform diff forecast back to level
    if type == 'logDiff':
        sce_forecast_trans = []
        for i in range(len(sce_forecast)):
            if i == 0:
                tn = model_dat[depvar].iloc[0]
            if i > 0:
                tn = np.exp(np.log(sce_forecast_trans[i-1]) + sce_forecast.iloc[i])
            sce_forecast_trans = np.append(sce_forecast_trans, tn)            
            
    # level
    if type == 'level':
        sce_forecast_trans = np.array(sce_forecast)
     
    # log transformation back to level
    if type == 'log':
        sce_forecast_trans = np.exp(sce_forecast)
        
    return sce_forecast_trans

####Untransformation_Based on one-step ahead forecast###### 

def untransformation_onestep(sce_forecast,model_dat, depvar, type):

    
    if type == 'YoY':
        sce_forecast_trans_one_step = []
        for i in range(len(sce_forecast)):
            if i == 0:
                tx = model_dat[depvar].iloc[0]
                tn=tx
            if i == 1:
                tx = model_dat[depvar].iloc[1]
                tn=tx
            if i == 2:
                tn = model_dat[depvar].iloc[2]
                tn=tx
            if i == 3:
                tx = model_dat[depvar].iloc[3]
                tn=tx
            if i > 3:
                tx = model_dat[depvar].iloc[i-4]
                tn = tx*(sce_forecast.iloc[i]+1)
            
            sce_forecast_trans_one_step = np.append(sce_forecast_trans_one_step, tn)
    
    # transform QoQ forecast back to level
    if type == 'QoQ':
        sce_forecast_trans_one_step = []
        for i in range(len(sce_forecast)):
            if i == 0:
                tn = model_dat[depvar].iloc[0] 
            if i > 0:
                tn = model_dat[depvar].iloc[i-1]*(sce_forecast.iloc[i]+1)
                    
            sce_forecast_trans_one_step = np.append(sce_forecast_trans_one_step, tn)
           
    
    # transform diff forecast back to level
    if type == 'diff':
        sce_forecast_trans_one_step = []
        for i in range(len(sce_forecast)):
            if i == 0:
                tn = model_dat[depvar].iloc[0]          
            if i > 0:               
                tn = model_dat[depvar].iloc[i-1]+sce_forecast.iloc[i]
                
            sce_forecast_trans_one_step = np.append(sce_forecast_trans_one_step, tn)
            
    # transform diff forecast back to level
    if type == 'logDiff':
        sce_forecast_trans_one_step = []
        for i in range(len(sce_forecast)):
            if i == 0:
                tn = model_dat[depvar].iloc[0]
            if i > 0:
                tn = np.exp(np.log(model_dat[depvar].iloc[i-1])+sce_forecast.iloc[i])
            
            sce_forecast_trans_one_step = np.append(sce_forecast_trans_one_step, tn)            
            
    # level
    if type == 'level':
        sce_forecast_trans_one_step = np.array(sce_forecast)
     
    # log transformation back to level
    if type == 'log':
        sce_forecast_trans_one_step = np.exp(sce_forecast)
        
    return sce_forecast_trans_one_step


In [9]:
#EXCEL OUTPUT FUNCTION -  The workbook name is set in the function 

#out:  Full Model Fit Summary -model_Fit.summary() 
#Model_DA: Model Diagnostics -model_diagnostic(Model_Fit,'OLS')
#VIF:  Vif output from the fit() from the full model 
#Insample_parm : The backtesting parmeters from the backtesting() function 
#Country_Code: The country code used for the tab and images naming 

def excel_output(out,Model_DA,VIF,Insample_parm,Stationary_ADF1,Stationary_KPSS1,Full_Estimation_Data,
                 bt_Estimation_Data,Country_Code,bt_Forecast,bt_prediction,scenario_base_data, 
                 scenario_stress_data):
 #Full Sample Estimates# 
    table1 = pd.DataFrame(out.tables[0])
    table2 = pd.DataFrame(out.tables[1])
    table3 = pd.DataFrame(out.tables[2])
    table4 = pd.DataFrame(VIF)
    table4.columns = ['VIF']
    
    Full_Sample = table1.append(table2).append(table3)
    
    Data_Summary = Full_Estimation_Data.describe()
    Data_Summary_StartDate= Full_Estimation_Data.head(1)
    Data_Summary_EndDate= Full_Estimation_Data.tail(1)
    
    Data_Summary2 = bt_Estimation_Data.describe()
    Data_Summary2_StartDate= bt_Estimation_Data.head(1)
    Data_Summary2_EndDate= bt_Estimation_Data.tail(1)
    
#Backtesting Estimates# 
    table5 = pd.DataFrame(Insample_parm.tables[0])
    table6 = pd.DataFrame(Insample_parm.tables[1])
    table7 = pd.DataFrame(Insample_parm.tables[2])

#Output all Results to Excel Worksheet#
    writer = pd.ExcelWriter(f'{Country_Code}_Debt_Model.xlsx', engine='xlsxwriter')
    text1 = pd.DataFrame({'Full Sample Parameters':['Full Sample Parameters']})
    text1.to_excel(writer, sheet_name=f'{Country_Code}', startrow=1)
    Full_Sample.to_excel(writer, sheet_name=f'{Country_Code}', startrow=2)
    Model_DA.to_excel(writer, sheet_name=f'{Country_Code}', startrow=21)
    table4.to_excel(writer, sheet_name=f'{Country_Code}', startrow=24,startcol =2)
    Stationary_ADF1.to_excel(writer, sheet_name=f'{Country_Code}', startrow=30,startcol =2)
    Stationary_KPSS1.to_excel(writer, sheet_name=f'{Country_Code}', startrow=30,startcol =10)
    Data_Summary.to_excel(writer, sheet_name=f'{Country_Code}', startrow=1,startcol =20)
    Data_Summary_StartDate.to_excel(writer, sheet_name=f'{Country_Code}', startrow=11,startcol =20)
    Data_Summary_EndDate.to_excel(writer, sheet_name=f'{Country_Code}', startrow=13,startcol =20)
    
    Data_Summary2.to_excel(writer, sheet_name=f'{Country_Code}', startrow=1,startcol =28)
    Data_Summary2_StartDate.to_excel(writer, sheet_name=f'{Country_Code}', startrow=11,startcol =28)
    Data_Summary2_EndDate.to_excel(writer, sheet_name=f'{Country_Code}', startrow=13,startcol =28)
    
    text2 = pd.DataFrame({'Backtesting Sample Parameters':['Backtesting Sample Parameters']})
    text2.to_excel(writer, sheet_name=f'{Country_Code}', startrow=1, startcol =11)
    table5.to_excel(writer, sheet_name=f'{Country_Code}', startrow=2,startcol =11)
    table6.to_excel(writer, sheet_name=f'{Country_Code}', startrow=12,startcol =11)
    table7.to_excel(writer, sheet_name=f'{Country_Code}', startrow=18,startcol =11)
    Insample_perform.to_excel(writer, sheet_name=f'{Country_Code}', startrow=21,startcol =11 )
    
    bt_Forecast.to_excel(writer, sheet_name=f'{Country_Code}_BT_Data', startrow=1, startcol =1)
    bt_prediction.to_excel(writer, sheet_name=f'{Country_Code}_BT_Data', startrow=1, startcol =8)

    scenario_base_data.to_excel(writer, sheet_name=f'{Country_Code}_Base_frc', startrow=1, startcol =1)
    scenario_stress_data.to_excel(writer, sheet_name=f'{Country_Code}_Stress_frc', startrow=1, startcol =1)
    
    
    worksheet = writer.sheets[f'{Country_Code}']
    worksheet.insert_image('A38',f"Unstransformed_Backtesting_{pdf}")
    worksheet.insert_image('Q35',f"Backtesting_{pdf}")
    worksheet.insert_image('A64',f"Transformed_Backtesting_{pdf}")
    worksheet.insert_image('A90',f"Residual_{pdf}")
    worksheet.insert_image('Q59',f"ScenarioForecast_{pdf}")

    writer.save()
    writer.close()
   

In [10]:
##Generate Scenario Forecasts by taking the scenario output defined in the first couple of code
#results:  The fitted model output.  This should be the full development model fit
#scenario_base_data :  The base forecast that has the transformation as required by the model. The data needs the independent variable and the untransformed
#scenario_stress_data: The stress forecast that has the transformation as required by the model. The data needs the independent variable and the untransformed
#independent_var: The list of independent variables in the model.  This is the list of the transformation independent variable. 
#dependent_var: The dependent variable - Untransformed dependent
#pdf: The name of the graph of the scenario plots
#type: the model fit type: OLS, OLS_HAC, ARIMA
#tran_type:  The transformation type of the dependent variable. If the variable doesn't need transformation set it ='Level' 

def scenario_output(results, scenario_base_data, scenario_stress_data ,independent_var, dependent_var,pdf, type,  tran_type):
    
    #Generate the predicted scenario output.  This is the output of the transformation 
    base_forecast = forecast_scenario(results, scenario_base_data[independent_var], type)
    base_forecast=pd.DataFrame(base_forecast)
    
    scenario_base_data['ypred']=base_forecast 
    base_forecast=pd.DataFrame(base_forecast)
    
    stress_forecast = forecast_scenario(results, scenario_stress_data[independent_var], type)
    stress_forecast=pd.DataFrame(stress_forecast)
    
    scenario_base_data["month"] = pd.to_datetime(scenario_base_data['month'].astype('str'))  
    scenario_stress_data["month"] = pd.to_datetime(scenario_stress_data['month'].astype('str'))  
    
    scenario_base_data['prediction'] = untransformation(base_forecast,scenario_base_data,dependent_var,tran_type)
    scenario_stress_data['prediction'] = untransformation(stress_forecast,scenario_stress_data,dependent_var,tran_type)
        
    fig, ax = plt.subplots(figsize=(10, 6))
    #ax.plot('month', 'ypred', data=scenario_base_data, label='Predicted_Base', color='black', linestyle='dashed')
   #ax.plot('month', 'ypred', data=scenario_stress_data, label='Predicted_Stress', color='yellow', linestyle='dashed')
    ax.plot('month', 'prediction', data=scenario_base_data, label='Predicted_Base_Level', color='blue', linestyle='dashed')
    ax.plot('month', 'prediction', data=scenario_stress_data, label='Predicted_Stress_Level', color='red', linestyle='dashed')
    ax.set_title('Scenario Forecasts')
    ax.set_ylabel(dependent_var)
    ax.set_xlabel('month')
    ax.legend()
    plt.savefig(f"ScenarioForecast_{pdf}")
    plt.close(fig)
    plt.cla()
    plt.clf()
           
    return scenario_base_data , scenario_stress_data

In [11]:
#Stepwise Selection 
def backward_regression(X, y):
    cols=list(X.columns)
    cols.remove(y)
    
    #Backward Elimination
    pmax = 1
    while (len(cols)>0):
        p= []
        X_1= X[cols]
        y_value = X[y]
        
        X_1 = sm.add_constant(X_1)

        model = sm.OLS(y_value,X_1).fit()

        p = pd.Series(model.pvalues.values[1:],index = cols)
        pmax = max(p)
        feature_with_p_max = p.idxmax()
        
        if(pmax>0.05):
            cols.remove(feature_with_p_max)
            print(f'{feature_with_p_max} is insignificant and dropped from the backward selection: p-value {pmax}')
        else:
            break
    return model

#Set up intitial Candidate Variables#

In [12]:
# Set the Dependent & Independent Variables# 
Variable_Name ='30Y10Y_MBSOAS_TY'
Variable_ID = '30Y10Y_MBSOAS_TY_diff'

Parm1 ='Unemployment rate' 
Parm2='House Price Index (Level)'

VM1_list =[Variable_Name,Parm1,Parm2]
VM1 =Hist_Data
#print(Hist_Data)

Parm1_ind='Unemployment rate_diff'  
Parm1_ind_lag='Unemployment rate_diff_lag' 
Parm2_ind='House Price Index (Level)_QoQ'
Parm2_ind_lag='House Price Index (Level)_QoQ_lag'

#Clean and Transform Data# 
VM1_Tran=fcst_data_prep(VM1_list,['QoQ','diff'], VM1, '7/1/2009', '1/1/2023')

print(VM1_Tran)

VM1_Tran=VM1_Tran.dropna()

VM1_Tran[Parm1_ind_lag]=VM1_Tran[Parm1_ind].shift()
VM1_Tran[Parm2_ind_lag]=VM1_Tran[Parm2_ind].shift()

#Run Backward selection 
Full_Sample_Data = VM1_Tran.drop(['month'], axis=1).dropna()
Full_Sample_Data = Full_Sample_Data.drop(['30Y10Y_MBSOAS_TY_diff','30Y10Y_MBSOAS_TY_QoQ'], axis=1)

T=backward_regression(Full_Sample_Data, Variable_Name)
    
T.summary()


        month  30Y10Y_MBSOAS_TY  30Y10Y_MBSOAS_TY_QoQ  30Y10Y_MBSOAS_TY_diff  \
0    7/1/2009           0.00000                   NaN                    NaN   
1   10/1/2009           0.00000                   NaN                0.00000   
2    1/1/2010           0.00000                   NaN                0.00000   
3    4/1/2010           0.00000                   NaN                0.00000   
4    7/1/2010           0.00000                   NaN                0.00000   
5   10/1/2010           0.00000                   NaN                0.00000   
6    1/1/2011          18.08130                   inf               18.08130   
7    4/1/2011          21.47880              0.187901                3.39750   
8    7/1/2011          32.61530              0.518488               11.13650   
9   10/1/2011          37.57190              0.151972                4.95660   
10   1/1/2012          28.55870             -0.239892               -9.01320   
11   4/1/2012          27.87350         

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  exog_trans[x+'_QoQ'] =  exog_trans[x]/exog_trans[x].shift()-1
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  exog_trans[x+'_diff'] = exog_trans[x]-exog_trans[x].shift()


0,1,2,3
Dep. Variable:,30Y10Y_MBSOAS_TY,R-squared:,0.37
Model:,OLS,Adj. R-squared:,0.342
Method:,Least Squares,F-statistic:,13.21
Date:,"Tue, 29 Aug 2023",Prob (F-statistic):,3.07e-05
Time:,11:15:39,Log-Likelihood:,-191.62
No. Observations:,48,AIC:,389.2
Df Residuals:,45,BIC:,394.8
Df Model:,2,,
Covariance Type:,nonrobust,,

0,1,2,3,4,5,6
,coef,std err,t,P>|t|,[0.025,0.975]
const,27.2952,3.365,8.112,0.000,20.518,34.072
Unemployment rate_QoQ,-18.2976,5.557,-3.293,0.002,-29.490,-7.105
House Price Index (Level)_QoQ,-728.9489,162.853,-4.476,0.000,-1056.951,-400.946

0,1,2,3
Omnibus:,6.03,Durbin-Watson:,1.624
Prob(Omnibus):,0.049,Jarque-Bera (JB):,7.418
Skew:,0.294,Prob(JB):,0.0245
Kurtosis:,4.834,Cond. No.,83.4


#RUN FINAL MODELS# 

In [19]:
# Set the Dependent & Independent Variables# 

#OtherSpread 30Y3M_MBSOAS_TY

Variable_Name ='30Y10Y_MBSOAS_TY'
Variable_ID ='30Y10Y_MBSOAS_TY_diff'

Parm1 ='Unemployment rate' 
Parm2='House Price Index (Level)'


#Several Parms can be used.  Must add it into the list and functions below
type = 'OLS_HAC'
tran_type ='level'
order= None

## the below paramters are used to determine if the level or the transformation is used in the model. Set the var_tran to level
dependent_var = f'{Variable_Name}'
dependent_var_tran  = f'{Variable_Name}'

Parm1_ind='Unemployment rate_diff'  
Parm1_ind_lag='Unemployment rate_diff_lag' 
Parm1_ind_lag2='Unemployment rate_diff_lag2' 
Parm2_ind='House Price Index (Level)_QoQ'
Parm2_ind_lag='House Price Index (Level)_QoQ_lag'
Parm2_ind_lag2='House Price Index (Level)_QoQ_lag2'


#MODEL INPUTES#

Independent=[Parm1_ind_lag,Parm2_ind]
pdf = f'{dependent_var}.png'

#Set the transformation of Dependent & Independent Variables names#

#Pull Actuals For the Data Transformation Process
VM1_list =[Variable_Name,Parm1,Parm2]
VM1 =Hist_Data

VM1_Tran=fcst_data_prep(VM1_list,['QoQ','diff'], VM1, '7/1/2009', '1/1/2022')

#VAdditional Transformation - Add lags and remove NAs

VM1_Tran[Parm1_ind_lag]=VM1_Tran[Parm1_ind].shift()
VM1_Tran[Parm1_ind_lag2]=VM1_Tran[Parm1_ind].shift(2)
VM1_Tran[Parm2_ind_lag]=VM1_Tran[Parm2_ind].shift()
VM1_Tran[Parm2_ind_lag2]=VM1_Tran[Parm2_ind].shift(2)

VM1_Tran=VM1_Tran.dropna()

#Test for Stationrity of All Variables

test_stationary([dependent_var], VM1_Tran[dependent_var], 'ADF', backtest= None)
test_stationary([dependent_var], VM1_Tran[dependent_var], 'KPSS', backtest= None)

test_stationary([dependent_var_tran], VM1_Tran[dependent_var_tran], 'ADF', backtest= None)
test_stationary([dependent_var_tran], VM1_Tran[dependent_var_tran], 'KPSS', backtest= None)

######################################DO NOT TOUCH THE CODE BELOW##############################

#Full Model Parameters # 
Model_Fit, VIF, Stationary_ADF1 , Stationary_KPSS1 = fit(VM1_Tran,type,Independent,dependent_var_tran,order=None)
out = Model_Fit.summary() 

Model_DA=model_diagnostic(Model_Fit,type)
print(out)
print(VIF)

residual_plot(Model_Fit, dependent_var_tran, pdf)

#Backtesting Parameters - This is setup to remove 9QRTs from the prior estimation dataset. 
Insample_parm, Insample_perform, backtest_untransformed_prediction, backtest_transformed_prediction = backtesting_plot(VM1_Tran, type, Independent, dependent_var,dependent_var_tran, pdf, 9,tran_type, order, Special_transformation=False)

#print(backtest_transformed_prediction)

#pull the scenario forecast from the library.  This will need to be updated with a CSV file. 
#dt_base =scenario.get_data(variables= VM_list_Base) # Pull the scenario forecast from the library. This will need to be replaced with a pull from CSV
Scenario_Base =fcst_data_prep(VM1_list,['level','QoQ','diff'],dt_base, '1/1/2023', '4/1/2026')

#dt_stress =scenario.get_data(variables= VM_list_Stress) # Pull the scenario forecast from the library. This will need to be replaced with a pull from CSV
Scenario_Stress =fcst_data_prep(VM1_list,['level','QoQ','diff'],dt_stress, '1/1/2023', '4/1/2026')

######################################DO NOT TOUCH THE CODE ABOVE ##############################
#Additional transformation on scenario forecasts 

#Scenario_Base=Scenario_Base.dropna()

#VAdditional Transformation - Add lags and remove NAs

Scenario_Base[Parm1_ind_lag]=Scenario_Base[Parm1_ind].shift()
Scenario_Base[Parm2_ind_lag]=Scenario_Base[Parm2_ind].shift()

#Scenario_Stress=Scenario_Stress.dropna()

Scenario_Stress[Parm1_ind_lag]=Scenario_Stress[Parm1_ind].shift()
Scenario_Stress[Parm2_ind_lag]=Scenario_Stress[Parm2_ind].shift()

######################################DO NOT TOUCH THE CODE BELOW ##############################

#Scenario forecast function 
scenario_base_data,scenario_stress_data =scenario_output(Model_Fit, Scenario_Base, Scenario_Stress,Independent,dependent_var,pdf, type, tran_type)

#Output Fullsample and Backtesting data Summary# 
Full_Estimation_Data = VM1_Tran[['month',dependent_var_tran ,Parm1_ind,Parm2_ind]]
Backtesting_Data =  VM1_Tran[:-9]
Backtesting_Estimation_Data = Backtesting_Data[['month',dependent_var_tran ,Parm1_ind,Parm2_ind]]

#print(out)
#Print out Results to Excel Workbooks 
excel_output(out,Model_DA,VIF,Insample_parm, Stationary_ADF1 , Stationary_KPSS1,Full_Estimation_Data,Backtesting_Estimation_Data, Variable_Name,backtest_untransformed_prediction, backtest_transformed_prediction,scenario_base_data,scenario_stress_data)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  exog_trans[x+'_QoQ'] =  exog_trans[x]/exog_trans[x].shift()-1
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  exog_trans[x+'_diff'] = exog_trans[x]-exog_trans[x].shift()
look-up table. The actual p-value is greater than the p-value returned.

look-up table. The actual p-value is greater than the p-value returned.

look-up table. The actual p-value is greater than the p-value returned.

look-up table. The actual p-value is greater than the p-value returned.



['30Y10Y_MBSOAS_TY'] ADF Result: The series is stationary
['30Y10Y_MBSOAS_TY'] ADF Result: The series is stationary
['30Y10Y_MBSOAS_TY'] ADF Result: The series is stationary
['30Y10Y_MBSOAS_TY'] KPSS Result: The series is stationary
['30Y10Y_MBSOAS_TY'] KPSS Result: The series is stationary
['30Y10Y_MBSOAS_TY'] KPSS Result: The series is stationary
['30Y10Y_MBSOAS_TY'] KPSS Result: The series is stationary
['30Y10Y_MBSOAS_TY'] ADF Result: The series is stationary
['30Y10Y_MBSOAS_TY'] ADF Result: The series is stationary
['30Y10Y_MBSOAS_TY'] ADF Result: The series is stationary
['30Y10Y_MBSOAS_TY'] KPSS Result: The series is stationary
['30Y10Y_MBSOAS_TY'] KPSS Result: The series is stationary
['30Y10Y_MBSOAS_TY'] KPSS Result: The series is stationary
['30Y10Y_MBSOAS_TY'] KPSS Result: The series is stationary
Unemployment rate_diff_lag ADF Result: The series is stationary
Unemployment rate_diff_lag ADF Result: The series is stationary
Unemployment rate_diff_lag ADF Result: The series is

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  hist_data_OutofSample['level_prediction_full'] = untransformation(hist_data_OutofSample['prediction'],hist_data_OutofSample,dependent_var_level,tran_type)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  backtest_transformed_prediction['MS_RMSE_N']=MS_RMSE_N
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-co

<Figure size 432x288 with 0 Axes>