In [None]:
# %% Deep learning - Section 14.138
#    FFN project 1: predicting heart disease
#    1) Use a reduced version of the UCI heart disease dataset
#    2) Clean and preprocess the data
#    3) Build a FFN to predict disease (binarise)
#    4) Focus on getting it to work, rather than on maximising accuracy

# This code pertains to a deep learning course provided by Mike X. Cohen on Udemy:
#   > https://www.udemy.com/course/deeplearning_x
# The "base" code in this repository is adapted (with very minor modifications)
# from code developed by the course instructor (Mike X. Cohen), while the
# "exercises" and the "code challenges" contain more original solutions and
# creative input from my side. If you are interested in DL (and if you are
# reading this statement, chances are that you are), go check out the course, it
# is singularly good.


In [147]:
# %% Libraries and modules
import numpy               as np
import matplotlib.pyplot   as plt
import torch
import torch.nn            as nn
import seaborn             as sns
import copy
import torch.nn.functional as F
import pandas              as pd
import scipy.stats         as stats
import sklearn.metrics     as skm
import time

from torch.utils.data                 import DataLoader,TensorDataset
from sklearn.model_selection          import train_test_split
from google.colab                     import files
from torchsummary                     import summary
from IPython                          import display
from matplotlib_inline.backend_inline import set_matplotlib_formats
set_matplotlib_formats('svg')
plt.style.use('default')


In [38]:
# %% Get the data

def get_data(url='https://archive.ics.uci.edu/ml/machine-learning-databases/heart-disease/processed.cleveland.data',
             batch_size=16,random_state=None):
    """Load the heart disease table, preprocess it, and wrap it in DataLoaders.

    The table is read from `url`, "?" placeholders are treated as missing and
    filled with the column median, every column except `disease` is z-scored,
    and `disease` is binarised into a new `boolean_disease` column
    (0 when disease == 0, 1 when disease >= 1). The observations are then
    split 80/10/10 into train/dev/test sets.

    Parameters
    ----------
    url : str
        Location of the comma-separated, header-less data file.
    batch_size : int
        Mini-batch size of the training loader (dev and test loaders always
        return their whole set as a single batch).
    random_state : int or None
        Seed forwarded to both `train_test_split` calls; None (default) keeps
        the split different on every call.

    Returns
    -------
    train_loader, dev_loader, test_loader : torch.utils.data.DataLoader
        Loaders yielding (features, labels) with float features and long
        labels of shape (batch, 1).
    data : pandas.DataFrame
        The preprocessed table (z-scored predictors, `disease`, and
        `boolean_disease`).
    """

    # Get data and set column names ("?" placeholders are parsed as NaN)
    col_names = ['age','sex','cp','trestbps','chol','fbs','restecg','thalach','exang','oldpeak','slope','ca','thal','disease']
    data = pd.read_csv(url,sep=',',header=None,names=col_names,na_values='?')

    # Make sure every column is numeric
    data = data.apply(pd.to_numeric)

    # Fill numeric NaNs with the median (too few observations to lose some of them)
    data = data.fillna(data.median())

    # Z-score all the variables but the disease label (population std, ddof=0,
    # as np.std would compute)
    cols2zscore = data.columns.drop('disease')
    predictors  = data[cols2zscore]
    data[cols2zscore] = (predictors - predictors.mean()) / predictors.std(ddof=0)

    # Binarise disease: 0 stays 0, any value >= 1 becomes 1
    data['boolean_disease'] = (data['disease']>=1).astype(int)

    # Convert from pandas dataframe to PyTorch tensor
    data_T   = torch.tensor( data[cols2zscore].values ).float()
    labels_T = torch.tensor( data['boolean_disease'].values ).long().view(-1,1)

    print(f'Data shape: {data_T.shape}')
    print(f'Labels shape: {labels_T.shape}')

    # Split data with scikit-learn (80% train, then the rest halved into dev and test)
    train_data,tmp_data, train_labels,tmp_labels = train_test_split(data_T,labels_T,test_size=0.2,random_state=random_state)
    dev_data,test_data, dev_labels,test_labels   = train_test_split(tmp_data,tmp_labels,test_size=0.5,random_state=random_state)

    # PyTorch datasets
    train_data = TensorDataset(train_data,train_labels)
    dev_data   = TensorDataset(dev_data,dev_labels)
    test_data  = TensorDataset(test_data,test_labels)

    # DataLoader objects (do not drop the last batch: too few observations to
    # lose some of them, so the final training batch may be smaller than batch_size)
    train_loader = DataLoader(train_data,batch_size=batch_size,shuffle=True,drop_last=False)
    dev_loader   = DataLoader(dev_data,batch_size=dev_data.tensors[0].shape[0])
    test_loader  = DataLoader(test_data,batch_size=test_data.tensors[0].shape[0])

    return train_loader,dev_loader,test_loader,data


In [None]:
# %% Test data function and visualise

# Load the data once to check that get_data() runs end to end
train_loader,dev_loader,test_loader,data = get_data()

# One histogram per column of the preprocessed table, on a golden-ratio canvas
phi = (1 + np.sqrt(5)) / 2
data.hist(bins=20,figsize=(phi*8,8))

# data.hist() leaves its figure as the current one; grab it explicitly
fig = plt.gcf()
fig.suptitle("Feature distributions (z-scored)")
fig.tight_layout(rect=[0,0,1,0.97])   # leave headroom for the suptitle

fig.savefig('figure3_ffn_project_2.png')

plt.show()

# Colab helper: push the saved figure to the browser
files.download('figure3_ffn_project_2.png')


In [55]:
# %% Model class

# Optional parametrised metaparameters:
#  > number of layers and of units per layer
#  > starting learning rate
#  > optimizer (e.g. 'SGD', 'RMSprop', or 'Adam')
#  > L2 regularisation
#  > activation function (e.g., 'ReLU', 'LeakyReLU', 'ReLU6', or 'GELU')

def gen_model(n_units=16,n_layers=2,lr=0.01,optim='SGD',L2_lambda=0,act_fun='ReLU',n_features=13,n_classes=2):
    """Build a feed-forward classifier together with its loss and optimizer.

    Parameters
    ----------
    n_units : int
        Number of units in the input layer and in each hidden layer.
    n_layers : int
        Number of hidden layers (between the input and the output layer).
    lr : float
        Learning rate handed to the optimizer.
    optim : str
        Name of a class in `torch.optim` (e.g. 'SGD', 'RMSprop', 'Adam').
    L2_lambda : float
        L2 regularisation strength, passed as `weight_decay`.
    act_fun : str
        Name of an activation class in `torch.nn` (e.g. 'ReLU', 'GELU').
    n_features : int
        Number of input features (13 predictors by default).
    n_classes : int
        Number of output logits (2 by default, matching binarised labels).

    Returns
    -------
    ANN : nn.Module
        The model instance; its forward pass returns raw logits.
    loss_fun : nn.CrossEntropyLoss
    optimizer : torch.optim.Optimizer
    """

    class model(nn.Module):
        def __init__(self,n_units,n_layers):
            super().__init__()

            # Dictionary to store the layers and the activation function
            self.layers  = nn.ModuleDict()
            self.nLayers = n_layers
            self.act_fun = act_fun

            # Instantiate the activation once instead of on every forward pass
            self.activation = getattr(nn,act_fun)()

            # Architecture (input, hidden, output)
            # Input layer
            self.layers['input'] = nn.Linear(n_features,n_units)

            # Hidden layers
            for i in range(n_layers):
                self.layers[f'hidden{i}'] = nn.Linear(n_units,n_units)

            # Output layer (one logit per class)
            self.layers['output'] = nn.Linear(n_units,n_classes)

        def forward(self,x):

            # Input layer (followed by the nonlinearity, otherwise it would
            # merge with the first hidden layer into a single linear map)
            x = self.activation(self.layers['input'](x))

            # Hidden layers
            for i in range(self.nLayers):
                x = self.activation(self.layers[f'hidden{i}'](x))

            # Output layer (raw logits; CrossEntropyLoss applies log-softmax)
            return self.layers['output'](x)

    # Model instance, loss function, and optimizer
    ANN       = model(n_units,n_layers)
    loss_fun  = nn.CrossEntropyLoss()
    opti_fun  = getattr( torch.optim,optim )
    optimizer = opti_fun(ANN.parameters(),lr=lr,weight_decay=L2_lambda)

    return ANN,loss_fun,optimizer


In [None]:
# %% Test model function

# Metaparameters for a quick smoke test of gen_model()
n_units   = 16
n_layers  = 2
lr        = 0.01
optim_alg = 'Adam'
L2_decay  = 0.01
act_fun   = 'ReLU'

# Build the model, loss, and optimizer, then show all three (one per line)
ANN,loss_fun,optimizer = gen_model( n_units   = n_units,
                                    n_layers  = n_layers,
                                    lr        = lr,
                                    optim     = optim_alg,
                                    L2_lambda = L2_decay,
                                    act_fun   = act_fun )
print(ANN,loss_fun,optimizer,sep='\n')


In [61]:
# %% Function to train the model

# Optional parametrised metaparameters:
#  > number of epochs

def train_model(num_epochs=50):
    """Train a freshly generated model and track loss/accuracy per epoch.

    NOTE: besides `num_epochs`, this function reads the following names from
    the notebook's global scope, so they must be defined before it is called:
    `n_units`, `n_layers`, `lr`, `optim`, `L2_lambda`, `act_fun` (forwarded to
    `gen_model`), and `train_loader`, `dev_loader`.

    Parameters
    ----------
    num_epochs : int
        Number of passes over the training loader.

    Returns
    -------
    train_loss, train_acc : torch.Tensor of shape (num_epochs,)
        Per-epoch mean over batches of the training loss and accuracy (%).
    dev_loss, dev_acc : torch.Tensor of shape (num_epochs,)
        Loss and accuracy (%) on the first batch of `dev_loader` after each epoch.
    ANN : nn.Module
        The trained model (left in training mode).
    """

    # Fresh model instance
    ANN,loss_fun,optimizer = gen_model(n_units,n_layers,lr,optim,L2_lambda,act_fun)

    # Preallocate vars
    train_loss = torch.zeros(num_epochs)
    train_acc  = torch.zeros(num_epochs)
    dev_loss   = torch.zeros(num_epochs)
    dev_acc    = torch.zeros(num_epochs)

    # Loop over epochs
    for epoch_i in range(num_epochs):

        # Loop over training data batches
        batch_loss = []
        batch_acc  = []

        for X,y in train_loader:

            # Flatten labels to 1D; unlike squeeze(), reshape(-1) keeps the
            # batch dimension even when a batch holds a single sample
            y = y.reshape(-1)

            # Forward pass, backpropagation, and optimizer step
            yHat = ANN(X)
            loss = loss_fun(yHat,y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Loss and accuracy from this batch (stored as plain floats)
            batch_loss.append(loss.item())
            batch_acc.append( 100*torch.mean((torch.argmax(yHat,dim=1)==y).float()).item() )

        train_loss[epoch_i]  = np.mean(batch_loss)
        train_acc[epoch_i]   = np.mean(batch_acc)

        # Dev loss and accuracy (no gradient tracking, evaluation mode)
        ANN.eval()

        with torch.no_grad():
            X,y  = next(iter(dev_loader))
            y    = y.reshape(-1)
            yHat = ANN(X)
            dev_loss[epoch_i] = loss_fun(yHat,y)
            dev_acc[epoch_i]  = 100*torch.mean((torch.argmax(yHat,dim=1)==y).float())

        # Back to training mode for the next epoch
        ANN.train()

    return train_loss,train_acc,dev_loss,dev_acc,ANN


In [None]:
# %% Test the whole setting

# Fresh data split and loaders
train_loader,dev_loader,test_loader,data = get_data()

# Metaparameters; train_model() reads these from the global scope
n_units,n_layers = 32,2
lr,L2_lambda     = 0.01,0
optim,act_fun    = 'SGD','ReLU'
num_epochs       = 250

# Stand-alone model instance with the same settings; train_model() builds its
# own model, and the ANN name is rebound to that trained model just below
ANN,loss_fun,optimizer = gen_model(n_units,n_layers,lr,optim,L2_lambda,act_fun)

# Train and collect the per-epoch curves together with the trained model
train_loss,train_acc,dev_loss,dev_acc,ANN = train_model(num_epochs=num_epochs)


In [95]:
# %% Functions for 1D smoothing filter

# Improved for edge effects - adaptive window
def smooth_adaptive(x,k):
    """Moving-average filter whose window shrinks at the edges.

    Sample i is replaced by the mean of x[i-k//2 : i+k//2+1], clipped to the
    bounds of `x`, so no padding is needed and the output has the same length
    as the input. Note that the full window spans 2*(k//2)+1 samples.

    Parameters
    ----------
    x : array-like
        Signal to smooth (iterated along its first axis).
    k : int
        Nominal window size; must be non-negative.

    Returns
    -------
    numpy.ndarray
        Smoothed signal. Floating-point inputs keep their dtype; any other
        dtype (e.g. integers) is promoted to float64 so that the means are
        not truncated.

    Raises
    ------
    ValueError
        If `k` is negative (every window would be empty).
    """
    if k < 0:
        raise ValueError(f"k must be non-negative, got {k}")

    x = np.asarray(x)

    # zeros_like(x) would inherit an integer dtype and silently truncate means
    out_dtype = x.dtype if np.issubdtype(x.dtype,np.inexact) else np.float64
    smoothed  = np.zeros(x.shape,dtype=out_dtype)
    half_k    = k // 2

    for i in range(len(x)):
        start       = max(0, i-half_k)
        end         = min(len(x), i+half_k + 1)
        smoothed[i] = np.mean(x[start:end])

    return smoothed


In [None]:
# %% Plotting

# Two panels (train | dev); each shows the smoothed loss on the left axis and
# the smoothed accuracy on a twin right axis
phi = (1 + np.sqrt(5)) / 2
fig,axs = plt.subplots(1,2,figsize=(1.5*phi*6,6))

# Left panel: training set
l1, = axs[0].plot(smooth_adaptive(train_loss.numpy(),10),label="Loss")
axs[0].set(
    ylim   = (0.2,1),
    title  = f"Training set loss and accuracy\nFinal acc ≈ {train_acc[-5:].mean().item():.1f}%",
    xlabel = "Epoch",
    ylabel = "Loss" )

ax0b = axs[0].twinx()
l2,  = ax0b.plot(smooth_adaptive(train_acc.numpy(),10),label="Accuracy",color='tab:orange')
ax0b.set(ylim=(0,102))

# Single legend collecting the lines of both y-axes
axs[0].legend(handles=[l1,l2],loc='center right')

# Right panel: development set
l3, = axs[1].plot(smooth_adaptive(dev_loss.numpy(),10),label="Loss")
axs[1].set(
    ylim   = (0.2,1),
    title  = f"Development set loss and accuracy\nFinal acc ≈ {dev_acc[-5:].mean().item():.1f}%",
    xlabel = "Epoch" )

ax1b = axs[1].twinx()
l4,  = ax1b.plot(smooth_adaptive(dev_acc.numpy(),10),label="Accuracy",color='tab:orange')
ax1b.set(ylim=(0,102),ylabel="Accuracy")

axs[1].legend(handles=[l3,l4],loc='center right')

fig.tight_layout()

fig.savefig('figure4_ffn_project_2.png')

plt.show()

# Colab helper: push the saved figure to the browser
files.download('figure4_ffn_project_2.png')


In [None]:
# %% Evaluate on test set and plot

# Predict the whole test set (the test loader yields it as a single batch)
ANN.eval()
with torch.no_grad():
    X,y    = next(iter(test_loader))
    y_pred = ANN(X).argmax(dim=1).numpy()

# Flatten the (N,1) labels and score each prediction
y_true   = y.numpy().ravel()
correct  = (y_pred == y_true)
test_acc = 100*correct.mean()

# Long-format table for seaborn: one row per test sample
class_names = ["Healthy" if label==0 else "Pathological" for label in y_true]
df_plot = pd.DataFrame({
            "Class": class_names,
            "Sample index": np.arange(len(y_true)),
            "Correct": correct })

# Swarm plot: samples grouped by true class, coloured by prediction outcome
phi = (1 + np.sqrt(5)) / 2
fig,ax = plt.subplots(figsize=(phi*5,5))

sns.swarmplot(
        data=df_plot,
        x="Class",
        y="Sample index",
        hue="Correct",
        palette={True:"green",False:"red"},
        dodge=False,
        size=8,
        ax=ax )

ax.set_ylabel("Subject index")
ax.set_title(f"Test set predictions (accuracy = {test_acc:.1f}%)")
ax.legend(title="Predictions")
ax.grid()
fig.tight_layout()

fig.savefig('figure5_ffn_project_2.png')

plt.show()

# Colab helper: push the saved figure to the browser
files.download('figure5_ffn_project_2.png')


In [None]:
# %% Compute confusion matrices and plot

# Helper: draw one annotated confusion matrix
def plot_confusion(ax,conf,title,class_names,cmap=plt.cm.Blues):
    """Show `conf` on `ax` with counts and row-wise percentages in each cell.

    Parameters
    ----------
    ax : matplotlib Axes to draw on.
    conf : 2D array of counts (rows = true class, columns = predicted class).
    title : str, panel title.
    class_names : list of str, tick labels for both axes.
    cmap : matplotlib colormap used by imshow.

    Returns the image returned by `ax.imshow`.
    """
    vmax  = conf.max()
    im    = ax.imshow(conf,cmap=cmap,vmax=vmax)
    ticks = np.arange(len(class_names))

    ax.set_xticks(ticks)
    ax.set_yticks(ticks)
    ax.set_xticklabels(class_names)
    ax.set_yticklabels(class_names)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    ax.set_title(title)

    for i in range(conf.shape[0]):
        row_sum = conf[i].sum()
        for j in range(conf.shape[1]):
            # White text on dark cells, black text on light cells
            color = 'white' if vmax > 0 and conf[i,j]/vmax > 0.5 else 'black'
            # A class without any true sample has an empty row: report 0%
            # instead of dividing by zero
            pct   = 100*conf[i,j]/row_sum if row_sum > 0 else 0.0
            ax.text(j,i-0.05,f"{conf[i,j]}",ha='center',va='center',color=color,fontsize=12)
            ax.text(j,i+0.05,f"({pct:.1f}%)",ha='center',va='center',color=color,fontsize=10)

    return im

# Get predictions and (flattened) labels for the three sets
ANN.eval()
with torch.no_grad():
    train_preds  = torch.argmax(ANN(train_loader.dataset.tensors[0]),dim=1).numpy()
    train_labels = train_loader.dataset.tensors[1].numpy().ravel()

    dev_preds    = torch.argmax(ANN(dev_loader.dataset.tensors[0]),dim=1).numpy()
    dev_labels   = dev_loader.dataset.tensors[1].numpy().ravel()

    test_preds   = torch.argmax(ANN(test_loader.dataset.tensors[0]),dim=1).numpy()
    test_labels  = test_loader.dataset.tensors[1].numpy().ravel()

# Get confusion matrices. Fixing the class list keeps every matrix 2x2 (and
# aligned with the two tick labels) regardless of which classes happen to
# occur in a set; samples whose label or prediction is outside [0,1] are not counted
class_ids  = [0,1]
train_conf = skm.confusion_matrix(train_labels,train_preds,labels=class_ids)
dev_conf   = skm.confusion_matrix(dev_labels,dev_preds,labels=class_ids)
test_conf  = skm.confusion_matrix(test_labels,test_preds,labels=class_ids)

# Plotting
phi = (1 + np.sqrt(5)) / 2
fig,ax = plt.subplots(1,3,figsize=(1.5*phi*6,6))
cmap = plt.cm.Blues

labels = ["Healthy", "Pathological"]

panels = [ (train_conf,'Train confusion matrix'),
           (dev_conf,  'Dev confusion matrix'),
           (test_conf, 'Test confusion matrix') ]

for panel_ax,(conf,title) in zip(ax,panels):
    plot_confusion(panel_ax,conf,title,labels,cmap=cmap)

plt.tight_layout()

plt.savefig('figure6_ffn_project_2.png')

plt.show()

# Colab helper: push the saved figure to the browser
files.download('figure6_ffn_project_2.png')


In [184]:
# %% Compute performance measures on train and test data

# Helper returning [accuracy, precision, recall, F1] for one set; precision,
# recall, and F1 are averaged over classes with support-based weights
def weighted_scores(y_true,y_pred):
    return np.array([
        skm.accuracy_score (y_true,y_pred),
        skm.precision_score(y_true,y_pred,average='weighted'),
        skm.recall_score   (y_true,y_pred,average='weighted'),
        skm.f1_score       (y_true,y_pred,average='weighted') ],dtype=float)

# Train, dev, and test performance measures (4 values each)
train_metrics = weighted_scores(train_labels,train_preds)
dev_metrics   = weighted_scores(dev_labels,dev_preds)
test_metrics  = weighted_scores(test_labels,test_preds)


In [None]:
# %% Plotting

# Grouped bar chart: one group per data set, one bar per metric
phi = (1 + np.sqrt(5)) / 2
fig = plt.figure(figsize=(6*phi,6))

datasets    = ['Train','Dev','Test']
metrics     = ['Accuracy','Precision','Recall','F1-score']
all_metrics = np.stack([train_metrics,dev_metrics,test_metrics],axis=0)

x     = np.arange(len(datasets))
width = 0.15

# The offset step is half the bar width, so neighbouring bars partially overlap
for i in range(len(metrics)):
    plt.bar(x+i*0.5*width-.5*width,all_metrics[:,i],width,label=metrics[i],zorder=2)

# Zoom the y-axis on the observed scores. The range is derived from the data
# (with a small margin, clipped to [0,1]) because the scores change from run
# to run and a fixed window could clip or hide the bars
margin = 0.02
y_min  = max(0.0,float(np.min(all_metrics))-margin)
y_max  = min(1.0,float(np.max(all_metrics))+margin)

plt.xticks(x,datasets)
plt.ylim([y_min,y_max])
plt.ylabel('Score')
plt.title('Performance metrics per set')
plt.legend()
plt.grid(alpha=1,axis='y')
plt.tight_layout()

plt.savefig('figure7_ffn_project_2.png')

plt.show()

# Colab helper: push the saved figure to the browser
files.download('figure7_ffn_project_2.png')
