# COURSE: A deep understanding of deep learning
## SECTION: Understand and design CNNs
### LECTURE: The EMNIST dataset (letter recognition)
#### TEACHER: Mike X Cohen, sincxpress.com
##### COURSE URL: udemy.com/course/deeplearning_x/?couponCode=202210

In [1]:
# import libraries
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader,TensorDataset,Dataset
import copy
from sklearn.model_selection import train_test_split


# for importing data
import torchvision
import torchvision.transforms as T

import matplotlib.pyplot as plt
from IPython import display
display.set_matplotlib_formats('svg')

  display.set_matplotlib_formats('svg')


In [2]:
# use GPU if available
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

cuda:0


# Import and inspect the data

In [3]:
# download the dataset
cdata = torchvision.datasets.EMNIST(root='emnist',split='letters',download=True)

# more info: https://www.nist.gov/itl/products-and-services/emnist-dataset

In [4]:
# inspect the data

# the categories (but how many letters??)
print(cdata.classes)
print(str(len(cdata.classes)) + ' classes')

print('\nData size:')
print(cdata.data.shape)

# transform to 4D tensor for conv layers (and transform from int8 to float)
images_original = cdata.data.view([124800,1,28,28]).float()
print('\nTensor data:')
print(images_original.shape)

# brief aside: class 'N/A' doesn't exist in the data.
print( torch.sum(cdata.targets==0) )

# However, it causes problems in one-hot encoding...
torch.unique(cdata.targets)
print(cdata.class_to_idx)
# so we'll eliminate it and subtract 1 from the original

# remove the first class category
letterCategories = cdata.classes[1:]

# relabel labels to start at 0
labels_original = copy.deepcopy(cdata.targets)-1
print(labels_original.shape)

###
print( torch.sum(labels_original==0) )
print(torch.unique(labels_original))


# Normalize data
images_original /= torch.max(images_original)

['N/A', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
27 classes

Data size:
torch.Size([124800, 28, 28])

Tensor data:
torch.Size([124800, 1, 28, 28])
tensor(0)
{'N/A': 0, 'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6, 'g': 7, 'h': 8, 'i': 9, 'j': 10, 'k': 11, 'l': 12, 'm': 13, 'n': 14, 'o': 15, 'p': 16, 'q': 17, 'r': 18, 's': 19, 't': 20, 'u': 21, 'v': 22, 'w': 23, 'x': 24, 'y': 25, 'z': 26}
torch.Size([124800])
tensor(4800)
tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
        18, 19, 20, 21, 22, 23, 24, 25])


# Create train/test groups using DataLoader

In [5]:
# Add image permutations
custom_transform = nn.Sequential(
  T.RandomVerticalFlip(p=.5),
  T.RandomRotation(180)
)

# Increase the dataset size
images = torch.cat([images_original] * 1)
labels = torch.cat([labels_original] * 1)
images = custom_transform(images)

In [6]:
# Step 2: use scikitlearn to split the data
train_data,test_data, train_labels,test_labels = train_test_split(images, labels, test_size=.1)

# Step 3: convert into PyTorch Datasets
train_data = TensorDataset(train_data, train_labels)
test_data  = TensorDataset(test_data, test_labels)

# Step 4: translate into dataloader objects
batchsize    = 16
train_loader = DataLoader(train_data,batch_size=batchsize,shuffle=True,drop_last=True, num_workers=6)
test_loader  = DataLoader(test_data,batch_size=test_data.tensors[0].shape[0])

In [7]:
# check size (should be images X channels X width X height
print( train_loader.dataset.tensors[0].shape )
print( train_loader.dataset.tensors[1].shape )

torch.Size([112320, 1, 28, 28])
torch.Size([112320])


# Create the DL model

In [8]:
# create a class for the model
def makeTheNet(printtoggle=False):

  class emnistnet(nn.Module):
    def get_conv_layer_output_size(self, input_size: (int, int), padding: (int, int), kernel_size: (int, int), stride: (int, int), pool_size: (int, int)) -> (int, int):
      dim0 = int((((input_size[0] + (2 * padding[0]) - kernel_size[0]) / stride[0]) + 1) / pool_size[0])
      dim1 = int((((input_size[1] + (2 * padding[1]) - kernel_size[1]) / stride[1]) + 1) / pool_size[1])
      return dim0, dim1

    def __init__(self,printtoggle):
      super().__init__()
      
      # print toggle
      self.print = printtoggle

      ### -------------- feature map layers -------------- ###
      # first convolution layer
      self.conv1  = nn.Conv2d(1, 16, 3, padding=4)
      self.bnorm1 = nn.BatchNorm2d(self.conv1.out_channels)
      self.conv1_pool_size = (2, 2)
      self.conv1_size = self.get_conv_layer_output_size((28, 28), self.conv1.padding, self.conv1.kernel_size, self.conv1.stride, self.conv1_pool_size)

      # second convolution layer
      self.conv2  = nn.Conv2d(self.conv1.out_channels, 64, 3, padding=2)
      self.bnorm2 = nn.BatchNorm2d(self.conv2.out_channels)
      self.conv2_pool_size = (2, 2)
      self.conv2_size = self.get_conv_layer_output_size(self.conv1_size, self.conv2.padding, self.conv2.kernel_size, self.conv2.stride, self.conv2_pool_size)

      # third convolution layer
      self.conv3  = nn.Conv2d(self.conv2.out_channels, 256, 3, padding=1)
      self.bnorm3 = nn.BatchNorm2d(self.conv3.out_channels)
      self.conv3_pool_size = (2, 2)
      self.conv3_size = self.get_conv_layer_output_size(self.conv2_size, self.conv3.padding, self.conv3.kernel_size, self.conv3.stride, self.conv3_pool_size)

      # fourth convolution layer
      self.conv4  = nn.Conv2d(self.conv3.out_channels, 1024, 3, padding=1)
      self.bnorm4 = nn.BatchNorm2d(self.conv4.out_channels)
      self.conv4_pool_size = (2, 2)
      self.conv4_size = self.get_conv_layer_output_size(self.conv3_size, self.conv4.padding, self.conv4.kernel_size, self.conv4.stride, self.conv4_pool_size)

      ### -------------- linear decision layers -------------- ###
      self.fc1 = nn.Linear(self.conv3.out_channels * self.conv3_size[0] * self.conv3_size[1], 256)
      self.fc2 = nn.Linear(self.fc1.out_features, 128)
      self.fc3 = nn.Linear(self.fc2.out_features, 26)

    def forward(self,x):
      
      if self.print: print(f'Input: {list(x.shape)}')
      
      # first block: convolution -> maxpool -> batchnorm -> relu
      x = F.max_pool2d(self.conv1(x), self.conv1_pool_size)
      x = F.relu(self.bnorm1(x))
      x = F.dropout(x, p=0.25, training=self.training)
      if self.print: print(f'First CPR block: {list(x.shape)}')

      # second block: convolution -> maxpool -> batchnorm -> relu
      x = F.max_pool2d(self.conv2(x), self.conv2_pool_size)
      x = F.relu(self.bnorm2(x))
      x = F.dropout(x, p=0.25, training=self.training)
      if self.print: print(f'Second CPR block: {list(x.shape)}')

      # third block: convolution -> maxpool -> batchnorm -> relu
      x = F.max_pool2d(self.conv3(x), self.conv3_pool_size)
      x = F.relu(self.bnorm3(x))
      x = F.dropout(x, p=0.25, training=self.training)
      if self.print: print(f'Third CPR block: {list(x.shape)}')

      # reshape for linear layer
      nUnits = x.shape.numel()/x.shape[0]
      x = x.view(-1,int(nUnits))
      if self.print: print(f'Vectorized: {list(x.shape)}')
      
      # linear layers
      x = F.relu(self.fc1(x))
      x = F.dropout(x, p=0.5, training=self.training)
      x = F.relu(self.fc2(x))
      x = F.dropout(x, p=0.5, training=self.training)
      x = self.fc3(x)
      if self.print: print(f'Final output: {list(x.shape)}')

      return x

  # create the model instance and copy to device (GPU)
  net = emnistnet(printtoggle).to(device)
  
  # loss function
  lossfun = nn.CrossEntropyLoss()

  # optimizer
  optimizer = torch.optim.Adam(net.parameters(), lr=.001, weight_decay=0.0001)

  return net,lossfun,optimizer

In [9]:
# test the model with one batch
net,lossfun,optimizer = makeTheNet(True)

X,y = next(iter(train_loader))
yHat = net(X.to(device)).cpu()

# check size of output
print('\nOutput size:')
print(yHat.shape)

# # now let's compute the loss
loss = lossfun(yHat,torch.squeeze(y))
print(' ')
print('Loss:')
print(loss)

Input: [16, 1, 28, 28]
First CPR block: [16, 16, 17, 17]
Second CPR block: [16, 64, 9, 9]
Third CPR block: [16, 256, 4, 4]
Vectorized: [16, 4096]
Final output: [16, 26]

Output size:
torch.Size([16, 26])
 
Loss:
tensor(3.2706, grad_fn=<NllLossBackward0>)


# Create a function that trains the model

In [12]:
# a function that trains the model
def function2trainTheModel():

  # number of epochs
  numepochs = 5

  numbatches = len(list(train_loader))
  
  # create a new model
  net,lossfun,optimizer = makeTheNet()

  # initialize losses (calculate everything on device) to reduce IO overhead
  trainLoss = torch.zeros(numepochs).to(device)
  testLoss  = torch.zeros(numepochs).to(device)
  trainErr  = torch.zeros(numepochs).to(device)
  testErr   = torch.zeros(numepochs).to(device)

  batchLoss = torch.zeros(numbatches).to(device)
  batchErr  = torch.zeros(numbatches).to(device)

  # loop over epochs
  for epochi in range(numepochs):
    print(f"Starting epoch {epochi}...")
    # loop over training data batches
    net.train()
    batch_idx = 0
    for X,y in train_loader:
      X = X.to(device)
      y = y.to(device)

      # forward pass and loss
      yHat = net(X)
      loss = lossfun(yHat,y)

      # backprop
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      # loss and error from this batch
      batchLoss[batch_idx] = loss
      batchErr[batch_idx] = torch.mean((torch.argmax(yHat,axis=1) != y).float())
      batch_idx += 1
    # end of batch loop...

    # and get average losses and error rates across the batches
    trainLoss[epochi] = torch.mean(batchLoss)
    trainErr[epochi]  = 100*torch.mean(batchErr)

    ### test performance
    net.eval()
    X,y = next(iter(test_loader)) # extract X,y from test dataloader
    X = X.to(device)
    y = y.to(device)
    with torch.no_grad(): # deactivates autograd
      yHat = net(X)
      loss = lossfun(yHat,y)
      
    # get loss and error rate from the test batch
    testLoss[epochi] = loss
    testErr[epochi]  = 100*torch.mean((torch.argmax(yHat,axis=1) != y).float())

    print(f"Training... Epoch [{epochi+1}/{numepochs}] done. Error: {testErr[epochi]:.02f}%")

  # end epochs

  # Move all results to CPU
  trainLoss = trainLoss.cpu().detach().numpy()
  testLoss  = testLoss.cpu().detach().numpy()
  trainErr  = trainErr.cpu().detach().numpy()
  testErr   = testErr.cpu().detach().numpy()

  # function output
  return trainLoss,testLoss,trainErr,testErr,net

# Run the model and show the results!

In [None]:
from copy import deepcopy
best_train_loss = []
best_test_loss = []
best_train_err = []
best_test_err = []
bet_net = []

num_repeats = 3
for i in range(num_repeats):
  trainLoss,testLoss,trainErr,testErr,net = function2trainTheModel()

  print(f"Training [{i+1}/{num_repeats}] finished. Error: [{testErr[-1]:.2f}%]")

  if len(best_test_err) == 0 or best_test_err[-1] > testErr[-1]:
    best_train_loss = deepcopy(trainLoss)
    best_test_loss = deepcopy(testLoss)
    best_train_err = deepcopy(trainErr)
    best_test_err = deepcopy(testErr)
    bet_net = deepcopy(net)


Starting epoch 0...
Training... Epoch [1/5] done. Error: 10.91%
Starting epoch 1...
Training... Epoch [2/5] done. Error: 8.21%
Starting epoch 2...
Training... Epoch [3/5] done. Error: 7.41%
Starting epoch 3...
Training... Epoch [4/5] done. Error: 6.70%
Starting epoch 4...


In [None]:
fig,ax = plt.subplots(1,2,figsize=(16,5))

ax[0].plot(best_train_loss,'s-',label='Train')
ax[0].plot(best_test_loss,'o-',label='Test')
ax[0].set_xlabel('Epochs')
ax[0].set_ylabel('Loss (MSE)')
ax[0].set_title('Model loss')

ax[1].plot(best_train_err,'s-',label='Train')
ax[1].plot(best_test_err,'o-',label='Test')
ax[1].set_xlabel('Epochs')
ax[1].set_ylabel('Error rates (%)')
ax[1].set_title(f'Final model test error rate: {best_test_err[-1]:.2f}%')
ax[1].legend()

plt.show()

In [None]:
### visualize some images

# extract X,y from test dataloader
X,y = next(iter(test_loader))
X = X.to(device) # push data to GPU
y = y.to(device) # push data to GPU
yHat = net(X)

# pick some examples at random to show
randex = np.random.choice(len(y),size=21,replace=False)

# visualize some images
fig,axs = plt.subplots(3,7,figsize=(15,6))

for i,ax in enumerate(axs.flatten()):

  # extract the image and its target letter
  I = np.squeeze( X[randex[i],0,:,:] ).cpu() # .cpu() to transfer back from GPU!
  trueLetter = letterCategories[ y[randex[i]] ]
  predLetter = letterCategories[ torch.argmax(yHat[randex[i],:]) ]
  
  # color-code the accuracy (using ternary operator)
  col = 'gray' if trueLetter==predLetter else 'hot'
  
  # visualize
  ax.imshow(I.T,cmap=col)
  ax.set_title('True %s, predicted %s' %(trueLetter,predLetter),fontsize=10)
  ax.set_xticks([])
  ax.set_yticks([])

plt.show()

# Accuracy by letter

In [None]:
import sklearn.metrics as skm

# compute the confusion matrix
C = skm.confusion_matrix(y.cpu(),torch.argmax(yHat.cpu(),axis=1),normalize='true')

# visualize it
fig = plt.figure(figsize=(10,10))
plt.imshow(C,'Blues',vmax=.05)

# make the plot look nicer
plt.xticks(range(26),labels=letterCategories)
plt.yticks(range(26),labels=letterCategories)
plt.title('TEST confusion matrix')
plt.xlabel('True number')
plt.xlabel('Predicted number')
plt.ylabel('True number')

plt.show()