In [None]:
import torch
import torch.nn as nn 
import torch.optim as optim 
import torch.nn.functional as F

from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader

import numpy as np 
import seaborn as sns 

from matplotlib import pyplot as plt 
from sklearn.metrics import confusion_matrix 

from load_mnist import load_mnist 

In [None]:
# Fetch the MNIST splits from the project-local loader and unpack them into
# train/test inputs and labels.
mnist_splits = load_mnist()
xtrain, ytrain, xtest, ytest = mnist_splits

In [None]:
# Reset function
def reset_model(model):
    """Re-initialise, in place, every layer of ``model`` that supports it.

    The whole module tree is walked (``model`` itself plus all nested
    sub-modules), so layers wrapped in inner containers are reset as well.
    Modules that do not expose a ``reset_parameters`` method (activations,
    pooling, containers, ...) are skipped.

    Parameters
    ----------
    model : torch.nn.Module
        The network whose parameters should be re-drawn.

    Returns
    -------
    None
    """
    for module in model.modules():
        reset = getattr(module, 'reset_parameters', None)
        if callable(reset):
            reset()

In [None]:
# Problem dimensions read off the loaded arrays:
#   p      - size of axis 1 of the training inputs
#   M      - size of axis 1 of the training labels
#   ntrain - number of training samples (axis 0)
#   ntest  - number of test samples (axis 0)
p, M = xtrain.shape[1], ytrain.shape[1]
ntrain, ntest = xtrain.shape[0], xtest.shape[0]

In [None]:
# Float tensors in the layout delivered by the loader, used by the fully
# connected networks.
xtra_torch, ytra_torch = (torch.tensor(arr).float() for arr in (xtrain, ytrain))
xtes_torch, ytes_torch = (torch.tensor(arr).float() for arr in (xtest, ytest))

# Image-shaped copies for the CNNs: every sample is reshaped to 28 x 28 and
# given a channel axis at position 1, i.e. (N, 1, 28, 28).
xtra_conv = torch.unsqueeze(torch.tensor(xtrain).reshape(ntrain, 28, 28).float(), dim = 1)
ytra_conv = torch.tensor(ytrain).float()
xtes_conv = torch.unsqueeze(torch.tensor(xtest).reshape(ntest, 28, 28).float(), dim = 1)
ytes_conv = torch.tensor(ytest).float()

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# torch.cuda.get_device_name only accepts CUDA devices, so it must not be
# called on the CPU fallback; print the device itself in that case.
if device.type == "cuda":
    print("Device:", torch.cuda.get_device_name(device))
else:
    print("Device:", device)

In [None]:
# Mini-batch size shared by both loaders
num_batch = 1000

# --- Fully connected networks -------------------------------------------
# The complete training set is moved to `device` once, then served in
# shuffled mini-batches.
training_data = TensorDataset(xtra_torch.to(device), ytra_torch.to(device))
train_dat_fcn = DataLoader(dataset = training_data, batch_size = num_batch, shuffle = True)

# The test set is kept as two whole tensors on `device`.
xte, yte = xtes_torch.to(device), ytes_torch.to(device)

# --- Convolutional networks ---------------------------------------------
# Same arrangement, using the image-shaped tensors.
train_dat_im = TensorDataset(xtra_conv.to(device), ytra_conv.to(device))
train_dat_cnn = DataLoader(dataset = train_dat_im, batch_size = num_batch, shuffle = True)

xte_im, yte_im = xtes_conv.to(device), ytes_conv.to(device)

In [None]:
# ---------------------------------------------------------------------------
# Small factories for the layers that every convolutional variant repeats.
# ---------------------------------------------------------------------------
def _conv3x3(n_in, n_out):
    """3x3 convolution with stride 1 and padding 1 (spatial size is preserved)."""
    return nn.Conv2d(in_channels = n_in, out_channels = n_out, kernel_size = (3,3), stride = 1, padding = 1)


def _pool2x2():
    """2x2 max-pooling with stride 2 (halves each spatial dimension)."""
    return nn.MaxPool2d(kernel_size = 2, stride = 2)


def _head():
    """Flatten followed by the final linear layer mapping 32*7*7 features to M outputs."""
    return [nn.Flatten(), nn.Linear(32 * 7 * 7, M)]


# 2-layer network
model_2L = nn.Sequential(
    nn.Linear(p, 100), nn.ReLU(),
    nn.Linear(100, M),
)

# 4-layer network
model_4L = nn.Sequential(
    nn.Linear(p, 256), nn.ReLU(),
    nn.Linear(256, 128), nn.ReLU(),
    nn.Linear(128, 64), nn.ReLU(),
    nn.Linear(64, M),
)

# Convolutional network: conv -> ReLU -> pool
conv_mod = nn.Sequential(
    _conv3x3(1, 8), nn.ReLU(), _pool2x2(),
    _conv3x3(8, 16), nn.ReLU(), _pool2x2(),
    _conv3x3(16, 32), nn.ReLU(),
    *_head(),
)

# Same network with pooling applied before the activation
conv_swap = nn.Sequential(
    _conv3x3(1, 8), _pool2x2(), nn.ReLU(),
    _conv3x3(8, 16), _pool2x2(), nn.ReLU(),
    _conv3x3(16, 32), nn.ReLU(),
    *_head(),
)

# Swapped ordering with tanh activations
conv_swap_htan = nn.Sequential(
    _conv3x3(1, 8), _pool2x2(), nn.Tanh(),
    _conv3x3(8, 16), _pool2x2(), nn.Tanh(),
    _conv3x3(16, 32), nn.Tanh(),
    *_head(),
)

# Batch normalisation after every convolution
conv_mod_bn = nn.Sequential(
    _conv3x3(1, 8), nn.BatchNorm2d(8), nn.ReLU(), _pool2x2(),
    _conv3x3(8, 16), nn.BatchNorm2d(16), nn.ReLU(), _pool2x2(),
    _conv3x3(16, 32), nn.BatchNorm2d(32), nn.ReLU(),
    *_head(),
)

# Batch normalisation plus channel-wise dropout (p = 0.5) after every activation
conv_mod_bn_dropout = nn.Sequential(
    _conv3x3(1, 8), nn.BatchNorm2d(8), nn.ReLU(), nn.Dropout2d(0.5), _pool2x2(),
    _conv3x3(8, 16), nn.BatchNorm2d(16), nn.ReLU(), nn.Dropout2d(0.5), _pool2x2(),
    _conv3x3(16, 32), nn.BatchNorm2d(32), nn.ReLU(), nn.Dropout2d(0.5),
    *_head(),
)

# SELU activations (note: despite the name, this variant has no BatchNorm layers)
conv_mod_bn_selu = nn.Sequential(
    _conv3x3(1, 8), nn.SELU(), _pool2x2(),
    _conv3x3(8, 16), nn.SELU(), _pool2x2(),
    _conv3x3(16, 32), nn.SELU(),
    *_head(),
)


# Architecture used by the cells below
mod = conv_mod

# Re-draw the weights so that re-running this cell starts from a fresh model
reset_model(mod)

# Place the model on the selected device
mod.to(device)

# Optional debugging aid: push a random image batch through the network and
# print the output shape after every layer.
check_dims = False
if check_dims:
    probe = torch.rand(size = (10, 1, 28, 28)).to(device)
    for layer in mod:
        probe = layer(probe)
        print(layer.__class__.__name__, 'output shape:\t', probe.shape)

In [None]:
# Cross-entropy on raw network outputs is the training criterion
loss = F.cross_entropy

# Iteration counter and per-iteration histories filled in by the training loop
it = 0
loss_train = []
acc_train = []
loss_test = []
acc_test = []

# Learning rate for plain SGD
l_rate = 0.05

# Every optimizer below is bound to the parameters of the currently selected
# model `mod`; one of them is picked in the next cell.

# Vanilla SGD
sgd_opt = optim.SGD(mod.parameters(), lr = l_rate)

# Adam
adam_opt = optim.Adam(
    mod.parameters(),
    lr = 0.001,
    betas = (0.9, 0.999),
    eps = 1e-08,
    weight_decay = 0,
    amsgrad = False,
)

# Exercise 1.5 - candidate optimizers to compare

# Adadelta
adad_opt = optim.Adadelta(
    mod.parameters(),
    lr = 1.0,
    rho = 0.9,
    eps = 1e-06,
    weight_decay = 0,
)

# Adagrad
adag_opt = optim.Adagrad(
    mod.parameters(),
    lr = 0.01,
    lr_decay = 0,
    weight_decay = 0,
    initial_accumulator_value = 0,
    eps = 1e-10,
)

# Adamax
adamax_opt = optim.Adamax(
    mod.parameters(),
    lr = 0.002,
    betas = (0.9, 0.999),
    eps = 1e-08,
    weight_decay = 0,
)

# Averaged SGD
asgd_opt = optim.ASGD(
    mod.parameters(),
    lr = 0.01,
    lambd = 0.0001,
    alpha = 0.75,
    t0 = 1000000.0,
    weight_decay = 0,
)

# RMSprop
rmsp_opt = optim.RMSprop(
    mod.parameters(),
    lr = 0.01,
    alpha = 0.99,
    eps = 1e-08,
    weight_decay = 0,
    momentum = 0,
    centered = False,
)

In [None]:
# Optimizer used by the training loop
optr = adam_opt

# full_n = True  -> image-shaped data (for the convolutional models)
# full_n = False -> flat data (for the fully connected models)
full_n = True

if full_n:
    training_dat, xt, yt = train_dat_cnn, xte_im, yte_im
else:
    training_dat, xt, yt = train_dat_fcn, xte, yte

In [None]:
# Optimization loop
#
# For every mini-batch: one optimizer step on the training batch, followed by
# an evaluation of the full test set. Loss and accuracy of both are appended
# to loss_train / acc_train / loss_test / acc_test, so each history has one
# entry per iteration.
num_epochs = 50

# Test-set outputs of the very last evaluation (used for the confusion matrix)
y_predictions = []

# The test labels never change, so decode the one-hot targets only once.
ytrue_lab = torch.argmax(yt, dim = 1)
n_eval = yt.shape[0]

for epoch in range(num_epochs):
    for xbatch, ybatch in training_dat:
        # The evaluation at the end of each iteration switches the model to
        # eval mode, so training mode has to be restored for EVERY batch.
        # Otherwise BatchNorm/Dropout layers would be trained in eval mode.
        mod.train()

        optr.zero_grad()
        prediction = mod(xbatch)
        pred = torch.argmax(prediction, dim = 1)

        true_lab = torch.argmax(ybatch, dim = 1)

        ce_loss = loss(prediction, true_lab)

        # Normalise by the actual batch size: the last batch of an epoch can
        # be smaller than num_batch.
        n_correct = torch.sum(pred == true_lab).item()
        acc_train.append(100 * n_correct / xbatch.shape[0])

        loss_train.append(ce_loss.item())

        ce_loss.backward()
        optr.step()

        # Evaluate on the whole test set without tracking gradients
        mod.eval()
        with torch.no_grad():
            prediction_test = mod(xt)
            pred_test = torch.argmax(prediction_test, dim = 1)

            ce_test = loss(prediction_test, ytrue_lab)
            loss_test.append(ce_test.item())
            acc_test.append(100 * torch.sum(pred_test == ytrue_lab).item() / n_eval)

            # Keep the outputs of the last epoch; the forward pass just
            # computed is reused instead of running the model a second time.
            if epoch == (num_epochs - 1):
                y_predictions = prediction_test

        it += 1
    print("Epoch %s/%s" % (epoch + 1, num_epochs))

In [None]:
def _note(text, height):
    """Write `text` onto the current figure at x = 0.2, y = `height` (figure fractions)."""
    plt.annotate(text, xycoords = 'figure fraction', xy = (0.2, height))


# ---- Figure 1: loss per iteration ------------------------------------------
final_loss_train = loss_train[-1]
final_loss_test = loss_test[-1]

plt.figure(1)
loss_tra, = plt.plot(loss_train, 'r')
loss_tes, = plt.plot(loss_test, 'b')
plt.title("Training and testing loss")
plt.xlabel("Iteration")
plt.ylabel("Loss")
plt.legend([loss_tra, loss_tes], ['Train loss', 'Test loss'])
_note("Final train loss: %s" % (final_loss_train), 0.5)
_note("Final test loss: %s" % (final_loss_test), 0.55)
print("Final training loss: %s." % final_loss_train)
print("Final testing loss: %s." % final_loss_test)
plt.savefig("loss_plots", dpi = 500)

# ---- Figure 2: accuracy per iteration --------------------------------------
final_acc_train = acc_train[-1]
final_acc_test = acc_test[-1]

plt.figure(2)
acc_tra, = plt.plot(acc_train, 'r')
acc_tes, = plt.plot(acc_test, 'b')
plt.title("Training and testing accuracy")
plt.xlabel("Iteration")
plt.ylabel("Accuracy in %")
plt.legend([acc_tra, acc_tes], ['Train accuracy', 'Test accuracy'])
_note("Final train accuracy: %s%%" % (final_acc_train), 0.5)
_note("Final test accuracy: %s%%" % (final_acc_test), 0.55)
print()
print("Final training accuracy: %s%%." % final_acc_train)
print("Final testing accuracy: %s%%." % final_acc_test)
plt.savefig("acc_plots", dpi = 500)

In [None]:
# Bring the one-hot test labels back to the CPU and decode them to class
# indices. (No squeeze is needed: axis 1 is the class axis, not a singleton.)
y_test = yte_im.detach().cpu()
ytrue = torch.argmax(y_test, dim = 1)

# Same for the network outputs saved during the last training epoch
ypreds = y_predictions.detach().cpu()
ypredslab = torch.argmax(ypreds, dim = 1)

mnist_classes = np.arange(10)
mnist_lab = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

# Rows are true classes, columns are predicted classes. Passing `labels`
# guarantees a 10 x 10 matrix even if some class never occurs.
cf_mat = confusion_matrix(ytrue.numpy(), ypredslab.numpy(), labels = mnist_classes, normalize = None)

plt.figure(3)
cmat = sns.heatmap(cf_mat, cmap = 'Greens', annot = True, fmt = "d", cbar_kws = {'label':'Number'}) # use .3f for floating-point data
# Heatmap cells are centred at i + 0.5; put exactly one tick on every cell
# centre so that the ten labels line up with the ten rows/columns.
cmat.set_xticks(mnist_classes + 0.5)
cmat.set_yticks(mnist_classes + 0.5)
cmat.set_xticklabels(mnist_lab, rotation = 'horizontal')
cmat.set_yticklabels(mnist_lab, rotation = 'horizontal')
cmat.set_xlabel("Predicted label")
cmat.set_ylabel("True label")
plt.savefig("confusion_mat", dpi = 500)

# Showing up to 4 examples of wrong classifications
wrong_examples = (ytrue != ypredslab)

# Drop only the channel axis (dim 1); a generic squeeze would also remove the
# batch axis when exactly one sample is misclassified.
wrong_preds = xtes_conv[wrong_examples].squeeze(1)
wrong_labs = ypredslab[wrong_examples]
actual_labs = ytrue[wrong_examples]

num_wrong = wrong_labs.shape[0]
num_shown = 4

if num_wrong == 0:
    print("No misclassified test examples to show.")
else:
    # Sample distinct examples whenever enough of them exist; fall back to
    # sampling with replacement only if there are fewer than num_shown errors.
    ind = np.random.choice(a = num_wrong, size = num_shown, replace = (num_wrong < num_shown))

    fig, ax = plt.subplots(2 , 2)
    for a in fig.axes:
        a.set_xticks([])
        a.set_yticks([])

    for a, i in zip(ax.flat, ind):
        a.imshow(wrong_preds[int(i)].numpy(), cmap = 'gray')
        a.set_title("true %d / predicted %d" % (actual_labs[int(i)].item(), wrong_labs[int(i)].item()))
    plt.savefig("wrong_preds", dpi = 500)

    for i in ind:
        print("Correct label: %d" % actual_labs[int(i)].item())
        print("Predicted label: %d" % wrong_labs[int(i)].item())