In [1]:
import torch
import torch.nn as nn

class CnnClassifier(nn.Module):
  """CNN encoder followed by a two-layer fully connected head.

  Args:
    n_hidden: number of units in the hidden fully connected layer.
    cnn_layers: optional replacement encoder. Its output must hold exactly 32
      values per sample, because forward() reshapes it to (batch_size, 32).
      When None, the default encoder defined below is built.
    n_classes: number of output logits (defaults to 47, the value this
      notebook has always used).
  """
  def __init__(self, n_hidden, cnn_layers=None, n_classes=47):
    super(CnnClassifier, self).__init__()

    if cnn_layers is None:
      # in_data size: (batch_size, 1, 28, 28)
      cnn_layers = nn.Sequential(
        # conv1_out size: (batch_size, 4, 26, 26)
        nn.Conv2d(in_channels=1, out_channels=4, kernel_size=(3, 3), stride=(1, 1), padding=0),
        nn.ReLU(),
        # conv2_out size: (batch_size, 8, 12, 12)
        nn.Conv2d(in_channels=4, out_channels=8, kernel_size=(3, 3), stride=(2, 2), padding=0),
        nn.ReLU(),
        # conv3_out size: (batch_size, 16, 5, 5)
        nn.Conv2d(in_channels=8, out_channels=16, kernel_size=(3, 3), stride=(2, 2), padding=0),
        nn.ReLU(),
        # pool_out size: (batch_size, 16, 3, 3)
        nn.MaxPool2d(kernel_size=3, stride=1, padding=0, dilation=1, return_indices=False, ceil_mode=False),
        # conv4_out size: (batch_size, 32, 1, 1)
        nn.Conv2d(in_channels=16, out_channels=32, kernel_size=(3, 3), stride=(1, 1), padding=0),
        nn.Sigmoid()
      )
    self.cnn_layers = cnn_layers

    # linear layers transform the flattened image features into logits
    self.linear = nn.Sequential(
      nn.Linear(32, n_hidden),
      nn.ReLU(),
      nn.Linear(n_hidden, n_classes) # one logit per class (47 by default)
    )

    # Not used by forward(): the model returns raw logits and CrossEntropyLoss
    # applies log-softmax itself. Kept so the attribute stays available.
    self.softmax = nn.Softmax(dim=1)
    self.loss_fn = nn.CrossEntropyLoss(reduction='sum') # will be divided by batch size

  def forward(self, in_data):
    """Return the logits, size (batch_size, n_classes), for a batch of inputs."""
    img_features = self.cnn_layers(in_data).view(in_data.size(0), 32)
    logits = self.linear(img_features)
    return logits

  def loss(self, logits, labels):
    """Cross-entropy summed over the batch, divided by the batch size."""
    return self.loss_fn(logits, labels) / logits.size(0)

  def top1_accuracy(self, logits, labels):
    """Percentage of samples whose highest logit is at the label index."""
    # get argmax of logits along dim=1 (this is equivalent to argmax of predicted probabilities)
    predicted_labels = torch.argmax(logits, dim=1, keepdim=False) # size (batch_size,)
    n_corrects = predicted_labels.eq(labels).sum(0) # sum up all the correct predictions
    # Convert the integer count before dividing so the ratio is never truncated.
    return n_corrects.double() / logits.size(0) * 100 # in percentage

  def topk_accuracy(self, k, logits, labels):
    """Percentage of samples whose label is among the k highest logits.

    Returns a one-element double tensor located on the same device as `logits`.
    """
    predicted_labels = torch.topk(logits, k, dim=1)[1] # size (batch_size, k)
    # A label can match at most one of the k distinct indices in a row, so the
    # number of matches equals the number of correctly covered samples. The
    # count is produced on the logits' device, so no CPU-side counter is needed.
    n_corrects = predicted_labels.eq(labels.unsqueeze(1)).sum()
    return n_corrects.double().reshape(1) / logits.size(0) * 100 # in percentage

In [2]:
from torch.utils.data import Dataset

class LocalEMNIST(Dataset):
  '''
  Wrapper around an indexable dataset of (image, label) pairs.

  ds: the wrapped dataset (the EMNIST data provided in the project files)
  flatten: bool, when True every image is returned reshaped to 1-D
  '''
  def __init__(self, ds, flatten):
    self.ds = ds
    self.flatten = flatten

  def __len__(self):
    # The wrapper exposes exactly the samples of the wrapped dataset.
    return len(self.ds)

  def __getitem__(self, index):
    sample, target = self.ds[index]
    if self.flatten:
      sample = sample.view(-1)
    return sample, target

In [3]:
from torchvision.datasets import EMNIST
from torch.utils.data import ConcatDataset, Subset
from torchvision.transforms import ToTensor, Compose
import numpy as np
    
def show_image(dataset, index):
  """Draw the first channel of the image stored at dataset[index] in grayscale."""
  import matplotlib.pyplot as plt
  # Element 0 of a sample is the image; its element 0 is the first channel.
  first_channel = dataset[index][0][0]
  gray = plt.get_cmap('gray')
  plt.imshow(first_channel, cmap=gray)

def get_datasets(split='balanced', save=False):
  """Build the train / eval subsets of EMNIST used by the notebook.

  The test and train parts of the requested EMNIST split are downloaded to
  './data' and concatenated (test part first). The result is then partitioned
  by two index lists.

  Args:
    split: name of the EMNIST split, passed through to torchvision's EMNIST.
    save: when True, create a seeded 80/20 train/eval partition, keep the first
      half of each side and write the indices to 'train_test_split.npz'.
      When False (the normal case), read the indices from
      './train_test_split.npz', which must already exist.

  Returns:
    (train_dataset, eval_dataset), both torch.utils.data.Subset objects over
    the concatenated dataset.
  """
  download_folder = './data'

  transform = Compose([ToTensor()])

  dataset = ConcatDataset([EMNIST(root=download_folder, split=split, download=True, train=False, transform=transform),
                           EMNIST(root=download_folder, split=split, download=True, train=True, transform=transform)])

  # Ignore the code below with argument 'save'
  if save:
    random_seed = 4211 # do not change
    eval_size = 0.2
    n_samples = len(dataset)
    indices = list(range(n_samples))
    # Named n_eval (not `split`) so the `split` argument is not shadowed.
    n_eval = int(np.floor(eval_size * n_samples))

    # A private RandomState gives the same permutation as seeding the global
    # generator with this seed, without resetting NumPy's global random state.
    np.random.RandomState(random_seed).shuffle(indices)

    train_indices, eval_indices = indices[n_eval:], indices[:n_eval]

    # cut to half
    train_indices = train_indices[:len(train_indices)//2]
    eval_indices = eval_indices[:len(eval_indices)//2]

    np.savez('train_test_split.npz', train=train_indices, test=eval_indices)

  # just use save=False for students
  # load train test split indices
  else:
    with np.load('./train_test_split.npz') as f:
      train_indices = f['train']
      eval_indices = f['test']

  train_dataset = Subset(dataset, indices=train_indices)
  eval_dataset = Subset(dataset, indices=eval_indices)

  return train_dataset, eval_dataset

In [4]:
from torchvision.utils import make_grid
import torch

def train(model, loaders, optimizer, writer, n_epochs, ckpt_path, device='cpu'):
  """Train `model` for `n_epochs` epochs, evaluating after every epoch.

  Args:
    model: nn.Module that also provides loss(logits, labels) and
      topk_accuracy(k, logits, labels).
    loaders: dict with 'train' and 'eval' iterables of (inputs, labels) batches.
    optimizer: optimizer that updates the model's parameters.
    writer: writer offering add_scalars / add_image (TensorBoard style), or
      None to print the per-epoch metrics instead.
    n_epochs: number of epochs to run.
    ckpt_path: directory that receives '<ckpt_path>/<epoch>.pt' after every
      epoch, or None to skip checkpointing.
    device: device every batch is moved to before the forward pass.

  Raises:
    ValueError: if one of the loaders yields no batches.
  """
  def run_epoch(train_or_eval, epoch):
    is_train = train_or_eval == 'train'
    # Switch mode-dependent layers (dropout, batch norm, ...) for this phase.
    model.train(is_train)

    epoch_loss = 0.
    epoch_acc = 0.
    epoch_acc_top3 = 0.
    n_batches = 0
    in_data = None
    for n_batches, batch in enumerate(loaders[train_or_eval], 1):
      in_data, labels = batch
      in_data, labels = in_data.to(device), labels.to(device)
      if is_train:
        optimizer.zero_grad()

      # Autograd bookkeeping is only needed while training.
      with torch.set_grad_enabled(is_train):
        logits = model(in_data)
        batch_loss = model.loss(logits, labels)
        batch_acc = model.topk_accuracy(1, logits, labels)
        batch_acc_top3 = model.topk_accuracy(3, logits, labels)

      epoch_loss += batch_loss.item()
      epoch_acc += batch_acc.item()
      epoch_acc_top3 += batch_acc_top3.item()

      if is_train:
        batch_loss.backward()
        optimizer.step()

    if n_batches == 0:
      raise ValueError("loaders[%r] produced no batches" % train_or_eval)

    # Metrics are the plain mean of the per-batch values.
    epoch_loss /= n_batches
    epoch_acc /= n_batches
    epoch_acc_top3 /= n_batches

    losses[train_or_eval] = epoch_loss
    accs_top1[train_or_eval] = epoch_acc
    accs_top3[train_or_eval] = epoch_acc_top3

    if writer is None:
      print('epoch %d %s loss %.4f acc %.4f acc_top3 %.4f' % (epoch, train_or_eval, epoch_loss, epoch_acc, epoch_acc_top3))
    elif train_or_eval == 'eval':
      # Logged once per epoch, after the eval pass, when both phases have values.
      group = optimizer.param_groups[0]
      # Both numbers go through str(): concatenating the raw float momentum to
      # a str raises TypeError. .get() covers optimizers whose parameter groups
      # carry no 'momentum' entry.
      name = "lr-" + str(group["lr"]) + "-mom-" + str(group.get("momentum"))
      writer.add_scalars('%s_loss' % name,
                         tag_scalar_dict={'train': losses['train'],
                                          'eval': losses['eval']},
                         global_step=epoch)

      writer.add_scalars('%s_top1_accuracy' % name,
                         tag_scalar_dict={'train': accs_top1['train'],
                                          'eval': accs_top1['eval']},
                         global_step=epoch)

      writer.add_scalars('%s_top3_accuracy' % name,
                         tag_scalar_dict={'train': accs_top3['train'],
                                          'eval': accs_top3['eval']},
                         global_step=epoch)

      # For instructional purpose, add images here, just the last in_data
      if epoch % 10 == 0:
        if len(in_data.size()) == 2: # when it is flattened, reshape it
          in_data = in_data.view(-1, 1, 28, 28)

        img_grid = make_grid(in_data.to('cpu'))
        writer.add_image('%s/eval_input' % model.__class__.__name__, img_grid, epoch)

  # main statements
  losses = dict()
  accs_top1 = dict()
  accs_top3 = dict()

  for epoch in range(1, n_epochs+1):
    run_epoch('train', epoch)
    run_epoch('eval', epoch)

    # For instructional purpose, show how to save checkpoints
    if ckpt_path is not None:
      torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'epoch': epoch,
        'losses': losses,
        'accs': accs_top1
      }, '%s/%d.pt' % (ckpt_path, epoch))

In [5]:
# Load the stored train / eval partition (the defaults, spelled out explicitly).
train_ds, eval_ds = get_datasets(split='balanced', save=False)

In [None]:
from torch.utils.data import DataLoader
from torch.utils.data import SubsetRandomSampler
from torch.optim import Adam, SGD
from tensorboardX import SummaryWriter

# Hyper-parameters and run configuration.
lr = 0.001
n_hidden = 64
ckpt_path = None  # set to a directory to save a checkpoint after every epoch
n_epochs = 100
batch_size = 128
gpu = 0  # CUDA device index; -1 forces the CPU

# Always bind DEVICE: fall back to the CPU when CUDA is not available, so the
# statements below never reference an undefined name.
if gpu != -1 and torch.cuda.is_available():
    DEVICE = gpu
else:
    DEVICE = 'cpu'

model = CnnClassifier(n_hidden).to(DEVICE)
opt_class = SGD
optimizer = opt_class(model.parameters(), lr=lr, momentum=0.9)

# Only the training loader shuffles; neither loader drops the final short batch.
dataloaders = {
    'train': DataLoader(LocalEMNIST(train_ds, flatten=False), batch_size=batch_size, drop_last=False, shuffle=True),
    'eval': DataLoader(LocalEMNIST(eval_ds, flatten=False), batch_size=batch_size, drop_last=False)
}

writer = SummaryWriter('./logs/cnn/%s' % opt_class.__name__)

In [None]:
# Train the randomly initialised classifier with the configuration defined above.
train(
    model=model,
    loaders=dataloaders,
    optimizer=optimizer,
    writer=writer,
    n_epochs=n_epochs,
    ckpt_path=ckpt_path,
    device=DEVICE,
)

In [45]:
# NOTE(review): torch.load unpickles the file, so only load encoders from a trusted source.
# The encoder is loaded onto the CPU and the whole classifier is then moved to
# DEVICE, so this cell follows the configured device instead of assuming 'cuda:0'.
pretrained_encoder = torch.load("pretrained_encoder.pt", map_location='cpu')['model']
pretrained_model = CnnClassifier(n_hidden, cnn_layers=pretrained_encoder).to(DEVICE)

# `optimizer` holds the parameters of `model`; stepping it would never update
# `pretrained_model`, so this run gets an optimizer over its own parameters.
pretrained_optimizer = opt_class(pretrained_model.parameters(), lr=lr, momentum=0.9)

# train() derives its scalar tags from lr and momentum only, so this run logs to
# its own directory to keep its curves apart from the first run's.
pretrained_writer = SummaryWriter('./logs/cnn_pretrained/%s' % opt_class.__name__)

train(pretrained_model, dataloaders, pretrained_optimizer, pretrained_writer, n_epochs, ckpt_path, DEVICE)


KeyboardInterrupt: 

In [None]:
# Leftover inspection cell: nn.Module.parameters() returns an iterator, so the cell
# output is only that object's repr. Wrap it in list(...) to see the tensors.
model.parameters()