# Lab 4
Siamese CNN for miniImageNet dataset

You can find some basic information about Siamese networks [here](https://hackernoon.com/one-shot-learning-with-siamese-networks-in-pytorch-8ddaab10340e).

## Useful links:

Information on PyTorch layers: [link](https://pytorch.org/docs/stable/nn.html)

And more specifically:


*   Linear layers: [link](https://pytorch.org/docs/stable/nn.html#linear-layers)
*   Loss layers: [link](https://pytorch.org/docs/stable/nn.html#loss-functions)
*   Activation functions: [link](https://pytorch.org/docs/stable/nn.html#non-linear-activations-weighted-sum-nonlinearity)
*   Datasets and dataloaders: [link](https://pytorch.org/docs/stable/data.html)
*   Saving and loading models: [link](https://pytorch.org/tutorials/beginner/saving_loading_models.html)
*   Data transforms: [link](https://pytorch.org/docs/stable/torchvision/transforms.html)

## Few-shot testing

**n-way k-shot** testing means:


*   Our testing scenario has **n** categories
*   Every category has **k** images that we can use for training (we know which category they belong to)
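
To make the definition concrete, here is a minimal sketch of drawing one n-way k-shot episode from a list of labels. The function name `sample_episode`, the `n_query` parameter and the toy labels are assumptions made for illustration; they are not part of the lab code.

```python
import random

def sample_episode(labels, n_way, k_shot, n_query, rng=None):
    """Return {category: (support_indices, query_indices)} for one episode."""
    rng = rng or random.Random()
    # group image indices by category
    by_category = {}
    for idx, label in enumerate(labels):
        by_category.setdefault(label, []).append(idx)
    # pick n_way categories without repetition
    categories = rng.sample(sorted(by_category), n_way)
    episode = {}
    for category in categories:
        # pick k_shot + n_query distinct images from this category
        chosen = rng.sample(by_category[category], k_shot + n_query)
        # the first k_shot images are the labelled ones, the rest are queries
        episode[category] = (chosen[:k_shot], chosen[k_shot:])
    return episode

# toy data: 4 categories with 10 images each
toy_labels = [c for c in range(4) for _ in range(10)]
episode = sample_episode(toy_labels, n_way=3, k_shot=2, n_query=5,
                         rng=random.Random(0))
for category, (support, query) in episode.items():
    print(category, "support:", support, "query:", query)
```

The **k** support images of each category are the only ones whose label the model may use; the query images are the ones it has to classify.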

## glob function
To collect the contents of a folder into a list:
```
from glob import glob
content_list = glob("folder_path/*")
```

## Splitting strings:
You can split a string into a list given some delimiter.

Split by whitespace:
```
list_of_strings = single_string.split()
```
Split by comma:
```
list_of_strings = single_string.split(",")
```
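
One caveat worth a short sketch: a plain `split()` also breaks apart paths that contain spaces (for example anything under `My Drive`). Assuming a line of the form `path label`, splitting only once from the right keeps the path intact. The example line below is made up for illustration.

```python
line = "./train/my folder/image_1.jpg 7\n"

# plain split() cuts the path in two because it contains a space
pieces = line.split()
print(pieces)

# rsplit with maxsplit=1 splits only at the last run of whitespace
path, label = line.strip().rsplit(maxsplit=1)
label = int(label)  # the label arrives as a string, so convert it
print(path, label)
```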

## Numpy operations

### numpy.where()
Useful for finding all indices where a certain condition is met.
For example, to find all indices where `our_array` has the value 1:
```
indices = numpy.where(our_array == 1)[0]
```

### numpy.delete()
To delete elements from an array by index:
```
new_array = numpy.delete(old_array, indices_to_be_deleted)
```

### numpy.random.choice()
Chooses multiple elements from an array at random. The boolean `replace` argument controls whether the same element may be chosen more than once.
```
chosen_elements = numpy.random.choice(array, how_many_to_choose, replace=repeat_or_not)
```
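
The three operations are often chained. The sketch below uses a made-up array to show one detail that is easy to miss: `numpy.delete` removes elements by their position in the array, not by their value.

```python
import numpy as np

values = np.array([5, 1, 7, 1, 1, 9])

# positions where the value equals 1
ones = np.where(values == 1)[0]                    # positions 1, 3 and 4

# numpy.delete works on positions: to drop the number 3 from `ones`
# we first look up where 3 sits inside `ones`
others = np.delete(ones, np.where(ones == 3)[0])   # positions 1 and 4

# draw two distinct elements; replace=False forbids repetitions
picked = np.random.choice(others, 2, replace=False)
print(ones, others, picked)
```

Calling `np.delete(ones, 3)` directly would try to remove the element at position 3 of `ones`, which does not exist here.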

# Accessing elements inside a defined network (PyTorch):
If the network is defined as a class and has fields defined using the keyword **self**, we can refer to them later.
If we have created an object of class ```Net``` named ```NN```, and ```Net``` has a field ```self.anything```, we can refer to it as ```NN.anything```.


# Saving and loading models
Keeping progress of our training is very important. Being able to save and load our previous models will become very helpful.

Working with the entire model:
```
PATH = "./model_path.pt"
# saving entire model:
torch.save(model, PATH)
# loading entire model:
model = torch.load(PATH)
```
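
The saving and loading tutorial linked under Useful links also describes a second option: saving only the learned parameters (the `state_dict`). The sketch below illustrates it on a tiny stand-in network; `TinyNet` and the file name are hypothetical and used only for this example.

```python
import os
import tempfile

import torch
import torch.nn as nn

# a tiny stand-in network (hypothetical, for illustration only)
class TinyNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(4, 2)

    def forward(self, x):
        return self.fc(x)

model = TinyNet()
path = os.path.join(tempfile.mkdtemp(), "tiny_weights.pt")

# save only the learned parameters
torch.save(model.state_dict(), path)

# to load: rebuild the architecture first, then copy the parameters in
restored = TinyNet()
restored.load_state_dict(torch.load(path))
restored.eval()  # switch to evaluation mode before testing

# fields defined with self (here: fc) are reachable on both objects
print(torch.equal(model.fc.weight, restored.fc.weight))
```

Depending on your PyTorch version, `torch.load` on a file that holds a whole pickled model may need `weights_only=False`; a `state_dict` file contains only tensors and should not need it.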

# INSTRUCTIONS


Copy this notebook to your Google Colab

Mount your Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

If you haven't already - create a folder in your Google Drive named "data".

You can do it either manually or from the command line:
```
%cd /content/gdrive/My\ Drive/
%mkdir data
```

In [None]:
# general path (no backslash is needed before the space inside a Python string):
data_path = "/content/gdrive/My Drive/data/miniImageNet"

Follow the directions from [here](https://mtl.yyliu.net/download/) to download the miniImageNet dataset. You should download 3 .tar files: train.tar, val.tar and test.tar.

Put them in the ```/content/gdrive/My\ Drive/data/miniImageNet/``` folder on your Google Drive. If you don't have such a folder, create it first.

Move to that folder:

In [None]:
# go to the folder:
%cd /content/gdrive/My\ Drive/data/miniImageNet
# print out the content of the folder:
%ls

Now untar the files. This might take a few minutes.

In [None]:
# Untar each file:
!tar -xvf train.tar
!tar -xvf val.tar
!tar -xvf test.tar

Confirm correct creation of 3 folders (train, test and val):

In [None]:
%ls

ConvNet.py    test/     test.txt  train.tar  val/     val.txt
__pycache__/  test.tar  train/    train.txt  val.tar


Create 3 txt files describing our data.
Name them **train.txt**, **val.txt** and **test.txt**.

Each line of a file describes one image: the path to the image, followed by a whitespace and an integer label identifying the category that the image belongs to.

Example:
```
path_to_image_1.jpg 0
path_to_image_2.jpg 0
...
...
path_to_image_124.jpg 7
```

Modify the code below to do that. Ensure that all folders have exactly 600 images.

In [None]:
from glob import glob

def folder_to_txt(split = "train"):

  print("Inside:", split, "split")
  # empty string to keep track of the data:
  txt = ""

  # get the list of folders inside the current "split folder"
  # (sorted, so that the same folder always gets the same label):
  folder_list = sorted(glob("./"+split+"/*"))
  n_categories = len(folder_list)

  # enumerate through the folders:
  for i, folder_name in enumerate(folder_list):
    # get list of images in the current folder:
    img_list = None   # TO BE IMPLEMENTED (using glob function)
    n_images = len(img_list)

    print(folder_name,"Category:", i, "Found:", n_images, "images")

    # save information on the images into the text file
    for img in img_list:
      txt += None     # TO BE IMPLEMENTED (add information about the current image -> path and label according to the example above. Info on every image should be in a different line.)
  # save all information to a text file
  print("Saving...")
  with open("./"+split+".txt", "w") as txt_file:
    txt_file.write(txt)


folder_to_txt("train")
folder_to_txt("val")
folder_to_txt("test")
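
Once the text files exist, the requirement of 600 images per folder can be checked by counting labels. This is a minimal sketch that assumes the `path label` line format shown above; `images_per_label` is a hypothetical helper, and the toy input stands in for a real file such as `open("./train.txt")`.

```python
import io
from collections import Counter

def images_per_label(lines):
    """Count how many lines (images) carry each integer label."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip empty lines
        _, label = line.rsplit(maxsplit=1)
        counts[int(label)] += 1
    return counts

# toy stand-in for a text file; replace with open("./train.txt")
toy_txt = io.StringIO("a/1.jpg 0\na/2.jpg 0\nb/1.jpg 1\n")
counts = images_per_label(toy_txt)

expected = 2  # would be 600 for miniImageNet
bad = {label: n for label, n in counts.items() if n != expected}
print(dict(counts), "labels with unexpected size:", bad)
```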

Imports:

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torchvision.transforms as transforms
from torchvision import datasets

import numpy as np
import matplotlib.pyplot as plt
import random
from PIL import Image

Create a custom dataset where you will use your text files to store information about your data.

In [None]:
# data transforms:
data_transforms = {
    "train": transforms.Compose([
        transforms.RandomResizedCrop(84),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(degrees=10),
        transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    "val": transforms.Compose([
        transforms.Resize(84),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    "test": transforms.Compose([
        transforms.Resize(84),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
}

# Create your custom dataset below:
class Siamese_Dataset(Dataset):
    
    def __init__(self, txt, transform=None):
        # prepare two lists where we will keep track of image paths and their labels
        self.img_path = []
        self.labels = []
        self.transform = transform

        # read the txt file into our empty lists:
        with open(txt) as f:
            for line in f:
                # TO BE IMPLEMENTED
                # Append correct information to the lists.
                # self.img_path should contain only paths to images (as strings)
                # self.labels should contain labels (as integers)
                self.img_path.append(None)
                self.labels.append(None)

        # let's convert the list to a numpy array        
        self.labels = np.asarray(self.labels)        
        
    def __len__(self):
        return len(self.labels)
        
    def __getitem__(self, index):
        # get the path and label of an image at index 'index'
        path = self.img_path[index]
        label = self.labels[index]

        # with 50% chance find an image from a different class
        # in other 50% of cases - find a different image from the same class:

        p = random.random() # random float between 0-1
        similarity_label = 0
        if p > 0.5:
          # find an image from a different class
          possible_indices = None # TO BE IMPLEMENTED (find all indices where images have different label than our current image)
          similarity_label = 0
        else:
          # find a different image from the same class:
          possible_indices =  None # TO BE IMPLEMENTED (find all indices where images have the same label as our current image)
          # remove our current image from possible choices:
          possible_indices = None # TO BE IMPLEMENTED (remove the index of our current image from the list)
          similarity_label = 1
        
        # choose a second image at random:
        second_index = np.random.choice(possible_indices, 1)[0]
       
        # get its path:
        second_path = self.img_path[second_index]


        # open image 1:
        with open(path, 'rb') as f:
            image1 = Image.open(f).convert('RGB')
        # open image 2:
        with open(second_path, 'rb') as f:
            image2 = None # TO BE IMPLEMENTED (read the second image similarly to the first one)
        if self.transform is not None:
            image1 = self.transform(image1)
            image2 = None # TO BE IMPLEMENTED (transform the second image similarly to the first one)
        return [image1, image2], similarity_label


In [None]:
# number of subprocesses to use for data loading
num_workers = 0 # 0 means the data is loaded in the main process
# how many samples per batch to load
batch_size = 128

# create training dataset
train_data = Siamese_Dataset("./train.txt", data_transforms["train"])

# create the training data loader
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, num_workers=num_workers, shuffle=True)

Define the network:

We will use an existing network. Copy the ConvNet.py file from [here](https://drive.google.com/file/d/1Hf6j95u88qwJM3TnsWeiKGLTxaFHh4h4/view?usp=sharing) and put it in your "miniImageNet" folder on your Google Drive.

In [None]:
# import the module (to use the network):
from ConvNet import Convnet

# create our own network:
class SiameseNet(nn.Module):

    def __init__(self):
        super().__init__()
        self.net = Convnet()

    def forward(self, x1, x2):
        out1 = None # TO BE IMPLEMENTED (perform a forward pass using our convnet with x1 as input)
        out2 = None # TO BE IMPLEMENTED (perform a forward pass using our convnet with x2 as input)
        return out1, out2

Now let's specify the loss function that we will use.
For this purpose, let's adapt the Contrastive Loss implemented [here](https://github.com/adambielski/siamese-triplet/blob/master/losses.py)

In [None]:
# loss function:
class ContrastiveLoss(nn.Module):
    """
    Contrastive loss
    Takes embeddings of two samples and a target label == 1 if samples are from the same class and label == 0 otherwise
    """

    def __init__(self, margin):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin
        self.eps = 1e-9

    def forward(self, output1, output2, target, size_average=True):
        distances = (output2 - output1).pow(2).sum(1)  # squared distances
        losses = 0.5 * (target.float() * distances + (1 + -1 * target).float() * F.relu(self.margin - (distances + self.eps).sqrt()).pow(2))
        return losses.mean() if size_average else losses.sum()
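
To see what the loss does, here is a worked example in plain Python that applies the same formula to a single pair of embeddings. The helper `contrastive_loss_pair` and the 2-D embeddings are made up for illustration; the margin of 1.0 matches the value used later in training.

```python
import math

def contrastive_loss_pair(emb1, emb2, target, margin=1.0, eps=1e-9):
    # squared Euclidean distance between the two embeddings
    sq_dist = sum((a - b) ** 2 for a, b in zip(emb1, emb2))
    # penalty for different-class pairs that are closer than the margin
    hinge = max(0.0, margin - math.sqrt(sq_dist + eps))
    return 0.5 * (target * sq_dist + (1 - target) * hinge ** 2)

a, b = [0.0, 0.0], [0.3, 0.4]   # Euclidean distance 0.5

# same class: the loss grows with the distance, 0.5 * 0.25
same_class = contrastive_loss_pair(a, b, target=1)
# different class, closer than the margin: 0.5 * (1.0 - 0.5)^2
diff_class = contrastive_loss_pair(a, b, target=0)
# different class, farther than the margin: no penalty
far_apart = contrastive_loss_pair(a, [3.0, 4.0], target=0)

print(same_class, diff_class, far_apart)
```

Pairs from the same class are pulled together, while pairs from different classes are pushed apart only until they are at least `margin` away from each other.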

Let's create a function that will take our model as a parameter and will choose n images from each category and generate their embeddings (extract feature vectors that describe the images).

In [None]:
# helper function
def read_images(indices, paths):
  images = []
  for idx in indices:
    with open(paths[idx], 'rb') as f:
      image = Image.open(f).convert('RGB')
      image = data_transforms["test"](image)
    images.append(image)
  return torch.stack(images)

Let's read the information about the validation and test splits and store it in two lists:

In [None]:
# let's read the information about test data first and store it:
def get_data_info(txt_file):
  img_path = []
  labels = []
  with open(txt_file) as f:
    for line in f:
      # TO BE IMPLEMENTED
      # Append correct information to the lists.
      # img_path should contain only paths to images (as strings)
      # labels should contain labels (as integers)
      img_path.append(None)
      labels.append(None)
  labels = np.asarray(labels) 
  return img_path, labels

val_img_path, val_labels = get_data_info("./val.txt")
test_img_path, test_labels = get_data_info("./test.txt") 

Extract embeddings:

In [None]:
def test(model, phase = "val", n_samples = 50, n_labels = 5, n_predict = 5):
  # max number for n_labels: 20 for test and 16 for val
  # max number for n_samples+n_predict: 600

  # n_samples = number of testing images per category
  # n_predict = k-shot (number of training images per category)
  # n_labels = n-way (how many categories)

  # choose different images and labels depending if we are in the validation or test phase:
  if phase == "val":
    labels = val_labels
    img_path = val_img_path
  else:
    labels = test_labels
    img_path = test_img_path

  # find all unique labels in the current split (phase)
  unique_labels = np.unique(labels)
  # choose n_labels out of all labels (without repetitions)
  unique_labels = None # TO BE IMPLEMENTED 

  outputs = []
  centroids = []
  # separate list for the query labels, so that the 'labels' array of the
  # whole split stays available for np.where below:
  query_labels = []
  for i, label in enumerate(unique_labels):
    # find images with a correct label:
    indices = np.where(labels == label)[0]
    # choose n_samples+n_predict indices without repetition:
    indices = None # TO BE IMPLEMENTED 

    images = read_images(indices, img_path)          # read the images and convert them to tensors
    output = None # TO BE IMPLEMENTED (forward pass on images -> since we have only one image (not a pair) use only one part of the Siamese network)
    # convert to Numpy (detach from the computation graph first):
    output = output.detach().numpy()

    # calculate the center of the support samples:
    center = np.mean(output[n_samples:], axis=0)
    centroids.append(center)

    # the rest of samples is our query:
    outputs.append(output[:n_samples])
    query_labels.append(i*np.ones(n_samples).astype(int))

  # we have list of separate outputs (and labels) for each category. Convert it to a single array (one for outputs and one for labels):
  embeddings = np.asarray(outputs).reshape(n_samples * n_labels, -1)
  query_labels = np.asarray(query_labels).reshape(n_samples * n_labels)
  centroids = np.asarray(centroids)
  return embeddings, query_labels, centroids


Let's create a testing routine to predict samples in the val/test set:

In [None]:
# Nearest Category Mean
def NCM(query, train):
  train = torch.tensor(train)
  query = torch.tensor(query)
  n = query.shape[0]
  m = train.shape[0]
  query = query.unsqueeze(1).expand(n, m, -1)
  train = train.unsqueeze(0).expand(n, m, -1)
  logits = -((query - train) ** 2).sum(dim=2)
  return logits

# Calculating the confusion matrix
def create_confusion_matrix(logits, labels):
  n_classes = len(np.unique(labels))

  confusion_matrix = None # TO BE IMPLEMENTED (create an array of zeros of size n_classes x n_classes)

  # from the logits (scores) get the prediction:
  pred = torch.argmax(logits, dim=0).numpy()

  # iterate through our images:
  for i in range(len(pred)):
    # for each image add 1 to the confusion matrix cell depending on the image's ground truth (label) and our prediction:
    confusion_matrix[None, None] += 1 # TO BE IMPLEMENTED 

  # calculate the accuracy from the confusion matrix:
  accuracy = np.sum(np.diag(confusion_matrix))/np.sum(confusion_matrix)
  return accuracy
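
The shapes are worth spelling out. In the training loop the function is called as `NCM(centroids, embeddings)`, so the score matrix has one row per centroid and one column per query image, which is why the prediction takes the argmax over dimension 0. The sketch below reproduces that convention in NumPy on made-up 2-D points.

```python
import numpy as np

centroids = np.array([[0.0, 0.0],    # category 0
                      [4.0, 4.0]])   # category 1
queries = np.array([[0.5, -0.2],
                    [3.6, 4.1],
                    [1.0, 1.2]])

# broadcasting: (n_centroids, 1, dim) - (1, n_queries, dim)
diff = centroids[:, None, :] - queries[None, :, :]
# negative squared distance: a higher score means a closer centroid
scores = -(diff ** 2).sum(axis=2)    # shape (n_centroids, n_queries)

# one prediction per query: the row (centroid) with the highest score
pred = scores.argmax(axis=0)
print(scores.shape, pred)
```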

Training the network.

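We will iterate through our dataset. For every iteration we need to clear the gradients, run a forward pass on the pair of images, calculate the loss, run a backward pass, and perform a single optimization step.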


In [None]:
# initialize the model:
model = SiameseNet()
print(model)

# specify loss:
criterion = ContrastiveLoss(margin=1.0)

# specify optimizer
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# specify scheduler
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# number of epochs to train the model
n_epochs = 5

n_iterations = int(len(train_data)/batch_size)

max_iters = 30

for epoch in range(n_epochs):
    # monitor training loss
    train_loss = 0.0
    model.train() # prep model for training
    ###################
    # train the model #
    ###################

    for ii, (data, target) in enumerate(train_loader):
        if ii % 5 == 0:
          print("Epoch:", epoch, "Iteration:", ii, "out of:", n_iterations)
        # clear the gradients of all optimized variables
        optimizer.zero_grad()
        # forward pass: compute predicted outputs by passing inputs to the model
        outputs1, outputs2 = model(data[0], data[1])
        # calculate the loss
        loss = criterion(outputs1, outputs2, target)
        
        # backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()
        # perform a single optimization step (parameter update)
        optimizer.step()
        
        # update running training loss
        train_loss += loss.item()*data[0].size(0)

        if ii >= max_iters:
          break
    
    # if you have a learning rate scheduler, perform its step here
    scheduler.step()
    # print training statistics 
    # calculate average loss over an epoch
    train_loss = train_loss/((ii+1)*batch_size)

    print('Epoch: {} \tTraining Loss: {:.6f}'.format(epoch+1, train_loss))

    # Run the validation pass multiple times and report the average:
    model.eval()  # prep model for testing
    n_tests = 10
    accuracy = 0.0
    for i in range(n_tests):
      embeddings, labels, centroids = test(model, phase = "val", n_samples = 100, n_labels = 5, n_predict = 5)
      scores = NCM(centroids, embeddings)
      accuracy += create_confusion_matrix(scores, labels)
    print("Overall accuracy:", 100*(accuracy/n_tests), "%")

After training, test the model on the test set. Write the code for it below. Use the example of validation set as your reference.

In [None]:
# TO BE IMPLEMENTED

## To do:
Run each test scenario 25 times and report the average accuracy. Use the example of validation set as your reference.

1.   Test the trained network in a 5-way 5-shot scenario with 100 unknown examples per category
2.   Test the trained network in a 20-way 5-shot scenario with 100 unknown examples per category
3.   Test the trained network in a 5-way 2-shot scenario with 100 unknown examples per category
4.   Copy a pre-trained model from [here](https://drive.google.com/file/d/1w5Pl_mmSK90UWJLon7NSDPac_FzvLD5U/view?usp=sharing) to your folder, load it and perform the same 3 test comparisons as above.

In your research report - provide information on final training loss and validation accuracy. Run the 3 different testing scenarios described above (both for your model and the pre-trained one), provide the accuracies observed, and explain the differences in the observed accuracies.
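
One possible way to organise the repeated runs is sketched below. The scenario dictionaries reuse the parameter names of `test` (`n_labels` is the n-way, `n_predict` the k-shot, `n_samples` the number of unknown examples per category). `average_accuracy` and `fake_run` are hypothetical: `fake_run` returns a random number and only stands in for a function that would call `test`, `NCM` and `create_confusion_matrix` on the test split.

```python
import random
import statistics

# the three scenarios from the list above
scenarios = {
    "5-way 5-shot":  dict(n_labels=5,  n_predict=5, n_samples=100),
    "20-way 5-shot": dict(n_labels=20, n_predict=5, n_samples=100),
    "5-way 2-shot":  dict(n_labels=5,  n_predict=2, n_samples=100),
}

def average_accuracy(run_once, n_runs=25, **scenario):
    """Call run_once n_runs times and return the mean and standard deviation."""
    accuracies = [run_once(**scenario) for _ in range(n_runs)]
    return statistics.mean(accuracies), statistics.stdev(accuracies)

# placeholder: returns a random number, NOT a real accuracy
def fake_run(n_labels, n_predict, n_samples):
    return random.random()

results = {}
for name, scenario in scenarios.items():
    results[name] = average_accuracy(fake_run, n_runs=25, **scenario)
    mean, std = results[name]
    print(f"{name}: mean={mean:.3f} std={std:.3f}")
```

Reporting the standard deviation next to the mean helps when explaining whether a difference between two scenarios is larger than the run-to-run variation.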

