In [None]:
from pathlib import Path
import sys
# Treat the parent of the current working directory as the project root and
# put it at the front of the import path so the `src.*` imports below resolve.
current_path = Path.cwd()
project_root = current_path.parent
# Drop a previous entry first so re-running this cell keeps the root at the
# front of sys.path without stacking duplicates.
if str(project_root) in sys.path:
    sys.path.remove(str(project_root))
sys.path.insert(0, str(project_root))

In [None]:
from src.utils.Data_Loader import data_loader
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder,RobustScaler,OrdinalEncoder
from sklearn.preprocessing import FunctionTransformer


In [None]:
# Load the six pre-split objects: features and targets for train / validation / test.
# NOTE(review): the return order and types are defined in src/utils/Data_Loader — confirm there.
x_train, x_val, x_test, y_train, y_val, y_test= data_loader()

In [None]:
# Sanity check: display the shape of every split.
x_train.shape, x_val.shape, x_test.shape, y_train.shape, y_val.shape, y_test.shape

In [None]:
# Display the first row (by position) of the test features.
x_test.iloc[0]

In [None]:
# Display the first entry (by position) of the test target.
y_test.iloc[0]

In [None]:
# Report, per split, whether it contains any missing value.
datasets = dict(
    x_train=x_train,
    x_val=x_val,
    x_test=x_test,
    y_train=y_train,
    y_val=y_val,
    y_test=y_test,
)

for name, ds in datasets.items():
    # Objects without an `isnull` method are reported as having no missing values.
    has_nan = False
    if hasattr(ds, "isnull"):
        has_nan = ds.isnull().any()
    # If the first reduction still exposes `.any` (e.g. a per-column result),
    # reduce once more to obtain a single flag.
    if hasattr(has_nan, "any"):
        has_nan = has_nan.any()
    if not has_nan:
        print(f"no missing values in {name}")
    else:
        print(f"missing values in {name}")

In [None]:
# from scipy.stats import ks_2samp
# import pandas as pd

# # Simple drift check (KS test) for numeric features

# def drift_report(train_df, other_df, alpha=0.01):
#     numeric_cols = train_df.select_dtypes(exclude=["object", "category"]).columns
#     rows = []
#     for col in numeric_cols:
#         train_col = train_df[col].dropna()
#         other_col = other_df[col].dropna()
#         if len(train_col) == 0 or len(other_col) == 0:
#             rows.append((col, float("nan"), False))
#             continue
#         stat, pval = ks_2samp(train_col, other_col)
#         rows.append((col, pval, pval < alpha))
#     report = pd.DataFrame(rows, columns=["feature", "p_value", "drift_flag"])
#     return report.sort_values("p_value")

# print("Drift vs val (alpha=0.01)")
# print(drift_report(x_train, x_val))
# print("\nDrift vs test (alpha=0.01)")
# print(drift_report(x_train, x_test))


In [None]:
import pandas as pd

def build_preprocessor(X):
    """
    Build an unfitted ``ColumnTransformer`` with separate handling for:
    - Continuous numeric features: median imputation, then ``RobustScaler``
    - Yes/No flag columns: most-frequent imputation, then ``OrdinalEncoder``
    - Remaining object-dtype columns: most-frequent imputation, then one-hot
      encoding (``handle_unknown='ignore'``, dense output)

    Columns that fall in none of these groups are not listed in any
    transformer; ``remainder`` is left at the ``ColumnTransformer`` default.

    Parameters
    ----------
    X : pandas.DataFrame
        Frame used only to discover the object-dtype columns. Nothing is
        fitted here.

    Returns
    -------
    sklearn.compose.ColumnTransformer
        The unfitted preprocessor; call ``fit_transform`` / ``transform`` on it.
    """

    # Yes/No flag columns get an ordinal encoding instead of one-hot.
    yesNoColumns = ["HasMortgage", "HasDependents", "HasCoSigner"]

    # Every other object-dtype column is treated as categorical. The list is
    # built in the frame's own column order (not via a set difference) so the
    # order of the one-hot output columns is identical on every run.
    yes_no_lookup = set(yesNoColumns)
    categorical_features = [
        col
        for col in X.select_dtypes(include=['object']).columns
        if col not in yes_no_lookup
    ]

    # Numeric columns that are imputed and scaled.
    original_numeric = [
        'Age', 'Income', 'LoanAmount', 'CreditScore',
        'MonthsEmployed', 'NumCreditLines', 'InterestRate',
        'LoanTerm', 'DTIRatio'
    ]

    # Continuous numeric transformer (median impute + robust scaling; no log step)
    continuous_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', RobustScaler())  # Better for outliers
    ])

    # Yes/No columns
    yesNoColumns_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('ordinal', OrdinalEncoder())
    ])

    # Categorical
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])

    # Combine all
    preprocessor = ColumnTransformer(
        transformers=[
            ('continuous', continuous_transformer, original_numeric),
            ('yesNo', yesNoColumns_transformer, yesNoColumns),
            ('cat', categorical_transformer, categorical_features)
        ],
    )

    return preprocessor

# Fit the preprocessor on the training split only, then apply the already
# fitted transform to the validation and test splits.
preprocessor = build_preprocessor(x_train)
x_train_processed = preprocessor.fit_transform(x_train)
x_val_processed = preprocessor.transform(x_val)
x_test_processed = preprocessor.transform(x_test)



In [None]:
# Sanity check: display the shape of each transformed split.
x_train_processed.shape, x_val_processed.shape, x_test_processed.shape

In [None]:
# Ratio of label-0 to label-1 rows in the training target; passed to the
# boosters below as `scale_pos_weight`.
# NOTE(review): yields inf/NaN if y_train contains no rows equal to 1.
num_neg = (y_train == 0).sum()
num_pos = (y_train == 1).sum()
scale_pos_weight_value = num_neg / num_pos
scale_pos_weight_value

In [None]:
import xgboost as xgb
from sklearn.metrics import roc_auc_score

In [None]:
# Baseline XGBoost: library-default hyperparameters apart from the
# class-imbalance weight, the AUC eval metric and a fixed seed.
xgb_model = xgb.XGBClassifier(
    objective='binary:logistic',
    eval_metric='auc',
    scale_pos_weight=scale_pos_weight_value,
    random_state=42,
    verbosity=0,

)
# The validation split is supplied as eval_set; no early-stopping option is set here.
xgb_model.fit(x_train_processed, y_train, eval_set=[(x_val_processed, y_val)],verbose=False)

# Evaluate on train ,test, val
def evaluate_model(model, X, y, dataset_name="Dataset"):
    """Print the ROC AUC of ``model`` on ``(X, y)``.

    Parameters
    ----------
    model : estimator exposing ``predict_proba``
    X : features passed to ``model.predict_proba``
    y : true labels passed to ``roc_auc_score``
    dataset_name : str
        Label used in the printed line.

    Returns
    -------
    None
        The score is printed with four decimals, not returned.
    """
    class_probabilities = model.predict_proba(X)
    # Column 1 of predict_proba is used as the score for the AUC.
    auc = roc_auc_score(y, class_probabilities[:, 1])
    print(f"AUC on {dataset_name}: {auc:.4f}")

# Print the baseline XGBoost AUC on each split.
evaluate_model(xgb_model, x_train_processed, y_train, "Train")
evaluate_model(xgb_model, x_val_processed, y_val, "Validation")
evaluate_model(xgb_model, x_test_processed, y_test, "Test")

In [None]:
# roc curve
import matplotlib.pyplot as plt
from sklearn.metrics import RocCurveDisplay
# ROC curve of the baseline XGBoost model on the validation split.
RocCurveDisplay.from_estimator(
    xgb_model,
    x_val_processed,
    y_val,
    name="XGBoost ROC Curve"
)
# Dashed diagonal as a reference line. The pyplot calls below act on
# pyplot's current axes, so they must stay after the display call.
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.show()

In [None]:
# catboost
from catboost import CatBoostClassifier
# Baseline CatBoost with hand-picked settings. Note that eval_metric is
# Logloss here, while the comparison below is reported as ROC AUC.
catboost_model = CatBoostClassifier(
    iterations=1000,
    learning_rate=0.05,
    depth=6,
    eval_metric='Logloss',
    scale_pos_weight=scale_pos_weight_value,
    random_seed=42,
    
    verbose=0
)
# NOTE(review): CatBoost documents `use_best_model` as defaulting to True when
# an eval_set is given, which would let the validation split pick the final
# tree count — confirm this is intended.
catboost_model.fit(x_train_processed, y_train , eval_set=(x_val_processed, y_val))
# Print the CatBoost AUC on each split.
evaluate_model(catboost_model, x_train_processed, y_train, "Train")
evaluate_model(catboost_model, x_val_processed, y_val, "Validation")
evaluate_model(catboost_model, x_test_processed, y_test, "Test")

In [None]:
# catboost classfication report
from sklearn.metrics import classification_report
# Hard class predictions from `predict` (the model's own decision rule) on
# the validation split, summarised with four-decimal precision.
y_val_pred = catboost_model.predict(x_val_processed)
print(classification_report(y_val, y_val_pred, digits=4))


In [None]:
# ROC curve of the baseline CatBoost model on the validation split.
RocCurveDisplay.from_estimator(
    catboost_model,
    x_val_processed,
    y_val,
    name="CatBoost ROC Curve"
)
# Dashed diagonal as a reference line. The pyplot calls below act on
# pyplot's current axes, so they must stay after the display call.
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.show()

In [None]:
# lightgbm
import lightgbm as lgb
# Baseline LightGBM: defaults apart from the class-imbalance weight, the AUC
# metric and a fixed seed.
# NOTE(review): subsample_freq=1 is set without a `subsample` value — confirm
# whether row subsampling was meant to be enabled.
lgb_model = lgb.LGBMClassifier(
    objective='binary',
    metric='auc',
    scale_pos_weight=scale_pos_weight_value,
    random_state=42,
    subsample_freq=1,
)
# The validation split is supplied as eval_set; early stopping is commented
# out, and log_evaluation(period=0) is the only callback passed.
lgb_model.fit(x_train_processed, y_train,
               eval_set=[(x_val_processed, y_val)],
               callbacks=[#lgb.early_stopping(100), 
                          lgb.log_evaluation(period=0)])

In [None]:
# Print the baseline LightGBM AUC on the train, validation and test splits.
evaluate_model(lgb_model, x_train_processed, y_train, "Train")
evaluate_model(lgb_model, x_val_processed, y_val, "Validation")
evaluate_model(lgb_model, x_test_processed, y_test, "Test")

In [None]:
# ROC curve of the baseline LightGBM model on the validation split.
RocCurveDisplay.from_estimator(
    lgb_model,
    x_val_processed,
    y_val,
    name="lgb ROC Curve"
)
# Dashed diagonal as a reference line. The pyplot calls below act on
# pyplot's current axes, so they must stay after the display call.
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.show()

# Tuning XGBoost

In [None]:
from sklearn.model_selection import StratifiedKFold
from optuna.samplers import TPESampler
from sklearn.metrics import roc_auc_score
import optuna
import numpy as np
import logging
# Raise the "optuna" logger to WARNING so lower-severity messages are not
# shown during the studies below.
logging.getLogger("optuna").setLevel(logging.WARNING)

In [None]:

class XgbTuner:
    """Optuna hyperparameter search for ``xgb.XGBClassifier``, scored by ROC AUC.

    With ``use_cv=True`` (the default) each trial is scored by the mean AUC of
    a 3-fold stratified, shuffled CV on the training data; the validation
    data is not used. With ``use_cv=False`` each trial is fitted on the full
    training data and scored once on ``(X_val, y_val)``.

    Parameters
    ----------
    X_train, y_train
        Training features and labels. pandas objects and NumPy arrays are
        both accepted; CV folds are always selected by row position.
    X_val, y_val
        Hold-out features and labels, read only when ``use_cv`` is False.
    scale_pos_weight
        Forwarded to XGBoost as ``scale_pos_weight``.
    use_cv : bool, default True
        Selects the scoring mode described above.
    device : str, default "cuda"
        Forwarded to XGBoost as ``device``; pass ``"cpu"`` to train without
        a GPU.
    """

    def __init__(self, X_train, y_train, X_val, y_val, scale_pos_weight, use_cv=True, device="cuda"):
        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
        self.scale_pos_weight = scale_pos_weight
        self.use_cv = use_cv
        self.device = device

    @staticmethod
    def _take_rows(data, idx):
        """Select rows by integer position from a pandas object or an array."""
        # pandas `[]` indexing is label/column based, so positional row
        # selection has to go through `.iloc` when it is available.
        if hasattr(data, "iloc"):
            return data.iloc[idx]
        return data[idx]

    @staticmethod
    def _fit_and_score(params, X_fit, y_fit, X_eval, y_eval):
        """Fit one classifier with ``params`` and return its ROC AUC on the eval data."""
        model = xgb.XGBClassifier(**params, verbosity=0)
        model.fit(X_fit, y_fit)
        # AUC needs scores, not hard labels: use the second predict_proba column.
        probs = model.predict_proba(X_eval)[:, 1]
        return float(roc_auc_score(y_eval, probs))

    def xgb_objective(self, trial):
        """Optuna objective: sample one configuration and return its AUC.

        Parameters
        ----------
        trial : optuna.trial.Trial
            Trial used to sample the hyperparameters.

        Returns
        -------
        float
            Mean CV AUC when ``use_cv`` is True, otherwise the AUC on the
            validation data.
        """
        params = {
            "objective": "binary:logistic",
            "eval_metric": "auc",
            "random_state": 42,
            "scale_pos_weight": self.scale_pos_weight,
            "device": self.device,
            "tree_method": "hist",
            "n_estimators": trial.suggest_int("n_estimators", 100, 500),
            "max_depth": trial.suggest_int("max_depth", 3, 10),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
            "gamma": trial.suggest_float("gamma", 0, 5),
            "min_child_weight": trial.suggest_int("min_child_weight", 1, 10),
            "reg_lambda": trial.suggest_float("reg_lambda", 0.1, 10.0, log=True),
            "reg_alpha": trial.suggest_float("reg_alpha", 0.0, 5.0)
        }

        if not self.use_cv:
            # Single validation set
            return self._fit_and_score(
                params, self.X_train, self.y_train, self.X_val, self.y_val
            )

        # Cross-validation on the training data only
        cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
        cv_scores = [
            self._fit_and_score(
                params,
                self._take_rows(self.X_train, train_idx),
                self._take_rows(self.y_train, train_idx),
                self._take_rows(self.X_train, val_idx),
                self._take_rows(self.y_train, val_idx),
            )
            for train_idx, val_idx in cv.split(self.X_train, self.y_train)
        ]
        return float(np.mean(cv_scores))

    def tune_xgb(self, n_trials=100):
        """Run the study and return ``(best_params, best_auc)``.

        Parameters
        ----------
        n_trials : int, default 100
            Number of Optuna trials.

        Returns
        -------
        tuple
            The best trial's sampled parameters (dict) and its objective value.
        """
        sampler = TPESampler(seed=42)
        study = optuna.create_study(direction="maximize", sampler=sampler)
        study.optimize(func=self.xgb_objective, n_trials=n_trials, show_progress_bar=True)

        print(f"\nBest AUC: {study.best_trial.value:.4f}")
        return study.best_trial.params, study.best_trial.value


In [None]:
# Run a 50-trial XGBoost study (use_cv is left at its default) and print the
# best objective value it found.
tuner_instance=XgbTuner(X_train=x_train_processed,y_train=y_train,X_val=x_val_processed,y_val=y_val,scale_pos_weight=scale_pos_weight_value)
best_params,best_score=tuner_instance.tune_xgb(n_trials=50)
print('Best auc Score from tuning:',best_score)

# Tuning CatBoost

In [None]:
class CatBoostTuner:
    """Optuna hyperparameter search for ``CatBoostClassifier``, scored by ROC AUC.

    With ``use_cv=True`` (the default) each trial is scored by the mean AUC of
    a 3-fold stratified, shuffled CV on the training data; the validation
    data is not used. With ``use_cv=False`` each trial is fitted on the full
    training data and scored once on ``(X_val, y_val)``.

    Parameters
    ----------
    X_train, y_train
        Training features and labels. pandas objects and NumPy arrays are
        both accepted; CV folds are always selected by row position.
    X_val, y_val
        Hold-out features and labels, read only when ``use_cv`` is False.
    scale_pos_weight
        Forwarded to CatBoost as ``scale_pos_weight``.
    use_cv : bool, default True
        Selects the scoring mode described above.
    """

    def __init__(self, X_train, y_train, X_val, y_val, scale_pos_weight, use_cv=True):
        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
        self.scale_pos_weight = scale_pos_weight
        self.use_cv = use_cv

    @staticmethod
    def _take_rows(data, idx):
        """Select rows by integer position from a pandas object or an array."""
        # pandas `[]` indexing is label/column based, so positional row
        # selection has to go through `.iloc` when it is available.
        if hasattr(data, "iloc"):
            return data.iloc[idx]
        return data[idx]

    @staticmethod
    def _fit_and_score(params, X_fit, y_fit, X_eval, y_eval):
        """Fit one classifier with ``params`` and return its ROC AUC on the eval data."""
        model = CatBoostClassifier(**params)
        model.fit(X_fit, y_fit, verbose=0)
        # AUC needs scores, not hard labels: use the second predict_proba column.
        probs = model.predict_proba(X_eval)[:, 1]
        return float(roc_auc_score(y_eval, probs))

    def catboost_objective(self, trial):
        """Optuna objective: sample one configuration and return its AUC.

        Parameters
        ----------
        trial : optuna.trial.Trial
            Trial used to sample the hyperparameters.

        Returns
        -------
        float
            Mean CV AUC when ``use_cv`` is True, otherwise the AUC on the
            validation data.
        """
        params = {
            "loss_function": "Logloss",
            "eval_metric": "AUC",
            "random_seed": 42,
            "scale_pos_weight": self.scale_pos_weight,
            # "task_type": "GPU",
            # "devices": "0",
            "verbose": 0,
            "iterations": trial.suggest_int("iterations", 100, 1000),
            "depth": trial.suggest_int("depth", 4, 10),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
            "l2_leaf_reg": trial.suggest_float("l2_leaf_reg", 1e-3, 10.0, log=True),
            "bagging_temperature": trial.suggest_float("bagging_temperature", 0.0, 1.0),
            "random_strength": trial.suggest_float("random_strength", 1e-3, 10.0, log=True),
            "border_count": trial.suggest_int("border_count", 32, 255),
            "grow_policy": trial.suggest_categorical("grow_policy", ["SymmetricTree", "Depthwise", "Lossguide"]),
        }

        # CatBoost documents min_data_in_leaf as usable only with the
        # Depthwise and Lossguide policies, so it is sampled only for those.
        if params["grow_policy"] != "SymmetricTree":
            params["min_data_in_leaf"] = trial.suggest_int("min_data_in_leaf", 1, 50)

        # Add max_leaves only for Lossguide policy
        if params["grow_policy"] == "Lossguide":
            params["max_leaves"] = trial.suggest_int("max_leaves", 16, 64)

        if not self.use_cv:
            # Single validation set
            return self._fit_and_score(
                params, self.X_train, self.y_train, self.X_val, self.y_val
            )

        # Cross-validation on the training data only
        cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
        cv_scores = [
            self._fit_and_score(
                params,
                self._take_rows(self.X_train, train_idx),
                self._take_rows(self.y_train, train_idx),
                self._take_rows(self.X_train, val_idx),
                self._take_rows(self.y_train, val_idx),
            )
            for train_idx, val_idx in cv.split(self.X_train, self.y_train)
        ]
        return float(np.mean(cv_scores))

    def tune_catboost(self, n_trials=100):
        """Run the study and return ``(best_params, best_auc)``.

        Parameters
        ----------
        n_trials : int, default 100
            Number of Optuna trials.

        Returns
        -------
        tuple
            The best trial's sampled parameters (dict) and its objective
            value. ``min_data_in_leaf`` / ``max_leaves`` are present only
            when the best trial's grow policy sampled them.
        """
        sampler = TPESampler(seed=42)
        study = optuna.create_study(direction="maximize", sampler=sampler)
        study.optimize(func=self.catboost_objective, n_trials=n_trials, show_progress_bar=True)

        print(f"\nBest AUC: {study.best_trial.value:.4f}")
        return study.best_trial.params, study.best_trial.value

In [None]:
# Run a 50-trial CatBoost study (use_cv is left at its default) and print the
# best objective value it found.
catboost_tuner = CatBoostTuner(
    X_train=x_train_processed,
    y_train=y_train,
    X_val=x_val_processed,
    y_val=y_val,
    scale_pos_weight=scale_pos_weight_value
)
catboost_best_params, catboost_best_score = catboost_tuner.tune_catboost(n_trials=50)
print('Best AUC Score from CatBoost tuning:', catboost_best_score)

# Tuning LightGBM

In [None]:
import pandas as pd
import numpy as np

# Output column names of the fitted preprocessor.
feature_names = preprocessor.get_feature_names_out()

# Convert to DataFrames with feature names (ensure dense array for type safety)
# This rebinds the *_processed names: from here on they are DataFrames that
# carry the original row index of their split.
x_train_processed = pd.DataFrame(np.asarray(x_train_processed), columns=feature_names, index=x_train.index)
x_val_processed = pd.DataFrame(np.asarray(x_val_processed), columns=feature_names, index=x_val.index)
x_test_processed = pd.DataFrame(np.asarray(x_test_processed), columns=feature_names, index=x_test.index)

print(f"Feature names ({len(feature_names)}):")
print(feature_names)

In [None]:
class LGBMTuner:
    """Optuna hyperparameter search for ``lgb.LGBMClassifier``, scored by ROC AUC.

    With ``use_cv=True`` (the default) each trial is scored by the mean AUC of
    a 3-fold stratified, shuffled CV on the training data; the validation
    data is not used. With ``use_cv=False`` each trial is fitted on the full
    training data and scored once on ``(X_val, y_val)``.

    Parameters
    ----------
    X_train, y_train
        Training features and labels. pandas objects and NumPy arrays are
        both accepted; CV folds are always selected by row position.
    X_val, y_val
        Hold-out features and labels, read only when ``use_cv`` is False.
    scale_pos_weight
        Forwarded to LightGBM as ``scale_pos_weight``.
    use_cv : bool, default True
        Selects the scoring mode described above.
    """

    def __init__(self, X_train, y_train, X_val, y_val, scale_pos_weight, use_cv=True):
        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
        self.scale_pos_weight = scale_pos_weight
        self.use_cv = use_cv

    @staticmethod
    def _take_rows(data, idx):
        """Select rows by integer position from a pandas object or an array."""
        # `.iloc` exists only on pandas objects; plain arrays are indexed
        # directly so the tuner does not depend on the input container type.
        if hasattr(data, "iloc"):
            return data.iloc[idx]
        return data[idx]

    @staticmethod
    def _fit_and_score(params, X_fit, y_fit, X_eval, y_eval):
        """Fit one classifier with ``params`` and return its ROC AUC on the eval data."""
        model = lgb.LGBMClassifier(**params)
        model.fit(X_fit, y_fit, callbacks=[lgb.log_evaluation(period=0)])
        # AUC needs scores, not hard labels: use the second predict_proba column.
        probs = model.predict_proba(X_eval)[:, 1]
        return float(roc_auc_score(y_eval, probs))

    def lgbm_objective(self, trial):
        """Optuna objective: sample one configuration and return its AUC.

        Parameters
        ----------
        trial : optuna.trial.Trial
            Trial used to sample the hyperparameters.

        Returns
        -------
        float
            Mean CV AUC when ``use_cv`` is True, otherwise the AUC on the
            validation data.
        """
        params = {
            "objective": "binary",
            "metric": "auc",
            "random_state": 42,
            "scale_pos_weight": self.scale_pos_weight,
            # "device": "gpu",
            # "gpu_platform_id": 0,
            # "gpu_device_id": 0,
            "verbosity": -1,
            "n_estimators": trial.suggest_int("n_estimators", 100, 1000),
            "max_depth": trial.suggest_int("max_depth", 3, 12),
            "num_leaves": trial.suggest_int("num_leaves", 20, 150),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
            "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
            "min_child_weight": trial.suggest_float("min_child_weight", 1e-3, 10.0, log=True),
            "reg_lambda": trial.suggest_float("reg_lambda", 1e-3, 10.0, log=True),
            "reg_alpha": trial.suggest_float("reg_alpha", 1e-3, 10.0, log=True),
            "min_split_gain": trial.suggest_float("min_split_gain", 0.0, 1.0),
            "bagging_freq": trial.suggest_int("bagging_freq", 1, 7),
        }

        if not self.use_cv:
            # Single validation set
            return self._fit_and_score(
                params, self.X_train, self.y_train, self.X_val, self.y_val
            )

        # Cross-validation on the training data only
        cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
        cv_scores = [
            self._fit_and_score(
                params,
                self._take_rows(self.X_train, train_idx),
                self._take_rows(self.y_train, train_idx),
                self._take_rows(self.X_train, val_idx),
                self._take_rows(self.y_train, val_idx),
            )
            for train_idx, val_idx in cv.split(self.X_train, self.y_train)
        ]
        return float(np.mean(cv_scores))

    def tune_lgbm(self, n_trials=100):
        """Run the study and return ``(best_params, best_auc)``.

        Parameters
        ----------
        n_trials : int, default 100
            Number of Optuna trials.

        Returns
        -------
        tuple
            The best trial's sampled parameters (dict) and its objective value.
        """
        sampler = TPESampler(seed=42)
        study = optuna.create_study(direction="maximize", sampler=sampler)
        study.optimize(func=self.lgbm_objective, n_trials=n_trials, show_progress_bar=True)

        print(f"\nBest AUC: {study.best_trial.value:.4f}")
        return study.best_trial.params, study.best_trial.value

In [None]:
# Run a 25-trial LightGBM study (use_cv is left at its default) and print the
# best objective value it found.
lgbm_tuner = LGBMTuner(
    X_train=x_train_processed,
    y_train=y_train,
    X_val=x_val_processed,
    y_val=y_val,
    scale_pos_weight=scale_pos_weight_value
)
lgbm_best_params, lgbm_best_score = lgbm_tuner.tune_lgbm(n_trials=25)
print('Best AUC Score from LightGBM tuning:', lgbm_best_score)

In [None]:
# Tuned LightGBM: refit on the full training split with the best parameters
# from the study. This rebinds `lgb_model`, replacing the baseline model.
lgb_model = lgb.LGBMClassifier(
    objective='binary',
    metric='auc',
    scale_pos_weight=scale_pos_weight_value,
    random_state=42,
    **lgbm_best_params
)
# The validation split is supplied as eval_set; early stopping is commented
# out, and log_evaluation(period=0) is the only callback passed.
lgb_model.fit(x_train_processed, y_train,
               eval_set=[(x_val_processed, y_val)],
               callbacks=[#lgb.early_stopping(100), 
                          lgb.log_evaluation(period=0)])

In [None]:
# Print the tuned LightGBM AUC on the train, validation and test splits.
evaluate_model(lgb_model, x_train_processed, y_train, "Train")
evaluate_model(lgb_model, x_val_processed, y_val, "Validation")
evaluate_model(lgb_model, x_test_processed, y_test, "Test")

In [None]:
# ROC curve of the tuned LightGBM model on the validation split.
RocCurveDisplay.from_estimator(
    lgb_model,
    x_val_processed,
    y_val,
    name="lgb ROC Curve"
)
# Dashed diagonal as a reference line. The pyplot calls below act on
# pyplot's current axes, so they must stay after the display call.
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.show()

# From here on we use the LightGBM pipeline: it is faster and its AUC gap is small.