# Strategy for KNN

I will need to randomly undersample and SMOTE-oversample the training set (the training set ONLY, never the test set) to balance the classes.

Make use of the correlation between the features of the dataset.

In [46]:
import pandas as pd
import numpy as np
import os
import pickle
import time
import random
import seaborn as sns
import matplotlib.pyplot as plt
from scipy import stats

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.pipeline import Pipeline as SkPipeline
from sklearn.model_selection import GridSearchCV
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import balanced_accuracy_score
from imblearn.pipeline import Pipeline as ImbPipeline # Replaces sklearn Pipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler


import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# load the dataset and fix values

In [47]:
# Pick the compute device: Apple MPS first, then CUDA, otherwise the CPU.
# CUDA is only probed when MPS is not available.
if torch.backends.mps.is_available():
    backend_name, status_message = "mps", "MPS device is available."
elif torch.cuda.is_available():
    backend_name, status_message = "cuda", "CUDA device is available."
else:
    backend_name, status_message = "cpu", "No GPU acceleration available."

print(status_message)
device = torch.device(backend_name)

# Fix the seed to have deterministic behaviour
def fix_random(seed: int) -> None:
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (``torch.manual_seed`` followed by ``torch.cuda.manual_seed``), and puts
    cuDNN in deterministic mode with benchmarking turned off.

    Args:
        seed: Integer seed handed to each generator.
    """
    # The cuDNN switches do not depend on the seeding calls, so set them first.
    torch.backends.cudnn.deterministic = True  # slower
    torch.backends.cudnn.benchmark = False

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)

SEED = 1337
fix_random(SEED)

# Never truncate columns or rows when pandas displays a frame.
for display_option in ("display.max_columns", "display.max_rows"):
    pd.set_option(display_option, None)

DATASET_PATH = "dataset_train/dataset.csv"
dataset = pd.read_csv(DATASET_PATH, delimiter=",")

dataset_shape = dataset.shape
print(f"Shape of the dataset: {dataset_shape}")

# Rows flagged by DataFrame.duplicated(), i.e. exact repeats of an earlier row.
is_repeated_row = dataset.duplicated()
duplicates = dataset[is_repeated_row]
print(f"Number of duplicates in the dataset: {len(duplicates)}")

MPS device is available.
Shape of the dataset: (148301, 145)
Number of duplicates in the dataset: 0


## split the dataset

In [48]:
# Ordinal target encoding: "A" -> 6, "B" -> 5, ..., "G" -> 0.
grade_to_ordinal = dict(zip("ABCDEFG", range(6, -1, -1)))

y = dataset["grade"].map(grade_to_ordinal)
X = dataset.drop(columns=["grade"])

# 80/20 hold-out split, stratified on the encoded grade.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

In [49]:
class NumericExtractor(BaseEstimator, TransformerMixin):
    """Pull the first run of digits out of every column and return it as float.

    Each value is converted to its string form and matched against ``(\\d+)``;
    the first match becomes a float and values without any digit (including
    missing values, whose string form contains no digit) become NaN.
    """

    def fit(self, X, y=None):
        """Nothing is learned; returns ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` in which every column holds the extracted floats."""
        extracted = X.copy()
        for name in extracted.columns:
            as_text = extracted[name].astype(str)
            # expand=False yields a Series (one capture group) instead of a one-column frame.
            first_digits = as_text.str.extract(r"(\d+)", expand=False)
            extracted[name] = first_digits.astype(float)
        return extracted

class CyclicalDateEncoder(BaseEstimator, TransformerMixin):
    """Replace ``Mon-YYYY`` date columns with year and cyclical month features.

    Every input column is parsed with the ``%b-%Y`` format (abbreviated month
    name, dash, four-digit year) and replaced by three columns:
    ``<col>_year``, ``<col>_month_sin`` and ``<col>_month_cos``. Entries that
    cannot be parsed end up as NaN in all three outputs; no imputation is done
    here.
    """

    def fit(self, X, y=None):
        """Nothing is learned; returns ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with each column swapped for its three encodings."""
        encoded = X.copy()
        for name in X.columns:
            # errors="coerce" maps unparseable entries and NaNs to NaT, which
            # then propagate as NaN through year / sin / cos.
            parsed = pd.to_datetime(encoded[name], format="%b-%Y", errors="coerce")
            month_angle = 2 * np.pi * parsed.dt.month / 12

            # Remove the raw column first; the derived columns are appended after it.
            encoded = encoded.drop(columns=[name])
            encoded[f"{name}_year"] = parsed.dt.year
            encoded[f"{name}_month_sin"] = np.sin(month_angle)
            encoded[f"{name}_month_cos"] = np.cos(month_angle)
        return encoded
    
class BinaryModeEncoder(BaseEstimator, TransformerMixin):
    """Encode each column as 0 where the value equals its fitted mode, else 1.

    The per-column mode is learned in ``fit`` and stored in ``modes_`` (a dict
    mapping column name to mode). The transformer has no hyper-parameters, so
    no ``__init__`` is defined; following the scikit-learn convention, learned
    state only exists after ``fit`` has been called.

    Missing values compare unequal to the mode and are therefore encoded as 1.
    """

    def fit(self, X, y=None):
        """Learn the most frequent value of every column of ``X``.

        When several values tie, the first one returned by ``Series.mode()``
        is used. A column without any non-missing value has an empty mode and
        makes the lookup fail.

        Args:
            X: DataFrame whose columns are to be encoded.
            y: Ignored; present for pipeline compatibility.

        Returns:
            The fitted transformer (``self``).
        """
        # Build a fresh mapping on every fit so that refitting on a frame with
        # different columns cannot leave stale entries behind.
        self.modes_ = {col: X[col].mode()[0] for col in X.columns}
        return self

    def transform(self, X):
        """Return a copy of ``X`` with every fitted column replaced by 0/1 integers.

        Args:
            X: DataFrame containing (at least) the columns seen in ``fit``.

        Returns:
            A new DataFrame; columns not seen in ``fit`` are left untouched.
        """
        X_copy = X.copy()
        for col, mode in self.modes_.items():
            # Apply: 1 if NOT the mode (least frequent), 0 if mode
            X_copy[col] = (X_copy[col] != mode).astype(int)
        return X_copy
    
class HighMissingDropper(BaseEstimator, TransformerMixin):
    """Drop columns whose percentage of missing values exceeds a threshold.

    The columns to drop are chosen in ``fit`` and stored in ``cols_to_drop_``;
    ``transform`` removes exactly those columns. Inside a pipeline this means
    the decision is based on the data passed to ``fit`` only.

    Args:
        threshold: Percentage (on a 0-100 scale) of missing values above which
            a column is dropped. Columns exactly at the threshold are kept.
    """

    def __init__(self, threshold=20):
        # Only hyper-parameters are stored here; learned state is created in
        # fit(), following the scikit-learn estimator convention.
        self.threshold = threshold

    def fit(self, X, y=None):
        """Record which columns of ``X`` are missing in more than ``threshold`` % of rows.

        Args:
            X: DataFrame used to measure missingness.
            y: Ignored; present for pipeline compatibility.

        Returns:
            The fitted transformer (``self``).
        """
        missing_percentages = X.isna().mean() * 100
        too_sparse = missing_percentages > self.threshold
        self.cols_to_drop_ = missing_percentages[too_sparse].index.tolist()
        return self

    def transform(self, X):
        """Return a new DataFrame without the columns selected in ``fit``."""
        # DataFrame.drop already returns a new frame, so no explicit copy of
        # the input is needed beforehand.
        return X.drop(columns=self.cols_to_drop_)

In [50]:
# Column groups, one list per preprocessing treatment.

# Dropped outright.
redundant_cols = [
    'loan_title',
]

# Encoded as "is the mode" (0) vs. "is not the mode" (1).
binary_cols = [
    "loan_payment_plan_flag",
    "listing_initial_status",
    "application_type_label",
    "hardship_flag_indicator",
    "disbursement_method_type",
    "debt_settlement_flag_indicator",
]

# One-hot encoded.
one_hot_encoding_cols = [
    "borrower_housing_ownership_status",
    "borrower_income_verification_status",
    "loan_status_current_code",
    "loan_purpose_category",
    "borrower_address_state",
]

# Text fields from which a number is extracted.
extract_fields = [
    "loan_contract_term_months",
    "borrower_profile_employment_length",
    "borrower_address_zip",
]

# Date fields turned into year + cyclical month features.
date_fields = [
    "loan_issue_date",
    "credit_history_earliest_line",
    "last_payment_date",
    "last_credit_pull_date",
]

In [51]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.model_selection import StratifiedKFold, cross_val_score

# Branch 1 - numeric-like text: extract the digits, then median-impute
# (this also fills the NaNs the extractor produces when no digit is found).
numeric_pipe = SkPipeline(steps=[
    ('extract', NumericExtractor()),
    ('impute', SimpleImputer(strategy='median')),
])

# Branch 2 - categoricals: turn missing values into an explicit 'missing'
# category so the encoder never sees NaN, then one-hot encode.
categorical_pipe = SkPipeline(steps=[
    ('impute', SimpleImputer(strategy='constant', fill_value='missing')),
    ('ohe', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore')),
])

# Branch 3 - dates: year + sin/cos month encoding, then median-impute each
# derived column on its own.
# NOTE(review): an independently imputed (sin, cos) pair is not guaranteed to
# lie on the unit circle - confirm this is acceptable.
date_pipe = SkPipeline(steps=[
    ('cyclical', CyclicalDateEncoder()),
    ('impute', SimpleImputer(strategy='median')),
])

# Branch 4 - binary flags: mode / non-mode encoding, followed by a
# most-frequent imputer kept as a safety net.
binary_pipe = SkPipeline(steps=[
    ('binary_enc', BinaryModeEncoder()),
    ('impute', SimpleImputer(strategy='most_frequent')),
])

# Route every column group to its branch; 'loan_title' is dropped and every
# column not listed explicitly is median-imputed via `remainder`.
preprocessor = ColumnTransformer(
    transformers=[
        ('num_pipe', numeric_pipe, extract_fields),
        ('cat_pipe', categorical_pipe, one_hot_encoding_cols),
        ('date_pipe', date_pipe, date_fields),
        ('bin_pipe', binary_pipe, binary_cols),
        ('drop_redundant', 'drop', redundant_cols),
    ],
    remainder=SimpleImputer(strategy='median'),
)

# Helper to create the tail end of the pipeline (Dimensionality Reduction + Model)
def get_model_tail():
    """Return fresh ``(name, estimator)`` steps for the end of a pipeline.

    The steps are, in order: ``'pca'`` (PCA with ``n_components=0.95``),
    ``'lda'`` (linear discriminant analysis with default settings) and
    ``'knn'`` (5-nearest-neighbours classifier with distance weighting).
    New estimator instances are created on every call.
    """
    reducer = PCA(n_components=0.95)
    discriminant = LDA()
    classifier = KNeighborsClassifier(n_neighbors=5, weights='distance')
    return [
        ('pca', reducer),
        ('lda', discriminant),
        ('knn', classifier),
    ]

from sklearn.base import clone


def build_knn_pipeline(sampler=None):
    """Assemble one full pipeline: drop -> preprocess -> scale -> [resample] -> model.

    Every call gets its own unfitted copy of ``preprocessor`` (via ``clone``).
    Pipelines fit their steps in place, so sharing a single preprocessor
    instance would let each ``fit`` overwrite the fitted state held by the
    other pipelines.

    Args:
        sampler: Optional imbalanced-learn sampler inserted after scaling.
            ``None`` builds the pipeline without any resampling step.

    Returns:
        An unfitted ``ImbPipeline``.
    """
    steps = [
        ('dropper', HighMissingDropper(threshold=20)),
        ('prep', clone(preprocessor)),
        # Scale before PCA and before any sampler, so that distance-based
        # resampling works on standardized features.
        ('scaler', StandardScaler()),
    ]
    if sampler is not None:
        steps.append(('sampler', sampler))
    steps.extend(get_model_tail())
    return ImbPipeline(steps)


# Dictionary containing the 3 strategies to test
pipelines = {
    # 1. Baseline: No resampling
    "Baseline (No Resampling)": build_knn_pipeline(),

    # 2. Undersampling: Reduces majority classes to match minority
    "Random Undersampling": build_knn_pipeline(RandomUnderSampler(random_state=42)),

    # 3. SMOTE: Synthetically generates minority class samples
    "SMOTE Oversampling": build_knn_pipeline(SMOTE(random_state=42)),
}

for name, pipeline in pipelines.items():
    print(f"\nTraining: {name}...")

    # Fit on the training split (any resampling step runs here), then predict
    # on the held-out split (resampling is skipped at prediction time).
    y_pred = pipeline.fit(X_train, y_train).predict(X_test)

    acc = accuracy_score(y_test, y_pred)
    bacc = balanced_accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average="weighted")

    # Scores
    print(f"--- Results for {name} ---")
    for label, value in (("Accuracy", acc), ("Balanced Accuracy", bacc), ("F1 score", f1)):
        print(f"{label}: {value:.4f}")



Training: Baseline (No Resampling)...
--- Results for Baseline (No Resampling) ---
Accuracy: 0.5191
Balanced Accuracy: 0.4630
F1 score: 0.5175

Training: Random Undersampling...
--- Results for Random Undersampling ---
Accuracy: 0.5348
Balanced Accuracy: 0.5172
F1 score: 0.5341

Training: SMOTE Oversampling...
--- Results for SMOTE Oversampling ---
Accuracy: 0.5572
Balanced Accuracy: 0.5354
F1 score: 0.5566
