In [34]:
# Mount Google Drive at /content/drive inside the Colab runtime.
# force_remount=True is passed so the mount is requested again even when the cell is re-run.
from google.colab import drive
drive.mount('/content/drive', force_remount = True)

Mounted at /content/drive


In [35]:

import numpy as np
import torch
import torchvision
from torch.nn import Linear, ReLU, Dropout, Hardswish, CrossEntropyLoss
from torch.optim import Adam, lr_scheduler
from torch.utils.data import Dataset, DataLoader, random_split, ConcatDataset, RandomSampler
from torchvision.utils import make_grid
from torchvision import datasets
from torchvision.transforms import v2
import skimage.io as io
from tempfile import TemporaryDirectory
import os
import time
from PIL import Image
import matplotlib.pyplot as plt
import random
import json


In [None]:
# Base directory for every relative path built below (models/, dataset/, retrieval_features/).
# NOTE(review): this cell carries no execution count, and Drive is mounted at /content/drive above --
# confirm that the working directory really contains those folders before running the later cells.
cwd = os.getcwd()

In [36]:
# Converters between tensors and PIL images.
# t2img: tensor -> PIL image (used by the plotting cell before plt.imshow).
t2img = v2.ToPILImage()
# img2t: image -> float32 image tensor, with values rescaled by ToDtype(scale=True).
# NOTE(review): img2t is not referenced by any other cell visible in this notebook.
img2t = v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)])

# Creating feature extractor from trained model

In [37]:

# Location of the fine-tuned checkpoint that the next cell loads into the model.
mobilenet_params_path = os.path.join(cwd,'models/mobilenet/mobilenet_model')

# MobileNetV3-Large whose classifier head is replaced so that the last layer has 37 outputs.
mobilenet_model = torchvision.models.mobilenet_v3_large(weights = 'IMAGENET1K_V2')
mobilenet_model.classifier = torch.nn.Sequential(
    Linear(in_features=960, out_features=1280, bias=True),
    Hardswish(),
    Dropout(p=0.2, inplace=True),
    Linear(in_features=1280, out_features=37, bias=True)
)

# Inference is pinned to the CPU on purpose: the checkpoint is loaded with a CPU map_location and
# the retrieval cells feed the model tensors that are never moved to another device.
# (The previous CUDA auto-detection was immediately overwritten by 'cpu', i.e. it was a dead store.)
device = 'cpu'
print(device)
mobilenet_model = mobilenet_model.to(device)




cpu


In [38]:

# Load the fine-tuned parameters into the model and switch it to inference mode.
# weights_only=True restricts unpickling to tensors and plain containers, which is all a
# state dict needs, so a tampered checkpoint file cannot execute arbitrary code while loading.
mobilenet_model.load_state_dict(
    torch.load(
        mobilenet_params_path,
        map_location = torch.device('cpu'),
        weights_only = True,
    )
)
mobilenet_model.eval()

MobileNetV3(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): Hardswish()
    )
    (1): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (1): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
        )
      )
    )
    (2): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1), bi

In [39]:

# Feature extractor: every top-level child of the classifier network except the last one
# (the last child should be the `classifier` head assigned above -- the original note calls it
# "the last layer"). The kept modules are the same objects as in mobilenet_model, so the
# weights are shared rather than copied.
backbone_modules = list(mobilenet_model.children())
backbone_modules.pop()
feature_extractor = torch.nn.Sequential(*backbone_modules)
feature_extractor.eval()

Sequential(
  (0): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): Hardswish()
    )
    (1): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (1): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
        )
      )
    )
    (2): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1), bias=False

# Sampling data for retrieval

In [40]:
from torch.utils.data.sampler import SubsetRandomSampler
pets_path_train = os.path.join(cwd, 'dataset', 'train')

# Preprocessing shared by every image in this notebook:
# image -> float32 tensor (rescaled by ToDtype) -> 224x224 resize with nearest-neighbour interpolation.
to_float_tensor = v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)])
resize_to_model_input = v2.Resize((224,224), interpolation=v2.InterpolationMode.NEAREST)
transforms = v2.Compose([to_float_tensor, resize_to_model_input])

# Oxford-IIIT Pet 'trainval' split, already present on disk (download is disabled).
training_data = datasets.OxfordIIITPet(
    root=pets_path_train,
    split='trainval',
    target_types = ['category'],
    transform = transforms,
    download=False,
)

# OPTIONAL: fixed seed so that the shuffled order (and therefore both subsets) is reproducible
random_seed = 42
random.seed(random_seed)

# shuffled list of all dataset indexes
len_dataset = len(training_data)
indexes = [*range(len_dataset)]
random.shuffle(indexes)

# Two disjoint slices of the shuffled indexes:
#   references -> the query images, targets -> the images that are searched for each query
num_references = 50
num_targets = 200
first_target = num_references
reference_indexes = indexes[:first_target]
target_indexes = indexes[first_target:first_target + num_targets]

# dataloaders restricted to each subset (one image per batch)
reference_sampler = SubsetRandomSampler(reference_indexes)
target_sampler = SubsetRandomSampler(target_indexes)
reference_dataloader = DataLoader(training_data, batch_size = 1, sampler = reference_sampler)
target_dataloader = DataLoader(training_data, batch_size = 1, sampler = target_sampler)






In [41]:

annot_dir = os.path.join(cwd,'dataset/train/oxford-iiit-pet/annotations')
images_dir = os.path.join(cwd,'dataset/train/oxford-iiit-pet/images')

# Each row of trainval.txt, split on whitespace:
# [BREED, CLASS ID [1:37], SPECIES [1 CAT, 2 DOG], BREED ID [CAT [1:25], DOG[1:12]]]
# (the name `targets` is reassigned further down to the list of cached feature files)
with open(os.path.join(annot_dir,'trainval.txt'), 'r') as train_annot:
  targets = [row.strip().split() for row in train_annot]

# classes: zero-based class id -> breed name.
# The breed name is the image name without its last '_'-separated token.
classes = {}
for pet in targets:
  class_id = int(pet[1]) - 1
  pet[1] = class_id
  if class_id in classes:
    continue
  name_tokens = pet[0].split('_')
  classes[class_id] = '_'.join(name_tokens[:-1])


In [42]:
# Build dataset_images: one [image_path, label] pair per row of trainval.txt, in file order,
# so that dataset_images[i] can be looked up with the indexes sampled earlier.
# Labels are shifted to be zero-based (class ids in the file start at 1).

dataset_images = []
with open(os.path.join(annot_dir,'trainval.txt'), 'r') as train_annot:
  for line in train_annot:
    fields = line.split()
    if not fields:
      # tolerate blank lines (e.g. a trailing newline) instead of raising IndexError
      continue
    img_path = os.path.join(images_dir, fields[0] + '.jpg')
    label = int(fields[1]) - 1

    dataset_images.append([img_path,label])

# Performing retrieval on selected subsets

In [43]:
# Extract features for every target image and cache them on disk.
# One JSON file per image is written to retrieval_features/<dataset index> with the layout
# {'path': image_path,
#  'true_label': image true label,
#  'pred_label': image label predicted by model,
#  'features': extracted image features (nested list)}
# `targets` collects the paths of those files; later cells refer to targets by their
# position in this list.

targets = []
features_path = os.path.join(cwd,'retrieval_features')
# open(..., "w") does not create missing directories, so make sure the folder exists
os.makedirs(features_path, exist_ok=True)

# inference only: no_grad avoids building an autograd graph for every forward pass
with torch.no_grad():
  for idx in target_indexes:
    path, true_label = dataset_images[idx]
    # convert('RGB') guarantees 3 channels, which the first Conv2d(3, 16, ...) layer requires;
    # the with-block closes the image file once its pixels have been read
    with Image.open(path,'r') as pil_img:
      img = transforms(pil_img.convert('RGB'))
    batch = img.unsqueeze(0)
    # drop the two trailing singleton spatial dims -> one feature vector per image
    features = feature_extractor(batch).squeeze([2,3])
    pred_label = mobilenet_model(batch)
    pred_label = torch.max(pred_label,1)[1].item()

    target_dict = {
        'path': path,
        'true_label': true_label,
        'pred_label': pred_label,
        'features': features.tolist()
    }
    target_path = os.path.join(features_path,str(idx))
    targets.append(target_path)
    with open(target_path, "w") as json_file:
        json.dump(target_dict, json_file)


In [44]:
# Extract features for every reference (query) image and keep them in memory.
# Each entry: {'path', 'true_label', 'pred_label', 'features' (tensor returned by the extractor)}
references = []
# inference only: no_grad avoids keeping an autograd graph alive inside every stored feature tensor
with torch.no_grad():
  for idx in reference_indexes:
    path, true_label = dataset_images[idx]
    # convert('RGB') guarantees 3 channels, which the first Conv2d(3, 16, ...) layer requires;
    # the with-block closes the image file once its pixels have been read
    with Image.open(path,'r') as pil_img:
      img = transforms(pil_img.convert('RGB'))
    batch = img.unsqueeze(0)
    # drop the two trailing singleton spatial dims -> one feature vector per image
    features = feature_extractor(batch).squeeze([2,3])
    pred_label = mobilenet_model(batch)
    pred_label = torch.max(pred_label,1)[1].item()
    references.append({
        'path': path,
        'true_label': true_label,
        'pred_label': pred_label,
        'features': features
    })

In [45]:
# Euclidean distance between each reference (query) image and every image of the target subset.
# For each reference the targets are ranked by increasing distance and stored in
# reference['distances'] as a list of (position in `targets`, distance tensor) pairs.

# Read the cached target features once, instead of re-parsing every JSON file for every reference.
target_features = []
for target_path in targets:
  with open(target_path,'r') as target_json:
    target_features.append(torch.tensor(json.load(target_json)['features']))

# pure evaluation: no gradients are needed for the distances
with torch.no_grad():
  for reference in references:
    ref_features = reference['features']
    distances = []
    for idx, features in enumerate(target_features):
      dist = torch.cdist(ref_features, features, p=2)
      distances.append((idx,dist))

    # each distance is a single-element tensor; sort on its scalar value (ascending, stable)
    distances = sorted(distances, key = lambda item: item[1].item(), reverse = False)
    reference['distances'] = distances


## Displaying examples

In [46]:
# Show, for the first few reference images, the query followed by its nearest targets.
K = 10            # upper bound on the neighbours considered per query
num_examples = 5  # number of reference images displayed (one figure each)
num_panels = 5    # panels per figure: 1 query + (num_panels - 1) nearest neighbours

for reference in references[:num_examples]:
  fig, axes = plt.subplots(1, num_panels, figsize = [18,5], squeeze = False)
  axes = axes[0]

  # convert('RGB') keeps grayscale / palette / RGBA files displayable as 3-channel images
  with Image.open(reference['path']) as pil_img:
    img = transforms(pil_img.convert('RGB'))

  axes[0].imshow(t2img(img))
  axes[0].set_title(classes[reference['true_label']])
  axes[0].set_xlabel(f"predicted: {classes[reference['pred_label']]}")

  # Only as many neighbours as there are free panels are loaded. The previous `col<5` test
  # stopped one panel early, leaving the last slot of the 1x5 grid permanently empty.
  neighbours = reference['distances'][:min(K, num_panels - 1)]
  for rank, (idx, distance) in enumerate(neighbours, start = 1):

    with open(targets[idx],'r') as target_json:
      target = json.load(target_json)

    with Image.open(target['path']) as pil_img:
      target_img = transforms(pil_img.convert('RGB'))

    axes[rank].imshow(t2img(target_img))
    axes[rank].set_title(f"#{rank} nearest:\n{classes[target['true_label']]}")
    axes[rank].set_xlabel(f"predicted: {classes[target['pred_label']]}")

  # hide panels that stay unused when fewer neighbours than panels are available
  for unused_ax in axes[len(neighbours) + 1:]:
    unused_ax.axis('off')


Output hidden; open in https://colab.research.google.com to view.

# Evaluating retrieval performance

In [47]:
# Count how many target images belong to each class; the metrics cell uses these counts
# as the number of relevant images when computing recall.
# Only the label is read from each cached file -- the stored features are not needed here.
classes_count = {}
for target_path in targets:

  with open(target_path,'r') as target_json:
    label = json.load(target_json)['true_label']
  classes_count[label] = classes_count.get(label, 0) + 1

In [48]:
# Retrieval metrics over the first K ranked targets, for several values of K.
K_ranks = [1,3,5,10, 20]


def _mean(values):
  """Arithmetic mean of `values`, or NaN when the list is empty (no query contributed)."""
  return sum(values)/len(values) if values else float('nan')


# True labels of the target images, read once and indexed by position in `targets`
# (the same positions that are stored in reference['distances']).
target_labels = []
for target_path in targets:
  with open(target_path,'r') as target_json:
    target_labels.append(json.load(target_json)['true_label'])

K_accuracies = {}
for K in K_ranks:
  precisions = []
  recalls = []
  hits = []
  for reference in references:

    label = reference['true_label']
    preds = np.array([target_labels[idx] for idx, _ in reference['distances'][:K]])
    correct_preds = int((preds == label).sum())

    # 1 when at least one of the first K retrieved images has the query's label
    hits.append(1 if correct_preds>0 else 0)

    # precision@K; defined as 0 when nothing could be retrieved at all
    precisions.append(correct_preds/len(preds) if len(preds) else 0.0)

    # recall@K, with the denominator capped at K because no more than K relevant images
    # can appear among the first K results. A query whose class never occurs in the target
    # set has no relevant images, so its recall is undefined and it is left out of the average.
    n_relevant = classes_count.get(label, 0)
    if n_relevant > 0:
      recalls.append(correct_preds/min(n_relevant,K))

  # average percentage of correctly retrieved images out of the first K nearest images
  K_accuracies[f'K{K} precision'] = _mean(precisions)
  # average percentage of relevant images that were retrieved correctly
  K_accuracies[f'K{K} recall'] = _mean(recalls)
  # percent of queries that returned at least one correct image in first K images
  K_accuracies[f'K{K} score'] = _mean(hits)



print('K\tTop K precision\tTop K recall\tFirst K precision')
for K in K_ranks:
  K_precision = K_accuracies[f'K{K} precision']
  K_recall = K_accuracies[f'K{K} recall']
  K_score = K_accuracies[f'K{K} score']
  print(f"{K}\t{K_precision:2.2f}\t\t{K_recall:2.2f}\t\t{K_score:2.2f}")


K	Top K precision	Top K recall	First K precision
1	0.90		0.90		0.90
3	0.81		0.84		0.98
5	0.71		0.83		0.98
10	0.47		0.86		0.98
20	0.26		0.94		1.00
