In [None]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import KFold, train_test_split, StratifiedKFold, cross_val_predict, cross_val_score, GridSearchCV, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler, RobustScaler, OrdinalEncoder
from sklearn.metrics import roc_auc_score, precision_recall_curve, average_precision_score, f1_score
from sklearn.isotonic import IsotonicRegression

from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier

import optuna
from optuna.integration import XGBoostPruningCallback
from optuna.integration import LightGBMPruningCallback
from optuna.integration import CatBoostPruningCallback

# Import Loan Dataset

In [None]:
# Load the train/test splits; test.csv does not contain the target variable "loan_status"
df_train, df_test = (pd.read_csv(csv_name) for csv_name in ("train.csv", "test.csv"))

In [None]:
# Training set overview: column names, (rows, columns) shape, and describe() summary statistics
print(df_train.columns, '\n')
print(df_train.shape)
display(df_train.describe())

In [None]:
# Testing set overview: column names, (rows, columns) shape, and describe() summary statistics
print(df_test.columns, '\n')
print(df_test.shape)
display(df_test.describe())

# Cleaning Dataset

## Checking for Nulls

In [None]:
# Count the missing values in each column of the training set
df_train.isna().sum()

In [None]:
# Count the missing values in each column of the testing set
df_test.isna().sum()

## Remove unnecessary features

In [None]:
# Drop the 'id' column from both splits.
# Rebinding (instead of inplace=True) plus errors='ignore' keeps this cell idempotent:
# re-running it after 'id' is already gone no longer raises a KeyError.
df_train = df_train.drop(columns='id', errors='ignore')
df_test = df_test.drop(columns='id', errors='ignore')

In [None]:
# Preview the first rows of the training frame
df_train.head()

# EDA: Exploratory Data Analysis

Fields in the dataset:
- `person_age`
    - The age of the borrower
- `person_income`
    - The annual income of the borrower
- `person_home_ownership`
    - The home ownership status of the borrower
- `person_emp_length`
    - How long (in years) the borrower has been in employment
- `loan_intent`
    - The borrower's intended use of the loan
- `loan_grade`
    - The loan grade, measuring the loan default rate
- `loan_amnt`
    - Amount borrowed by the borrower
- `loan_int_rate`
    - Loan interest rate
- `loan_percent_income`
    - The ratio between `loan_amnt` and `person_income`
- `cb_person_default_on_file`
    - The borrower's prior default status
- `cb_person_cred_hist_length`
    - The length of the borrower's credit history
- `loan_status`
    - This is the target variable
    - `0` indicates non-default; `1` indicates default

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

EDA will be performed in Tableau
- Refer to https://www.kaggle.com/code/satyaprakashshukl/loan-approval-prediction/notebook, to give ideas on what charts to be generated

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

In [None]:
# Check the data type of each field (info() prints a per-column summary of the training frame)
df_train.info()

In [None]:
# Check for data imbalance: number of rows per value of the target 'loan_status'
df_train['loan_status'].value_counts()

The dataset is very imbalanced! This will be addressed later by applying class weights and threshold tuning.

# Feature Transformation

- One-hot encoding -> Nominal categorical features
    - person_home_ownership, loan_intent, cb_person_default_on_file
- Ordinal Encoder -> Ordinal categorical features
    - loan_grade
- Robust Scaling -> Numerical features
    - person_age, person_income, person_emp_length, loan_amnt, loan_int_rate, loan_percent_income, cb_person_cred_hist_length
- Target variable
    - loan_status

In [None]:
# Work on copies so the raw frames stay untouched by the transformations below
df_train_trans, df_test_trans = df_train.copy(), df_test.copy()

In [None]:
def robust_scaling(df_train, df_test, feature_list):
    '''
    Scale numerical features with a RobustScaler fitted on the training data only.

    Input
    -----
    - df_train: DataFrame used to fit the scaler (and then transformed)
    - df_test: DataFrame transformed with the scaler fitted on df_train
    - feature_list: List of column names to scale; must exist in both frames

    Output
    ------
    Tuple (scaled_train, scaled_test) holding the scaler's transformed output
    for the selected columns of each frame, in feature_list column order
    '''
    train_subset = df_train[feature_list]
    test_subset = df_test[feature_list]

    # Fit once on the training columns, then reuse the same statistics for both splits
    fitted_scaler = RobustScaler().fit(train_subset)

    return fitted_scaler.transform(train_subset), fitted_scaler.transform(test_subset)

In [None]:
# Robust-scale the numerical columns (scaler statistics come from the training split)
num_features = [
    'person_age',
    'person_income',
    'person_emp_length',
    'loan_amnt',
    'loan_int_rate',
    'loan_percent_income',
    'cb_person_cred_hist_length',
]

(
    df_train_trans[num_features],
    df_test_trans[num_features],
) = robust_scaling(df_train, df_test, feature_list=num_features)

In [None]:
# Perform one-hot encoding on nominal categorical features
nom_cat_features = [
    'person_home_ownership', 'loan_intent', 'cb_person_default_on_file'
]

df_train_trans = pd.get_dummies(df_train_trans, columns=nom_cat_features)
df_test_trans = pd.get_dummies(df_test_trans, columns=nom_cat_features)

# Each split is encoded independently, so a category level that appears in only one
# split would give the two frames different dummy columns. Align the test frame to the
# training layout (minus the target): levels missing from test become all-zero columns,
# and test-only levels, which the models never saw, are dropped.
df_test_trans = df_test_trans.reindex(
    columns=df_train_trans.columns.drop('loan_status', errors='ignore'),
    fill_value=0,
)

In [None]:
# Ordinal-encode the ordered categorical features (loan grades run from 'A' to 'G')
ord_cat_features = ['loan_grade']

ord_encoder = OrdinalEncoder(categories=[list('ABCDEFG')])

# Fit on the raw training column, then apply the same mapping to both splits
ord_encoder.fit(df_train[ord_cat_features])
df_train_trans[ord_cat_features] = ord_encoder.transform(df_train[ord_cat_features])
df_test_trans[ord_cat_features] = ord_encoder.transform(df_test[ord_cat_features])

In [None]:
# Split the transformed training frame into the target and the feature matrix;
# the transformed test frame has no target and is used as-is
y_train = df_train_trans['loan_status']
X_train = df_train_trans.drop(columns='loan_status')
X_test = df_test_trans

# Modelling

In [None]:
class ThresholdTunedClassifier():
    """
    A meta-estimator that:
      1) tunes hyperparameters via inner CV (RandomizedSearchCV, scoring=AP by default),
      2) produces OOF probabilities via outer CV,
      3) (optionally) learns a calibration mapping on OOF probs (isotonic or Platt),
      4) selects the F1-optimal decision threshold on (calibrated or raw) OOF probs,
      5) refits the best model on the full training set, and
      6) predicts with the frozen threshold.

    Typical call order:
        model_tuning() or enter_best_model_config()
        -> decision_thr_tuning() -> calibrate_prob() -> model_training()
        -> analyze_performance() / predict_proba() / predict()
    """

    def __init__(
        self,
        random_state=123,
        n_jobs=-1,
        verbose=0,
    ):
        """
        Parameters
        ----------
        random_state : seed used for every StratifiedKFold split and for RandomizedSearchCV
        n_jobs : parallelism passed to RandomizedSearchCV and cross_val_predict
        verbose : verbosity passed to RandomizedSearchCV
        """
        self.random_state = random_state
        self.n_jobs = n_jobs
        self.verbose = verbose

        # Learned attributes after fit()/tuning
        self.best_estimator_ = None      # estimator chosen by tuning or supplied manually
        self.calibrator_ = None          # fitted calibrator, kept only when it improves OOF F1
        self.use_calibration_ = False    # whether predictions pass through calibrator_
        self.threshold_ = 0.5            # decision threshold applied by predict()

    # ------------ helpers ------------

    @staticmethod
    def _max_f1_threshold(y_true, y_proba):
        """
        Find decision threshold that maximizes F1.

        Note: precision_recall_curve returns precision/recall length (n_thr+1),
        thresholds length (n_thr). Align F1 to thresholds using f1[:-1].

        Returns
        -------
        (optimal_threshold, optimal_f1) as Python floats.
        """
        prec, rec, thr = precision_recall_curve(y_true, y_proba)
        # Small epsilon avoids 0/0 where precision and recall are both zero
        f1 = 2 * prec * rec / (prec + rec + 1e-12)

        if thr.size == 0:
            # degenerate case (e.g., constant scores); fall back to 0.5
            return 0.5, float(f1_score(y_true, (y_proba >= 0.5).astype(int)))

        idx = int(np.nanargmax(f1[:-1]))     # align with thresholds
        optimal_thr = float(thr[idx])
        optimal_f1 = float(f1[:-1][idx])
        return optimal_thr, optimal_f1

    def _fit_calibrator(self, y_proba, y_true):
        """
        Fit probability calibrator (isotonic or Platt) on raw probabilities.

        Reads self.calibration_method, which calibrate_prob() sets.

        Raises
        ------
        ValueError if self.calibration_method is neither 'isotonic' nor 'platt'.
        """
        if self.calibration_method == "isotonic":
            iso = IsotonicRegression(out_of_bounds="clip")
            iso.fit(y_proba, y_true)
            return iso
        elif self.calibration_method == "platt":
            lr = LogisticRegression(solver="lbfgs", max_iter=1000)
            lr.fit(y_proba.reshape(-1, 1), y_true)  # requires 2D
            return lr
        else:
            raise ValueError("calibration_method must be 'isotonic' or 'platt'")

    def _apply_calibrator(self, calibrator, y_proba):
        """
        Apply fitted calibrator to raw probabilities.

        A calibrator of None is a pass-through. Raises TypeError for any
        calibrator that is neither IsotonicRegression nor LogisticRegression.
        """
        if calibrator is None:
            return y_proba
        if isinstance(calibrator, IsotonicRegression):
            return calibrator.transform(y_proba)
        elif isinstance(calibrator, LogisticRegression):
            return calibrator.predict_proba(y_proba.reshape(-1, 1))[:, 1]
        else:
            raise TypeError("Unknown calibrator type")

    def _postprocess_proba(self, proba):
        """
        Turn raw positive-class probabilities into the values the threshold is
        applied to: calibrated (when enabled) and clipped to [0, 1].

        Shared by predict_proba() and analyze_performance() so both read the
        same attribute (self.calibrator_) and stay in sync.
        """
        if self.use_calibration_:
            proba = self._apply_calibrator(self.calibrator_, proba)
        return np.clip(proba, 0.0, 1.0)

    # ------------ core API ------------

    def model_tuning(self, X, y, base_estimator, param_distributions, n_iter=1, n_folds=3, tuner_scoring="average_precision"):
        """
        Inner-CV hyperparameter tuning. Sets self.best_estimator_ and returns it.

        Runs RandomizedSearchCV (n_iter sampled candidates, stratified n_folds CV,
        scored by tuner_scoring) and keeps the refitted best estimator.
        """
        self.base_estimator = base_estimator
        self.param_distributions = param_distributions
        self.n_iter = n_iter
        self.tuner_scoring = tuner_scoring

        cv = StratifiedKFold(
            n_splits=n_folds, shuffle=True, random_state=self.random_state
        )

        tuner = RandomizedSearchCV(
            estimator=self.base_estimator,
            param_distributions=self.param_distributions,
            n_iter=self.n_iter,
            scoring=self.tuner_scoring,
            cv=cv,
            n_jobs=self.n_jobs,
            random_state=self.random_state,
            verbose=self.verbose,
            refit=True,
        )
        tuner.fit(X, y)
        self.best_estimator_ = tuner.best_estimator_
        return self.best_estimator_

    def enter_best_model_config(self, best_model):
        """
        Manually define the hyper-parameter configuration for the best model
        (alternative to model_tuning()). Stores and returns the given estimator.
        """
        self.best_estimator_ = best_model
        return self.best_estimator_

    def decision_thr_tuning(self, X, y, n_folds=3):
        """
        Build OOF probabilities (using OUTER CV), compute F1-opt threshold,
        and return (oof_proba, thr_raw, f1_raw).

        Also stores the raw threshold in self.threshold_.
        """
        oof_proba = self.generate_oof_proba_preds(X, y, n_folds)

        thr_raw, f1_raw = self._max_f1_threshold(y, oof_proba)
        self.threshold_ = thr_raw
        return oof_proba, thr_raw, f1_raw

    def calibrate_prob(self, y, oof_proba, thr_raw, f1_raw, calibration_method="isotonic"):
        """
        Calibration learned on OOF probs.

        Fits the calibrator on the OOF probabilities, recomputes the F1-optimal
        threshold on the calibrated scores, and keeps the calibrator only if the
        calibrated F1 is strictly better than f1_raw. Returns the threshold in use.
        """
        self.calibration_method = calibration_method

        calibrator = self._fit_calibrator(oof_proba, y)
        oof_proba_cal = self._apply_calibrator(calibrator, oof_proba)
        thr_cal, f1_cal = self._max_f1_threshold(y, oof_proba_cal)

        use_cal = f1_cal > f1_raw

        if use_cal:
            self.use_calibration_ = True
            self.calibrator_ = calibrator
            self.threshold_ = thr_cal
        else:
            self.use_calibration_ = False
            self.calibrator_ = None
            self.threshold_ = thr_raw

        return self.threshold_

    def model_training(self, X, y):
        """
        Fit self.best_estimator_ on the full (X, y).

        Raises RuntimeError if no estimator has been selected yet.
        """
        if self.best_estimator_ is None:
            raise RuntimeError("Call model_tuning() before model_training().")
        self.best_estimator_.fit(X, y)

    def generate_oof_proba_preds(self, X, y, n_folds):
        """
        Return raw (uncalibrated) out-of-fold positive-class probabilities from
        stratified, shuffled n_folds cross-validation of self.best_estimator_.
        """
        cv = StratifiedKFold(
            n_splits=n_folds, shuffle=True, random_state=self.random_state
        )
        y_proba_pred = cross_val_predict(
            self.best_estimator_, X, y, cv=cv, method="predict_proba", n_jobs=self.n_jobs
        )[:, 1]

        return y_proba_pred

    def analyze_performance(self, X, y, n_folds):
        """
        Evaluate the configured model on out-of-fold predictions.

        Returns (and stores in self.last_cv_metrics_) a dict with roc_auc and
        average_precision computed on the raw OOF probabilities, plus F1 for
        class 1, class 0 and macro-F1 at self.threshold_ on the calibrated
        (if enabled) and clipped probabilities.
        """
        # Compute OOF probabilities
        y_proba_pred = self.generate_oof_proba_preds(X, y, n_folds)

        # Apply calibration (if enabled) and clip to [0, 1]. This previously read the
        # non-existent attribute `self.calibrator`, raising AttributeError whenever
        # calibration was enabled.
        y_proba_eval = self._postprocess_proba(y_proba_pred)

        # Metrics independent of the decision threshold (computed on the raw scores)
        roc_auc = roc_auc_score(y, y_proba_pred)
        average_precision = average_precision_score(y, y_proba_pred)

        # Metrics dependent on the decision threshold
        y_pred = (y_proba_eval >= self.threshold_).astype(int)
        f1_pos = f1_score(y, y_pred, pos_label=1, average="binary")  # class 1
        f1_neg = f1_score(y, y_pred, pos_label=0, average="binary")  # class 0
        f1_macro = f1_score(y, y_pred, average="macro")

        results = {
            "roc_auc": float(roc_auc),
            "average_precision": float(average_precision),
            "f1_class_1": float(f1_pos),
            "f1_class_0": float(f1_neg),
            "f1_macro": float(f1_macro),
        }
        self.last_cv_metrics_ = results

        return results

    def predict_proba(self, X):
        """
        Predict class probabilities; applies calibration if enabled.
        Returns shape (n_samples, 2) = [P(class 0), P(class 1)].

        Raises RuntimeError if no estimator has been selected yet.
        """
        if self.best_estimator_ is None:
            raise RuntimeError("Call model_training() before predict_proba().")
        proba = self._postprocess_proba(self.best_estimator_.predict_proba(X)[:, 1])
        return np.vstack([1 - proba, proba]).T

    def predict(self, X):
        """
        Thresholded class prediction using self.threshold_.
        Returns an int array: 1 where P(class 1) >= threshold, else 0.
        """
        p = self.predict_proba(X)[:, 1]
        return (p >= self.threshold_).astype(int)

## Logistic Regression

In [None]:
# Define a new ThresholdTunedClassifier object
logit = ThresholdTunedClassifier()

# Perform hyperparameter tuning
logit_base = LogisticRegression(max_iter=1000, class_weight="balanced", solver="saga", random_state=123)
logit_param_grid = {"C": np.logspace(-3, 3, 13), "penalty": ["l1", "l2"]}
# n_iter=1 would score a single randomly drawn (C, penalty) pair, i.e. no tuning at all.
# Sample 20 of the 26 candidates, matching the stacking model's search budget.
logit.model_tuning(X_train, y_train, logit_base, logit_param_grid, n_iter=20, n_folds=3, tuner_scoring="average_precision")

In [None]:
# Derive the F1-optimal decision threshold from 5-fold out-of-fold probabilities
logit_oof_proba, logit_thr_raw, logit_f1_raw = logit.decision_thr_tuning(
    X=X_train, y=y_train, n_folds=5
)

In [None]:
# Fit an isotonic calibrator on the OOF probabilities; it is kept only if it improves F1
logit_thr_cal = logit.calibrate_prob(
    y=y_train,
    oof_proba=logit_oof_proba,
    thr_raw=logit_thr_raw,
    f1_raw=logit_f1_raw,
    calibration_method='isotonic',
)

In [None]:
# Refit the selected logistic-regression configuration on the full training set
logit.model_training(X=X_train, y=y_train)

In [None]:
# Out-of-fold metrics (ROC AUC, average precision, F1 at the tuned threshold)
logit.analyze_performance(X=X_train, y=y_train, n_folds=5)

## XGBoost

In [None]:
# Perform hyper-parameter tuning using Optuna
def xgb_objective(trial):
    """
    Optuna objective for XGBoost.

    Samples one hyper-parameter configuration from `trial`, evaluates it with
    5-fold stratified CV on (X_train, y_train), and returns the mean
    average-precision across folds (the study maximizes this value).
    """
    # define model hyper-parameters and hyper-parameter search space
    scale_pos_weight = (y_train == 0).sum() / (y_train == 1).sum() # From XGBoost documentation: the weight should be the ratio between the negative class and positive class, not the other way around
    params = {
        "objective": 'binary:logistic',
        "eval_metric": ["aucpr"], # This has to be defined, because this is the metric that will be used during pruning
        "random_state": 98464,
        "scale_pos_weight": scale_pos_weight,
        "learning_rate": trial.suggest_float("learning_rate", 1e-3, 3e-1, log=True),
        "n_estimators": trial.suggest_int("n_estimators", 300, 1000, step=100),
        "max_depth": trial.suggest_int("max_depth", 3, 12),
        "min_child_weight": trial.suggest_float("min_child_weight", 1.0, 32.0, log=True),
        "gamma": trial.suggest_float("gamma", 0.0, 1.0),
        "subsample": trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
        "colsample_bynode": trial.suggest_float("colsample_bynode", 0.7, 1.0),
        "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 1e+1, log=True),
        "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 1e+1, log=True)
    }
    
    model = XGBClassifier(**params)

    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=67)
    scores = []

    for train_idx, val_idx in cv.split(X_train, y_train):
        # The callback watches the eval-set metric "validation_0-aucpr" for this trial.
        # NOTE(review): a new callback is built per fold for the same trial, so the reported
        # boosting-round steps restart each fold; Optuna presumably ignores repeated reports
        # for an already-reported step, which would make pruning effective only in fold 1 — confirm.
        pruning_cb = XGBoostPruningCallback(trial, "validation_0-aucpr") # Boosting-round based pruning
        # NOTE(review): newer XGBoost releases expect `callbacks` on the estimator constructor
        # rather than in fit() — confirm against the installed version.
        model.fit(
            X_train.iloc[train_idx,:], y_train.iloc[train_idx],
            eval_set=[(X_train.iloc[val_idx,:], y_train.iloc[val_idx])],
            callbacks=[pruning_cb],          # pruning each boosting round
            verbose=False
        )
        # Score the held-out fold with average precision
        proba = model.predict_proba(X_train.iloc[val_idx,:])[:, 1]
        scores.append(average_precision_score(y_train.iloc[val_idx], proba))

    return float(np.mean(scores))

# Define the pruner
# NOTE(review): the callback above reports once per boosting round, so min_resource is
# presumably counted in boosting rounds rather than CV folds — confirm.
pruner = optuna.pruners.SuccessiveHalvingPruner(
    min_resource=1,          # earliest reported step at which a trial may be pruned
    reduction_factor=3,      # roughly keep top 1/3 at each rung
    min_early_stopping_rate=0
)

# Perform hyper-parameter tuning
xgb_study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=2025, n_startup_trials=20),
    pruner=pruner
)
xgb_study.optimize(xgb_objective, n_trials=200)

In [None]:
# Report the best objective value (mean CV average precision) and the sampled hyper-parameters that produced it
print("Best AP:", xgb_study.best_value)
print("Best params:", xgb_study.best_params)

In [None]:
# Wrap the Optuna-selected XGBoost configuration in a fresh ThresholdTunedClassifier
xgb = ThresholdTunedClassifier()

# Negative-to-positive class ratio (XGBoost documentation: negatives over positives, not the reverse)
scale_pos_weight = y_train.eq(0).sum() / y_train.eq(1).sum()

# Fixed settings mirror the tuning objective; the searched ones come from the study
xgb.enter_best_model_config(
    best_model=XGBClassifier(
        **{
            "objective": 'binary:logistic',
            "eval_metric": ["aucpr"],
            "random_state": 98464,
            "scale_pos_weight": scale_pos_weight,
        },
        **xgb_study.best_params,
    )
)

In [None]:
# Derive the F1-optimal decision threshold from 5-fold out-of-fold probabilities
xgb_oof_proba, xgb_thr_raw, xgb_f1_raw = xgb.decision_thr_tuning(
    X=X_train, y=y_train, n_folds=5
)

In [None]:
# Fit an isotonic calibrator on the OOF probabilities; it is kept only if it improves F1
xgb_thr_cal = xgb.calibrate_prob(
    y=y_train,
    oof_proba=xgb_oof_proba,
    thr_raw=xgb_thr_raw,
    f1_raw=xgb_f1_raw,
    calibration_method='isotonic',
)

In [None]:
# Refit the selected XGBoost configuration on the full training set
xgb.model_training(X=X_train, y=y_train)

In [None]:
# Out-of-fold metrics (ROC AUC, average precision, F1 at the tuned threshold)
xgb.analyze_performance(X=X_train, y=y_train, n_folds=5)

In [None]:
# Out-of-fold positive-class probabilities on the training set (later used as a stacking feature)
xgb_train_proba_pred = xgb.generate_oof_proba_preds(X=X_train, y=y_train, n_folds=5)

In [None]:
# Perform probability prediction on testing dataset.
# Query the fitted base estimator directly so the scores are raw (uncalibrated): the
# training-side stacking feature comes from generate_oof_proba_preds(), which is also raw,
# whereas xgb.predict_proba() would apply the calibrator whenever calibration is enabled.
xgb_test_proba_pred = xgb.best_estimator_.predict_proba(X_test)[:, 1]

## LightGBM

In [None]:
# Perform hyper-parameter tuning using Optuna
def lgbm_objective(trial):
    """
    Optuna objective for LightGBM.

    Samples one hyper-parameter configuration from `trial` (including the
    boosting type and its conditional sampling parameters), evaluates it with
    5-fold stratified CV on (X_train, y_train), and returns the mean
    average-precision across folds (the study maximizes this value).
    """
    # define model hyper-parameters and hyper-parameter search space
    scale_pos_weight = (y_train == 0).sum() / (y_train == 1).sum() # ratio between the negative class and positive class, not the other way around

    boosting_type = trial.suggest_categorical("boosting_type", ["gbdt", "goss"])

    cond_params = {}
    if boosting_type == "gbdt":
        cond_params["subsample"] = trial.suggest_float("subsample", 0.7, 1.0)
        cond_params["bagging_freq"] = trial.suggest_int("bagging_freq", 1, 5)
    elif boosting_type == "goss":
        top_rate = trial.suggest_float("top_rate", 0.1, 0.5)
        other_rate = trial.suggest_float("other_rate", 0.1, 0.5)
        # Ensure top_rate + other_rate < 1
        # (the study still records the unclipped other_rate in trial.params)
        other_rate = min(other_rate, 0.99-top_rate)
        cond_params["top_rate"] = top_rate
        cond_params["other_rate"] = other_rate

    params = {
        "objective": "binary",
        "metric": "average_precision", # ensures LGBM logs AP, as it is the metric monitored during Optuna's pruning
        "random_state": 42,
        # Pass the sampled boosting type to the model. It was previously sampled but never
        # forwarded, so every trial trained plain gbdt and the GOSS branch was never evaluated,
        # even though best_params later applies "boosting_type" to the final model.
        "boosting_type": boosting_type,
        "scale_pos_weight": scale_pos_weight,
        "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.2, log=True),
        "n_estimators": trial.suggest_int("n_estimators", 600, 1800, step=100),
        "num_leaves": trial.suggest_int("num_leaves", 31, 500, log=True),
        "max_depth": trial.suggest_int("max_depth", -1, 12), # max_depth=-1 lets LGBM cap by num_leaves.
        "min_child_samples": trial.suggest_int("min_child_samples", 10, 200),
        "min_sum_hessian_in_leaf": trial.suggest_float("min_sum_hessian_in_leaf", 1e-3, 10.0, log=True),
        "min_split_gain": trial.suggest_float("min_split_gain", 0.0, 0.5),
        "lambda_l2": trial.suggest_float("lambda_l2", 1e-8, 10.0, log=True),
        "lambda_l1": trial.suggest_float("lambda_l1", 1e-8, 10.0, log=True),
        "feature_fraction": trial.suggest_float("feature_fraction", 0.6, 1.0),
        "feature_fraction_bynode": trial.suggest_float("feature_fraction_bynode", 0.6, 1.0),
        "max_bin": trial.suggest_int("max_bin", 63, 300, step=32),
        **cond_params # Append the conditional hyper-parameters
    }

    model = LGBMClassifier(**params)

    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=67)
    scores = []

    for train_idx, val_idx in cv.split(X_train, y_train):
        # The callback watches the eval-set metric "average_precision" for this trial
        pruning_cb = LightGBMPruningCallback(trial, "average_precision") # Boosting-round based pruning
        model.fit(
            X_train.iloc[train_idx,:], y_train.iloc[train_idx],
            eval_set=[(X_train.iloc[val_idx,:], y_train.iloc[val_idx])],
            callbacks=[pruning_cb]          # pruning each boosting round
        )
        # Score the held-out fold with average precision
        proba = model.predict_proba(X_train.iloc[val_idx,:])[:, 1]
        scores.append(average_precision_score(y_train.iloc[val_idx], proba))

    return float(np.mean(scores))

# Define the pruner
# NOTE(review): the callback above reports once per boosting round, so min_resource is
# presumably counted in boosting rounds rather than CV folds — confirm.
pruner = optuna.pruners.SuccessiveHalvingPruner(
    min_resource=1,          # earliest reported step at which a trial may be pruned
    reduction_factor=3,      # roughly keep top 1/3 at each rung
    min_early_stopping_rate=0
)

# Perform hyper-parameter tuning
lgbm_study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=2025, n_startup_trials=20),
    pruner=pruner
)
lgbm_study.optimize(lgbm_objective, n_trials=200)

In [None]:
# Report the best objective value (mean CV average precision) and the sampled hyper-parameters that produced it
print("Best AP:", lgbm_study.best_value)
print("Best params:", lgbm_study.best_params)

In [None]:
# Wrap the Optuna-selected LightGBM configuration in a fresh ThresholdTunedClassifier
lgbm = ThresholdTunedClassifier()

# Negative-to-positive class ratio used to up-weight the positive class
scale_pos_weight = y_train.eq(0).sum() / y_train.eq(1).sum()

# Fixed settings mirror the tuning objective; the searched ones come from the study
lgbm.enter_best_model_config(
    best_model=LGBMClassifier(
        **{
            "objective": "binary",
            "metric": "average_precision",
            "random_state": 42,
            "scale_pos_weight": scale_pos_weight,
        },
        **lgbm_study.best_params,
    )
)

In [None]:
# Derive the F1-optimal decision threshold from 5-fold out-of-fold probabilities
lgbm_oof_proba, lgbm_thr_raw, lgbm_f1_raw = lgbm.decision_thr_tuning(
    X=X_train, y=y_train, n_folds=5
)

In [None]:
# Fit an isotonic calibrator on the OOF probabilities; it is kept only if it improves F1
lgbm_thr_cal = lgbm.calibrate_prob(
    y=y_train,
    oof_proba=lgbm_oof_proba,
    thr_raw=lgbm_thr_raw,
    f1_raw=lgbm_f1_raw,
    calibration_method='isotonic',
)

In [None]:
# Refit the selected LightGBM configuration on the full training set
lgbm.model_training(X=X_train, y=y_train)

In [None]:
# Out-of-fold metrics (ROC AUC, average precision, F1 at the tuned threshold)
lgbm.analyze_performance(X=X_train, y=y_train, n_folds=5)

In [None]:
# Out-of-fold positive-class probabilities on the training set (later used as a stacking feature)
lgbm_train_proba_pred = lgbm.generate_oof_proba_preds(X=X_train, y=y_train, n_folds=5)

In [None]:
# Perform probability prediction on testing dataset.
# Query the fitted base estimator directly so the scores are raw (uncalibrated): the
# training-side stacking feature comes from generate_oof_proba_preds(), which is also raw,
# whereas lgbm.predict_proba() would apply the calibrator whenever calibration is enabled.
lgbm_test_proba_pred = lgbm.best_estimator_.predict_proba(X_test)[:, 1]

## CatBoost

In [None]:
# CatBoost gets the frames from before the feature transformations above,
# so the categorical features are left un-encoded
X_train_catb = df_train.drop(columns='loan_status')
X_test_catb = df_test.copy()

In [None]:
# Names and positional indices of the categorical columns (dtype "object" or "category")
catb_cols = list(X_train_catb.select_dtypes(include=['object', 'category']).columns)
catb_idx = [
    position
    for position, column in enumerate(X_train_catb.columns)
    if column in catb_cols
]

In [None]:
# Perform hyper-parameter tuning using Optuna
def catb_objective(trial):
    """
    Optuna objective for CatBoost.

    Samples one hyper-parameter configuration from `trial`, evaluates it with
    5-fold stratified CV on (X_train_catb, y_train), and returns the mean
    average-precision across folds (the study maximizes this value).
    Raises optuna's TrialPruned (via check_pruned) when the pruner stops a fold.
    """
    # define model hyper-parameters and hyper-parameter search space
    scale_pos_weight = (y_train == 0).sum() / (y_train == 1).sum()
    class_weights = {0: 1.0, 1: scale_pos_weight}

    params = {
        "loss_function": 'Logloss',
        "eval_metric": 'PRAUC',           # Precision-Recall AUC
        "grow_policy": 'SymmetricTree',    # Default
        "bootstrap_type": 'Bayesian',     # Default
        "cat_features": catb_idx,          # list of categorical column indices
        "random_seed": 42,
        "class_weights": class_weights,
        'iterations': trial.suggest_int("iterations", 600, 1800, step=100),
        'learning_rate': trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
        'depth': trial.suggest_int("depth", 4, 10),
        'l2_leaf_reg': trial.suggest_float("l2_leaf_reg", 1e-2, 50.0, log=True),
        'random_strength': trial.suggest_float("random_strength", 0.0, 2.0),
        'bagging_temperature': trial.suggest_float("bagging_temperature", 0.0, 5.0),   # for Bayesian bootstrap
        'colsample_bylevel': trial.suggest_float("colsample_bylevel", 0.6, 1.0),
        'border_count': trial.suggest_int("border_count", 64, 255, step=32),
        'one_hot_max_size': trial.suggest_int("one_hot_max_size", 2, 10),  # threshold for OHE vs Ordered CTR
        'max_ctr_complexity': trial.suggest_int("max_ctr_complexity", 1, 2) # Ordered Combo CTR depth
    }

    model = CatBoostClassifier(**params)

    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=67)
    scores = []

    for train_idx, val_idx in cv.split(X_train_catb, y_train):
        pruning_cb = CatBoostPruningCallback(trial, "PRAUC") # Boosting-round based pruning
        model.fit(
            X_train_catb.iloc[train_idx,:], y_train.iloc[train_idx],
            eval_set=[(X_train_catb.iloc[val_idx,:], y_train.iloc[val_idx])],
            callbacks=[pruning_cb],         # pruning each boosting round
            verbose=False                   # silence per-iteration logs, as in the XGBoost objective
        )
        # CatBoost callbacks cannot raise, so the callback only stops training early.
        # check_pruned() turns that stop into optuna.TrialPruned; without it a pruned
        # trial would be scored on a truncated model and recorded as completed.
        pruning_cb.check_pruned()

        # Score the held-out fold with average precision
        proba = model.predict_proba(X_train_catb.iloc[val_idx,:])[:, 1]
        scores.append(average_precision_score(y_train.iloc[val_idx], proba))

    return float(np.mean(scores))

# Define the pruner
# NOTE(review): the callback above reports once per boosting round, so min_resource is
# presumably counted in boosting rounds rather than CV folds — confirm.
pruner = optuna.pruners.SuccessiveHalvingPruner(
    min_resource=1,          # earliest reported step at which a trial may be pruned
    reduction_factor=3,      # roughly keep top 1/3 at each rung
    min_early_stopping_rate=0
)

# Perform hyper-parameter tuning
catb_study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=2025, n_startup_trials=20),
    pruner=pruner
)
catb_study.optimize(catb_objective, n_trials=200)

In [None]:
# Report the best objective value (mean CV average precision) and the sampled hyper-parameters that produced it
print("Best AP:", catb_study.best_value)
print("Best params:", catb_study.best_params)

In [None]:
# Wrap the Optuna-selected CatBoost configuration in a fresh ThresholdTunedClassifier
catb = ThresholdTunedClassifier()

# Class weights: negatives keep weight 1, positives get the negative-to-positive ratio
scale_pos_weight = y_train.eq(0).sum() / y_train.eq(1).sum()
class_weights = {0: 1.0, 1: scale_pos_weight}

# Fixed settings mirror the tuning objective; the searched ones come from the study
catb.enter_best_model_config(
    best_model=CatBoostClassifier(
        **{
            "loss_function": 'Logloss',
            "eval_metric": 'PRAUC',
            "grow_policy": 'SymmetricTree',
            "bootstrap_type": 'Bayesian',
            "cat_features": catb_idx,
            "random_seed": 42,
            "class_weights": class_weights,
        },
        **catb_study.best_params,
    )
)

In [None]:
# Derive the F1-optimal decision threshold from 5-fold out-of-fold probabilities
catb_oof_proba, catb_thr_raw, catb_f1_raw = catb.decision_thr_tuning(
    X=X_train_catb, y=y_train, n_folds=5
)

In [None]:
# Fit an isotonic calibrator on the OOF probabilities; it is kept only if it improves F1
catb_thr_cal = catb.calibrate_prob(
    y=y_train,
    oof_proba=catb_oof_proba,
    thr_raw=catb_thr_raw,
    f1_raw=catb_f1_raw,
    calibration_method='isotonic',
)

In [None]:
# Refit the selected CatBoost configuration on the full training set
catb.model_training(X=X_train_catb, y=y_train)

In [None]:
# Out-of-fold metrics (ROC AUC, average precision, F1 at the tuned threshold)
catb.analyze_performance(X=X_train_catb, y=y_train, n_folds=5)

In [None]:
# Out-of-fold positive-class probabilities on the training set (later used as a stacking feature)
catb_train_proba_pred = catb.generate_oof_proba_preds(X=X_train_catb, y=y_train, n_folds=5)

In [None]:
# Perform probability prediction on testing dataset.
# Query the fitted base estimator directly so the scores are raw (uncalibrated): the
# training-side stacking feature comes from generate_oof_proba_preds(), which is also raw,
# whereas catb.predict_proba() would apply the calibrator whenever calibration is enabled.
catb_test_proba_pred = catb.best_estimator_.predict_proba(X_test_catb)[:, 1]

- Do I perform one-hot-encoding on the Categorical features? Or should I just leave them as it is?
- Do CatBoost treat ordinal and nominal categorical features the same way?
- Apply class weights during training

# Stacking

In [None]:
# Meta-features for training: one column per base model (XGBoost, LightGBM, CatBoost),
# each holding that model's probability predictions on the training dataset
X_train_stacking = np.column_stack(
    (xgb_train_proba_pred, lgbm_train_proba_pred, catb_train_proba_pred)
)

In [None]:
# Meta-learner: class-weighted logistic regression wrapped in a ThresholdTunedClassifier
stacking_model = ThresholdTunedClassifier()

# Randomized search over regularization strength and penalty type
stacking_base = LogisticRegression(
    max_iter=1000,
    class_weight="balanced",
    solver="saga",
    random_state=123,
)
stacking_param_grid = {
    "C": np.logspace(-3, 3, 13),
    "penalty": ["l1", "l2"],
}
stacking_model.model_tuning(
    X=X_train_stacking,
    y=y_train,
    base_estimator=stacking_base,
    param_distributions=stacking_param_grid,
    n_iter=20,
    n_folds=3,
    tuner_scoring="average_precision",
)

In [None]:
# Derive the F1-optimal decision threshold from 5-fold out-of-fold probabilities
stacking_oof_proba, stacking_thr_raw, stacking_f1_raw = stacking_model.decision_thr_tuning(
    X=X_train_stacking, y=y_train, n_folds=5
)

In [None]:
# Fit an isotonic calibrator on the OOF probabilities; it is kept only if it improves F1
stacking_thr_cal = stacking_model.calibrate_prob(
    y=y_train,
    oof_proba=stacking_oof_proba,
    thr_raw=stacking_thr_raw,
    f1_raw=stacking_f1_raw,
    calibration_method='isotonic',
)

In [None]:
# Refit the selected meta-learner on the full stacked training matrix
stacking_model.model_training(X=X_train_stacking, y=y_train)

In [None]:
# Out-of-fold metrics (ROC AUC, average precision, F1 at the tuned threshold)
stacking_model.analyze_performance(X=X_train_stacking, y=y_train, n_folds=5)

## Prediction on Testing Dataset

Generate predictions on the testing dataset using the stacking model. Ready for submission!

In [None]:
# Meta-features for testing: one column per base model (XGBoost, LightGBM, CatBoost),
# in the same column order as the training meta-features
X_test_stacking = np.column_stack(
    (xgb_test_proba_pred, lgbm_test_proba_pred, catb_test_proba_pred)
)

In [None]:
# Final 0/1 class labels for the testing dataset, using the stacking model's tuned threshold
stacking_model.predict(X=X_test_stacking)