In [25]:
import pandas as pd
import numpy as np
import statsmodels.api as sm
from joblib import Parallel, delayed
from itertools import combinations
from statsmodels.stats.outliers_influence import variance_inflation_factor
import re
import os
import warnings

# Silence every warning category for the rest of the session.
warnings.filterwarnings("ignore")

# Largest p-value at which a regression slope still counts as significant.
p_val_thres = 0.1
# Smallest full-sample correlation with the target a factor needs to survive screening.
correl_thres = 0.05
# Cutoff applied to the KPSS p-value in the stationarity screen of my_main.
spss_thres = 0.01
# Threshold reserved for an ADF test.
# NOTE(review): nothing in the visible code reads adf_thres; the stationarity
# screen calls kpss and compares against spss_thres instead.
adf_thres = 0.1
# Largest variance inflation factor tolerated for a regressor.
vif_thres = 2.5

In [None]:
# MULTIPLE FACTOR FILTERING
def fit_and_check(var_tuple, X, Y, strict=False):
    """Fit an OLS regression of ``Y`` on the chosen columns of ``X`` plus a constant.

    Parameters
    ----------
    var_tuple : iterable
        Column labels of ``X`` to use as regressors.
    X : pandas.DataFrame
        Candidate factors, one column per factor.
    Y : array-like
        Dependent variable, row-aligned with ``X``.
    strict : bool, default False
        ``False`` returns the fitted model unconditionally. This is what the
        function has always done, since both branches of the old conditional
        returned the model, but the discarded acceptance test (one auxiliary
        regression per column for the VIFs) is no longer evaluated.
        ``True`` returns ``None`` unless every slope has a p-value of at most
        ``p_val_thres``, every slope is non-negative, and every regressor has
        a VIF of at most ``vif_thres``.

    Returns
    -------
    statsmodels regression results, or ``None`` when ``strict`` rejects the fit.
    """
    X_feature = sm.add_constant(X[list(var_tuple)])
    model = sm.OLS(Y, X_feature).fit()

    if not strict:
        return model

    # Position 0 is the constant added above; only the slopes are tested.
    if not (model.pvalues.iloc[1:] <= p_val_thres).all():
        return None
    if not (model.params.iloc[1:] >= 0).all():
        return None

    # VIF is the costly check (one extra regression per column), so it runs last.
    exog = X_feature.values
    for column in range(1, exog.shape[1]):
        if not variance_inflation_factor(exog, column) <= vif_thres:
            return None
    return model
        

In [39]:
def _factor_group_key(column_name):
    """Return the three-character group code that ends a factor name.

    A trailing ``'_diff'`` is removed first, so a differenced factor reports
    the same group as the level series it was derived from.
    """
    if column_name.endswith('_diff'):
        column_name = column_name[:-len('_diff')]
    return column_name[-3:]


def _expanding_correlation(X, Y):
    """Correlate each column of ``X`` with ``Y`` over a window growing from row 0.

    Row ``i`` of the result is the correlation over the first ``i + 1``
    observations. Row 0 is left as NaN.
    """
    x_values = X.to_numpy()
    y_values = Y.to_numpy()
    n_rows, n_cols = x_values.shape
    corr = np.full((n_rows, n_cols), np.nan)
    for end in range(2, n_rows + 1):
        for j in range(n_cols):
            corr[end - 1, j] = np.corrcoef(x_values[:end, j], y_values[:end])[0, 1]
    return pd.DataFrame(corr, index=X.index, columns=X.columns)


def _kpss_pvalues(X):
    """Run the KPSS test (``regression='ct'``) on every column; return the p-values."""
    p_values = []
    for i in range(X.shape[1]):
        with warnings.catch_warnings():
            # kpss warns whenever its statistic falls outside the lookup table.
            warnings.filterwarnings("ignore", category=sm.tools.sm_exceptions.InterpolationWarning)
            p_values.append(sm.tsa.stattools.kpss(X.iloc[:, i], regression='ct')[1])
    return pd.Series(p_values, index=X.columns, dtype='float64')


def _model_record(model):
    """Flatten one fitted model into a dict keyed by the result-table columns.

    Slots beyond the model's own factor count are filled with ``''``.
    """
    factor_names = list(model.params.index[1:5])
    coefficients = model.params.to_numpy()[1:5]
    p_values = model.pvalues.to_numpy()[:5]  # position 0 is the intercept
    vifs = [variance_inflation_factor(model.model.exog, j)
            for j in range(1, len(factor_names) + 1)]

    def slot(values, position):
        return values[position] if position < len(values) else ''

    record = {
        'Adj. R-Squared': model.rsquared_adj,
        'Intercept': model.params.iloc[0],
        'P-Value_Intercept': slot(p_values, 0),
        'num_factor': len(factor_names),
    }
    for k in range(1, 5):
        record[f'Factor_{k}'] = slot(factor_names, k - 1)
        record[f'Coefficient_{k}'] = slot(coefficients, k - 1)
        record[f'P-Value_{k}'] = slot(p_values, k)
        record[f'VIF_{k}'] = slot(vifs, k - 1)
    return record


def _pass_fail(frame, columns, is_violation):
    """Return 'Fail' for rows where any populated cell violates the rule, else 'Pass'.

    The ``''`` padding of unused factor slots is coerced to NaN, and NaN
    compares as False, so empty slots can never trigger a failure.
    """
    numeric = frame[columns].apply(pd.to_numeric, errors='coerce')
    return np.where(is_violation(numeric).any(axis=1), 'Fail', 'Pass')


def my_main(raw_data):
    """Screen candidate factors and fit every admissible 2- to 4-factor OLS model.

    Layout read from ``raw_data`` (all positional):
      * column 1 holds the dependent variable; its first non-null row marks
        the start of the sample,
      * columns 2 onward hold one factor each,
      * row 0 holds the group label appended to each factor name, row 1 the
        differencing flag (``'Y'`` adds a first-difference copy) and row 2
        the sign flag (``'-1'`` or ``-1`` flips the series),
      * rows 3 onward hold the observations.
    The first valid index label is used as a row position, which assumes the
    default 0..n-1 index.

    Steps: scale each factor by its sign and divide by 100, add ``_diff``
    columns, trim to the sample, standardise, drop factors that fail the
    KPSS or final-correlation screen, fit all combinations of 2, 3 and 4
    surviving factors with distinct group codes, and grade each model on
    p-values, coefficient signs and VIFs.

    Returns
    -------
    tuple
        ``(final_result, stationary_output, roll_corr)``: the model table
        sorted by adjusted R-squared (descending), the per-factor KPSS
        p-values with a Pass/Fail label, and the expanding correlation of
        each factor with the dependent variable.

    Raises
    ------
    ValueError
        If the dependent-variable column has no non-null value.
    """
    result_columns = ['Factor_1', 'Factor_2', 'Factor_3', 'Factor_4', 'Adj. R-Squared',
                      'Intercept', 'Coefficient_1', 'Coefficient_2', 'Coefficient_3', 'Coefficient_4',
                      'P-Value_Intercept', 'P-Value_1', 'P-Value_2', 'P-Value_3', 'P-Value_4',
                      'VIF_1', 'VIF_2', 'VIF_3', 'VIF_4']

    start_row = raw_data.iloc[:, 1].first_valid_index()
    if start_row is None:
        raise ValueError("The dependent-variable column (second column) has no valid observation")

    group_treatment = raw_data.iloc[0, 2:]
    diff_treatment = raw_data.iloc[1, 2:]
    sign_treatment = raw_data.iloc[2, 2:]

    X = raw_data.iloc[3:, 2:].reset_index(drop=True).astype('float64')
    Y = raw_data.iloc[start_row:, 1].reset_index(drop=True).astype('float64')

    # Apply the sign flag, then divide every factor by 100.
    multipliers = np.array([-1.0 if flag == '-1' or flag == -1 else 1.0 for flag in sign_treatment])
    X = X * multipliers / 100

    # Append the group label so the group can be recovered from the name.
    X.columns = [f"{name}_{group}" for name, group in zip(X.columns, group_treatment)]

    # First differences for flagged factors; columns with no usable value are discarded.
    diff_positions = [i for i, flag in enumerate(diff_treatment) if flag == 'Y']
    X_diff = X.iloc[:, diff_positions].diff().dropna(axis=1, how='all').add_suffix('_diff')

    X = pd.concat([X, X_diff], axis=1)
    # X starts at raw row 3, so the sample start sits at offset start_row - 3.
    X = X.iloc[start_row - 3:, :].reset_index(drop=True)

    # STANDARDIZATION
    X = (X - X.mean()) / X.std(ddof=1)

    # SINGLE FACTOR FILTERING
    roll_corr = _expanding_correlation(X, Y)

    stationary_output = pd.DataFrame({'Test_Value': _kpss_pvalues(X)})
    stationary_output['Result'] = ['Pass' if val > spss_thres else 'Fail'
                                   for val in stationary_output['Test_Value']]

    # kpss clips its p-value to the bounds of its lookup table, so the strongest
    # rejections come back exactly on the bound. Use <= so a factor reported as
    # 'Fail' above is also dropped here.
    non_stationary = (stationary_output['Test_Value'] <= spss_thres).to_numpy()
    weak_correlation = (roll_corr.iloc[-1] < correl_thres).to_numpy()
    X = X.loc[:, ~(non_stationary | weak_correlation)]

    # MULTIPLE FACTOR FILTERING
    group_of = {col: _factor_group_key(col) for col in X.columns}
    model_list = []

    for n in range(2, 5):
        print(f"Testing N = {n} !")

        # Keep only combinations whose factors all come from different groups.
        var_combinations = [comb for comb in combinations(X.columns, n)
                            if len({group_of[col] for col in comb}) == n]

        with Parallel(n_jobs=-1) as parallel:
            fitted = parallel(delayed(fit_and_check)(var_tuple, X, Y) for var_tuple in var_combinations)

        model_list.extend(model for model in fitted if model is not None)

    final_result = pd.DataFrame([_model_record(model) for model in model_list],
                                columns=result_columns + ['num_factor'])
    final_result['num_factor'] = final_result['num_factor'].astype(int)

    # Model checks: slope p-values, coefficient signs, collinearity.
    final_result['p_value_test'] = _pass_fail(
        final_result, ['P-Value_1', 'P-Value_2', 'P-Value_3', 'P-Value_4'],
        lambda values: values > p_val_thres)
    final_result['Coefficient_test'] = _pass_fail(
        final_result, ['Coefficient_1', 'Coefficient_2', 'Coefficient_3', 'Coefficient_4'],
        lambda values: values < 0)
    final_result['VIF_test'] = _pass_fail(
        final_result, ['VIF_1', 'VIF_2', 'VIF_3', 'VIF_4'],
        lambda values: values > vif_thres)

    test_columns = ['Coefficient_test', 'p_value_test', 'VIF_test']
    final_result['test_final'] = np.where(
        (final_result[test_columns] == 'Pass').all(axis=1), 'Pass', 'Fail')

    # Sort the DataFrame by descending order of the adjusted R-squared values
    final_result = final_result.sort_values(by='Adj. R-Squared', ascending=False).reset_index(drop=True)

    return (final_result, stationary_output, roll_corr)
    

In [40]:
# Input folder scanned for CSV files and the workbook that collects every result.
# NOTE(review): both are absolute, user-specific paths; adjust them before
# running the notebook on another machine.
input_folder = r'C:\Users\UV665AR\OneDrive - EY\8.Fusion\MEF model\run_input'
output_path = r'C:\Users\UV665AR\OneDrive - EY\8.Fusion\MEF model\run_output\Regression_Output_moodys.xlsx'

# Sorted so the processing order does not depend on the directory listing order.
csv_files = sorted(file for file in os.listdir(input_folder) if file.endswith('.csv'))

# Next free row on the two sheets shared by all input files.
roll_corr_row = 0
stationary_row = 0

# The context manager saves and closes the workbook even if a file fails midway.
with pd.ExcelWriter(output_path) as writer:
    for csv_file in csv_files:
        print('Processing'+csv_file+':')
        result, stationary_output, roll_corr = my_main(pd.read_csv(os.path.join(input_folder, csv_file)))
        worksheet_name = os.path.splitext(csv_file)[0]  # Use the input file name as the worksheet name

        # Each file's table goes below the previous one instead of overwriting
        # it at row 0; the file name is written into the index header cell.
        roll_corr.to_excel(writer, sheet_name='Rolling_Correlation', index=True,
                           startrow=roll_corr_row, index_label=worksheet_name)
        roll_corr_row += len(roll_corr) + 2  # header row + data rows + one blank row

        stationary_output.to_excel(writer, sheet_name='Stationary_Test_Result', index=True,
                                   startrow=stationary_row, index_label=worksheet_name)
        stationary_row += len(stationary_output) + 2

        result.to_excel(writer, sheet_name=worksheet_name, index=True)

Processinginput_moodys.csv:
Testing N = 2 !
Testing N = 3 !
Testing N = 4 !
