In [None]:
# @author: Ishman Mann
# @date: 13/10/2022
# 
# @description:
#   Classification model for the CIFAR-10 dataset using a CNN in PyTorch
#
# @resources:
#   https://www.learnpytorch.io/
#
# @notes:
#
#
# @todo:
#   Build model
#   Train model -- nn.CrossEntropyLoss applies softmax internally, so the model must output raw
#                  logits (no softmax layer); apply softmax/argmax only when computing accuracy
#   Create a confusion matrix
#   Add image augmentation
#   Further hyperparameter tuning

In [None]:
######################################################################################################
# Magic lines
%load_ext tensorboard

# Imports

import matplotlib
from matplotlib import pyplot as plt
import numpy as np
import os
import pandas as pd

import shutil
from sklearn.model_selection import train_test_split

import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

import torchvision
from torchvision import datasets
from torchvision import transforms

from timeit import default_timer as timer 
from tqdm.auto import tqdm # for progress bar

In [2]:
# Select the compute device: GPU when CUDA is available, otherwise CPU.
# The rest of the notebook refers to this constant as DEVICE.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
device = DEVICE  # lowercase alias kept so existing references to `device` keep working

# Set random seeds
torch.manual_seed(42)

'd:\\Documents\\computer-vision-bootcamp-pyTorch'

In [None]:
# Prepare the output folders for this model version (checkpoints, final model, stats and logs)

MODEL_VERSION = '13'
modelTopDir = os.path.join("./saved_models", f"model_v{MODEL_VERSION}")

# Refuse to reuse a version folder that already exists
if os.path.exists(modelTopDir):
  raise Exception('Saved model folders for model version manually specified already exist.')

modelCheckpointsDir = os.path.join(modelTopDir, 'checkpoints')  # periodic training checkpoints
modelDir = os.path.join(modelTopDir, 'model')                   # final trained model
modelStatsDir = os.path.join(modelTopDir, 'stats')              # logs dir, pyplot figure, train history
modelLogsDir = os.path.join(modelStatsDir, 'logs')

for newDir in (modelCheckpointsDir, modelDir, modelStatsDir, modelLogsDir):
  os.makedirs(newDir)

# Path of the train_history.txt file (only the path is built here)
trainHistoryFilepath = os.path.join(modelStatsDir, "train_history.txt")

In [None]:
# Delete and recreate datasets folder (For Google Colab only)
if os.path.exists("./datasets"):
  shutil.rmtree("./datasets", ignore_errors=True)
  # ignore_errors=True may leave the folder (partially) in place; exist_ok=True keeps
  # the recreate step from raising FileExistsError in that case
  os.makedirs("./datasets", exist_ok=True)

In [None]:
######################################################################################################
# Loading testing and training data

# Download CIFAR-10 into ./datasets (if needed) and load the train and test splits.
# ToTensor is referenced through the imported `transforms` module; the bare name
# `ToTensor` is never imported in this notebook.
trainData = datasets.CIFAR10(
    root="./datasets",
    train=True, # get train data
    download=True,
    transform=transforms.ToTensor(), # converts PIL to torch.tensor
    target_transform=None # don't transform targets (labels)!
)

testData = datasets.CIFAR10(
    root="./datasets",
    train=False, # get test data
    download=True,
    transform=transforms.ToTensor(), # converts PIL to torch.tensor
    target_transform=None # don't transform targets (labels)!
)

# Human-readable class names, indexed by integer label
CLASS_NAMES = trainData.classes

In [None]:
# Split off some validation data (80% train / 20% validation)

# random_split returns Subset objects; unwrap one so that re-running this cell splits the
# full training set again instead of failing on the already-split data
fullTrainData = trainData.dataset if isinstance(trainData, torch.utils.data.Subset) else trainData

TRAIN_LENGTH = int(len(fullTrainData)*0.8)
# Use the remainder so both lengths always sum to the dataset size, as random_split requires
VALIDATE_LENGTH = len(fullTrainData) - TRAIN_LENGTH
trainData, validateData = torch.utils.data.random_split(fullTrainData, [TRAIN_LENGTH, VALIDATE_LENGTH])

In [None]:
# Preview the first training sample together with its class name
image, label = trainData[0]
# Move the first axis to the end before handing the image to imshow
imagePermuted = torch.permute(image, (1, 2, 0))
print(imagePermuted.shape)
plt.imshow(imagePermuted)
plt.title(CLASS_NAMES[label])

In [None]:
# Wrap each split in a DataLoader that yields mini-batches

BATCH_SIZE = 32

# Only the training batches are reshuffled; validation and test keep a fixed order
trainDataloader = DataLoader(dataset=trainData, shuffle=True, batch_size=BATCH_SIZE)
validateDataloader = DataLoader(dataset=validateData, shuffle=False, batch_size=BATCH_SIZE)
testDataloader = DataLoader(dataset=testData, shuffle=False, batch_size=BATCH_SIZE)

In [None]:
######################################################################################################
# Create the model class

class CIFAR10ModelV1(nn.Module):
  """CNN classifier: three conv/pool stages followed by a two-layer fully connected head.

  Each stage is Conv-ReLU-BatchNorm twice, then 2x2 max pooling and dropout. The stages
  use hiddenUnitsCnn, 2*hiddenUnitsCnn and 4*hiddenUnitsCnn channels respectively.
  forward() returns raw logits (there is no softmax layer in the model).

  Args:
    inputChannels: number of channels of the input images.
    hiddenUnitsCnn: number of channels produced by the first conv stage.
    hiddenUnitsFc: width of the hidden fully connected layer.
    outputShape: number of output logits (classes).
    imageSize: height/width of the square input images. Defaults to 32, which gives
      the 4*4 feature map the classifier was originally hard-coded for.

  Raises:
    ValueError: if imageSize is smaller than 8 (three 2x2 poolings would leave nothing).
  """

  def __init__(self, inputChannels: int, hiddenUnitsCnn: int, hiddenUnitsFc: int, outputShape: int,
               imageSize: int = 32):
    super().__init__()

    if imageSize < 8:
      raise ValueError(f"imageSize must be at least 8, got {imageSize}")

    # Convolution & pooling layers
    self.cnn_layer_1 = self._conv_stage(inputChannels, hiddenUnitsCnn, dropout=0.2)
    self.cnn_layer_2 = self._conv_stage(hiddenUnitsCnn, 2*hiddenUnitsCnn, dropout=0.2)
    self.cnn_layer_3 = self._conv_stage(2*hiddenUnitsCnn, 4*hiddenUnitsCnn, dropout=0.1)

    # Each of the three stride-2 poolings halves the side length: 32 -> 16 -> 8 -> 4
    pooledSize = imageSize // 8

    # Fully connected (FC) layers
    self.classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(in_features=4*hiddenUnitsCnn*pooledSize*pooledSize,
                  out_features=hiddenUnitsFc),
        nn.ReLU(),
        nn.BatchNorm1d(num_features=hiddenUnitsFc, eps=1e-05, momentum=0.1, affine=True),
        nn.Linear(in_features=hiddenUnitsFc,
                  out_features=outputShape)
    )

  @staticmethod
  def _conv_stage(inChannels: int, outChannels: int, dropout: float) -> nn.Sequential:
    """Build one stage: (Conv3x3 -> ReLU -> BatchNorm) x 2 -> MaxPool 2x2 -> Dropout."""
    return nn.Sequential(
      nn.Conv2d(in_channels=inChannels, out_channels=outChannels,
                kernel_size=3, stride=1, padding="same"),
      nn.ReLU(),
      nn.BatchNorm2d(num_features=outChannels, eps=1e-05, momentum=0.1, affine=True),
      nn.Conv2d(in_channels=outChannels, out_channels=outChannels,
                kernel_size=3, stride=1, padding="same"),
      nn.ReLU(),
      nn.BatchNorm2d(num_features=outChannels, eps=1e-05, momentum=0.1, affine=True),
      nn.MaxPool2d(kernel_size=2, stride=2),
      nn.Dropout(p=dropout)
    )

  # Placing model layers in forward()
  def forward(self, x: torch.Tensor):
    """Run the three conv stages and the classifier; returns one row of logits per input."""
    x = self.cnn_layer_1(x)
    x = self.cnn_layer_2(x)
    x = self.cnn_layer_3(x)
    x = self.classifier(x)
    return x


# Build the network with one output logit per class name, then move it to the compute device
modelInst = CIFAR10ModelV1(
    inputChannels=3,
    hiddenUnitsCnn=32,
    hiddenUnitsFc=512,
    outputShape=len(CLASS_NAMES),
)
modelInst = modelInst.to(DEVICE)
modelInst

In [None]:
# Loss and optimizer used for training.
# CrossEntropyLoss is fed the raw logits produced by the model.
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    modelInst.parameters(),
    lr=5e-4,
    weight_decay=2.5e-3,
)

def accuracy_function(yActual, yPredicted):
  """Return the fraction (0.0 to 1.0) of positions where yPredicted equals yActual, as a float."""
  matches = (yActual == yPredicted)
  return matches.float().mean().item()

In [None]:
######################################################################################################
# Create the training and testing loop

NUM_EPOCHS = 50

trainStartTime = timer()

# Per-epoch history, stored as plain Python floats so no autograd graph is kept alive
trainLosses = []
trainAccuracies = []
testLosses = []
testAccuracies = []

# One TensorBoard writer for the whole run. Logging is best-effort: a failure is reported
# but never aborts training.
try:
  writer = SummaryWriter(modelLogsDir)
except Exception as error:
  writer = None
  print(f"TensorBoard logging disabled: {error}")

for epoch in tqdm(range(NUM_EPOCHS)):
  epochStartTime = timer()
  epochHeaderMsg = f"Epoch: {epoch}\n------------------"
  print(epochHeaderMsg)

  ################################################################################
  # Training loop
  trainLoss, trainAccuracy = 0.0, 0.0
  modelInst.train()
  for xTrain, yTrain in trainDataloader:
    xTrain, yTrain = xTrain.to(DEVICE), yTrain.to(DEVICE)
    currentBatchSize = len(xTrain)
    # 1 - Forward pass
    trainLogits = modelInst(xTrain)
    # softmax is monotonic, so the argmax of the logits is already the predicted class
    trainPredictions = trainLogits.argmax(dim=1) # gets 1D tensor of predicted classes
    # 2 - Compute batch loss, accuracy and accumulate into trainLoss, trainAccuracy
    loss = loss_function(trainLogits, yTrain) # use logits
    # .item() detaches the value; accumulating the tensor itself would retain every
    # batch's autograd graph until the end of the epoch
    trainLoss += loss.item()*currentBatchSize
    trainAccuracy += accuracy_function(yActual=yTrain, yPredicted=trainPredictions)*currentBatchSize
    # 3 - Set gradients to 0
    optimizer.zero_grad()
    # 4 - Compute loss gradients
    loss.backward()
    # 5 - Optimizer step (updates the model parameters)
    optimizer.step()

  # Divide accumulated accuracy/loss by number of individual images, and print
  trainLoss /= TRAIN_LENGTH
  trainAccuracy /= TRAIN_LENGTH
  trainLosses.append(trainLoss)
  trainAccuracies.append(trainAccuracy)
  # accuracy is a fraction in [0, 1]; the % format type scales it by 100 for display
  trainMsg = f"Train loss: {trainLoss:.5f}, Train accuracy: {trainAccuracy:.5%},\n"
  print(trainMsg)

  ################################################################################
  # Validation loop (results are stored under the existing "test*" names)
  testLoss, testAccuracy = 0.0, 0.0
  modelInst.eval()
  with torch.inference_mode():
    for xValidate, yValidate in validateDataloader:
      xValidate, yValidate = xValidate.to(DEVICE), yValidate.to(DEVICE)
      currentBatchSize = len(xValidate)
      # 1 - Forward pass
      testLogits = modelInst(xValidate)
      testPredictions = testLogits.argmax(dim=1)
      # 2 - Compute batch loss, accuracy and accumulate into testLoss, testAccuracy
      testLoss += loss_function(testLogits, yValidate).item()*currentBatchSize # use logits
      testAccuracy += accuracy_function(yActual=yValidate, yPredicted=testPredictions)*currentBatchSize

  # Divide accumulated accuracy/loss by number of individual images, and print
  testLoss /= VALIDATE_LENGTH
  testAccuracy /= VALIDATE_LENGTH
  testLosses.append(testLoss)
  testAccuracies.append(testAccuracy)
  testMsg = f"Test loss: {testLoss:.5f}, Test accuracy: {testAccuracy:.5%}\n"
  print(testMsg)

  ################################################################################
  # Each epoch, print epoch train time
  epochEndTime = timer()
  epochTimeMsg = f"Train time: {(epochEndTime-epochStartTime):.5f} sec\n"
  print(epochTimeMsg)

  # Save accuracy and loss in logs
  if writer is not None:
    try:
      writer.add_scalar('Loss/train', trainLoss, epoch)
      writer.add_scalar('Loss/test', testLoss, epoch)
      writer.add_scalar('Accuracy/train', trainAccuracy, epoch)
      writer.add_scalar('Accuracy/test', testAccuracy, epoch)
      writer.flush()  # write this epoch's scalars out even if training is interrupted later
    except Exception as error:
      print(f"TensorBoard logging failed at epoch {epoch}: {error}")

  # Save history info
  try:
    with open(trainHistoryFilepath, "a") as trainHistoryFile:
      # the header has no trailing newline of its own; add one so it does not run
      # into the train message in the file
      trainHistoryFile.write(epochHeaderMsg + "\n")
      trainHistoryFile.write(trainMsg)
      trainHistoryFile.write(testMsg)
      trainHistoryFile.write(epochTimeMsg)
  except Exception as error:
    print(f"Train history could not be written at epoch {epoch}: {error}")

  # Save model data every few epochs
  if (epoch%5 == 0):
    try:
      modelCheckpointFilepath = os.path.join(modelCheckpointsDir, f"checkpoint_epoch{epoch}.pth")
      torch.save({
                  'epoch': epoch,
                  'model_state_dict': modelInst.state_dict(),
                  'optimizer_state_dict': optimizer.state_dict(),
                  'loss': loss.detach(),  # last training batch loss, without its autograd graph
                }, modelCheckpointFilepath)
    except Exception as error:
      print(f"Model could not be saved at epoch {epoch}: {error}")

if writer is not None:
  writer.close()


# At end, print total train time, and log it
trainEndTime = timer()
totalTrainTimeMsg = f"Total train time: {(trainEndTime-trainStartTime):.5f} sec"
print(totalTrainTimeMsg)

# Append the total time to the history file. write() only accepts str, so the formatted
# message is written rather than the raw timer value.
try:
  with open(trainHistoryFilepath, "a") as trainHistoryFile:
    trainHistoryFile.write(totalTrainTimeMsg + "\n")
except Exception as error:
  print(f"Total train time could not be logged: {error}")

# Save the final model weights (state_dict only); best-effort, failure is reported
try:
  modelFilepath = os.path.join(modelDir, "model.pth")
  torch.save(modelInst.state_dict(), modelFilepath)
except Exception as error:
  print(f"Final model could not be saved: {error}")

In [None]:
# Draw the per-epoch accuracy and loss curves on one axes, save the figure, then show it
plotFilePath = os.path.join(modelStatsDir, 'plot.png')

fig, ax = plt.subplots()
ax.set_title("Training and Validation Accuracy")

# Curves are added in the same order as the legend entries below
ax.plot(trainAccuracies)
ax.plot(testAccuracies)
ax.plot(torch.tensor(trainLosses))
ax.plot(torch.tensor(testLosses))

ax.set_xlabel('Epoch')
ax.set_ylabel('Accuracy')
ax.legend(['Train Acc', 'Validation Acc','Train Loss','Validation Loss'], loc='lower right')

fig.savefig(plotFilePath)
plt.show()

In [None]:
######################################################################################################
# Test the model on testing data
# Accumulate sample-weighted loss and accuracy over the held-out test set
loss, accuracy = 0.0, 0.0
# Average over the test set's own size rather than the validation split's size
numTestSamples = len(testDataloader.dataset)
modelInst.eval()
with torch.inference_mode():
  for xTest, yTest in testDataloader:
    xTest, yTest = xTest.to(DEVICE), yTest.to(DEVICE)
    currentBatchSize = len(xTest)
    # 1 - Forward pass
    testLogits = modelInst(xTest)
    # softmax is monotonic, so the argmax of the logits is already the predicted class
    testPredictions = testLogits.argmax(dim=1)
    # 2 - Compute batch loss, accuracy and accumulate into loss, accuracy
    loss += loss_function(testLogits, yTest).item()*currentBatchSize # use logits
    accuracy += accuracy_function(yActual=yTest, yPredicted=testPredictions)*currentBatchSize

# Divide accumulated accuracy/loss by number of individual images, and print
loss /= numTestSamples
accuracy /= numTestSamples
# accuracy is a fraction in [0, 1]; the % format type scales it by 100 for display
print(f"Test loss: {loss:.5f}, Test accuracy: {accuracy:.5%}\n")