<a href="https://colab.research.google.com/github/vlamen/tue-deeplearning/blob/main/assignments/assignment_1/Assignment_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>    

# Group Number: 47
# Student 1: Ahmet Ayrancioglu
# Student 2: Ricardo Andrade
# Student 3: Ruben Wolters

## Setup

In [1]:
import requests
import io
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
%pylab inline

Populating the interactive namespace from numpy and matplotlib


### Training data set


For Assignment 1 you need to use a specific data set prepared using images from the [Omniglot dataset](https://github.com/brendenlake/omniglot). The provided training data set contains images of handwritten characters of size (28,28). 



For training data, the dataset contains 10000 sets of 6 images each. Each set consists of 5 support images and 1 query image. In each set, the first five columns are support images and the last one is a query image.

For training labels, the dataset contains 10000 sets of 5 binary flags for support images. 1 indicates the same character is given in the query image and 0 means not. For example, a label [1,0,0,1,1] means the support images with index 0,3,4 are the same character of query image.

 
 
The following cell provides code that loads the data from hardcoded URLs.You can use the code in this cell to load the dataset or download the data set from the given URLs to your local drive (or your Google drive) and modify the code to load the data from another location. 




In [2]:
def load_numpy_arr_from_url(url):
    """
    Loads a numpy array from surfdrive. 
    
    Input:
    url: Download link of dataset 
    
    Outputs:
    dataset: numpy array with input features or labels
    """
    
    response = requests.get(url)
    response.raise_for_status()

    return np.load(io.BytesIO(response.content)) 
    
    
    
#Downloading may take a while..
train_data = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/4OXkVie05NPjRKK/download')
train_label = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/oMLFw60zpFX82ua/download')

print(f"train_data shape: {train_data.shape}")
print(f"train_label shape: {train_label.shape}\n")

train_data shape: (10000, 6, 28, 28)
train_label shape: (10000, 5)



Now, we plot the first 5 cases in the training dataset. The last column corresponds with the query images of each task. All other images are support images. The image enclosed in a red box denotes the target image that your model should be able to recognize as the same class as the query image. 

In [3]:
def plot_case(caseID,train_data,labels):
    """
    Plots a single sample of the query dataset
    
    Inputs
    caseID: Integer between 0 and 99, each corresponding to a single sample in the query dataset 
    """
    

    support_set,queries = np.split(train_data, [5], axis=1)
    
    f, axes = plt.subplots(1, 6, figsize=(20,5))

    # plot anchor image
    axes[5].imshow(queries[caseID, 0])
    axes[5].set_title(f"Query image case {caseID}", fontsize=15)

    # show all test images images 
    [ax.imshow(support_set[caseID, i]) for i, ax in enumerate(axes[0:-1])]


    # Add the patch to the Axes
    for ind in np.where(labels[caseID]==True)[0]:
        axes[ind].add_patch(Rectangle((0,0),27,27,linewidth=2, edgecolor='r',facecolor='none'))


In [4]:
# [plot_case(caseID,train_data,train_label) for caseID in range(5)] ;

### Query data set

For this task you need to use the following query data set. The dataset contains 1000 sets of 6 images each. The images are also of hand written characters, however these characters are not present in the training data set. The characters in the query data set all come from the Greek alphabet that is not part of the set of alphabets in the training data. 


In [5]:
    
#Downloading may take a while..
test_data = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/06c34QVUr69CxWY/download')
test_label = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/LQIH1CW7lfDXevk/download')

print(f"test_data shape: {test_data.shape}")
print(f"test_label shape: {test_label.shape}\n")

test_data shape: (1000, 6, 28, 28)
test_label shape: (1000, 5)



In [6]:
# [plot_case(caseID,test_data,test_label) for caseID in range(5)] ;

## Build pytorch dataset and dataload

In [7]:
import torch
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, TensorDataset
import numpy as np
from PIL import Image
import itertools

# This dataset is used for testing, where we need all support images, query image, 
# and labels for all sample, so we can calculate per sample metrics, etc.
class RetrivalDataset(Dataset):
  def __init__(self, data, targets):
    # To make the images 28x28x1 instead of 28x28
    data = np.expand_dims(data, 2)

    self.X = torch.FloatTensor(data)
    self.y = torch.LongTensor(targets)

        
  def __getitem__(self, index):
    x = self.X[index]
    y = self.y[index]
    return x, y
  
  
  def __len__(self):
    return len(self.X)


# This dataset is used for training, 
# it consists of triplets in order to use the triple loss easily
class TripetDataset(Dataset):

  # Generate all triplets for all samples
  # We hold these triplets in a regular list as some samples can have
  # 4 triplets and some can have 6 and we cant have an uneven array/tensor
  def __init__(self, data, targets):
    all_triplets = []
    for i in range(len(data)):
      # the x and y
      x = data[i]
      y = targets[i]

      # get the anchor, support, and pos, neg samples
      a = x[-1]
      support = x[:-1]
      pos_samples = support[y == 1]
      neg_samples = support[y == 0]

      # generate all pairs of positive and negative samples
      pairs = itertools.product(pos_samples, neg_samples)
      pairs = list(pairs)
      pairs = np.array(pairs)

      # add the anchor to all pairs to create triplets
      sample_triplets = np.insert(pairs, 1, a, axis=1)

      # expands the dims so the iamge shape is 1x28x28 not 28x28
      sample_triplets = np.expand_dims(sample_triplets, 2)

      # add the triplets of this sample to all triplets
      all_triplets.append(sample_triplets)

    all_triplets = np.array(all_triplets, dtype=object,)
    self.triplets = all_triplets

  # We return one random triplet from all possible triplets for the sample at index
  def __getitem__(self, index):
    # get the triplets of this sample
    sample_triplets = self.triplets[index]

    # select a random index of a triplet
    rand_index = np.random.randint(0, len(sample_triplets))

    # select a ranom triplet using the index
    triplet = sample_triplets[rand_index]
    return triplet
  
  
  def __len__(self):
    return len(self.triplets)

In [8]:
from torch.utils.data import random_split

# Datasets
train_dataset = TripetDataset(train_data, train_label)
test_dataset = RetrivalDataset(test_data, test_label)

# Split the training dataset into training and validation using 0.8 for training
valid_split_ratio = 0.1
train_size = int(len(train_dataset) * (1 - valid_split_ratio))
validation_size = len(train_dataset) - train_size
train_dataset, validation_dataset = random_split(train_dataset, [train_size,validation_size])

# Data Loaders
batch_size=32
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_loader = torch.utils.data.DataLoader(validation_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

## LOSS DEFINITION

In [9]:
class TripletLoss(nn.Module):
  def __init__(self, margin):
    super(TripletLoss, self).__init__()
    self.margin = margin
    self.relu = torch.nn.ReLU()

  def euc_dist(self, x1, x2):
    return (x1 - x2).pow(2).sum(1).pow(1/2)

  def forward(self, anchor, pos, neg):
    pos_dist = self.euc_dist(anchor, pos)
    neg_dist = self.euc_dist(anchor, neg)

    losses = pos_dist - neg_dist + self.margin
    losses = self.relu(losses)
    loss = losses.mean()

    return loss

## MODEL DEFINITION

In [10]:
## Model Definition ##
import torch
import torch.nn as nn
import torch.nn.functional as F

class BestNet(nn.Module):
  def __init__(self):
    super(BestNet, self).__init__()

    self.conv_layers = nn.Sequential(
      nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding='same'),
      nn.ReLU(inplace=True),
      nn.MaxPool2d(2, 2),
      nn.Dropout(0.3),

      nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding='same'),
      nn.ReLU(inplace=True),
      nn.MaxPool2d(2, 2),
      nn.Dropout(0.3),

      nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding='same'),
      nn.ReLU(inplace=True),
      nn.MaxPool2d(2, 2),
      nn.Dropout(0.3),
    )

    self.dense_layers = nn.Sequential(
      nn.Linear(1152, 512),
      nn.ReLU(inplace=True),
      nn.Linear(512, 128),
      nn.Sigmoid(),
    )

  def forward(self, x):
    x = self.conv_layers(x)
    x = x.view(x.size()[0], -1)
    x = self.dense_layers(x)
    return x
  

model = BestNet()
model

BestNet(
  (conv_layers): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Dropout(p=0.3, inplace=False)
    (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (5): ReLU(inplace=True)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Dropout(p=0.3, inplace=False)
    (8): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (9): ReLU(inplace=True)
    (10): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (11): Dropout(p=0.3, inplace=False)
  )
  (dense_layers): Sequential(
    (0): Linear(in_features=1152, out_features=512, bias=True)
    (1): ReLU(inplace=True)
    (2): Linear(in_features=512, out_features=128, bias=True)
    (3): Sigmoid()
  )
)

## Trainer & Definitions

In [11]:
## Training ##
import numpy as np
from tqdm import tqdm
# from tqdm.notebook import tqdm
from sklearn.metrics import accuracy_score, classification_report


class Trainer():
  def __init__(self,
               model: torch.nn.Module,
               device: torch.device,
               criterion: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               training_DataLoader: torch.utils.data.Dataset,
               validation_DataLoader: torch.utils.data.Dataset,
               test_DataLoader: torch.utils.data.Dataset,
               epochs: int,
               patience: int, 
              ):
      
    self.model = model
    self.criterion = criterion
    self.optimizer = optimizer
    self.training_DataLoader = training_DataLoader
    self.validation_DataLoader = validation_DataLoader
    self.test_DataLoader = test_DataLoader
    self.device = device
    self.epochs = epochs
    self.patience = patience

  def __str__(self):
    info = [
      f'device: {self.device}',
      f'epochs: {self.epochs}',
      f'criterion: {self.criterion}',
      f'patience: {self.patience}',
      f'optimizer: {self.optimizer}',
    ]

    trainer_str = "\n".join(info)
    return trainer_str


  def run_train_step(self, epoch, pbar):
    train_losses=[]
    self.model.train()
    for batch in self.training_DataLoader:
      # zerograd the parameters
      self.optimizer.zero_grad(set_to_none=True)

      # Get the data and move it to device
      batch = batch.to(self.device)

      # flatten the batch so we can pass it through the model
      batch = batch.flatten(0,1)

      # forward pass
      e_batch = self.model(batch)

      # unflatten the result so we can use it in the loss
      e_batch = e_batch.unflatten(0, (-1, 3))
      e_a, e_p, e_n = e_batch[:,0], e_batch[:,1], e_batch[:,2]
      
      # calculate loss
      loss = self.criterion(e_a, e_p, e_n)  # calculate loss
      loss_value = loss.item()
      train_losses.append(loss_value)

      # backprop  
      loss.backward()
      self.optimizer.step()
      
      # Update the loss on the progress bar
      pbar.set_postfix({f"Loss": f"{np.mean(train_losses):.4f}"})
      pbar.update()

    return train_losses


  def run_eval_step(self):
    valid_losses = []
    self.model.eval()
    for batch in self.validation_DataLoader:
      # Get the data and move it to device
      batch = batch.to(self.device)

      # flatten the batch so we can pass it through the model
      batch = batch.flatten(0,1)

      with torch.no_grad():
        # forward pass
        e_batch = self.model(batch)

        # unflatten the result so we can use it in the loss
        e_batch = e_batch.unflatten(0, (-1, 3))
        e_a, e_p, e_n = e_batch[:,0], e_batch[:,1], e_batch[:,2]
        
        # calculate loss
        loss = self.criterion(e_a, e_p, e_n)  # calculate loss
        loss_value = loss.item()
        valid_losses.append(loss_value)

    return valid_losses


  def run_trainer(self):
    stop = False
    best_val_loss = np.inf
    remaining_patience = self.patience
    # For number of epochs
    for epoch in range(self.epochs):
      # progress bar
      with tqdm(self.training_DataLoader, position=0) as pbar:
        # the label of the progress bar
        pbar.set_description_str(f'Epoch {epoch + 1}/{self.epochs}')

        # train phase
        train_losses = self.run_train_step(epoch, pbar)
        
        # validation phase
        val_losses = self.run_eval_step()

        # loss calculations
        loss = np.mean(train_losses)
        val_loss = np.mean(val_losses)

        # Print training and validation loss
        pbar.set_postfix({f"Loss": f"{loss:.4f}", f"Val-Loss": f"{val_loss:.4f}"})

        # early stopping
        if val_loss > best_val_loss:
          remaining_patience -= 1
          if remaining_patience <= 0:
            break
        else:
          remaining_patience = self.patience
          best_val_loss = val_loss
          torch.save(self.model.state_dict(), model_path)

In [12]:
def run_test(model, data_loader, device, threshold):
  dataLoader = data_loader
  predictions = []
  labels = []
  model.eval()

  # define pairwiseDistance function
  pdist = torch.nn.PairwiseDistance(p=2.0, eps=1e-06, keepdim=False)

  # don't update the gradients
  with torch.no_grad():
    # for every batch
    for batch in dataLoader:
      x, y = batch
      x.to(device)

      # flatten x in order to pass it through the model
      x = x.flatten(0,1)

      # pass it through the model to get the embeddings
      e_x = model(x)

      # unflatten the 0th dimension so we get batches images dimensions back
      e_x = e_x.unflatten(0, (-1, 6))

      # get the support and the query images
      supports = e_x[:,-1].unsqueeze(1)
      queries = e_x[:,:-1]

      # find the distances
      dist = pdist(supports, queries)

      # using the threshold determine the predictions
      pred = dist[:] < threshold

      # append the prections and the labels
      predictions.append(pred)
      labels.append(y)

  predictions = np.concatenate(predictions).reshape((-1))
  labels = np.concatenate(labels).reshape((-1))

  cr = classification_report(labels, predictions)
  print(cr) 

In [13]:
def determine_threshold(model, data_loader, device):
  dataLoader = data_loader
  predictions = []
  labels = []
  model.eval()

  # define pairwiseDistance function
  pdist = torch.nn.PairwiseDistance(p=2.0, eps=1e-06, keepdim=False)

  # define float zero
  zero = torch.torch.tensor(0, dtype=torch.float32)

  # accumulate thresholds
  thresholds = []

  # don't update the gradients
  with torch.no_grad():
    # for every batch
    for batch in dataLoader:
      x, y = batch
      x.to(device)

      # flatten x in order to pass it through the model
      x = x.flatten(0,1)

      # pass it through the model to get the embeddings
      e_x = model(x)

      # unflatten the 0th dimension so we get batches images dimensions back
      e_x = e_x.unflatten(0, (-1, 6))

      # get the support and the query images
      supports = e_x[:,-1].unsqueeze(1)
      queries = e_x[:,:-1]

      # find the distances
      dist = pdist(supports, queries)

      # find the mean similarity of pos examples
      pos_examples = torch.where(y == 1, dist, zero)
      pos_mean = torch.sum(pos_examples) / torch.count_nonzero(pos_examples)

      # find the mean similarity of neg examples
      neg_examples = torch.where(y == 0, dist, zero)
      neg_mean = torch.sum(neg_examples) / torch.count_nonzero(neg_examples)

      # calculate the threshold
      threshold = (pos_mean + neg_mean) / 2

      # store the threshold for the batch
      thresholds.append(threshold)

    final_threshold = np.mean(thresholds)

  return final_threshold

## Training

In [14]:
# TRAINER DEFINITION

# device
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device)
    
# model save file
model_path = 'saved_model'

# model
model = BestNet().double().to(device)

# epochs
epochs = 60

# margin value
margin=1

# loss
criterion = TripletLoss(margin)

# optimizer
optimizer = torch.optim.AdamW(model.parameters())

# trainer
trainer = Trainer(model=model,
                  device=device,
                  criterion=criterion,
                  optimizer=optimizer,
                  training_DataLoader=train_loader,
                  validation_DataLoader=validation_loader,
                  test_DataLoader=test_loader,
                  epochs=epochs,
                  patience=10,
                  )

print(trainer)

device: cuda
epochs: 60
criterion: TripletLoss(
  (relu): ReLU()
)
patience: 10
optimizer: AdamW (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    eps: 1e-08
    lr: 0.001
    maximize: False
    weight_decay: 0.01
)


In [15]:
# TRAINING

trainer.run_trainer()

Epoch 1/60: 100%|██████████| 282/282 [00:11<00:00, 24.83it/s, Loss=0.5524, Val-Loss=0.3260]
Epoch 2/60: 100%|██████████| 282/282 [00:11<00:00, 25.57it/s, Loss=0.3176, Val-Loss=0.2209]
Epoch 3/60: 100%|██████████| 282/282 [00:11<00:00, 25.58it/s, Loss=0.2644, Val-Loss=0.1825]
Epoch 4/60: 100%|██████████| 282/282 [00:10<00:00, 25.67it/s, Loss=0.2231, Val-Loss=0.1776]
Epoch 5/60: 100%|██████████| 282/282 [00:10<00:00, 25.68it/s, Loss=0.1951, Val-Loss=0.1460]
Epoch 6/60: 100%|██████████| 282/282 [00:10<00:00, 25.67it/s, Loss=0.1694, Val-Loss=0.1182]
Epoch 7/60: 100%|██████████| 282/282 [00:11<00:00, 25.56it/s, Loss=0.1490, Val-Loss=0.1160]
Epoch 8/60: 100%|██████████| 282/282 [00:10<00:00, 25.65it/s, Loss=0.1374, Val-Loss=0.0998]
Epoch 9/60: 100%|██████████| 282/282 [00:10<00:00, 25.67it/s, Loss=0.1292, Val-Loss=0.0959]
Epoch 10/60: 100%|██████████| 282/282 [00:10<00:00, 25.68it/s, Loss=0.1180, Val-Loss=0.0788]
Epoch 11/60: 100%|██████████| 282/282 [00:10<00:00, 25.71it/s, Loss=0.1092, Val

## Testing

In [16]:
model = BestNet()
model.load_state_dict(torch.load(model_path))

<All keys matched successfully>

In [17]:
threshold_dataset = RetrivalDataset(train_data, train_label)
threshold_loader = torch.utils.data.DataLoader(threshold_dataset, batch_size=batch_size, shuffle=True)

threshold = determine_threshold(model, threshold_loader, device)
print(threshold)

4.559809


In [18]:
run_test(model, test_loader, device, threshold)

              precision    recall  f1-score   support

           0       0.96      0.94      0.95      3057
           1       0.90      0.94      0.92      1943

    accuracy                           0.94      5000
   macro avg       0.93      0.94      0.94      5000
weighted avg       0.94      0.94      0.94      5000

