In [None]:
import numpy as np 
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader


import pandas as pd
import matplotlib.pyplot as plt
from typing import List
from collections import OrderedDict
import hiddenlayer as hl
from tqdm import tqdm


In [None]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Dataset

In [None]:
# dataset : load using pandas

class HeartDiseaseDatasets(Dataset):
    """Training split of the heart-disease table, served as (features, label) pairs.

    The CSV is read once at construction time. Every column except the last
    is treated as a feature; the last column is the target.
    """

    def __init__(self, csv_path="heart_train.csv"):
        """Load the CSV at ``csv_path`` into float32 NumPy arrays.

        Args:
            csv_path: Path of the CSV file to read. Defaults to
                ``"heart_train.csv"``, the file this class read before the
                path became configurable. The file must contain an
                ``"Unnamed: 0"`` column, which is used as the row index.
        """
        training = pd.read_csv(csv_path, index_col="Unnamed: 0", dtype=np.float32).values

        self.n_train = training.shape[0]

        # Everything but the last column is a feature.
        self.X_train = training[:, :-1]

        # Slice (rather than index) the last column so the target keeps a
        # trailing axis: shape (n_train, 1) instead of (n_train,).
        self.y_train = training[:, -1:]

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        return self.X_train[index], self.y_train[index]

    def __len__(self):
        """Return the number of rows that were loaded."""
        return self.n_train

In [None]:
dataset = HeartDiseaseDatasets()

# Pull out the first sample and show its feature vector next to its label.
features, labels = first_data = dataset[0]
print(features, labels)

In [None]:
# Number of samples in the training set.
len(dataset)

In [None]:
# Mini-batches of 64 samples, shuffled, loaded in the main process (no workers).
train_loader = DataLoader(dataset, batch_size=64, shuffle=True, num_workers=0)

# Model

In [None]:
# defining model : subclassing 

class HeartDiseaseClassifier(nn.Module):
    """Fully connected classifier with a configurable stack of hidden layers.

    Every layer except the last is followed by ReLU; the output layer is
    followed by a sigmoid, so ``forward`` returns values in ``[0, 1]``.
    """

    def __init__(self, input_shape, output_shape, hiddens: List[int]):
        """Build the layer stack.

        Args:
            input_shape: Number of input features.
            output_shape: Number of output units.
            hiddens: Width of each hidden layer, in order. Must contain at
                least one entry.

        Raises:
            ValueError: If ``hiddens`` is empty.
        """
        super().__init__()
        if len(hiddens) == 0:
            raise ValueError("hiddens must contain at least one layer width")

        # Input layer: input features -> first hidden width.
        self.input_layer = nn.Linear(input_shape, hiddens[0])
        # One Linear per consecutive pair of widths, so the depth follows
        # len(hiddens) without hard-coding it. Empty when only one width is given.
        self.hidden_layers = nn.Sequential(
            OrderedDict(
                (f"hidden{i+1}", nn.Linear(hiddens[i], hiddens[i + 1]))
                for i in range(len(hiddens) - 1)
            )
        )
        # Output layer: last hidden width -> output units.
        self.output_layer = nn.Linear(hiddens[-1], output_shape)
        # Non-linearity applied after the input layer and every hidden layer.
        self.activation = nn.ReLU()
        # Non-linearity applied after the output layer.
        self.classifier_activation = nn.Sigmoid()

    def forward(self, x):
        """Run the forward pass and return the sigmoid-activated outputs."""
        x = self.activation(self.input_layer(x))
        # Iterate the Sequential manually so ReLU is applied after each
        # hidden Linear (the Sequential itself only holds the Linear layers).
        for hidden_layer in self.hidden_layers:
            x = self.activation(hidden_layer(x))
        return self.classifier_activation(self.output_layer(x))

    def fit(self, dataloader, epochs=1, optimizer=None, criterion=None, device=None, lr=0.001):
        """Train the model and return the mean loss of every epoch.

        The model is moved to ``device`` for the duration of training and is
        moved back to the device it started on before returning, so code that
        feeds it batches on the original device afterwards keeps working.

        Args:
            dataloader: Iterable yielding ``(features, labels)`` tensor batches.
            epochs: Number of full passes over ``dataloader``.
            optimizer: Optimizer *class* (not an instance); it is instantiated
                as ``optimizer(self.parameters(), lr=lr)``. Defaults to
                ``torch.optim.Adam`` when omitted.
            criterion: Callable ``criterion(prediction, labels)`` returning a
                scalar loss tensor. Defaults to ``nn.BCELoss()`` when omitted.
            device: Device to train on. When omitted, training runs wherever
                the parameters already live.
            lr: Learning rate passed to the optimizer.

        Returns:
            1-D NumPy array of length ``epochs`` with the mean batch loss of
            each epoch.
        """
        if optimizer is None:
            optimizer = torch.optim.Adam
        if criterion is None:
            criterion = nn.BCELoss()

        home_device = next(self.parameters()).device
        train_device = home_device if device is None else torch.device(device)

        # Move the model first so the optimizer is built over the parameters
        # as they exist on the training device.
        self.to(train_device)
        _optimizer = optimizer(self.parameters(), lr=lr)
        self.train()

        reported_loss = []
        try:
            for epoch in range(epochs):
                collective_loss = []
                for features, labels in tqdm(dataloader):
                    # Tensor.to returns a new tensor instead of moving in
                    # place, so the result must be kept.
                    features = features.to(train_device)
                    labels = labels.to(train_device)

                    # Forward pass (called through __call__ so hooks run).
                    prediction = self(features)
                    loss = criterion(prediction, labels)

                    # Backward pass and parameter update.
                    _optimizer.zero_grad()
                    loss.backward()
                    _optimizer.step()

                    collective_loss.append(loss.item())

                epoch_loss = np.mean(collective_loss)
                reported_loss.append(epoch_loss)
                print(f"epoch {epoch+1} finished!", f"loss = {epoch_loss}")
        finally:
            # Restore the original placement even if training is interrupted.
            self.to(home_device)

        print("Training finished!")

        return np.array(reported_loss)
        



                


In [None]:
# One input unit per feature column, a single output unit, hidden widths 10 -> 5 -> 2.
model = HeartDiseaseClassifier(
    input_shape=dataset.X_train.shape[1],
    output_shape=1,
    hiddens=[10, 5, 2],
)

In [None]:
# Echo the model so the notebook displays its repr.
model

In [None]:
# visualizing model
transforms = [hl.transforms.Prune('Constant')]  # Removes Constant nodes from graph.

with torch.no_grad():
    # Size the dummy input from the model's own input layer instead of a
    # hard-coded feature count, so the trace always matches the model.
    dummy_input = torch.zeros([1, model.input_layer.in_features])
    graph = hl.build_graph(model, dummy_input, transforms=transforms)
    graph.theme = hl.graph.THEMES['blue'].copy()
    graph.save('HeartDisease_hiddenlayer', format='png')

In [None]:
# Train for 1000 epochs with Adam on binary cross-entropy.
model.fit(
    train_loader,
    epochs=1000,
    optimizer=torch.optim.Adam,
    criterion=nn.BCELoss(),
    device=device,
    lr=0.001,
)

# Testing

In [None]:
class HeartDiseaseTest(Dataset):
    """Held-out split of the heart-disease table, served as (features, label) pairs.

    The CSV is read once at construction time. Every column except the last
    is treated as a feature; the last column is the target.
    """

    def __init__(self, csv_path="heart_test.csv"):
        """Load the CSV at ``csv_path`` into float32 NumPy arrays.

        Args:
            csv_path: Path of the CSV file to read. Defaults to
                ``"heart_test.csv"``, the file this class read before the
                path became configurable. The file must contain an
                ``"Unnamed: 0"`` column, which is used as the row index.
        """
        testing = pd.read_csv(csv_path, index_col="Unnamed: 0", dtype=np.float32).values

        self.n_test = testing.shape[0]

        # Everything but the last column is a feature.
        self.X_test = testing[:, :-1]

        # Slice (rather than index) the last column so the target keeps a
        # trailing axis: shape (n_test, 1) instead of (n_test,).
        self.y_test = testing[:, -1:]

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        return self.X_test[index], self.y_test[index]

    def __len__(self):
        """Return the number of rows that were loaded."""
        return self.n_test

In [None]:
testset = HeartDiseaseTest()
# Accuracy does not depend on sample order, so the evaluation loader is left
# unshuffled: batches come out in file order and no random state is consumed.
test_loader = DataLoader(testset,
                         batch_size=128,
                         shuffle=False,
                         num_workers=0)

In [None]:
def logits_decoding(logit, threshold=0.5):
    """Turn scores into hard 0/1 labels by thresholding.

    Values are compared against ``threshold`` directly; no sigmoid is applied
    here, so the input is expected to already be on the scale the threshold
    refers to (e.g. sigmoid outputs for the default of 0.5).

    Args:
        logit: Tensor of scores, any shape.
        threshold: Values ``>= threshold`` map to 1, values below map to 0.

    Returns:
        A new tensor with the same shape, dtype and device as ``logit``
        containing 0s and 1s. NaN entries are passed through unchanged.
    """
    # Build the result from the comparison itself so it lives on the same
    # device and has the same dtype as the input (no CPU-side helper tensors).
    decoded = (logit >= threshold).to(logit.dtype)
    # NaN compares False against any threshold; keep it visible instead of
    # silently reporting class 0.
    return torch.where(torch.isnan(logit), logit, decoded)

In [None]:
# Smoke-test the decoder on a random 3x3x3 tensor.
logit = torch.randn(3, 3, 3, dtype=torch.float32)

logits_decoding(logit)

In [None]:
# Switch to inference behaviour before measuring accuracy.
model.eval()
# Feed batches to whichever device the model's parameters currently live on.
eval_device = next(model.parameters()).device

with torch.no_grad():
    n_correct = 0
    n_samples = 0

    for features, labels in test_loader:
        # Tensor.to returns a new tensor rather than moving in place, so the
        # result has to be kept.
        features = features.to(eval_device)

        # Bring the outputs back to the CPU, where the labels produced by the
        # loader are, before decoding and comparing.
        logit = model(features).cpu()
        prediction = logits_decoding(logit)
        n_samples += labels.size(0)
        n_correct += (prediction == labels).sum().item()

    # An empty loader has no defined accuracy; report NaN instead of dividing by zero.
    accuracy = 100 * n_correct / n_samples if n_samples else float("nan")
    print(accuracy)
