<a href="https://colab.research.google.com/github/KhangTheKangaroo/Image-Retrieval/blob/main/Image_Retrieval_with_CLIP_(No_vector_database).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install chromadb
!pip install open-clip-torch

Collecting chromadb
  Downloading chromadb-0.5.5-py3-none-any.whl.metadata (6.8 kB)
Collecting chroma-hnswlib==0.7.6 (from chromadb)
  Downloading chroma_hnswlib-0.7.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (252 bytes)
Collecting fastapi>=0.95.2 (from chromadb)
  Downloading fastapi-0.112.0-py3-none-any.whl.metadata (27 kB)
Collecting uvicorn>=0.18.3 (from uvicorn[standard]>=0.18.3->chromadb)
  Downloading uvicorn-0.30.6-py3-none-any.whl.metadata (6.6 kB)
Collecting posthog>=2.4.0 (from chromadb)
  Downloading posthog-3.5.0-py2.py3-none-any.whl.metadata (2.0 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.18.1-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.3 kB)
Collecting opentelemetry-api>=1.2.0 (from chromadb)
  Downloading opentelemetry_api-1.26.0-py3-none-any.whl.metadata (1.4 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentelemetry_exporter_otlp_pro

In [2]:
!gdown --id 1msLVo0g0LFmL9-qZ73vq9YEVZwbzOePF # Download the dataset
!unzip -q data.zip

Downloading...
From (original): https://drive.google.com/uc?id=1msLVo0g0LFmL9-qZ73vq9YEVZwbzOePF
From (redirected): https://drive.google.com/uc?id=1msLVo0g0LFmL9-qZ73vq9YEVZwbzOePF&confirm=t&uuid=121bd718-c637-4b9b-bc52-21ae497338b4
To: /content/data.zip
100% 76.1M/76.1M [00:00<00:00, 112MB/s]


In [3]:
import os
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from chromadb.utils.embedding_functions import OpenCLIPEmbeddingFunction

In [None]:
def plot_results(query_path, ls_path_score, reverse=False):
    ls_path_score.sort(key=lambda x: x[1], reverse=reverse)  # Sort by score

    # Display query image
    query_image = plt.imread(query_path)
    plt.figure(figsize=(5, 5))
    plt.imshow(query_image)
    plt.title("Query Image")
    plt.axis('off')
    plt.show()

    # Display top 5 results
    plt.figure(figsize=(20, 10))
    for i in range(5):
        image_path, score = ls_path_score[i]
        image = plt.imread(image_path)
        plt.subplot(1, 5, i + 1)
        plt.imshow(image)

        # Extract the class name from the path
        class_name = image_path.split('/')[-2]
        plt.title(f"{class_name}")

        plt.axis('off')
    plt.suptitle("Top 5 Results")
    plt.show()

In [10]:
ROOT = 'data'
CLASS_NAME = sorted(list(os.listdir(f"{ROOT}/train"))) # Get the images' classes from data

In [None]:
def read_image_from_path(path, size):
  img = Image.open(path).convert('RGB').resize(size) # Open the image from path, convert color to RGB type and resize the image
  return np.array(img) # Vectorize the img

def folder_to_images(folder, size):
  list_dir = [folder + '/' + name for name in os.listdir(folder)] # Get the images' path
  images = np.zeros(shape = (len(list_dir), *size, 3))
  images_path = []

  for i, path in enumerate(list_dir): # This step is to check if an image could be opened

    images[i] = read_image_from_path(path, size)
    images_path.append(path)

  return images, images_path

In [5]:
embedding_function = OpenCLIPEmbeddingFunction()

def get_single_image_embedding(img):
  embedding = embedding_function._encode_image(img = img) # Get features from image as a vector
  return np.array(embedding)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


open_clip_pytorch_model.bin:   0%|          | 0.00/605M [00:00<?, ?B/s]

In [11]:
def abs_diff(query, data):
  axis_batch_size = tuple(range(1, len(data.shape)))
  return np.sum(np.abs(data - query), axis = axis_batch_size) # Get the absolute difference of every images in data to the query image

def get_l1_score(root_img_path, query_path, size):
  query = read_image_from_path(query_path, size)
  query_embedding = get_single_image_embedding(query)
  ls_path_score = []
  for folder in os.listdir(root_img_path):
    if folder in CLASS_NAME:
      path = root_img_path + folder
      images_np, images_path = folder_to_images(path, size)
      embedding_list = []

  for idx_img in range(images_np.shape[0]):
    embedding = get_single_image_embedding(images_np[idx_img].astype(np.uint8))
    embedding_list.append(embedding)  # Add embedding to the list of embeddings for the current folder
    rates = abs_diff(query_embedding, np.stack(embedding_list))
    ls_path_score.extend(list(zip(images_path, rates)))
  return query, ls_path_score

In [14]:
def mean_square_diff(query, data):
  axis_batch_size = tuple(range(1, len(data.shape)))
  return np.mean((data - query)**2, axis = axis_batch_size)

def get_l2_score(root_img_path, query_path, size):
  query = read_image_from_path(query_path, size)
  query_embedding = get_single_image_embedding(query)
  ls_path_score = []
  for folder in os.listdir(root_img_path):
    if folder in CLASS_NAME:
      path = root_img_path + folder
      images_np, images_path = folder_to_images(path, size)
      embedding_list = []

  for idx_img in range(images_np.shape[0]):
    embedding = get_single_image_embedding(images_np[idx_img].astype(np.uint8))
    embedding_list.append(embedding) # Add embedding to the list of embeddings for the current folder
    rates = mean_square_diff(query_embedding, np.stack(embedding_list))
    ls_path_score.extend(list(zip(images_path, rates)))
  return query, ls_path_score

In [None]:
def cosine_similarity(query, data):
  axis_batch_size = tuple(range(1,len(data.shape)))
  query_norm = np.sqrt(np.sum(query**2))
  data_norm = np.sqrt(np.sum(data**2, axis=axis_batch_size))
  return np.sum(data * query, axis=axis_batch_size) / (query_norm*data_norm + np.finfo(float).eps)

def get_CosineSimilarity_score(root_img_path, query_path, size):
  query = read_image_from_path(query_path, size)
  query_embedding = get_single_image_embedding(query)
  ls_path_score = []
  for folder in os.listdir(root_img_path):
    if folder in CLASS_NAME:
      path = root_img_path + folder
      images_np, images_path = folder_to_images(path, size)
      embedding_list = []

  for idx_img in range(images_np.shape[0]):
    embedding = get_single_image_embedding(images_np[idx_img].astype(np.uint8))
    embedding_list.append(embedding) # Add embedding to the list of embeddings for the current folder
    rates = cosine_similarity(query_embedding, np.stack(embedding_list))
    ls_path_score.extend(list(zip(images_path, rates)))
  return query, ls_path_score

In [None]:
def correlation_coefficient(query, data):
  axis_batch_size = tuple(range(1,len(data.shape)))
  query_mean = query - np.mean(query)
  data_mean = data - np.mean(data, axis=axis_batch_size, keepdims=True)
  query_norm = np.sqrt(np.sum(query_mean**2))
  data_norm = np.sqrt(np.sum(data_mean**2, axis=axis_batch_size))

  return np.sum(data_mean * query_mean, axis=axis_batch_size) / (query_norm*data_norm + np.finfo(float).eps)

def get_CorrCoef_score(root_img_path, query_path, size):
  query = read_image_from_path(query_path, size)
  query_embedding = get_single_image_embedding(query)
  ls_path_score = []
  for folder in os.listdir(root_img_path):
    if folder in CLASS_NAME:
      path = root_img_path + folder
      images_np, images_path = folder_to_images(path, size)
      embedding_list = []

  for idx_img in range(images_np.shape[0]):
    embedding = get_single_image_embedding(images_np[idx_img].astype(np.uint8))
    embedding_list.append(embedding) # Add embedding to the list of embeddings for the current folder
    rates = correlation_coefficient(query_embedding, np.stack(embedding_list))
    ls_path_score.extend(list(zip(images_path, rates)))
  return query, ls_path_score