#### **Welcome to Assignment 3 on Deep Learning for Computer Vision.**
This notebook consists of two parts. In Part-1 you will code a Siamese Network; in Part-2 you will go through an official PyTorch tutorial, understand it, and answer some questions.
  
#### **Instructions**
1. Use Python 3.x to run this notebook
2. Write your code only between the lines 'YOUR CODE STARTS HERE' and 'YOUR CODE ENDS HERE'.
Do not change anything else in the code cells; if you do, the answers you are supposed to get at the end of this assignment might be wrong.
3. Read the documentation of each function carefully.
4. All the Best!

**Acknowledgement:** Some parts of this implementation are inspired by https://github.com/adambielski/siamese-triplet


# Part-1

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data.sampler import BatchSampler
from torch.optim import lr_scheduler
from PIL import Image
import timeit

## Please DO NOT remove these lines.
# They pin the sources of randomness used in this notebook so that runs are
# intended to be repeatable: PyTorch's RNG seed, the cuDNN determinism and
# benchmark flags, and NumPy's global RNG seed (which the dataset class below
# draws from when it builds image pairs).
torch.manual_seed(0)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(0)
########################

#### YOUR CODE STARTS HERE ####
# check availability of GPU and set the device accordingly:
# "cuda" when a CUDA device is available, otherwise "cpu"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#### YOUR CODE ENDS HERE ####


#### Prepare the dataset for Siamese Network

In [None]:
class SiameseDataset(Dataset):
    """MNIST served as image pairs for a Siamese network.

    Every item is ``((first_image, second_image), target)`` where ``target``
    is 1 when both images carry the same label and 0 otherwise.

    * ``train=True``: the partner image is drawn from NumPy's global RNG on
      every ``__getitem__`` call, so the pairs change between accesses.
    * ``train=False``: the pairs are generated once in ``__init__`` and stored
      in ``self.test_samples`` as ``[first_index, second_index, target]``.
      Even dataset indices receive a same-label partner and odd indices a
      different-label partner; the positive pairs are stored first, followed
      by the negative pairs.

    Args:
        train: Selects the MNIST training split (True) or test split (False).
    """

    def __init__(self, train=True):

        self.train = train
        #### YOUR CODE STARTS HERE ####
        # define a set of transforms for preparing the dataset
        self.transform = transforms.Compose([
            transforms.ToTensor(),  # convert the image to a pytorch tensor
            # normalise with the MNIST mean/std (0.1307, 0.3081); the mean
            # was previously mistyped as 0.137
            transforms.Normalize((0.1307,), (0.3081,)),
        ])
        # Load the MNIST split selected by `self.train`
        self.dataset = datasets.MNIST('./data/', train=self.train, download=True,
                                      transform=self.transform)
        #### YOUR CODE ENDS HERE ####
        if self.train:
            #### YOUR CODE STARTS HERE ####
            # `data` / `targets` replace the deprecated `train_data` /
            # `train_labels` aliases, which emit a warning on every access.
            self.train_data = self.dataset.data
            self.train_labels = self.dataset.targets
            self.labels_all, self.label_to_idx = self._index_labels(self.train_labels)
            #### YOUR CODE ENDS HERE ####
        else:
            #### YOUR CODE STARTS HERE ####
            # `data` / `targets` replace the deprecated `test_data` /
            # `test_labels` aliases.
            self.test_data = self.dataset.data
            self.test_labels = self.dataset.targets
            self.labels_all, self.label_to_idx = self._index_labels(self.test_labels)
            #### YOUR CODE ENDS HERE ####
            # DONOT change this line of code
            random_state = np.random.RandomState(0)

            # Same-label partner for every even index.
            positive_samples = []  # this will be a list of lists
            for ind in range(0, len(self.test_data), 2):
                own_label = self.test_labels[ind].item()
                positive_samples.append(
                    [ind, random_state.choice(self.label_to_idx[own_label]), 1])

            # Different-label partner for every odd index.
            # NOTE: the other label is drawn from NumPy's *global* RNG while
            # the partner index comes from `random_state`, so the test pairs
            # also depend on the global seed. The draw order is kept exactly
            # as it was so the generated pairs do not change.
            negative_samples = []
            for ind in range(1, len(self.test_data), 2):
                own_label = self.test_labels[ind].item()
                other_label = np.random.choice(list(self.labels_all - set([own_label])))
                negative_samples.append(
                    [ind, random_state.choice(self.label_to_idx[other_label]), 0])

            # combine both positive and negative samples
            #### YOUR CODE STARTS HERE ####
            self.test_samples = positive_samples + negative_samples
            #### YOUR CODE ENDS HERE ####

    @staticmethod
    def _index_labels(labels):
        """Return ``(set of labels, {label: array of positions with that label})``."""
        labels_np = labels.numpy()
        labels_all = set(labels_np)
        label_to_idx = {
            each_label: np.where(labels_np == each_label)[0]
            for each_label in labels_all
        }
        return labels_all, label_to_idx

    def __len__(self):
        """Number of items in the wrapped MNIST split."""
        return len(self.dataset)

    def __getitem__(self, index):
        """Return ``((first_image, second_image), target)`` for ``index``."""
        if self.train:
            # Coin flip: 1 -> same-label pair, 0 -> different-label pair.
            target = np.random.randint(0, 2)
            first_image, first_label = self.train_data[index], self.train_labels[index].item()
            if target == 1:
                same_label = self.label_to_idx[first_label]
                siamese_index = index
                # Redraw until the partner is a different image. A label with
                # a single image has no other candidate, so the image is
                # paired with itself instead of looping forever.
                while siamese_index == index and len(same_label) > 1:
                    siamese_index = np.random.choice(same_label)
            else:
                siamese_label = np.random.choice(list(self.labels_all - set([first_label])))
                siamese_index = np.random.choice(self.label_to_idx[siamese_label])
            second_image = self.train_data[siamese_index]
        else:
            first_index, second_index, target = self.test_samples[index]
            first_image = self.test_data[first_index]
            second_image = self.test_data[second_index]
        # The raw tensors are turned into PIL images (mode 'L') because the
        # transform pipeline starts with ToTensor().
        first_image = Image.fromarray(first_image.numpy(), mode='L')
        second_image = Image.fromarray(second_image.numpy(), mode='L')
        first_image = self.transform(first_image)
        second_image = self.transform(second_image)
        return (first_image, second_image), target


In [None]:
class EmbeddingNet(nn.Module):
    """Convolutional encoder that maps a 1-channel image to a 2-D embedding.

    ``convnet`` is three Conv(3x3) -> PReLU -> MaxPool(2, stride 2) stages
    with 16, 32 and 64 output channels. ``fc`` is
    Linear(64, 256) -> PReLU -> Linear(256, 128) -> PReLU -> Linear(128, 2).
    The first linear layer takes 64 features, i.e. it expects the convolution
    stack to end in a 64 x 1 x 1 map (which a 1 x 28 x 28 input produces).
    """

    def __init__(self):
        super().__init__()
        #### YOUR CODE STARTS HERE ####
        # Three Conv -> PReLU -> MaxPool stages, appended in order so the
        # layer positions inside the Sequential stay 0..8.
        stages = []
        in_channels = 1
        for out_channels in (16, 32, 64):
            stages.append(nn.Conv2d(in_channels, out_channels, 3))
            stages.append(nn.PReLU())
            stages.append(nn.MaxPool2d(2, stride=2))
            in_channels = out_channels
        self.convnet = nn.Sequential(*stages)

        # linear -> PReLU -> linear -> PReLU -> linear, ending in 2 nodes
        self.fc = nn.Sequential(
            nn.Linear(64 * 1 * 1, 256),
            nn.PReLU(),
            nn.Linear(256, 128),
            nn.PReLU(),
            nn.Linear(128, 2),
        )
        #### YOUR CODE ENDS HERE ####

    def forward(self, x):
        """Return the embedding for a batch ``x`` (convnet, flatten, fc)."""
        #### YOUR CODE STARTS HERE ####
        features = self.convnet(x)
        batch_size = features.size()[0]
        flattened = features.view(batch_size, -1)  # one row per sample
        output = self.fc(flattened)
        #### YOUR CODE ENDS HERE ####
        return output

In [None]:
class SiameseNetwork(nn.Module):
    """Applies one shared embedding network to both inputs of a pair.

    Args:
        embedding_net: Module called on each input; its weights are shared
            between the two branches because the same instance is used.
    """

    def __init__(self, embedding_net):
        super().__init__()
        self.embedding_net = embedding_net

    def forward(self, x1, x2):
        """Return the tuple ``(embedding of x1, embedding of x2)``."""
        # Call the embedding network for both the inputs and return the output
        #### YOUR CODE STARTS HERE ####
        embeddings = tuple(self.embedding_net(branch) for branch in (x1, x2))
        #### YOUR CODE ENDS HERE ####
        return embeddings

$$
L\left(x_{0}, x_{1}, y\right)=\frac{1}{2} y\left\|f\left(x_{0}\right)-f\left(x_{1}\right)\right\|_{2}^{2}+\frac{1}{2}(1-y)\left\{\max \left(0, m-\sqrt{\left\|f\left(x_{0}\right)-f\left(x_{1}\right)\right\|_{2}^{2}+\epsilon}\right)\right\}^{2}
$$

In [None]:
class ContrastiveLossSiamese(nn.Module):
    """Contrastive loss for pairs of embeddings.

    For target 1 the loss is half the squared Euclidean distance between the
    embeddings; for target 0 it is half the squared hinge
    ``max(0, margin - sqrt(squared_distance + eps))``. The per-pair values
    are averaged over the batch.

    Args:
        margin: Distance beyond which target-0 pairs contribute no loss.
    """

    def __init__(self, margin):
        super().__init__()
        self.margin = margin
        self.eps = 1e-9  # added under the square root

    def forward(self, output1, output2, target):
        """Return the batch-mean contrastive loss as a scalar tensor."""
        # Use the equation mentioned above to define the loss
        #### YOUR CODE STARTS HERE ####
        is_similar = target.float()
        squared_dist = torch.sum((output2 - output1) ** 2, dim=1)
        hinge = F.relu(self.margin - torch.sqrt(squared_dist + self.eps))
        pull_term = is_similar * squared_dist
        push_term = (1.0 - is_similar) * hinge ** 2
        per_pair = 0.5 * (pull_term + push_term)
        #### YOUR CODE ENDS HERE ####
        return per_pair.mean()


In [None]:
def train(model, train_loader, device, optimizer, criterion, epoch):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: Module called as ``model(*data)``.
        train_loader: Iterable of ``(data, target)`` batches. ``data`` is a
            tensor or a tuple/list of tensors. The loader must support
            ``len()`` and expose ``.dataset`` for the progress message.
        device: Device every batch is moved to.
        optimizer: Optimizer stepped once per batch.
        criterion: Called as ``criterion(*outputs, target)``, or as
            ``criterion(*outputs)`` when the batch has no target.
        epoch: Epoch number, used only in the progress message.

    Returns:
        The mean of the per-batch losses as a float, or NaN when the loader
        yields no batches.
    """
    model.train()
    losses = []

    for batch_idx, (data, target) in enumerate(train_loader):
        # An empty target means "no labels for this batch".
        target = target if len(target) > 0 else None
        # Same guard as in test(): a lone tensor becomes a 1-tuple so that
        # it is not iterated row by row below.
        if not isinstance(data, (tuple, list)):
            data = (data,)
        #### YOUR CODE STARTS HERE ####
        # send the image, target to the device
        data = tuple(each.to(device) for each in data)
        if target is not None:
            # Previously `.to()` was called even when target was None.
            target = target.to(device)
        # flush out the gradients stored in optimizer
        optimizer.zero_grad()
        # pass the image to the model; normalise the result to a tuple
        outputs = model(*data)
        if not isinstance(outputs, (tuple, list)):
            outputs = (outputs,)
        # create inputs to the contrastive loss: model outputs, then target
        loss_inputs = tuple(outputs)
        if target is not None:
            loss_inputs += (target,)
        # calculate the loss
        loss = criterion(*loss_inputs)
        # read the scalar once and record it
        losses.append(loss.item())
        # do a backward pass
        loss.backward()
        # update the weights
        optimizer.step()
        #### YOUR CODE ENDS HERE ####

        if batch_idx % 20 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data[0]), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), np.mean(losses)))
    # An empty loader used to fail here because `batch_idx` was never bound.
    total_loss = sum(losses) / len(losses) if losses else float('nan')
    print('Average loss on training set: {:.6f}'.format(total_loss))
    return total_loss

def test(model, test_loader, device, criterion):
    """Evaluate ``model`` on ``test_loader`` and return the mean per-batch loss.

    Runs in eval mode with gradients disabled.

    Args:
        model: Module called as ``model(*data)``.
        test_loader: Iterable of ``(data, target)`` batches. ``data`` is a
            tensor or a tuple/list of tensors.
        device: Device every batch is moved to.
        criterion: Called as ``criterion(*outputs, target)``, or as
            ``criterion(*outputs)`` when the batch has no target.

    Returns:
        The mean of the per-batch losses as a float, or NaN when the loader
        yields no batches.
    """
    model.eval()
    test_loss = 0
    num_batches = 0
    with torch.no_grad():
        for data, target in test_loader:
            # An empty target means "no labels for this batch".
            target = target if len(target) > 0 else None
            if not isinstance(data, (tuple, list)):
                data = (data,)
            #### YOUR CODE STARTS HERE ####
            # send the image, target to the device
            data = tuple(each.to(device) for each in data)
            if target is not None:
                # Previously `.to()` was called even when target was None.
                target = target.to(device)
            # pass the image to the model; normalise the result to a tuple
            outputs = model(*data)
            if not isinstance(outputs, (tuple, list)):
                outputs = (outputs,)
            # create inputs to the contrastive loss: model outputs, then target
            loss_inputs = tuple(outputs)
            if target is not None:
                loss_inputs += (target,)
            # calculate the loss
            loss = criterion(*loss_inputs)
            # update the test_loss variable
            test_loss += loss.item()
            num_batches += 1
            #### YOUR CODE ENDS HERE ####

    # An empty loader used to raise ZeroDivisionError here.
    test_loss = test_loss / num_batches if num_batches else float('nan')
    print('Average loss on test set: {:.6f}'.format(test_loss))
    return test_loss


# Despite its name this is an evaluation routine, not a unit test: opt out of
# pytest collection in case this module is ever imported by a test suite.
test.__test__ = False


In [None]:
# define the training and test sets
# use SiameseDataset (the training set is built first, then the test set)
train_dataset = SiameseDataset(train=True)
test_dataset = SiameseDataset(train=False)

# create dataloaders for training and test datasets
# use a batch size of 128 and set shuffle=True for the training set, set num_workers to 2 and pin_memory to True
train_dataloader = torch.utils.data.DataLoader(
    train_dataset, batch_size=128, shuffle=True, num_workers=2, pin_memory=True)
test_dataloader = torch.utils.data.DataLoader(
    test_dataset, batch_size=128, shuffle=False, num_workers=2, pin_memory=True)

margin = 1.
# create an instance of the embedding network and pass it as input to Siamese network
embedding_net = EmbeddingNet()
model = SiameseNetwork(embedding_net)
# Place the model on the same `device` that train()/test() move every batch
# to, instead of a separate `model.cuda()` check that could disagree with it.
# This happens before the optimizer is created, as before.
model.to(device)
# define the contrastive loss with the specified margin
criterion = ContrastiveLossSiamese(margin)
optimizer = optim.Adam(model.parameters(), lr=0.01)



In [None]:
# Train for four epochs (numbered 1..4), evaluating on the test pairs after
# each one, and report the wall-clock time of the whole run.
num_epochs = 4
start = timeit.default_timer()
for epoch in range(1, num_epochs + 1):
    train(model, train_dataloader, device, optimizer, criterion, epoch)
    test(model, test_dataloader, device, criterion)
stop = timeit.default_timer()

elapsed_seconds = int(stop - start)
print('Total time taken: {} seconds'.format(elapsed_seconds))

Average loss on training set: 0.053572
Average loss on test set: 0.028328
Average loss on training set: 0.026986
Average loss on test set: 0.020759
Average loss on training set: 0.024230
Average loss on test set: 0.022729
Average loss on training set: 0.021231
Average loss on test set: 0.026279
Total time taken: 127 seconds


### Question 1

Run the code cell above and report the average loss on the test set (if you do not get the exact number shown in the options, report the closest one).
1. 0.03
2. 0.3
3. 0.001
4. 1

# Part-2