<a href="https://colab.research.google.com/github/dadebulba/DeepLearningProject/blob/main/DeepLearningProject_person_reid_evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Deep Learning Project - Person Re-identification task evaluation

[https://colab.research.google.com/github/dadebulba/DeepLearningProject/blob/main/DeepLearningProject_person_reid_evaluation.ipynb](https://colab.research.google.com/github/dadebulba/DeepLearningProject/blob/main/DeepLearningProject_person_reid_evaluation.ipynb)

Mount Google Drive and extract `dataset.zip` into the `dataset` folder. Change the path below to match the location of your own copy of the dataset.

In [None]:
# Mount Google Drive into the Colab runtime so the dataset archive stored there can be read.
from google.colab import drive
drive.mount('/content/drive')
# Shell command: extract the archive into ./dataset (adjust the path to your own Drive layout).
!unzip "/content/drive/MyDrive/UNITN/5° anno/Deep Learning 2021/dataset.zip" -d dataset

Import the necessary libraries.

In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torchvision import transforms, utils
from torch.utils.data import Dataset, DataLoader
import os
from os import listdir
from os.path import isfile, join
from torch.utils.tensorboard import SummaryWriter
from PIL import Image
import random
# Seed Python's built-in random module with a fixed value.
random.seed(10)
# Report whether CUDA is available and how many CUDA devices are visible.
print(f"Cuda available: {torch.cuda.is_available()}")
print(f"Cuda device count: {torch.cuda.device_count()}")

## Define Siamese Network
This step defines the network architecture so that the saved model weights can be loaded into it and used during evaluation.

In [4]:
class Identity(nn.Module):
  """Pass-through layer: hands back exactly the tensor it receives.

  Useful as a drop-in replacement for a layer of an existing network
  when that layer's computation should be skipped.
  """

  def __init__(self):
      super().__init__()

  def forward(self, x):
      """Return ``x`` unchanged (the very same object, no copy)."""
      return x

class Siamese(nn.Module):
  """Siamese network whose two branches share a single backbone and embedding head.

  Args:
    resnet: trained resnet-50. Its ``fc`` attribute is replaced by an
            ``Identity`` layer, and its output is expected to provide 2048
            features per sample (the input size of the embedding head).
  """

  def __init__(self, resnet):
      super().__init__()
      # Strip the classification head so the backbone emits raw features.
      resnet.fc = Identity()
      self.resnet = resnet
      # Embedding head: 2048 -> 1024 -> 512, squashed into (0, 1) by a sigmoid.
      embedding_layers = [
        nn.Linear(in_features=2048, out_features=1024),
        nn.Linear(in_features=1024, out_features=512),
        nn.Sigmoid(),
      ]
      self.linear = nn.Sequential(*embedding_layers)

  def forward_one(self, x):
      """Run one input batch through a single branch.

      Args:
        x: input image batch
      Returns:
        embedding tensor produced by the backbone followed by the embedding head
      """
      features = self.resnet(x)
      # Collapse everything except the batch dimension before the linear head.
      flattened = features.view(features.size(0), -1)
      return self.linear(flattened)

  def forward(self, x1, x2):
      """Embed both inputs with the same (shared-weight) branch.

      Args:
        x1: input image1
        x2: input image2
      Returns:
        tuple ``(out1, out2)`` with the embeddings of ``x1`` and ``x2``
      """
      first_embedding = self.forward_one(x1)
      second_embedding = self.forward_one(x2)
      return first_embedding, second_embedding

def initialize_resnet(num_classes):
  """Build a ResNet-50 whose final layer is replaced by a custom classification head.

  Args:
    num_classes: number of classes in the dataset.
                 This is equal to the number of output neurons.
  Returns:
    the ResNet-50 (created with ``pretrained=True``) with ``fc`` replaced by
    Linear(in, 1024) -> Linear(1024, 512) -> Linear(512, num_classes) -> Sigmoid
  """
  # Load the pre-trained backbone
  backbone = torchvision.models.resnet50(pretrained=True)
  # The new head must accept whatever the original fc layer accepted.
  head_in_features = backbone.fc.in_features
  head_layers = [
    torch.nn.Linear(in_features=head_in_features, out_features=1024),
    torch.nn.Linear(in_features=1024, out_features=512),
    torch.nn.Linear(in_features=512, out_features=num_classes),
    torch.nn.Sigmoid(),
  ]
  backbone.fc = torch.nn.Sequential(*head_layers)
  return backbone

In [5]:
class PeopleValidationDataset(Dataset):
    """People validation dataset yielding pairs of images together with their file names."""

    def __init__(self, X1, X2, root_dir, transform):
        """
        Args:
            X1: first list of image names
            X2: second list of image names (paired index-by-index with X1)
            root_dir: folder where to find the images to load
            transform (optional): transform applied to each loaded image;
                when None, the image is converted with ``T.ToTensor()``.
        """
        self.transform = transform
        self.X1 = X1
        self.X2 = X2
        self.root_dir = root_dir

    def __len__(self):
        """Number of pairs, taken from the length of X1."""
        return len(self.X1)

    def _load_image(self, img_name):
        """Open ``img_name`` under ``root_dir``, convert it and resize it."""
        # Force 3 channels so grayscale / RGBA files do not break the network input.
        image = Image.open("%s/%s" % (self.root_dir, img_name)).convert("RGB")
        to_tensor = self.transform if self.transform is not None else T.ToTensor()
        image = to_tensor(image)
        # NOTE(review): with the default ToTensor path the tensor is 3-D (C, H, W),
        # which F.interpolate interprets as (batch, channels, length): only the
        # last axis (width) is resized to 128, the height is left untouched.
        # Kept unchanged so the preprocessing stays the same as before;
        # presumably the saved model was trained this way - TODO confirm.
        return F.interpolate(image, size=128)

    def __getitem__(self, idx):
        """Return ``(image1, img_name1, image2, img_name2)`` for pair ``idx``."""
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name1 = self.X1[idx]
        img_name2 = self.X2[idx]

        image1 = self._load_image(img_name1)
        image2 = self._load_image(img_name2)

        return (image1, img_name1, image2, img_name2)

In [6]:
def split_training_data(root_dir, val_rate=0.03):
  """Pair up the files of ``root_dir`` and split the pairs into train / validation.

  File names are sorted and consecutive names form a pair (a trailing unpaired
  file is dropped). A pair is labelled 1 when the integer prefix before the
  first "_" is the same for both names, 0 otherwise. Pairs whose index is
  <= ``len(pairs) * val_rate`` go to the validation set, the rest to training.

  Input arguments
    root_dir: directory containing the dataset images
    val_rate (optional): percentage of images to put into validation set
  Returns:
    train_X1: first list of inputs used during training
    train_X2: second list of inputs used during training
    train_Y: similarity values for training inputs
    val_X1: first list of inputs used during validation
    val_X2: second list of inputs used during validation
    val_Y: similarity values for validation inputs
  """
  names = sorted(listdir(root_dir))
  # An odd file count leaves one file without a partner: drop it.
  if len(names) % 2 == 1:
    names = names[:-1]

  # Even positions form the first member of each pair, odd positions the second.
  firsts = names[0::2]
  seconds = names[1::2]
  labels = []
  for first, second in zip(firsts, seconds):
    same_person = int(first.split("_")[0]) == int(second.split("_")[0])
    labels.append(torch.tensor(1) if same_person else torch.tensor(0))

  cutoff = len(labels) * val_rate
  val_positions = [pos for pos in range(len(labels)) if pos <= cutoff]
  train_positions = [pos for pos in range(len(labels)) if not (pos <= cutoff)]

  def pick(items, positions):
    return [items[pos] for pos in positions]

  return (
    pick(firsts, train_positions),
    pick(seconds, train_positions),
    pick(labels, train_positions),
    pick(firsts, val_positions),
    pick(seconds, val_positions),
    pick(labels, val_positions),
  )

def build_val_map(val_X1, val_X2):
  """Group validation image names by the integer id that prefixes each name.

  Input arguments
    val_X1: first list of inputs used during validation
    val_X2: second list of inputs used during validation
  Returns:
    val_map: a map with image id as key and list of associated images
             (duplicates removed, order unspecified) as value
  """
  def person_id(name):
    # The id is the integer found before the first "_" of the file name.
    return int(name.split("_")[0])

  grouped = {}
  for pos in range(len(val_X1)):
    first = val_X1[pos]
    first_id = person_id(first)
    second = val_X2[pos]
    second_id = person_id(second)
    # Register both ids before appending, so keys appear in first-seen order.
    grouped.setdefault(first_id, [])
    grouped.setdefault(second_id, [])
    grouped[first_id].append(first)
    grouped[second_id].append(second)

  # Drop duplicate names within each identity.
  return {pid: list(set(names)) for pid, names in grouped.items()}

def get_data(img_root):
  """Build the evaluation loader: one query image per identity paired with every validation image.

  For each identity in the validation split, a single query image is chosen
  and paired with each image of the validation split (the gallery), so the
  loader yields ``len(identities) * len(gallery)`` pairs, unshuffled.

  Both the query choice and the gallery order are derived from sorted file
  names, so the evaluation is reproducible across kernel restarts (iterating
  a set of strings is not).

  Input arguments
    img_root: directory containing the dataset images
  Returns:
    val_loader: DataLoader used during evaluation
  """
  # Get splitted data (only the validation name lists are needed here)
  _, _, _, val_X1, val_X2, _ = split_training_data(root_dir=img_root)
  val_map = build_val_map(val_X1, val_X2)
  # Gallery: every distinct validation image, in a deterministic order.
  val_merged = sorted(set(val_X1 + val_X2))
  print(len(val_map), len(val_merged))

  gallery_size = len(val_merged)
  query_names = []
  for images in val_map.values():
    # Query for this identity: its alphabetically first image, repeated once per gallery entry.
    query_names.extend([min(images)] * gallery_size)
  gallery_names = val_merged * len(val_map)
  print(len(query_names), len(gallery_names))
  validation_data = PeopleValidationDataset(X1=query_names,
                                            X2=gallery_names,
                                            root_dir=img_root,
                                            transform=None)

  val_loader = torch.utils.data.DataLoader(validation_data, shuffle=False, num_workers=0) #before num_workers=4

  return val_loader

In [7]:
def test(net, val_loader, threshold , device='cuda:0'):
  predictions = {}
  ground_truth = {}
  net.eval() # Strictly needed if network contains layers which has different behaviours between train and test
  with torch.no_grad():
    dataiter = iter(val_loader)
    i = 0
    while True:
      try:
        x0, name0, x1, name1 = next(dataiter)
        i+=1
        if i % 5000 == 0:
          print("Passed images:", i)
      except:
        break
      x0 = x0.to('cuda:0')
      x1 = x1.to('cuda:0')
      concatenated = torch.cat((x0,x1),0)

      # Forward pass
      output1, output2 = net.forward(x0, x1)
      euclidean_distance = F.pairwise_distance(output1, output2)

      if name0 not in ground_truth:
        ground_truth[name0] = []
      if (name0[0].split("_")[0] == name1[0].split("_")[0]):
        ground_truth[name0].append(name1)

      if name0 not in predictions:
        predictions[name0] = []
      if euclidean_distance.item() < threshold:
        predictions[name0].append(name1)
        

  return predictions, ground_truth

In [8]:
def evaluate_map(predictions, ground_truth):
      '''
      Computes the mAP (https://jonathan-hui.medium.com/map-mean-average-precision-for-object-detection-45c121a31173) of the predictions with respect to the given ground truth
      In person reidentification mAP refers to the mean of the AP over all queries.
      The AP for a query is the area under the precision-recall curve obtained from the list of predictions considering the
      ground truth elements as positives and the other ones as negatives

      A query with no predictions, or with an empty ground truth collection, contributes an AP of 0
      and still counts in the mean. An empty ground truth dictionary yields 0.0.

      :param predictions: dictionary from query filename to list of test image filenames associated with the query ordered
                          from the most to the least confident prediction.
                          Represents the predictions to be evaluated.
      :param ground_truth: dictionary from query filename to set of test image filenames associated with the query
                            Represents the ground truth on which to evaluate predictions.

      :return: the mean of the per-query average precision, as a float
      '''

      # Nothing to evaluate: avoid dividing by zero below
      if len(ground_truth) == 0:
          return 0.0

      m_ap = 0.0
      for query, positives in ground_truth.items():

          # No predictions were performed for the current query, AP = 0
          if query not in predictions:
              continue

          # No positives exist for the current query: recall is undefined, AP = 0
          if len(positives) == 0:
              continue

          current_ap = 0.0  # The area under the curve for the current sample

          # Recall increments of this quantity each time a new correct prediction is encountered in the prediction list
          delta_recall = 1.0 / len(positives)

          # Goes through the list of predictions
          encountered_positives = 0
          for rank, current_prediction in enumerate(predictions[query], start=1):
              # Each time a positive is encountered, compute the current precision and the area under the curve
              # since the last positive
              if current_prediction in positives:
                  encountered_positives += 1
                  current_precision = encountered_positives / rank
                  current_ap += current_precision * delta_recall

          m_ap += current_ap

      # Compute mean over all queries
      return m_ap / len(ground_truth)

In [16]:
def log_values(writer, step, loss, prefix):
  """Record a loss value on ``writer`` under the tag ``<prefix>/loss``.

  Args:
    writer: object exposing ``add_scalar(tag, value, step)``
    step: step at which the value is recorded
    loss: value to record
    prefix: first part of the tag, before "/loss"
  """
  tag = f"{prefix}/loss"
  writer.add_scalar(tag, loss, step)

def main(device='cuda:0', 
         img_root='./dataset',
         model_root="/content/drive/MyDrive/UNITN/5° anno/Deep Learning 2021/models/siamese_15epoch_net_reid_resnet50_5epoch.pth",
         step=0.01,
         num_step=20):
  """Evaluate the saved siamese model over a sweep of distance thresholds.

  Loads the evaluation pairs from ``<img_root>/train``, restores the model
  weights from ``model_root`` and computes the mAP for the thresholds
  ``step, 2*step, ..., num_step*step``, printing each result. The sweep stops
  early as soon as the mAP is lower than the one of the previous threshold.

  Args:
    device: device used for the model and the inputs
    img_root: dataset root folder; images are read from its ``train`` subfolder
    model_root: path of the saved siamese ``state_dict``
    step: increment between consecutive thresholds
    num_step: maximum number of thresholds to evaluate
  """
  writer = SummaryWriter(log_dir="runs/exp4")
  try:
    # Get dataloader containing data to evaluate
    val_loader = get_data(img_root="%s/train" % (img_root))

    # Instantiates the model
    net = initialize_resnet(num_classes=56) # taken from previous trained resnet last layer number of neurons
    net = Siamese(net)
    # map_location lets weights saved on a GPU be restored on whichever device was requested.
    net.load_state_dict(torch.load(model_root, map_location=device))
    net.to(device)

    prev_mAP = 0
    # Starting from a threshold of `step`, increase by `step` for num_step times;
    # stops at the end of num_step or as soon as the mAP degrades
    for i in [step*(j+1) for j in range(num_step)]:
      predictions, ground_truth = test(net, val_loader, i, device=device)
      mAP = evaluate_map(predictions, ground_truth)
      print("mAP for threshold %s is %s" % (i, mAP))
      if mAP < prev_mAP:
        print("Performance degradation, breaking...", mAP, prev_mAP)
        break
      prev_mAP = mAP
  finally:
    # Release the event file handle even when the evaluation fails.
    writer.close()

In [None]:
# Run the evaluation with the default arguments declared in main().
main()