**SBID ECL Modeling - Run Multiple Regression** <br><br>_Required library: pandas, numpy, scipy, sklearn, itertools, statsmodels, matplotlib_

In [1]:
from pathlib import Path
import pandas as pd
import numpy as np
from scipy import stats
from sklearn.feature_selection import f_regression
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_absolute_percentage_error, mean_absolute_error, mean_squared_error
from statsmodels.tsa.stattools import grangercausalitytests, adfuller
from statsmodels.tsa.api import VAR
import statsmodels.api as sm
import matplotlib.pyplot as plt
import itertools
import time

t_start = time.time()

**Setup Model Arguments**

In [2]:
model_args = {
    'BASE' : "C:\\Users\\muhammad.huda\\Documents\\python\\python_scripts\\file\\input\\model_Q124"
    ,'OUTPUT' : "C:\\Users\\muhammad.huda\\Documents\\python\\python_scripts\\file\\input\\model_Q124\\output"
    ,'MEV_DATA' : "seabank_data.csv"
    ,'ODR_MOM' : "pd_odr_mom.csv"
    ,'ODR_QOQ' : "pd_odr_qoq.csv"
    ,'ODR_HOH' : "pd_odr_hoh.csv"
    ,'ODR_YOY' : "pd_odr_yoy.csv"
    ,'USE_ODR' : 'hoh'
    ,'drop_products': ['Digital_KPL', 'Digital_EML']
    ,'suffix': ''
    ,'y_transform': 'normal'
}

expected_correlation = {
    'GDP': -1,
    'USDIDR': 1,
    'CPI': 1,
    'BIREPO': 1,
    'UNEMPLOYMENT': 1
}

model_start = {
    "Digital_BCL_2": '2021-11-30',
    "Digital_BCL_3": '2021-11-30',
    "Digital_BCL_6": '2021-11-30',
    "Digital_BCL_12": '2021-11-30',
    "Digital_SCL_3": '2022-05-31',
    "Digital_SCL_6": '2022-05-31',
    "Digital_SCL_12": '2022-05-31',
    "Digital_SPL_1": '2021-10-31',
    "Digital_SPL_3": '2021-06-30',
    "Digital_SPL_6": '2021-06-30',
    "Digital_SPL_12": '2021-06-30',
    # "Digital_SPL_18": '2023-07-31',
    # "Digital_SPL_24": '2023-07-31',
    # "Digital_AKL_6": '2023-04-30',
    # "Digital_EAL_6": '2023-08-31',
    # "Digital_AKL_9": '2023-07-31',
    # "Digital_AKL_1": '2023-08-31',
    # "Digital_AKL_3": '2023-08-31',
    # "Digital_RCL_2": '2023-08-31',
    # "Digital_RCL_3": '2023-08-31',
    # "Digital_PYL_24": '2023-08-31',
    # "Digital_EAL_6": '2023-08-31',
    # "Digital_RCL_1": '2023-08-31',
    # "Digital_EAL_4": '2023-08-31',
    # "Digital_EAL_8": '2023-08-31',
    # "Digital_AKL_2": '2023-08-31',
    # "Digital_EAL_2": '2023-08-31',
    # "Digital_RCL_4": '2023-08-31',
    # "Digital_PYL_12": '2023-08-31',
    # "Digital_PYL_6": '2023-08-31',
    # "Digital_PYL_18": '2023-08-31',
}

**Read data from CSV**

In [4]:
try:
    match model_args['USE_ODR']:
        case 'mom':
            y_odr = pd.read_csv(Path().joinpath(model_args['BASE'], model_args['ODR_MOM']), sep = ",",decimal=".", parse_dates=["pt_date"],dtype={"odr_loan": float})
        case 'qoq':
            y_odr = pd.read_csv(Path().joinpath(model_args['BASE'], model_args['ODR_QOQ']), sep = ",",decimal=".", parse_dates=["pt_date"],dtype={"odr_loan": float})
        case 'hoh':
            y_odr = pd.read_csv(Path().joinpath(model_args['BASE'], model_args['ODR_HOH']), sep = ",",decimal=".", parse_dates=["pt_date"],dtype={"odr_loan": float})
        case 'yoy':
            y_odr = pd.read_csv(Path().joinpath(model_args['BASE'], model_args['ODR_YOY']), sep = ",",decimal=".", parse_dates=["pt_date"],dtype={"odr_loan": float})
        case _:
            raise ValueError("Check variable USE_ODR")

except AttributeError:
    raise AttributeError("No argument USE_ODR")

**Data loads and multiindex view for ODR variables (y-variable) of each product & tenor**

In [5]:
try:
    if len(model_args['drop_columns']) > 0:
        y_odr = y_odr.pivot(index="pt_date",columns=["pd_segment", "tenor"], values="odr_loan").drop(axis=1,columns=model_args['drop_products']).fillna(0.0)
    else:
        y_odr = y_odr.pivot(index="pt_date",columns=["pd_segment", "tenor"], values="odr_loan").fillna(0.0)
        
except KeyError:
    y_odr = y_odr.pivot(index="pt_date",columns=["pd_segment", "tenor"], values="odr_loan").fillna(0.0)

product_df = ['_'.join(map(str,x)) for x in y_odr.columns.to_list()]
odr_df = [y_odr.iloc[:, x].rename(product_df[x])for x in range(y_odr.shape[1])]
odr_df = [x for x in odr_df if x.name in model_start] # Additional filtering based on arguments testing
odr_df = [x[pd.Timestamp(model_start[x.name]):] for x in odr_df] # Additional filtering date_model for remove the leading zero of ODR normal

match model_args['y_transform']:
    case 'z_score':
        odr_df = [(x - x.mean()) / x.std(ddof=0) for x in odr_df] # Z_Score transform
    case 'log_n':
        odr_df = [np.log(x.fillna(0.0)) for x in odr_df] # Logarithmic Natural transform
    case 'log_2':
        odr_df = [np.log2(x) for x in odr_df] # Log base 2 transform
    case 'log_10':
        odr_df = [np.log10(x) for x in odr_df] # Log base 10 transform
    case 'MA_1':
        odr_df = [x.rolling(window=1).mean() for x in odr_df] # Moving average 1M transform
    case 'normal':
        odr_df = [x for x in odr_df] # Normal without transform
    case _:
        odr_df = [x for x in odr_df] # Normal by default

**Data load Macroeconomic variable**

In [6]:
x_mev = pd.read_csv(Path().joinpath(model_args['BASE'], model_args['MEV_DATA']),sep=";",decimal=",", index_col=['Date'],parse_dates=["Date"]).fillna(0.0)
x_mev.index = x_mev.index.to_period("M").to_timestamp("M")

  x_mev = pd.read_csv(Path().joinpath(model_args['BASE'], model_args['MEV_DATA']),sep=";",decimal=",", index_col=['Date'],parse_dates=["Date"]).fillna(0.0)


**Do Pearson-Correlation for MEV dataset and MEV dataset with each y-variable**

In [7]:
x_corr = x_mev.corr(method="pearson", numeric_only=True)

corr_pairwise = ((x.name, x_mev.merge(x,left_index=True, right_index=True, how="inner")) for x in odr_df)
corr_pairwise = [(x[0],x[1].corr()) for x in corr_pairwise]

**Make 1-pairwise x and y-variable for SFA**

In [10]:
permutate = itertools.combinations(x_mev.columns.to_list(), r=1)
sfa_df = [x_mev.copy()[x[0]] for x in list(permutate)]
sfa_df = [(y.name, pd.concat([x, y.to_frame(name="odr_loan")], axis=1).fillna(0.0)) for y in odr_df for x in sfa_df]

**Make 2-pairwise x and y-variable for MFA**

In [11]:
permutate = itertools.combinations(x_mev.columns.to_list(), r=2)
mfa_df = [x_mev.copy()[[x[0],x[1]]] for x in list(permutate)]
mfa_df = [(y.name, x.merge(y.to_frame(name="odr_loan"),left_index=True, right_index=True, how="inner").fillna(0.0)) for y in odr_df for x in mfa_df]

**Regression Testing Load: Using Multiple Regression**

In [12]:
lr_sfa_pairwise = [{'model': f'{x[0]}_{i}', 'mev':x[1].drop(["odr_loan"], axis=1), 'odr_loan':x[1]['odr_loan']} for i, x in enumerate(sfa_df)]
lr_mfa_pairwise = [{'model': f'{x[0]}_{i}', 'mev':x[1].drop(["odr_loan"], axis=1), 'odr_loan':x[1]['odr_loan']} for i, x in enumerate(mfa_df)]

model_count = {'count_1pairs': len(lr_sfa_pairwise), 'count_2pairs':len(lr_mfa_pairwise)}
model_count

{'count_1pairs': 451, 'count_2pairs': 9020}

**1_Single SFA Regression**

In [15]:
sfa_models = [{"id": i, "product": x[0], 'model': f'{"X".join(x[1].columns)}','min_date':x[1].index.min(),'max_date':x[1].index.max(),'y': x[1]['odr_loan'], 'x':x[1].drop(['odr_loan'], axis=1)} for i, x in enumerate(sfa_df)]
sfa_result = []

for item in sfa_models:
    y = item['y']
    x = item['x']
    
    lr = LinearRegression()
    _run = lr.fit(x, y)
    
    _result = {"id": item['id'], "product": item['product'], "start_date": item['min_date'],"end_date": item['max_date'], "x": item['x'].columns[0], "y": item['y'].name}
    _result['pearson_dep_x1'] = [t[1] for t in corr_pairwise if t[0] == item['product']][0].loc[item['product'],x.columns[0]]
    _result['intercept'] = _run.intercept_
    _result['coef'] = _run.coef_
    _result['f_statistic'], _result['p_value'] = f_regression(x, y)

    y_fitted = lr.predict(x)
    _result['r2_score'] = r2_score(y, y_fitted)
    _result['mape'] = mean_absolute_percentage_error(y, y_fitted)
    _result['mae'] = mean_absolute_error(y, y_fitted)
    _result['rmse'] = np.sqrt(mean_squared_error(y, y_fitted))

    _result['treshold1'] = np.where((_result['p_value'] < 0.05) & (_result['r2_score'] > 0.6), "Y", "N")
    _result['treshold2'] = np.where((_result['p_value'] < 0.05) & (_result['r2_score'] > 0.5), "Y", "N")
    _result['treshold3'] = np.where((_result['p_value'] < 0.1) & (_result['r2_score'] > 0.4), "Y", "N")

    item['y_fitted'] = y_fitted
    sfa_result.append(_result)

**2_Combination MFA Regression**

In [None]:
mfa_models = [{"id": i, "product": x[0], 'model': f'{"X".join(x[1].columns)}','min_date':x[1].index.min(),'max_date':x[1].index.max(),'y': x[1]['odr_loan'], 'xs':x[1].drop(['odr_loan'], axis=1)} for i, x in enumerate(mfa_df)]
mfa_result = []
base_mev_col = [x for x in mfa_models for y in x_mev.columns if y.upper().find(x) != -1]

for item in mfa_models:
    y = item['y']
    x = item['xs']
    
    lr = LinearRegression()
    _run = lr.fit(x, y)
    
    _result = {"id": item['id'], "product": item['product'], "start_date": item['min_date'],"end_date": item['max_date'],"x1": item['xs'].columns[0], "x2": item['xs'].columns[1], "y": item['y'].name}
    _result['corr_x1_x2'] = x_corr.loc[x.columns[0],x.columns[1]]
    _result['corr_dep_x1'] = [t[1] for t in corr_pairwise if t[0] == item['product']][0].loc[item['product'],x.columns[0]]
    _result['corr_dep_x2'] = [t[1] for t in corr_pairwise if t[0] == item['product']][0].loc[item['product'],x.columns[1]]
    _result['intercept'] = _run.intercept_
    _result['coef_1'] = _run.coef_[0]
    _result['coef_2'] = _run.coef_[1]

    _f_result = f_regression(x, y)
    _result['f_statistic_1'] = _f_result[0][0]
    _result['f_statistic_2'] = _f_result[0][1]
    _result['p_value_1'] = _f_result[1][0]
    _result['p_value_2'] = _f_result[1][1]

    y_fitted = lr.predict(x)
    _result['r2_score'] = r2_score(y, y_fitted)
    _result['mape'] = mean_absolute_percentage_error(y, y_fitted)
    _result['mae'] = mean_absolute_error(y, y_fitted)
    _result['rmse'] = np.sqrt(mean_squared_error(y, y_fitted))

    # _result['hypotheses'] = np.where((_result['p_value_1'] < 0.05) & (_result['p_value_2'] < 0.05) & (_result['r2_score'] > 0.6), "Y", "N")
    _result['treshold_1'] = np.where((_result['p_value_1'] < 0.05) & (_result['p_value_2'] < 0.05) & (_result['r2_score'] > 0.6), "Y", "N")
    _result['treshold_2'] = np.where((_result['p_value_1'] < 0.05) & (_result['p_value_2'] < 0.05) & (_result['r2_score'] > 0.5), "Y", "N")
    _result['treshold_3'] = np.where((_result['p_value_1'] < 0.1) & (_result['p_value_2'] < 0.1) & (_result['r2_score'] > 0.4), "Y", "N")

    item['y_fitted'] = y_fitted
    mfa_result.append(_result)

**Save to file: <br>1. pearson-correlation mev <br>2. pearson-correlation odr_pairwise <br>3. SFA result <br>4. MFA 2-combination result <br>5. Model variables check**

In [None]:
sfa_result = pd.DataFrame(sfa_result)
mfa_result = pd.DataFrame(mfa_result)

y_odr.to_excel(Path().joinpath(model_args["OUTPUT"], "_".join([model_args['USE_ODR'], model_args['suffix'],"variable_list.xlsx"])), sheet_name="odr")

"""Initiate the excel files by specified the contents in full output"""
x_corr.to_excel(Path().joinpath(model_args["OUTPUT"], "_".join([model_args['USE_ODR'], model_args['suffix'],"pearson_corr.xlsx"])), sheet_name="MEV")
sfa_result.to_excel(Path().joinpath(model_args["OUTPUT"], "_".join([model_args['USE_ODR'], model_args['suffix'],"sfa_result.xlsx"])), sheet_name="SFA_Result")
mfa_result.to_excel(Path().joinpath(model_args["OUTPUT"], "_".join([model_args['USE_ODR'], model_args['suffix'],"mfa_result.xlsx"])), sheet_name="MFA_Result")

"""Append the MEV used for the models"""
with pd.ExcelWriter(Path().joinpath(model_args["OUTPUT"], "_".join([model_args['USE_ODR'], model_args['suffix'],"variable_list.xlsx"])),mode="a") as w:
    x_mev.to_excel(w, sheet_name='mev')

"""Split and append the MFA result to follow the product respectively"""
unique_product = mfa_result['product'].unique()
with pd.ExcelWriter(Path().joinpath(model_args["OUTPUT"], "_".join([model_args['USE_ODR'], model_args['suffix'],"mfa_result.xlsx"])),mode="a") as w:
    for item in unique_product:
        query = mfa_result.query(f'product == \"{item}\"')
        query.to_excel(w, sheet_name=item)

"""Split and append the pearson correlation output based on the product respectively"""
with pd.ExcelWriter(Path().joinpath(model_args["OUTPUT"], "_".join([model_args['USE_ODR'], model_args['suffix'],"pearson_corr.xlsx"])),mode="a") as w:
    for i, item in enumerate(corr_pairwise):
        model, data = item[0], item[1]
        data.to_excel(w, sheet_name=f'{i}_{model}')

"""Save the variable models used for the models in MFA, validate the length and stationary checking ad fuller testing"""
check_result = []
for item in mfa_models:
    check = {'id': item['id'], 'product': item['product'], 'model': item['model'],'len_xs': len(item['xs']), 'len_y': len(item['y'])}
    check_result.append(check)

check_result = pd.DataFrame(check_result)
with pd.ExcelWriter(Path().joinpath(model_args["OUTPUT"], "_".join([model_args['USE_ODR'], model_args['suffix'],"variable_list.xlsx"])),mode="a") as w:
    check_result.to_excel(w, sheet_name='model_variable_check')

"""Stationary Check Augmented Dickey-Fuller testing"""
y_statistic = []

for item in odr_df:
    normal = {'product': item.name,'transform':'normal','ADF Statistic': adfuller(item)[0], 'p_value': adfuller(item)[1]}
    z_score = {'product': item.name,'transform':'z_score','ADF Statistic': adfuller((item.fillna(0.0) - item.fillna(0.0).mean()) / item.fillna(0.0).std(ddof=0))[0], 'p_value': adfuller((item.fillna(0.0) - item.fillna(0.0).mean()) / item.fillna(0.0).std(ddof=0))[1]}
    diff_1M = {'product': item.name,'transform':'diff_1M','ADF Statistic': adfuller((item.fillna(0.0)).diff()[1:])[0], 'p_value': adfuller((item.fillna(0.0)).diff()[1:])[1]}
    moving_average_1M = {'product': item.name,'transform':'moving_average_1M','ADF Statistic': adfuller(item.fillna(0.0).rolling(window=1).mean())[0], 'p_value': adfuller(item.fillna(0.0).rolling(window=1).mean())[1]}
    # log_n = {'product': item.name,'name':'log_n','ADF Statistic': adfuller(np.log(item.dropna()))[0], 'p_value': adfuller(np.log(item.dropna()))[1]}
    
    y_statistic.append(normal)
    y_statistic.append(z_score)
    y_statistic.append(diff_1M)
    y_statistic.append(moving_average_1M)
    # y_statistic.append(log_n)

with pd.ExcelWriter(Path().joinpath(model_args["OUTPUT"], "_".join([model_args['USE_ODR'], model_args['suffix'],"variable_list.xlsx"])),mode="a") as w:
    pd.DataFrame(y_statistic).to_excel(w, sheet_name='stationary_check')

t_end = time.time()

print("Success Running ...")
print("Script finished on: ", t_end - t_start)