<a href="https://colab.research.google.com/github/Marcusleeleelee/FTEC4998-4999/blob/main/FTEC4998_4999.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [38]:
# Import necessary packages
import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.svm import SVC
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score, classification_report
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from google.colab import drive
from tqdm import tqdm
from sklearn.ensemble import BaggingClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

# Mount Google Drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Step 1: Utils - ok
def uni_list(input): return list(set(input))

In [4]:
class Dataset():
    def __init__(self, file_path):
        self.dataset = pd.read_feather(file_path)
        self.X_train, self.y_train = None, None
        self.X_test, self.y_test = None, None
        self.scalers = None
        self.pca = None
        self.label = 'loan_condition_cat'
        self.original_columns = None

    def show(self, rows=10):
        return self.dataset.head(rows)

    def basic_processing(self):
        temp_func_1 = lambda x: '<=2009' if str(x) in ['2007', '2008', '2009'] else ("[2010, 2012]" if str(x) in ['2010', '2011', '2012'] else '>=2013')
        columns_to_delete = [
            'id', 'issue_d', 'home_ownership_cat', 'income_category', 'income_cat', 'term_cat', 'application_type_cat',
            'purpose_cat', 'interest_payment_cat', 'loan_condition'
        ]
        self.dataset.drop(columns=columns_to_delete, inplace=True)
        self.dataset['grade'] = self.dataset['grade'].apply(temp_func_1)
        self.dataset['final_d'] = self.dataset['final_d'].apply(lambda x: str(x)[-4:]).apply(temp_func_1)
        self.dataset = pd.get_dummies(self.dataset, columns=['year', 'final_d', 'home_ownership', 'term', 'application_type',
                                                             'purpose', 'interest_payments', 'grade', 'region'], dtype=int)

    def train_test_split(self, test_size=0.2, random_state=42):
        X = self.dataset.drop(columns=[self.label])
        y = self.dataset[self.label]
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
        self.original_columns = X.columns

    def preprocessing_train(self):
        scaler = StandardScaler()
        self.X_train = pd.DataFrame(scaler.fit_transform(self.X_train), columns=self.original_columns)
        self.scalers = scaler

        # Perform PCA
        self.pca = PCA(n_components=30)
        pca_components = self.pca.fit_transform(self.X_train)
        self.X_train = pd.DataFrame(pca_components, columns=self.original_columns[:pca_components.shape[1]])

    def preprocessing_test(self):
        # Apply stored scalers
        self.X_test = pd.DataFrame(self.scalers.transform(self.X_test), columns=self.original_columns)

        # Apply PCA
        pca_components = self.pca.transform(self.X_test)
        self.X_test = pd.DataFrame(pca_components, columns=self.original_columns[:pca_components.shape[1]])

In [5]:
# Calculating # ok
data = Dataset('/content/drive/My Drive/Colab Notebooks/FTEC4998_9/loan_final313_processed.feather')
data.basic_processing()
data.train_test_split()
data.preprocessing_train()
data.preprocessing_test()

In [12]:
# Data conversion # ok
train_x, train_y = data.X_train, data.y_train
test_x, test_y = data.X_test, data.y_test
X_train, y_train = train_x.to_numpy(), train_y.values.ravel()
X_test, y_test = test_x.to_numpy(), test_y.values.ravel()
counts = np.mean(y_train == 1) * 100
print(counts)
print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)
# Ensure y_train is binary
assert set(y_train).issubset({0, 1}), "Target values must be 0 or 1 for binary classification."

# Convert to tensors and move to GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1).to(device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1).to(device)

7.5910370853482805
(709903, 30) (709903,)
(177476, 30) (177476,)


In [7]:
# Train, predict, and accuracy functions
def train_model(model): # ok
    print("Training")
    model.train()
    for epoch in range(model.epochs):
        model.optimizer.zero_grad()
        outputs = model(model.X_train_tensor)
        loss = model.criterion(outputs, model.y_train_tensor)
        loss.backward()
        model.optimizer.step()
        if epoch % 10 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item()}')


def predict_model(model, X_tensor):
    print('Predicting')
    model.eval()
    with torch.no_grad():
        outputs = model(X_tensor).squeeze()
        return (outputs > 0.5).float()

def calculate_accuracy(model, X_tensor, y_tensor):
    print('Calculating Accuracy.')
    X_tensor = X_tensor.to(next(model.parameters()).device)
    y_tensor = y_tensor.to(next(model.parameters()).device)
    predictions = predict_model(model, X_tensor)

    # Ensure predictions and labels are the same shape
    predictions = predictions.squeeze()
    y_tensor = y_tensor.squeeze()

    correct = (predictions == y_tensor).sum().item()
    accuracy = correct / y_tensor.size(0)
    return accuracy

In [None]:
# Train ANN
# MLP model
class ANN(nn.Module):
    def __init__(self, X_train_tensor, y_train_tensor, lr=0.001):
        super(ANN, self).__init__()
        self.input_dim = X_train_tensor.shape[1]
        self.net = nn.Sequential(
            nn.Linear(self.input_dim, 128),
            nn.BatchNorm1d(128),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.LeakyReLU(0.1),
            nn.Dropout(0.3),
            nn.Linear(64, 32),
            nn.BatchNorm1d(32),
            nn.LeakyReLU(0.03),
            nn.Dropout(0.3),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )
        self.criterion = nn.BCELoss()
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.epochs = 100
        self.y_train_tensor = y_train_tensor
        self.X_train_tensor = X_train_tensor

    def forward(self, x):
        return self.net(x)
ann_model = ANN(X_train_tensor, y_train_tensor).to(device)
train_model(ann_model)
ann_predictions = predict_model(ann_model, X_train_tensor).cpu().numpy()
print('ANN train:', calculate_accuracy(ann_model, ann_model.X_train_tensor, ann_model.y_train_tensor))
print('ANN test:', calculate_accuracy(ann_model, X_test_tensor, y_test_tensor))

In [42]:
print(ann_predictions)
print(type(ann_predictions))
print(np.unique(ann_predictions))

[0. 0. 0. ... 0. 0. 0.]
<class 'numpy.ndarray'>
[0. 1.]


In [77]:
# Logistic Regression as a neural network
class LogisticRegressionModel(nn.Module):
    def __init__(self, X_train_tensor, y_train_tensor, lr=0.01):
        super(LogisticRegressionModel, self).__init__()
        self.input_dim = X_train_tensor.shape[1]
        self.net = nn.Sequential(
            nn.Linear(self.input_dim, 1),
            nn.Sigmoid()
        )
        self.criterion = nn.BCELoss()
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.epochs = 700
        self.y_train_tensor = y_train_tensor
        self.X_train_tensor = X_train_tensor

    def forward(self, x):
        return self.net(x)
log_reg = LogisticRegressionModel(X_train_tensor, y_train_tensor).to(device)
train_model(log_reg)
log_reg_predictions = predict_model(log_reg, X_train_tensor).cpu().numpy()
print('Logistic Regression train:', calculate_accuracy(log_reg, log_reg.X_train_tensor, log_reg.y_train_tensor))
print('Logistic Regression test:', calculate_accuracy(log_reg, X_test_tensor, y_test_tensor))

Epoch 0, Loss: 0.6930169463157654
Epoch 10, Loss: 0.5847083926200867
Epoch 20, Loss: 0.5268315672874451
Epoch 30, Loss: 0.490772545337677
Epoch 40, Loss: 0.4561319351196289
Epoch 50, Loss: 0.4263942539691925
Epoch 60, Loss: 0.4009254574775696
Epoch 70, Loss: 0.37893223762512207
Epoch 80, Loss: 0.3597172498703003
Epoch 90, Loss: 0.342867910861969
Epoch 100, Loss: 0.3280279040336609
Epoch 110, Loss: 0.3149062991142273
Epoch 120, Loss: 0.3032650649547577
Epoch 130, Loss: 0.2929040491580963
Epoch 140, Loss: 0.28365564346313477
Epoch 150, Loss: 0.27537810802459717
Epoch 160, Loss: 0.2679508328437805
Epoch 170, Loss: 0.26127129793167114
Epoch 180, Loss: 0.25525131821632385
Epoch 190, Loss: 0.2498154640197754
Epoch 200, Loss: 0.24489754438400269
Epoch 210, Loss: 0.2404409945011139
Epoch 220, Loss: 0.23639599978923798
Epoch 230, Loss: 0.23271894454956055
Epoch 240, Loss: 0.22937129437923431
Epoch 250, Loss: 0.22632013261318207
Epoch 260, Loss: 0.22353503108024597
Epoch 270, Loss: 0.22099004685

In [78]:
print(log_reg_predictions)
print(type(log_reg_predictions))
print(np.unique(log_reg_predictions))

[0. 0. 0. ... 0. 0. 0.]
<class 'numpy.ndarray'>
[0. 1.]


In [35]:
class SVMClassifier():
    def __init__(self, train_x, train_y, fraction=0.1, n_samples=10000000000000):
        # Sample a fraction of the data
        n = min(int(len(train_x) * fraction), n_samples)
        self.X_train = train_x.iloc[:n, :]
        self.y_train = train_y.iloc[:n]
        self.model = None
    def fit(self):
        print("Training.")
        # Use Bagging with SVM
        self.model = BaggingClassifier(
            estimator=SVC(C=0.1, kernel='poly', degree=5, gamma='scale'),
            n_estimators=6,
            random_state=42,
            max_samples= 0.05
        )
        self.model.fit(self.X_train, self.y_train)

    def predict(self, X_test):
        print('Predicting.')
        return self.model.predict(X_test).astype(float)

    def calculate_accuracy(self, X, y):
        print('Calculating Accuracy.')
        predictions = self.predict(X)
        accuracy = accuracy_score(y, predictions)
        return accuracy
SVM_model = SVMClassifier(train_x, train_y)
SVM_model.fit()
SVM_predictions = SVM_model.predict(SVM_model.X_train)
print('SVM train: ', SVM_model.calculate_accuracy(SVM_model.X_train, SVM_model.y_train))
print('SVM test: ', SVM_model.calculate_accuracy(test_x, test_y))

Training.
Predicting.
Calculating Accuracy.
Predicting.
SVM train:  0.9309480208480068
Calculating Accuracy.
Predicting.
SVM test:  0.9305370867046812


In [26]:
print(SVM_predictions)
print(type(SVM_predictions))
print(np.unique(SVM_predictions))

[0 0 0 ... 0 0 0]
<class 'numpy.ndarray'>
[0 1]


In [40]:
class NaiveBayesClassifier:
    def __init__(self, X_train, y_train, priors=None, var_smoothing=1e-9):
        self.model = GaussianNB(priors=priors, var_smoothing=var_smoothing)
        self.X_train = X_train
        self.y_train = y_train

    def fit(self):
        self.model.fit(self.X_train, self.y_train)

    def predict(self, X_test):
        return self.model.predict(X_test).astype(float)

    def calculate_accuracy(self, X, y):
        predictions = self.predict(X)
        accuracy = accuracy_score(y, predictions)
        return accuracy

    def confusion_matrix(self, X, y):
        predictions = self.predict(X)
        return confusion_matrix(y, predictions)

    def classification_report(self, X, y):
        predictions = self.predict(X)
        return classification_report(y, predictions)

# Example usage
NB_model = NaiveBayesClassifier(train_x, train_y)
NB_model.fit()
NB_predictions = NB_model.predict(NB_model.X_train)
print('Naive Bayes train accuracy:', NB_model.calculate_accuracy(NB_model.X_train, NB_model.y_train))
print('Naive Bayes test accuracy:', NB_model.calculate_accuracy(test_x, test_y))

Naive Bayes train accuracy: 0.8606020822563083
Naive Bayes test accuracy: 0.8602853343550677


In [42]:
print(NB_predictions)
print(type(NB_predictions))
print(np.unique(NB_predictions))

[0. 0. 0. ... 0. 0. 0.]
<class 'numpy.ndarray'>
[0. 1.]


In [46]:
class RandomForestModel():
    def __init__(self, X_train, y_train, n_estimators=10, max_depth=None, random_state=42):
        self.model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=random_state, max_samples = 0.05)
        self.X_train = X_train
        self.y_train = y_train

    def fit(self):
        self.model.fit(self.X_train, self.y_train)

    def predict(self, X_test):
        return self.model.predict(X_test).astype(float)

    def calculate_accuracy(self, X, y):
        predictions = self.predict(X)
        accuracy = accuracy_score(y, predictions)
        return accuracy

    def confusion_matrix(self, X, y):
        predictions = self.predict(X)
        return confusion_matrix(y, predictions)

    def classification_report(self, X, y):
        predictions = self.predict(X)
        return classification_report(y, predictions)

# Example usage
RF_model = RandomForestModel(train_x, train_y)
RF_model.fit()
RF_predictions = RF_model.predict(RF_model.X_train)
print('Random Forest train:', RF_model.calculate_accuracy(RF_model.X_train, RF_model.y_train))
print('Random Forest test:', RF_model.calculate_accuracy(test_x, test_y))

Random Forest train: 0.948521135986184
Random Forest test: 0.9470519957628074


In [47]:
print(RF_predictions)
print(type(RF_predictions))
print(np.unique(RF_predictions))

[0. 0. 0. ... 0. 0. 0.]
<class 'numpy.ndarray'>
[0. 1.]
