In [1]:
import math

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import os

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split, TensorDataset


In [2]:
df = pd.read_csv("../Data PCA/cleaned_augmented_2.csv")

# Split features / target by column NAME. The class count below already
# identifies the target as 'Diabetes_binary'; selecting it positionally
# (last column) would silently leak the label into X if the CSV column order
# ever differed. When the label is the last column this is identical to
# df.iloc[:, :-1] / df.iloc[:, -1].
X = df.drop(columns='Diabetes_binary')
y = df['Diabetes_binary']

# Number of positive rows; the training cell uses it as the per-class sample
# size when balancing the two classes.
class_1 = int((y == 1).sum())

# List of components to iterate over
components_list = [3, 4, 5, 6, 7, 8, 9, 10]

# Create the output folders up front: 'Results' receives the per-component
# CSV files and 'NNPlots' the learning-curve figures (plt.savefig does not
# create missing directories). exist_ok avoids the check-then-create race.
for output_dir in ('Results', 'NNPlots'):
    os.makedirs(output_dir, exist_ok=True)

In [3]:
# Define the RBF neural network
class RBFNN(nn.Module):
    """Gaussian radial-basis layer followed by a three-layer linear head.

    The RBF layer holds ``hidden_dim`` learnable centres (one row per unit, each
    of length ``input_dim``) and one learnable width coefficient ``beta`` per
    unit. Its activations are passed through ``linear`` -> dropout(p=0.2) ->
    ``linear2`` -> ``linear3``; there is no non-linearity between those layers.

    Args:
        input_dim: Number of features per input row.
        hidden_dim: Number of RBF units (also the width of the hidden layers).
        output_dim: Number of values produced per input row.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.hidden_dim = hidden_dim

        # RBF parameters, both initialised uniformly in [0, 1).
        self.centers = nn.Parameter(torch.rand(hidden_dim, input_dim))
        self.beta = nn.Parameter(torch.rand(hidden_dim))

        # Linear head applied to the RBF activations.
        self.linear = nn.Linear(hidden_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.drop = nn.Dropout(p=0.2)
        self.linear3 = nn.Linear(hidden_dim, output_dim)

    def radial_basis_function(self, x):
        """Return exp(-beta * squared distance) between each row of ``x`` and each centre.

        ``x`` is expected to be (batch, input_dim); the result is (batch, hidden_dim).
        """
        # Broadcast (batch, 1, input_dim) against (hidden_dim, input_dim).
        offsets = x[:, None] - self.centers
        squared_distance = torch.sum(offsets ** 2, dim=2)
        return torch.exp(-(self.beta * squared_distance))

    def forward(self, x):
        """Return ``(output, rbf_out)``: the head's output and the raw RBF activations."""
        activations = self.radial_basis_function(x)
        hidden = self.linear2(self.drop(self.linear(activations)))
        return self.linear3(hidden), activations

In [6]:
# Train and evaluate the RBF network on PCA-reduced, class-balanced data.
#
# For every PCA dimensionality in `components_list`, each (optimizer, learning
# rate) pair is trained for a fixed number of epochs. Learning curves are saved
# under NNPlots/ and the test-set metrics of all pairs are written to
# Results/results_<n>_components.csv.

# Same value the PCA call has always used; reused for sampling, splitting and
# weight initialisation so that runs are repeatable and all configurations are
# compared on the same rows.
SEED = 69

# plt.savefig does not create missing directories.
os.makedirs('NNPlots', exist_ok=True)


def _predict_labels(logits):
    """Turn raw logits into hard 0/1 predictions by rounding the sigmoid probability."""
    return torch.sigmoid(logits).round()


def _evaluate(model, loader, criterion):
    """Return (mean per-batch loss, accuracy) of `model` over `loader`, without gradients."""
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for inputs, labels in loader:
            # squeeze(1) drops only the output dimension, so a batch holding a
            # single row keeps shape (1,) and still matches `labels`.
            logits = model(inputs)[0].squeeze(1)
            loss_sum += criterion(logits, labels).item()
            n_correct += (_predict_labels(logits) == labels).sum().item()
            n_seen += len(labels)
    return loss_sum / len(loader), n_correct / n_seen


def _confusion_counts(model, loader):
    """Return (TP, TN, FP, FN, n_invalid) for `model` over `loader`.

    `n_invalid` counts samples whose label or prediction is neither 0 nor 1
    (e.g. NaN predictions from a diverged model); they are in no other count.
    """
    model.eval()
    tp = tn = fp = fn = n_seen = 0
    with torch.no_grad():
        for inputs, labels in loader:
            predictions = _predict_labels(model(inputs)[0].squeeze(1))
            is_pos = labels == 1
            is_neg = labels == 0
            tp += (is_pos & (predictions == 1)).sum().item()
            fn += (is_pos & (predictions == 0)).sum().item()
            fp += (is_neg & (predictions == 1)).sum().item()
            tn += (is_neg & (predictions == 0)).sum().item()
            n_seen += len(labels)
    return tp, tn, fp, fn, n_seen - (tp + tn + fp + fn)


# Loop over the number of components
for n_components in components_list:
    results = []

    # Perform PCA transformation
    # NOTE(review): PCA is fitted on all rows, before the train/val/test split.
    pca = PCA(n_components=n_components, random_state=SEED)
    X_pca = pca.fit_transform(X)

    # Put the projected features and the labels in one frame so that the
    # balancing below samples PCA rows. (Previously rows were sampled from the
    # original `df`, so X_pca was never used and n_components had no effect.)
    pca_columns = [f'PC{i + 1}' for i in range(X_pca.shape[1])]
    pca_frame = pd.DataFrame(X_pca, columns=pca_columns)
    pca_frame['Diabetes_binary'] = df['Diabetes_binary'].to_numpy()

    # Balancing classes after PCA
    df_pca = pd.concat([
        pca_frame[pca_frame['Diabetes_binary'] == 0].sample(class_1, random_state=SEED),
        pca_frame[pca_frame['Diabetes_binary'] == 1].sample(class_1, random_state=SEED)
    ], ignore_index=True)

    df_pca = df_pca.sample(frac=1, random_state=SEED).reset_index(drop=True)

    # The label was appended last, so the positional split is safe here.
    X_bal_pca = df_pca.iloc[:, :-1]
    y_bal_pca = df_pca.iloc[:, -1]

    # Split data into training, testing, and validation sets after PCA (80/10/10)
    total_samples_pca = len(X_bal_pca)
    train_size_pca = int(0.8 * total_samples_pca)
    val_size_pca = (total_samples_pca - train_size_pca) // 2
    test_size_pca = total_samples_pca - train_size_pca - val_size_pca

    X_tensor_pca = torch.tensor(X_bal_pca.values, dtype=torch.float32)
    y_tensor_pca = torch.tensor(y_bal_pca.values, dtype=torch.float32)

    dataset_pca = TensorDataset(X_tensor_pca, y_tensor_pca)
    train_data_pca, val_data_pca, test_data_pca = random_split(
        dataset_pca,
        [train_size_pca, val_size_pca, test_size_pca],
        generator=torch.Generator().manual_seed(SEED),
    )

    train_loader_pca = DataLoader(train_data_pca, batch_size=32, shuffle=True)
    val_loader_pca = DataLoader(val_data_pca, batch_size=32)
    test_loader_pca = DataLoader(test_data_pca, batch_size=32)

    # Training the model after PCA
    input_dim_pca = X_bal_pca.shape[1]
    hidden_dim = 64
    output_dim = 1

    for optimizer_type in [optim.Adam, optim.SGD]:
        for lr in [0.001, 0.005, 0.01, 0.05, 0.1]:
            # Identical initial weights and shuffling order for every
            # configuration, so differences come from optimizer / lr only.
            torch.manual_seed(SEED)
            model = RBFNN(input_dim_pca, hidden_dim, output_dim)

            criterion = nn.BCEWithLogitsLoss()
            optimizer = optimizer_type(model.parameters(), lr=lr)

            # Training loop
            num_epochs = 50
            train_losses = []
            val_losses = []
            train_accuracies = []
            val_accuracies = []

            for epoch in range(1, num_epochs + 1):
                model.train()
                total_correct_train = 0
                total_samples_train = 0
                train_loss_sum = 0.0
                for inputs, labels in train_loader_pca:
                    optimizer.zero_grad()
                    outputs, _ = model(inputs)
                    logits = outputs.squeeze(1)
                    loss = criterion(logits, labels)
                    loss.backward()
                    optimizer.step()

                    train_loss_sum += loss.item()
                    predictions = _predict_labels(logits.detach())
                    total_correct_train += (predictions == labels).sum().item()
                    total_samples_train += len(labels)

                # Mean per-batch loss over the epoch, the same definition the
                # validation loss uses (previously only the last batch's loss).
                train_loss = train_loss_sum / len(train_loader_pca)
                train_accuracy = total_correct_train / total_samples_train
                train_accuracies.append(train_accuracy)
                train_losses.append(train_loss)

                val_loss, val_accuracy = _evaluate(model, val_loader_pca, criterion)
                val_accuracies.append(val_accuracy)
                val_losses.append(val_loss)

                if epoch % 10 == 0:
                    print(f'Epoch {epoch}/{num_epochs}')
                    print(f'Training Loss: {train_loss:.4f}, Training Accuracy: {train_accuracy:.4f}')
                    print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
                    print()

            # Plot and save figures
            fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(10, 5))
            epochs = range(1, num_epochs + 1)

            # Losses
            loss_ax.plot(epochs, train_losses, label='Train')
            loss_ax.plot(epochs, val_losses, label='Validation')
            loss_ax.set_xlabel('Epoch')
            loss_ax.set_ylabel('Loss')
            loss_ax.set_title(f'{n_components}_components_Loss Over Epochs')
            loss_ax.legend()
            loss_ax.grid(True)

            # Accuracies
            acc_ax.plot(epochs, train_accuracies, label='Train')
            acc_ax.plot(epochs, val_accuracies, label='Validation')
            acc_ax.set_xlabel('Epoch')
            acc_ax.set_ylabel('Accuracy')
            acc_ax.set_title(f'{n_components}_components_Accuracy Over Epochs')
            acc_ax.legend()
            acc_ax.grid(True)

            fig.tight_layout()
            fig.savefig(f'NNPlots/{n_components}_components_{optimizer_type.__name__}_{lr}_accuracy.png')
            plt.close(fig)  # avoid accumulating open figures across the grid

            # Calculate metrics on test set
            TP, TN, FP, FN, n_invalid = _confusion_counts(model, test_loader_pca)

            failure = False
            if n_invalid:
                # Typically NaN outputs from a diverged run.
                failure = True
                print(f"{n_invalid} test predictions were neither 0 nor 1\tFAILURE\n"
                      f"optimizer: {optimizer_type.__name__}\nlearning_rate: {lr}\n")

            # A metric with a zero denominator is recorded as a sentinel string.
            precision = TP / (TP + FP) if TP + FP != 0 else "Couldn't compute"
            recall = TP / (TP + FN) if TP + FN != 0 else "Couldn't compute"
            try:
                # Raises TypeError when precision or recall is a sentinel string;
                # that case is recorded as "NA" and flagged as a failure.
                f1_score = (2 * precision * recall) / (precision + recall) if precision + recall != 0 else "Couldn't compute"
            except TypeError as e:
                f1_score = "NA"
                message = f"{e}\tFAILURE\noptimizer: {optimizer_type.__name__}\nlearning_rate: {lr}\n"
                print(message)
                failure = True
            test_accuracy = (TP + TN) / (TP + FP + TN + FN) if TP + FP + TN + FN != 0 else "Couldn't compute"

            results.append({'optimizer': optimizer_type.__name__,
                            'learning_rate': lr,
                            'accuracy': test_accuracy,
                            'precision': precision,
                            'recall': recall,
                            'F1_score': f1_score,
                            'failure': failure})

    # Save results to CSV (the Results/ folder is created in the data-loading cell)
    results_df = pd.DataFrame(results)
    results_df.to_csv(f'Results/results_{n_components}_components.csv', index=False)

KeyboardInterrupt: 