# Problem 2

In [None]:
from __future__ import print_function

import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torchsummary import summary
import numpy as np

import torchvision
import torchvision.transforms

from torch.utils.data.sampler import SubsetRandomSampler

# Short alias for the NumPy-array -> tensor conversion.
from_numpy = torch.from_numpy

# Probe for GPU support once; later cells branch on this flag.
cuda = torch.cuda.is_available()
print('cuda is available' if cuda else 'cuda is not available')



## Setting hyper-parameters

In [None]:
## Hyper-parameters and run configuration
model_type = 'CNN'         # which architecture to build: 'MLP' or 'CNN'
PATH = '/content/models/'  # prefix prepended to every checkpoint file name

batch_size = 50     # number of samples per mini-batch
num_epochs = 10     # number of passes over the training split
lr0 = 0.02          # initial learning rate handed to the optimizer
store_every = 1000  # logging base: the running training loss is printed every store_every/5 iterations

## Loading MNIST data

In [None]:
## Build the MNIST datasets and the train / validation / test loaders.
## Each image is 1x28x28 and each label is an int from 0 to 9.

data_size = (1, 28, 28)
mnist_transforms = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
])

mnist_train = torchvision.datasets.MNIST(
    root='./data', train=True, download=True, transform=mnist_transforms)
mnist_test = torchvision.datasets.MNIST(
    root='./data', train=False, download=True, transform=mnist_transforms)

# Shuffle the training indices, then keep the first 85% for training and
# the remaining 15% for validation.
len_data = len(mnist_train)
id_split = int(0.85 * len_data)
indices = list(range(len_data))
np.random.shuffle(indices)
train_indices = indices[:id_split]
valid_indices = indices[id_split:]

# Each sampler restricts its loader to one of the two index subsets.
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(valid_indices)

# Training split
train_loader = torch.utils.data.DataLoader(
    mnist_train, sampler=train_sampler, batch_size=batch_size, num_workers=2)

# Validation split (same underlying dataset object as the training split)
valid_loader = torch.utils.data.DataLoader(
    mnist_train, sampler=valid_sampler, batch_size=batch_size, num_workers=2)

# Test split
test_loader = torch.utils.data.DataLoader(
    mnist_test, shuffle=True, batch_size=batch_size, num_workers=2)

In [None]:
# Report dataset and split sizes. The split sizes are taken from the index
# lists rather than len(loader) * batch_size, which over-counts whenever the
# last mini-batch of a loader is not full.
print("Total train data: ", len(mnist_train), " Total test data: ", len(mnist_test))
print("Training set size: ", len(train_indices),
      " Validation set size: ", len(valid_indices))

## Building the CNN model

In [None]:
# building model
class ResLinear(nn.Module):
    """Fully-connected layer with a residual (skip) connection.

    Computes ``activation(linear(x)) + skip(x)``, where ``skip`` is the
    identity when ``in_features == out_features`` and a separate learned
    linear projection otherwise.

    Args:
        in_features: size of the last dimension of the input.
        out_features: size of the last dimension of the output.
        activation: callable applied to the output of the linear layer.
            ``None`` (the default) creates a fresh ``nn.ReLU()`` for this
            instance.
    """

    def __init__(self, in_features, out_features, activation=None):
        super(ResLinear, self).__init__()

        self.in_features = in_features
        self.out_features = out_features
        # Build the default per instance: a module used as a default argument
        # is created once at definition time and would be shared by every
        # ResLinear, so hooks or state attached to it would leak across layers.
        self.activation = nn.ReLU() if activation is None else activation

        self.linear = nn.Linear(in_features, out_features)
        if in_features != out_features:
            # The skip path needs a projection to match the output width.
            self.project_linear = nn.Linear(in_features, out_features)

    def forward(self, x):
        """Return ``activation(linear(x))`` plus the (possibly projected) input."""
        inner = self.activation(self.linear(x))
        if self.in_features != self.out_features:
            skip = self.project_linear(x)
        else:
            skip = x
        return inner + skip


class Flatten(nn.Module):
    """Collapse every dimension after the first into a single dimension."""

    def forward(self, x):
        leading = x.size(0)
        return x.view(leading, -1)


# Build the network selected by `model_type` and remember the per-sample
# input shape that architecture expects, so the summary below is fed a
# matching dummy input.
if model_type == 'MLP':
    # Operates on images flattened to 784 features.
    model = nn.Sequential(
        ResLinear(784, 312),
        nn.ReLU(),
        ResLinear(312, 312),
        nn.ReLU(),
        ResLinear(312, 10)
    )
    summary_input_size = (784,)
elif model_type == 'CNN':
    model = nn.Sequential(
        nn.Conv2d(1, 32, 5),   # 1 input channel, 32 output channels, 5x5 kernel
        nn.ReLU(),
        nn.MaxPool2d(2),
        nn.Conv2d(32, 64, 3),  # 32 input channels, 64 output channels, 3x3 kernel
        nn.ReLU(),
        nn.MaxPool2d(2),
        Flatten(),
        ResLinear(1600, 256),  # 1600 = 64 channels * 5 * 5 for a 1x28x28 input
        nn.ReLU(),
        ResLinear(256, 10)
    )
    summary_input_size = data_size
else:
    # Fail here with a clear message instead of a NameError on `model` below.
    raise ValueError(
        f"Unknown model_type {model_type!r}; expected 'MLP' or 'CNN'")

if cuda:
    model = model.cuda()

summary(model, summary_input_size)

## Setting the optimizer

In [None]:
## Loss function, optimizer and learning-rate schedule

criterion = nn.CrossEntropyLoss()  # loss minimised during training
optimizer = optim.SGD(model.parameters(), lr=lr0)


def lr_lambda(epoch):
    """Multiplier applied to lr0: 1 at epoch 0, shrinking to 0.1 at num_epochs."""
    return 0.1 ** (epoch / float(num_epochs))


scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)



In [None]:
## Defining the evaluation routines
def accuracy(proba, y):
    """Fraction of rows of ``proba`` whose arg-max along dim 1 equals ``y``.

    The result is a zero-dimensional CPU float tensor.
    """
    _, predicted = proba.max(1)
    hits = (predicted == y).sum()
    return hits.type(torch.FloatTensor) / y.size(0)
    
    
def evaluate(dataset_loader, criterion):
    """Return the sample-weighted mean of ``criterion`` over a data loader.

    Uses the module-level ``model``, ``model_type``, ``data_size`` and
    ``cuda``. The model is switched to evaluation mode and gradients are
    disabled for the duration of the call; the model's previous
    training/eval mode is restored afterwards.

    Args:
        dataset_loader: iterable yielding ``(x, y)`` mini-batches.
        criterion: callable ``criterion(model_output, y)`` returning a
            tensor holding the batch value of the metric (e.g. the loss
            module or ``accuracy``). Each batch value is weighted by the
            batch size, so it is expected to be a per-batch mean.

    Returns:
        The weighted average of the metric as a Python float.

    Raises:
        ZeroDivisionError: if the loader yields no samples.
    """
    total = 0.0
    count = 0

    was_training = model.training
    model.eval()
    try:
        # No backward pass happens here, so skip building autograd graphs.
        with torch.no_grad():
            for x, y in dataset_loader:
                if model_type == 'MLP':
                    x = x.view(-1, 784)
                    y = y.view(-1)
                elif model_type == 'CNN':
                    x = x.view(-1, *data_size)
                    y = y.view(-1)

                if cuda:
                    x = x.cuda()
                    y = y.cuda()

                value = criterion(model(x), y)
                n = y.size(0)
                # Weight by batch size so a short final batch does not skew
                # the average.
                total += value.sum().item() * n
                count += n
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    return total / float(count)

## Train the model

In [None]:
## Defines the train function
def train_model():
    """Train the module-level ``model`` for ``num_epochs`` epochs.

    Each epoch runs one pass of optimizer updates over ``train_loader``,
    advances the learning-rate ``scheduler``, then measures loss and accuracy
    on both the training and validation loaders via ``evaluate``. Whenever
    the validation accuracy (rounded to 3 decimals) beats the best value seen
    so far, the model's state dict is saved to
    ``{PATH}model_acc_{best_acc}.pth``.

    Returns:
        A 4-tuple of lists with one entry per epoch:
        ``(nll_train, nll_valid, acc_train, acc_valid)``.
    """
    import os  # local import: only needed to create the checkpoint directory

    running_loss = 0.0
    running_count = 0
    iterations = 0
    # Integer logging period (at least 1), so the modulo test below is exact
    # and never divides by zero.
    log_every = max(1, store_every // 5)

    learning_curve_nll_train = []
    learning_curve_nll_valid = []
    learning_curve_acc_train = []
    learning_curve_acc_valid = []
    best_acc = -np.inf

    # torch.save does not create missing directories.
    save_dir = os.path.dirname(PATH)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    for e in range(num_epochs):
        print(f'============= EPOCH {e} ========================')
        model.train()
        for x, y in train_loader:
            optimizer.zero_grad()

            if model_type == 'MLP':
                x = x.view(-1, 784)
                y = y.view(-1)
            elif model_type == 'CNN':
                x = x.view(-1, *data_size)
                y = y.view(-1)
            if cuda:
                x = x.cuda()
                y = y.cuda()

            loss = criterion(model(x), y)
            loss.backward()
            optimizer.step()

            n = y.size(0)
            running_loss += loss.item() * n
            running_count += n
            iterations += 1
            if iterations % log_every == 0:
                avg_loss = running_loss / float(running_count)
                # Reset so each report covers only the latest window.
                running_loss = 0.0
                running_count = 0
                print(" Iteration {}: TRAIN {}".format(
                    iterations, avg_loss))

        # Advance the schedule only after this epoch's optimizer updates;
        # stepping it before the first optimizer.step() skips the first
        # learning-rate value (PyTorch >= 1.1).
        scheduler.step()

        train_loss = evaluate(train_loader, criterion)
        learning_curve_nll_train.append(train_loss)
        valid_loss = evaluate(valid_loader, criterion)
        learning_curve_nll_valid.append(valid_loss)

        train_acc = evaluate(train_loader, accuracy)
        learning_curve_acc_train.append(train_acc)
        valid_acc = evaluate(valid_loader, accuracy)
        learning_curve_acc_valid.append(valid_acc)

        # Checkpoint only on a strict improvement at 3-decimal resolution.
        if round(valid_acc, 3) > best_acc:
            best_acc = round(valid_acc, 3)
            torch.save(model.state_dict(),
                       f'{PATH}model_acc_{best_acc}.pth')
            print('saved model')

        print(" [NLL] TRAIN {} / VALIDATION {}".format(
            train_loss, valid_loss))
        print(" [ACC] TRAIN {} / VALIDATION {}".format(
            train_acc, valid_acc))

    return (learning_curve_nll_train,
            learning_curve_nll_valid,
            learning_curve_acc_train,
            learning_curve_acc_valid)

In [None]:
# Run training; each returned list holds one value per epoch.
nll_train, nll_valid, acc_train, acc_valid = train_model()

## Evaluate the model accuracy on test set

In [None]:
# Load the best model

# Reuse the architecture already held in `model`: the checkpoint's parameter
# names only match a network with the same layers, so loading it into an
# empty nn.Sequential() fails with unexpected-key errors.
# NOTE: the file name embeds the best validation accuracy reached during
# training; adjust it to match the checkpoint that was actually saved.
# map_location='cpu' lets a checkpoint written on a GPU load on a CPU-only
# host; load_state_dict then copies the values onto the model's own device.
model.load_state_dict(torch.load(
    f'{PATH}model_acc_0.985.pth', map_location='cpu'))
model.eval()

In [None]:
# Score the current model on the test loader

test_loss = evaluate(test_loader, criterion)
test_acc = evaluate(test_loader, accuracy)

print("Model evaluation ===================")
# print() applies str() to its arguments, so no explicit conversion is needed.
print("Test accuracy: ", test_acc)
print("Test loss: ", test_loss)

## Plot train vs validation error

In [None]:
import matplotlib.pyplot as plt

# Per-epoch error rate (%) for the training and validation splits.
for curve, name in ((acc_train, 'train'), (acc_valid, 'validation')):
    plt.plot((1 - np.array(curve)) * 100, label=name)

plt.title('Train and validation errors')
plt.xlabel('Epoch')
plt.ylabel('Error (%)')
plt.legend(bbox_to_anchor=(1, 1), loc=2)

plt.tight_layout()