<a href="https://colab.research.google.com/github/Nuthan10/Loan-To-Cap-Ratio/blob/main/Insurance.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip -q install pandas numpy scikit-learn xgboost joblib matplotlib

import os, zipfile
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score, confusion_matrix,
    roc_curve, auc, precision_recall_curve
)
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer

import xgboost as xgb
from xgboost import XGBClassifier
import joblib

# Let pandas render up to 200 columns / 200 characters of width before truncating.
pd.options.display.max_columns = 200
pd.options.display.width = 200

# One seed shared by NumPy, the loan-level splits and the XGBoost parameters.
SEED = 42
np.random.seed(SEED)

# Working layout: uploaded inputs go under data/, saved artifacts under outputs/.
WORKDIR = Path("/content")
DATA_DIR = WORKDIR.joinpath("data")
OUT_DIR = WORKDIR.joinpath("outputs")
DATA_DIR.mkdir(parents=True, exist_ok=True)
OUT_DIR.mkdir(parents=True, exist_ok=True)

def print_head(df, title, n=5):
    """Print a banner with the frame's row/column counts, then show its first ``n`` rows.

    Args:
        df: Frame to preview.
        title: Text placed inside the banner.
        n: Number of leading rows to show.

    The preview is rendered through the notebook-provided ``display`` function.
    """
    row_count = len(df)
    col_count = df.shape[1]
    banner = f"\n--- {title} (rows={row_count:,}, cols={col_count}) ---"
    print(banner)
    preview = df.head(n)
    display(preview)

def require_columns(df: pd.DataFrame, cols, name="dataframe"):
    """Verify that every name in ``cols`` is a column of ``df``.

    Args:
        df: Frame to inspect.
        cols: Column names that must be present.
        name: Label for ``df`` used in the error message.

    Raises:
        ValueError: Listing all absent names, in the order given in ``cols``.
    """
    present = df.columns
    absent = []
    for col in cols:
        if col not in present:
            absent.append(col)
    if not absent:
        return
    raise ValueError(f"{name} is missing required columns: {absent}")

def evaluate_binary_percent(y_true, y_proba, threshold=0.5, model_name="model"):
    """Summarise binary-classifier quality at a fixed decision threshold.

    Args:
        y_true: Ground-truth labels; cast to ``int``.
        y_proba: Predicted scores for the positive class (any array-like).
        threshold: Scores ``>= threshold`` are predicted as class 1.
        model_name: Label stored under the ``"model"`` key.

    Returns:
        dict with accuracy and positive rate in percent, precision / recall / F1 /
        ROC-AUC / PR-AUC rounded to 4 decimals, and the confusion matrix as a nested
        list. ROC-AUC and PR-AUC are ``nan`` when ``y_true`` holds a single class.
    """
    y_true = np.asarray(y_true).astype(int)
    # Coerce scores so plain lists / Series work too (a bare list cannot be compared with >=).
    y_proba = np.asarray(y_proba)
    y_pred = (y_proba >= threshold).astype(int)
    # Ranking metrics are undefined when only one class is present.
    has_both_classes = len(np.unique(y_true)) > 1
    return {
        "model": model_name,
        "threshold": float(threshold),
        "accuracy_%": round(accuracy_score(y_true, y_pred) * 100, 2),
        "precision": round(precision_score(y_true, y_pred, zero_division=0), 4),
        "recall": round(recall_score(y_true, y_pred, zero_division=0), 4),
        "f1": round(f1_score(y_true, y_pred, zero_division=0), 4),
        "roc_auc": round(roc_auc_score(y_true, y_proba), 4) if has_both_classes else np.nan,
        "pr_auc": round(average_precision_score(y_true, y_proba), 4) if has_both_classes else np.nan,
        "positive_rate_%": round(y_true.mean() * 100, 2),
        # Pin the label order so the result is always a 2x2 [[TN, FP], [FN, TP]] matrix,
        # even when the labels and predictions contain only one class.
        "confusion_matrix": confusion_matrix(y_true, y_pred, labels=[0, 1]).tolist()
    }

print("Setup complete.")


In [None]:
from google.colab import files

# Ask the user for files through the Colab upload widget and echo what arrived.
uploaded = files.upload()
for uploaded_name in uploaded:
    print("Uploaded:", uploaded_name)

# Move archives and delimited text files from the working directory into data/,
# leaving alone anything whose name is already taken there.
for entry in WORKDIR.glob("*"):
    if not entry.is_file():
        continue
    if entry.suffix.lower() not in (".zip", ".txt", ".csv"):
        continue
    destination = DATA_DIR / entry.name
    if destination.exists():
        continue
    entry.rename(destination)

# Unpack every archive in data/ in place.
for archive_path in DATA_DIR.glob("*.zip"):
    with zipfile.ZipFile(archive_path, "r") as archive:
        archive.extractall(DATA_DIR)
    print("Extracted:", archive_path.name)

# List at most the first 200 entries, sorted by path.
print("Files in data/:")
listing = sorted(DATA_DIR.glob("*"))
for item in listing[:200]:
    print(" -", item.name)


In [None]:
def read_freddie_file(path: Path, sep="|"):
    """Load a headerless delimited text file with every column read as a string.

    Args:
        path: File to read.
        sep: Field delimiter (pipe by default).

    Returns:
        DataFrame whose columns are labelled 0..N-1.
    """
    reader_options = {
        "sep": sep,
        "header": None,
        "dtype": str,
        "engine": "python",
    }
    frame = pd.read_csv(path, **reader_options)
    return frame

# Candidate inputs: every .txt/.csv directly under data/, largest file first.
all_files = list(DATA_DIR.glob("*"))
txt_candidates = [p for p in all_files if p.suffix.lower() in [".txt", ".csv"]]
txt_candidates = sorted(txt_candidates, key=lambda p: p.stat().st_size, reverse=True)

# Name hints. "orig" already covers "origination", so one substring test is enough.
perf_keywords = ("svcg", "servicing", "perf", "performance", "time")
orig_candidates = [p for p in txt_candidates if "orig" in p.name.lower()]
perf_candidates = [p for p in txt_candidates if any(k in p.name.lower() for k in perf_keywords)]

# Name-based picks first; the performance pick must never be the origination file.
orig_path = orig_candidates[0] if orig_candidates else None
perf_path = next((p for p in perf_candidates if p != orig_path), None)

# Size-ordered fallbacks, restricted to files not claimed by the other role. Without this
# restriction a pair such as "historical_data_2020Q1.txt" / "historical_data_time_2020Q1.txt"
# (no "orig" hint) could resolve BOTH roles to the same file, and the performance table would
# then be silently relabelled with origination column names further down.
# NOTE(review): when no name hints exist at all, the largest file is still taken as the
# origination file (unchanged behaviour) — confirm that ordering suits the files in use.
if orig_path is None:
    orig_path = next((p for p in txt_candidates if p not in perf_candidates), None)
if perf_path is None:
    perf_path = next(
        (p for p in txt_candidates if p != orig_path and p not in orig_candidates),
        None,
    )

if orig_path is None or perf_path is None:
    raise FileNotFoundError("Could not identify Freddie origination and performance files.")

print("Using origination file:", orig_path.name)
print("Using performance file:", perf_path.name)

orig_raw = read_freddie_file(orig_path, sep="|")
perf_raw = read_freddie_file(perf_path, sep="|")

print_head(orig_raw, "Orig RAW")
print_head(perf_raw, "Perf RAW")


In [None]:
# Positional names for the leading columns of the headerless origination file.
# Only the first len(FREDDIE_ORIG_COLS) columns of the raw file are kept and labelled
# with these names; any further columns are dropped before naming.
FREDDIE_ORIG_COLS = [
    "credit_score","first_payment_date","first_time_homebuyer_flag","maturity_date","msa",
    "mortgage_insurance_pct","num_units","occupancy_status","orig_combined_ltv","orig_dti",
    "orig_upb","orig_ltv","orig_interest_rate","channel","prepayment_penalty_flag",
    "amortization_type","property_state","property_type","postal_code","loan_sequence_number",
    "loan_purpose","orig_loan_term","num_borrowers","seller_name","servicer_name",
    "super_conforming_flag"
]

# Positional names for the leading columns of the headerless monthly performance file.
# NOTE(review): this list has 31 names. Some published layouts of this file are believed to
# carry an extra "delinquent accrued interest" field ahead of "delinquency due to disaster"
# (32 fields in total); if the files in use follow that layout, the last four names here are
# shifted by one. Because the raw frame is truncated to len(FREDDIE_PERF_COLS) columns before
# naming, such a mismatch would not raise — confirm against the data dictionary for the vintage.
FREDDIE_PERF_COLS = [
    "loan_sequence_number","monthly_reporting_period","current_actual_upb","current_loan_delinquency_status",
    "loan_age","remaining_months_to_legal_maturity","repurchase_flag","modification_flag","zero_balance_code",
    "zero_balance_effective_date","current_interest_rate","current_deferred_upb","due_date_of_last_paid_installment",
    "mi_recoveries","net_sales_proceeds","non_mi_recoveries","expenses","legal_costs","maintenance_and_preservation_costs",
    "taxes_and_insurance","misc_expenses","actual_loss_calculation","modification_cost",
    "step_mod_flag","deferred_payment_modification","estimated_ltv","zero_balance_removal_upb","delinquency_due_to_disaster",
    "borrower_assistance_status","current_month_modification_cost","interest_bearing_upb"
]

def apply_cols_strict(df, cols, name):
    """Return a copy of ``df`` whose columns are relabelled with ``cols``.

    Args:
        df: Frame to relabel; it is not modified.
        cols: New column labels, one per existing column.
        name: Label used in the error message.

    Raises:
        ValueError: If ``df`` does not have exactly ``len(cols)`` columns.
    """
    found, expected = df.shape[1], len(cols)
    if found != expected:
        message = f"{name} column count mismatch. Found {found} columns but expected {expected}."
        raise ValueError(message)
    relabelled = df.copy()
    relabelled.columns = cols
    return relabelled

def _label_leading_columns(raw, names, label):
    """Keep the first ``len(names)`` columns of ``raw`` and label them with ``names``."""
    return apply_cols_strict(raw.iloc[:, :len(names)], names, label)

# Trim each raw frame to the width of its name list, then attach the names.
orig = _label_leading_columns(orig_raw, FREDDIE_ORIG_COLS, "FREDDIE_ORIG_COLS")
perf = _label_leading_columns(perf_raw, FREDDIE_PERF_COLS, "FREDDIE_PERF_COLS")

require_columns(
    perf,
    ["loan_sequence_number","monthly_reporting_period","current_loan_delinquency_status","zero_balance_code"],
    "Freddie performance",
)

# Parse both period fields with the %Y%m format; values that do not parse become NaT.
for _period_col in ("monthly_reporting_period", "zero_balance_effective_date"):
    perf[_period_col] = pd.to_datetime(perf[_period_col], format="%Y%m", errors="coerce")

# The reporting period drives the per-loan ordering later, so it must be fully populated.
_reporting_period_missing = perf["monthly_reporting_period"].isna()
if _reporting_period_missing.any():
    raise ValueError("monthly_reporting_period has invalid dates.")

print_head(orig, "Orig NAMED")
print_head(perf, "Perf NAMED")

In [None]:
def to_numeric(s):
    """Convert ``s`` to numeric values, replacing anything unparseable with NaN."""
    converted = pd.to_numeric(s, errors="coerce")
    return converted

def map_dq(x):
    """Translate a raw delinquency-status value into an integer, or NaN.

    ``None`` and float NaN give NaN. Anything else is converted to text and
    stripped; all-digit text becomes that integer, any other text gives NaN.
    """
    is_missing = x is None or (isinstance(x, float) and np.isnan(x))
    if is_missing:
        return np.nan
    text = str(x).strip()
    if not text.isdigit():
        return np.nan
    return int(text)

# Order each loan's rows chronologically; the shift / rolling / cumcount steps below rely on it.
perf = perf.sort_values(["loan_sequence_number", "monthly_reporting_period"]).copy()

perf["dq_status_numeric"] = perf["current_loan_delinquency_status"].apply(map_dq)
perf["zero_balance_code_num"] = to_numeric(perf["zero_balance_code"])

# NOTE(review): non-numeric status codes come back from map_dq as NaN and fillna(0) then
# treats them as "not delinquent" — confirm that is intended for every non-numeric code.
perf["is_delinquent"] = (perf["dq_status_numeric"].fillna(0) >= 1).astype(int)
perf["is_serious_delinquent"] = (perf["dq_status_numeric"].fillna(0) >= 3).astype(int)

g = perf.groupby("loan_sequence_number", sort=False)

# Label horizon, counted in rows per loan (presumably one row per month — TODO confirm).
H = 6

# Look 1..H rows ahead within the same loan for a serious delinquency or for one of the
# zero-balance codes 3 / 6 / 9.
future_serious = [g["is_serious_delinquent"].shift(-k) for k in range(1, H+1)]
future_zb = [g["zero_balance_code_num"].shift(-k).isin([3, 6, 9]).astype(int) for k in range(1, H+1)]

perf["y_default_future_6m"] = (
    pd.concat(future_serious, axis=1).max(axis=1).fillna(0).astype(int)
    | pd.concat(future_zb, axis=1).max(axis=1).fillna(0).astype(int)
).astype(int)

# Keep only snapshots followed by at least H more rows, so the look-ahead window is complete.
perf["rev_pos_in_loan"] = g.cumcount(ascending=False)
perf_snap = perf[perf["rev_pos_in_loan"] >= H].copy()

# Trailing-window counts (current row included). They are computed on the full history and
# aligned onto the snapshot rows by index.
perf_snap["roll_3_dq"] = g["is_delinquent"].rolling(3, min_periods=1).sum().reset_index(level=0, drop=True)
perf_snap["roll_6_serious_dq"] = g["is_serious_delinquent"].rolling(6, min_periods=1).sum().reset_index(level=0, drop=True)

def streak(series):
    """Return, for each position, the length of the run of 1s ending there (0 after a non-1).

    The result is a Series carrying the same index as ``series``.
    """
    out, s = [], 0
    for v in series.astype(int).tolist():
        s = s + 1 if v == 1 else 0
        out.append(s)
    return pd.Series(out, index=series.index)

# transform() always returns a result indexed like `perf`. The earlier
# apply(...).reset_index(level=0, drop=True) only aligned correctly when pandas prepended the
# group key to the index; on versions that do not, it dropped the sole index level and the
# streak values were matched to the wrong rows without any error.
perf_snap["dq_streak"] = g["is_delinquent"].transform(streak)
perf_snap["gap_flag_proxy"] = (perf_snap["dq_streak"] >= 2).astype(int)

# Previous row's status within the loan; rows without a usable previous status are dropped.
perf_snap["dq_status_lag1"] = g["dq_status_numeric"].shift(1)
perf_snap = perf_snap[perf_snap["dq_status_lag1"].notna()].copy()

# Attach the static origination attributes to every snapshot row.
freddie_ds = perf_snap.merge(orig, on="loan_sequence_number", how="left")

print("perf_snap shape:", perf_snap.shape)
print("y_default_future_6m positive rate:", perf_snap["y_default_future_6m"].mean())
print_head(freddie_ds, "Freddie Dataset (Leakage-Free)")


In [None]:
import xgboost as xgb

# Feature candidates; only those actually present in the dataset are used.
candidate_num = [
    "dq_status_lag1","roll_3_dq","roll_6_serious_dq","dq_streak","gap_flag_proxy",
    "orig_upb","orig_ltv","orig_dti","credit_score","orig_interest_rate","orig_loan_term","orig_combined_ltv"
]
candidate_cat = ["property_state","property_type","occupancy_status","loan_purpose","channel"]

num_cols_f = [c for c in candidate_num if c in freddie_ds.columns]
cat_cols_f = [c for c in candidate_cat if c in freddie_ds.columns]

X = freddie_ds[num_cols_f + cat_cols_f].copy()
# The origination table was read with dtype=str, so its "numeric" features arrive as text.
# Convert explicitly (unparseable entries become NaN for the imputer) instead of relying on
# the downstream median imputer / scaler to parse strings.
# NOTE(review): if the source encodes "not available" with placeholder numbers, those are
# still treated as real values here — confirm against the data dictionary.
if num_cols_f:
    X[num_cols_f] = X[num_cols_f].apply(pd.to_numeric, errors="coerce")
y = freddie_ds["y_default_future_6m"].astype(int).values
loan_ids = freddie_ds["loan_sequence_number"].values

# Split by loan id, not by row, so all snapshots of a loan land in exactly one partition.
unique_loans = np.unique(loan_ids)
train_loans, test_loans = train_test_split(unique_loans, test_size=0.2, random_state=SEED)
train_loans, val_loans  = train_test_split(train_loans, test_size=0.2, random_state=SEED)

train_mask = np.isin(loan_ids, train_loans)
val_mask   = np.isin(loan_ids, val_loans)
test_mask  = np.isin(loan_ids, test_loans)

X_tr, y_tr = X.loc[train_mask], y[train_mask]
X_val, y_val = X.loc[val_mask], y[val_mask]
X_test, y_test_freddie = X.loc[test_mask], y[test_mask]

# Numeric: median-impute then standardise. Categorical: mode-impute then one-hot encode,
# ignoring categories not seen during fit.
pre_f = ColumnTransformer(
    transformers=[
        ("num", Pipeline([("imp", SimpleImputer(strategy="median")), ("sc", StandardScaler())]), num_cols_f),
        ("cat", Pipeline([("imp", SimpleImputer(strategy="most_frequent")),
                          ("ohe", OneHotEncoder(handle_unknown="ignore", sparse_output=False))]), cat_cols_f)
    ],
    remainder="drop"
)

# Fit the preprocessing on the training partition only.
X_tr_t = pre_f.fit_transform(X_tr)
X_val_t = pre_f.transform(X_val)
X_test_t_freddie = pre_f.transform(X_test)

dtrain = xgb.DMatrix(X_tr_t, label=y_tr)
dval   = xgb.DMatrix(X_val_t, label=y_val)
dtest  = xgb.DMatrix(X_test_t_freddie)

params = {
    "objective": "binary:logistic",
    "eval_metric": "auc",
    "learning_rate": 0.05,
    "max_depth": 4,
    "subsample": 0.85,
    "colsample_bytree": 0.85,
    "lambda": 1.0,
    "seed": SEED,
    "nthread": -1
}

# The validation set is listed last, so it is the one early stopping monitors.
watchlist = [(dtrain, "train"), (dval, "valid")]

booster_f = xgb.train(
    params=params,
    dtrain=dtrain,
    num_boost_round=5000,
    evals=watchlist,
    early_stopping_rounds=50,
    verbose_eval=False
)

# xgb.train returns the booster from the LAST round, i.e. including the rounds trained after
# the best validation score. Restrict prediction to the best iteration explicitly.
best_iteration_f = int(booster_f.best_iteration)
proba_freddie = booster_f.predict(dtest, iteration_range=(0, best_iteration_f + 1))
freddie_metrics = evaluate_binary_percent(y_test_freddie, proba_freddie, 0.5, "Freddie_Default_Future6m_XGB")
display(pd.DataFrame([freddie_metrics]))

# Persist the best iteration with the model so consumers can apply the same iteration_range.
joblib.dump(
    {"pre": pre_f, "model": booster_f, "num_cols": num_cols_f, "cat_cols": cat_cols_f,
     "best_iteration": best_iteration_f},
    OUT_DIR / "freddie_default_future6m_model.joblib"
)
print("Saved Freddie model:", OUT_DIR / "freddie_default_future6m_model.joblib")
print("Best iteration:", booster_f.best_iteration)


In [None]:
def plot_class_balance(y, title):
    """Show a bar chart of the number of samples labelled 0 and labelled 1.

    Args:
        y: Labels; cast to ``int`` before counting.
        title: Figure title.
    """
    labels_as_int = np.asarray(y).astype(int)
    # minlength=2 keeps a bar for a class that has no samples.
    class_counts = np.bincount(labels_as_int, minlength=2)
    plt.figure()
    plt.bar(["Negative (0)", "Positive (1)"], class_counts)
    plt.xlabel("Class")
    plt.ylabel("Count")
    plt.title(title)
    plt.show()

def plot_roc(y_true, y_proba, title):
    """Plot the ROC curve with its area in the legend, plus the diagonal reference line.

    Args:
        y_true: Ground-truth labels; cast to ``int``.
        y_proba: Predicted scores for the positive class.
        title: Figure title.
    """
    truth = np.asarray(y_true).astype(int)
    false_pos_rate, true_pos_rate, _thresholds = roc_curve(truth, y_proba)
    area = auc(false_pos_rate, true_pos_rate)
    curve_label = f"AUC = {area:.4f}"
    plt.figure()
    plt.plot(false_pos_rate, true_pos_rate, label=curve_label)
    # Dashed diagonal from (0, 0) to (1, 1) for reference.
    plt.plot([0, 1], [0, 1], linestyle="--")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title(title)
    plt.legend(loc="lower right")
    plt.show()

def plot_pr(y_true, y_proba, title):
    """Plot the precision-recall curve, labelled with the average-precision score.

    Args:
        y_true: Ground-truth labels; cast to ``int``.
        y_proba: Predicted scores for the positive class.
        title: Figure title.
    """
    truth = np.asarray(y_true).astype(int)
    precision_vals, recall_vals, _thresholds = precision_recall_curve(truth, y_proba)
    avg_precision = average_precision_score(truth, y_proba)
    curve_label = f"PR-AUC = {avg_precision:.4f}"
    plt.figure()
    plt.plot(recall_vals, precision_vals, label=curve_label)
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title(title)
    plt.legend(loc="lower left")
    plt.show()

def plot_confusion(y_true, y_proba, threshold, title):
    """Draw the 2x2 confusion matrix obtained by thresholding the scores.

    Args:
        y_true: Ground-truth labels; cast to ``int``.
        y_proba: Predicted scores for the positive class (any array-like).
        threshold: Scores ``>= threshold`` are predicted as class 1.
        title: Figure title; the threshold is appended to it.
    """
    y_true = np.asarray(y_true).astype(int)
    y_pred = (np.asarray(y_proba) >= threshold).astype(int)
    # Pin the label order: without it the matrix shrinks to 1x1 when labels and predictions
    # contain a single class, and the fixed 2x2 annotation loop below fails with IndexError.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    plt.figure()
    plt.imshow(cm, interpolation="nearest")
    plt.title(f"{title} (thr={threshold})")
    plt.colorbar()
    plt.xticks([0, 1], ["Pred 0", "Pred 1"])
    plt.yticks([0, 1], ["True 0", "True 1"])
    # Write each count in the centre of its cell (row = true class, column = predicted class).
    for i in range(2):
        for j in range(2):
            plt.text(j, i, str(cm[i, j]), ha="center", va="center")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

def plot_score_dist(y_true, y_proba, title):
    """Overlay histograms of the predicted scores for class 0 and class 1.

    Args:
        y_true: Ground-truth labels; cast to ``int``.
        y_proba: Predicted scores for the positive class.
        title: Figure title.
    """
    truth = np.asarray(y_true).astype(int)
    scores = np.asarray(y_proba)
    plt.figure()
    for cls in (0, 1):
        plt.hist(scores[truth == cls], bins=30, alpha=0.7, label=f"Class {cls}")
    plt.xlabel("Predicted probability")
    plt.ylabel("Frequency")
    plt.title(title)
    plt.legend()
    plt.show()

def plot_xgb_training_curve(xgb_model, title):
    """Plot the first recorded metric of the first evaluation set, per boosting round.

    Args:
        xgb_model: Object expected to expose a non-empty ``evals_result_`` mapping of
            ``{eval_set_name: {metric_name: [values...]}}``.
        title: Figure title; the chosen set and metric names are appended on a second line.

    Prints a notice and returns without plotting when ``evals_result_`` is absent or empty.
    """
    history = getattr(xgb_model, "evals_result_", None)
    if not history:
        print("No evals_result_ found.")
        return
    # Take the first evaluation set and its first metric, in insertion order.
    set_name = list(history)[0]
    metric_name, curve = list(history[set_name].items())[0]
    rounds = range(len(curve))
    plt.figure()
    plt.plot(rounds, curve)
    plt.title(f"{title}\n({set_name} / {metric_name})")
    plt.xlabel("Boosting round")
    plt.ylabel(metric_name)
    plt.show()

def plot_xgb_feature_importance(xgb_model, feature_names=None, top_n=20, title="Top Feature Importances"):
    booster = xgb_model.get_booster()
    score = booster.get_score(importance_type="gain")
    if not score:
        print("No feature importance available.")
        return
    items = sorted(score.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
    feats = [k for k, _ in items]
    vals = [v for _, v in items]
    if feature_names is not None:
        def map_name(f):
            if f.startswith("f"):
                idx = int(f[1:])
                if 0 <= idx < len(feature_names):
                    return feature_names[idx]
            return f
        feats = [map_name(f) for f in feats]
    plt.figure()
    y_pos = np.arange(len(feats))
    plt.barh(y_pos, vals)
    plt.yticks(y_pos, feats)
    plt.gca().invert_yaxis()
    plt.title(title + " (gain)")
    plt.xlabel("Importance")
    plt.show()


In [None]:
plot_class_balance(y_test_freddie, "Freddie — Test Class Balance (Future 6M Default)")
plot_roc(y_test_freddie, proba_freddie, "Freddie — ROC Curve")
plot_pr(y_test_freddie, proba_freddie, "Freddie — Precision–Recall Curve")
plot_confusion(y_test_freddie, proba_freddie, threshold=0.5, title="Freddie — Confusion Matrix")
plot_score_dist(y_test_freddie, proba_freddie, "Freddie — Score Distribution")
plot_xgb_training_curve(xgb_f, "Freddie — Validation AUC Over Boosting Rounds")

try:
    feature_names_f = pre_f.get_feature_names_out()
except Exception:
    feature_names_f = None

plot_xgb_feature_importance(xgb_f, feature_names=feature_names_f, top_n=20, title="Freddie — Top Feature Importances")


In [None]:
from google.colab import files

# Ask for the claims file through the Colab upload widget and echo what arrived.
uploaded = files.upload()
for fn in uploaded.keys():
    print("Uploaded:", fn)

# Move CSVs from the working directory into data/ unless that name is already taken there.
for f in WORKDIR.glob("*.csv"):
    target = DATA_DIR / f.name
    if not target.exists():
        f.rename(target)

# Prefer the CSV(s) uploaded in this step. data/ may also hold CSVs from earlier steps, and
# glob() order is arbitrary, so taking the first match of an unsorted listing could load an
# unrelated file. Fall back to a sorted listing so the choice is at least deterministic.
uploaded_csvs = [DATA_DIR / Path(fn).name for fn in uploaded if fn.lower().endswith(".csv")]
csvs = [p for p in uploaded_csvs if p.exists()] or sorted(DATA_DIR.glob("*.csv"))
if not csvs:
    raise FileNotFoundError("No CSV found.")

claims_path = csvs[0]
claims = pd.read_csv(claims_path)
print("Using claims file:", claims_path.name)
print_head(claims, "Claims RAW")


In [None]:
import xgboost as xgb

# Train and evaluate the claims model, then save it together with its preprocessor.
# NOTE(review): no claims feature-preparation step is visible between loading `claims` and
# this cell. X_tr_t / y_tr / X_val_t / y_val are the names assigned by the Freddie training
# cell, and X_test_t_claims, y_test_claims, pre_c, num_cols_c and cat_cols_c are not defined
# anywhere in view. Confirm that a claims prep cell rebinds/defines all of them before this
# runs; otherwise this trains on the Freddie matrices and then fails on an undefined name.
dtrain = xgb.DMatrix(X_tr_t, label=y_tr)
dval   = xgb.DMatrix(X_val_t, label=y_val)
dtest  = xgb.DMatrix(X_test_t_claims)

# Same hyper-parameters as the Freddie model.
params = {
    "objective": "binary:logistic",
    "eval_metric": "auc",
    "learning_rate": 0.05,
    "max_depth": 4,
    "subsample": 0.85,
    "colsample_bytree": 0.85,
    "lambda": 1.0,
    "seed": SEED,
    "nthread": -1
}

# The validation set is listed last in the watchlist.
watchlist = [(dtrain, "train"), (dval, "valid")]

# Up to 5000 rounds, stopping once the monitored metric has not improved for 50 rounds.
booster_c = xgb.train(
    params=params,
    dtrain=dtrain,
    num_boost_round=5000,
    evals=watchlist,
    early_stopping_rounds=50,
    verbose_eval=False
)

# NOTE(review): predict() is called without an iteration_range, so it presumably uses every
# trained round rather than only those up to best_iteration — confirm this is intended.
proba_claims = booster_c.predict(dtest)
claims_metrics = evaluate_binary_percent(y_test_claims, proba_claims, 0.5, "Claims_Fraud_XGB")
display(pd.DataFrame([claims_metrics]))

# Bundle the preprocessor, booster and feature lists into one artifact.
joblib.dump(
    {"pre": pre_c, "model": booster_c, "num_cols": num_cols_c, "cat_cols": cat_cols_c},
    OUT_DIR / "claim_suspicious_model.joblib"
)
print("Saved Claims model:", OUT_DIR / "claim_suspicious_model.joblib")
print("Best iteration:", booster_c.best_iteration)
