# Convolutional Neural Networks

## Imports and constants

In [None]:
import torch
from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split, SubsetRandomSampler
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchmetrics import Accuracy
from torchinfo import summary
from tqdm.notebook import tqdm

import matplotlib.patches as mpatches
from sklearn.manifold import TSNE
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix

import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import seaborn as sns

from datetime import datetime
import os
import shutil
import subprocess
import webbrowser
import time
import csv
import numpy as np
import pandas as pd


# Notebook-wide configuration.
# When True, the old runs folder is wiped below and training is logged to TensorBoard
TENSORBOARD = False
# Number of epochs passed to the training function
EPOCHS = 15
# Batch size used by every DataLoader in this notebook
BATCH_SIZE = 32
# Initial learning rate passed to the training function
LEARNING_RATE = 0.001
# Directory names for exported models, saved figures and TensorBoard runs
models_path = 'models'
plots_path = 'graphs'
runs_path = 'runs'

## Load data & initial preprocessing

In [None]:
# Download the FashionMNIST train and test datasets (tensor-only views, used to compute statistics)
train_set_full = datasets.FashionMNIST(root='./datasets/', train=True, download=True, transform=transforms.ToTensor())
test_set = datasets.FashionMNIST(root='./datasets', train=False, download=True, transform=transforms.ToTensor())

# Compute the global mean and std of the train images (pixel values after ToTensor)
imgs = torch.stack([img for img, _ in train_set_full], dim=0)
mean = imgs.mean()
std = imgs.std()

# Normalize expects one value per channel; the images have a single channel
channel_mean = (mean.item(),)
channel_std = (std.item(),)

# Training transformations:
# - random augmentations applied to the PIL image
# - ToTensor: converts images to tensors
# - Normalize: standardizes pixels with the train-set mean and std
train_val_set_transforms_aug = transforms.Compose([
    #transforms.RandomRotation(degrees=10),
    transforms.RandomHorizontalFlip(p=0.25),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    #transforms.RandomGrayscale(p=0.2),
    #transforms.RandomResizedCrop(size=(28, 28), scale=(0.8, 1.0)),
    #transforms.RandomInvert(p=0.8),
    transforms.RandomAutocontrast(p=0.2),
    transforms.RandomSolarize(p=0.2, threshold=15),
    transforms.ToTensor(),
    #transforms.RandomErasing(p=0.2, scale=(0.02, 0.1), ratio=(0.1, 3.3), value=0),
    transforms.Normalize(mean=channel_mean, std=channel_std)
])

# Evaluation transformations: no augmentation, only tensor conversion and normalization
test_set_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=channel_mean, std=channel_std)
])

# Apply transforms to the train and test datasets
train_val_dataset = datasets.FashionMNIST(root='./datasets/', train=True, download=True, transform=train_val_set_transforms_aug)
test_dataset = datasets.FashionMNIST(root='./datasets/', train=False, download=True, transform=test_set_transforms)
# Same training images without augmentation, so that validation metrics are not
# computed on randomly distorted inputs
train_val_dataset_eval = datasets.FashionMNIST(root='./datasets/', train=True, download=True, transform=test_set_transforms)


# Split the train set into training and validation sets
train_size = int(0.8 * len(train_val_dataset))
validation_size = len(train_val_dataset) - train_size
train_set, validation_split = random_split(train_val_dataset, [train_size, validation_size])
# Re-point the validation indices at the non-augmented view of the data
validation_set = torch.utils.data.Subset(train_val_dataset_eval, validation_split.indices)

# Create iterators for our datasets using DataLoader
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
# Evaluation loaders do not need shuffling
validation_loader = DataLoader(validation_set, batch_size=BATCH_SIZE, shuffle=False)
train_validation_loader = DataLoader(train_val_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Select device for training. Selects GPU or MPS if available, otherwise uses CPU
if torch.cuda.is_available():
    device = 'cuda'
elif torch.backends.mps.is_available():
    device = 'mps'
else:
    device = 'cpu'
print(f'Using {device} device')

# Index dataset's labels to image descriptors
labels_map = {
    0: 'T-Shirt',
    1: 'Trouser',
    2: 'Pullover',
    3: 'Dress',
    4: 'Coat',
    5: 'Sandal',
    6: 'Shirt',
    7: 'Sneaker',
    8: 'Bag',
    9: 'Ankle Boot',
}

In [None]:
# If TensorBoard tracking is enabled and there is a 'runs' folder, delete it to start over
if TENSORBOARD:
    def delete_folder(folder_path):
        '''
        Deletes a directory tree and reports the outcome on stdout.

        :param folder_path: path of the folder to delete
        '''
        if os.path.isdir(folder_path):
            shutil.rmtree(folder_path)
            # Double-quoted f-strings so the inner single quotes are part of the message
            print(f"Folder '{folder_path}' has been deleted.")
        else:
            print(f"Folder '{folder_path}' does not exist.")

    folder_to_delete = runs_path
    delete_folder(folder_to_delete)

## LeNet baseline

### Define the LeNet model

In [None]:
class LeNet5(nn.Module):
    '''
    LeNet-5 style CNN with two auxiliary linear heads.

    The linear layers are dimensioned for 1x28x28 inputs. forward() returns a
    tuple (logits, aux1, aux2) with 10 outputs per sample each.
    '''

    def __init__(self):
        super().__init__()

        # Stage 1: padded 5x5 convolution keeps 28x28, pooling halves it to 14x14
        stage_one = [
            nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
        ]
        self.first_wave = nn.Sequential(*stage_one)

        # Stage 2: unpadded 5x5 convolution 14x14 -> 10x10, pooling -> 5x5
        stage_two = [
            nn.Conv2d(6, 16, kernel_size=5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
        ]
        self.second_wave = nn.Sequential(*stage_two)

        # Fully connected classifier on the flattened 16x5x5 feature map
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, 10),
        )

        # Kaiming-uniform initialization of every conv/linear layer created so far
        self._initialize_weights()

        # Auxiliary heads are created after the initialization call above,
        # so they keep nn.Linear's default initialization
        self.aux_output1 = nn.Linear(6 * 14 * 14, 10)  # reads the stage-1 output
        self.aux_output2 = nn.Linear(16 * 5 * 5, 10)   # reads the stage-2 output

    def forward(self, x):
        stage_one_out = self.first_wave(x)
        aux1 = self.aux_output1(stage_one_out.view(stage_one_out.shape[0], -1))

        stage_two_out = self.second_wave(stage_one_out)
        aux2 = self.aux_output2(stage_two_out.view(stage_two_out.shape[0], -1))

        return self.classifier(stage_two_out), aux1, aux2

    def _initialize_weights(self):
        # Kaiming-uniform weights and zero biases for conv and linear layers
        for module in self.modules():
            if not isinstance(module, (nn.Conv2d, nn.Linear)):
                continue
            init.kaiming_uniform_(module.weight, mode='fan_in', nonlinearity='relu')
            if module.bias is not None:
                init.constant_(module.bias, 0)

    def get_features(self, x):
        # Run both convolutional stages and flatten per sample
        pooled = self.second_wave(self.first_wave(x))
        flat = pooled.view(pooled.shape[0], -1)

        # Every classifier layer except the final linear layer
        penultimate = self.classifier[:-1]
        return penultimate(flat)

In [None]:
# Build the baseline network and display its per-layer summary for one 1x28x28 input
model_lenet5 = LeNet5()

summary(
    model=model_lenet5,
    input_size=(1, 1, 28, 28),
    col_names=['input_size', 'output_size', 'num_params', 'trainable'],
    col_width=20,
    row_settings=['var_names'],
    verbose=0,
)

### Train LeNet model

In [None]:
def train_model_multiclass(model, num_classes, train_loader, validation_loader=None, device='cpu', epochs=15, lr=0.001,
                           adapt_lr=False, adapt_lr_step=5, adapt_lr_gamma=0.5, tensorboard_tracking=False):
    '''
    Trains a model for multiclass classification.

    The model's forward pass must return a 3-tuple whose first element holds the class
    logits; only that element contributes to the loss. When a validation loader is given,
    the weights of the epoch with the highest validation accuracy are restored at the end
    and the history is cut at that epoch.

    :param model: model
    :param num_classes: number of classes
    :param train_loader: training data loader
    :param validation_loader: validation data loader, if None then no validation
    :param device: device choice
    :param epochs: number of epochs
    :param lr: learning rate
    :param adapt_lr: True to adapt learning rate, False to keep static
    :param adapt_lr_step: number of epochs to change learning rate after if adapt_lr is True
    :param adapt_lr_gamma: learning rate adaptation gamma if adapt_lr is True
    :param tensorboard_tracking: tracks training with TensorBoard if True

    :return: returns model at best epoch, dictionary of training and validation results up to best epoch
             (plain Python floats), and the learning rate used in every epoch (one list per epoch,
             as returned by the scheduler)
    '''

    # Experiment tracking setup using TensorBoard
    writer = None
    if tensorboard_tracking:
        timestamp = datetime.now().strftime('%Y-%m-%d')
        experiment_name = 'MNIST'
        model_name = model.__class__.__name__
        log_dir = os.path.join('runs', timestamp, experiment_name, model_name)
        writer = SummaryWriter(log_dir)  # Initialize a TensorBoard writer for logging

    # Move the model to the target device before building the optimizer
    model = model.to(device)

    # Loss, optimizer, learning-rate schedule and accuracy metric
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = StepLR(optimizer, step_size=adapt_lr_step, gamma=adapt_lr_gamma)
    accuracy = Accuracy(task='multiclass', num_classes=num_classes).to(device)

    history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': []}
    learning_rates = []

    best_val_acc = 0.0
    best_epoch = 0
    best_model_state = None

    for epoch in tqdm(range(epochs)):  # Progress bar for epochs
        # ---- Training ----
        model.train()
        train_loss, train_acc = 0.0, 0.0

        for X, y in train_loader:
            X, y = X.to(device), y.to(device)
            # Forward pass: the auxiliary outputs are not part of the loss
            y_pred, _, _ = model(X)
            loss = loss_fn(y_pred, y)
            # .item() keeps the running totals as host-side floats
            train_loss += loss.item()
            train_acc += accuracy(y_pred, y).item()
            # Backpropagate and update the parameters
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Average training loss and accuracy over all batches
        train_loss /= len(train_loader)
        train_acc /= len(train_loader)
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)

        # ---- Validation (values stay at 0 when no validation loader is given) ----
        val_loss, val_acc = 0.0, 0.0
        if validation_loader is not None:
            model.eval()
            with torch.inference_mode():  # Inference without gradient tracking
                for X, y in validation_loader:
                    X, y = X.to(device), y.to(device)
                    y_pred, _, _ = model(X)
                    val_loss += loss_fn(y_pred, y).item()
                    val_acc += accuracy(y_pred, y).item()

            # Average validation loss and accuracy over all batches
            val_loss /= len(validation_loader)
            val_acc /= len(validation_loader)

            # Check if the validation accuracy improved
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                best_epoch = epoch
                # state_dict() returns references to the live parameters; clone them so
                # that later optimizer steps do not overwrite the saved snapshot
                best_model_state = {name: tensor.detach().clone()
                                    for name, tensor in model.state_dict().items()}

        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        if writer is not None:
            # Log training and validation loss and accuracy to TensorBoard
            writer.add_scalars(main_tag='Loss', tag_scalar_dict={'train/loss': train_loss, 'val/loss': val_loss}, global_step=epoch)
            writer.add_scalars(main_tag='Accuracy', tag_scalar_dict={'train/acc': train_acc, 'val/acc': val_acc}, global_step=epoch)

        print(f'Epoch: {epoch}| Train loss: {train_loss: .5f}| Train acc: {train_acc: .5f}| Val loss: {val_loss: .5f}| Val acc: {val_acc: .5f}')

        # Record the learning rate that was used during this epoch, then decay it if requested
        learning_rates.append(scheduler.get_last_lr())
        if adapt_lr:
            scheduler.step()

    # After training: restore the best weights and cut the history at the best epoch
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
        for key in history:
            history[key] = history[key][:best_epoch + 1]

    if writer is not None:
        writer.close()

    return model, history, learning_rates

In [None]:
# Train the baseline with a static learning rate
model_lenet5, history_lenet5, learning_rates_lenet5 = train_model_multiclass(
    model=model_lenet5,
    num_classes=10,
    train_loader=train_loader,
    validation_loader=validation_loader,
    device=device,
    epochs=EPOCHS,
    lr=LEARNING_RATE,
    tensorboard_tracking=TENSORBOARD,
)

### Visualize the auxiliary output heads

In [None]:
# Bar plots of the two auxiliary head outputs for the first image of the first test batch
model_lenet5.eval()
with torch.no_grad():
    for images, _ in test_loader:
        images = images.to(device)
        _, aux1, aux2 = model_lenet5(images)

        # Scores of the first image in the batch, copied to the host for plotting
        scores_aux1 = aux1[0].cpu().numpy()
        scores_aux2 = aux2[0].cpu().numpy()
        panels = (
            ('Aux1 Output for the First Image', scores_aux1),
            ('Aux2 Output for the First Image', scores_aux2),
        )

        plt.figure(figsize=(15, 6))
        for position, (panel_title, panel_scores) in enumerate(panels, start=1):
            plt.subplot(1, 2, position)
            plt.title(panel_title)
            plt.bar(np.arange(10), panel_scores)
            plt.xlabel('Classes')
            plt.ylabel('Scores')

        plt.show()
        # One batch is enough for this visualization
        break


### Measure how often the auxiliary heads after the first and second convolutional stages predict the same class

In [None]:
# Running counters over the whole training loader
matching_highest_scores = 0
total_samples = 0

model_lenet5.eval()
with torch.no_grad():
    for X, y in train_loader:
        X = X.to(device)
        _, aux1, aux2 = model_lenet5(X)

        # Top-scoring class of each auxiliary head, per sample
        max_indices_aux1 = aux1.argmax(dim=1)
        max_indices_aux2 = aux2.argmax(dim=1)

        # Number of samples in this batch where both heads agree
        matches = (max_indices_aux1 == max_indices_aux2).sum()
        matching_highest_scores += matches.item()

        total_samples += X.shape[0]

# Share of samples with agreeing heads, as a percentage
percentage_matching = matching_highest_scores / total_samples * 100
print(f'Percentage of matching highest scores: {percentage_matching:.2f}%')


### t-SNE visualization

In [None]:
# Collect the penultimate-layer features and the labels of the whole test set
features = []
labels_list = []

model_lenet5.eval()
with torch.no_grad():
    for X, y in test_loader:
        X = X.to(device)
        feature = model_lenet5.get_features(X)
        features.append(feature.cpu().numpy())
        labels_list.append(y.numpy())

features = np.concatenate(features, axis=0)
labels = np.concatenate(labels_list, axis=0)

# Apply t-SNE (fixed random_state for a reproducible embedding)
tsne = TSNE(n_components=2, random_state=123).fit_transform(features)

# Create a color map with distinct colors for each label
custom_colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd',
                 '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf']
custom_cmap = ListedColormap(custom_colors)

# Plotting
plt.figure(figsize=(10, 6))
scatter = plt.scatter(tsne[:, 0], tsne[:, 1], c=labels, cmap=custom_cmap)
plt.title('t-SNE visualization of LeNet5 penultimate layer features')

# Calculate centroids for each class that actually occurs in the labels
# (the mean of an empty selection would be NaN)
centroids = {}
for i in range(10):
    indices = labels == i
    if indices.any():
        centroids[i] = np.mean(tsne[indices], axis=0)

# Plot label names at centroids
for i, centroid in centroids.items():
    plt.text(centroid[0], centroid[1], labels_map[i],
             color='black', fontsize=12, fontweight='demi', ha='center', va='center')

# Create a legend
handles = [mpatches.Patch(color=custom_cmap(i), label=labels_map[i]) for i in range(10)]
plt.legend(handles=handles, title='Clothing Items', fontsize=6, loc='lower left')

# Save plot to file, creating the output directory on first use
os.makedirs(plots_path, exist_ok=True)
save_path = os.path.join(plots_path, 'tSNE.png')
plt.savefig(save_path, bbox_inches='tight', dpi=300)

plt.show()

### Plot metrics and store model

In [None]:
def plot_train_val_metrics(history, model_name, plots_path='graphs'):
    '''
    Plots accuracy and loss for training and validation sets and outputs to file.

    The figure is saved as '<model_name>_metrics.png' inside plots_path, which is
    created if it does not exist yet.

    :param history: dictionary of training and validation results with the keys
                    'train_loss', 'val_loss', 'train_acc' and 'val_acc'
    :param model_name: model name for plot title and output file name
    :param plots_path: plots directory path
    '''

    # float() accepts plain numbers as well as 0-dim tensors (including tensors that
    # live on an accelerator, which matplotlib cannot convert by itself)
    curves = {key: [float(value) for value in history[key]]
              for key in ('train_loss', 'val_loss', 'train_acc', 'val_acc')}

    plt.figure(figsize=(10, 4))

    # Plot training and validation loss
    plt.subplot(1, 2, 1)
    plt.plot(curves['train_loss'], label='Training Loss', color='black')
    plt.plot(curves['val_loss'], label='Validation Loss', color='red')
    plt.title('Training & Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    # Plot training and validation accuracy
    plt.subplot(1, 2, 2)
    plt.plot(curves['train_acc'], label='Training Accuracy', color='black')
    plt.plot(curves['val_acc'], label='Validation Accuracy', color='red')
    plt.title('Training & Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    # Add a main title for the whole figure
    plt.suptitle(f'Training and Validation Metrics for {model_name}', fontsize=16, y=1.05)

    # Adjust layout
    plt.tight_layout()

    # Save plot to file, creating the output directory on first use
    if plots_path:
        os.makedirs(plots_path, exist_ok=True)
    save_path = os.path.join(plots_path, f'{model_name}_metrics.png')
    plt.savefig(save_path, bbox_inches='tight', dpi=300)

    plt.show()

In [None]:
def export_model(model, model_name, models_path='models'):
    '''
    Outputs model to file.

    The model is compiled with TorchScript and saved as '<model_name>.pt' inside
    models_path, which is created if it does not exist yet.

    :param model: model
    :param model_name: model name for output file name
    :param models_path: models directory path
    '''

    # Export model to TorchScript
    model_scripted = torch.jit.script(model)

    # Saving fails when the target directory is missing, so create it first
    if models_path:
        os.makedirs(models_path, exist_ok=True)

    # Save model
    save_path = os.path.join(models_path, model_name + '.pt')
    model_scripted.save(save_path)

In [None]:
# Plot and save the baseline training curves
plot_train_val_metrics(history=history_lenet5, model_name='lenet5', plots_path=plots_path)

In [None]:
# Save the trained baseline model
export_model(model=model_lenet5, model_name='lenet5_model_original')

### Evaluation on the test set

In [None]:
def test_model(model, test_loader, device=None):
    '''
    Tests a trained model on test data.

    The model's forward pass must return a 3-tuple whose first element holds the class
    logits. Loss is cross-entropy; accuracy is the fraction of correct top-1 predictions
    per batch. Both are averaged over the batches.

    :param model: model
    :param test_loader: test data loader
    :param device: device to evaluate on; if None, the device the model already lives on is used
    :return: returns test loss, test accuracy, predicted labels, and actual labels
    '''

    # Resolve the device from the model itself when none is given
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')
    model = model.to(device)

    # The loss is created here so the function does not depend on notebook state
    loss_fn = nn.CrossEntropyLoss()

    test_loss, test_acc = 0, 0
    y_preds = []
    y_labels = []

    # Set model to evaluation mode
    model.eval()
    with torch.inference_mode():
        # Iterating over batches of test data
        for X, y in test_loader:
            X, y = X.to(device), y.to(device)
            y_pred, _, _ = model(X)
            predicted = y_pred.argmax(dim=1)

            test_loss += loss_fn(y_pred, y)
            test_acc += (predicted == y).float().mean()

            # Store predictions and labels for confusion matrix
            y_preds.extend(predicted.cpu().numpy())
            y_labels.extend(y.cpu().numpy())

        # Average test loss and accuracy over all batches
        test_loss /= len(test_loader)
        test_acc /= len(test_loader)

    print(f'Test loss: {test_loss: .5f}| Test acc: {test_acc: .5f}')

    return test_loss, test_acc, y_preds, y_labels


# The name starts with 'test', so keep pytest-style collectors from treating it as a test case
test_model.__test__ = False

In [None]:
# Evaluate the baseline on the held-out test set
test_loss, test_acc, y_preds, y_labels = test_model(model=model_lenet5, test_loader=test_loader)

In [None]:
# See random images with their labels
torch.manual_seed(22)  # setting random seed
%matplotlib inline
fig = plt.figure(figsize=(12, 4))

rows, cols = 6, 10
for i in range(1, (rows * cols) + 1):
    random_idx = torch.randint(0, len(test_dataset), size=[1]).item()
    img, label_gt = test_dataset[random_idx]
    img_temp = img.unsqueeze(dim=0).to(device)
    # print(img.shape)
    label_pred = int(torch.argmax(model_lenet5(img_temp)[0]))
    fig.add_subplot(rows, cols, i)
    img = img.permute(1, 2, 0)    # CWH --> WHC
    plt.imshow(img, cmap='gray')
    if label_pred == label_gt:
        plt.title(labels_map[label_pred], color='g') # green label for correct prediction
    else:
        plt.title(labels_map[label_pred], color='r') # red label for incorrect prediction
    plt.axis(False)
    plt.tight_layout();

## Model variants

### 1st variant
Change learning rate to adaptive learning rate (1/2 every 5 epochs)

In [None]:
class LeNet5_v1(nn.Module):
    '''
    First LeNet-5 variant: same layers as the baseline, with two auxiliary linear heads.

    The linear layers are dimensioned for 1x28x28 inputs. forward() returns a
    tuple (logits, aux1, aux2) with 10 outputs per sample each.
    '''

    def __init__(self):
        super().__init__()

        # Stage 1: padded 5x5 convolution keeps 28x28, pooling halves it to 14x14
        stage_one = [
            nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
        ]
        self.first_wave = nn.Sequential(*stage_one)

        # Stage 2: unpadded 5x5 convolution 14x14 -> 10x10, pooling -> 5x5
        stage_two = [
            nn.Conv2d(6, 16, kernel_size=5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
        ]
        self.second_wave = nn.Sequential(*stage_two)

        # Fully connected classifier on the flattened 16x5x5 feature map
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, 10),
        )

        # Kaiming-uniform initialization of every conv/linear layer created so far
        self._initialize_weights()

        # Auxiliary heads are created after the initialization call above,
        # so they keep nn.Linear's default initialization
        self.aux_output1 = nn.Linear(6 * 14 * 14, 10)  # reads the stage-1 output
        self.aux_output2 = nn.Linear(16 * 5 * 5, 10)   # reads the stage-2 output

    def forward(self, x):
        stage_one_out = self.first_wave(x)
        aux1 = self.aux_output1(stage_one_out.view(stage_one_out.shape[0], -1))

        stage_two_out = self.second_wave(stage_one_out)
        aux2 = self.aux_output2(stage_two_out.view(stage_two_out.shape[0], -1))

        return self.classifier(stage_two_out), aux1, aux2

    def _initialize_weights(self):
        # Kaiming-uniform weights and zero biases for conv and linear layers
        for module in self.modules():
            if not isinstance(module, (nn.Conv2d, nn.Linear)):
                continue
            init.kaiming_uniform_(module.weight, mode='fan_in', nonlinearity='relu')
            if module.bias is not None:
                init.constant_(module.bias, 0)

    def get_features(self, x):
        # Run both convolutional stages and flatten per sample
        pooled = self.second_wave(self.first_wave(x))
        flat = pooled.view(pooled.shape[0], -1)

        # Every classifier layer except the final linear layer
        penultimate = self.classifier[:-1]
        return penultimate(flat)

In [None]:
# Build the first variant and display its per-layer summary for one 1x28x28 input
model_lenet5_v1 = LeNet5_v1()

summary(
    model=model_lenet5_v1,
    input_size=(1, 1, 28, 28),
    col_names=['input_size', 'output_size', 'num_params', 'trainable'],
    col_width=20,
    row_settings=['var_names'],
    verbose=0,
)

In [None]:
# Train the first variant with the learning rate halved every 5 epochs
model_lenet5_v1, history_lenet5_v1, learning_rates_lenet5_v1 = train_model_multiclass(
    model=model_lenet5_v1,
    num_classes=10,
    train_loader=train_loader,
    validation_loader=validation_loader,
    device=device,
    epochs=EPOCHS,
    lr=LEARNING_RATE,
    adapt_lr=True,
    adapt_lr_step=5,
    adapt_lr_gamma=0.5,
    tensorboard_tracking=TENSORBOARD,
)

In [None]:
# Plot and save the training curves of the first variant
plot_train_val_metrics(history=history_lenet5_v1, model_name='lenet5_v1')

In [None]:
# Save the trained first variant
export_model(model=model_lenet5_v1, model_name='lenet5_model_v1')

In [None]:
def plot_lr(lr_per_epoch, file_name='adaptive_lr', plots_path='graphs'):
    '''
    Plots learning rates for every epoch and outputs to file.

    The figure is saved as '<file_name>.png' inside plots_path, which is created
    if it does not exist yet.

    :param lr_per_epoch: learning rate for every epoch
    :param file_name: output file name (excluding extension)
    :param plots_path: plots directory path
    '''
    # Plot learning rate
    plt.plot(lr_per_epoch)
    plt.xlabel('Epochs')
    plt.ylabel('Learning Rate')
    plt.title('Adaptive Learning Rate During Training')

    # Save plot to file, creating the output directory on first use
    if plots_path:
        os.makedirs(plots_path, exist_ok=True)
    save_path = os.path.join(plots_path, file_name + '.png')
    plt.savefig(save_path, bbox_inches='tight', dpi=300)

    plt.show()

In [None]:
# Plot and save the learning-rate schedule recorded while training the first variant
plot_lr(lr_per_epoch=learning_rates_lenet5_v1, file_name='adaptive_lr')

### 2nd variant
Increase number of filters in convolution layers from 6 and 16 to 32 and 64

In [None]:
class LeNet5_v2(nn.Module):
    '''
    Second LeNet-5 variant: 32 and 64 convolution filters, with two auxiliary linear heads.

    The linear layers are dimensioned for 1x28x28 inputs. forward() returns a
    tuple (logits, aux1, aux2) with 10 outputs per sample each.
    '''

    def __init__(self):
        super().__init__()

        # Stage 1: padded 5x5 convolution keeps 28x28, pooling halves it to 14x14
        stage_one = [
            nn.Conv2d(1, 32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
        ]
        self.first_wave = nn.Sequential(*stage_one)

        # Stage 2: unpadded 5x5 convolution 14x14 -> 10x10, pooling -> 5x5
        stage_two = [
            nn.Conv2d(32, 64, kernel_size=5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
        ]
        self.second_wave = nn.Sequential(*stage_two)

        # Fully connected classifier on the flattened 64x5x5 feature map
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 5 * 5, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, 10),
        )

        # Kaiming-uniform initialization of every conv/linear layer created so far
        self._initialize_weights()

        # Auxiliary heads are created after the initialization call above,
        # so they keep nn.Linear's default initialization
        self.aux_output1 = nn.Linear(32 * 14 * 14, 10)  # reads the stage-1 output
        self.aux_output2 = nn.Linear(64 * 5 * 5, 10)    # reads the stage-2 output

    def forward(self, x):
        stage_one_out = self.first_wave(x)
        aux1 = self.aux_output1(stage_one_out.view(stage_one_out.shape[0], -1))

        stage_two_out = self.second_wave(stage_one_out)
        aux2 = self.aux_output2(stage_two_out.view(stage_two_out.shape[0], -1))

        return self.classifier(stage_two_out), aux1, aux2

    def _initialize_weights(self):
        # Kaiming-uniform weights and zero biases for conv and linear layers
        for module in self.modules():
            if not isinstance(module, (nn.Conv2d, nn.Linear)):
                continue
            init.kaiming_uniform_(module.weight, mode='fan_in', nonlinearity='relu')
            if module.bias is not None:
                init.constant_(module.bias, 0)

    def get_features(self, x):
        # Run both convolutional stages and flatten per sample
        pooled = self.second_wave(self.first_wave(x))
        flat = pooled.view(pooled.shape[0], -1)

        # Every classifier layer except the final linear layer
        penultimate = self.classifier[:-1]
        return penultimate(flat)

In [None]:
# Build the second variant and display its per-layer summary for one 1x28x28 input
model_lenet5_v2 = LeNet5_v2()

summary(
    model=model_lenet5_v2,
    input_size=(1, 1, 28, 28),
    col_names=['input_size', 'output_size', 'num_params', 'trainable'],
    col_width=20,
    row_settings=['var_names'],
    verbose=0,
)

In [None]:
# Train the second variant with the learning rate halved every 5 epochs
model_lenet5_v2, history_lenet5_v2, learning_rates_lenet5_v2 = train_model_multiclass(
    model=model_lenet5_v2,
    num_classes=10,
    train_loader=train_loader,
    validation_loader=validation_loader,
    device=device,
    epochs=EPOCHS,
    lr=LEARNING_RATE,
    adapt_lr=True,
    adapt_lr_step=5,
    adapt_lr_gamma=0.5,
    tensorboard_tracking=TENSORBOARD,
)

In [None]:
# Plot and save the training curves of the second variant
plot_train_val_metrics(history=history_lenet5_v2, model_name='lenet5_v2')

In [None]:
# Save the trained second variant
export_model(model=model_lenet5_v2, model_name='lenet5_model_v2')

### 3rd variant
Increase neurons in fully connected layers from 120 and 84 to 200 and 140 (maintain 0.7 ratio)

In [None]:
class LeNet5_v3(nn.Module):
    '''
    Third LeNet-5 variant: 32/64 convolution filters and 200/140 hidden units,
    with two auxiliary linear heads.

    The linear layers are dimensioned for 1x28x28 inputs. forward() returns a
    tuple (logits, aux1, aux2) with 10 outputs per sample each.
    '''

    def __init__(self):
        super().__init__()

        # Stage 1: padded 5x5 convolution keeps 28x28, pooling halves it to 14x14
        stage_one = [
            nn.Conv2d(1, 32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
        ]
        self.first_wave = nn.Sequential(*stage_one)

        # Stage 2: unpadded 5x5 convolution 14x14 -> 10x10, pooling -> 5x5
        stage_two = [
            nn.Conv2d(32, 64, kernel_size=5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
        ]
        self.second_wave = nn.Sequential(*stage_two)

        # Fully connected classifier on the flattened 64x5x5 feature map
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 5 * 5, 200),
            nn.ReLU(),
            nn.Linear(200, 140),
            nn.ReLU(),
            nn.Linear(140, 10),
        )

        # Kaiming-uniform initialization of every conv/linear layer created so far
        self._initialize_weights()

        # Auxiliary heads are created after the initialization call above,
        # so they keep nn.Linear's default initialization
        self.aux_output1 = nn.Linear(32 * 14 * 14, 10)  # reads the stage-1 output
        self.aux_output2 = nn.Linear(64 * 5 * 5, 10)    # reads the stage-2 output

    def forward(self, x):
        stage_one_out = self.first_wave(x)
        aux1 = self.aux_output1(stage_one_out.view(stage_one_out.shape[0], -1))

        stage_two_out = self.second_wave(stage_one_out)
        aux2 = self.aux_output2(stage_two_out.view(stage_two_out.shape[0], -1))

        return self.classifier(stage_two_out), aux1, aux2

    def _initialize_weights(self):
        # Kaiming-uniform weights and zero biases for conv and linear layers
        for module in self.modules():
            if not isinstance(module, (nn.Conv2d, nn.Linear)):
                continue
            init.kaiming_uniform_(module.weight, mode='fan_in', nonlinearity='relu')
            if module.bias is not None:
                init.constant_(module.bias, 0)

    def get_features(self, x):
        # Run both convolutional stages and flatten per sample
        pooled = self.second_wave(self.first_wave(x))
        flat = pooled.view(pooled.shape[0], -1)

        # Every classifier layer except the final linear layer
        penultimate = self.classifier[:-1]
        return penultimate(flat)

In [None]:
# Build the third variant and display its per-layer summary for one 1x28x28 input
model_lenet5_v3 = LeNet5_v3()

summary(
    model=model_lenet5_v3,
    input_size=(1, 1, 28, 28),
    col_names=['input_size', 'output_size', 'num_params', 'trainable'],
    col_width=20,
    row_settings=['var_names'],
    verbose=0,
)

In [None]:
# Train the third variant with the learning rate halved every 5 epochs
model_lenet5_v3, history_lenet5_v3, learning_rates_lenet5_v3 = train_model_multiclass(
    model=model_lenet5_v3,
    num_classes=10,
    train_loader=train_loader,
    validation_loader=validation_loader,
    device=device,
    epochs=EPOCHS,
    lr=LEARNING_RATE,
    adapt_lr=True,
    adapt_lr_step=5,
    adapt_lr_gamma=0.5,
    tensorboard_tracking=TENSORBOARD,
)

In [None]:
# Plot and save the training curves of the third variant
plot_train_val_metrics(history=history_lenet5_v3, model_name='lenet5_v3')

In [None]:
# Save the trained third variant
export_model(model=model_lenet5_v3, model_name='lenet5_model_v3')

### 4th variant
Add batch normalization

In [None]:
class LeNet5_v4(nn.Module):
    '''
    LeNet-5 variant with batch normalization after each convolution and two auxiliary
    linear heads attached to the outputs of the first and second convolutional blocks.
    '''

    def __init__(self):
        super().__init__()

        # Block 1: 5x5 conv (padding keeps 28x28) -> batch norm -> ReLU -> 2x2 max-pool (14x14)
        self.first_wave = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # Block 2: 5x5 conv (14x14 -> 10x10) -> batch norm -> ReLU -> 2x2 max-pool (5x5)
        self.second_wave = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=5, stride=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # Fully connected head: 64*5*5 -> 200 -> 140 -> 10
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 5 * 5, 200),
            nn.ReLU(),
            nn.Linear(200, 140),
            nn.ReLU(),
            nn.Linear(140, 10),
        )

        # Kaiming-uniform initialisation of the layers registered so far
        self._initialize_weights()

        # Auxiliary output heads (after the first and after the second pooling).
        # NOTE: they are created after _initialize_weights() has run, so they keep
        # nn.Linear's default initialisation rather than the Kaiming one.
        self.aux_output1 = nn.Linear(32 * 14 * 14, 10)
        self.aux_output2 = nn.Linear(64 * 5 * 5, 10)

    def forward(self, x):
        '''
        Runs the network and returns the classifier output together with both auxiliary outputs.

        :param x: input batch
        :return: tuple (classifier output, first auxiliary output, second auxiliary output)
        '''
        stage1 = self.first_wave(x)
        aux1 = self.aux_output1(stage1.view(stage1.size(0), -1))

        stage2 = self.second_wave(stage1)
        aux2 = self.aux_output2(stage2.view(stage2.size(0), -1))

        logits = self.classifier(stage2)
        return logits, aux1, aux2

    def _initialize_weights(self):
        '''
        Re-initialises every Conv2d and Linear module currently registered on the model:
        Kaiming-uniform weights (fan_in mode, ReLU gain) and zero biases.
        '''
        for module in self.modules():
            if not isinstance(module, (nn.Conv2d, nn.Linear)):
                continue
            init.kaiming_uniform_(module.weight, mode='fan_in', nonlinearity='relu')
            if module.bias is not None:
                init.constant_(module.bias, 0)

    def get_features(self, x):
        '''
        Returns the activations that are fed into the last layer of the classifier.

        :param x: input batch
        :return: output of the classifier with its final layer left out
        '''
        # Both feature-extraction blocks, then flatten to (batch, features)
        hidden = self.second_wave(self.first_wave(x))
        hidden = hidden.view(hidden.size(0), -1)

        # Slicing the Sequential yields a Sequential holding every layer but the last
        return self.classifier[:-1](hidden)

In [None]:
# Build the fourth variant and display its per-layer summary for an input of size (1, 1, 28, 28)
model_lenet5_v4 = LeNet5_v4()

summary(
    model=model_lenet5_v4,
    input_size=(1, 1, 28, 28),
    col_width=20,
    col_names=['input_size', 'output_size', 'num_params', 'trainable'],
    row_settings=['var_names'],
    verbose=0,
)

In [None]:
# Train the fourth variant on the train/validation loaders.
# NOTE(review): the positional `10` and `True, 5, 0.5` are presumably the number of classes and
# the adaptive-LR flag with its step size and decay factor - confirm against train_model_multiclass.
model_lenet5_v4, history_lenet5_v4, learning_rates_lenet5_v4 = train_model_multiclass(
    model_lenet5_v4, 10, train_loader, validation_loader, device,
    EPOCHS, LEARNING_RATE, True, 5, 0.5,
    tensorboard_tracking=TENSORBOARD,
)

In [None]:
# Plot the recorded training/validation history of the fourth variant under the name 'lenet5_v4'
plot_train_val_metrics(history_lenet5_v4, 'lenet5_v4')

In [None]:
# Export the trained fourth variant under the name 'lenet5_model_v4'
export_model(model_lenet5_v4, 'lenet5_model_v4')

## Store results in CSV

In [None]:
def export_model_metrics(model_metrics, file_name='model_metrics'):
    '''
    Writes one CSV row of training and validation results per model.

    :param model_metrics: dictionary mapping each model name to a dictionary with the keys
                          'train_loss', 'val_loss', 'train_acc' and 'val_acc'
                          (values may be plain numbers or single-element tensors)
    :param file_name: output file name; the '.csv' extension is appended only when it is missing
    '''
    # Accept names given with or without the extension so 'x.csv' is not written as 'x.csv.csv'
    if not file_name.lower().endswith('.csv'):
        file_name += '.csv'

    def _as_scalar(value):
        # Tensors would otherwise be written as 'tensor(...)' and break later numeric parsing
        return value.item() if torch.is_tensor(value) else value

    header = ['Model', 'Train Loss', 'Validation Loss', 'Train Accuracy', 'Validation Accuracy']
    metric_keys = ('train_loss', 'val_loss', 'train_acc', 'val_acc')

    with open(file_name, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(header)

        # One row per model, columns in the same order as the header
        for model_name, metrics in model_metrics.items():
            writer.writerow([model_name] + [_as_scalar(metrics[key]) for key in metric_keys])

In [None]:
# Training histories of every trained variant, keyed by the name each model was exported under
histories_by_model = {
    'lenet5_model_original': history_lenet5,
    'lenet5_model_v1': history_lenet5_v1,
    'lenet5_model_v2': history_lenet5_v2,
    'lenet5_model_v3': history_lenet5_v3,
    'lenet5_model_v4': history_lenet5_v4,
}

# Keep the value recorded for the last epoch of each metric
model_metrics = {
    model_name: {key: history[key][-1] for key in ('train_loss', 'val_loss', 'train_acc', 'val_acc')}
    for model_name, history in histories_by_model.items()
}

# export_model_metrics adds the '.csv' extension itself, so pass the bare name;
# the next cell reads 'final_model_metrics.csv'
export_model_metrics(model_metrics, 'final_model_metrics')

## Test the model with the highest validation accuracy

In [None]:
results = pd.read_csv('final_model_metrics.csv')

# Find the id of the model with the max Validation Accuracy
max_val_acc_id = results['Validation Accuracy'].idxmax()

# Retrieve the row of this specific model
max_model = results.loc[max_val_acc_id]

# Convert to a percentage first and round afterwards, so no float artefacts
# (e.g. 91.10000000000001) end up in the printed value
max_val_acc_formatted = round(float(max_model['Validation Accuracy']) * 100, 1)

# Double-quoted f-string: reusing the same quote type inside the braces is a SyntaxError before Python 3.12
print(f"The model with the highest validation accuracy ({max_val_acc_formatted}%) is {max_model['Model']}")

In [None]:
# Display the metrics table read from the CSV file
results

In [None]:
# Load the exported TorchScript file of the best model.
# Double-quoted f-string: reusing the same quote type inside the braces is a SyntaxError before Python 3.12
model_file = f"models/{max_model['Model']}.pt"
model = torch.jit.load(model_file)

# Evaluate on the test loader; the predictions and labels are reused for the confusion matrix below
test_loss, test_acc, y_preds, y_labels = test_model(model, test_loader)

In [None]:
def plot_confusion_matrix(preds, labels, labels_map, file_name='confusion_matrix', plots_path='graphs'):
    '''
    Plots confusion matrix of predicted labels and actual labels and outputs to file.

    :param preds: predicted labels
    :param labels: actual labels
    :param labels_map: dictionary indexing dataset's labels (0 .. len-1) to image descriptors
    :param file_name: output file name (excluding extension)
    :param plots_path: plots directory path (created if it does not exist)
    '''

    # Class ids and their names in the order of their numerical values
    class_ids = list(range(len(labels_map)))
    label_names = [labels_map[i] for i in class_ids]

    # Compute the confusion matrix; passing the full list of class ids keeps the matrix
    # aligned with the tick labels even when a class is absent from both inputs
    cm = confusion_matrix(labels, preds, labels=class_ids)

    # Plot the confusion matrix.
    # Rows of cm are the true labels (y-axis), columns are the predicted labels (x-axis)
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=label_names, yticklabels=label_names)
    plt.xlabel('Predicted labels')
    plt.ylabel('True labels')
    plt.title('Confusion Matrix')

    # Save plot to file, making sure the target directory exists
    if plots_path:
        os.makedirs(plots_path, exist_ok=True)
    save_path = os.path.join(plots_path, file_name + '.png')
    plt.savefig(save_path, bbox_inches='tight', dpi=300)

    plt.show()
In [None]:
# Confusion matrix of the best model's test predictions, saved under the name 'cm_best_model'
plot_confusion_matrix(y_preds, y_labels, labels_map, 'cm_best_model')

## Test the model with the highest validation accuracy after retraining on train and validation datasets

In [None]:
# Initialize appropriate model.
# Keys are the model names exactly as stored in the metrics CSV; values are the model class and
# whether the adaptive learning rate is used when training it.
model_registry = {
    'lenet5_model_original': (LeNet5, False),
    'lenet5_model_v1': (LeNet5_v1, True),
    'lenet5_model_v2': (LeNet5_v2, True),
    'lenet5_model_v3': (LeNet5_v3, True),
    'lenet5_model_v4': (LeNet5_v4, True),
}

best_model_name = max_model['Model']
if best_model_name not in model_registry:
    # Fail loudly instead of silently retraining whatever `model` currently holds
    raise ValueError(f'Unknown model name: {best_model_name!r}')

model_class, adapt_lr = model_registry[best_model_name]
model = model_class()

# Retrain model on both train and validation data (no validation loader is passed)
model, history, learning_rates = train_model_multiclass(model, 10, train_validation_loader, None, device,
                                                        EPOCHS, LEARNING_RATE, adapt_lr, 5, 0.5,
                                                        tensorboard_tracking=TENSORBOARD)

In [None]:
# Evaluate the retrained model on the test loader
test_loss, test_acc, y_preds, y_labels = test_model(model, test_loader)

In [None]:
# Confusion matrix of the retrained model's test predictions, saved under the name 'cm_best_model_retrained'
plot_confusion_matrix(y_preds, y_labels, labels_map, 'cm_best_model_retrained')

## K-fold cross validation

### Retrain the model with the highest validation accuracy with k-fold cross validation

In [None]:
# Initialize results for k-fold cross validation
kf_results = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': []}

# Resolve the model class once.
# Keys are the model names exactly as stored in the metrics CSV; values are the model class and
# whether the adaptive learning rate is used when training it.
fold_model_registry = {
    'lenet5_model_original': (LeNet5, False),
    'lenet5_model_v1': (LeNet5_v1, True),
    'lenet5_model_v2': (LeNet5_v2, True),
    'lenet5_model_v3': (LeNet5_v3, True),
    'lenet5_model_v4': (LeNet5_v4, True),
}

best_model_name = max_model['Model']
if best_model_name not in fold_model_registry:
    raise ValueError(f'Unknown model name: {best_model_name!r}')
fold_model_class, adapt_lr = fold_model_registry[best_model_name]

# Initialize the k-fold cross validation.
# NOTE: shuffle=True without a random_state means the folds differ between runs
kf = KFold(n_splits=5, shuffle=True)

# Loop through each fold (test_idx holds the indices used for validation in this fold)
for fold, (train_idx, test_idx) in enumerate(kf.split(train_val_dataset)):
    fold_id = str(fold + 1)

    # Create data loaders for training and validation sets for fold
    train_loader_fold = DataLoader(train_val_dataset, batch_size=BATCH_SIZE, sampler=SubsetRandomSampler(train_idx))
    validation_loader_fold = DataLoader(train_val_dataset, batch_size=BATCH_SIZE, sampler=SubsetRandomSampler(test_idx))

    # Start every fold from a freshly initialized model
    model_fold = fold_model_class()

    # Train model on fold
    model_fold, history_fold, learning_rates_fold = train_model_multiclass(model_fold, 10, train_loader_fold, validation_loader_fold, device,
                                                                           EPOCHS, LEARNING_RATE, adapt_lr, 5, 0.5,
                                                                           tensorboard_tracking=TENSORBOARD)

    # Export model for fold
    export_model(model_fold, best_model_name + '_crossval_fold' + fold_id)

    # Plot results for fold
    plot_train_val_metrics(history_fold, 'lenet5_crossval_fold' + fold_id)

    # Store last-epoch performance metrics for fold as plain numbers
    # (tensor values are converted so that np.mean below works)
    for metric_name, fold_values in kf_results.items():
        last_value = history_fold[metric_name][-1]
        fold_values.append(last_value.item() if torch.is_tensor(last_value) else last_value)

# Calculate averages across folds
avg_train_loss = np.mean(kf_results['train_loss'])
avg_train_acc = np.mean(kf_results['train_acc'])
avg_val_loss = np.mean(kf_results['val_loss'])
avg_val_acc = np.mean(kf_results['val_acc'])

print('Average Training Loss:', avg_train_loss)
print('Average Training Accuracy:', avg_train_acc)
print('Average Validation Loss:', avg_val_loss)
print('Average Validation Accuracy:', avg_val_acc)