# Tuning of Loan Acceptance Model

First, all the necessary libraries are imported.

In [1]:
import pandas as pd
from collections import Counter
from sklearn.metrics import balanced_accuracy_score
from category_encoders import WOEEncoder
from sklearn.preprocessing import TargetEncoder
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.pipeline import Pipeline
from xgboost import XGBClassifier
import optuna
from optuna import Trial
import joblib
from helper_functions.ml_data_prep import (
    X_y_spilt,
    all_combinations,
    stratified_sample,
    FeatureDropper,
)

The data is loaded and split appropriately.

In [None]:
# Set SAMPLE_FRAC to a fraction (e.g. 0.1) to run the notebook on a subsample
# drawn with `stratified_sample` when checking the code; keep it at None to
# tune on the full training and validation sets.
SAMPLE_FRAC = None

data_train = pd.read_pickle("./data/data_train_mod1.pkl")
data_val = pd.read_pickle("./data/data_val_mod1.pkl")
if SAMPLE_FRAC is not None:
    data_train = data_train.pipe(stratified_sample, frac=SAMPLE_FRAC)
    data_val = data_val.pipe(stratified_sample, frac=SAMPLE_FRAC)
X_train, y_train = X_y_spilt(data_train)
X_val, y_val = X_y_spilt(data_val)
# Ratio of label-0 rows to label-1 rows in the training target.
# NOTE(review): `instantiate_xgboost` does not read this value; it tunes
# `scale_pos_weight` over the fixed choices [10, 20] -- confirm this is intended.
counter = Counter(y_train)
scale_pos_weight = counter[0] / counter[1]
X_train.dtypes

The helper functions needed by the Optuna objective function are defined.

In [3]:
# Everything an encoder factory below may return; None is returned when the
# trial picks the "none" option.
Encoder = TargetEncoder | WOEEncoder | None

# Columns the tuner is allowed to drop. `all_combinations` turns them into the
# indexable collection of candidate subsets that `instantiate_column_dropper`
# picks from by position.
droppable_columns = ["loan_amnt", "state", "purpose"]
drop_combinations = all_combinations(droppable_columns)


def instantiate_column_dropper(trial: Trial) -> FeatureDropper:
    """Build the feature dropper for the subset of columns chosen by the trial.

    The trial suggests an integer ``drop_subset`` between 0 and the last valid
    position of ``drop_combinations`` (inclusive); the entry at that position
    is handed to ``FeatureDropper`` as ``drop_features``.
    """
    last_index = len(drop_combinations) - 1
    chosen_index = trial.suggest_int("drop_subset", 0, last_index)
    return FeatureDropper(drop_features=drop_combinations[chosen_index])


def instantiate_woe_encoder(trial: Trial) -> WOEEncoder:
    """Build a weight-of-evidence encoder with trial-suggested settings.

    The trial suggests ``sigma`` in [0.001, 5], ``regularization`` in [0, 5]
    and the boolean ``randomized``. The parameter names carry no column
    prefix, so every WOE encoder created within one trial receives the same
    three values (Optuna returns the stored value when a name is suggested
    again in the same trial).

    The encoder is seeded so that the noise applied when ``randomized`` is
    True is repeatable, in line with the seed of 42 used for the target
    encoder and the XGBoost estimator.
    """
    params = {
        "sigma": trial.suggest_float("sigma", 0.001, 5),
        "regularization": trial.suggest_float("regularization", 0, 5),
        "randomized": trial.suggest_categorical("randomized", [True, False]),
    }
    return WOEEncoder(random_state=42, **params)


def instantiate_target_encoder(trial: Trial) -> TargetEncoder:
    """Build a target encoder with a fixed seed.

    ``trial`` is accepted so the signature matches the other encoder
    factories; nothing is suggested from it here.
    """
    encoder = TargetEncoder(random_state=42)
    return encoder


def _instantiate_encoder(trial: Trial, param_name: str) -> Encoder:
    """Let the trial choose an encoder kind under ``param_name`` and build it.

    Args:
        trial: Optuna trial used to suggest the encoder kind (and, for the
            WOE encoder, its settings).
        param_name: Name of the categorical parameter recorded in the study.

    Returns:
        A ``TargetEncoder`` for "target", a ``WOEEncoder`` for "woe", or
        ``None`` for "none".

    Raises:
        ValueError: If the suggested value is not one of the three choices.
    """
    choice = trial.suggest_categorical(param_name, ["target", "woe", "none"])
    if choice == "target":
        return instantiate_target_encoder(trial)
    if choice == "woe":
        return instantiate_woe_encoder(trial)
    if choice == "none":
        return None
    # Fail loudly instead of hitting an unbound local variable.
    raise ValueError(
        f"Unexpected encoder choice {choice!r} for parameter {param_name!r}"
    )


def instantiate_state_encoder(trial: Trial) -> Encoder:
    """Build the encoder chosen by the trial under ``state_encoders``."""
    return _instantiate_encoder(trial, "state_encoders")


def instantiate_purpose_encoder(trial: Trial) -> Encoder:
    """Build the encoder chosen by the trial under ``purpose_encoders``."""
    return _instantiate_encoder(trial, "purpose_encoders")


def instantiate_risk_encoder(trial: Trial) -> Encoder:
    """Build the encoder chosen by the trial under ``risk_encoders``."""
    return _instantiate_encoder(trial, "risk_encoders")


def instantiate_state_transformer(trial: Trial) -> Pipeline:
    """Wrap the trial's state encoder in a one-step pipeline named "encoder".

    The step is ``None`` when the trial picked the "none" option.
    """
    state_encoder = instantiate_state_encoder(trial)
    steps = [("encoder", state_encoder)]
    return Pipeline(steps)


def instantiate_purpose_transformer(trial: Trial) -> Pipeline:
    """Wrap the trial's purpose encoder in a one-step pipeline named "encoder".

    The step is ``None`` when the trial picked the "none" option.
    """
    purpose_encoder = instantiate_purpose_encoder(trial)
    steps = [("encoder", purpose_encoder)]
    return Pipeline(steps)


def instantiate_risk_transformer(trial: Trial) -> Pipeline:
    """Wrap the trial's risk encoder in a one-step pipeline named "encoder".

    The step is ``None`` when the trial picked the "none" option.
    """
    risk_encoder = instantiate_risk_encoder(trial)
    steps = [("encoder", risk_encoder)]
    return Pipeline(steps)


def instantiate_processor(trial: Trial) -> ColumnTransformer:
    """Assemble the column-wise preprocessing step for one trial.

    Three transformers are registered, each applied to the columns whose
    name matches its pattern ("state", "purpose", "risk"). Columns that are
    not selected are passed through, feature names are left unprefixed, and
    the transformer is configured to output pandas objects.
    """
    # Built in this order so the trial parameters are suggested as
    # state -> purpose -> risk.
    state_entry = (
        "state",
        instantiate_state_transformer(trial),
        make_column_selector(pattern="state"),
    )
    purpose_entry = (
        "purpose",
        instantiate_purpose_transformer(trial),
        make_column_selector(pattern="purpose"),
    )
    risk_entry = (
        "risk",
        instantiate_risk_transformer(trial),
        make_column_selector(pattern="risk"),
    )

    column_transformer = ColumnTransformer(
        transformers=[state_entry, purpose_entry, risk_entry],
        remainder="passthrough",
        verbose_feature_names_out=False,
    )
    return column_transformer.set_output(transform="pandas")


def instantiate_xgboost(trial: Trial) -> XGBClassifier:
    """Create the XGBoost classifier for one trial.

    Fixed settings (objective, tree method, AUC evaluation metric, early
    stopping after 5 rounds, categorical support, seed 42, ...) are combined
    with hyperparameters suggested by the trial. An Optuna pruning callback
    watching ``validation_0-auc`` is attached to the estimator.
    """
    fixed_settings = dict(
        verbosity=0,
        n_jobs=4,
        random_state=42,
        objective="binary:logistic",
        booster="gbtree",
        tree_method="hist",
        grow_policy="depthwise",
        eval_metric="auc",
        early_stopping_rounds=5,
        enable_categorical=True,
        max_cat_to_onehot=10,
    )

    # Suggestions are made one by one, in the same order as before, so the
    # parameters are recorded in the study in an unchanged sequence.
    tuned_settings = {}
    tuned_settings["scale_pos_weight"] = trial.suggest_categorical(
        "scale_pos_weight", [10, 20]
    )
    tuned_settings["max_depth"] = trial.suggest_int("max_depth", 3, 15, step=2)
    tuned_settings["min_child_weight"] = trial.suggest_int(
        "min_child_weight", 1, 10
    )
    tuned_settings["subsample"] = trial.suggest_float(
        "subsample", 0.2, 0.8, step=0.1
    )
    tuned_settings["colsample_bytree"] = trial.suggest_float(
        "colsample_bytree", 0.5, 1.0, step=0.1
    )
    for log_scaled_name in ("gamma", "reg_lambda", "reg_alpha"):
        tuned_settings[log_scaled_name] = trial.suggest_float(
            log_scaled_name, 1e-4, 10, log=True
        )

    # Reports the validation AUC of every boosting round to the trial.
    auc_pruner = optuna.integration.XGBoostPruningCallback(
        trial, "validation_0-auc"
    )
    return XGBClassifier(
        **fixed_settings, **tuned_settings, callbacks=[auc_pruner]
    )


def instantiate_model(trial: Trial) -> Pipeline:
    """Chain column dropping, preprocessing and XGBoost into one pipeline.

    The steps are named "drop", "processor" and "estimator" and are built in
    that order, each from the same trial.
    """
    steps = [
        ("drop", instantiate_column_dropper(trial)),
        ("processor", instantiate_processor(trial)),
        ("estimator", instantiate_xgboost(trial)),
    ]
    return Pipeline(steps)

The Optuna study is defined and run.

In [4]:
def objective(trial: Trial) -> float:
    """Fit one candidate pipeline and return its validation balanced accuracy.

    The preprocessing steps are fitted on the training data and used to
    transform the validation data, which the estimator needs as its
    evaluation set for early stopping and pruning. The number of boosting
    rounds at the best iteration and the best evaluation score (AUC, per the
    estimator's ``eval_metric``) are stored on the trial as user attributes
    ``n_estimators`` and ``roc_auc``.

    Args:
        trial: Optuna trial supplying the hyperparameters.

    Returns:
        Balanced accuracy of the fitted pipeline on ``X_val`` / ``y_val``.
    """
    model = instantiate_model(trial)
    # A Pipeline slice shares its step objects with `model`, so fitting the
    # slice fits the preprocessing steps of `model` too. Fit them once and
    # train the estimator on the transformed frame, instead of fitting the
    # preprocessing a second time through `model.fit`.
    prepro = model[:-1]
    X_train_prepro = prepro.fit_transform(X_train, y_train)
    X_val_prepro = prepro.transform(X_val)
    estimator = model[-1]
    estimator.fit(
        X_train_prepro,
        y_train,
        eval_set=[(X_val_prepro, y_val)],
        verbose=False,
    )
    # best_iteration is zero-based, hence the +1 to get a number of trees.
    trial.set_user_attr("n_estimators", estimator.best_iteration + 1)
    trial.set_user_attr("roc_auc", estimator.best_score)
    y_pred = model.predict(X_val)
    balanced_accuracy = balanced_accuracy_score(y_val, y_pred)
    return balanced_accuracy


pruner = optuna.pruners.MedianPruner(n_warmup_steps=5, n_min_trials=3)
# Seed the sampler so that the search is repeatable, in line with the seed
# of 42 used for the encoders and the estimator. TPESampler is the sampler
# Optuna uses by default for a single-objective study, so only the seeding
# changes. Because the study is resumed from the SQLite file when it already
# exists, a run is only repeatable when started from an empty database.
sampler = optuna.samplers.TPESampler(seed=42)
study = optuna.create_study(
    study_name="acceptance_model_hypertune",
    storage="sqlite:///acceptance_model_hypertune.db",
    load_if_exists=True,
    sampler=sampler,
    pruner=pruner,
    direction="maximize",
)
study.optimize(objective, n_trials=100)
best_trial = study.best_trial
# Number of boosting rounds recorded by `objective` for the best trial;
# reused when the final model is refitted without early stopping.
n_estimators = best_trial.user_attrs["n_estimators"]
print("Best balanced accuracy value:", study.best_value)
print("Best ROC AUC value:", best_trial.user_attrs["roc_auc"])
print("Best parameters:")
display(study.best_params)

[I 2024-01-29 00:12:36,938] A new study created in RDB with name: model1_hypertune
[I 2024-01-29 00:13:37,440] Trial 0 finished with value: 0.9924136696992802 and parameters: {'drop_subset': 6, 'state_encoders': 'none', 'purpose_encoders': 'woe', 'sigma': 2.9164500480582007, 'regularization': 1.4676046125392395, 'randomized': False, 'risk_encoders': 'target', 'scale_pos_weight': 20, 'max_depth': 15, 'min_child_weight': 7, 'subsample': 0.6000000000000001, 'colsample_bytree': 1.0, 'gamma': 0.00046679403764890415, 'reg_lambda': 0.0023303989033038623, 'reg_alpha': 0.038352673382299564}. Best is trial 0 with value: 0.9924136696992802.
[I 2024-01-29 00:15:07,627] Trial 1 finished with value: 0.9118056114338022 and parameters: {'drop_subset': 5, 'state_encoders': 'woe', 'sigma': 2.896953904613107, 'regularization': 4.998571232410977, 'randomized': True, 'purpose_encoders': 'woe', 'risk_encoders': 'woe', 'scale_pos_weight': 10, 'max_depth': 13, 'min_child_weight': 9, 'subsample': 0.2, 'colsamp

Best balanced accuracy value: 0.9945126042005958
Best ROC AUC value: 0.9994148526779225
Best parameters:


{'drop_subset': 2,
 'state_encoders': 'none',
 'purpose_encoders': 'woe',
 'sigma': 2.4452363975811644,
 'regularization': 2.5042270753252636,
 'randomized': False,
 'risk_encoders': 'woe',
 'scale_pos_weight': 20,
 'max_depth': 13,
 'min_child_weight': 9,
 'subsample': 0.30000000000000004,
 'colsample_bytree': 0.8,
 'gamma': 0.14437331372653028,
 'reg_lambda': 0.0025770871974866976,
 'reg_alpha': 0.01706433230460041}

The final tuned model is obtained.

In [5]:
# Rebuild the pipeline from the best trial's stored parameters, then switch
# off early stopping and the pruning callback and fix the number of trees to
# the value recorded for that trial.
acceptance_model = instantiate_model(best_trial).set_params(
    estimator__n_estimators=n_estimators,
    estimator__early_stopping_rounds=None,
    estimator__callbacks=None,
)
acceptance_model

The model is fitted to the whole training data (training and validation sets combined) and saved for later use.

In [2]:
# Stack the training and validation frames and refit the tuned pipeline on
# the combined data before persisting it.
# Note: `data_train`, `X_train` and `y_train` now refer to the combined data,
# so re-running `objective` after this cell would train on validation rows.
data_val = pd.read_pickle("./data/data_val_mod1.pkl")
data_train = pd.concat(
    [pd.read_pickle("./data/data_train_mod1.pkl"), data_val]
)
X_train, y_train = X_y_spilt(data_train)
joblib.dump(
    acceptance_model.fit(X_train, y_train),
    "./tuned_models/acceptance_model.joblib",
)

['./tuned_models/acceptance_model.joblib']