# CNN: Classification of CHD cases
## Model training

In [None]:
# Install required packages
%pip install -q ydata-profiling numpy pandas matplotlib tensorflow imblearn torch 
%reset -f

import pandas as pd
import numpy as np
import random
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import confusion_matrix
from IPython.display import display, Markdown



In [None]:
# Define helper functions

def to_list(df_dict, heads):
    """Turn a column -> {row label: value} mapping into column -> [values].

    For every column name in ``heads`` the inner mapping of ``df_dict`` is
    walked in the order of its own keys and the values are gathered in a list.
    """
    def _column_values(column):
        # Look each row label up explicitly so the order follows .keys().
        return [column[row_label] for row_label in column.keys()]

    return {name: _column_values(df_dict[name]) for name in heads}

def encode_input(data_range, data):
    """Encode a categorical value as its position within ``data_range``.

    The lookup is delegated to ``data_range.index``; for a list or tuple a
    value that is not present therefore raises ``ValueError``.
    """
    category_code = data_range.index(data)
    return category_code

def preprocess_data(df_list: dict, normalize=True) -> "tuple[np.ndarray, np.ndarray]":
    """Build the feature matrix and target vector from a dict of column lists.

    Args:
        df_list: Mapping of column name -> list of values. It must contain a
            ``'chd'`` column, which becomes the target vector.
        normalize: When true, min-max scale every feature column to [0, 1].

    Returns:
        ``(inputs_arr, targets_arr)`` where ``inputs_arr`` has one row per
        sample and one column per feature, and ``targets_arr`` holds the
        ``'chd'`` values.
    """
    targets_arr = np.array(df_list['chd'])

    # NOTE: the FIRST key of df_list is skipped (not 'chd' by name), so the
    # target column is expected to come first in the mapping.
    feature_keys = list(df_list.keys())[1:]
    rows = [
        [df_list[key][i] for key in feature_keys]
        for i in range(len(df_list['chd']))
    ]
    inputs_arr = np.array(rows)

    if normalize:
        col_min = np.min(inputs_arr, axis=0)
        _range = np.max(inputs_arr, axis=0) - col_min
        # A constant column has zero range; dividing by it would produce NaN.
        # Divide by 1 instead so such a column is scaled to all zeros.
        safe_range = np.where(_range == 0, 1, _range)
        inputs_arr = (inputs_arr - col_min) / safe_range
    return inputs_arr, targets_arr

def prepare_data(inputs, targets, seed=1001, balance=True):
    """Optionally balance the two classes by undersampling, then shuffle.

    Args:
        inputs: Array-like with one row per sample.
        targets: 1-D array-like of 0/1 labels, aligned with ``inputs``.
        seed: Seed for the private random generator, so the selected subset
            and the final order are reproducible.
        balance: When true (default), keep an equal number of positive
            (``== 1``) and negative (``== 0``) samples by randomly discarding
            the surplus of the larger class. When false, keep every sample and
            only shuffle.

    Returns:
        ``(inputs, targets)`` as NumPy arrays with rows in the same shuffled
        order, so row ``k`` of ``inputs`` still belongs to ``targets[k]``.
    """
    inputs = np.asarray(inputs)
    targets = np.asarray(targets)
    # A single generator drives both the undersampling and the shuffle, so the
    # whole result depends on `seed` only and the global RNG is left alone.
    rng = np.random.default_rng(seed)

    if balance:
        positive_rows = np.flatnonzero(targets == 1)
        negative_rows = np.flatnonzero(targets == 0)
        n_minimum = min(positive_rows.size, negative_rows.size)
        if n_minimum == 0:
            # One class is absent: a balanced sample is necessarily empty.
            return inputs[:0], targets[:0]
        keep = np.concatenate([
            rng.choice(positive_rows, size=n_minimum, replace=False),
            rng.choice(negative_rows, size=n_minimum, replace=False),
        ])
    else:
        keep = np.arange(targets.shape[0])

    # One permutation indexes both arrays, which keeps rows and labels paired.
    keep = rng.permutation(keep)
    return inputs[keep], targets[keep]

def get_metrics(y_pred, y_true, positive_label=1):
    """Compute binary classification metrics from predictions and labels.

    Args:
        y_pred: Predicted labels, one per sample.
        y_true: Ground-truth labels, same length as ``y_pred``.
        positive_label: Label counted as the positive class (default 1).
            Every other value is counted as negative.

    Returns:
        ``[precision, recall, fscore, accuracy, miss_rate, fall_out_rate]``.
        A metric whose denominator is zero is reported as ``nan``.
    """
    y_pred = np.asarray(y_pred).ravel()
    y_true = np.asarray(y_true).ravel()
    predicted_positive = y_pred == positive_label
    actual_positive = y_true == positive_label

    # Count the four confusion-matrix cells directly with boolean masks. This
    # ties "positive" to `positive_label` explicitly instead of to a matrix
    # position, and still works when only one class occurs in the data.
    tp = int(np.sum(predicted_positive & actual_positive))
    fp = int(np.sum(predicted_positive & ~actual_positive))
    fn = int(np.sum(~predicted_positive & actual_positive))
    tn = int(np.sum(~predicted_positive & ~actual_positive))

    def _ratio(numerator, denominator):
        # Undefined ratios become nan rather than raising ZeroDivisionError.
        return numerator / denominator if denominator else float('nan')

    precision = _ratio(tp, tp + fp)
    recall = _ratio(tp, tp + fn)
    fscore = _ratio(2 * tp, 2 * tp + fp + fn)
    accuracy = _ratio(tp + tn, tp + tn + fp + fn)
    # Miss rate is the false-negative rate: FN / (FN + TP) = 1 - recall.
    miss_rate = _ratio(fn, fn + tp)
    fall_out_rate = _ratio(fp, fp + tn)
    return [precision, recall, fscore, accuracy, miss_rate, fall_out_rate]

def present_metrics(results_dict): 
    """Tabulate per-repeat metrics and append their row-wise mean.

    Each value of ``results_dict`` becomes one column whose six entries are
    labelled with the metric names; an extra ``'Average'`` column holds the
    mean of every row.
    """
    metric_names = ['Precision', 'Recall', 'F-score', 'Accuracy', 'Miss Rate', 'Fall out rate']
    table = pd.DataFrame(results_dict, index=metric_names)
    row_means = table.mean(axis=1)
    table['Average'] = row_means
    return table


def plot_loss_acc(history):
    """Plot train and validation loss (left) / accuracy (right) for each epoch.

    Args:
        history: Mapping of repeat name -> dict with the per-epoch lists
            ``'train_loss'``, ``'val_loss'``, ``'train_acc'`` and ``'val_acc'``.
            Every repeat gets its own colour; training curves are dashed and
            validation curves are solid.
    """
    _fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
    # pyplot.get_cmap replaces plt.cm.get_cmap, which was deprecated and later
    # removed from Matplotlib.
    colors = plt.get_cmap('tab10', len(history))

    # (axis, training key, validation key, quantity shown on that axis)
    panels = (
        (ax1, 'train_loss', 'val_loss', 'Loss'),
        (ax2, 'train_acc', 'val_acc', 'Accuracy'),
    )
    for ax, train_key, val_key, quantity in panels:
        for i, (repeat, data) in enumerate(history.items()):
            epochs = range(1, len(data[train_key]) + 1)
            ax.plot(epochs, data[train_key], '--',
                    label=f'{repeat} Training {quantity}', color=colors(i))
            ax.plot(epochs, data[val_key],
                    label=f'{repeat} Validation {quantity}', color=colors(i))
        ax.set_title(f'Training and Validation {quantity}')
        ax.set_xlabel('Epochs')
        ax.set_ylabel(quantity)
        ax.legend()
        ax.grid(True)

    plt.tight_layout()
    plt.show()


In [None]:
# Import imbalanced data and convert to a dictionary of column lists
filepath = 'cleaned_data_imbalanced.csv'
df = pd.read_csv(filepath)
# Drop every row that contains at least one missing value
df_clear = df.dropna(axis=0)
# frac=1 returns all rows in random order; the index is rebuilt afterwards
df_shuffle = df_clear.sample(frac=1).reset_index(drop=True)
df_dict = df_shuffle.to_dict()
heads = list(df_dict.keys())
df_list = to_list(df_dict, heads)
# The f prefix is required, otherwise the placeholders are shown literally
display(f'Number of samples: {len(df_list["chd"])}')
display(f'Number of variables: {len(df_list)}')

In [None]:
# Build CNN

class CHDPred(Dataset):
    """Dataset that serves (input, target) pairs from two NumPy arrays.

    Both arrays are indexed along their first axis and each item is
    converted to a ``float32`` tensor on access.
    """

    def __init__(self, inputs, targets) -> None:
        super().__init__()
        self.inputs = inputs
        self.targets = targets

    def __len__(self):
        # The number of samples is the size of the inputs' first axis.
        n_samples = self.inputs.shape[0]
        return n_samples

    def __getitem__(self, index):
        features = torch.from_numpy(self.inputs[index])
        label = torch.from_numpy(self.targets[index])
        return features.to(torch.float32), label.to(torch.float32)

# Actual classification model 
class CHDPredModel(nn.Module):
    """Small CNN producing one sigmoid probability per sample.

    Two convolutions are followed by two linear layers. ``linear1`` takes 32
    features, which is what a ``(N, 1, 3, 5)`` input yields after the
    convolutions (8 channels x 1 x 4).
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=8, kernel_size=(3,2)) # kernel size adjusted to fit dataset

        self.linear1 = nn.Linear(32, 16)
        self.linear2 = nn.Linear(16, 1)

    def forward(self, x):
        """Return sigmoid outputs of shape ``(N, 1)`` for input batch ``x``."""
        out = F.relu(self.conv1(x))
        out = F.relu(self.conv2(out))
        # Flatten by the real per-sample feature count rather than a fixed 32.
        # With a wrongly shaped input a fixed 32 silently moves surplus
        # features into the batch dimension; this way linear1 raises a shape
        # error instead.
        features_per_sample = out.shape[-3] * out.shape[-2] * out.shape[-1]
        out = out.reshape(-1, features_per_sample)
        out = F.relu(self.linear1(out))
        # torch.sigmoid, consistent with CHDPredModelSHAPE.
        return torch.sigmoid(self.linear2(out))

# Classification model for shape check in each step
class CHDPredModelSHAPE(nn.Module):
    """Same layer stack as the classifier, printing the tensor shape after each stage."""

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=8, kernel_size=(3,2)) # kernel size adjusted to fit dataset
        self.linear1 = nn.Linear(32, 16)
        self.linear2 = nn.Linear(16, 1)

    @staticmethod
    def _show(label, tensor):
        """Print ``label`` followed by the tensor's shape and hand the tensor back."""
        print(label, tensor.shape)
        return tensor

    def forward(self, x):
        self._show("Input shape:", x)
        after_conv1 = self._show("Shape after conv1:", F.relu(self.conv1(x)))
        after_conv2 = self._show("Shape after conv2:", F.relu(self.conv2(after_conv1)))
        flat = self._show("Shape after view:", after_conv2.view(-1, 32))
        hidden = self._show("Shape after linear1:", F.relu(self.linear1(flat)))
        # The final output shape is not printed.
        return torch.sigmoid(self.linear2(hidden))

In [None]:
# Build the feature matrix / target vector from the column lists
inputs, targets = preprocess_data(df_list)
print(f'inputs shape after preprocess_data(): {inputs.shape}')
print(f'targets shape after preprocess_data(): {targets.shape}')

# Balance and shuffle the samples
inputs, targets = prepare_data(inputs, targets)
print(f'inputs shape after prepare_data(): {inputs.shape}')
print(f'targets shape after prepare_data(): {targets.shape}')

# 70/30 split: the first 70% of the rows train the model, the rest evaluate it.
# Each row of features is reshaped into a single-channel 3x5 "image".
n_samples = inputs.shape[0]
split_at = int(n_samples*0.7)
tr_inputs = inputs[:split_at, :].reshape((-1, 1, 3, 5)) # input size adjusted to fit dataset
print(f'tr_inputs shape: {tr_inputs.shape}')
tr_targets = targets[:split_at].reshape((-1, 1))
print(f'tr_targets shape: {tr_targets.shape}')
te_inputs = inputs[split_at:, :].reshape((-1, 1, 3, 5)) # input size adjusted to fit dataset
print(f'te_inputs shape: {te_inputs.shape}')
te_targets = targets[split_at:].reshape((-1, 1))
print(f'te_targets shape: {te_targets.shape}')

# Check shapes of the model 
batchsize = 32

train_set = CHDPred(tr_inputs, tr_targets)
val_set = CHDPred(te_inputs, te_targets)
train_loader = DataLoader(train_set, batch_size=batchsize)
val_loader = DataLoader(val_set, batch_size=1)

model = CHDPredModelSHAPE()
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Run one optimisation step on the first batch only; the shape-printing model
# reports the tensor shape after every layer during that forward pass.
for j, (input_, target_) in enumerate(train_loader):
    optimizer.zero_grad()
    out = model(input_)
    loss = criterion(out, target_)

    loss.backward()
    optimizer.step()

    # Stop after the first iteration
    break

In [None]:
epochs = 100
repeats = 5
batchsize = 32

all_results = {}
history = {}


def _count_correct(out, target_, threshold=0.5):
    """Return how many outputs in the batch land on the correct side of ``threshold``."""
    predicted = (out > threshold).to(target_.dtype)
    return int((predicted == target_).sum().item())


for rep in range(repeats):

    display(Markdown(f'#### Repeat {rep+1}'))

    # NOTE: every repeat reuses the tr_*/te_* arrays built in the previous
    # cell and the loaders do not shuffle, so repeats differ only through the
    # freshly constructed model. To resample per repeat, call
    # preprocess_data() / prepare_data() and redo the 70/30 split here.
    train_set = CHDPred(tr_inputs, tr_targets)
    val_set = CHDPred(te_inputs, te_targets)
    train_loader = DataLoader(train_set, batch_size=batchsize)
    val_loader = DataLoader(val_set, batch_size=1)

    model = CHDPredModel()
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    # Per-epoch accuracy and loss records
    train_accs = []
    val_accs = []
    train_losses = []
    val_losses = []

    for i in range(epochs):
        model.train()
        loss_ = 0
        acc_ = 0  # number of correctly classified training samples

        for input_, target_ in train_loader:
            optimizer.zero_grad()
            out = model(input_)
            loss = criterion(out, target_)

            loss.backward()
            optimizer.step()

            loss_ += loss.item()
            acc_ += _count_correct(out.detach(), target_)

        model.eval()
        val_acc = 0  # number of correctly classified validation samples
        val_loss_ = 0
        # No gradients are needed for validation, so skip building the graph.
        with torch.no_grad():
            for input_, target_ in val_loader:
                out = model(input_)
                val_loss_ += criterion(out, target_).item()
                val_acc += _count_correct(out, target_)

        # Accuracy is correct / total samples. Dividing each batch by
        # `batchsize` would under-count whenever the last batch is smaller.
        train_acc_avg = acc_ / len(train_set)
        val_acc_avg = val_acc / len(val_set)
        # Loss is averaged over batches (each batch loss is already a mean).
        train_loss_avg = loss_ / len(train_loader)
        val_loss_avg = val_loss_ / len(val_loader)

        # Store values for plotting
        train_accs.append(train_acc_avg)
        val_accs.append(val_acc_avg)
        train_losses.append(train_loss_avg)
        val_losses.append(val_loss_avg)

        # Display after each epoch
        print("epochs: {}, train_loss: {}, val_loss: {}, train_acc: {}, val_acc: {}".format(
        i + 1,
        train_loss_avg,
        val_loss_avg,
        train_acc_avg, 
        val_acc_avg))

    # Final pass over the validation set to collect hard 0/1 predictions
    model.eval()
    preds = []
    labels = []
    with torch.no_grad():
        for input_, target_ in val_loader:
            out = model(input_)
            # Flatten so this works for any validation batch size.
            preds.extend((out > 0.5).float().view(-1).tolist())
            labels.extend(target_.view(-1).tolist())

    # Record history in dictionary
    repeat_key = f'Repeat {rep + 1}'
    history[repeat_key] = {'train_acc': train_accs,
                           'val_acc': val_accs, 
                           'train_loss': train_losses,
                           'val_loss': val_losses}
    
    # Record results in dictionary
    round_results = get_metrics(preds, labels)
    all_results[repeat_key] = round_results



## Model evaluation

In [None]:
# Plot per-epoch training/validation loss (left) and accuracy (right) for every repeat
plot_loss_acc(history)


In [None]:
# Show the metrics table: one column per repeat plus their average
display(Markdown('# CNN Performance Metrics'))
display(present_metrics(all_results))


## Save model

In [None]:
# `model` is re-created on every repeat of the training loop, so the object
# exported here is the network from the last repeat only.
model_scripted = torch.jit.script(model) # Export to TorchScript
model_scripted.save('model_scripted.pt') # Save (relative path, written to the working directory)