In [None]:
import numpy as np
import time
import os
from pathlib import Path
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

import torch
import torchvision
from torch import nn, optim
from torchsummary import summary
import torch.utils.data as data_utils


In [None]:
# Global configuration shared by the cells below.
randomSeed = 42
# Seed PyTorch's global RNG so that weight initialisation and DataLoader
# shuffling repeat across "Restart Kernel -> Run All" (some GPU kernels may
# still be nondeterministic).
torch.manual_seed(randomSeed)

projectDevice = "cuda" if torch.cuda.is_available() else "cpu"  # use the GPU when one is available
batchSize = 32        # samples per mini-batch
numEpochs = 10        # number of passes over the training set
learningRate = 0.01   # learning rate handed to the optimizer in train_model
inputSize = (1,28,28) # (channels, height, width) passed to torchsummary in train_model

In [None]:
def process_x(x):
  """Convert raw image tensors to float32 in [-1.0, 1.0] and add a channel axis.

  Args:
    x: tensor of pixel values indexed [N, H, W]; values are assumed to lie in
      the 0..255 range (the scaling below maps 0 -> -1.0 and 255 -> 1.0).

  Returns:
    A new float32 tensor of shape [N, 1, H, W]. The input tensor is never
    modified.
  """
  # Use out-of-place arithmetic: a dtype cast returns the *same* tensor when the
  # dtype already matches, so in-place `/=` / `-=` would overwrite the caller's data.
  x = x.to(torch.float32) / 127.5 - 1.0

  # Insert a singleton channel axis: [N, H, W] -> [N, 1, H, W].
  return torch.unsqueeze(x, dim=1)

def load_data(ds=torchvision.datasets.MNIST):
  """Download a torchvision-style dataset and return preprocessed train/test splits.

  Args:
    ds: dataset class called as ds(root, download=True[, train=False]) whose
      instances expose `.data` and `.targets`.

  Returns:
    ((train_images, train_labels), (test_images, test_labels)), where the
    images have been passed through process_x and the labels are returned as-is.
  """
  root = './data/mnist'

  train_split = ds(root, download=True)
  test_split = ds(root, download=True, train=False)

  train_pair = (process_x(train_split.data), train_split.targets)
  test_pair = (process_x(test_split.data), test_split.targets)

  return train_pair, test_pair

In [None]:
# Fetch and preprocess the dataset once; the training cell below reads these globals.
train_split, test_split = load_data()
train_x, train_y = train_split
test_x, test_y = test_split

In [None]:
class CNN(nn.Module):
    """Two-block convolutional classifier producing 10 logits.

    Each block is Conv2d(3x3) -> BatchNorm2d -> ReLU -> MaxPool2d(2x2); the
    result is flattened, passed through dropout and a single linear layer.
    The linear layer expects 64*5*5 features, which is what a 1x28x28 input
    yields (28 -> 26 -> 13 -> 11 -> 5 along each spatial axis).

    Args:
        dropout_rate: drop probability applied before the linear layer.
    """

    def __init__(self, dropout_rate=0.0):
        super().__init__()

        # Layers with randomly initialised weights, created in a fixed order.
        self.conv_layer1 = nn.Conv2d(1, 32, kernel_size=3)
        self.conv_layer2 = nn.Conv2d(in_channels=32, out_channels=64,
                                     kernel_size=3)
        self.fc = nn.Linear(64 * 5 * 5, 10)

        # Normalisation, activation and pooling; relu/pool are shared by both blocks.
        self.batch_norm1 = nn.BatchNorm2d(num_features=32)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool2d(2, stride=2)

        self.batch_norm2 = nn.BatchNorm2d(num_features=64)

        # Classifier head helpers.
        self.flat = nn.Flatten()
        self.drop = nn.Dropout(dropout_rate)

    def forward(self, x):
        """Map a batch of images (B x C x H x W) to class logits (B x 10)."""
        blocks = (
            (self.conv_layer1, self.batch_norm1),
            (self.conv_layer2, self.batch_norm2),
        )

        features = x
        for conv, norm in blocks:
            features = self.pool(self.relu(norm(conv(features))))

        return self.fc(self.drop(self.flat(features)))


In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None, show_progress=False):
  """Run one pass over `loader` and return (mean loss, accuracy) as Python floats.

  If `optimizer` is given, the model is put in training mode and updated after
  every batch. Otherwise the model is put in eval mode and gradient tracking is
  disabled for the whole pass.
  """
  is_training = optimizer is not None
  model.train(is_training)  # train(False) is what model.eval() does

  total_loss = 0.0
  total_correct = 0
  total_seen = 0

  batches = tqdm(loader) if show_progress else loader

  with torch.set_grad_enabled(is_training):
    for x, y in batches:
      x, y = x.to(device), y.to(device)

      if is_training:
        optimizer.zero_grad()

      outputs = model(x)
      loss = criterion(outputs, y)

      if is_training:
        loss.backward()
        optimizer.step()

      # The criterion returns the batch mean; weight it by the batch size so a
      # smaller final batch contributes proportionally to the epoch average.
      num_in_batch = outputs.shape[0]
      total_loss += num_in_batch * loss.item()
      # .item() keeps the running count a plain Python int instead of a tensor.
      total_correct += (outputs.argmax(1) == y).sum().item()
      total_seen += num_in_batch

  # Normalise by the samples actually seen rather than by a global array length.
  return total_loss / total_seen, total_correct / total_seen


def train_model(model, epochs, batchSize, device=projectDevice):
  """Train `model` with Adam and cross-entropy, validating after every epoch.

  Training batches come from the notebook globals train_x / train_y and
  validation batches from test_x / test_y; the learning rate and the input
  size shown in the model summary come from the globals learningRate and
  inputSize.

  Args:
    model: the nn.Module to train; it is moved to `device` in place.
    epochs: number of passes over the training data.
    batchSize: mini-batch size for both the training and validation loaders.
    device: device the model and every batch are moved to.

  Returns:
    (model, model_history), where model_history maps "loss",
    "sparse_categorical_accuracy", "val_loss" and
    "val_sparse_categorical_accuracy" to lists holding one Python float per epoch.
  """
  model_history = {"loss":[], "sparse_categorical_accuracy": [],
                   "val_loss":[], "val_sparse_categorical_accuracy":[]}

  model.to(device)
  summary(model, input_size=inputSize, device=device)

  optimizer = optim.Adam(model.parameters(), lr=learningRate)
  criterion = nn.CrossEntropyLoss()

  train = data_utils.TensorDataset(train_x, train_y)
  train_loader = data_utils.DataLoader(train, batch_size=batchSize, shuffle=True)

  val = data_utils.TensorDataset(test_x, test_y)
  val_loader = data_utils.DataLoader(val, batch_size=batchSize, shuffle=False)

  for epoch in range(epochs):

    start = time.time()
    print(f'Epoch {epoch+1}')

    # train_correct / val_correct hold the epoch accuracy (fraction correct).
    train_loss, train_correct = _run_epoch(model, train_loader, criterion, device,
                                           optimizer=optimizer, show_progress=True)
    val_loss, val_correct = _run_epoch(model, val_loader, criterion, device)

    model_history["loss"].append(train_loss)
    model_history["sparse_categorical_accuracy"].append(train_correct)
    model_history["val_loss"].append(val_loss)
    model_history["val_sparse_categorical_accuracy"].append(val_correct)

    end = time.time()

    print(f"Time: {(end-start):.2f} s  - ",
          f"Loss: {train_loss:.4f}  -  Categorical_Accuracy: {train_correct:.4f}  - ",
          f"Val_Loss: {val_loss:.4f}  -  Categorical_Val_Accuracy: {val_correct:.4f}")

  print('Finished Training')

  return model, model_history

In [None]:
# Build a fresh network and train it with the settings from the configuration cell.
modelCNN = CNN()
trainingResult = train_model(modelCNN, epochs=numEpochs, batchSize=batchSize,
                             device=projectDevice)
modelCNN, modelCNNHistory = trainingResult

In [None]:
# Per-epoch accuracy curves: training (dashed) against the held-out split (dotted).
fig, ax = plt.subplots()
accuracyCurves = (
    ('sparse_categorical_accuracy', 'dashed'),
    ('val_sparse_categorical_accuracy', 'dotted'),
)
for historyKey, lineStyle in accuracyCurves:
    ax.plot(modelCNNHistory[historyKey], linestyle=lineStyle)
ax.set_title('Model Accuracy')
ax.set_ylabel('Accuracy')
ax.set_xlabel('Epoch')
ax.legend(['Train Accuracy', 'Test Accuracy'], loc='lower right')
plt.show()

In [None]:
def modelEval(model, test_images, test_labels, device=projectDevice):
  """Classify the test images one at a time and print mean latency and accuracy.

  Args:
    model: the nn.Module to evaluate; it is moved to `device` and put in eval mode.
    test_images: batch-first tensor of images; each image is sliced out as a
      batch of one and moved to `device` before the forward pass.
    test_labels: per-image class labels, indexable by image position.
    device: device the model and each image are moved to.

  Returns:
    None. Results are printed; nothing is evaluated when `test_images` is empty.
  """
  model.to(device)
  model.eval()

  num_total = len(test_images)
  if num_total == 0:
    # Avoid dividing by zero in the summary below.
    print('No test images to evaluate.')
    return

  num_correct = 0
  total_time = 0.0

  pbar = tqdm(range(num_total), total=num_total, bar_format='{l_bar}{bar:10}{r_bar}{bar:-10b}')  # progress bar

  with torch.no_grad():
    for idx in pbar:
      # perf_counter is a monotonic high-resolution clock, suited to short intervals.
      start = time.perf_counter()

      image = test_images[idx:idx+1].to(device)  # keep the batch axis: shape [1, ...]

      output = model(image)

      # .item() copies the prediction to the host, so the timed interval covers
      # the completed forward pass.
      predicted = output.argmax(1).item()
      if predicted == int(test_labels[idx]):
        num_correct = num_correct + 1
      end = time.perf_counter()

      total_time += end-start

      pbar.set_description(f'Test Image {idx+1} Eval time: {end-start:.10f}s', refresh=False)
  print('Average time per image:', total_time/num_total)
  print('Accuracy:', num_correct * 1.0 / num_total)

In [None]:
# Wall-clock time for the whole per-image evaluation, progress-bar overhead included.
start = time.time()
modelEval(modelCNN, test_images=test_x, test_labels=test_y, device=projectDevice)
end = time.time()
print(f'Eval time: {end-start:.2f}s')