Imports

In [None]:
# PLEASE CONNECT TO GOOGLE COLAB'S FREE T4 GPU RUNTIME BEFORE YOU RUN THIS CELL OR THE FOLLOWING ONES
# Utils imports
import torch
import pandas as pd
import os
import math
# Dataset imports
import tensorflow as tf
import tensorflow.keras.datasets as data
from tensorflow.keras.datasets import mnist

Layers_Dense Class and Flattener Class (For MLP Layers and 1D Flattening)

In [None]:
class Layers_Dense(torch.nn.Module):
  """Fully connected (dense) layer with a hand-written gradient-descent update.

  Subclassing torch.nn.Module lets container modules discover this layer's
  parameters (for example to clip or zero their gradients).
  """
  def __init__(self,n_inputs,n_neurons):
    """Create the trainable tensors of the layer.

    Args:
      n_inputs: number of features in each input row.
      n_neurons: number of output units.
    """
    super().__init__()
    # Starting values: standard-normal weights scaled by 0.01, all-zero biases.
    initial_weights = torch.randn(n_inputs,n_neurons)*0.01
    initial_biases = torch.zeros(1,n_neurons)
    # Wrapping in Parameter registers the tensors with the module.
    self.weights = torch.nn.Parameter(initial_weights,requires_grad = True)
    self.biases = torch.nn.Parameter(initial_biases,requires_grad = True)

  def forward(self,inputs):
    """Return `inputs @ weights + biases`.

    The input and the result are also kept on the instance as
    `self.inputs` and `self.outputs`.
    """
    self.inputs = inputs
    self.outputs = self.inputs @ self.weights + self.biases
    return self.outputs

  def backwardsAndOptimize(self,learningRate):
    """Apply one plain gradient-descent step from the gradients stored in `.grad`.

    Args:
      learningRate: step size multiplied into each gradient.
    """
    # The parameter update itself must not be recorded by autograd.
    with torch.no_grad():
      for parameter in (self.weights, self.biases):
        parameter.sub_(learningRate * parameter.grad)

class Flattener(torch.nn.Module):
  """Parameter-free layer that collapses every non-batch dimension into one.

  It is a module only so that it can sit in a list of layers next to the
  convolutional and dense layers.
  """
  def __init__(self):
    super().__init__()

  def forward(self,inputs):
    """Reshape `inputs` to (inputs.shape[0], -1) and return the result."""
    return inputs.reshape(inputs.size(0),-1)

Max-Pooling Class

In [None]:
# For this code example, we will use Max-Pooling specifically for downsizing images
class Max_Pooling(torch.nn.Module):
  """Non-overlapping max pooling over the last two dimensions of a 4-D tensor.

  The window is `dimy` rows tall and `dimx` columns wide and moves by its own
  size, so windows never overlap. Trailing rows/columns that do not fill a
  complete window are dropped by the unfold operation.
  """
  def __init__(self,dimx,dimy):
    """Store the pooling window size.

    Args:
      dimx: window width (applied along the last dimension).
      dimy: window height (applied along the second-to-last dimension).
    """
    super().__init__()
    self.dimx = dimx
    self.dimy = dimy

  def forward(self,inputs):
    """Return the per-window maxima of `inputs`.

    Args:
      inputs: tensor with exactly four dimensions
        (batch, channels, height, width); unpacking the shape raises
        ValueError otherwise.

    Returns:
      Tensor of shape (batch, channels, height // dimy, width // dimx).
    """
    # Only tensor methods are used so autograd can route gradients back to
    # the element that produced each maximum.
    batch_size, color_channels, _, _ = inputs.shape
    # Tensor.unfold(dimension, size, step): cut the height into dimy-tall
    # strips, then the width into dimx-wide strips. The width previously
    # reused dimy, which ignored dimx for non-square windows.
    blocks = inputs.unfold(2,self.dimy,self.dimy).unfold(3,self.dimx,self.dimx)
    # blocks is (batch, channels, rows, cols, dimy, dimx); merge the two
    # window axes so each window becomes one flat vector.
    blocks = blocks.reshape(batch_size,color_channels,blocks.size(2),blocks.size(3),-1)
    # max along the window axis also returns the argmax indices, which are not needed.
    outputs, _ = blocks.max(dim = -1)
    return outputs

Convolutional Layer Class

In [None]:
class Conv_Layer(torch.nn.Module):
  """2-D convolution with square kernels, stride 1 and no padding.

  The convolution is computed as a matrix product between unfolded image
  patches and flattened kernels.
  """
  def __init__(self, input_channels,kernel_size,n_filters):
    """Create the kernels and biases.

    Args:
      input_channels: number of channels in the incoming images
        (1 for greyscale, 3 for RGB).
      kernel_size: side length of each square kernel.
      n_filters: number of kernels, i.e. number of output channels.
    """
    super().__init__()
    self.kernel_size = kernel_size
    self.n_filters = n_filters
    self.channels = input_channels
    # Kernels start as standard-normal noise scaled by 0.1; biases start at zero.
    initial_kernels = torch.randn(n_filters,input_channels,kernel_size,kernel_size)*0.1
    initial_biases = torch.zeros(n_filters)
    self.kernels = torch.nn.Parameter(initial_kernels,requires_grad = True)
    self.biases = torch.nn.Parameter(initial_biases,requires_grad = True)

  def forward(self,inputs):
    """Convolve a (batch, channels, height, width) tensor.

    Returns:
      Tensor of shape
      (batch, n_filters, height - kernel_size + 1, width - kernel_size + 1).
    """
    batch_size, _, height, width = inputs.shape
    side = self.kernel_size
    # Spatial size left after sliding the kernel without padding.
    out_h = height - side + 1
    out_w = width - side + 1
    # unfold -> (batch, channels * side * side, num_patches); swap the last
    # two axes so every row is one flattened patch.
    patches = torch.nn.functional.unfold(inputs,kernel_size = side).transpose(1,2)
    # One row per filter: (n_filters, channels * side * side).
    kernel_rows = self.kernels.view(self.n_filters,-1)
    # (batch, num_patches, n_filters), then back to (batch, n_filters, num_patches).
    responses = torch.matmul(patches,kernel_rows.T).transpose(1,2)
    # Broadcast one bias per filter over batch and patch axes.
    responses = responses + self.biases[None, :, None]
    # Lay the patch axis back out as a 2-D feature map.
    return responses.view(batch_size, self.n_filters, out_h, out_w)

  def backwardsAndOptimize(self,learningRate):
    """Apply one plain gradient-descent step from the gradients stored in `.grad`."""
    with torch.no_grad():
      for parameter in (self.kernels, self.biases):
        parameter.sub_(learningRate * parameter.grad)

Convolutional Neural Network (Container Class)

In [None]:
# This class is mostly just a container for all the other layers. You can instantiate it similarly to how you would a CNN from tensorflow.
class Conv_Classifier(torch.nn.Module):
  """Sequential container that chains layers and trains them with manual SGD.

  The layers are applied in order. ReLU is applied after every Conv_Layer or
  Layers_Dense except when that layer is the last one in the list, whose raw
  outputs are treated as class logits for cross-entropy loss.
  """
  def __init__(self, layers):
    """Store the layers and create the loss function.

    Args:
      layers: iterable of torch.nn.Module instances, applied in order.
    """
    super().__init__()
    # ModuleList registers every layer so self.parameters() reaches their params and grads.
    self.layers = torch.nn.ModuleList(layers)
    self.loss_fn = torch.nn.CrossEntropyLoss()

  def forward(self,inputs):
    """Run `inputs` through every layer and return the final outputs (logits)."""
    outputs = inputs
    last_index = len(self.layers) - 1
    for i, layer in enumerate(self.layers):
      # Calling the module invokes its forward method.
      outputs = layer(outputs)
      # ReLU follows every trainable layer except the final one.
      if i != last_index and isinstance(layer,(Conv_Layer,Layers_Dense)):
        outputs = torch.nn.functional.relu(outputs)
    return outputs

  def backwardsAndOptimize(self,learningRate,loss):
    """Backpropagate `loss`, clip gradients, update every trainable layer, reset gradients.

    Args:
      learningRate: step size handed to each layer's own update method.
      loss: scalar tensor produced from this model's outputs.
    """
    # Populates .grad on every parameter that contributed to the loss.
    loss.backward()
    # Rescale gradients so their combined norm is at most 1.0.
    torch.nn.utils.clip_grad_norm_(self.parameters(), 1.0)
    # Parameter update: each trainable layer applies its own gradient-descent step.
    for layer in self.layers:
      if isinstance(layer,(Conv_Layer,Layers_Dense)):
        layer.backwardsAndOptimize(learningRate)
    # Zero the gradients so they do not accumulate into the next batch.
    for param in self.parameters():
      if param.grad is not None:
        param.grad.zero_()

  def predict(self,x,y,batch_size):
    """Compute logits, predicted classes and cross-entropy loss for one batch.

    Args:
      x: input batch passed to forward.
      y: target class indices for the batch.
      batch_size: when exactly 1, the prediction is the argmax over all
        logits (a 0-dim tensor); otherwise the argmax along dim 1.

    Returns:
      Tuple (logits, prediction, loss).
    """
    logits = self.forward(x)
    if batch_size == 1:
      prediction = torch.argmax(logits)
    else:
      prediction = torch.argmax(logits, dim=1)
    loss = self.loss_fn(logits,y)
    return logits, prediction, loss

  def train_step(self,x_train,y_train,batch_size,learningRate,num_epochs):
    """Train with mini-batch SGD and print the average loss after every epoch.

    Args:
      x_train: training inputs, indexed along dimension 0.
      y_train: training targets aligned with x_train.
      batch_size: number of samples per mini-batch (the last batch may be smaller).
      learningRate: initial step size; the current value is kept in self.learningRate.
      num_epochs: number of passes over the data.
    """
    self.train()
    self.learningRate = learningRate
    num_samples = x_train.shape[0]
    for epoch in range(0,num_epochs):
      # Learning-rate decay: shrink the step by 2% at epochs 10, 20, 30, ...
      if epoch % 10 == 0 and epoch > 0:
        self.learningRate = self.learningRate * 0.98
      # Visit the samples in a fresh random order every epoch.
      permutation = torch.randperm(num_samples)
      epoch_loss = 0.0
      for start in range(0,num_samples,batch_size):
        indices = permutation[start:start+batch_size]
        batch_x, batch_y = x_train[indices], y_train[indices]
        logits, predictions, loss = self.predict(batch_x,batch_y,batch_size)
        # Weight the batch-mean loss by the batch length so the epoch average is per-sample.
        epoch_loss += loss.item()*batch_x.size(0)
        # Use the decayed rate; passing the original argument here would
        # silently disable the decay computed above.
        self.backwardsAndOptimize(self.learningRate,loss)
      print(f"Epoch {epoch + 1}/{num_epochs} - Loss: {epoch_loss / x_train.shape[0]:.4f}")

  def validate(self,x_test,y_test):
    """Evaluate on held-out data one sample at a time.

    Args:
      x_test: evaluation inputs, indexed along dimension 0.
      y_test: target class indices aligned with x_test.

    Returns:
      Tuple (avg_loss, accuracy) averaged over all samples.
    """
    self.eval()
    total_loss = 0.0
    total_correct = 0
    num_samples = x_test.shape[0]
    # Evaluation needs no gradients; skipping graph construction saves time and memory.
    with torch.no_grad():
      # Left unvectorised on purpose: one sample per forward pass.
      for i in range(0, num_samples):
        # unsqueeze(0) turns the single sample into a batch of one.
        logits,prediction,loss = self.predict(x_test[i].unsqueeze(0),y_test[i].unsqueeze(0),1)
        total_loss += loss.item()
        if prediction.item() == y_test[i].item():
          total_correct += 1
    avg_loss = total_loss/num_samples
    accuracy = total_correct/num_samples
    return avg_loss,accuracy

Loading Data

In [None]:
# For this exercise we load MNIST, a dataset of greyscale images of handwritten digits.
# The model's job is to classify each image as the digit it shows.
# Use the GPU when one is available, otherwise fall back to the CPU so the cell still runs.
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# load_data() is called exactly once; it returns the (inputs, labels) pairs for the train and test splits.
(x_train, y_train), (x_test, y_test) = mnist.load_data()
# Scale pixel values to [0, 1] by dividing by 255 and add a channel axis at dim 1
# so each image batch is (N, 1, height, width), as the convolution layers expect.
x_train = torch.tensor(x_train.astype('float32') / 255.0, dtype=torch.float32).unsqueeze(1).to(DEVICE)
x_test = torch.tensor(x_test.astype('float32') / 255.0, dtype=torch.float32).unsqueeze(1).to(DEVICE)
# CrossEntropyLoss takes class indices as int64 ("long") tensors.
y_train = torch.tensor(y_train, dtype=torch.long).to(DEVICE)
y_test = torch.tensor(y_test, dtype=torch.long).to(DEVICE)

Defining the Model

In [None]:
# The model and its hyperparameters are defined here. Feel free to play around with this,
# but keep in mind that dimension mismatches between consecutive layers will cause errors.
# Shapes below are per image (channels x height x width) for 28 x 28 inputs.
model = Conv_Classifier([
    # Input: 1 x 28 x 28
    Conv_Layer(1,3,32),
    # 3 x 3 convolution without padding -> 32 x 26 x 26
    Max_Pooling(2,2),
    # 2 x 2 pooling -> 32 x 13 x 13
    Conv_Layer(32,3,64),
    # -> 64 x 11 x 11
    Max_Pooling(2,2),
    # -> 64 x 5 x 5 (the odd trailing row and column are dropped by the pooling)
    Conv_Layer(64,3,64),
    # -> 64 x 3 x 3
    Flattener(),
    # -> 576 features (64 * 3 * 3), fed to the MLP layers
    Layers_Dense(576,256),
    Layers_Dense(256,10)
# Place the model on whichever device already holds the training data so the two always match.
]).to(x_train.device)

Training

In [None]:
# Fit the model: mini-batches of 64 samples, a starting learning rate of 0.05, 5 epochs.
model.train_step(x_train, y_train, batch_size=64, learningRate=0.05, num_epochs=5)

Testing

In [None]:
# With training finished, score the model on the held-out test split
# and report its average loss and accuracy.
avg_loss, accuracy = model.validate(x_test=x_test, y_test=y_test)
print(f"Average Loss: {avg_loss:.4f}")
print(f"Accuracy: {accuracy:.4f}")