# Libraries

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# import the required libraries
import pandas as pd
from pandasql import sqldf
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, RepeatedStratifiedKFold, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve, roc_auc_score, confusion_matrix, precision_recall_curve, auc
from sklearn.feature_selection import f_classif
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from scipy.stats import chi2_contingency
import scorecardpy as sc
from scorecardpy.LogisticRegStats import LogisticRegStats
import random as rd
import re
from IPython.display import display
from matplotlib.backends.backend_pdf import PdfPages

# Data

In [None]:
# data prepare ------
# load germancredit data
smp_full = sc.germancredit()

# binary target (1 = bad, 0 = good); stored as float so that blank targets can
# be injected below without an implicit int -> float upcast
smp_full['target'] = (smp_full['creditability'] == 'bad').astype(float)

# artificial data-quality issues used to exercise the checks further down:
# missing values, a special value (-9999) and a linear copy of credit.amount
smp_full['credit.amount'] = smp_full['credit.amount'].astype(float)
smp_full.loc[0:99, 'credit.amount'] = np.nan
smp_full.loc[100:199, 'credit.amount'] = -9999
smp_full['credit.amount.corr'] = smp_full['credit.amount']*2 - 1000
smp_full.loc[0:99, 'purpose'] = np.nan
smp_full.loc[100:109, 'target'] = np.nan

# Artificially multiplying the dataset (5 doublings = 32 copies of each row)
for _ in range(5):
    smp_full = pd.concat([smp_full, smp_full])

# Generate a list of all month-end dates between Jan 2020 and Sep 2025
month_ends = pd.date_range(start="2020-01-31", end="2025-09-30", freq="ME")

# Randomly assign one of these month-end dates to each row
# (fixed seed keeps the assignment reproducible)
np.random.seed(123)
smp_full["RepDate_end"] = np.random.choice(month_ends, size=smp_full.shape[0])

# the concatenation above repeats index labels, so rebuild a unique index
smp_full = smp_full.reset_index(drop=True)

# 1. Preliminary analysis of variables (missing values, outliers, concentration/distribution) - based on smp_full

In [None]:
# Column roles used throughout the notebook.

# good/bad label (column created in the data section: 1 = bad, 0 = good)
target = "target"

# date column (e.g. snapshot date or application date)
date = "RepDate_end"

# other columns that are not variables
var_skip = ["creditability"]

# special values for numeric variables - TBD
# (-9999 is the placeholder injected into credit.amount in the data section)
special_values = [-9999]

In [None]:
# all columns that are not variables
var_skip_all = var_skip + [target, date]

# heatmap for the missing values - TBD - size
# NOTE(review): var_skip (not var_skip_all) is passed here, unlike the call
# below - confirm whether target/date should be excluded as well
sc.miss_heatmap(smp_full, var_skip, fig_width=10, fig_height=30)

# variables checks summary - output to Excel
# var_list is reused later to select the columns of the development sample
var_cat_summary, var_num_summary, var_list = sc.expl_analysis(
    smp_full, var_skip_all, special_values
)

In [None]:
# variables distribution - TBD - only for numerical vars
# distribution of the selected variable(s), split by the good/bad label
sc.var_distr(
    smp_full,
    ['age.in.years'],
    groupby=target,
    special_values=special_values,
)

In [None]:
# analysis of shares of missings and bads in target over time
def nan_rate(target):
    """Return the share of missing values in ``target``.

    Parameters
    ----------
    target : array-like
        Label values (a pandas Series when used in ``groupby(...).agg``).

    Returns
    -------
    float
        Number of missing entries divided by the number of entries;
        NaN when ``target`` is empty.
    """
    values = pd.Series(target)
    if values.empty:
        # nothing to measure - avoid a division by zero
        return np.nan
    # isna() also handles None / object dtype, unlike np.isnan
    return values.isna().mean()

def bad_rate(target):
    """Return the share of bads (1) among labelled (0 or 1) values.

    Missing or other values are ignored in both numerator and denominator.

    Parameters
    ----------
    target : array-like
        Label values (a pandas Series when used in ``groupby(...).agg``).

    Returns
    -------
    float
        ``bads / (goods + bads)``; NaN when there is no labelled value.
    """
    values = pd.Series(target)
    bads = int((values == 1).sum())
    goods = int((values == 0).sum())
    labelled = bads + goods
    if labelled == 0:
        # no 0/1 labels at all - the rate is undefined
        return np.nan
    return bads / labelled

# per-date share of blank targets and bad rate among labelled records
# (result columns are named after the aggregation functions)
target_ot = smp_full.groupby(date)[target].agg([nan_rate, bad_rate])

# dates with blank target
has_blank_target = target_ot["nan_rate"] > 0
target_ot.loc[has_blank_target, ["nan_rate"]]

In [None]:
# bad rate over time; the index is rendered as YYYY-MM so that the bar labels
# are short month stamps instead of full timestamps
bad_rate_ot = target_ot["bad_rate"].copy()
bad_rate_ot.index = pd.DatetimeIndex(bad_rate_ot.index).strftime("%Y-%m")
bad_rate_ot.plot.bar()

# 2. Development sample creation

In [None]:
# development window: snapshot dates from 2020-01-31 to 2024-06-30 (inclusive)
in_dev_window = smp_full[date].between('2020-01-31', '2024-06-30')

# keep only the candidate variables plus the target and date columns
dev_columns = var_list + [target, date]
smp_dev = smp_full.loc[in_dev_window, dev_columns]

In [None]:
# check target: number of records per target value, blank targets included
print(smp_dev.groupby(target, dropna=False).size())

In [None]:
# records without a target cannot be used for development - drop them
smp_dev = smp_dev.dropna(subset=[target])

In [None]:
# train/test split as 80/20 (fixed seed for reproducibility)
split_parts = sc.split_df(smp_dev, ratio=0.8, seed=123)

# both samples get a fresh 0..n-1 index
train, test = (part.reset_index(drop=True) for part in split_parts.values())

In [None]:
# train/test sample size - TBD - rewrite
# One row per sample: number of bads, number of observations and bad rate
# (bads divided by the count of non-null targets).
# NOTE(review): `union` removes duplicate rows and there is no ORDER BY, so the
# row order of the result should not be relied upon.
query = f"""
        select 'train' as sample, 
               sum({target}) as bads, 
               count(*) as obs, 
               sum({target})*1.00/count({target}) as BR
        from train
        union
        select 'test' as sample, 
               sum({target}) as bads, 
               count(*) as obs, 
               sum({target})*1.00/count({target}) as BR
        from test
    """
# Query execution (the query refers to the `train` and `test` DataFrames by name)
sqldf(query)
# pd.DataFrame({'train':pd.Series(train.groupby('target', dropna=False).size()),
#               'test':pd.Series(test.groupby('target', dropna=False).size())})

# 3. Automated binning

In [None]:
# Binning parameters shared by the automated and the adjusted binning runs.

# min bin size for fine classing
min_perc_fine_bin = 0.05

# min bin size for coarse classing
count_distr_limit = 0.05

# max number of coarse classes
# (derived from the min bin size: 1 / 0.05 = 20)
bin_num_limit = int(1 / count_distr_limit)

# number of decimals for bin intervals
bin_decimals = 4

In [None]:
# additional columns to exclude from binning (none at the moment)
var_inf = []
# binning
# returns the fine classing and the coarse classing of the train sample
fine_class, coarse_class = sc.woebin(
    train,
    y=target,
    # x = ["age_in_years", "status_of_existing_checking_account", "foreign_worker"],
    var_skip=var_skip_all + var_inf,
    special_values=special_values,
    min_perc_fine_bin=min_perc_fine_bin,
    count_distr_limit=count_distr_limit,
    bin_num_limit=bin_num_limit,
    print_step=10,
    ignore_datetime_cols=False,
    bin_decimals=bin_decimals,
)

In [None]:
# extracting binning results to excel: one stacked table per classing
fine_class_tbl = pd.concat(fine_class.values()).reset_index(drop=True)
fine_class_tbl.to_excel("3_1_fine_classing.xlsx")

coarse_class_tbl = pd.concat(coarse_class.values()).reset_index(drop=True)
coarse_class_tbl.to_excel("3_2_coarse_classing_auto.xlsx")

# iv for variables after automated binning (coarse classes)
coarse_class_iv = sc.vars_iv(coarse_class)
coarse_class_iv.to_excel("3_3_coarse_classing_auto_iv.xlsx")

# iv for variables after automated binning (fine classes)
fine_class_iv = sc.vars_iv(fine_class)
fine_class_iv.to_excel("3_3_fine_class_auto_iv.xlsx")

# shown as the cell output
fine_class_iv

In [None]:
# automated filtering of variables using iv and correlation from the fine classing
# NOTE: this rebinds var_list (previously the output of the exploratory analysis)
var_list, var_rej_fine = sc.vars_filter(
    train, fine_class, corr_threshold=0.6, iv_threshold=0.1
)

# removing excluded variables from coarse_class dictionary
coarse_class_filt = {k: v for k, v in coarse_class.items() if k in var_list}

# rejected variables - kept as the last expression so the notebook displays them
var_rej_fine

In [None]:
# binning visualization for a hand-picked set of variables
var_show = ['status.of.existing.checking.account', 'credit.history','property']

# keeps the order of var_show; a variable missing from coarse_class raises KeyError
coarse_class_selected = {name: coarse_class[name] for name in var_show}
sc.woebin_plot(coarse_class_selected)

In [None]:
# iv of the variables that passed the automated filter (shown as cell output)
coarse_class_filt_iv = sc.vars_iv(coarse_class_filt)
coarse_class_filt_iv

# 4. Binning adjustments 

In [None]:
# manual review and adjustment of binning (results are being saved to save_breaks_list and can be loaded from load_breaks_list)
# the same file is used for loading and saving, so adjustments accumulate
# across runs
breaks_list = sc.woebin_adj(
    train,
    y=target,
    # x = ['N103_1'],
    load_breaks_list="3_5_breaks_list_adj.py",
    save_breaks_list="3_5_breaks_list_adj.py",
    bins=coarse_class_filt,  # used in case load_breaks_list is None or not exists
    init_bins=fine_class,
    adj_all_var=False,  # False - only non-monotonic woe variables
    show_init_bins=True,  # True - to show the table with Fine classing results
    special_values=special_values,
)

In [None]:
# variables excluded from the adjusted coarse classing
# (passed as var_skip to the woebin call on the adjusted breaks)
vars_trend_excl = [
    'credit.amount',
]

In [None]:
# coarse classing after manual adjustments
# NOTE(review): breaks_list is parsed with eval(); it comes from woebin_adj and
# a locally saved file - make sure that file is trusted before running this cell
adjusted_vars = list(eval(breaks_list))
_, coarse_class_adj = sc.woebin(
    train,
    y=target,
    x=adjusted_vars,
    breaks_list=breaks_list,
    var_skip=vars_trend_excl,
    special_values=special_values,
    min_perc_fine_bin=min_perc_fine_bin,
    count_distr_limit=count_distr_limit,
    bin_num_limit=bin_num_limit,
    print_step=10,
    ignore_datetime_cols=False,
    bin_decimals=bin_decimals,
)

# applying woe transformations on train and test samples
# (the test sample is transformed with the bins fitted on train)
train_woe = sc.woebin_ply(train, bins=coarse_class_adj)
test_woe = sc.woebin_ply(test, bins=coarse_class_adj)

# defining woe variables: one "<variable>_woe" name per binned variable
vars_woe = [var_name + "_woe" for var_name in coarse_class_adj]

# results of the final coarse classing after manual adjustments !update
coarse_class_adj_tbl = pd.concat(coarse_class_adj.values()).reset_index(drop=True)
coarse_class_adj_tbl.to_excel("3_6_coarse_classing_adj.xlsx")

# iv per variable after the adjustments (shown as cell output)
coarse_class_adj_iv = sc.vars_iv(coarse_class_adj)
coarse_class_adj_iv.to_excel("3_7_coarse_classing_adj_iv.xlsx")
coarse_class_adj_iv

In [None]:
# IV for variables by defined subsamples (period, product etc.)
# sc.iv_group(train_woe,
#             var_list = ["age_in_years_woe"],
#             groupby = "personal_status_and_sex",
#             y = target)

# 5. Correlation analysis

In [None]:
# correlation matrix of the woe-transformed variables (computed once and
# reused for both the Excel export and the heatmap)
train_woe_corr = train_woe[vars_woe].corr()
train_woe_corr.to_excel("5_1_correlation_matrix.xlsx")

# plotting correlation heatmap
plt.figure(figsize=(40, 24))
sns.heatmap(train_woe_corr, cmap="YlGnBu", annot=True)

# displaying heatmap
# to keep a copy on disk, call plt.savefig('5_2_correlation_heatmap.png')
# before plt.show()
plt.show()

In [None]:
# automated filtering of variables using iv and correlation from the adjusted
# coarse classing; the rejected variables are also saved to Excel
vars_cand_1, var_rej_corr = sc.vars_filter(
    train,
    coarse_class_adj,
    corr_threshold=0.6,
    iv_threshold=0.1,
    save_to="5_2_correlation_rej.xlsx",
)

In [None]:
# variables rejected by the filter above (shown as cell output)
# exclusions by corr > threshold
var_rej_corr

In [None]:
# applying woe transformations on train and test samples, restricted to the
# target and the candidate variables that survived the filter
cand_columns = [target] + vars_cand_1
train_woe = sc.woebin_ply(train[cand_columns], bins=coarse_class_adj)
test_woe = sc.woebin_ply(test[cand_columns], bins=coarse_class_adj)

In [None]:
# sanity check: True marks a column of the transformed test sample that
# contains at least one null value
print(test_woe.isnull().any())

# 6. Logistic regression

In [None]:
# function to derive p-values
from sklearn import linear_model
import numpy as np
import scipy.stats as stat


class LogisticRegStats:
    """
    Wrapper class for logistic regression that keeps the usual sklearn
    estimator in ``self.model`` and, after ``fit``, exposes per-coefficient
    statistics:

    self.z_scores         coefficient / estimated standard error
    self.p_values         two-tailed p-values (list of floats)
    self.sigma_estimates  estimated standard errors

    as well as the (ridge-stabilised) negative Hessian of the log-likelihood,
    i.e. the Fisher information matrix

    self.F_ij

    NOTE(review): the information matrix is built from the feature columns
    only (no intercept column), so the standard errors do not account for the
    fitted intercept - confirm this approximation is acceptable.
    """

    def __init__(self, *args, **kwargs):
        # all arguments are forwarded unchanged to sklearn's LogisticRegression
        self.model = linear_model.LogisticRegression(*args, **kwargs)

    def fit(self, X, y):
        """Fit the wrapped model and derive coefficient statistics.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Numeric design matrix (ndarray or DataFrame).
        y : array-like of shape (n_samples,)
            Binary target.

        Returns
        -------
        LogisticRegStats
            ``self``, to allow call chaining.
        """
        self.model.fit(X, y)

        #### Get p-values for the fitted model ####
        design = np.asarray(X, dtype=float)
        scores = np.asarray(self.model.decision_function(X), dtype=float)

        # Per-observation weight p * (1 - p) == 1 / (2 * (1 + cosh(score))),
        # evaluated in log space so that large |score| cannot overflow.
        weights = np.exp(-(np.logaddexp(0.0, scores) + np.logaddexp(0.0, -scores)))

        # Fisher information matrix; the small ridge term keeps it invertible
        # when columns are (nearly) collinear.
        eps = 1e-4
        F_ij = (design * weights[:, None]).T @ design + np.eye(design.shape[1]) * eps

        Cramer_Rao = np.linalg.inv(F_ij)  # inverse information matrix
        sigma_estimates = np.sqrt(np.diagonal(Cramer_Rao))
        z_scores = self.model.coef_[0] / sigma_estimates  # one per coefficient
        p_values = (2.0 * stat.norm.sf(np.abs(z_scores))).tolist()  # two-tailed

        self.z_scores = z_scores
        self.p_values = p_values
        self.sigma_estimates = sigma_estimates
        self.F_ij = F_ij
        return self

## 6.1 Initial candidate

In [None]:
# defining woe variables
# list of woe variables for the initial candidate set
vars_woe = [f"{var_name}_woe" for var_name in vars_cand_1]

# target and variables
y_train = train_woe[target]
X_train = train_woe[vars_woe]
y_test = test_woe[target]
X_test = test_woe[vars_woe]

# train and test stacked together
X = pd.concat([X_train, X_test])
y = pd.concat([y_train, y_test])

# logistic regression ------
# the saga solver shuffles the data, so a fixed random_state is required for
# reproducible coefficients
lr = LogisticRegression(
    penalty="l1", C=0.9, solver="saga", n_jobs=-1, random_state=123
)
lr.fit(X_train, y_train)

# predicted probability of the positive class (target == 1)
train_pred = lr.predict_proba(X_train)[:, 1]
test_pred = lr.predict_proba(X_test)[:, 1]
# performance ks & roc ------
train_perf = sc.perf_eva(y_train, train_pred, title="train")
test_perf = sc.perf_eva(y_test, test_pred, title="test")

In [None]:
# bad rate summary (observations, bads, bad rate) for each sample
br_by_sample = {}
for smp_name, y_smp in (("train", y_train), ("test", y_test)):
    n_obs = y_smp.count()
    n_bad = int(y_smp.sum())
    br_by_sample[smp_name] = {
        "Total": n_obs,
        "Bads": n_bad,
        "Bad Rate": round(n_bad / n_obs, 4),
    }
train_br = br_by_sample["train"]
test_br = br_by_sample["test"]

# combining bad rate with performance: one column per sample
train_col = pd.Series({**train_br, **train_perf})
test_col = pd.Series({**test_br, **test_perf})
perf = pd.concat({"train": train_col, "test": test_col}, axis=1).convert_dtypes()

# the "pic" entry of the perf_eva output is not tabular - leave it out
perf = perf.loc[~perf.index.isin(["pic"])]
perf.to_excel("6_1_1_perf_train_test.xlsx")
perf

In [None]:
# logistic regression with stats
# fixed random_state: the saga solver is stochastic, and the coefficients
# reported here should be reproducible between runs
lr2 = LogisticRegStats(
    penalty="l1", C=0.9, solver="saga", n_jobs=-1, random_state=123
)
lr2.fit(X_train, y_train)

# calculating p-values and exporting to excel
# NOTE: no p-value is estimated for the intercept; 0 is only a placeholder
lr_output = pd.DataFrame(
    {
        "Variable": ["intercept"] + X_train.columns.tolist(),
        "Coefficient": [lr2.model.intercept_[0]] + lr2.model.coef_[0].tolist(),
        "P-value": [0] + lr2.p_values,
    }
)

lr_output.to_excel("6_1_2_regr_output.xlsx")
lr_output

In [None]:
# score ------
# scorecard built from the adjusted bins and the fitted model `lr`
card = sc.scorecard(coarse_class_adj, lr, X_train.columns, start_zero=True)
# credit score
# (applied to the untransformed train/test samples)
train_score = sc.scorecard_ply(train, card, print_step=0)
test_score = sc.scorecard_ply(test, card, print_step=0)
# output to excel
# all per-variable tables of the card stacked into one table
scorecard_points = pd.concat(card, ignore_index=True)
scorecard_points.to_excel("6_1_3_scorecard_points.xlsx", sheet_name="scorecard_points")

## 6.2 Excluding p-values > 10% & coefficient > 0

In [None]:
# exclusions: variables with p-value above 10% or with a positive coefficient
# ("intercept" may appear in this list; it never matches a "<var>_woe" name)
vars_pval_excl = (
    lr_output["Variable"][lr_output["P-value"] > 0.1].to_list()
    + lr_output["Variable"][lr_output["Coefficient"] > 0].to_list()
)

# list of variables that survive the exclusions
excluded_woe = set(vars_pval_excl)
vars_cand_2 = [
    var_name for var_name in vars_cand_1 if f"{var_name}_woe" not in excluded_woe
]

# list of woe variables
vars_woe = [f"{var_name}_woe" for var_name in vars_cand_2]

# target and variables
y_train = train_woe[target]
X_train = train_woe[vars_woe]
y_test = test_woe[target]
X_test = test_woe[vars_woe]

# logistic regression ------
# the saga solver shuffles the data, so a fixed random_state is required for
# reproducible coefficients
lr = LogisticRegression(
    penalty="l1", C=0.9, solver="saga", n_jobs=-1, random_state=123
)
lr.fit(X_train, y_train)

# predicted probability of the positive class (target == 1)
train_pred = lr.predict_proba(X_train)[:, 1]
test_pred = lr.predict_proba(X_test)[:, 1]
# performance ks & roc ------
train_perf = sc.perf_eva(y_train, train_pred, title="train")
test_perf = sc.perf_eva(y_test, test_pred, title="test")

In [None]:
# bad rate summary (observations, bads, bad rate) for each sample
br_by_sample = {}
for smp_name, y_smp in (("train", y_train), ("test", y_test)):
    n_obs = y_smp.count()
    n_bad = int(y_smp.sum())
    br_by_sample[smp_name] = {
        "Total": n_obs,
        "Bads": n_bad,
        "Bad Rate": round(n_bad / n_obs, 4),
    }
train_br = br_by_sample["train"]
test_br = br_by_sample["test"]

# combining bad rate with performance: one column per sample
train_col = pd.Series({**train_br, **train_perf})
test_col = pd.Series({**test_br, **test_perf})
perf = pd.concat({"train": train_col, "test": test_col}, axis=1).convert_dtypes()

# the "pic" entry of the perf_eva output is not tabular - leave it out
perf = perf.loc[~perf.index.isin(["pic"])]
perf.to_excel("6_2_1_perf_train_test.xlsx")
perf

In [None]:
# logistic regression with stats
# fixed random_state: the saga solver is stochastic, and the coefficients
# reported here should be reproducible between runs
lr2 = LogisticRegStats(
    penalty="l1", C=0.9, solver="saga", n_jobs=-1, random_state=123
)
lr2.fit(X_train, y_train)

# calculating p-values and exporting to excel
# NOTE: no p-value is estimated for the intercept; 0 is only a placeholder
lr_output = pd.DataFrame(
    {
        "Variable": ["intercept"] + X_train.columns.tolist(),
        "Coefficient": [lr2.model.intercept_[0]] + lr2.model.coef_[0].tolist(),
        "P-value": [0] + lr2.p_values,
    }
)

lr_output.to_excel("6_2_2_regr_output.xlsx")
lr_output

In [None]:
# check: variables of the refitted model that still have a p-value above 10%
# or a positive coefficient (shown as cell output)
(
    lr_output["Variable"][lr_output["P-value"] > 0.1].to_list()
    + lr_output["Variable"][lr_output["Coefficient"] > 0].to_list()
)

In [None]:
# score ------
# scorecard built from the adjusted bins and the refitted model `lr`
card = sc.scorecard(coarse_class_adj, lr, X_train.columns, start_zero=True)
# credit score
train_score = sc.scorecard_ply(train, card, print_step=0)
test_score = sc.scorecard_ply(test, card, print_step=0)
# all per-variable tables of the card stacked into one table
scorecard_points = pd.concat(card, ignore_index=True)

# calculating the weights of the variables: each variable's maximum points as
# a share of the sum of maximum points (base points are left out)
scorecard_points_vars = scorecard_points[scorecard_points['variable'] != 'basepoints']
max_points = scorecard_points_vars.groupby('variable')['points'].max().reset_index(name='max_points')
max_points['weight'] = max_points['max_points'] / max_points['max_points'].sum()

# export to Excel; the context manager closes the workbook even if one of the
# sheet writes fails
with pd.ExcelWriter("6_2_3_scorecard_points.xlsx", engine="xlsxwriter") as writer:
    scorecard_points.to_excel(writer, sheet_name="scorecard_points")
    max_points.to_excel(writer, sheet_name="variable_weights")

In [None]:
# names reported by the final regression ("intercept" plus "<variable>_woe")
vars_final = lr_output["Variable"].to_list()

# binning visualization for the variables of the final model only
coarse_class_final = {}
for var_name, bin_tbl in coarse_class_adj.items():
    if var_name + "_woe" in vars_final:
        coarse_class_final[var_name] = bin_tbl
sc.woebin_plot(coarse_class_final)

In [None]:
# coarse_class_vars = [k for k, v in coarse_class_adj.items() if k + "_woe" in vars_final]

# # manual review and adjustment of binning (results are being saved to save_breaks_list and can be loaded from load_breaks_list)
# breaks_list_final = sc.woebin_adj(
#     train,
#     y=target,
#     x=["agro_flag"],
#     # load_breaks_list="3_5_breaks_list_adj.py",
#     # save_breaks_list="9_9_breaks_list_adj.py",
#     bins=coarse_class_filt,  # used in case load_breaks_list is None or not exists
#     init_bins=fine_class,
#     adj_all_var=True,  # False - only non-monotonic woe variables
#     show_init_bins=True,  # True - to show the table with Fine classing results
#     special_values=special_values,
# )

# 7. Testing

In [None]:
# woe transformation of the full sample (all dates) with the adjusted bins
smp_testing = sc.woebin_ply(smp_full, bins=coarse_class_adj, print_step=1)

In [None]:
# score of the full sample from the final scorecard
# NOTE(review): assumes scorecard_ply returns a single "score" column aligned
# with smp_full's index - confirm
smp_testing["score"] = sc.scorecard_ply(smp_full, card, print_step=0)

In [None]:
# sanity check: True marks a column with at least one null value; the target
# column is referenced through the configured name rather than a literal
print(smp_testing[vars_woe + ['score', target]].isnull().any())

In [None]:
# records without a target cannot be evaluated - drop them
smp_testing = smp_testing.dropna(subset=[target])

In [None]:
date = "RepDate_end"

# part of the testing sample inside the 2020-01-31 .. 2024-06-30 window
in_window = smp_testing[date].between('2020-01-31', '2024-06-30')
smp_testing_outcome = smp_testing[in_window]

# adding target to the scored train and test samples
for scored_smp, source_smp in ((train_score, train), (test_score, test)):
    scored_smp[target] = source_smp[target]

In [None]:
# model testing on the prepared samples: the scored and woe-transformed
# train/test samples plus the full testing sample and its in-window subset
sc.performance_testing(
    smp_testing_outcome=smp_testing_outcome,
    smp_testing=smp_testing,
    train_score=train_score,
    test_score=test_score,
    train_woe=train_woe,
    test_woe=test_woe,
    vars_woe=vars_woe,
    target=target,
    date_col=date,
    groupby_col="housing"
)

# 8. Recalibration

In [None]:
# preparing sample for recalibration
def _score_for_calibration(sample):
    """Score ``sample`` with the final card and attach target and PD columns."""
    scored = sc.scorecard_ply(sample, card, print_step=0)
    scored['target'] = sample['target']
    scored['pd_regr'] = sc.pd_from_score(scored['score'])
    return scored


train_score = _score_for_calibration(train)
test_score = _score_for_calibration(test)

# train and test stacked into one calibration sample
smp_calib_score = pd.concat([train_score, test_score], ignore_index=True)

# assigning ratings: score bands (lowest edge included) mapped to rating labels
bins = [0,500,540,580,620,660,700,740,780,1000]
labels = ['4.5','4.0','3.5','3.0','2.5','2.0','1.5','1.0','0.5']
smp_calib_score['rating'] = pd.cut(
    smp_calib_score['score'],
    bins=bins,
    labels=labels,
    include_lowest=True,
)

In [None]:
# calibration parameters; applied below as score * slope + intercept
intercept, slope = sc.calibration(smp_calib_score, score='score', target='target')
print(intercept, slope)

In [None]:
# recalibrated score: linear transformation of the original score
# NOTE(review): astype(int) truncates toward zero instead of rounding -
# confirm this is the intended conversion
score_recalibrated = smp_calib_score['score']*slope + intercept
smp_calib_score['score_new'] = score_recalibrated.astype(int)

# ratings re-assigned on the recalibrated score with the same bands
smp_calib_score['rating_new'] = pd.cut(
    smp_calib_score['score_new'],
    bins=bins,
    labels=labels,
    include_lowest=True,
)
smp_calib_score