In [1]:
import pandas as pd
import numpy as np
import statsmodels.formula.api as smf
from itertools import combinations
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.metrics import roc_auc_score
from sklearn.inspection import permutation_importance
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import Lasso, LassoCV
import shap
import re
from tqdm import tqdm
from optbinning import OptimalBinning
import utils as ut

# Settings
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)

### A. Import the dataset

In [2]:
# Import the dataset
data = pd.read_csv('prepared_dataset.csv')

# Fix formats
for col in data.columns:
    data[col] = pd.to_numeric(data[col], errors='coerce')

data.head()

Unnamed: 0,target,year,account_balance_woe,duration_of_credit_month_woe,sex_marital_status_woe,type_of_apartment_woe,payment_status_of_previous_credit_woe,purpose_woe,credit_amount_woe,value_savings_stocks_woe,length_of_current_employment_woe,most_valuable_available_asset_woe,age_years_woe
0,1,2016,0.818099,0.096228,0.235341,0.404445,-0.733741,0.099235,-0.2127,0.271358,0.431137,0.028573,0.528844
1,1,2018,0.818099,-0.553595,-0.165548,0.404445,-0.733741,0.353105,-0.2127,0.271358,0.032103,-0.461035,-0.314115
2,1,2018,0.401392,-0.267315,0.235341,0.404445,0.088319,0.230524,-0.2127,0.139552,-0.298717,-0.461035,0.528844
3,1,2017,0.818099,-0.267315,-0.165548,0.404445,-0.733741,0.353105,-0.2127,0.271358,0.032103,-0.461035,-0.314115
4,1,2019,0.818099,-0.267315,-0.165548,-0.196052,-0.733741,0.353105,-0.2127,0.271358,0.032103,0.028573,-0.314115


### B. Create all possible models

In [3]:
# Define variables and generate the combinations of variables
num_vars = 5
all_vars = [var for var in data.columns if var.endswith('woe')]

# Create all combinations
model_vars = []

for combo in combinations(all_vars, num_vars):
    model_vars.append(list(combo))

In [None]:
def create_model(id, data, target, model_vars):
    # Create formula, fit the model and predict values
    formula = f'{target} ~ ' + ' + '.join(model_vars)
    model = smf.logit(formula=formula, data=data).fit(disp=0)   
    y_pred = model.predict()

    # Create pooling
    data['score'] = y_pred
    optb = OptimalBinning(name='score', dtype="numerical", solver="cp")
    optb.fit(data['score'], data['target'])
    data['bin'] = optb.transform(data['score'], metric="indices")

    # Create calibration
    calibration_df = data.groupby(['bin']).agg({'target': ['mean', 'count']})
    calibration_df = calibration_df.reset_index()
    calibration_df.columns = ['bin', 'pd', 'pd_count']
    data = data.merge(calibration_df, on='bin', how='left')

    # Create model stats
    gini_coeff = ut.calculate_gini(data['target'], y_pred)
    p_values = model.pvalues.drop('Intercept', errors='ignore')
    max_p_value = p_values.max()
    corr_matrix = data[model_vars].corr()
    max_corr = corr_matrix[corr_matrix != 1].stack().abs().max()

    # Calculate RWA
    asset_class = "Other Retail Exposures"
    data['rwa'] = data['pd'].apply(lambda x: ut.calculate_RWA(asset_class, x, 0.35, 100, 1.06))
    rwa_df = data.groupby(['year']).agg({'rwa': ['sum']})
    rwa_df.columns = ['agg_sum']

    # Perform permutation importance
    imp = ut.calculate_most_important_feature(data, 'target', model_vars)
    
    # Determine if pools are monotically increasing
    calibration_df = calibration_df.sort_values(by='bin')
    is_monotonic_increasing = calibration_df['pd'].is_monotonic_increasing

    # Calculate HHI of the last period and PSI for AP vs non-API
    hhi = ut.calculate_hhi(calibration_df, 'pd_count')
    psi = ut.calculate_psi(data, 'year', 'bin')

    # Create dictionary to return
    return {
        "model_id": id,
        "model_gini": gini_coeff,
        "psi_ap_vs_non_api": psi,
        "monotonic_bins": is_monotonic_increasing,
        "HHI_bins_application_portfolio": hhi,
        "max_p_value": max_p_value,
        "max_correlation": max_corr,
        "max_var_importance": imp[1],
        "max_div_min_rwa": rwa_df['agg_sum'].max() / rwa_df['agg_sum'].min(),        
        "num_of_predictors": len(model_vars),
    }

# Create a dataset with stats covering all models
models = []
for i, risk_drivers in enumerate(tqdm(model_vars)):
    res_model = create_model(i, data, 'target', risk_drivers)
    models.append(res_model)

all_models = pd.DataFrame(models)

 89%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▊                   | 410/462 [03:44<00:27,  1.88it/s]

In [None]:
all_models.sample(20)

In [None]:
# Applying style for formatting numerical columns
styled_df = all_models.sort_values(by='model_gini', ascending=False).head().style.format({
    "model_gini": "{:.2%}",
    "psi_ap_vs_non_api": "{:.2%}",
    "HHI_bins_application_portfolio": "{:.2%}",
    "max_p_value": "{:.2%}",
    "max_correlation": "{:.2%}",
    "max_var_importance": "{:,.2f}%",
    "max_div_min_rwa": "{:,.2%}",
})

styled_df

### C. Test other multivariate algos

In [None]:
# Helper functions
def summarise_results(model, data, model_vars):
    # Create model stats
    gini_coeff = ut.calculate_gini(data['target'], data['score'])
    p_values = model.pvalues.drop('Intercept', errors='ignore')
    max_p_value = p_values.max()
    corr_matrix = data[model_vars].corr()
    max_corr = corr_matrix[corr_matrix != 1].stack().abs().max()

    print(f'Gini: {gini_coeff:.2%}, max correlation: {max_corr:.2%}, max_p_value: {max_p_value:.2%}, {model_vars}')

In [None]:
#https://rdrr.io/github/louis-vines/creditr/f/vignettes/using-miv-to-select-variables-in-regression.Rmd
model_vars = []
potential_vars = [var for var in data.columns if var.endswith('woe')]

while True:
    all_stats = pd.DataFrame()

    # Create formula, fit the model and predict values
    if len(model_vars) == 0:
        formula = f'target ~ 1'
    else:
        formula = f'target ~ ' + ' + '.join(model_vars)
    
    model = smf.logit(formula=formula, data=data).fit(disp=0)   
    data['score'] = model.predict()
    
    for potential_var in potential_vars:
        
        # Create marginal information value per potential variables
        stats = ut.observed_expected_woe(data, potential_var, 'target', 'score')
        all_stats = pd.concat([all_stats, stats])

    # Pick the best feature above a threshold from all MIVs per feature
    mivs = all_stats.groupby('feature').agg({'miv': 'max', 'p_val': 'max'}).reset_index()
    p_value_threshold = 0.05
    filtered_df = mivs[mivs['p_val'] < p_value_threshold]

    if len(filtered_df) == 0:
        break
    else:
        best_feature = filtered_df.loc[filtered_df['miv'].idxmax()]['feature']
        
    model_vars.append(best_feature)
    potential_vars.remove(best_feature)

    # Create a summary
    formula = f'target ~ ' + ' + '.join(model_vars)
    model = smf.logit(formula=formula, data=data).fit(disp=0)   
    data['score'] = model.predict()
    summarise_results(model, data, model_vars)
    
    if (len(potential_vars) == 0) | (len(model_vars) == 5):
        break

In [None]:
def mv_lasso(df, target_column, n_features, n_alphas=100, random_state=0):
    # Separate features and target variable
    X = df.drop(columns=[target_column])
    y = df[target_column]

    # Generate a list of alphas
    alpha_max = np.max(np.abs(X.T.dot(y))) / X.shape[0]  # Rough estimate of the upper bound
    alphas = np.logspace(-4, np.log10(alpha_max), n_alphas)

    closest_alpha = None
    closest_feature_count = np.inf

    # Iterate through alphas to find the one closest to desired number of features
    for alpha in alphas:
        lasso = Lasso(alpha=alpha, random_state=random_state)
        lasso.fit(X, y)
        nonzero_features = np.sum(lasso.coef_ != 0)

        # Check if this alpha gives a closer feature count to our target
        if abs(nonzero_features - n_features) < abs(closest_feature_count - n_features):
            closest_alpha = alpha
            closest_feature_count = nonzero_features

        # Stop if we reach the desired number of features
        if nonzero_features == n_features:
            break

    # Fit the final model with the determined alpha
    final_lasso = Lasso(alpha=closest_alpha, random_state=random_state)
    final_lasso.fit(X, y)

    # Extracting feature names
    feature_names = X.columns[final_lasso.coef_ != 0].tolist()
    
    return feature_names

# Create a model with Lasso
target_column = 'target'
potential_vars = [var for var in data.columns if var.endswith('woe')]
feature_names = mv_lasso(data[[target_column] + potential_vars], target_column, n_features=5)

# Create a summary
formula = f'target ~ ' + ' + '.join(feature_names)
model = smf.logit(formula=formula, data=data).fit(disp=0)   
data['score'] = model.predict()
summarise_results(model, data, feature_names)

In [None]:
def mv_stepwise(df, target_column, max_features, threshold_in=0.01, threshold_out=0.05, verbose=False):

    X = df.drop(columns=[target_column])  # Separate the features
    included = []
    while True:
        changed = False
        # forward step
        if len(included) < max_features:
            excluded = list(set(X.columns) - set(included))
            new_pval = pd.Series(index=excluded)
            for new_column in excluded:
                formula = "{} ~ {}".format(target_column, ' + '.join(included + [new_column]))
                model = smf.logit(formula, data=df).fit(disp=0)
                new_pval[new_column] = model.pvalues[new_column]
            best_pval = new_pval.min()
            if best_pval < threshold_in:
                best_feature = new_pval.idxmin()
                included.append(best_feature)
                changed = True
                if verbose:
                    print('Add  {:30} with p-value {:.6}'.format(best_feature, best_pval))

        # backward step
        if len(included) > 0:
            formula = "{} ~ {}".format(target_column, ' + '.join(included))
            model = smf.logit(formula, data=df).fit(disp=0)
            pvalues = model.pvalues.iloc[1:]  # exclude intercept
            worst_pval = pvalues.max()  # null if pvalues is empty
            if worst_pval > threshold_out:
                changed = True
                worst_feature = pvalues.idxmax()
                included.remove(worst_feature)
                if verbose:
                    print('Drop {:30} with p-value {:.6}'.format(worst_feature, worst_pval))

        if not changed:
            break
    return included

# Create a model with Lasso
target_column = 'target'
potential_vars = [var for var in data.columns if var.endswith('woe')]
feature_names = mv_stepwise(data[[target_column] + potential_vars], target_column, 5)

# Create a summary
formula = f'target ~ ' + ' + '.join(feature_names)
model = smf.logit(formula=formula, data=data).fit(disp=0)   
data['score'] = model.predict()
summarise_results(model, data, feature_names)