<a href="https://colab.research.google.com/github/PythonDecorator/Loan-Default-and-Credit-Risk-Classifier/blob/master/Loan_Default_and_Credit_Risk_Classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Libraries

In [1]:
!pip install catboost



# Import Statement

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import joblib

import warnings
warnings.filterwarnings('ignore')

# for displaying markdown
from IPython.display import display, Markdown

# sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, PolynomialFeatures, OneHotEncoder
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline

# metrics
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
    brier_score_loss,
    cohen_kappa_score,
    confusion_matrix,
    RocCurveDisplay,
    matthews_corrcoef,
    RocCurveDisplay,
    ConfusionMatrixDisplay,
)

from sklearn.model_selection import cross_val_score, learning_curve
from sklearn.calibration import calibration_curve

# others insatll
from imblearn.over_sampling import SMOTE
import xgboost as xgb
import lightgbm as lgb
from catboost import CatBoostClassifier

# for predicting best features
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif, RFE

# Mount GDrive

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Load Data

In [4]:
file_path_accepted = '/content/drive/MyDrive/MY WORK/ML/LoanDefault/accepted_2007_to_2018Q4.csv'
file_path_rejected = '/content/drive/MyDrive/MY WORK/ML/LoanDefault/rejected_2007_to_2018Q4.csv'

In [5]:
class DataLoader:
    def __init__(self, filepath):
        self.filepath = filepath
        self.df = None

    def load_csv(self, usecols: list = [], dtypes: dict = {}, chunksize: int = 0, nrows: int = 0):
        """
        Load CSV with optional column selection, dtypes, chunking, and limited rows.
        """
        if nrows:
            self.df = pd.read_csv(self.filepath, usecols=usecols or None, dtype=dtypes or None, nrows=nrows)
        elif chunksize:
            chunks = []
            for chunk in pd.read_csv(self.filepath, usecols=usecols or None, dtype=dtypes or None, chunksize=chunksize):
                chunks.append(chunk)
            self.df = pd.concat(chunks, ignore_index=True)
        else:
            self.df = pd.read_csv(self.filepath, usecols=usecols or None, dtype=dtypes or None)
        return self.df

    def preview_columns(self, start_col=0, ncols=10, nrows=10):
        """
        Preview a subset of the dataframe:
        - First nrows
        - Columns from start_col to start_col + ncols
        """
        if self.df is None:
            raise ValueError("Dataframe not loaded. Call load_csv() first.")
        end_col = start_col + ncols
        return self.df.iloc[:nrows, start_col:end_col]


In [6]:
cols = [
    # 'id',
    'loan_amnt',
    'funded_amnt',
    'funded_amnt_inv',
    'term',
    'int_rate',
    'installment',
    'grade',
    'sub_grade',
    # 'emp_title',
    'emp_length',
    'home_ownership',
    'annual_inc',
    'verification_status',
    # 'issue_d',
    'loan_status',
    'pymnt_plan',
    # 'url',
    'purpose',
    # 'title',
    # 'zip_code',
    # 'addr_state',
    'dti',
    'delinq_2yrs',
    # 'earliest_cr_line',
    'fico_range_low',
    'fico_range_high',
    'inq_last_6mths',
    'open_acc',
    'pub_rec',
    'revol_bal',
    'revol_util',
    'total_acc',
    'initial_list_status',
    'out_prncp',
    'out_prncp_inv',
    'total_pymnt',
    'total_pymnt_inv',
    'total_rec_prncp',
    'total_rec_int',
    'total_rec_late_fee',
    'recoveries',
    'collection_recovery_fee',
    # 'last_pymnt_d',
    'last_pymnt_amnt',
    # 'last_credit_pull_d',
    'last_fico_range_high',
    'last_fico_range_low',
    'collections_12_mths_ex_med',
    'mths_since_last_delinq',
    'policy_code',
    'application_type',
    'acc_now_delinq',
    'tot_coll_amt',
    'tot_cur_bal',
    'open_acc_6m',
    'open_act_il',
    'open_il_12m',
    'open_il_24m',
    'mths_since_rcnt_il',
    'total_bal_il',
    # 'il_util',
    'open_rv_12m',
    'open_rv_24m',
    'max_bal_bc',
    'all_util',
    'total_rev_hi_lim',
    'inq_fi',
    'total_cu_tl',
    'inq_last_12m',
    'acc_open_past_24mths',
    'avg_cur_bal',
    'bc_open_to_buy',
    'bc_util',
    'chargeoff_within_12_mths',
    'delinq_amnt',
    'mo_sin_old_il_acct',
    'mo_sin_old_rev_tl_op',
    'mo_sin_rcnt_rev_tl_op',
    'mo_sin_rcnt_tl',
    'mort_acc',
    'mths_since_recent_bc',
    'mths_since_recent_inq',
    'num_accts_ever_120_pd',
    'num_actv_bc_tl',
    'num_actv_rev_tl',
    'num_bc_sats',
    'num_bc_tl',
    'num_il_tl',
    'num_op_rev_tl',
    'num_rev_accts',
    'num_rev_tl_bal_gt_0',
    'num_sats',
    # 'num_tl_120dpd_2m',
    'num_tl_30dpd',
    'num_tl_90g_dpd_24m',
    'num_tl_op_past_12m',
    'pct_tl_nvr_dlq',
    'percent_bc_gt_75',
    'pub_rec_bankruptcies',
    'tax_liens',
    'tot_hi_cred_lim',
    'total_bal_ex_mort',
    'total_bc_limit',
    'total_il_high_credit_limit',
    'hardship_flag',
    'disbursement_method',
    'debt_settlement_flag'
]

In [7]:
dtypes = {
    'loan_amnt': 'float64',
    'funded_amnt': 'float64',
    'funded_amnt_inv': 'float64',
    'term': 'object',
    'int_rate': 'float64',
    'installment': 'float64',
    'grade': 'object',
    'sub_grade': 'object',
    'emp_title': 'object',
    'emp_length': 'object',
    'home_ownership': 'object',
    'annual_inc': 'float64',
    'verification_status': 'object',
    'issue_d': 'object',
    'loan_status': 'object',
    'pymnt_plan': 'object',
    'purpose': 'object',
    'dti': 'float64',
    'delinq_2yrs': 'float64',
    'earliest_cr_line': 'object',
    'fico_range_low': 'float64',
    'fico_range_high': 'float64',
    'inq_last_6mths': 'float64',
    'open_acc': 'float64',
    'pub_rec': 'float64',
    'revol_bal': 'float64',
    'revol_util': 'float64',
    'total_acc': 'float64',
    'initial_list_status': 'object',
    'out_prncp': 'float64',
    'out_prncp_inv': 'float64',
    'total_pymnt': 'float64',
    'total_pymnt_inv': 'float64',
    'total_rec_prncp': 'float64',
    'total_rec_int': 'float64',
    'total_rec_late_fee': 'float64',
    'recoveries': 'float64',
    'collection_recovery_fee': 'float64',
    'last_pymnt_d': 'object',
    'last_pymnt_amnt': 'float64',
    'last_credit_pull_d': 'object',
    'last_fico_range_high': 'float64',
    'last_fico_range_low': 'float64',
    'collections_12_mths_ex_med': 'float64',
    'mths_since_last_delinq': 'float64',
    'policy_code': 'float64',
    'application_type': 'object',
    'acc_now_delinq': 'float64',
    'tot_coll_amt': 'float64',
    'tot_cur_bal': 'float64',
    'open_acc_6m': 'float64',
    'open_act_il': 'float64',
    'open_il_12m': 'float64',
    'open_il_24m': 'float64',
    'mths_since_rcnt_il': 'float64',
    'total_bal_il': 'float64',
    'open_rv_12m': 'float64',
    'open_rv_24m': 'float64',
    'max_bal_bc': 'float64',
    'all_util': 'float64',
    'total_rev_hi_lim': 'float64',
    'inq_fi': 'float64',
    'total_cu_tl': 'float64',
    'inq_last_12m': 'float64',
    'acc_open_past_24mths': 'float64',
    'avg_cur_bal': 'float64',
    'bc_open_to_buy': 'float64',
    'bc_util': 'float64',
    'chargeoff_within_12_mths': 'float64',
    'delinq_amnt': 'float64',
    'mo_sin_old_il_acct': 'float64',
    'mo_sin_old_rev_tl_op': 'float64',
    'mo_sin_rcnt_rev_tl_op': 'float64',
    'mo_sin_rcnt_tl': 'float64',
    'mort_acc': 'float64',
    'mths_since_recent_bc': 'float64',
    'mths_since_recent_inq': 'float64',
    'num_accts_ever_120_pd': 'float64',
    'num_actv_bc_tl': 'float64',
    'num_actv_rev_tl': 'float64',
    'num_bc_sats': 'float64',
    'num_bc_tl': 'float64',
    'num_il_tl': 'float64',
    'num_op_rev_tl': 'float64',
    'num_rev_accts': 'float64',
    'num_rev_tl_bal_gt_0': 'float64',
    'num_sats': 'float64',
    'num_tl_30dpd': 'float64',
    'num_tl_90g_dpd_24m': 'float64',
    'num_tl_op_past_12m': 'float64',
    'pct_tl_nvr_dlq': 'float64',
    'percent_bc_gt_75': 'float64',
    'pub_rec_bankruptcies': 'float64',
    'tax_liens': 'float64',
    'tot_hi_cred_lim': 'float64',
    'total_bal_ex_mort': 'float64',
    'total_bc_limit': 'float64',
    'total_il_high_credit_limit': 'float64',
    'hardship_flag': 'object',
    'disbursement_method': 'object',
    'debt_settlement_flag': 'object'
}

In [8]:
data_loader = DataLoader(file_path_accepted)
accepted_df = data_loader.load_csv(usecols=cols, dtypes=dtypes, chunksize=100_000)

# Data Visualization EDA

In [9]:
class DataVisualization:
    def __init__(self, data: pd.DataFrame):
        self.data = data

    def basic_info(self):
        """Show shape, columns, head/tail, duplicates."""
        display(Markdown("## Basic Info"))
        display(Markdown(f"- Shape: **{self.data.shape}**"))

        display(Markdown("### Top 5 rows"))
        display(self.data.head())

        display(Markdown("### Duplicate rows"))
        duplicates = self.data[self.data.duplicated(keep=False)]
        display(duplicates)

    def export_combined_data_summary(self, filename: str = "data_summary.csv", data = None):
        """Summary of each column saved to CSV and displayed as Markdown table."""
        if data is None:
            data = self.data

        summary = pd.DataFrame({
            "Column Name": self.data.columns,
            "First Row": self.data.iloc[0].values,
            "Last Row": self.data.iloc[-1].values,
            "Data Type": self.data.dtypes.values,
            "Missing Count": self.data.isnull().sum().values,
            "Missing %": (self.data.isnull().sum() / len(self.data) * 100).round(2).values,
            "Unique Values": self.data.nunique().values
        })
        summary.to_csv(filename, index=False)
        # display(Markdown(f"✅ Combined data summary saved to **{filename}**"))

        # md_table = "## Combined Data Summary\n"
        # md_table += "| Column Name | First Row | Last Row | Data Type | Missing Count | Missing % | Unique Values |\n"
        # md_table += "|---|---|---|---|---|---|---|\n"
        # for _, row in summary.iterrows():
        #     md_table += f"| {row['Column Name']} | {row['First Row']} | {row['Last Row']} | {row['Data Type']} | {row['Missing Count']} | {row['Missing %']} | {row['Unique Values']} |\n"
        # display(Markdown(md_table))

    def describe_numeric(self, columns=None):
        """Describe numeric columns (rounded to 2 decimals)."""
        cols = columns if columns else self.data.select_dtypes(include=np.number).columns
        display(Markdown("## Numeric Summary"))
        display(self.data[cols].describe().round(2))

    def outliers_zscore(self, column: str, threshold=3):
        """Show rows that are outliers in `column` by Z-score."""
        display(Markdown(f"## Outliers in `{column}` (|Z|>{threshold})"))
        z_scores = np.abs(stats.zscore(self.data[column].dropna()))
        outliers = self.data.loc[z_scores.index[z_scores > threshold]]
        display(outliers)

    def unique_values(self, columns: list):
        """Display unique values for a list of columns as a Markdown table."""
        display(Markdown("## Unique Values"))
        max_len = max(self.data[col].nunique() for col in columns)
        md_table = "| " + " | ".join(columns) + " |\n"
        md_table += "| " + " | ".join(["---"] * len(columns)) + " |\n"
        for i in range(max_len):
            row = []
            for col in columns:
                uniques = self.data[col].unique()
                row.append(str(uniques[i]) if i < len(uniques) else "-")
            md_table += "| " + " | ".join(row) + " |\n"
        display(Markdown(md_table))

    def plot_bar_kde(self, columns: list):
        """Plot histogram + KDE for numeric columns."""
        for col in columns:
            plt.figure(figsize=(6,4))
            sns.histplot(self.data[col], kde=True)
            plt.title(f"Distribution of {col}")
            plt.show()

    def plot_scatter(self, x: str, y: str, hue = None):
        """Plot scatter between two numeric columns, optionally with hue."""
        plt.figure(figsize=(6,4))
        sns.scatterplot(data=self.data, x=x, y=y, hue=hue, alpha=0.6)
        plt.title(f"{x} vs {y}")
        plt.show()

    def run(self, numeric_cols=None, outlier_col=None, unique_cols=None, bar_cols=None, scatter=None):
        """
        Run a quick full EDA pipeline.
        """
        self.basic_info()
        self.export_combined_data_summary()

        if numeric_cols:
            self.describe_numeric(numeric_cols)
        if outlier_col:
            self.outliers_zscore(outlier_col)
        if unique_cols:
            self.unique_values(unique_cols)
        if bar_cols:
            self.plot_bar_kde(bar_cols)
        if scatter:
            self.plot_scatter(*scatter)




In [10]:
eda = DataVisualization(accepted_df)
eda.run()


## Basic Info

- Shape: **(2260701, 96)**

### Top 5 rows

Unnamed: 0,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,emp_length,home_ownership,...,percent_bc_gt_75,pub_rec_bankruptcies,tax_liens,tot_hi_cred_lim,total_bal_ex_mort,total_bc_limit,total_il_high_credit_limit,hardship_flag,disbursement_method,debt_settlement_flag
0,3600.0,3600.0,3600.0,36 months,13.99,123.03,C,C4,10+ years,MORTGAGE,...,0.0,0.0,0.0,178050.0,7746.0,2400.0,13734.0,N,Cash,N
1,24700.0,24700.0,24700.0,36 months,11.99,820.28,C,C1,10+ years,MORTGAGE,...,7.7,0.0,0.0,314017.0,39475.0,79300.0,24667.0,N,Cash,N
2,20000.0,20000.0,20000.0,60 months,10.78,432.66,B,B4,10+ years,MORTGAGE,...,50.0,0.0,0.0,218418.0,18696.0,6200.0,14877.0,N,Cash,N
3,35000.0,35000.0,35000.0,60 months,14.85,829.9,C,C5,10+ years,MORTGAGE,...,0.0,0.0,0.0,381215.0,52226.0,62500.0,18000.0,N,Cash,N
4,10400.0,10400.0,10400.0,60 months,22.45,289.91,F,F1,3 years,MORTGAGE,...,60.0,0.0,0.0,439570.0,95768.0,20300.0,88097.0,N,Cash,N


### Duplicate rows

Unnamed: 0,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,emp_length,home_ownership,...,percent_bc_gt_75,pub_rec_bankruptcies,tax_liens,tot_hi_cred_lim,total_bal_ex_mort,total_bc_limit,total_il_high_credit_limit,hardship_flag,disbursement_method,debt_settlement_flag
421095,,,,,,,,,,,...,,,,,,,,,,
421096,,,,,,,,,,,...,,,,,,,,,,
528961,,,,,,,,,,,...,,,,,,,,,,
528962,,,,,,,,,,,...,,,,,,,,,,
651664,,,,,,,,,,,...,,,,,,,,,,
651665,,,,,,,,,,,...,,,,,,,,,,
749520,,,,,,,,,,,...,,,,,,,,,,
749521,,,,,,,,,,,...,,,,,,,,,,
877716,,,,,,,,,,,...,,,,,,,,,,
877717,,,,,,,,,,,...,,,,,,,,,,


# Data Cleaning

In [11]:
class DataCleaner:
    """Class for cleaning and preprocessing a DataFrame."""

    def __init__(self, data: pd.DataFrame):
        # copy the data to avoid changing the original data
        self.data = data.copy()

    def remove_duplicates(self):
        """Remove duplicate rows from the DataFrame."""
        self.data.drop_duplicates(inplace=True)

    def drop_cols_with_over_30_missing(self, threshold: float = 0.3):
        """
        Drop columns with more than `threshold` missing values.
        Default threshold = 0.3 (30%).
        """
        missing_fraction = self.data.isna().mean()
        cols_to_drop = missing_fraction[missing_fraction > threshold].index
        self.data.drop(columns=cols_to_drop, inplace=True)

    def normalize_emp_length_to_numeric(self):
        """
        Convert 'emp_length' column to numeric values.
        '10+ years' -> 10
        '3 years'   -> 3
        -> np.nan (then filled)
        """
        if 'emp_length' in self.data.columns:
            def convert_emp_length(val):
                if pd.isna(val):
                    return np.nan
                val = str(val).lower().strip()
                if '<' in val:          # '< 1 year'
                    return 0.5
                if '10+' in val:        # '10+ years'
                    return 10
                if 'n/a' in val or 'na' == val:
                    return np.nan
                digits = ''.join(c for c in val if c.isdigit())
                return float(digits) if digits else np.nan

            self.data['emp_length'] = self.data['emp_length'].apply(convert_emp_length)

            # fill missing with mean (or other estimate)
            mean_val = self.data['emp_length'].mean()
            self.data['emp_length'].fillna(mean_val, inplace=True)

    def fill_other_missing_values(self):
        """
        Fill missing values:
        - For numeric columns, fill with mean.
        - For object/categorical columns, fill with mode.
        """
        for col in self.data.columns:
            if self.data[col].dtype in [np.float64, np.float32, np.int64, np.int32]:
                self.data[col].fillna(self.data[col].mean(), inplace=True)
            else:
                mode_val = self.data[col].mode()
                if not mode_val.empty:
                    self.data[col].fillna(mode_val[0], inplace=True)

    def convert_to_categorical(self, columns=None):
        """
        Convert specified columns to categorical dtype.
        If columns is None, convert all object/string columns.
        """
        if columns is None:
            # Convert all object dtype columns
            cols_to_convert = self.data.select_dtypes(include='object').columns
        else:
            cols_to_convert = columns

        for col in cols_to_convert:
            self.data[col] = self.data[col].astype('category')

    def export_combined_data_summary(self, filename: str = "final_data_summary.csv"):
        """Summary of each column saved to CSV and displayed as Markdown table."""
        summary = pd.DataFrame({
            "Column Name": self.data.columns,
            "First Row": self.data.iloc[0].values,
            "Last Row": self.data.iloc[-1].values,
            "Data Type": self.data.dtypes.values,
            "Missing Count": self.data.isnull().sum().values,
            "Missing %": (self.data.isnull().sum() / len(self.data) * 100).round(2).values,
            "Unique Values": self.data.nunique().values
        })
        summary.to_csv(filename, index=False)

    def clean(self, filename: str = "final_data_summary.csv"):
        """Run the full cleaning pipeline."""
        self.remove_duplicates()
        self.drop_cols_with_over_30_missing()  # default 30% threshold
        self.normalize_emp_length_to_numeric()
        self.fill_other_missing_values()
        self.convert_to_categorical()
        self.export_combined_data_summary(filename=filename)

    def get_cleaned_data(self):
        """Get the cleaned DataFrame."""
        print(f"✅ Data cleaning complete! - Shape {self.data.shape}")
        return self.data


# Get Accepted Cleaded Data

In [12]:
# cleaner instance
data_cleaner = DataCleaner(accepted_df)

# perform data cleaning
data_cleaner.clean()

# get the cleaned data
cleaned_accepted_df = data_cleaner.get_cleaned_data()

✅ Data cleaning complete! - Shape (2260669, 82)


In [13]:
# check the target col, loan_status
cleaned_accepted_df['loan_status'].value_counts()

Unnamed: 0_level_0,count
loan_status,Unnamed: 1_level_1
Fully Paid,1076752
Current,878317
Charged Off,268559
Late (31-120 days),21467
In Grace Period,8436
Late (16-30 days),4349
Does not meet the credit policy. Status:Fully Paid,1988
Does not meet the credit policy. Status:Charged Off,761
Default,40


# Loan Status Distribution

- **Good Loans (Fully Paid + DNM FP):** 1,078,740 (~78%)
- **Bad Loans (Charged Off + Default + Late + In Grace + DNM CO):** 303,612 (~22%)
- **Current Loans:** 878,317 (kept as separate test set)

## Observations

- Bad loans are a minority → class imbalance is expected.
- Model will see more “good” examples than “bad” during training.
- Current loans are kept separate to evaluate future defaults.

## Handling Class Imbalance

- Use **class weights** in the model (`class_weight='balanced'` in sklearn).  
- **Oversample** bad loans (SMOTE, RandomOverSampler).  
- **Undersample** good loans (with caution).  
- Evaluate with metrics robust to imbalance: **ROC-AUC**, **Precision-Recall**.


# Split cleaned data to test and train

In [14]:

class DataSplitter:
    """
    Prepare Lending Club data for default prediction.

    - Training/testing split is only on finalized loans.
    - Current loans are kept separate for future scoring.
    - Creates binary target 'default_flag' (1 = bad, 0 = good).
    - Optional: oversample the minority class in the training data.
    """

    def __init__(self, cleaned_data: pd.DataFrame):
        self.data = cleaned_data.copy()
        self.finalized_df = None
        self.current_df = None

        # run preparation
        self.prepare_data()

    def prepare_data(self):
        """Create default_flag and separate finalized and current loans."""
        bad_statuses = [
            'Charged Off',
            'Default',
            'Does not meet the credit policy. Status:Charged Off',
            'Late (31-120 days)',
            'Late (16-30 days)',
            'In Grace Period'
        ]
        good_statuses = [
            'Fully Paid',
            'Does not meet the credit policy. Status:Fully Paid'
        ]

        # Finalized loans: used for train/test split
        self.finalized_df = self.data[self.data['loan_status'].isin(bad_statuses + good_statuses)].copy()
        self.finalized_df['default_flag'] = np.where(self.finalized_df['loan_status'].isin(bad_statuses), 1, 0)

        # Current loans: kept separately for scoring
        self.current_df = self.data[self.data['loan_status'] == 'Current'].copy()

        return self.finalized_df, self.current_df

    def split_train_test(self, test_size: float = 0.2, random_state: int = 42, oversample: bool = False):
        """
        Split finalized loans into training and testing sets.
        Optionally oversample the minority class in the training set.
        """
        if self.finalized_df is None:
            raise ValueError("Call prepare_data() first.")

        X = self.finalized_df.drop(['loan_status', 'default_flag'], axis=1)
        y = self.finalized_df['default_flag']

        x_train, x_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state, stratify=y
        )

        return x_train, x_test, y_train, y_test

    def get_finalized_and_current(self):
        """Return finalized (for training/testing) and current (for scoring) DataFrames."""
        if self.finalized_df is None or self.current_df is None:
            raise ValueError("Call prepare_data() first.")
        return self.finalized_df, self.current_df



# Perform Feature Scaling

In [15]:
class FeatureScaler:
    """Feature Scaling class to apply various scaling techniques."""

    def __init__(self, x_train, x_test):
        # Store training and testing feature sets
        self.x_train = x_train
        self.x_test = x_test

    def _apply_scaler(self, scaler) -> tuple:
        """Internal method to apply any scaler to train and test data."""
        # Fit scaler on training data and transform
        x_train_scaled = scaler.fit_transform(self.x_train)

        # Transform test data using the same parameters
        x_test_scaled = scaler.transform(self.x_test)
        return x_train_scaled, x_test_scaled, scaler

    def standard_scaling(self) -> tuple:
        """Apply StandardScaler (Z-score normalization)."""
        # Center features around 0 and scale to unit variance
        return self._apply_scaler(StandardScaler())

    def min_max_scaling(self, feature_range: tuple=(0, 1)) -> tuple:
        """Apply MinMaxScaler (scales features to a specified range)."""
        # Scale features to a fixed range, e.g., 0-1
        return self._apply_scaler(MinMaxScaler(feature_range=feature_range))


# Feature Selecting

In [16]:
class FeatureSelector:
    """Feature Selection class to reduce input features to the most relevant ones."""

    def __init__(self, x_train, x_test, y_train):
        # Store training and testing feature sets and the target for supervised selection
        self.x_train = x_train
        self.x_test = x_test
        self.y_train = y_train

    def select_k_best(self, score_func=f_classif, k=10):
        """Select top k features using SelectKBest (univariate feature selection)."""
        # Initialize selector with given scoring function and number of features
        selector = SelectKBest(score_func=score_func, k=k)

        # Fit on training data and transform
        x_train_selected = selector.fit_transform(self.x_train, self.y_train)

        # Transform test data using same selected features
        x_test_selected = selector.transform(self.x_test)
        return x_train_selected, x_test_selected

    def recursive_feature_elimination(self, estimator=None, n_features_to_select=10):
        """Use RFE to iteratively remove less important features (model-based)."""
        if estimator is None:
            # Default to Logistic Regression if no estimator is provided
            estimator = LogisticRegression(max_iter=1000)

        # Initialize RFE with estimator and number of features to select
        selector = RFE(estimator=estimator, n_features_to_select=n_features_to_select
                       )
        # Fit RFE on training data and transform
        x_train_selected = selector.fit_transform(self.x_train, self.y_train)

        # Transform test data using selected features
        x_test_selected = selector.transform(self.x_test)
        return x_train_selected, x_test_selected


# Perform Feature Engineering/Encoding

In [17]:
class FeatureEngineer:
    """Feature Engineering class to create new features and encode categorical data."""

    def __init__(self, cleaned_data: pd.DataFrame):
        # Store the dataset to be transformed
        self.data = cleaned_data

    def add_polynomial_features(self, degree=2, interaction_only=False, include_bias=False) -> pd.DataFrame:
        """Add polynomial and interaction terms to the dataset."""
        # Initialize polynomial feature transformer
        poly = PolynomialFeatures(degree=degree,
                                  interaction_only=interaction_only,
                                  include_bias=include_bias)

        # Fit and transform the dataset
        transformed = poly.fit_transform(self.data)

        # Get meaningful column names for the new features
        feature_names = poly.get_feature_names_out(self.data.columns)

        # Return a new DataFrame with the polynomial features
        return pd.DataFrame(transformed, columns=feature_names)

    def one_hot_encode(self, categorical_columns: list):
        """Fit OneHotEncoder on the dataset and return transformed DataFrame and transformer."""
        encoder = ColumnTransformer(
            transformers=[('cat', OneHotEncoder(drop='first', handle_unknown='ignore'), categorical_columns)],
            remainder='passthrough'
        )

        # Fit and transform training data
        transformed = encoder.fit_transform(self.data)

        # If sparse matrix, convert to dense
        if hasattr(transformed, "toarray"):
            transformed = transformed.toarray()

        # Get feature names and preserve index
        feature_names = encoder.get_feature_names_out()
        df_transformed = pd.DataFrame(transformed, columns=feature_names, index=self.data.index)

        return df_transformed, encoder

# Model Metric

In [18]:
class Metrics:
    """Class for calculating and visualizing classification metrics with probability-based analysis and model diagnostics."""

    def __init__(self, x_train, x_test, y_train, y_test, model=None):
        self.x_train = x_train
        self.x_test = x_test
        self.y_train = y_train
        self.y_test = y_test
        self.model = model  # CatBoost or any sklearn-like model

    # ------------------------- Core Metric Plots -------------------------
    def plot_confusion_matrix(self, y_true, y_pred, model_name="Model"):
        cm = confusion_matrix(y_true, y_pred)
        plt.figure(figsize=(5, 4))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False)
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        plt.title(f"{model_name} - Confusion Matrix")
        plt.show()

    def plot_roc_curve(self, y_true, y_proba, model_name="Model"):
        if y_proba is not None:
            RocCurveDisplay.from_predictions(y_true, y_proba)
            plt.title(f"{model_name} - ROC Curve")
            plt.show()

    # ------------------------- Metrics Calculation -------------------------
    def get_classification_metrics(self, y_true, y_pred, y_proba=None, model_name="Model", plot=True):
        acc = accuracy_score(y_true, y_pred)
        f1 = f1_score(y_true, y_pred)
        prec = precision_score(y_true, y_pred)
        rec = recall_score(y_true, y_pred)
        roc_auc = roc_auc_score(y_true, y_proba) if y_proba is not None else None
        brier = brier_score_loss(y_true, y_proba) if y_proba is not None else None
        mcc = matthews_corrcoef(y_true, y_pred)
        kappa = cohen_kappa_score(y_true, y_pred)

        results = {
            "Model": model_name,
            "Accuracy": acc,
            "F1": f1,
            "Precision": prec,
            "Recall": rec,
            "ROC-AUC": roc_auc,
            "Cohen_Kappa": kappa,
            "Brier_Score": brier
        }

        if plot:
            display(Markdown(f"### {model_name} Metrics"))
            print(results)
            self.plot_confusion_matrix(y_true, y_pred, model_name)
            if y_proba is not None:
                self.plot_roc_curve(y_true, y_proba, model_name)
            print("\n")

        return results

    # ------------------------- Overfitting & Cross-Validation -------------------------
    def cross_validation_check(self, cv=5, scoring='f1'):
        if self.model is None:
            print("No model provided for cross-validation.")
            return None
        cv_scores = cross_val_score(self.model, self.x_train, self.y_train, cv=cv, scoring=scoring)
        print(f"Cross-Validation ({cv}-fold) {scoring.upper()} Scores: {cv_scores}")
        print(f"Mean {scoring.upper()}: {np.mean(cv_scores):.4f}, Std: {np.std(cv_scores):.4f}")
        return cv_scores

    def plot_learning_curve(self, cv=5, scoring='f1'):
        if self.model is None:
            print("No model provided for learning curve.")
            return
        train_sizes, train_scores, val_scores = learning_curve(
            self.model, self.x_train, self.y_train,
            cv=cv, scoring=scoring, train_sizes=np.linspace(0.1, 1.0, 5)
        )
        plt.figure(figsize=(6,4))
        plt.plot(train_sizes, np.mean(train_scores, axis=1), label="Train Score")
        plt.plot(train_sizes, np.mean(val_scores, axis=1), label="Validation Score")
        plt.xlabel("Training Size")
        plt.ylabel(scoring.upper())
        plt.title("Learning Curve")
        plt.legend()
        plt.show()

    # ------------------------- Calibration Check -------------------------
    def plot_calibration_curve(self):
        if self.model is None:
            print("No model provided for calibration plot.")
            return
        if hasattr(self.model, "predict_proba"):
            y_prob = self.model.predict_proba(self.x_test)[:, 1]
            prob_true, prob_pred = calibration_curve(self.y_test, y_prob, n_bins=10)
            plt.figure(figsize=(5,4))
            plt.plot(prob_pred, prob_true, marker='o', label='Calibration')
            plt.plot([0,1],[0,1], linestyle='--', label='Perfectly Calibrated')
            plt.xlabel("Predicted Probability")
            plt.ylabel("True Probability")
            plt.title("Calibration Plot")
            plt.legend()
            plt.show()
        else:
            print("Model does not support probability predictions.")

    # ------------------------- Data Leakage Check -------------------------
    def check_data_leakage(self):
        # Overlapping rows between train and test
        overlap = pd.merge(self.x_train, self.x_test, how='inner')
        print(f"Overlapping rows between train and test: {len(overlap)}")

        # Feature correlation with target
        corr_with_target = self.x_train.corrwith(self.y_train)
        print("\nTop 5 features correlated with target:")
        print(corr_with_target.sort_values(ascending=False).head())

    # ------------------------- Feature Importance -------------------------
    def plot_feature_importance(self):
        if self.model is None:
            print("No model provided for feature importance.")
            return
        if hasattr(self.model, "get_feature_importance"):
            importance = self.model.get_feature_importance()
            features = self.x_train.columns
            plt.figure(figsize=(6,8))
            plt.barh(features, importance)
            plt.xlabel("Feature Importance")
            plt.ylabel("Feature")
            plt.title("Feature Importance")
            plt.show()
        else:
            print("Model does not support feature importance.")



# Build Models

In [19]:
class BuildClassifierModels:
    """Class to build, train, and evaluate classification models for loan default prediction."""

    def __init__(self, x_train, x_test, y_train, y_test, feature_selector=None):
        # Store train/test data
        self.x_train = x_train
        self.y_train = y_train
        self.x_test = x_test
        self.y_test = y_test

        # Optional feature selector
        self.feature_selector = feature_selector

        # Metrics class for classification evaluation
        self.metrics = Metrics(x_train, x_test, y_train, y_test)

    def _train_and_evaluate(self, model, model_name="Model", plot=True):
        """Train a classifier and evaluate using ROC-AUC, F1, Precision, Recall."""
        # Apply feature selection if provided
        if self.feature_selector:
            self.x_train = self.feature_selector.fit_transform(self.x_train, self.y_train)
            self.x_test = self.feature_selector.transform(self.x_test)

        # Fit the model
        model.fit(self.x_train, self.y_train)

        # Predict on test data
        y_pred = model.predict(self.x_test)
        y_proba = model.predict_proba(self.x_test)[:, 1] if hasattr(model, "predict_proba") else None

        # Compute classification metrics
        metrics = self.metrics.get_classification_metrics(
            self.y_test, y_pred, y_proba=y_proba, model_name=model_name, plot=plot
        )
        return model, metrics

    def random_forest(self, **kwargs):
        return self._train_and_evaluate(
            RandomForestClassifier(**kwargs), model_name="Random Forest"
        )

    def xgboost(self, **kwargs):
        return self._train_and_evaluate(
            xgb.XGBClassifier(**kwargs), model_name="XGBoost"
        )

    def lightgbm(self, **kwargs):
        return self._train_and_evaluate(
            lgb.LGBMClassifier(**kwargs), model_name="LightGBM"
        )

    def catboost(self, **kwargs):
        return self._train_and_evaluate(
            CatBoostClassifier(verbose=0, **kwargs), model_name="CatBoost"
        )

    # def stacking_classifier(self, estimators=None, final_estimator=None, **kwargs):
    #     """
    #     Build, train, and evaluate a stacking classifier.
    #     Default base estimators: Random Forest + XGBoost
    #     Default meta-model: Logistic Regression
    #     """
    #     from sklearn.ensemble import StackingClassifier, RandomForestClassifier
    #     from sklearn.linear_model import LogisticRegression
    #     import xgboost as xgb

    #     # Default base models
    #     if estimators is None:
    #         estimators = [
    #             ('rf', RandomForestClassifier(n_estimators=200, random_state=42)),
    #             ('xgb', xgb.XGBClassifier(n_estimators=300, learning_rate=0.1, random_state=42))
    #         ]

    #     # Default meta-model
    #     if final_estimator is None:
    #         final_estimator = LogisticRegression()

    #     stack_model = StackingClassifier(
    #         estimators=estimators,
    #         final_estimator=final_estimator,
    #         **kwargs
    #     )

    #     return self._train_and_evaluate(stack_model, model_name="Stacking Classifier")


# Pre-processing Pipeline

In [20]:
class PreprocessorPipeline:
    """End-to-End Pipeline for Classification: Preprocessing, Feature Engineering, Model Training."""

    def __init__(self, cleaned_data, save_metrics=False):
        self.cleaned_data = cleaned_data
        self.x_train = self.x_test = self.y_train = self.y_test = pd.DataFrame()
        self.x_train_orig = self.x_test_orig = pd.DataFrame()
        self.model_results = []
        self.save = save_metrics
        self.validation_df = pd.DataFrame()

        self.perform_data_spliting()

    # ---------------- Data Splitting ----------------
    def perform_data_spliting(self, test_size=0.2, random_state=42):
        """Split into train and test sets."""
        splitter = DataSplitter(cleaned_data=self.cleaned_data)
        self.validation_df = splitter.current_df
        self.x_train, self.x_test, self.y_train, self.y_test = splitter.split_train_test()

        self.x_train_orig = self.x_train.copy()
        self.x_test_orig = self.x_test.copy()
        return self.x_train, self.x_test, self.y_train, self.y_test

    # ---------------- Feature Encoding ----------------
    def perform_feature_encoding(self, categorical_columns: list):
        """Encode categorical features using one-hot encoding."""
        if categorical_columns:
            print(f"\n🧩 Encoding categorical columns: {categorical_columns}")
            encoder = FeatureEngineer(self.x_train)
            self.x_train, transformer = encoder.one_hot_encode(categorical_columns)

            transformed_test = transformer.transform(self.x_test)
            if hasattr(transformed_test, "toarray"):
                transformed_test = transformed_test.toarray()
            feature_names = transformer.get_feature_names_out()
            self.x_test = pd.DataFrame(transformed_test, columns=feature_names, index=self.x_test.index)

    # ---------------- Feature Scaling ----------------
    def perform_features_scaling(self, method='standard'):
        """Scale numeric features using StandardScaler or MinMaxScaler."""
        print(f"\n⚖️ Scaling features using: {method}")
        scaler = FeatureScaler(self.x_train, self.x_test)
        if method == 'standard':
            self.x_train, self.x_test = scaler.standard_scaling()
        elif method == 'minmax':
            self.x_train, self.x_test = scaler.min_max_scaling()

    # ---------------- Feature Selection ----------------
    def perform_feature_selection(self, method='k_best', k=10):
        """Select top k features to reduce dimensionality."""
        print(f"\n📌 Selecting top {k} features using {method}...")
        selector = FeatureSelector(self.x_train, self.x_test, self.y_train)
        if method == 'k_best':
            self.x_train, self.x_test = selector.select_k_best(k=k)
        elif method == 'rfe':
            self.x_train, self.x_test = selector.recursive_feature_elimination(n_features_to_select=k)

    # ---------------- Feature Engineering ----------------
    def perform_feature_engineering(self, use_polynomial=False, poly_degree=2):
        """Optionally add polynomial features."""
        if use_polynomial:
            print(f"\n🔧 Applying polynomial feature engineering (degree={poly_degree})")
            fe_train = FeatureEngineer(pd.DataFrame(self.x_train))
            self.x_train = fe_train.add_polynomial_features(degree=poly_degree)
            fe_test = FeatureEngineer(pd.DataFrame(self.x_test))
            self.x_test = fe_test.add_polynomial_features(degree=poly_degree)

    # ---------------- Model Training ----------------
    def create_models(self, save_model=False):
        """Train and evaluate multiple classification models."""
        print("\n🤖 Training classification models...")
        builder = BuildClassifierModels(self.x_train, self.x_test, self.y_train, self.y_test)

        for name, func in {
            # 'RandomForest': builder.random_forest,
            # 'XGBoost': builder.xgboost,
            # 'LightGBM': builder.lightgbm,
            'CatBoost': builder.catboost,
            # 'Stacking': builder.stacking_classifier
        }.items():
            model, metrics = func()
            self.model_results.append({"Model": name, **metrics})
        self.save_and_display_results()

    # ---------------- Save & Display ----------------
    def save_and_display_results(self, filename="classification_results.csv"):
        """Save results to CSV and print Markdown table."""
        df = pd.DataFrame(self.model_results).round(3)
        if self.save:
            df.to_csv(filename, index=False)
            print(f"\n📂 Results saved to {filename}")
        print("\n### 📊 Classification Model Performance\n")
        print(df.to_markdown(index=False))

    # ---------------- Create & Save Best Model Pipeline ----------------
    def create_best_model_pipeline(self, best_model=None, save_model=True, model_name="CatBoost"):
        """
        Build and save a scikit-learn pipeline that includes preprocessing + best model.
        If best_model is None, defaults to CatBoostClassifier.
        """
        # 1️⃣ Detect column types
        categorical_cols = self.x_train_orig.select_dtypes(exclude='number').columns.tolist()
        numeric_cols = self.x_train_orig.select_dtypes(include='number').columns.tolist()

        # 2️⃣ Column transformer
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', StandardScaler(), numeric_cols),
                ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols)
            ]
        )

        # 3️⃣ Choose model
        if best_model is None:
            best_model = CatBoostClassifier(verbose=0, random_state=42)

        # 4️⃣ Combine
        full_pipeline = Pipeline(steps=[
            ('preprocessor', preprocessor),
            ('model', best_model)
        ])

        # 5️⃣ Fit pipeline
        full_pipeline.fit(self.x_train_orig, self.y_train)

        # 6️⃣ Evaluate
        acc = full_pipeline.score(self.x_test_orig, self.y_test)
        print(f"✅ {model_name} pipeline trained. Test Accuracy: {acc:.4f}")

        # 7️⃣ Save
        if save_model:
            filename = f"{model_name}_pipeline.joblib"
            joblib.dump(full_pipeline, filename)
            print(f"📂 Pipeline saved to {filename}")

        return full_pipeline

# Create an instance of the pipeline

In [21]:
# create pipeline instance
pipeline = PreprocessorPipeline(cleaned_data=cleaned_accepted_df)

# 🕸️ Train Models with some numerical variables as input features (most important)

In [22]:
# # Using some important numeric columns
# numeric_features = [
#     "loan_amnt",
#     "funded_amnt",
#     "funded_amnt_inv",
#     "annual_inc",
#     "dti",
#     "fico_range_low",
#     "fico_range_high",
#     "open_acc",
#     "revol_bal",
#     "total_acc",
#     "acc_open_past_24mths",
#     "avg_cur_bal",
#     "bc_open_to_buy",
#     "bc_util",
#     "delinq_2yrs",
#     "inq_last_6mths",
#     "installment",
#     "mo_sin_old_il_acct",
#     "tot_cur_bal"
# ]


# pipeline.x_train = pipeline.x_train_orig[numeric_features]
# pipeline.x_test = pipeline.x_test_orig[numeric_features]

In [23]:
# perform feature scaling
# pipeline.perform_features_scaling(method='standard')

In [24]:
# # Clear model results for next models
# pipeline.model_results = []

# # train models
# pipeline.create_models()

# 🕸️ Train models that uses all numeric features

In [25]:
# # build with all numeric feautures
# pipeline.x_train = pipeline.x_train_orig.select_dtypes(include='number')
# pipeline.x_test = pipeline.x_test_orig.select_dtypes(include='number')


In [26]:
# # perform feature scaling
# pipeline.perform_features_scaling(method='standard')

In [27]:
# # # Clear model results for next models
# pipeline.model_results = []

# # # train models
# pipeline.create_models()

# 🕸️ Train models that uses all relevant input variables (both categorical and numerical)

In [28]:
# # perform features encoding
# pipeline.x_train = pipeline.x_train_orig
# pipeline.x_test = pipeline.x_test_orig

# categorical_columns = list(pipeline.x_train.select_dtypes(exclude='number').columns)
# print(pipeline.x_train.shape)
# pipeline.perform_feature_encoding(categorical_columns=categorical_columns)

In [29]:
# # perform feature scaling
# pipeline.perform_features_scaling(method='standard')

In [30]:
# # Clear model results for next models
# pipeline.model_results = []

# # train models
# pipeline.create_models(save_model=True)

# 🧑‍💻 Perform HPO on BEST MODEL

> For now the model is fine, no need for HPO

# ❄️ Develop an Artificial Neural Network (ANN)
If the performance after HPO is still not good, build an ANN

> For now the CatBoost is perfoming really good

# 👏✅ Create the Best Model with best HPO, PipeLine

In [31]:
best_pipe = pipeline.create_best_model_pipeline()

✅ CatBoost pipeline trained. Test Accuracy: 0.9992
📂 Pipeline saved to CatBoost_pipeline.joblib


# ✅🕸️ Use the model to Predict new data

In [32]:
class Predictor:
    """Predictor for dataframes using a saved pipeline/model with schema and feature count validation."""

    def __init__(self, model_filename: str,
                 df_schema: pd.DataFrame,
                 expected_feature_count = None):
        """
        Parameters:
        - model_filename: str, path to saved pipeline (.joblib)
        - df_schema: pandas.DataFrame, dataframe with expected columns and types
        - expected_feature_count: int, optional, expected number of features after preprocessing
        """
        self.model_filename = model_filename
        self.model = joblib.load(model_filename)

        # schema from training data
        self.schema_cols = df_schema.columns.tolist()
        self.schema_dtypes = df_schema.dtypes.to_dict()

        self.expected_feature_count = expected_feature_count

    def predict_df(self, df: pd.DataFrame,
                   add_proba: bool = True,
                   col_pred: str = "Prediction",
                   col_proba: str = "Prediction_Proba") -> pd.DataFrame:
        """
        Predict for a DataFrame and return it with added prediction columns.
        """

        # extract input
        X = df[self.schema_cols]

        # if model is pipeline it will transform internally
        # but we still can check the transformed shape
        if hasattr(self.model, "named_steps"):  # pipeline
            # only transform to check feature count
            transformed = self.model.named_steps['preprocessor'].transform(X)
        else:
            transformed = X

        if self.expected_feature_count is not None:
            n_features = transformed.shape[1]
            if n_features != self.expected_feature_count:
                raise ValueError(
                    f"Feature count mismatch: expected {self.expected_feature_count}, got {n_features}"
                )

        # final prediction using the model
        y_pred = self.model.predict(X)

        result_df = df.copy()
        result_df[col_pred] = y_pred

        if add_proba and hasattr(self.model, "predict_proba"):
            result_df[col_proba] = self.model.predict_proba(X)[:, 1]

        return result_df



# Create Predictor

In [33]:
# expected number of features after encoding/scaling:
expected_features = 148

validation_data = pipeline.validation_df.copy()
validation_data = validation_data.drop(columns=["loan_status"])

validation_data.shape

(878317, 81)

In [34]:
predictor = Predictor(
    model_filename="CatBoost_pipeline.joblib",
    df_schema=pipeline.x_train_orig,
    expected_feature_count=expected_features
)

predicted_df = predictor.predict_df(validation_data)


In [42]:
# predicted_df with only  Prediction = 0
predicted_df[predicted_df["Prediction"] == 0]


Unnamed: 0,tax_liens,tot_hi_cred_lim,total_bal_ex_mort,total_bc_limit,total_il_high_credit_limit,hardship_flag,disbursement_method,debt_settlement_flag,Prediction,Prediction_Proba
1108,0.0,507075.0,96537.0,71100.0,59878.0,N,Cash,N,0,0.000143
2437,0.0,25197.0,12320.0,3500.0,21697.0,N,Cash,N,0,0.005966
3210,0.0,332486.0,319654.0,21000.0,289886.0,N,Cash,N,0,0.000117
4605,0.0,87587.0,83506.0,28700.0,49587.0,N,Cash,N,0,0.000195
7478,0.0,620448.0,217736.0,26400.0,183173.0,N,Cash,N,0,0.000870
...,...,...,...,...,...,...,...,...,...,...
2257790,0.0,38410.0,23843.0,6200.0,27910.0,N,Cash,N,0,0.000028
2258849,0.0,163483.0,5356.0,6500.0,0.0,N,Cash,N,0,0.000296
2259403,0.0,53588.0,19606.0,7800.0,35988.0,N,Cash,N,0,0.000606
2259733,0.0,185402.0,175962.0,9500.0,164902.0,N,Cash,N,0,0.000066
