In [None]:
import pandas as pd
import numpy as np

In [None]:
def quick_report(df):
    """Print a short plain-text overview of ``df``.

    Prints, in order: the row count, the column names, the per-column
    missing-value counts (largest first), the column dtypes, the first
    three rows, and a 40-character separator line. Returns None.
    """
    # Each entry is (heading, object printed under that heading).
    sections = (
        ("\nMissing counts:", df.isna().sum().sort_values(ascending=False)),
        ("\nSample types:", df.dtypes),
        ("\nSample rows:", df.head(3)),
    )

    print("Rows:", len(df))
    print("Columns:", df.columns.tolist())
    for heading, payload in sections:
        print(heading)
        print(payload)
    print("-" * 40)

In [None]:
df= pd.read_csv("RuralCreditData.csv")

In [None]:
quick_report(df)

In [None]:
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]

In [None]:
# Columns that are parsed as numbers further down in this notebook.
# The last three are later coerced to 0/1 integer flags.
# NOTE(review): "water_availabity" is presumably spelled this way to match the
# normalised CSV header — confirm before correcting the spelling.
numeric_cols = ["annual_income","monthly_expenses","loan_amount","loan_tenure",
                "loan_installments","age","occupants_count","house_area",
                "home_ownership","sanitary_availability","water_availabity"]

In [None]:
# Converting formatted strings to numeric values: every character except digits,
# "." and "-" is stripped before parsing.
# ex: ₹12,000 to 12000
# Anything that still cannot be parsed becomes NaN (errors="coerce").
for c in numeric_cols:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c].astype(str).str.replace(r"[^\d\.\-]","", regex=True), errors="coerce")

In [None]:
cat_cols = ["city","sex","social_class","primary_business","secondary_business","type_of_house","loan_purpose"]

In [None]:
# Normalise categorical text (trim + lower-case) and turn textual
# "missing" markers back into real NaN values.
missing_markers = ["nan","none","null","na",""]
for c in cat_cols:
    if c not in df.columns:
        continue
    cleaned = df[c].astype(str).str.strip().str.lower()
    df[c] = cleaned.mask(cleaned.isin(missing_markers), np.nan)
In [None]:
# Median-impute the continuous numeric columns; a column whose median is
# not finite (e.g. entirely NaN) is left untouched.
for c in ["annual_income","monthly_expenses","loan_amount","loan_tenure",
          "loan_installments","age","occupants_count","house_area"]:
    if c not in df.columns:
        continue
    column_median = np.nanmedian(df[c])
    if not np.isfinite(column_median):
        continue
    df[c] = df[c].fillna(column_median)

In [None]:
# Turn the three availability/ownership columns into 0/1 integer flags.
for c in ["home_ownership","sanitary_availability","water_availabity"]:
    if c in df.columns:
        # Fill gaps with the column median when that median is a finite number.
        med = np.nanmedian(df[c])
        if np.isfinite(med):
            df[c] = df[c].fillna(med)
        ser = df[c].astype(float)
        # Values already inside [0, 1] are rounded to the nearest integer;
        # otherwise anything above 0.5 is mapped to 1 and the rest to 0.
        # NOTE(review): if a column were entirely NaN the median is not finite,
        # NaNs remain, and astype(int) would presumably raise — confirm this cannot happen.
        if ser.dropna().between(0,1).all():
            df[c] = ser.round().astype(int)
        else:
            df[c] = (ser > 0.5).astype(int)

Filling NaN values in the categorical columns

In [None]:
def _mode_or(s, fallback="unknown"):
    try:
        m = s.mode(dropna=True)
        if len(m) > 0: return m.iloc[0]
    except Exception:
        pass
    return fallback

In [None]:
# Replacement value for missing entries in each categorical column.
# "city" and "social_class" get the literal "unknown", "secondary_business"
# gets "none", and the remaining columns use their most frequent value via
# _mode_or (falling back to "unknown" when the column is absent).
fill_tokens = {
    "city": "unknown",
    "sex": _mode_or(df["sex"]) if "sex" in df.columns else "unknown",
    "social_class": "unknown",
    "primary_business": _mode_or(df["primary_business"]) if "primary_business" in df.columns else "unknown",
    "secondary_business": "none",
    "type_of_house": _mode_or(df["type_of_house"]) if "type_of_house" in df.columns else "unknown",
    "loan_purpose": _mode_or(df["loan_purpose"]) if "loan_purpose" in df.columns else "unknown",
}

In [None]:
for c, token in fill_tokens.items():
    if c in df.columns:
        df[c] = df[c].fillna(token)

In [None]:
#handling negative and NaN values in numeric
for c in ["annual_income","monthly_expenses","loan_amount","loan_tenure",
          "loan_installments","age","house_area","occupants_count"]:
    if c in df.columns:
        # Replace negatives with 0
        clipped = df[c].clip(lower=0)

        # Fill missing with the median of the clipped values and assign the
        # result back explicitly. `df[c].fillna(..., inplace=True)` works on a
        # temporary selection: it emits a chained-assignment warning on recent
        # pandas and leaves `df` unchanged when Copy-on-Write is enabled.
        df[c] = clipped.fillna(clipped.median())

In [None]:
# Cap extreme values: anything below the 1st percentile or above the 99th
# percentile of a column is pulled back to that percentile.
for c in ["annual_income","monthly_expenses","loan_amount","loan_tenure",
          "loan_installments","age","house_area"]:
    if c not in df.columns:
        continue
    lower_cap, upper_cap = np.nanquantile(df[c], [0.01, 0.99])
    df[c] = df[c].clip(lower=lower_cap, upper=upper_cap)

In [None]:
quick_report(df)

In [None]:
df_cleaned = df.copy()
df_cleaned.to_csv("RuralCreditData_cleaned.csv", index=False)

In [None]:
from google.colab import files
files.download("RuralCreditData_cleaned.csv")

# Output Column Computation

In [None]:
import pandas as pd
import numpy as np

In [None]:
df=pd.read_csv("RuralCreditData_cleaned.csv")

In [None]:
print(df.columns.tolist())

In [None]:
# Monthly income = annual income / 12. Values that cannot be parsed as numbers
# become NaN (errors="coerce"); a missing "annual_income" column also yields NaN.
df["monthly_income"] = pd.to_numeric(df.get("annual_income", np.nan), errors="coerce") / 12.0
# Any remaining gaps are filled with the median monthly income.
if df["monthly_income"].isna().any():
    df["monthly_income"] = df["monthly_income"].fillna(df["monthly_income"].median())

In [None]:
# Express loan tenure in months.
# Heuristic: if the median tenure is a finite number above 10, the column is
# taken to be in months already; otherwise it is multiplied by 12.
# NOTE(review): the "otherwise" case presumably means the values are in years — confirm.
# When the "loan_tenure" column does not exist at all, every row gets 12 months.
if "loan_tenure" in df.columns:
    med_lt = np.nanmedian(df["loan_tenure"])
    if np.isfinite(med_lt) and med_lt > 10:
        df["loan_tenure_months"] = df["loan_tenure"]
    else:
        df["loan_tenure_months"] = df["loan_tenure"] * 12.0
else:
    df["loan_tenure_months"] = 12.0

In [None]:
# Loan-to-income ratio: loan_amount relative to monthly_income.
# 1 is added to numerator and denominator so a zero income cannot cause a division by zero.
df["loan_to_income"] = (df.get("loan_amount", 0) + 1.0) / (df.get("monthly_income", 0) + 1.0)

In [None]:
# Income buffer: ratio of monthly income to monthly expenses (a ratio, not a difference).
# 1 is added to numerator and denominator so zero expenses cannot cause a division by zero.
df["income_buffer"]  = (df.get("monthly_income", 0) + 1.0) / (df.get("monthly_expenses", 0) + 1.0)

In [None]:
#no. of years of tenure
df["tenure_years"]   = df["loan_tenure_months"] / 12.0

In [None]:
#calculating savings
df["estimated_savings"] = (df.get("annual_income", 0) - 12.0 * df.get("monthly_expenses", 0)).clip(lower=0.0)

In [None]:
# Feature extraction: derive two binary (0/1) columns from the loan purpose text.
# `df.get("loan_purpose", "")` would return a plain str when the column is
# missing, and str has no `.astype`, so an explicit empty Series is used as the
# fallback; both flags are then 0 for every row.
if "loan_purpose" in df.columns:
    lp = df["loan_purpose"].astype(str).str.lower()
else:
    lp = pd.Series("", index=df.index, dtype="object")
# 1 when the purpose mentions a consumption-type use, else 0.
df["consumption_loan"] = lp.str.contains("consum|personal|household|marriage|medical", na=False).astype(int)
# 1 when the purpose mentions an agriculture-related use, else 0.
df["agri_loan"]        = lp.str.contains("agri|farm|crop|dairy|livestock|agriculture", na=False).astype(int)

In [None]:
# Simulated rainfall deficit column: the values are NOT read from data, they are
# drawn from a normal distribution (mean 0, std 0.5) and clipped to [-2, +2],
# so they are continuous. Intended reading of the scale:
#  0 means normal rainfall
# +1 means mild drought
# +2 means severe drought
# -1 means better than average rainfall
# -2 means very good monsoon
# The generator is seeded (42) so the draws are reproducible.
rng = np.random.default_rng(42)
df["rainfall_deficit"] = rng.normal(0.0, 0.5, size=len(df)).clip(-2.0, 2.0)

In [None]:
# Robust "z-style" scaling for the columns that feed the logit (risk score).
def z_scale(col):
    """Scale ``col`` around its median using an IQR-based spread.

    Values are coerced to float (unparseable entries become NaN), centred on
    the NaN-ignoring median and divided by IQR / 1.349. The IQR is floored at
    1e-9 so a constant column yields zeros instead of a division by zero.
    """
    values = pd.to_numeric(col, errors="coerce").astype(float)
    center = np.nanmedian(values)
    lower_q, upper_q = np.nanpercentile(values, [25, 75])
    spread = max(upper_q - lower_q, 1e-9) / 1.349
    return (values - center) / spread

In [None]:
z_lti = z_scale(df["loan_to_income"])
z_IB = z_scale(df["income_buffer"])
z_savings = z_scale(df["estimated_savings"])
z_tenure = z_scale(df["tenure_years"])

In [None]:
# Logit (risk score) calculation: a baseline plus a weighted sum of the scaled
# features, the two loan-purpose flags, the rainfall deficit and Gaussian noise.
# NOTE: the noise term reuses `rng`, the generator created for the rainfall
# column, so the values drawn here depend on that cell having been run
# immediately before (run the notebook top to bottom to reproduce results).
b0 = -1.8 #baseline

logit = (
    b0
  + 1.10 * z_lti                   # higher leverage → higher risk
  - 0.90 * z_IB                   # more buffer → lower risk
  - 0.50 * z_savings                  # more savings → lower risk
  - 0.30 * z_tenure                  # longer tenure → slightly lower risk
  + 0.40 * df["consumption_loan"]  # consumption use → higher risk
  - 0.20 * df["agri_loan"]         # productive agri use → lower risk
  + 0.30 * df["rainfall_deficit"]  # bad season → higher risk
  + rng.normal(0.0, 0.15, size=len(df))  # for randomness
)

In [None]:
def sigmoid(z):
    """Logistic function 1 / (1 + exp(-z)).

    Used to convert a risk score (logit) into a value between 0 and 1,
    interpreted in this notebook as the probability of default (PD).
    """
    denominator = 1.0 + np.exp(-z)
    return 1.0 / denominator

In [None]:
# Calibration so that the average PD matches a chosen target rate.
def calibrate_shift(logits, target, tolerance=1e-6, max_iter=80):
    """Find a shift ``b`` such that ``mean(sigmoid(logits + b))`` is close to ``target``.

    A bisection search over the interval [-10, 10] is used. The search stops
    early once the mean is within ``tolerance`` of ``target``; otherwise the
    midpoint of the final interval is returned after ``max_iter`` iterations.
    """
    lo, hi = -10.0, 10.0
    for _ in range(max_iter):
        shift = (lo + hi) * 0.5
        # Average predicted probability under the candidate shift.
        mean_pd = sigmoid(logits + shift).mean()
        if abs(mean_pd - target) < tolerance:
            return shift
        # Too high → search smaller shifts; otherwise search larger shifts.
        if mean_pd > target:
            hi = shift
        else:
            lo = shift
    return (lo + hi) * 0.5

In [None]:
TARGET_RATE = 0.18

In [None]:
b_shift = calibrate_shift(logit, TARGET_RATE) #found the correct shift
#i.e. how much should a PD value be shifted in order for the avg. PD value to be 0.18

In [None]:
pd_cal = sigmoid(logit + b_shift) #calculating final PD after shifting
#in calibration, we were just testing what the shift should be

In [None]:
#calculated probability of a default. now change that to binary-> default(1) or repay/did not default(0)

In [None]:
#P(X=x)=p^x(1−p)^(1−x),x∈{0,1}. bernoulli's formula

In [None]:
rng2 = np.random.default_rng(123)
default_label = (rng2.random(len(df)) < pd_cal).astype(int)

In [None]:
labels_df = pd.DataFrame({
    "pd": pd_cal,
    "default": default_label
}, index=df.index)

In [None]:
df = pd.concat([df, labels_df], axis=1)

In [None]:
df.head()

In [None]:
df.to_csv("RuralCreditData_cleaned_with_Default.csv", index=False)

In [None]:
from google.colab import files
files.download("RuralCreditData_cleaned_with_Default.csv")

In [None]:
print("Observed default rate:", round(df['default'].mean(), 3))

In [None]:
import pandas as pd
import numpy as np

In [None]:
df=pd.read_csv("RuralCreditData_cleaned_with_Default.csv")

In [None]:
df.head()

In [None]:
df.sample(50)

In [None]:
df.shape

In [None]:
df.info()

In [None]:
df['default'].value_counts(normalize=True) #moderately imbalanced (will handle in further steps)

# EDA

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
sns.set(style="whitegrid", palette="muted", font_scale=1.1)

In [None]:
df.columns.tolist()

In [None]:
df.dtypes

In [None]:
df.isna().sum().sort_values(ascending=False)

In [None]:
df.describe().T

In [None]:
# Share of each target class, computed once and reused for the bars and their labels.
class_share = df['default'].value_counts(normalize=True)
ax = class_share.plot(
    kind='bar',
    color=['skyblue', 'salmon'],
    rot=0,
    title='Default class proportion (0=No Default, 1=Default)'
)
# Write the percentage slightly above the top of each bar.
for bar_pos, share in enumerate(class_share):
    ax.text(bar_pos, share + 0.01, f"{share:.2%}", ha='center')
plt.show()

In [None]:
num_cols = ['annual_income', 'monthly_expenses', 'loan_amount',
             'house_area', 'occupants_count', 'loan_to_income',
             'income_buffer', 'estimated_savings']

df[num_cols].hist(bins=30, figsize=(14,10), color='lightsteelblue', edgecolor='gray')
plt.suptitle('Numeric Feature Distributions', fontsize=16)
plt.show()

In [None]:
# One box plot per feature, split by the target class, to compare the
# distributions of defaulters (1) and non-defaulters (0).
# NOTE(review): recent seaborn releases reportedly warn when `palette` is passed
# without `hue` — confirm against the installed seaborn version.
for col in ['annual_income', 'loan_amount', 'loan_to_income', 'income_buffer']:
    plt.figure(figsize=(6,4))
    sns.boxplot(x='default', y=col, data=df, palette=['lightblue','salmon'])
    plt.title(f'{col} vs Default')
    plt.show()

In [None]:
cat_cols = ['home_ownership','type_of_house','social_class','loan_purpose']

for c in cat_cols:
    if c in df.columns:
        plt.figure(figsize=(7,4))
        sns.countplot(x=c, hue='default', data=df,
                      palette=['lightblue','salmon'])
        plt.title(f'{c} distribution by Default')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

In [None]:
plt.figure(figsize=(12,8))
corr = df.corr(numeric_only=True)
sns.heatmap(corr, cmap="RdBu_r", center=0, annot=False)
plt.title("Correlation Heatmap")
plt.show()

In [None]:
corr_target = corr['default'].sort_values(ascending=False)
print("\nTop correlations with default:\n")
print(corr_target)

# Preprocessing

In [None]:
df = df.drop(columns=['id','pd'], errors='ignore')

In [None]:
df[['loan_tenure','loan_tenure_months','tenure_years','consumption_loan']].describe()

In [None]:
# These columns are dropped because the describe() output in the previous cell
# showed no variance for them.
# Only names that actually exist in df are passed on, so the cell can be re-run safely.
drop_cols = ['loan_tenure','loan_tenure_months','tenure_years','consumption_loan']
df = df.drop(columns=[c for c in drop_cols if c in df.columns], errors='ignore')

In [None]:
# social_class has 457 unique values, so one-hot encoding all of them would add 457 columns.
# Instead the 20 most frequent classes are kept and every other value is relabelled 'other';
# the UI dropdown is meant to offer the same 20 options plus "others".
top_classes = df['social_class'].value_counts().nlargest(20).index
is_top_class = df['social_class'].isin(top_classes)
df['social_class'] = df['social_class'].where(is_top_class, 'other')

In [None]:
#OHE
df = pd.get_dummies(df, columns=['social_class'], drop_first=True)

In [None]:
# One-hot encode the remaining categorical columns (first level dropped).
# Names that are no longer present in df — e.g. 'social_class' if it was already
# encoded — are filtered out, so listing them here is harmless.
cat_cols = [ 'city', 'type_of_house', 'social_class',
             'loan_purpose', 'primary_business', 'secondary_business' ]

df = pd.get_dummies(df, columns=[c for c in cat_cols if c in df.columns], drop_first=True)

In [None]:
df.head()

In [None]:
df.dtypes

In [None]:
bool_cols = df.select_dtypes(include='bool').columns
df[bool_cols] = df[bool_cols].astype(int)

In [None]:
df.dtypes

In [None]:
df.head()

In [None]:
df.dtypes.value_counts()
df.select_dtypes(exclude=['number'])

In [None]:
df['sex'] = df['sex'].astype(str).str.strip().str.lower()

In [None]:
df['sex'] = df['sex'].map({'m': 1, 'male': 1, 'f': 0, 'female': 0})

In [None]:
df['sex'].value_counts(dropna=False)

In [None]:
df['sex'] = df['sex'].fillna(df['sex'].mode()[0])

In [None]:
df.dtypes.value_counts()

In [None]:
non_numeric = df.select_dtypes(exclude=['int64','float64'])
print(non_numeric.columns)

Dimensionality reduction using feature selection

In [None]:
#removing cols that are mostly constant
# A column counts as "mostly constant" when a single value fills more than 99%
# of its rows. The earlier test on the fraction of non-zero entries gave the
# same answer for 0/1 dummy columns, but it also discarded every continuous
# column that is never zero (incomes, ages, ratios), which are not constant.
dominant_share = np.array([
    df.iloc[:, i].value_counts(normalize=True, dropna=False).max()
    for i in range(df.shape[1])
])
keep_mask = dominant_share <= 0.99
# The target column must survive regardless of its class balance.
keep_mask = keep_mask | (df.columns == 'default')
df = df.loc[:, keep_mask]

In [None]:
df.shape

In [None]:
#to know the most informative features
from sklearn.feature_selection import mutual_info_classif
import pandas as pd

X = df.drop(columns=['default'])
y = df['default']

mi = mutual_info_classif(X, y, discrete_features='auto', random_state=42)
mi_series = pd.Series(mi, index=X.columns).sort_values(ascending=False)

print(mi_series.head(15))


In [None]:
df.head()

In [None]:
df.to_csv("RuralCreditData_preprocessed.csv", index=False)

In [None]:
from google.colab import files
files.download("RuralCreditData_preprocessed.csv")

In [None]:
import pandas as pd

In [None]:
df=pd.read_csv("RuralCreditData_preprocessed.csv")

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
X = df.drop(columns=['default'])
y = df['default']

In [None]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    stratify=y,
    random_state=42
)

In [None]:
X_train.shape

In [None]:
X_test.shape

In [None]:
y_train.mean().round(3)

In [None]:
y_test.mean().round(3)

In [None]:
#will be using a pipeline for every algorithm tested with Optuna because
#algorithms like logistic regression and SVM need feature scaling whereas algorithms like XGBoost, DT and RF don't

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.tree import DecisionTreeClassifier

In [None]:
# Baseline (untuned) pipelines, one per candidate algorithm.
# Logistic Regression and SVM get a StandardScaler step; the tree-based models
# are fitted on the raw features. Class imbalance is handled with
# class_weight='balanced', or scale_pos_weight for XGBoost.
pipelines = {
    "Logistic Regression": Pipeline([
        ("scaler", StandardScaler()),
        ("model", LogisticRegression(class_weight='balanced', max_iter=2000, random_state=42))
    ]),

    "SVM": Pipeline([
        ("scaler", StandardScaler()),
        ("model", SVC(class_weight='balanced', probability=True, random_state=42))
    ]),

    "Random Forest": Pipeline([
        ("model", RandomForestClassifier(class_weight='balanced', random_state=42))
    ]),

    "Decision Tree": Pipeline([
        ("model", DecisionTreeClassifier(class_weight='balanced', random_state=42))
    ]),

    "XGBoost": Pipeline([
        # scale_pos_weight = negatives / positives, the same ratio used for the
        # tuned XGBoost model later in the notebook (1 / positive rate overstates it).
        ("model", XGBClassifier(
            scale_pos_weight=float((1 - y_train.mean()) / y_train.mean()),
            eval_metric='logloss', random_state=42))
    ])
}

In [None]:
from sklearn.metrics import roc_auc_score

for name, pipe in pipelines.items():
    pipe.fit(X_train, y_train)
    y_prob = pipe.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, y_prob)
    print(f"{name}: ROC-AUC = {auc:.3f}")


In [None]:
print([col for col in df.columns if 'pd' in col or 'prob' in col])

In [None]:
from sklearn.model_selection import StratifiedKFold, cross_val_score

In [None]:
!pip install optuna

Now that `optuna` is installed, you can run the cell to import it.

In [None]:
import optuna

In [None]:
# Handling class imbalance for XGBoost: weight the positive class by the
# ratio of negatives to positives in the training labels.
pos_rate = y_train.mean()
neg_rate = 1 - pos_rate
scale_pos_weight = float(neg_rate / pos_rate)

In [None]:
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

In [None]:
import numpy as np
def build_pipeline(model_name, trial):
    """Create an unfitted Pipeline for ``model_name`` with hyper-parameters taken from ``trial``.

    Parameters
    ----------
    model_name : str
        One of "Logistic", "SVM", "RandomForest", "DecisionTree", "XGBoost".
    trial : object exposing ``suggest_float`` / ``suggest_int`` / ``suggest_categorical``
        Supplies the hyper-parameter values for the chosen model.

    Returns
    -------
    Pipeline
        "Logistic" and "SVM" pipelines have a "scaler" step followed by a
        "model" step; the other pipelines contain only the "model" step.

    Raises
    ------
    ValueError
        If ``model_name`` is not one of the supported names.
    """
    if model_name == "Logistic":
        logreg_params = {
            "C": trial.suggest_float("logreg_C", 1e-3, 10.0, log=True),
        }
        estimator = LogisticRegression(
            class_weight="balanced", max_iter=3000, random_state=42,
            solver="lbfgs", **logreg_params
        )
        return Pipeline([("scaler", StandardScaler()), ("model", estimator)])

    if model_name == "SVM":
        svm_params = {
            "C": trial.suggest_float("svm_C", 1e-2, 10.0, log=True),
            "gamma": trial.suggest_float("svm_gamma", 1e-4, 1.0, log=True),
        }
        estimator = SVC(
            class_weight="balanced", probability=True, random_state=42,
            kernel="rbf", **svm_params
        )
        return Pipeline([("scaler", StandardScaler()), ("model", estimator)])

    if model_name == "RandomForest":
        rf_params = {
            "n_estimators": trial.suggest_int("rf_n_estimators", 200, 600, step=100),
            "max_depth": trial.suggest_int("rf_max_depth", 4, 24),
            "max_features": trial.suggest_categorical("rf_max_features", ["sqrt", "log2", None]),
            "min_samples_split": trial.suggest_int("rf_min_samples_split", 2, 10),
            "min_samples_leaf": trial.suggest_int("rf_min_samples_leaf", 1, 5),
        }
        estimator = RandomForestClassifier(
            class_weight="balanced", random_state=42, n_jobs=-1, **rf_params
        )
        return Pipeline([("model", estimator)])

    if model_name == "DecisionTree":
        dt_params = {
            "max_depth": trial.suggest_int("dt_max_depth", 3, 24),
            "min_samples_split": trial.suggest_int("dt_min_samples_split", 2, 20),
            "min_samples_leaf": trial.suggest_int("dt_min_samples_leaf", 1, 10),
        }
        estimator = DecisionTreeClassifier(
            class_weight="balanced", random_state=42, **dt_params
        )
        return Pipeline([("model", estimator)])

    if model_name == "XGBoost":
        xgb_params = {
            "n_estimators": trial.suggest_int("xgb_n_estimators", 200, 800, step=100),
            "max_depth": trial.suggest_int("xgb_max_depth", 3, 10),
            "learning_rate": trial.suggest_float("xgb_learning_rate", 0.01, 0.3, log=True),
            "subsample": trial.suggest_float("xgb_subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("xgb_colsample_bytree", 0.6, 1.0),
            "reg_lambda": trial.suggest_float("xgb_reg_lambda", 0.0, 5.0),
        }
        # scale_pos_weight is the notebook-level imbalance weight for the positive class.
        estimator = XGBClassifier(
            random_state=42, eval_metric="logloss",
            scale_pos_weight=scale_pos_weight, n_jobs=-1, **xgb_params
        )
        return Pipeline([("model", estimator)])

    raise ValueError("Unknown model")

In [None]:
def objective(trial):
    """Optuna objective: mean cross-validated ROC-AUC of one candidate pipeline.

    The trial first picks the model family, then ``build_pipeline`` draws that
    family's hyper-parameters from the same trial. The pipeline is scored on
    the training split with the notebook-level ``cv`` splitter.
    """
    candidates = ["Logistic","SVM","RandomForest","DecisionTree","XGBoost"]
    chosen = trial.suggest_categorical("model", candidates)
    candidate_pipe = build_pipeline(chosen, trial)
    fold_scores = cross_val_score(
        candidate_pipe, X_train, y_train, scoring="roc_auc", cv=cv, n_jobs=-1
    )  # CV ROC-AUC
    return float(np.mean(fold_scores))

In [None]:
study = optuna.create_study(direction="maximize", study_name="credit_model_selection")
study.optimize(objective, n_trials=40, show_progress_bar=True)

In [None]:
print("Best AUC:", study.best_value)
print("Best params:", study.best_trial.params)
best_model_name = study.best_trial.params["model"]

In [None]:
from sklearn.metrics import (
    roc_auc_score, average_precision_score, precision_recall_fscore_support,
    classification_report, confusion_matrix
)
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
best_pipe = build_pipeline(best_model_name, study.best_trial)
best_pipe.fit(X_train, y_train)

In [None]:
p_test = best_pipe.predict_proba(X_test)[:, 1]

In [None]:
y_pred = (p_test >= 0.50).astype(int)

In [None]:
auc = roc_auc_score(y_test, p_test)
pr_auc = average_precision_score(y_test, p_test)
prec, rec, f1, _ = precision_recall_fscore_support(y_test, y_pred, average="binary", zero_division=0)

In [None]:
print(f"AUC: {auc:.3f} | PR-AUC: {pr_auc:.3f} | Precision: {prec:.3f} | Recall: {rec:.3f} | F1: {f1:.3f}")
print("\nClassification report:\n", classification_report(y_test, y_pred, digits=3))

In [None]:
# Confusion matrix of the thresholded test predictions (y axis = actual, x axis = predicted).
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False,
            xticklabels=["No Default","Default"], yticklabels=["No Default","Default"])
plt.xlabel("Predicted")
plt.ylabel("Actual")
# Name the model Optuna actually selected instead of hard-coding "XGBoost",
# which would mislabel the figure whenever another model wins the search.
plt.title(f"Confusion Matrix — {best_model_name} (thr=0.50)")
plt.tight_layout()
plt.show()

Checking the prediction for a random point in the test data

In [None]:
sample_row = X_test.sample(1, random_state=42)
X_test_sample = sample_row.drop(columns=["default", "pd"], errors="ignore")

In [None]:
prob_default = best_pipe.predict_proba(X_test_sample)[:, 1][0]

#to get binary prediction
pred_label = int(prob_default >= 0.5)

print(f"Predicted Probability of Default: {prob_default:.3f}")
print(f"Predicted Class (1=Default, 0=No Default): {pred_label}")


In [None]:
!pip install shap

In [None]:
import shap
import matplotlib.pyplot as plt

In [None]:
import shap
import matplotlib.pyplot as plt

In [None]:
# Build a SHAP explainer for the fitted estimator stored in the pipeline's "model" step.
# NOTE(review): TreeExplainer presumably only supports tree-based models, while the
# Optuna search may select "Logistic" or "SVM" — confirm the selected model first.
# NOTE(review): those two pipelines also contain a "scaler" step that is bypassed here,
# so the raw sample would not match what the estimator was trained on — confirm.
explainer = shap.TreeExplainer(best_pipe.named_steps["model"])
# Work on a copy of the single test row that was sampled for the spot check.
sample_data = sample_row.copy()

In [None]:
shap_values = explainer(sample_data)

In [None]:
shap.initjs()
shap.force_plot(explainer.expected_value, shap_values.values, sample_data)

In [None]:
shap.plots.bar(shap_values, max_display=10)

In [None]:
from joblib import dump

In [None]:
dump(best_pipe, "RuralCreditModel.joblib")

In [None]:
from google.colab import files
files.download("RuralCreditModel.joblib")

In [None]:
import os
os.makedirs("DMDW_Rural_Credit_Project", exist_ok=True)

In [None]:
import shutil

In [None]:
# Collect the project artefacts in the DMDW_Rural_Credit_Project folder.
import json

# feature_columns.json is not written anywhere else in this notebook, so on a
# fresh run the copy below would fail with FileNotFoundError. Create it when it
# is missing; an existing file is left untouched.
# NOTE(review): assumes the file is meant to hold the ordered model input columns — confirm.
if not os.path.exists("feature_columns.json"):
    with open("feature_columns.json", "w", encoding="utf-8") as fh:
        json.dump(list(X_train.columns), fh, indent=2)

for artifact in ["RuralCreditModel.joblib", "feature_columns.json",
                 "RuralCreditData_preprocessed.csv", "DMDW_project.ipynb"]:
    if os.path.exists(artifact):
        shutil.copy(artifact, "DMDW_Rural_Credit_Project/")
    else:
        # e.g. the notebook file itself may not be in the working directory.
        print(f"Skipping {artifact}: not found in the working directory")