# EVA 7 - Assignment 2.5 - Python 101 + Pytorch 101

## Create a NN to add random number to number detected from MNIST image

### Import Dependencies

In [None]:
!pip install torchsummary

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, TensorDataset
from torch.utils.data import DataLoader
from torchsummary import summary

import matplotlib.pyplot as plt
import numpy as np

### Check for CUDA - Always use GPU if available

In [None]:
# check if cuda is available
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(device)

### Import MNIST Dataset, Add Random numbers & Sums to create Test and Train Datasets

In [None]:
torch.manual_seed(1) ## Define seed so generation of random numbers remains same across multiple runs
batch_size = 128 ## Define batch size
kwargs = {'num_workers': 2, 'pin_memory': True} if use_cuda else {}


class RandomMNISTDataset(Dataset):
    def __init__(self, MNISTDataset):
        self.MNISTDataset = MNISTDataset
        
    def __getitem__(self, index):
        image = self.MNISTDataset[index][0]
        label = self.MNISTDataset[index][1]
        randNum = torch.randint(0,9, (1,1))
        randNum_oneHot = F.one_hot(randNum, num_classes=10).type(torch.float32)
        sum = label + randNum
        return image, label, randNum_oneHot, sum

    def __len__(self):
        return len(self.MNISTDataset)

mnist_transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])
MNIST_trainset = torchvision.datasets.MNIST('/tmp', train=True, download=True, transform=mnist_transform)
MNIST_testset = torchvision.datasets.MNIST('/tmp', train=False, download=True, transform=mnist_transform)

train_dataset = RandomMNISTDataset(MNIST_trainset)
test_dataset = RandomMNISTDataset(MNIST_testset)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

#### Batch Shape

In [None]:
batch = next(iter(train_loader))

images, labels, randNum, sums = batch

images.shape, labels.shape

In [None]:
randNum[0], sums[0]

In [None]:
sums.squeeze()

#### Visualize Batch

In [None]:
grid = torchvision.utils.make_grid(images[:30], nrow=10)
plt.figure(figsize=(15,15))
plt.imshow(np.transpose(grid, (1,2,0)))
print('labels:', labels[:30])

### Define Neural Network

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5) 
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=12, kernel_size=5)
        
        self.fc_rand1 = nn.Linear(in_features = 10, out_features = 20)
        self.fc1 = nn.Linear(in_features=192+20, out_features=100)
        self.fc2 = nn.Linear(in_features=100, out_features=60)
        self.out = nn.Linear(in_features=60, out_features=29)
    
    def forward(self, image, randNum):
        ## Input Layer
        x = image
        
        ## Conv layer 1
        x = self.conv1(x) ## Input image 28x28x1, Output 24x24x6
        x = F.relu(x)
        x = F.max_pool2d(x,kernel_size=2,stride=2) ## Input 24x24x6, Output 12x12x6
        
        ## Conv layer 2
        x = self.conv2(x) ## Input 12x12x6, Output 8x8x12
        x = F.relu(x)
        x = F.max_pool2d(x,kernel_size=2,stride=2) ## Input 8x8x12, Output 4x4x12
        
        ## Reshape
        x = x.reshape(-1, 12*4*4)
        
        ## Process random number
        y = randNum.type(torch.float32)
        ## Pass one hot encoded random number through fully connected layer 10>20 neurons
        y = self.fc_rand1(y)
        y = F.relu(y)
        y = y.reshape(-1, 20)
        
        ## Concatenate MNIST convolution output with Random Number fc output
        x1 = torch.cat((x, y), dim = 1)
        
        ## Fully connected layers
        x1 = self.fc1(x1)
        x1 = F.relu(x1)
        
        x1 = self.fc2(x1)
        x1 = F.relu(x1)
        x1 = self.out(x1)
        
        #print(x1.shape)
        mnist_output = F.softmax(x1[:,0:10], dim = 0)
        sum_output = F.softmax(x1[:,10:], dim = 0)
        
        return mnist_output, sum_output

#### Model Summary

In [None]:
model = Net().to(device)
summary(model, [(1,28, 28),(1,1,10)])
print(model)

In [None]:
test = torch.randint(0,9,(1,1))
print(test[0,0:10])
test1 = F.one_hot(test, num_classes =10)
print(test1)

In [None]:
images, labels, randNum, sums = images.to(device), labels.to(device), randNum.to(device), sums.to(device)
mnist_pred, sum_pred = model(images, randNum)
mnist_pred

In [None]:
mnist_pred.shape, sum_pred.shape ## Shapes of outputs are defined by batch size

In [None]:
mnist_pred[0].sum() ## Each row must sum to 1, considering softmax function has been applied to equate values to probabilities

In [None]:
correct pred

### Define Train & Test Functions

In [None]:
from tqdm import tqdm
torch.set_grad_enabled(True)

In [None]:
## Define training function
def train(model, device, train_loader, optimizer, epoch):
    model.train()
    pbar = tqdm(train_loader)
    for batch_idx, (images, labels, randNum, sums) in enumerate(pbar):
        images, labels, randNum, sums = images.to(device), labels.to(device), randNum.to(device), sums.to(device)
        ## Zero out all gradients to prevent accumulation
        optimizer.zero_grad()
        ## Forward pass
        mnist_output, sum_output = model(images, randNum)
        
        ## Calculate loss
        MNIST_loss = F.cross_entropy(mnist_output, labels)
        sum_loss = F.cross_entropy(sum_output, sums.squeeze())
        loss = (MNIST_loss + sum_loss)
        
        ## Backpropagation
        loss.backward()
        optimizer.step()
        pbar.set_description(desc= f'loss={loss.item()} batch_id={batch_idx}')

In [None]:
def get_num_correct(preds, labels):
    return preds.argmax(dim=1).eq(labels).sum().item()

## Define testing function
def test(model, device, test_loader):
    model.eval()
    test_loss, mnist_test_loss, sums_test_loss  = 0, 0, 0
    correct_mnist, correct_sums = 0, 0
    
    with torch.no_grad():
        for images, labels, randNum, sums in test_loader:
            
            images, labels, randNum, sums = images.to(device), labels.to(device), randNum.to(device), sums.to(device)
            ## Forward pass
            mnist_output, sum_output = model(images, randNum)
            
            ## Calculate loss
            mnist_test_loss += F.cross_entropy(mnist_output, labels, reduction='sum').item()  
            sums_test_loss += F.cross_entropy(sum_output, sums.squeeze(), reduction='sum').item()
            
            correct_mnist = get_num_correct(mnist_output, labels)

            correct_sums = get_num_correct(sum_output, sums.squeeze())

    test_loss = mnist_test_loss + sums_test_loss
    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average MNIST loss: {:.4f}, MNIST_Accuracy: {}/{} ({:.0f}%)\n'.format(
        mnist_test_loss, correct_mnist, len(test_loader.dataset),
        100. * correct_mnist / len(test_loader.dataset)))
    
    print('\nTest set: Average SUM loss: {:.4f}, SUM_Accuracy: {}/{} ({:.0f}%)\n'.format(
        sums_test_loss, correct_sums, len(test_loader.dataset),
        100. * correct_sums / len(test_loader.dataset)))

### Train & Test Network

In [None]:
model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

epochs = 30
for epoch in range(1, epochs):
    train(model, device, train_loader, optimizer, epoch)
    test(model, device, test_loader)
    
'''
Neural network is unable to learn because of random number FC output being added to MNIST convolution ouput and then being
passed through a series of fully connected layers. 
Lack of a discernible pattern with the random numbers generated leads to the sum output being random as well; with the neural
network having no specific pattern/characteristics/properties to learn and exploit to create a reliable prediction.
'''