In [0]:
from google.colab import drive

# Mount the user's Google Drive inside the Colab VM at /gdrive.
drive.mount('/gdrive')
# Root folder of the Drive inside the mount point; later cells build the
# dataset and checkpoint paths from it.
gdrive_root = '/gdrive/My Drive'

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).


In [0]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import Adam, lr_scheduler
from torchvision import transforms, datasets
import numpy as np
from matplotlib import pyplot as plt
import math
from PIL import Image

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"device setting: {device}")

device setting: cpu


In [0]:
# --- optimisation settings ---
max_epoch = 20          # number of full passes over the training set
learning_rate = 1e-3    # step size handed to the optimizer

# --- model settings ---
output_dim = 10         # number of logits produced by the classifier

# --- run mode ---
# When True, the training cell runs its train/evaluate loop.
training_process = True

In [0]:
def load_mnist(path='/gdrive/my_data', batch_size=100, shift_pixels=2, download=True):
    """
    Construct dataloaders for the MNIST training and test splits.

    Training images are augmented with a random crop: each image is padded by
    `shift_pixels` on every side and a random 28x28 window is cut out, which
    shifts the digit by at most `shift_pixels` pixels in each direction.
    Test images are only converted to tensors.

    :param path: root directory of the dataset
    :param batch_size: batch size used by both loaders
    :param shift_pixels: maximum number of pixels to shift in each direction
    :param download: whether to fetch the dataset when it is not found under
        `path`. torchvision skips the download when the files already exist,
        so leaving this True is safe on repeated calls.
    :return: train_loader, test_loader
    :raises RuntimeError: raised by torchvision when the dataset is missing
        and `download` is False
    """
    # Pinned host memory only helps host-to-GPU copies; requesting it on a
    # CPU-only machine just triggers a warning.
    kwargs = {'num_workers': 1, 'pin_memory': torch.cuda.is_available()}

    train_transform = transforms.Compose([
        transforms.RandomCrop(size=28, padding=shift_pixels),
        transforms.ToTensor(),
    ])
    test_transform = transforms.ToTensor()

    # No blanket try/except fallback: `download=True` already means
    # "download only if missing", and real errors (bad path, interrupted
    # run, ...) should surface instead of being swallowed.
    train_set = datasets.MNIST(path, train=True, download=download, transform=train_transform)
    test_set = datasets.MNIST(path, train=False, download=download, transform=test_transform)

    train_loader = torch.utils.data.DataLoader(
        train_set, batch_size=batch_size, shuffle=True, **kwargs)
    # Evaluation metrics do not depend on sample order, so the test split is
    # iterated deterministically.
    test_loader = torch.utils.data.DataLoader(
        test_set, batch_size=batch_size, shuffle=False, **kwargs)

    return train_loader, test_loader

In [0]:
class MyClassifier(nn.Module):
    """
    Convolutional classifier for single-channel 28x28 images.

    Layout: three 5x5 convolutions without padding (28 -> 24, max-pooled to
    12, then 12 -> 8 -> 4), followed by three fully connected layers. `fc1`
    expects 256 * 4 * 4 features, so the spatial input size must be 28x28.

    :param num_classes: number of output logits. When None (the default) the
        module-level `output_dim` is used, which keeps `MyClassifier()`
        working exactly as before.
    """

    def __init__(self, num_classes=None):
        super(MyClassifier, self).__init__()
        if num_classes is None:
            # Backward-compatible default: read the notebook-level setting.
            num_classes = output_dim
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=128, kernel_size=5)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=5)
        self.conv3 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=5)
        self.fc1 = nn.Linear(in_features=256 * 4 * 4, out_features=328)
        self.fc2 = nn.Linear(in_features=328, out_features=192)
        self.fc3 = nn.Linear(in_features=192, out_features=num_classes)

    def forward(self, x):
        """
        Compute class logits (pre-softmax scores) for a batch of images.

        :param x: tensor of shape (batch, 1, 28, 28)
        :return: tensor of shape (batch, num_classes)
        :raises RuntimeError: if the spatial size of `x` does not produce the
            256 * 4 * 4 features that `fc1` expects
        """
        x = self.pool(self.relu(self.conv1(x)))
        x = self.relu(self.conv2(x))
        x = self.relu(self.conv3(x))
        # Flatten everything except the batch axis. Unlike `view(-1, 4096)`,
        # this never folds surplus features into extra batch rows, so a
        # wrongly sized input fails loudly in `fc1` instead of silently
        # producing more output rows than input samples.
        x = torch.flatten(x, start_dim=1)
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        outputs = self.fc3(x)
        return outputs

In [0]:
# Build the network and place its parameters on the selected device.
my_classifier = MyClassifier().to(device)

# Show the layer layout of the network.
print(my_classifier)

# Adam over all model parameters with the configured step size.
optimizer = optim.Adam(params=my_classifier.parameters(), lr=learning_rate)

MyClassifier(
  (conv1): Conv2d(1, 128, kernel_size=(5, 5), stride=(1, 1))
  (relu): ReLU()
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(128, 256, kernel_size=(5, 5), stride=(1, 1))
  (conv3): Conv2d(256, 256, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=4096, out_features=328, bias=True)
  (fc2): Linear(in_features=328, out_features=192, bias=True)
  (fc3): Linear(in_features=192, out_features=10, bias=True)
)


In [0]:
# Directory on Drive where checkpoints are stored; create it if needed.
ckpt_dir = os.path.join(gdrive_root, 'checkpoints')
os.makedirs(ckpt_dir, exist_ok=True)

# Best test accuracy seen so far; overwritten when a checkpoint is restored.
best_acc = 0.
ckpt_path = os.path.join(ckpt_dir, 'lastest.pt')
if os.path.exists(ckpt_path):
  # map_location remaps the stored tensors onto the current device, so a
  # checkpoint written on a GPU runtime can still be restored on a CPU one.
  ckpt = torch.load(ckpt_path, map_location=device)
  try:
    my_classifier.load_state_dict(ckpt['my_classifier'])
    optimizer.load_state_dict(ckpt['optimizer'])
    best_acc = ckpt['best_acc']
  except (RuntimeError, KeyError, ValueError) as e:
    # RuntimeError: parameter names/shapes do not match the current model.
    # KeyError: the file lacks one of the expected entries.
    # ValueError: optimizer parameter groups do not match.
    print('wrong checkpoint')
    print(e)
  else:
    print('checkpoint is loaded !')
    print('current best accuracy : %.2f' % best_acc)

In [0]:
# Train for `max_epoch` epochs, evaluating on the test split after each one
# and saving a checkpoint whenever the test accuracy improves.
if training_process:
  it = 0              # global iteration counter across all epochs
  train_losses = []   # last mini-batch training loss of each epoch (float)
  test_losses = []    # mean test loss of each epoch (float)

  # Build the datasets and loaders once. The loaders reshuffle and re-apply
  # the random crop on every pass, so recreating them per epoch only adds
  # dataset-loading and worker start-up cost.
  train_loader, test_loader = load_mnist(path=gdrive_root+'/my_data', batch_size=100)

  for epoch in range(max_epoch):
    # train phase
    my_classifier.train()

    for inputs, labels in train_loader:
      it += 1

      # move the batch to the selected device.
      inputs = inputs.to(device)
      labels = labels.to(device)

      # feed data into the network and get outputs.
      logits = my_classifier(inputs)

      # calculate loss
      # Note: `F.cross_entropy` function receives logits, or pre-softmax outputs, rather than final probability scores.
      loss = F.cross_entropy(logits, labels)

      # Note: You should flush out gradients computed at the previous step before computing gradients at the current step.
      #       Otherwise, gradients will accumulate.
      optimizer.zero_grad()

      # backpropagate loss.
      loss.backward()

      # update the weights in the network.
      optimizer.step()

      if it % 2000 == 0:
        # accuracy is only needed for logging, so compute it only here.
        acc = (logits.argmax(dim=1) == labels).float().mean()
        print('[epoch:{}, iteration:{}] train loss : {:.4f} train accuracy : {:.4f}'.format(epoch, it, loss.item(), acc.item()))

    # save losses in a list so that we can visualize them later.
    # Store a plain Python float: appending the tensor itself would keep a
    # grad-requiring device tensor alive, which cannot be plotted directly.
    train_losses.append(loss.item())

    # test phase
    n = 0.
    test_loss = 0.
    test_acc = 0.
    my_classifier.eval()
    # no gradients are needed for evaluation; skipping autograd bookkeeping
    # saves memory and time.
    with torch.no_grad():
      for test_inputs, test_labels in test_loader:
        test_inputs = test_inputs.to(device)
        test_labels = test_labels.to(device)

        logits = my_classifier(test_inputs)
        # accumulate sums so the final averages are exact even if the last
        # batch is smaller than the others.
        test_loss += F.cross_entropy(logits, test_labels, reduction='sum').item()
        test_acc += (logits.argmax(dim=1) == test_labels).float().sum().item()
        n += test_inputs.size(0)

    test_loss /= n
    test_acc /= n
    test_losses.append(test_loss)

    print('[epoch:{}, iteration:{}] test_loss : {:.4f} test accuracy : {:.4f}'.format(epoch, it, test_loss, test_acc))

    # save checkpoint whenever there is improvement in performance
    if test_acc > best_acc:
      best_acc = test_acc
      # Note: optimizer also has states ! don't forget to save them as well.
      ckpt = {'my_classifier':my_classifier.state_dict(),
              'optimizer':optimizer.state_dict(),
              'best_acc':best_acc}
      torch.save(ckpt, ckpt_path)
      print('checkpoint is saved !')


[epoch:0, iteration:600] test_loss : 0.0319 test accuracy : 0.9908
checkpoint is saved !
[epoch:1, iteration:1200] test_loss : 0.0220 test accuracy : 0.9931
checkpoint is saved !
[epoch:2, iteration:1800] test_loss : 0.0231 test accuracy : 0.9925
[epoch:3, iteration:2000] train loss : 0.0056 train accuracy : 1.0000
[epoch:3, iteration:2400] test_loss : 0.0300 test accuracy : 0.9925
[epoch:4, iteration:3000] test_loss : 0.0256 test accuracy : 0.9923
[epoch:5, iteration:3600] test_loss : 0.0245 test accuracy : 0.9930
[epoch:6, iteration:4000] train loss : 0.0415 train accuracy : 0.9900
[epoch:6, iteration:4200] test_loss : 0.0245 test accuracy : 0.9930
[epoch:7, iteration:4800] test_loss : 0.0255 test accuracy : 0.9936
checkpoint is saved !
[epoch:8, iteration:5400] test_loss : 0.0273 test accuracy : 0.9919
[epoch:9, iteration:6000] train loss : 0.0029 train accuracy : 1.0000
[epoch:9, iteration:6000] test_loss : 0.0221 test accuracy : 0.9934
[epoch:10, iteration:6600] test_loss : 0.0199