<a href="https://colab.research.google.com/github/Pivoz/Machine-Learning-Collection/blob/master/PersonReID_TensorExtractor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

[https://colab.research.google.com/github/dadebulba/DeepLearningProject/blob/main/DeepLearningProject.ipynb](https://colab.research.google.com/github/dadebulba/DeepLearningProject/blob/main/DeepLearningProject.ipynb)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!unzip "/content/drive/MyDrive/UNITN/5° anno/Deep Learning 2021/dataset.zip" -d dataset

# Deep Learning Project - People ReID

In [3]:
# import necessary libraries
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torchvision import transforms, utils
from torch.utils.data import Dataset, DataLoader
import os
from os import listdir
from os.path import isfile, join
from torch.utils.tensorboard import SummaryWriter
from PIL import Image
import random
import gc
random.seed(10)
# print cuda info
print(f"Cuda available: {torch.cuda.is_available()}")
print(f"Cuda device count: {torch.cuda.device_count()}")

Cuda available: True
Cuda device count: 1


# Network
## Siamese Network

In [4]:
'''
Input arguments
  num_classes: number of classes in the dataset.
               This is equal to the number of output neurons.
'''

class Identity(nn.Module):
    def __init__(self):
        super(Identity, self).__init__()
        
    def forward(self, x):
        return x

class Siamese(nn.Module):

    def __init__(self, resnet):
        super(Siamese, self).__init__()
        self.resnet = resnet
        self.resnet.fc = Identity()
        self.linear = torch.nn.Sequential(
          torch.nn.Linear(in_features=2048, out_features=1024),
          torch.nn.Linear(in_features=1024, out_features=512),
          torch.nn.Sigmoid()
        )

    def forward_one(self, x):
        x = self.resnet(x)
        x = x.view(x.size()[0], -1)
        x = self.linear(x)
        return x

    def forward(self, x1, x2):
        out1 = self.forward_one(x1)
        out2 = self.forward_one(x2)
        return out1, out2

def initialize_alexnet(num_classes):
  # load the pre-trained Alexnet
  #alexnet = torchvision.models.alexnet(pretrained=True)
  wide_resnet = torchvision.models.resnet50(pretrained=True)
  num_features = wide_resnet.fc.in_features
  wide_resnet.fc = torch.nn.Sequential(
    torch.nn.Linear(in_features=num_features, out_features=1024),
    torch.nn.Linear(in_features=1024, out_features=512),
    torch.nn.Linear(in_features=512, out_features=num_classes),
    torch.nn.Sigmoid()
  )
  #print(resnext)
  # get the number of neurons in the penultimate layer
  #in_features = alexnet.classifier[6].in_features
  
  # re-initalize the output layer
  #alexnet.classifier[6] = torch.nn.Sequential(
  #  torch.nn.Linear(in_features=in_features, out_features=num_classes),
  #  torch.nn.Sigmoid()
  #)
  return wide_resnet

In [5]:
class PeopleTestDataset(Dataset):
    def __init__(self, X1, X2, query_dir, test_dir):
        self.X1 = X1
        self.X2 = X2
        self.query_dir = query_dir
        self.test_dir = test_dir

    def __len__(self):
        return len(self.X1)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name1 = self.X1[idx]
        img_name2 = self.X2[idx]

        image1 = Image.open("%s/%s" % (self.query_dir, img_name1))
        image2 = Image.open("%s/%s" % (self.test_dir, img_name2))
        image1 = T.ToTensor()(image1)
        image2 = T.ToTensor()(image2)
        image1 = F.interpolate(image1, size=128)  
        image2 = F.interpolate(image2, size=128)  

        sample = (image1, img_name1, image2, img_name2)
        return sample

class TestingDataset(Dataset):
  def __init__(self, images, dir):
      self.images = images
      self.dir = dir

  def __len__(self):
      return len(self.images)

  def __getitem__(self, idx):
      if torch.is_tensor(idx):
          idx = idx.tolist()

      img_name = self.images[idx]

      image = Image.open("%s/%s" % (self.dir, img_name))
      image = T.ToTensor()(image)
      image = F.interpolate(image, size=128)

      sample = (image, img_name)
      return sample

In [6]:
def getDataToEvaluate(test_dir, query_dir):
    test_files = [f for f in listdir(test_dir)]
    query_files = [f for f in listdir(query_dir)]

    # test_files.sort()
    # query_files.sort()

    X1 = []
    X2 = []
    for query in query_files:
        for test in test_files:
            X1.append(query)
            X2.append(test)
    return X1, X2
    
def imshow(img,text=None,should_save=False):
    npimg = img.cpu().numpy()
    plt.axis("off")
    if text:
        plt.text(75, 8, text, style='italic',fontweight='bold',
            bbox={'facecolor':'white', 'alpha':0.8, 'pad':10})
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()   

In [14]:
def log_values(writer, step, loss, prefix):
  writer.add_scalar(f"{prefix}/loss", loss, step)

def extractTensors():

  # Instantiates the model
  net = initialize_alexnet(num_classes=56).to('cuda:0')
  net = Siamese(net)
  net.load_state_dict(torch.load("/content/drive/MyDrive/models/siamese_net_reid_resnet50_5epoch.pth"))
  net.to('cuda:0')
  net.eval()
  query = [f for f in listdir("./dataset/queries")]
  test = [f for f in listdir("./dataset/test")]

  query_images, test_images = getDataToEvaluate(test_dir="dataset/test", query_dir="dataset/queries")
  #print(len(X1), len(X2))
  '''
  test_data = PeopleTestDataset(X1=query_images,
                                X2=test_images,
                                query_dir="dataset/queries",
                                test_dir="dataset/test")
  '''
  testing_dataset = TestingDataset(images=test, dir="./dataset/test")
  test_dataloader = torch.utils.data.DataLoader(testing_dataset, 1, shuffle=False, num_workers=4) #before num_workers=4

  query_dataset = TestingDataset(images=query, dir="./dataset/queries")
  query_dataloader = torch.utils.data.DataLoader(query_dataset, 1, shuffle=False, num_workers=2) #before num_workers=4

  # For queries
  if not os.path.exists('./dataset/query_tensors'):
    os.mkdir("./dataset/query_tensors")
  for idx, (image, image_name) in enumerate(query_dataloader):
    if (idx % 100 == 0):
      print(idx)

    # Compute the forward pass
    tensor = image.to('cuda:0')
    tensor_to_save = net.forward_one(tensor)
    torch.save(tensor_to_save, "./dataset/query_tensors/{}.ph".format(image_name[0].split(".")[0]))

  print("Query tensors extraction completed\n")

  # For test
  if not os.path.exists('./dataset/test_tensors'):
    os.mkdir("./dataset/test_tensors")
  for idx, (image, image_name) in enumerate(test_dataloader):
    if (idx % 100 == 0):
      print(idx)

    # Compute the forward pass
    tensor = image.to('cuda:0')
    tensor_to_save = net.forward_one(tensor)
    torch.save(tensor_to_save, "./dataset/test_tensors/{}.ph".format(image_name[0].split(".")[0]))

  print("Test tensors extraction completed\n")

In [9]:
# import shutil
# shutil.make_archive("query_tensors_archive", 'zip', "./dataset/query_tensors")

In [10]:
# !unzip "/content/query_tensors_archive.zip" -d query_tensors
# !unzip "/content/test_tensors_archive.zip" -d test_tensors

In [65]:
def main(threshold=0.001):
  query_tensors = [f for f in listdir("./dataset/query_tensors")]
  query_images = [f for f in listdir("./dataset/queries")]
  test_tensors = [f for f in listdir("./dataset/test_tensors")]
  test_images = [f for f in listdir("./dataset/test")]

  query_tensors.sort()
  query_images.sort()
  test_tensors.sort()
  test_images.sort()

  test_tensors_cuda = []
  for test in test_tensors:
    test_tensor = torch.load("{}/{}".format("./dataset/test_tensors", test))
    test_tensor.to('cuda:0')
    test_tensors_cuda.append(test_tensor)

  print("Test loaded")

  f = open("reid_results.txt", "w")

  for idxQ, query in enumerate(query_tensors):
    #if idxQ == 20:
    #  break

    count = 0
    #d#istance = 0
    #concatenated = None

    print(query, "processing")
    query_tensor = torch.load("{}/{}".format("./dataset/query_tensors", query))
    query_tensor.to('cuda:0')

    to_print = "{}:".format(query_images[idxQ])

    for idxT, test in enumerate(test_tensors_cuda):
      euclidean_distance = F.pairwise_distance(query_tensor, test)
      if euclidean_distance.item() < threshold:
        to_print = "{}{},".format(to_print, test_images[idxT])
        count+=1

    if count != 1:
      print("###### WARNING ###### {} with {} match\n".format(query_images[idxQ], count))


    f.write(to_print[:-1])
    f.write("\n")

  f.close()
  print("Done")

In [None]:
extractTensors()

In [None]:
main()