<a href="https://colab.research.google.com/github/Ayushi2001/Similarity-Comparison/blob/main/Copy_of_loan_default_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Loan Default Prediction — Modular ML Pipeline (Notebook Template)

This notebook is **data-agnostic** and modular. Set the path to your dataset and run all cells.

**How to use**
1. Put your dataset file at `/mnt/data/Dataset.csv` (or change `DATA_PATH` below).
2. (Optional) Put `Data_Dictionary.csv` next to it for quick reference.
3. Run cells top-to-bottom. The pipeline will:
   - Auto-detect the target column (with sensible fallbacks)
   - Build preprocessing and modeling pipelines
   - Handle class imbalance (class weights; optional SMOTE if available)
   - Evaluate and tune threshold
   - Log experiments to a CSV
   - Save artifacts (model + metadata)
   - Run basic unit tests


In [None]:

# === 0) Configuration ===
import os, json, math, warnings, joblib, numpy as np, pandas as pd
from pathlib import Path

# Every input and output lives under one root, so the notebook can be moved by
# editing a single line. Individual paths can still be overridden below.
DATA_DIR = "/mnt/data"                                    # <-- change if needed
DATA_PATH = os.path.join(DATA_DIR, "Dataset.csv")         # <-- change if needed
DATA_DICTIONARY_PATH = os.path.join(DATA_DIR, "Data_Dictionary.csv")
TARGET_HINT = None   # e.g., "default" if you know the exact target column name
ARTIFACT_DIR = os.path.join(DATA_DIR, "artifacts")
EXP_LOG = os.path.join(DATA_DIR, "experiments.csv")

# NOTE(review): this hides *all* warnings for the session, including any
# convergence or deprecation warnings raised by the modelling cells.
warnings.filterwarnings("ignore")

# Create both output locations up front so the logging and saving cells cannot
# fail on a missing directory.
Path(ARTIFACT_DIR).mkdir(parents=True, exist_ok=True)
Path(EXP_LOG).parent.mkdir(parents=True, exist_ok=True)

print("Configured paths:")
print("DATA_PATH:", DATA_PATH)
print("DATA_DICTIONARY_PATH:", DATA_DICTIONARY_PATH)
print("ARTIFACT_DIR:", ARTIFACT_DIR)
print("EXP_LOG:", EXP_LOG)


In [None]:

# === 1) Imports ===
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import (roc_auc_score, precision_recall_curve, roc_curve,
                             classification_report, confusion_matrix, f1_score, precision_score, recall_score)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

# Optional libs: each import is attempted once and the outcome is recorded in a
# HAS_* flag, which later cells check before using the corresponding names.
try:
    from xgboost import XGBClassifier
    HAS_XGB = True
except Exception:  # any failure while importing simply disables the XGBoost candidate
    HAS_XGB = False

try:
    from imblearn.over_sampling import SMOTE
    from imblearn.pipeline import Pipeline as ImbPipeline
    HAS_IMB = True
except Exception:  # any failure while importing simply disables the SMOTE candidate
    HAS_IMB = False

# Show which optional back-ends were found in this environment.
print("Environment — XGB:", HAS_XGB, "| imblearn:", HAS_IMB)


In [None]:

# === 2) Load Data ===
import pandas as pd, numpy as np

df = pd.read_csv(DATA_PATH)
print("Shape:", df.shape)
display(df.head())

# The data dictionary is optional reference material: a failure to read it is
# reported but never stops the run.
if os.path.exists(DATA_DICTIONARY_PATH):
    try:
        dd = pd.read_csv(DATA_DICTIONARY_PATH)
        print("\nData dictionary (first rows):")
        display(dd.head())
    except Exception as e:
        print("Could not read data dictionary:", e)

# Target detection, in priority order:
#   1. the explicit TARGET_HINT,
#   2. a column whose name is a well-known target name,
#   3. the first binary-looking column that is not id- or date-like.
possible_targets = ["default","Default","loan_default","Loan_Default","is_default","target","TARGET","label"]
known_target_names = {name.lower() for name in possible_targets}
binary_tokens = {"0", "1", "Y", "N", "y", "n", "Yes", "No"}

target_col = None
if TARGET_HINT is not None:
    if TARGET_HINT in df.columns:
        target_col = TARGET_HINT
    else:
        # A misspelled hint must not go unnoticed, otherwise the pipeline may
        # quietly train on a different column than the user intended.
        print(f"⚠️ TARGET_HINT={TARGET_HINT!r} is not a column of the dataset; falling back to auto-detection.")

if target_col is None:
    # Case-insensitive, so variants such as "DEFAULT" or "Label" are recognised too.
    target_col = next((c for c in df.columns if str(c).lower() in known_target_names), None)

if target_col is None:
    for c in df.columns:
        unique_vals = df[c].dropna().unique()
        # At least two distinct values are required: an empty (all-NaN) or
        # constant column trivially passes the subset test but cannot be a label.
        if 2 <= len(unique_vals) <= 3 and set(map(str, unique_vals)).issubset(binary_tokens):
            if "id" not in str(c).lower() and "date" not in str(c).lower():
                target_col = c
                break

assert target_col is not None, "❌ Could not auto-detect the target column. Set TARGET_HINT to the correct column name."

def normalize_binary(s):
    """Convert a Series of yes/no style labels to floats (1.0 / 0.0).

    String values are matched after stripping surrounding whitespace and
    ignoring case, so "Y", "yes", " No " and "TRUE" are all understood.
    Values that are not recognised are left untouched and handed to
    ``astype(float)``; numeric values and NaN therefore pass through unchanged.

    Parameters
    ----------
    s : pandas.Series
        Raw label values.

    Returns
    -------
    pandas.Series
        Float series with the same index as ``s``.

    Raises
    ------
    ValueError
        If a value is neither recognised nor convertible to float.
    """
    mapping = {
        "y": 1, "yes": 1, "true": 1, "t": 1, "1": 1,
        "n": 0, "no": 0, "false": 0, "f": 0, "0": 0,
    }

    def _to_binary(value):
        # Only real strings are normalised; other objects are looked up by
        # their string form so that e.g. the integer 1 still maps to 1.
        key = value.strip().lower() if isinstance(value, str) else str(value)
        return mapping.get(key, value)

    return s.map(_to_binary).astype(float)

# Rows without a label cannot be used for supervised training, and NaN cannot
# be cast to int, so they are removed (and reported) before the conversion.
missing_target = df[target_col].isna()
if missing_target.any():
    print(f"Dropping {int(missing_target.sum())} rows with a missing target value.")
    df = df.loc[~missing_target].reset_index(drop=True)

uniq = df[target_col].unique()
if set(map(str, uniq)).issubset({"0", "1"}):
    # Already 0/1 (as numbers or as strings): a plain cast is enough.
    df[target_col] = df[target_col].astype(int)
else:
    target_numeric = normalize_binary(df[target_col])
    # Check before the int cast: casting would silently truncate values such
    # as 0.5 and would keep other classes such as 2.
    unexpected = sorted(set(target_numeric.unique()) - {0.0, 1.0})
    if unexpected:
        raise ValueError(
            f"Target column {target_col!r} is not binary; unexpected values: {unexpected[:10]}"
        )
    df[target_col] = target_numeric.astype(int)

print("Detected target:", target_col)
print("Target counts:\n", df[target_col].value_counts(dropna=False))


In [None]:

# === 3) Quick EDA ===
# Count missing values per column, most affected columns first.
missing_counts = df.isna().sum().sort_values(ascending=False)
print("Missing values per column (top 20):")
display(missing_counts.head(20))

# The target is 0/1, so its mean is the share of positive rows.
pos_rate = df[target_col].mean()
print(f"Positive rate: {pos_rate:.3f}")


In [None]:

# === 4) Feature Columns & Types ===
def _is_id_like(column_name):
    """Return True when "id", "uuid" or "guid" appears as a whole word in the name.

    The name is split on camelCase/PascalCase boundaries and on every
    non-alphanumeric character, so "customer_id", "LoanID" and "ID Number"
    match, while "paid_amount" or "valid_flag" (which merely contain the
    letters "id") do not. A name with no word boundary at all, such as
    "customerid", is not recognised.
    """
    import re  # imported here so the helper does not rely on other cells' globals

    spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", "_", str(column_name))
    tokens = re.split(r"[^a-z0-9]+", spaced.lower())
    return any(token in {"id", "uuid", "guid"} for token in tokens)


# Identifier columns carry no predictive signal and are left out of the features.
id_like = [c for c in df.columns if _is_id_like(c)]
print("Excluded id-like columns:", id_like)
feature_cols = [c for c in df.columns if c not in [target_col] + id_like]

X_raw = df[feature_cols].copy()
y = df[target_col].astype(int).values

num_cols = X_raw.select_dtypes(include=["number"]).columns.tolist()
cat_cols = X_raw.select_dtypes(exclude=["number"]).columns.tolist()
print("Numeric:", len(num_cols), "| Categorical:", len(cat_cols))


In [None]:

# === 5) Conditional Feature Engineering ===
import numpy as np
def add_engineered_features(df_in):
    """Return a copy of ``df_in`` with ratio features added where possible.

    A ratio is only created when both of its source columns are present.
    Rows where the denominator is zero or missing, or where the result is
    otherwise undefined, get the value 0.0. The input frame is not modified.
    """
    # new column -> (numerator column, denominator column)
    ratio_specs = {
        "loan_to_income": ("loan_amount", "annual_income"),
        "debt_to_income": ("total_debt", "annual_income"),
        "emi_to_income": ("monthly_installment", "monthly_income"),
        "credit_utilization_ratio": ("revolving_balance", "revolving_limit"),
    }

    enriched = df_in.copy()
    for feature, (numerator, denominator) in ratio_specs.items():
        if numerator not in enriched.columns or denominator not in enriched.columns:
            continue
        with np.errstate(divide='ignore', invalid='ignore'):
            # A zero denominator is turned into NaN so the division yields NaN
            # rather than infinity; those NaNs are then replaced by 0.0.
            safe_denominator = enriched[denominator].replace(0, np.nan).astype(float)
            quotient = enriched[numerator].astype(float) / safe_denominator
            enriched[feature] = quotient.fillna(0.0)
    return enriched

# Build the final feature frame, then re-derive the column lists so they also
# include any ratio columns that were just added.
X = add_engineered_features(X_raw)
numeric_part = X.select_dtypes(include=["number"])
categorical_part = X.select_dtypes(exclude=["number"])
num_cols = numeric_part.columns.tolist()
cat_cols = categorical_part.columns.tolist()


In [None]:

# === 6) Preprocessing Pipeline ===
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer

# Numeric columns: fill gaps with the column median, then standardise.
numeric_steps = [
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler()),
]
numeric_transformer = Pipeline(steps=numeric_steps)

# Categorical columns: fill gaps with the most frequent value, then one-hot
# encode into a dense array; categories unseen during fitting are ignored.
categorical_steps = [
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
]
categorical_transformer = Pipeline(steps=categorical_steps)

# Route each column list to its transformer.
preprocess = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, num_cols),
        ("cat", categorical_transformer, cat_cols),
    ]
)


In [None]:

# === 7) Train/Test Split & Model Candidates ===
from sklearn.model_selection import train_test_split
# Hold out 20% of the rows for evaluation, with a fixed seed and stratified on y.
split_options = dict(test_size=0.2, random_state=42, stratify=y)
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)

def make_pipeline(clf, use_smote=False):
    """Build a preprocessing + classifier pipeline around ``clf``.

    Parameters
    ----------
    clf : estimator
        The classifier placed in the final ``"clf"`` step.
    use_smote : bool, default False
        When True and imbalanced-learn is available (``HAS_IMB``), a
        ``"smote"`` oversampling step is inserted before the classifier and an
        imblearn pipeline is returned. When imbalanced-learn is missing, a
        notice is printed and the plain pipeline is returned instead.

    Returns
    -------
    Pipeline
        A pipeline with steps ``"preprocess"``, optionally ``"smote"``, and ``"clf"``.
    """
    from sklearn.base import clone

    # Each pipeline gets its own unfitted copy of the preprocessor. Without the
    # clone, every candidate would hold the same object, and fitting one
    # pipeline would overwrite the fitted state used by all the others.
    steps = [("preprocess", clone(preprocess))]

    if use_smote:
        if HAS_IMB:
            from imblearn.pipeline import Pipeline as ImbPipeline
            from imblearn.over_sampling import SMOTE
            steps.append(("smote", SMOTE(random_state=42)))
            steps.append(("clf", clf))
            return ImbPipeline(steps)
        print("⚠️ imbalanced-learn is not available; building the pipeline without SMOTE.")

    steps.append(("clf", clf))
    return Pipeline(steps)

# Candidate models. Every candidate compensates for class imbalance in some
# way: class weights, SMOTE oversampling, or (XGBoost) a positive-class weight.
candidates = [
    ("LogReg_weighted",
     make_pipeline(LogisticRegression(max_iter=200, class_weight="balanced"))),
    ("RandomForest",
     make_pipeline(RandomForestClassifier(n_estimators=300, class_weight="balanced_subsample", n_jobs=-1, random_state=42)))
]

if HAS_XGB:
    # XGBoost's counterpart of class weights: the ratio of negative to positive
    # training rows. Falls back to 1.0 when there are no positive rows.
    n_pos = int(np.sum(y_train == 1))
    n_neg = int(np.sum(y_train == 0))
    xgb_pos_weight = (n_neg / n_pos) if n_pos else 1.0
    candidates.append(("XGBClassifier", make_pipeline(
        XGBClassifier(
            n_estimators=500, max_depth=5, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, objective="binary:logistic",
            eval_metric="logloss", random_state=42,
            scale_pos_weight=xgb_pos_weight
        ))))

if HAS_IMB:
    candidates.append(("LogReg_SMOTE", make_pipeline(LogisticRegression(max_iter=200), use_smote=True)))

from sklearn.model_selection import StratifiedKFold, cross_val_score

# The winning model is chosen by cross-validated AUC computed on the training
# split only. Choosing it by its test-set AUC would make the reported test
# metrics optimistically biased, because the test set would have been used to
# make a modelling decision. Cost: each candidate is fitted a few extra times.
selection_cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

results = []
best_cv_auc, best_auc, best_model, best_name = -1, -1, None, None
for name, pipe in candidates:
    # cross_val_score works on clones, so `pipe` itself is still unfitted here.
    cv_auc = float(cross_val_score(pipe, X_train, y_train, cv=selection_cv, scoring="roc_auc").mean())

    # Final fit on the full training split; test metrics are for reporting only.
    pipe.fit(X_train, y_train)
    proba = pipe.predict_proba(X_test)[:,1]
    auc = roc_auc_score(y_test, proba)
    preds = (proba >= 0.5).astype(int)
    f1 = f1_score(y_test, preds)
    pr = precision_score(y_test, preds)
    rc = recall_score(y_test, preds)
    results.append((name, cv_auc, auc, f1, pr, rc))
    print(f"{name:16s} | CV AUC: {cv_auc:.4f} | AUC: {auc:.4f} | F1: {f1:.4f} | P: {pr:.4f} | R: {rc:.4f}")
    if cv_auc > best_cv_auc:
        # best_auc stays the *test* AUC of the selected model.
        best_cv_auc, best_auc, best_model, best_name = cv_auc, auc, pipe, name

import pandas as pd
res_df = pd.DataFrame(results, columns=["model","cv_auc","auc","f1","precision","recall"]).sort_values("cv_auc", ascending=False)
print("\nBest model:", best_name, "AUC:", best_auc, "| CV AUC:", best_cv_auc)
display(res_df)


In [None]:

# === 8) Curves & Threshold Tuning ===
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, precision_recall_curve, classification_report, confusion_matrix

# Positive-class probabilities of the selected model on the held-out rows.
proba_best = best_model.predict_proba(X_test)[:, 1]

# --- ROC curve, with the chance diagonal for reference ---
fpr, tpr, thr = roc_curve(y_test, proba_best)
roc_label = f"ROC AUC = {roc_auc_score(y_test, proba_best):.3f}"
plt.figure()
plt.plot(fpr, tpr, label=roc_label)
plt.plot([0, 1], [0, 1], '--')
plt.xlabel("FPR")
plt.ylabel("TPR")
plt.title("ROC")
plt.legend()
plt.show()

# --- Precision-recall curve ---
prec, rec, pr_thr = precision_recall_curve(y_test, proba_best)
plt.figure()
plt.plot(rec, prec)
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("PR Curve")
plt.show()

from sklearn.model_selection import cross_val_predict

# The threshold is tuned on out-of-fold predictions for the training split and
# only then evaluated on the test split. Tuning it on the test set itself would
# make the report below optimistic. cross_val_predict fits clones, so the
# already-fitted best_model is left untouched.
oof_proba = cross_val_predict(best_model, X_train, y_train, cv=5, method="predict_proba")[:, 1]

f1_scores = [(t, f1_score(y_train, (oof_proba >= t).astype(int))) for t in np.linspace(0.1, 0.9, 81)]
best_t, best_f1 = max(f1_scores, key=lambda x: x[1])
print(f"Best threshold by F1: {best_t:.3f} (F1={best_f1:.3f})")
print("(threshold chosen on out-of-fold training predictions; the report below uses the held-out test set)")

preds_opt = (proba_best >= best_t).astype(int)
print("\nClassification Report (best threshold)")
print(classification_report(y_test, preds_opt, digits=4))
print("Confusion Matrix:\n", confusion_matrix(y_test, preds_opt))


In [None]:

# === 9) Log Experiment & Save Artifacts ===
from datetime import datetime
import pandas as pd, json, joblib, os

from datetime import datetime, timezone

exp_record = {
    # datetime.utcnow() is deprecated since Python 3.12. Dropping the tzinfo
    # keeps the text format identical to rows already present in the log.
    "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    "best_model": best_name,
    "best_auc": float(best_auc),
    "threshold": float(best_t),
    "train_rows": int(len(X_train)),
    "test_rows": int(len(X_test)),
    "num_features": len(X.select_dtypes(include=['number']).columns),
    "cat_features": len(X.select_dtypes(exclude=['number']).columns),
}

# Make sure the log's directory exists before writing to it.
log_dir = os.path.dirname(EXP_LOG)
if log_dir:
    os.makedirs(log_dir, exist_ok=True)

# Append to the existing log if there is one, otherwise start a new log.
if os.path.exists(EXP_LOG):
    old = pd.read_csv(EXP_LOG)
    out = pd.concat([old, pd.DataFrame([exp_record])], ignore_index=True)
else:
    out = pd.DataFrame([exp_record])
out.to_csv(EXP_LOG, index=False)
print("Logged experiment to:", EXP_LOG)
display(out.tail(5))

# Persist the fitted pipeline together with the details needed to reuse it
model_path = os.path.join(ARTIFACT_DIR, f"model_{best_name}.joblib")
meta_path = os.path.join(ARTIFACT_DIR, "metadata.json")

joblib.dump(best_model, model_path)

meta = dict(
    target_col=target_col,
    best_model=best_name,
    threshold=float(best_t),
)
with open(meta_path, "w") as f:
    f.write(json.dumps(meta, indent=2))

print("Saved model:", model_path)
print("Saved metadata:", meta_path)


In [None]:

# === 10) Inference Helper ===
import numpy as np, pandas as pd

def predict_default(df_raw: pd.DataFrame, threshold=None):
    """Score rows with the selected model.

    Parameters
    ----------
    df_raw : pandas.DataFrame
        Rows to score. The ratio features are (re)computed here, so both raw
        feature frames and frames that already contain them are accepted.
    threshold : float, optional
        Decision threshold; defaults to the tuned ``best_t``.

    Returns
    -------
    pandas.DataFrame
        Columns ``"proba"`` (positive-class probability) and ``"pred"`` (0/1),
        indexed like ``df_raw`` so the result can be joined back onto the input.
    """
    # The model was trained on engineered features. Recomputing them is
    # harmless when they are already present (same sources, same values).
    features = add_engineered_features(df_raw)
    proba = best_model.predict_proba(features)[:,1]
    t = threshold if threshold is not None else best_t
    # Reuse the input index: with a fresh RangeIndex, joining the scores onto
    # any slice that does not start at row 0 would misalign or produce NaNs.
    return pd.DataFrame({"proba": proba, "pred": (proba >= t).astype(int)}, index=df_raw.index)

# Demo: score the first five rows and show the scores next to their inputs
demo = X.head(5).copy()
demo_scores = predict_default(demo)
display(demo.join(demo_scores))


In [None]:

# === 11) Basic Unit Tests ===

def test_no_nulls_after_preprocess():
    """Check that the fitted preprocessor leaves no NaN in the transformed features."""
    import numpy as np
    # transform, not fit_transform: re-fitting here would overwrite the fitted
    # preprocessor inside best_model with one fitted on all rows, which can
    # change the encoded width and break later predict_proba calls.
    Z = best_model.named_steps["preprocess"].transform(X)
    assert not np.isnan(Z).any(), "NaNs found after preprocess"

def test_probability_bounds():
    """Check that predicted probabilities for the first 20 rows lie in [0, 1]."""
    import numpy as np
    sample = X.iloc[:20]
    p = best_model.predict_proba(sample)[:, 1]
    within_bounds = np.logical_and(p >= 0, p <= 1)
    assert np.all(within_bounds), "Probabilities out of range"

def test_inference_shape():
    """Check that the inference helper returns one row per input row with both output columns."""
    scored = predict_default(X.iloc[:10])
    row_count_ok = len(scored) == 10
    assert row_count_ok and {"proba", "pred"}.issubset(scored.columns), "Inference output malformed"

# Run the checks in order; a failing assertion stops the cell before the
# success message is printed.
checks = (test_no_nulls_after_preprocess, test_probability_bounds, test_inference_shape)
for fn in checks:
    fn()
print("✅ Basic unit tests passed.")


In [None]:

# === 12) Environment Versions ===
import sklearn, sys, pandas as pd, numpy as np
# Core libraries are always present, so their versions are printed directly.
core_versions = [
    ("Python:", sys.version),
    ("pandas:", pd.__version__),
    ("numpy:", np.__version__),
    ("sklearn:", sklearn.__version__),
]
for label, version in core_versions:
    print(label, version)

# Optional libraries: report the version if importable, otherwise say so.
try:
    import xgboost as xgb
    print("xgboost:", xgb.__version__)
except Exception:
    print("xgboost: not available")

try:
    import imblearn
    print("imblearn:", imblearn.__version__)
except Exception:
    print("imblearn: not available")


Python: 3.12.11 (main, Jun  4 2025, 08:56:18) [GCC 11.4.0]
pandas: 2.2.2
numpy: 2.0.2
sklearn: 1.6.1
xgboost: 3.0.4
imblearn: 0.14.0
