In [None]:
import os
import random

import matplotlib.pyplot as plt

import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms

from torch.utils.data import Dataset
import torch.optim as optim
from tqdm import tqdm

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
import seaborn as sn
import pandas as pd

### Define Helpers

In [None]:
def get_n_params(model):
    """Return the total number of scalar elements over all parameters of `model`."""
    # A generator sum avoids a local accumulator that would shadow the `np` alias.
    return sum(param.nelement() for param in model.parameters())

# For reproducibility
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's `random`, NumPy's global RNG and PyTorch (CPU and CUDA),
    exports PYTHONHASHSEED, and configures cuDNN for repeatable runs.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Deterministic kernels alone are not enough: with benchmark mode enabled the
    # cuDNN autotuner may select a different algorithm from run to run.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    print('Manual seed changed successfully.')


# Seed once, before any dataset split or model initialisation happens.
seed = 42
seed_everything(seed)

###  Load Raw Data and Define Datasets

In [None]:
def load_data(train_batch_size, test_batch_size):
    """Create the MNIST train/validation/test datasets and their data loaders.

    The official training split is divided 80/20 into train and validation
    subsets. Every sample is an [im_tensor, label] pair where im_tensor is
    1x32x32 (gray scale, resized to 32x32 pixels).

    Args:
        train_batch_size: Batch size for the train and validation loaders.
        test_batch_size: Batch size for the test loader.

    Returns:
        (trainset, train_loader, valset, val_loader, testset, test_loader)
    """
    # One preprocessing pipeline, shared by the training and the test data.
    preprocess = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])

    full_train = datasets.MNIST('../Datasets/', train=True, download=True, transform=preprocess)
    n_val = int(0.2 * len(full_train))
    n_train = len(full_train) - n_val
    trainset, valset = torch.utils.data.random_split(full_train, [n_train, n_val])

    testset = datasets.MNIST('../Datasets', train=False, transform=preprocess)

    # Only the training loader shuffles; validation and test keep a fixed order.
    make_loader = torch.utils.data.DataLoader
    train_loader = make_loader(trainset, batch_size=train_batch_size, shuffle=True)
    val_loader = make_loader(valset, batch_size=train_batch_size, shuffle=False)
    test_loader = make_loader(testset, batch_size=test_batch_size, shuffle=False)

    return trainset, train_loader, valset, val_loader, testset, test_loader

In [None]:
# Batch sizes: small batches for optimisation, large ones for fast test inference
train_batch_size, test_batch_size = 64, 1000

# Build the three datasets together with their loaders
(train_set, train_loader,
 val_set, val_loader,
 test_set, test_loader) = load_data(train_batch_size, test_batch_size)

print(f'data shape: train {len(train_set)}, val {len(val_set)}, test {len(test_set)}')

In [None]:
# Preview the first nine training samples on a 3x3 grid
for i in range(9):
    image, label = train_set[i]
    plt.subplot(3, 3, i + 1)
    # Subtracting from 255 flips the intensities: dark digits on a light background
    plt.imshow(255 - image.squeeze(), cmap='gray')
    plt.title(f"Label {label}")
plt.tight_layout()
plt.show()

### Model Definition

In [None]:
class CNN(nn.Module):
    """Two-convolution classifier producing 10 logits per image.

    Each 5x5 convolution is followed by ReLU and 2x2 max pooling; `fc1`
    expects `n_feat_maps * 5 * 5` features, which is what a 32x32
    single-channel input yields after both conv/pool stages.

    Args:
        n_feat_maps: Number of output channels of both convolutions.
        dropout_layer: Pass 1 to insert Dropout(p=0.2) after the first
            pooling stage; any other value disables dropout.
    """

    def __init__(self, n_feat_maps, dropout_layer=0):
        super().__init__()
        self.n_feat_maps = n_feat_maps

        # Layers are created in the same order as before so that seeded
        # weight initialisation is unchanged.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=n_feat_maps, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=n_feat_maps, out_channels=n_feat_maps, kernel_size=5)
        self.fc1 = nn.Linear(in_features=n_feat_maps * 5 * 5, out_features=50)
        self.fc2 = nn.Linear(in_features=50, out_features=10)

        # The flag is normalised to 0/1; the module exists only when enabled.
        self.dropout_layer = 1 if dropout_layer == 1 else 0
        if self.dropout_layer == 1:
            self.dropout = nn.Dropout(p=0.2)

    def forward(self, x):
        """Map a batch of images to class logits (no softmax applied)."""
        x = F.max_pool2d(F.relu(self.conv1(x)), kernel_size=2)

        if self.dropout_layer == 1:
            x = self.dropout(x)

        x = F.max_pool2d(F.relu(self.conv2(x)), kernel_size=2)
        x = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        x = F.relu(self.fc1(x))
        return self.fc2(x)


In [None]:
# Prefer the GPU whenever PyTorch can see one, otherwise stay on the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print('Working on: ', device)

# Classification loss shared by the train / eval / test routines
criterion = nn.CrossEntropyLoss()

def train(epoch, model):
    """Run one training epoch of `model` over the global `train_loader`.

    Relies on the notebook-level globals `train_loader`, `optimizer`,
    `criterion` and `device`, and appends the epoch's results to the global
    lists `total_train_accuracy` and `total_train_loss`.

    Args:
        epoch: Epoch index, used only in the progress messages.
        model: Network to optimise; it must already live on `device`.
    """
    model.train()
    loss_sum = 0.0
    correct = 0
    n_samples = len(train_loader.dataset)

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        # `criterion` yields the batch *mean*; weighting it by the batch size
        # makes the later division by the dataset size a true per-sample mean.
        # `.item()` keeps the history as plain floats instead of device tensors.
        loss_sum += loss.item() * data.size(0)
        preds = output.detach().argmax(dim=1)  # index of the largest logit
        correct += (preds == target).sum().item()

        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), n_samples,
                100. * batch_idx / len(train_loader), loss.item()))

    train_loss = loss_sum / n_samples
    accuracy = 100. * correct / n_samples

    total_train_accuracy.append(accuracy)
    total_train_loss.append(train_loss)


def eval(model, pred_res=0):
    """Evaluate `model` on the global `val_loader`.

    Note: the name shadows Python's built-in `eval`; it is kept because other
    cells call it. Relies on the globals `val_loader`, `criterion` and
    `device`, and appends the results to `total_val_accuracy` and
    `total_val_loss`.

    Args:
        model: Network to evaluate; it must already live on `device`.
        pred_res: Pass 1 to get the predictions back.

    Returns:
        `(preds_vec, accuracy)` when `pred_res == 1`, where `preds_vec` is a
        1-D CPU tensor of predicted classes in loader order; otherwise None.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    n_samples = len(val_loader.dataset)

    preds_vec = []
    # No gradients are needed for evaluation; skipping them saves memory/time.
    with torch.no_grad():
        for data, target in tqdm(val_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            # Batch mean * batch size -> summed per-sample loss.
            loss_sum += criterion(output, target).item() * data.size(0)

            preds = output.argmax(dim=1)  # index of the largest logit
            correct += (preds == target).sum().item()
            # Keep predictions on the CPU so NumPy / sklearn can consume them.
            preds_vec.append(preds.cpu())

    val_loss = loss_sum / n_samples
    accuracy = 100. * correct / n_samples

    total_val_accuracy.append(accuracy)
    total_val_loss.append(val_loss)

    print('\nValidation set: average loss: {:.4f}, accuracy: {}/{} ({:.0f}%)\n'.format(
        val_loss, correct, n_samples,
        accuracy))

    if pred_res == 1:
        return torch.hstack(preds_vec), accuracy


### Train and Evaluate a Model

In [None]:
# define number of feature maps
n_feat_maps = 3
num_epoches = [5, 10, 15] # Define the number of epochs

# Per-run histories, ordered as: for each epoch budget -> [no dropout, dropout]
T_train_acc_no,  T_val_acc_no  = [], []
T_train_loss_no, T_val_loss_no = [], []
models_comp_epoch = []

for e, n_epochs in enumerate(num_epoches):
    # A fresh pair of models is trained for every epoch budget
    model_cnn        = CNN(n_feat_maps)
    model_cnn_w_drop = CNN(n_feat_maps, 1)

    models_comp = [model_cnn, model_cnn_w_drop]
    models_comp_epoch.extend(models_comp)

    print('\033[1mNumber of Epochs: {}\033[0m'.format(n_epochs))

    for i, current_model in enumerate(models_comp):
        # Send Model to device and set the optimizer
        current_model.to(device)
        optimizer = optim.SGD(current_model.parameters(), lr=0.01, momentum=0.5)
        print('Number of parameters: {}'.format(get_n_params(current_model)))

        # train() / eval() append to these globals, so reset them for each run
        total_train_accuracy, total_val_accuracy = [], []   # Initiating accuracy count
        total_train_loss,     total_val_loss     = [], []   # Initiating loss count

        # Train and evaluate model
        print('\033[1mRun for model No. {}\033[0m'.format(i))
        for epoch in range(n_epochs):
            train(epoch, current_model)
            eval(current_model)

        # Store copies: the global lists keep growing whenever eval() is called
        # again later, which would otherwise corrupt the recorded history.
        T_train_acc_no.append( list(total_train_accuracy))
        T_val_acc_no.append(   list(total_val_accuracy)  )
        T_train_loss_no.append(list(total_train_loss)    )
        T_val_loss_no.append(  list(total_val_loss)      )

In [None]:
print('\033[1mModel No. 0 - Without dropout layer\nModel No. 1 - With dropout layer\033[0m')

# Histories are stored as [no dropout, dropout] pairs per epoch budget, so
# run i belongs to budget i // 2 and to model i % 2.
for i in range(2*len(num_epoches)):
    n_epochs = num_epoches[i//2]
    md = i % 2
    num_epoches_r = range(n_epochs)

    fig, ax = plt.subplots(nrows=1, ncols=2)
    fig.set_figheight(5)
    fig.set_figwidth(15)

    # (axis, train history, validation history, y label, title, legend position)
    panels = [
        (ax[0], T_train_acc_no[i],  T_val_acc_no[i],  'Accuracy [%]',
         'Train - Validation Accuracy', 'lower right'),
        (ax[1], T_train_loss_no[i], T_val_loss_no[i], 'Loss',
         'Train - Validation Loss',     'upper right'),
    ]

    for axis, train_hist, val_hist, y_label, title, legend_loc in panels:
        # float() accepts plain numbers as well as 0-dim tensors (including
        # CUDA ones), which matplotlib cannot convert to NumPy by itself.
        axis.plot(num_epoches_r, [float(v) for v in train_hist], c='b', label='Train'     )
        axis.plot(num_epoches_r, [float(v) for v in val_hist],   c='r', label='Validation')
        axis.legend(loc=legend_loc)

        axis.set_xlabel('Num_epochs')
        axis.set_ylabel(y_label)
        axis.title.set_text(title)

        axis.grid()
        axis.set_xticks(num_epoches_r)

    fig.suptitle('Train vs. Validation predictions results\nModel No. {}\n Number of epochs: {}'.format(md, n_epochs), fontweight="bold")
    plt.show()

### Inference

In [None]:
def test(model):
    """Evaluate `model` on the global `test_loader`.

    Relies on the notebook-level globals `test_loader`, `criterion` and
    `device`.

    Args:
        model: Network to evaluate; it must already live on `device`.

    Returns:
        `(preds_vec, test_accuracy)`: a 1-D CPU tensor of predicted classes in
        loader order, and the accuracy in percent.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    n_samples = len(test_loader.dataset)

    preds_vec = []
    # Inference only: disable gradient tracking.
    with torch.no_grad():
        for data, target in tqdm(test_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            # Batch mean * batch size -> summed per-sample loss.
            loss_sum += criterion(output, target).item() * data.size(0)

            preds = output.argmax(dim=1)  # index of the largest logit
            correct += (preds == target).sum().item()
            # Keep predictions on the CPU so NumPy / sklearn can consume them.
            preds_vec.append(preds.cpu())

    test_loss = loss_sum / n_samples
    test_accuracy = 100. * correct / n_samples

    print('\nTest set: loss: {:.4f}, accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, n_samples,
        test_accuracy))

    return torch.hstack(preds_vec), test_accuracy


### Confusion Matrix

In [None]:
print('\033[1mModel No. 0 - Without dropout layer\nModel No. 1 - With dropout layer\033[0m')

# Ground truth is identical for every model, so collect it once up front.
# The validation loader does not shuffle, so its label order matches the
# order of the predictions returned by eval().
class_labels = list(range(10))
test_true = test_set.targets
eval_true = torch.hstack([target for _, target in val_loader])

for indx in range(2*len(num_epoches)):
    n_epochs = num_epoches[indx//2]
    md = indx % 2   # 0 - without dropout, 1 - with dropout

    print('\033[1mNumber of Epochs: {}\033[0m'.format(n_epochs))
    test_predictions, test_accuracy = test(models_comp_epoch[indx]  )
    # NOTE: eval() also appends to the global total_val_* history lists.
    eval_predictions, eval_accuracy = eval(models_comp_epoch[indx],1)

    # sklearn needs host memory; .cpu() is a no-op for tensors already there.
    # Passing `labels` guarantees a 10x10 matrix even if a class never occurs.
    test_confmat = confusion_matrix(test_true, test_predictions.cpu(), labels=class_labels)
    eval_confmat = confusion_matrix(eval_true, eval_predictions.cpu(), labels=class_labels)
    df_cm_test = pd.DataFrame(test_confmat, index=class_labels, columns=class_labels)
    df_cm_eval = pd.DataFrame(eval_confmat, index=class_labels, columns=class_labels)

    print('\033[1mModel No. {}:\033[0m'.format(md))
    print('Test dataset accuracy: {}\nValidation dataset accuracy: {}'.format(round(test_accuracy,3),round(eval_accuracy,3)))

    f,(ax1,ax2) = plt.subplots(1,2)
    f.set_figheight(5)
    f.set_figwidth(15)

    sn.heatmap(df_cm_test, annot=True, fmt='g', ax=ax1)
    ax1.set_title('Test dataset Confusion Matrix',      fontweight='bold')
    sn.heatmap(df_cm_eval, annot=True, fmt='g', ax=ax2)
    ax2.set_title('Validation dataset Confusion Matrix',fontweight='bold')
    f.suptitle('Test vs. Validation Confusion Matrices\nModel No. {}\n Number of epochs: {}'.format(md,n_epochs), fontweight="bold")
    plt.show()