In [44]:
# Mount Google Drive at /content/drive (requires the Colab runtime).
# The project folder used by the rest of the notebook lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [45]:
# Imports
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import json
import pickle
import math
import random
import torch
from torchvision.datasets import Caltech101
from torchvision import models, transforms
from torchvision.models  import resnet50, ResNet50_Weights
from scipy.special import softmax

from scipy.spatial.distance import euclidean, cosine, minkowski, correlation
from sklearn.metrics.pairwise import euclidean_distances
from PIL import Image


In [46]:
# Project root on the mounted drive. The Caltech101 data folder, feature_descriptors.json,
# label_image_map.json and the pickled LSH index are all opened relative to this path.
path = '/content/drive/MyDrive/CSE515_Phase3'

In [47]:
# Caltech101 dataset rooted at {path}/data; later cells index it as data[i] to get an
# (image, label) pair. download=True is passed so the files are fetched when missing
# (the recorded output shows they were already present).
data = Caltech101(root = f'{path}/data', download = True)

Files already downloaded and verified


In [48]:
# Load the precomputed feature descriptors and the label -> image mapping.
# feature_descriptors is indexed elsewhere as feature_descriptors[str(image_id)]['layer_3'].
with open(f'{path}/feature_descriptors.json','r') as fp:
    feature_descriptors = json.load(fp)

# Transpose so each JSON top-level key becomes a row, and expose that key as the "id" column.
# presumably one row per image id — TODO confirm against the JSON layout
feature_data = pd.DataFrame(feature_descriptors).T.reset_index(names="id")
# Stack the per-row "layer_3" lists into one array; get_lsh uses it for its brute-force fallback.
input_vectors = np.array(feature_data["layer_3"].tolist())

# NOTE(review): label_image_map is loaded here but not referenced by the code visible in this notebook.
with open(f'{path}/label_image_map.json','r') as fp:
    label_image_map = json.load(fp)

In [49]:
# Extract ResNet50 descriptors for one image using forward hooks on layer3, avgpool and fc
def computeResNet50Vectors(imageId, dataset):
    """Run one dataset image through ResNet50 and return four descriptors.

    Args:
        imageId: Index into ``dataset``.
        dataset: Indexable collection whose items unpack as ``(image, label)``;
            the image must expose ``mode`` and ``convert`` (PIL-style).

    Returns:
        A 4-tuple ``(avgpool_reduced, layer3_reduced, fc_vector, fc_softmax)``:
        the avgpool output reshaped to (1024, 2) and averaged over each pair,
        the layer3 output averaged over its last two axes, the raw fc output,
        and the softmax of the fc output.

    NOTE(review): a fresh pretrained model is built on every call and ``.eval()``
    is never invoked, so the network runs in its default mode. Confirm how the
    stored descriptors were produced before changing either behaviour.
    """
    image, _label = dataset[imageId]
    if image.mode == 'L':
        # Single-channel images are expanded to three channels before the forward pass.
        image = image.convert("RGB")

    # One output list per hooked layer, filled in by the forward hooks below.
    captured = {"avgpool": [], "layer3": [], "fc": []}

    def make_hook(layer_name):
        # Build a forward hook that records the output of the named layer.
        def _store_output(module, input, output):
            captured[layer_name].append(output)
        return _store_output

    with torch.no_grad():
        resnet = models.resnet50(pretrained=True)
        for layer_name in ("avgpool", "layer3", "fc"):
            getattr(resnet, layer_name).register_forward_hook(make_hook(layer_name))

        # Resize to 224x224, convert to a tensor, and add the batch axis.
        preprocess = transforms.Compose([transforms.Resize((224,224)), transforms.ToTensor()])
        resnet(preprocess(image).unsqueeze(0))

        # First recorded call, first (only) item of the batch.
        avgpool_out = captured["avgpool"][0][0]
        layer3_out = captured["layer3"][0][0]
        fc_out = captured["fc"][0][0]

        # Average consecutive pairs of avgpool values to obtain 1024 numbers.
        avgpool_pairs = avgpool_out.squeeze().detach().numpy().reshape(1024, 2)
        reduced_avg_pool = np.mean(avgpool_pairs, axis=1)

        # Average each layer3 channel over its two trailing axes.
        reduced_layer3 = np.mean(layer3_out.detach().numpy(), axis=(1, 2))

        fc_vector = fc_out.detach().numpy()
        fc_probabilities = softmax(fc_vector)
    return (reduced_avg_pool, reduced_layer3, fc_vector, fc_probabilities)

In [50]:
# SVM

In [51]:
class SVM:
    """Linear soft-margin SVM trained by per-sample sub-gradient descent on the hinge loss.

    The decision function is ``sign(X @ w - b)``. Labels passed to ``fit`` are
    binarised: values ``<= 0`` become -1, everything else becomes +1.
    """

    def __init__(self, learning_rate=0.001, lambda_param=0.01, n_iters=1000):
        """Store the hyper-parameters.

        Args:
            learning_rate: Step size of every update.
            lambda_param: L2 regularisation strength applied to the weights.
            n_iters: Number of full passes over the training samples.
        """
        # The constructor must be the dunder ``__init__``; a single-underscore
        # ``_init_`` is an ordinary method and is never called on construction.
        self.lr = learning_rate
        self.lambda_param = lambda_param
        self.n_iters = n_iters
        self.w = None
        self.b = None

    def fit(self, X, y):
        """Learn ``w`` and ``b`` from samples ``X`` (n_samples, n_features) and labels ``y``."""
        X = np.asarray(X)
        n_samples, n_features = X.shape

        # Map labels onto {-1, +1}.
        y_ = np.where(np.asarray(y) <= 0, -1, 1)

        # init weights
        self.w = np.zeros(n_features)
        self.b = 0

        for _ in range(self.n_iters):
            for idx, x_i in enumerate(X):
                margin_satisfied = y_[idx] * (np.dot(x_i, self.w) - self.b) >= 1
                if margin_satisfied:
                    # Only the regulariser contributes to the gradient.
                    self.w -= self.lr * (2 * self.lambda_param * self.w)
                else:
                    # Regulariser plus the hinge-loss term for this sample.
                    self.w -= self.lr * (2 * self.lambda_param * self.w - np.dot(x_i, y_[idx]))
                    self.b -= self.lr * y_[idx]

    def predict(self, X):
        """Return the sign of the decision function for every row of ``X``."""
        approx = np.dot(X, self.w) - self.b
        return np.sign(approx)

In [52]:
# One-vs-rest linear SVM scorer: one instance is trained per target label
class MultiLabelSVM:
    """Linear scorer fitted with full-batch sub-gradient descent on the hinge loss.

    ``train`` treats one label as the positive class and every other label as
    negative; ``predict`` returns the raw decision scores (not class labels).
    """

    def __init__(self, learning_rate=0.001, num_epochs=10000):
        self.learning_rate = learning_rate
        self.num_epochs = num_epochs
        # Both are (re)initialised at the start of every train() call.
        self.weights = None
        self.bias = None

    def train(self, X, y, label):
        """Fit weights and bias so that samples whose label equals ``label`` score high.

        Args:
            X: 2-D array, one sample per row.
            y: Array of labels aligned with the rows of ``X``.
            label: The label treated as the positive class.
        """
        _, feature_count = X.shape
        self.weights = np.zeros(feature_count)
        self.bias = 0

        # +1 for the target label, -1 for everything else.
        targets = np.where(y == label, 1, -1)
        step = self.learning_rate

        for _ in range(self.num_epochs):
            decision = np.dot(X, self.weights) + self.bias

            # Samples with a strictly positive hinge margin are the only ones
            # that contribute to the sub-gradient.
            violating = (1 - targets * decision) > 0
            signed_violations = targets * violating

            self.weights -= step * -np.dot(X.T, signed_violations)
            self.bias -= step * -np.sum(signed_violations)

    def predict(self, X):
        """Return the decision score ``X @ weights + bias`` for every row of ``X``."""
        return np.dot(X, self.weights) + self.bias


In [53]:
# Task4b Output

In [54]:
# Locality-sensitive hashing index over feature vectors, with bucket-expansion queries
class LSH:
  """Multi-layer LSH index built from random projection vectors.

  Each of the ``L`` layers owns ``h`` random vectors. A data vector is hashed
  in a layer by projecting it on each of the layer's vectors and quantising
  the projection length with bucket width ``W``; the ``h`` resulting integers
  form the bucket key.

  NOTE: instances of this class are pickled and reloaded elsewhere in the
  notebook, so the hashing methods (``project_on_hyperplane``,
  ``assign_vector``, ``get_bucket_key``) must keep producing the same keys
  for an already-persisted index.
  """

  def __init__(self, L, h, input_vectors):
    """Store the configuration; call ``create_index`` to populate the tables.

    Args:
      L: Number of layers (hash tables).
      h: Number of hash functions per layer.
      input_vectors: 2-D array, one vector per row.
    """
    self.L = L
    self.h = h
    self.W = 0.0
    self.vectors = input_vectors
    self.lsh_hyperplanes = {}
    self.lsh_hyperplane_ranges = {}
    self.hash_tables = [{} for _ in range(self.L)]

  def create_index(self):
    """Draw the random vectors, derive ``W`` and hash every stored vector into every layer."""
    self.init_random_hyperplanes()
    # Bucket width: one fiftieth of the shortest random vector's length.
    self.W = min(self.lsh_hyperplane_ranges.values()) / float(50)

    for vector_id, vector in enumerate(self.vectors):
      for layer_id in range(self.L):
        bucket_key = self.get_bucket_key(vector, layer_id)
        self.hash_tables[layer_id].setdefault(bucket_key, set()).add(vector_id)

  def init_random_hyperplanes(self):
    """Create ``L * h`` random vectors whose components lie inside each feature's data range."""
    n_features = self.vectors.shape[1]
    origin_vector = np.zeros(n_features)

    # Per-feature bounds do not depend on the layer/hash loop, so compute them once.
    col_min = self.vectors.min(axis=0)
    col_max = self.vectors.max(axis=0)

    for layer_id in range(self.L):
      for hash_id in range(self.h):
        random_hyperplane_vector = np.array(
          [random.uniform(col_min[col], col_max[col]) for col in range(n_features)]
        )

        # Store the vector and its distance from the origin.
        hyperplane_key = (layer_id, hash_id)
        self.lsh_hyperplanes[hyperplane_key] = random_hyperplane_vector
        self.lsh_hyperplane_ranges[hyperplane_key] = euclidean(origin_vector, random_hyperplane_vector)

  def get_bucket_key(self, vector, layer_id):
    """Return the tuple of ``h`` bucket numbers of ``vector`` in layer ``layer_id``."""
    return tuple(
      self.assign_vector(
        self.project_on_hyperplane(vector, self.lsh_hyperplanes[(layer_id, hash_id)])
      )
      for hash_id in range(self.h)
    )

  def project_on_hyperplane(self, input_vector, lsh_vector):
    """Return the length of the projection of ``input_vector`` onto ``lsh_vector``.

    NOTE(review): the result is a norm and therefore never negative, so the
    sign of the projection is discarded. Left as is, because changing it
    would alter the keys of indexes that were already built and persisted.
    """
    dp = np.dot(input_vector, lsh_vector)
    if dp == 0.0:
      return 0
    projection = dp/np.dot(lsh_vector, lsh_vector)*lsh_vector
    magnitude = np.linalg.norm(projection)
    return magnitude

  def assign_vector(self, value):
    """Quantise ``value`` into an integer bucket number using width ``W``."""
    if value < 0:
      return math.floor(value/self.W)
    else:
      return math.ceil(value/self.W)

  def query(self, query_vector):
    """Return the ids found in the query's own bucket of every layer (may contain duplicates)."""
    results = []
    for layer_id in range(self.L):
      bucket_key = self.get_bucket_key(query_vector, layer_id)
      results.extend(list(self.hash_tables[layer_id].get(bucket_key, set())))
    return results

  def get_adjacent_buckets(self, bucket_key, layer_id):
    """List neighbouring bucket keys of ``bucket_key``, nearest offsets first.

    For every offset ``delta`` from 1 up to the spread of bucket numbers in
    the layer, each position of the key is shifted by ``-delta`` and then by
    ``+delta``; every shifted key is returned. Keys are candidates only, the
    caller checks whether they exist in the table.
    """
    # Spread of bucket numbers over ALL keys of the layer, not only the
    # lexicographically smallest/largest key.
    components = [component for key in self.hash_tables[layer_id] for component in key]
    if not components:
      return []
    span = max(components) - min(components)

    adjacent_keys = []
    for delta in range(1, span + 1):
      for step in (-delta, delta):
        for i in range(len(bucket_key)):
          modified_key = list(bucket_key)
          modified_key[i] += step
          # Appended inside the position loop so that every hash position is
          # perturbed, not just the last one.
          adjacent_keys.append(tuple(modified_key))

    return adjacent_keys

  def expanded_lsh_query(self, query_vector, t):
    """Query the index and widen the search to adjacent buckets until ``t`` distinct ids are found.

    Returns the collected ids (with possible duplicates); fewer than ``t``
    distinct ids are returned when the adjacent buckets are exhausted.
    """
    results = self.query(query_vector)

    # Check if we need to expand the search
    if len(set(results)) >= t:
      return results

    # Expand to adjacent buckets
    for layer_id in range(self.L):
      table = self.hash_tables[layer_id]
      bucket_key = self.get_bucket_key(query_vector, layer_id)
      for adj_key in self.get_adjacent_buckets(bucket_key, layer_id):
        if adj_key in table and adj_key != bucket_key:
          results.extend(list(table.get(adj_key, set())))
          if len(set(results)) >= t:
            return results

    return results

  def print_bucket_sizes(self):
    """Print the number of stored ids in every bucket of every layer."""
    for layer_id, layer in enumerate(self.hash_tables):
      print(f"Layer {layer_id}:")
      for bucket_key, bucket in layer.items():
        print(f"  Bucket {bucket_key}: {len(bucket)} images")

In [55]:
def get_lsh(lsh, query_image_index, t):
  """Return up to ``t`` ``(image_id, distance)`` pairs closest to the query image.

  The query descriptor is element 1 of the tuple returned by
  ``computeResNet50Vectors``. Candidates come from the LSH index and are
  ranked by Euclidean distance; when the index yields fewer than ``t``
  distinct candidates, every row of the global ``input_vectors`` is ranked
  instead. Row numbers are doubled to obtain the reported image ids.
  """
  query_vector = computeResNet50Vectors(query_image_index, data)[1]

  # Distinct candidate rows proposed by the index.
  candidates_id = list(set(lsh.expanded_lsh_query(query_vector, t)))
  candidate_vectors = [lsh.vectors[row] for row in candidates_id]

  if len(candidates_id) >= t:
    # Enough candidates: rank only those.
    distances = euclidean_distances([query_vector], candidate_vectors)[0]
    nearest = np.argsort(distances)[:t]
    return [(candidates_id[pos] * 2, distances[pos]) for pos in nearest]

  # Too few candidates: fall back to an exhaustive scan.
  distances = euclidean_distances([query_vector], input_vectors)[0]
  nearest = np.argsort(distances)[:t]
  return [(row * 2, distances[row]) for row in nearest]

In [56]:
# Deserialize a previously built LSH index from disk
def load_lsh_index_from_file(filename):
  """Open ``filename`` in binary mode and return the object unpickled from it.

  NOTE(review): ``pickle.load`` can execute arbitrary code; only use this on
  index files you created yourself.
  """
  with open(filename, 'rb') as handle:
    return pickle.load(handle)

In [57]:
# Show a row of dataset images, one subplot per (image id, score) pair
def display_images(image_indices, dataset, w, h):
    """Plot the listed images side by side, each resized to ``w`` x ``h``.

    Args:
        image_indices: Sequence of ``(index, score)`` pairs; only the index is used.
        dataset: Indexable collection whose items start with the image.
        w: Width each image is resized to.
        h: Height each image is resized to.

    Nothing is drawn when ``image_indices`` is empty.
    """
    image_indices = list(image_indices)
    if not image_indices:
        # plt.subplots rejects zero columns; there is nothing to show anyway.
        return

    # squeeze=False always yields a 2-D array of axes, so a single image
    # (one column) is indexed the same way as several.
    fig, axs = plt.subplots(1, len(image_indices), figsize=(30, 10), squeeze=False)
    for ax, (index, score) in zip(axs[0], image_indices):
        image_array = np.array(dataset[int(index)][0])  # Convert JpegImageFile to NumPy array
        image = Image.fromarray(image_array)
        ax.imshow(image.resize((w, h)))
        ax.axis('off')
        ax.set_title(f"Image {int(index)}")
    plt.show()

In [58]:
# Binarize image features against a per-feature threshold
def get_binary(feature_spc, images):
  """Map each image id to a 0/1 version of its feature row.

  A feature is set to 1 when its value is strictly greater than that
  feature's mean plus one standard deviation over all rows of ``feature_spc``.

  Args:
    feature_spc: 2-D array, row ``i`` holding the features of ``images[i]``.
    images: Sequence whose items carry the image id in position 0.

  Returns:
    Dict from image id to the binarized feature row.
  """
  column_mean = np.mean(feature_spc, axis=0)
  column_std = np.std(feature_spc, axis=0)
  binary_rows = (feature_spc > column_mean + column_std).astype(int)
  return {entry[0]: binary_rows[row] for row, entry in enumerate(images)}

In [59]:
# Show the first `number` images of a ranked id list
def display_given_count_images(sorted_keys, number):
  """Plot the first ``number`` images of ``sorted_keys`` side by side (200 x 200 each).

  Args:
    sorted_keys: Image ids, looked up in the global ``data``.
    number: How many leading ids to show.

  Nothing is drawn when the selection is empty.
  """
  selected = list(sorted_keys[:number])
  if not selected:
    # plt.subplots rejects zero columns; there is nothing to show anyway.
    return

  # squeeze=False always yields a 2-D array of axes, so showing a single
  # image is handled exactly like showing several.
  fig, axs = plt.subplots(1, len(selected), figsize=(30, 10), squeeze=False)
  for ax, index in zip(axs[0], selected):
    image_array = np.array(data[int(index)][0])
    image = Image.fromarray(image_array)
    ax.imshow(image.resize((200, 200)))
    ax.axis('off')
    ax.set_title(f"Id-{int(index)}")
  plt.show()

In [60]:
# Calculate feature significance
def _feedback_presence_prob(strong, weak, n_features):
  """Smoothed estimate of P(feature = 1) for one side of the feedback.

  ``strong`` holds the binary rows of the "very" (ir)relevant images and
  ``weak`` those of the plainly (ir)relevant ones; weak rows count 0.75.
  """
  prob = np.zeros(n_features)
  if len(strong) > 0:
    prob = np.sum(strong, axis=0) / (len(strong) + 1)
  if len(weak) > 0:
    prob = prob + (np.sum(weak, axis=0) * 0.75) / (len(weak) + 1)
  if len(strong) > 0 and len(weak) > 0:
    # Both groups contributed: average the two estimates.
    prob = prob / 2
  # Features never seen set to 1 get a small non-zero probability so the
  # log-odds below stay finite.
  prob[prob == 0] = 0.5 / (len(strong) + len(weak) + 1)
  return prob


def feature_wgts_from_relevance(relv_of_pred, binary_ftr):
  """Compute one log-odds weight per feature from the user's relevance feedback.

  Args:
    relv_of_pred: Dict from image id to relevance, one of '2' (very relevant),
      '1' (relevant), '-1' (irrelevant), '-2' (very irrelevant). Integers and
      surrounding whitespace are accepted; any other value is ignored.
    binary_ftr: Dict from image id to its binary feature row.

  Returns:
    Array ``log((P(f=1|R) * P(f=0|IR)) / (P(f=0|R) * P(f=1|IR)))`` with one
    entry per feature.
  """
  groups = {'2': [], '1': [], '-1': [], '-2': []}
  for imgId, relevancy in relv_of_pred.items():
    group = groups.get(str(relevancy).strip())
    if group is not None:
      group.append(binary_ftr[imgId])

  # Take the feature width from the data instead of assuming 1024 columns;
  # 1024 remains the fallback when no feature rows are available.
  n_features = len(next(iter(binary_ftr.values()))) if len(binary_ftr) > 0 else 1024

  # Calculating P(f=1|R) P(f=0|R), P(f=1|IR) P(f=0|IR)
  Pf1_rel = _feedback_presence_prob(groups['2'], groups['1'], n_features)
  Pf0_rel = 1 - Pf1_rel
  Pf1_irrel = _feedback_presence_prob(groups['-2'], groups['-1'], n_features)
  Pf0_irrel = 1 - Pf1_irrel

  # weights of all the features calculated with respect to the relevance feedback
  return np.log((Pf1_rel*Pf0_irrel)/(Pf0_rel*Pf1_irrel))

### RF loop

In [61]:
# Probability based Relevance Feedback
def _record_relevance(relv_of_pred, img_ids):
  """Ask the user to rate each image id and store every answer other than '0'."""
  for img_id in img_ids:
    relevancy = input(f'Enter the relevancy of {img_id}: ').strip()
    # Input is text, so "no feedback" has to be compared with the string '0';
    # skipping it keeps any rating the image received in an earlier round.
    if relevancy != '0':
      relv_of_pred[img_id] = relevancy


def prob_based_rf():
  """Interactive probabilistic relevance-feedback session.

  For every query the user supplies an image id and a result count ``t``.
  Candidates are retrieved through the LSH index, binarised, and re-ranked by
  feature weights derived from the user's ratings. Rating rounds repeat until
  the user stops; the whole process repeats until no further query is wanted.
  """
  # The index does not depend on the query, so it is read once per session.
  lsh_index_structure = load_lsh_index_from_file(f'{path}/LSH_index_structures/lsh_index_structure_10_3.pkl')

  while True:
    query_image = int(input('Enter the query image id: '))
    t = int(input('Enter no. of images t: '))
    display(data[int(query_image)][0].resize((150,150)))
    # Get some initial images and scores: a pool of at least 30, and never
    # fewer than the t results the user asked to see.
    images = get_lsh(lsh_index_structure, query_image, max(30, t))

    features_of_pred = [feature_descriptors[str(img_id)]['layer_3'] for img_id, score in images]

    binary_ftr = get_binary(np.array(features_of_pred), images)
    relv_of_pred = {}
    dis = t # number of images to be displayed after every re-ranking

    print('Feedback relevancy: 2 for Very relevant | 1 for relevant | -1 for irrelevant | -2 for very irrelevant | 0 to skip->')
    display_images(images[:t],data, 200, 200)
    _record_relevance(relv_of_pred, [img_id for img_id, score in images[:t]])

    while True:
      ftr_weights = feature_wgts_from_relevance(relv_of_pred, binary_ftr)
      # Score every candidate by the weighted sum of its binary features.
      obj_rel = {imgId: np.dot(ftr_weights, binary_ftr[imgId]) for imgId, score in images}

      sorted_keys = sorted(obj_rel, key=obj_rel.get, reverse=True)

      display_given_count_images(sorted_keys, dis)

      x = input('Do you want to continue giving feedback[Y/N]')
      if x not in ['Y','y']:
        break

      feedback_cnt = int(input('Enter the Number of images you want to view to give feedback:'))
      if(feedback_cnt > dis):
        # Show the extra images the user is about to rate.
        display_given_count_images(sorted_keys, feedback_cnt)
      print('Feedback relevancy: 2 for Very relevant | 1 for relevant | -1 for irrelevant | -2 for very irrelevant | 0 if no feedback for an image ->')
      _record_relevance(relv_of_pred, sorted_keys[:feedback_cnt])

    q = input('Do you have another query?[Y/N]')
    if q in ['N','n']:
      break

In [64]:
# Main feedback loop: retrieve t images, then refine them with the user-selected feedback model
feedback_model = int(input('Enter the feedback model to use 1)SVM, 2)Probabilistic Relevance feedback model '))
if feedback_model==1:
  query_image = int(input('Enter the query image id: '))
  t = int(input('Enter no. of images t: '))
  display(data[int(query_image)][0].resize((150,150)))
  lsh_index_structure = load_lsh_index_from_file(f'{path}/LSH_index_structures/lsh_index_structure_10_3.pkl')
  images = get_lsh(lsh_index_structure, query_image, t) # Get initial t images
  if len(images)<t:
      print('Not enough Images in LSH Bucket')
  tagged={}   # image id -> [score, relevance label given by the user]
  labels = ['2','1','-1','-2']
  # Numeric rank of every label. Sorting the label strings themselves would
  # place '-2' ahead of '-1'; labels outside this list rank as neutral (0).
  label_rank = {label: int(label) for label in labels}
  # Create a multi-label SVM classifier using One-vs-Rest
  num_classes = 4
  svm_classifiers = [MultiLabelSVM() for _ in range(num_classes)]
  j = 2       # multiplier of t used to widen the LSH query on every round
  print('Feedback relevancy: 2 for Very relevant | 1 for relevant | -1 for irrelevant | -2 for very irrelevant ->')
  # Named continue_feedback rather than `exit` so the builtin is not shadowed.
  continue_feedback = True
  while continue_feedback:
      display_images(images,data, 200, 200)

      for img_id, score in images:
          relevancy = input(f'Enter the relevancy of {img_id}: ').strip()
          tagged[img_id] = [score, relevancy]

      # Train one scorer per label on everything the user has rated so far.
      X_train,y_train = [],[]
      for i in tagged:
          X_train.append(feature_descriptors[str(i)]['layer_3'])
          y_train.append(tagged[i][1])

      for i,label in enumerate(labels):
          svm_classifiers[i].train(np.array(X_train), np.array(y_train), label)

      # Expand query range
      images_new = get_lsh(lsh_index_structure, query_image, j*t)
      images_new2 = []
      X_test = []
      for img_id, score in images_new:
          if img_id not in tagged:
              images_new2.append((img_id, score))
              X_test.append(feature_descriptors[str(img_id)]['layer_3'])
      if len(X_test)==0:
          print('Not enough Images in LSH Bucket')
          break
      # Make predictions on the test set
      predictions = np.array([classifier.predict(np.array(X_test)) for classifier in svm_classifiers])
      # Choose the label with the highest confidence as the predicted label
      predicted_labels = np.argmax(predictions, axis=0)
      pred = [labels[i] for i in predicted_labels]

      # Merge user-rated images with the newly predicted ones ...
      img_train = [((i, tagged[i][0]), tagged[i][1]) for i in tagged]
      img_train.extend(zip(images_new2, pred))
      # ... and order them from most to least relevant.
      img_train = sorted(img_train, key = lambda entry: label_rank.get(entry[1], 0), reverse = True)

      # Slicing tolerates a pool smaller than t.
      images = [entry[0] for entry in img_train[:t]]
      j+=1
      continue_feedback = input('Do you want to exit, y/n : ') in ['N','n']
elif feedback_model==2:
  prob_based_rf()
else:
  print('Unknown feedback model; enter 1 or 2')

Output hidden; open in https://colab.research.google.com to view.