In [1]:
# load required libraries
import torch
from torch.utils.data import Dataset
from torch.nn import functional as F
import torch.nn as nn
from torch.autograd import Variable
import torchvision
from sklearn.metrics import accuracy_score

In [2]:
# download mnist dataset
train = torchvision.datasets.MNIST(root = "./mnist_train", download = True, train = True)
test = torchvision.datasets.MNIST(root = "./mnist_test", download = True, train = False)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ./mnist_train/MNIST/raw/train-images-idx3-ubyte.gz


  0%|          | 0/9912422 [00:00<?, ?it/s]

Extracting ./mnist_train/MNIST/raw/train-images-idx3-ubyte.gz to ./mnist_train/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to ./mnist_train/MNIST/raw/train-labels-idx1-ubyte.gz


  0%|          | 0/28881 [00:00<?, ?it/s]

Extracting ./mnist_train/MNIST/raw/train-labels-idx1-ubyte.gz to ./mnist_train/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to ./mnist_train/MNIST/raw/t10k-images-idx3-ubyte.gz


  0%|          | 0/1648877 [00:00<?, ?it/s]

Extracting ./mnist_train/MNIST/raw/t10k-images-idx3-ubyte.gz to ./mnist_train/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to ./mnist_train/MNIST/raw/t10k-labels-idx1-ubyte.gz


  0%|          | 0/4542 [00:00<?, ?it/s]

Extracting ./mnist_train/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./mnist_train/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ./mnist_test/MNIST/raw/train-images-idx3-ubyte.gz


  0%|          | 0/9912422 [00:00<?, ?it/s]

Extracting ./mnist_test/MNIST/raw/train-images-idx3-ubyte.gz to ./mnist_test/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to ./mnist_test/MNIST/raw/train-labels-idx1-ubyte.gz


  0%|          | 0/28881 [00:00<?, ?it/s]

Extracting ./mnist_test/MNIST/raw/train-labels-idx1-ubyte.gz to ./mnist_test/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to ./mnist_test/MNIST/raw/t10k-images-idx3-ubyte.gz


  0%|          | 0/1648877 [00:00<?, ?it/s]

Extracting ./mnist_test/MNIST/raw/t10k-images-idx3-ubyte.gz to ./mnist_test/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to ./mnist_test/MNIST/raw/t10k-labels-idx1-ubyte.gz


  0%|          | 0/4542 [00:00<?, ?it/s]

Extracting ./mnist_test/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./mnist_test/MNIST/raw



In [3]:
# change pil image to pytorch tensor and normalize it
transform = torchvision.transforms.Compose([
                        torchvision.transforms.ToTensor(),
                        torchvision.transforms.Normalize(
                        (0.1307,), (0.3081,))])

In [4]:
# custom data set - added random number & sum
class CustomDataset(Dataset):
    def __init__(self, data, transform = None):
        self.data = data  # complete dataset
        self.transform = transform  # function to transform & normalize image

    def __len__(self):
        return len(self.data)  # size of dataset

    def __getitem__(self, idx):
        img = self.data[idx][0]  # extract image of id = idx from the complete dataset
        label = torch.tensor([self.data[idx][1]])  # extract label of id = idx from the complete dataset
        rand_num = torch.randint(low = 0, high = 10, size = (1, ))  # generate a random number from 0 to 9
        sum = label + rand_num  # sum of label and random number

        label = F.one_hot(label, num_classes = 10)[0]  # one hot encoding of label
        rand_num = F.one_hot(rand_num, num_classes = 10)[0]  #one hot encoding of random number
        sum = F.one_hot(sum, num_classes = 19)[0]  # one hot encoding of sum

        if self.transform is not None:
            img = self.transform(img)  # apply transformation if transformation function is passed to data loader

        return img, label, rand_num, sum

In [5]:
# create train & test data
train_data = CustomDataset(train, transform = transform)
test_data = CustomDataset(test, transform = transform)

In [6]:
# data loaders
train_loader = torch.utils.data.DataLoader(train_data, batch_size = 16, shuffle = True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size = 16, shuffle = False)

In [7]:
# check one batch from train data
dataiter = iter(train_loader)
images, labels, rand_num, sum = next(dataiter)

print(labels)
print(rand_num)
print(sum)

tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
        [0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 1, 0, 0, 0],
        [0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
        [0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 0],
        [0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
        [0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
tensor([[0, 0, 0, 0, 0, 0, 0, 0, 1, 0],
        [0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
        [0, 0, 0, 0, 0, 0, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 1, 0, 0],

In [8]:
# construct cnn class
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 8, kernel_size = 3)  # apply 8 channel convolution with kernel size 3*3 on image
        self.conv2 = nn.Conv2d(8, 16, kernel_size = 3)  # apply 16 channel convolution with kernel size 3*3 on image
        self.conv3 = nn.Conv2d(16, 32, kernel_size = 3)  # apply 32 channel convolution with kernel size 3*3 on image
        self.fc1 = nn.Linear(5 * 5 * 32 + 10, 256)  # dense layer on flattened convolution embedding 

        self.fc2 = nn.Linear(256, 64)  # dense layer
        self.fc3 = nn.Linear(64, 10)  # output layer for label
        self.fc4 = nn.Linear(64, 19)  # output label for sum

    def forward(self, img, rand_num):
        x = F.relu(self.conv1(img))  #input = 1 * 28 * 28, output = 8 * 26 * 26
        x = F.relu(self.conv2(x))  #input = 8 * 26 * 26, output = 16 * 24 * 24
        x = F.max_pool2d(x, 2)  #input = 16 * 24 * 24, output = 32 * 12 * 12

        x = F.relu(self.conv3(x))  #input = 32 * 12 * 12, output = 32 * 10 * 10
        x = F.max_pool2d(x, 2)  #input = 32 * 10 * 10, output = 32 * 5 * 5

        x = x.view(-1, 5 * 5 * 32)  # flatten cnn embedding
        
        # random number input is concatenated with embeddings from last convolution layer after max pooling
        x = torch.cat((x, rand_num), 1)  # combine flattened cnn embedding & random number input

        x = F.relu(self.fc1(x))  # 1st dense layer
        x = F.relu(self.fc2(x))  # 2nd dense layer

        img_out = F.log_softmax(self.fc3(x), dim = 1)  # output layer to output probabilities for label
        sum_out = F.log_softmax(self.fc4(x), dim = 1)  # output layer to output probabilities for sum

        return img_out, sum_out

In [9]:
# initilize the model
net = Net()
#set device to be cuda
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net.to(device)

Net(
  (conv1): Conv2d(1, 8, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(8, 16, kernel_size=(3, 3), stride=(1, 1))
  (conv3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1))
  (fc1): Linear(in_features=810, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=64, bias=True)
  (fc3): Linear(in_features=64, out_features=10, bias=True)
  (fc4): Linear(in_features=64, out_features=19, bias=True)
)

In [10]:
# cross entropy loss functions for both label & sum
# chose cross entropy loss as it as multi class classification problem for both label & sum
# assumed that sum is also a class variable, not a numeric variable
criterion_label = nn.CrossEntropyLoss()
criterion_sum = nn.CrossEntropyLoss()

# adam optimizer with learning rate of 1e-3
optimizer = torch.optim.Adam(net.parameters(), lr = 0.001) 

#number of epochs for which model will be trained
epoch = 10

# train model
for epoch in range(epoch):
    # initialize total loss as 0 at start of each epoch
    train_loss = 0
    test_loss = 0
    
    ##### Model Training #####
    # specify that model will be trained, in this model gradients will flow and weights will be updated
    net.train()
    #iterate over batches of data from train data loader
    for data in train_loader:
        # extract images, label, random number & sum from batches
        img_tr, label_tr, rand_num_tr, sum_tr = data

        # attach data to gpu
        img_tr = img_tr.to(device = device, dtype = torch.float)
        label_tr = label_tr.to(device = device)
        rand_num_tr = rand_num_tr.to(device = device)
        sum_tr = sum_tr.to(device = device)
        
        # convert data to pytorch variables, so that autograd can be applied on them
        img_tr, label_tr = Variable(img_tr), Variable(label_tr)
        rand_num_tr, sum_tr = Variable(rand_num_tr), Variable(sum_tr)
        
        # clear out the gradients
        optimizer.zero_grad()
        
        # predict label & sum from the model by passing image & random number as inputs
        output_label, output_sum = net(img_tr, rand_num_tr)
        
        # convert label & sum from one hot encoding to classes
        label_tr =  torch.argmax(label_tr, dim = 1)
        sum_tr =  torch.argmax(sum_tr, dim = 1)
        
        # calculate losses for both label & sum
        loss_label_tr = criterion_label(output_label, label_tr)
        loss_sum_tr = criterion_sum(output_sum, sum_tr)

        # add both losses from label & sum to calculate total loss
        loss_tr = loss_label_tr + loss_sum_tr

        # bakcward pass on loss & update weights
        loss_tr.backward()
        optimizer.step()
        
        # update total epoch loss by adding loss from each batch
        train_loss += loss_tr.item()
    
    avg_train_loss = train_loss / len(train_loader)

    ##### Model Evaluation #####
    # specify that model will be just evaluated without any gradient flow
    with torch.no_grad():
         net.eval()
         #iterate over batches of data from test data loader
         for data in train_loader:
             # extract images, label, random number & sum from batches
             img_te, label_te, rand_num_te, sum_te = data

             # attach data to gpu
             img_te = img_te.to(device = device, dtype = torch.float)
             label_te = label_te.to(device = device)
             rand_num_te = rand_num_te.to(device = device)
             sum_te = sum_te.to(device = device)
        
             # predict label & sum from the model by passing image & random number as inputs
             output_label, output_sum = net(img_te, rand_num_te)
        
             # convert label & sum from one hot encoding to classes
             label_te =  torch.argmax(label_te, dim = 1)
             sum_te =  torch.argmax(sum_te, dim = 1)
        
             # calculate losses for both label & sum
             loss_label_te = criterion_label(output_label, label_te)
             loss_sum_te = criterion_sum(output_sum, sum_te)

             # add both losses from label & sum to calculate total loss
             loss_te = loss_label_te + loss_sum_te

             # update total epoch loss by adding loss from each batch
             test_loss += loss_te.item()
         
         avg_test_loss = test_loss / len(test_loader)
    
    # print average loss of each epoch
    print(f'Epoch {epoch}: | Train Loss: {avg_train_loss:.5f} | Test Loss: {avg_test_loss:.5f}')


Epoch 0: | Train Loss: 1.64804 | Test Loss: 2.52513
Epoch 1: | Train Loss: 0.20629 | Test Loss: 0.54847
Epoch 2: | Train Loss: 0.09629 | Test Loss: 0.40790
Epoch 3: | Train Loss: 0.07404 | Test Loss: 0.31103
Epoch 4: | Train Loss: 0.06020 | Test Loss: 0.30970
Epoch 5: | Train Loss: 0.04956 | Test Loss: 0.24033
Epoch 6: | Train Loss: 0.04338 | Test Loss: 0.17115
Epoch 7: | Train Loss: 0.03997 | Test Loss: 0.14499
Epoch 8: | Train Loss: 0.03771 | Test Loss: 0.16685
Epoch 9: | Train Loss: 0.03233 | Test Loss: 0.20463


In [11]:
#checking model performance on train data

#initialize lists to store true & predicted label & sum
label_true = []
sum_true = []
label_pred = []
sum_pred = []

# no gradient flow
with torch.no_grad():
    # iterate over batches in train data
    for data in train_loader:
        # extract data from batch
        img, label, rand_num, sum = data
        
        # move data to gpu
        img = img.to(device = device, dtype = torch.float)
        label = label.to(device = device)
        rand_num = rand_num.to(device = device)
        sum = sum.to(device = device)              
        
        #predict output from network
        output_label, output_sum = net(img, rand_num)
        
        #convert labels & sum from one hot encoding to class
        label =  torch.argmax(label, dim = 1)
        sum =  torch.argmax(sum, dim = 1)
        
        # find predicted class using max prob
        output_label =  torch.argmax(output_label, dim = 1)
        output_sum =  torch.argmax(output_sum, dim = 1)
        
        # append true label & sum
        label_true.extend(list(label.detach().cpu().numpy()))
        sum_true.extend(list(sum.detach().cpu().numpy()))
        
        # append predicted label & sum
        label_pred.extend(list(output_label.detach().cpu().numpy()))
        sum_pred.extend(list(output_sum.detach().cpu().numpy()))

# calculate accuracy for label & sum
print('Accuracy on Train Data - Label: ', round(accuracy_score(label_true, label_pred), 3))
print('Accuracy on Train Data - Sum: ', round(accuracy_score(sum_true, sum_pred), 3))

Accuracy on Train Data - Label:  0.996
Accuracy on Train Data - Sum:  0.993


In [12]:
#checking model performance on test data

#initialize lists to store true & predicted label & sum
label_true = []
sum_true = []
label_pred = []
sum_pred = []

# no gradient flow
with torch.no_grad():
    # iterate over batches in test data
    for data in test_loader:
        # extract data from batch
        img, label, rand_num, sum = data
        
        # move data to gpu
        img = img.to(device = device, dtype = torch.float)
        label = label.to(device = device)
        rand_num = rand_num.to(device = device)
        sum = sum.to(device = device)              
        
        #predict output from network
        output_label, output_sum = net(img, rand_num)
        
        #convert labels & sum from one hot encoding to class
        label =  torch.argmax(label, dim = 1)
        sum =  torch.argmax(sum, dim = 1)
        
        # find predicted class using max prob
        output_label =  torch.argmax(output_label, dim = 1)
        output_sum =  torch.argmax(output_sum, dim = 1)
        
        # append true label & sum
        label_true.extend(list(label.detach().cpu().numpy()))
        sum_true.extend(list(sum.detach().cpu().numpy()))
        
        # append predicted label & sum
        label_pred.extend(list(output_label.detach().cpu().numpy()))
        sum_pred.extend(list(output_sum.detach().cpu().numpy()))

# calculate accuracy for label & sum
print('Accuracy on Test Data - Label: ', round(accuracy_score(label_true, label_pred), 3))
print('Accuracy on Test Data - Sum: ', round(accuracy_score(sum_true, sum_pred), 3))

Accuracy on Test Data - Label:  0.992
Accuracy on Test Data - Sum:  0.99
