# LAB 5: Image search using CLIP

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/biodatlab/xlab-recommendation/blob/main/inclass_notebooks/05_CLIP_image_search.ipynb)

* Dataset ref: https://www.kaggle.com/competitions/h-and-m-personalized-fashion-recommendations/overview
    * the images used in this notebook are resized versions of the images from the H&M personalized fashion recommendations dataset (resized to 100 x 100 pixels)
    * contains 100k+ images
    * mounted on google drive: https://drive.google.com/drive/folders/1jX1hasS6HysjEuKG0ucmTxdndB03uliJ?usp=sharing

* Objectives
    * find and recommend clothes for customers using image/text search

* Notes
    * openai-clip: https://github.com/openai/CLIP
    * faiss: https://github.com/facebookresearch/faiss/wiki
    * please change the runtime type to GPU on Google Colab for faster computation
    * try out saved embeddings at [![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/biodatlab/xlab-recommendation/blob/notebook/solution_notebooks/06_CLIP_image_search_pretrained.ipynb)

In [None]:
# install the required libraries

! pip install torch ftfy regex tqdm numpy
! pip install openai-clip
! pip install gradio
! pip install gdown

In [None]:
# import essential libraries

import os
import os.path as op
from PIL import Image
from zipfile import ZipFile

import numpy as np
from tqdm import tqdm
import torch

import clip

In [None]:
# check available runtime and install the matching FAISS build
# (faiss-gpu when CUDA is available, faiss-cpu otherwise)

device = "cuda" if torch.cuda.is_available() else "cpu"
if device == "cuda": 
  ! pip install faiss-gpu 
else:
  ! pip install faiss-cpu 

print("Now running with " + device)

In [None]:
# list the pre-trained models available in openai-clip

clip.available_models()

In [None]:
# load the ViT-B/32 model on the selected device
# `model` is the CLIP network; `preprocess` is applied to every PIL image before it is encoded

model, preprocess = clip.load("ViT-B/32", device=device)

In [None]:
# download the resized image dataset from the shared Google Drive folder
# NOTE(review): the next cell expects the archive under ./h-and-m-resize-image-zip/ —
# presumably gdown saves the folder into the current working directory; confirm

import gdown

url = "https://drive.google.com/drive/folders/1jX1hasS6HysjEuKG0ucmTxdndB03uliJ?usp=sharing"
gdown.download_folder(url, use_cookies=False)

In [None]:
# extract dataset zipped file
path =  op.join(os.getcwd(),"h-and-m-resize-image-zip/h-and-m-resize-image.zip")
  
# opening the zip file in READ mode
with ZipFile(path, 'r') as zip:

    # extracting all the files
    print('Extracting all the files now...')
    %time zip.extractall()
    print('Done!')

In [None]:
# locate the extracted dataset folder
dataset_path = op.join(os.getcwd(), "resized_images/")

# names of all entries in the dataset folder, in sorted order
all_folder_path = sorted(os.listdir(dataset_path))

In [None]:
# check folder path: print the sorted list of entries found in the dataset folder

print(all_folder_path)

In [None]:
# collect the relative path ("<folder>/<file>") of every image in the dataset and
# count them, so the embeddings array can be allocated with one row per image

images_path = []

for folder in all_folder_path:
    folder_path = op.join(dataset_path, folder)
    # skip stray files sitting next to the image folders; listing them would raise
    if not op.isdir(folder_path):
        continue
    # sorted so that the order of images_path is reproducible between runs
    for image in sorted(os.listdir(folder_path)):
        if op.isfile(op.join(folder_path, image)):
            images_path.append(op.join(folder, image))

# total number of image files = number of rows needed for the embeddings array
num_file = len(images_path)

In [None]:
# TODO: allocate memory for embeddings array with shape of (number_of_files, 512), Hint: create arrays of zeros
# (one row per entry of images_path; the FAISS index built later is created with dimension 512)
# NOTE(review): the query embeddings further down are cast to float32 before searching,
# so this array presumably needs dtype float32 as well — confirm against the FAISS docs

embeddings_storage = ...

# encode every image of the dataset
# file_counter is the row of embeddings_storage the next embedding is written to,
# so row i ends up holding the embedding of images_path[i]
file_counter = 0

for path in tqdm(images_path):
    # inference only: gradients are not needed
    with torch.no_grad():
        # open the image, apply the CLIP preprocessing, add a batch dimension
        # and move the tensor to the selected device
        image = (
            preprocess(Image.open(op.join(dataset_path, path))).unsqueeze(0).to(device)
        )
        # TODO: encode images using CLIP model, Hint: use the previously defined "model"
        embeddings_storage[file_counter] = np.array(...)
        file_counter += 1

In [None]:
# build a FAISS index and add the image embeddings to it
import faiss

# flat L2 index; one embedding produced by the CLIP model has 512 dimensions
index = faiss.IndexFlatL2(512)
index.add(embeddings_storage)

In [None]:
# check how many image embeddings are stored in the FAISS index

print(index.ntotal)  # number of image embeddings stored in the index

In [None]:
# recommend from images

import gradio as gr


def recommend_similar_image(image_path):
    """Return the dataset images most similar to a query image.

    The query image is encoded with the CLIP ``model`` and its embedding is
    searched against the FAISS ``index``. The matches are opened from
    ``dataset_path`` using the relative paths stored in ``images_path``.

    Args:
        image_path: File path of the query image. A falsy value (e.g. ``None``
            when nothing was uploaded) yields an empty result.

    Returns:
        A list of ``(PIL image, caption)`` tuples, best match first. The list
        is empty when no query image was given.
    """
    if not image_path:
        # nothing was uploaded, so there is nothing to search for
        return []

    print(f"get image path {image_path}")

    # close the query file as soon as it has been turned into a tensor
    with Image.open(image_path) as query_image:
        test_image = preprocess(query_image).unsqueeze(0).to(device)

    with torch.no_grad():
        # the batch of one image gives a 2-D array with a single row,
        # cast to float32 before it is handed to the index
        test_embeddings = (
            model.encode_image(test_image).numpy(force=True).astype("float32")
        )

    k = 4  # number of recommendations
    square_distance, image_index = index.search(test_embeddings, k)
    print(image_index)
    print(square_distance)

    print("Opening Images...")
    # the index pads its answer with -1 when it holds fewer than k vectors;
    # those entries are skipped so that images_path[-1] is not shown by mistake
    recommended_images = [
        (
            Image.open(op.join(dataset_path, images_path[found])),
            f"Recommended Rank {rank}",
        )
        for rank, found in enumerate(image_index[0], start=1)
        if found >= 0
    ]
    return recommended_images


# example_path is not passed to the interface below
example_path = []
# Gradio UI: the uploaded image is handed to the function as a file path and
# the returned images are displayed in a gallery
demo = gr.Interface(
    fn=recommend_similar_image,
    inputs=gr.Image(type="filepath"),
    outputs=gr.Gallery(),
)

# start the web UI for the image search
demo.launch(debug=True)

In [None]:
# recommend from text


def recommend_similar_image(text):
    """Return the dataset images that best match a text description.

    The text is tokenized and encoded with the CLIP ``model``; the resulting
    embedding is searched against the FAISS ``index`` of image embeddings.
    The matches are opened from ``dataset_path`` using the relative paths
    stored in ``images_path``.

    NOTE: this function reuses the name of the image-based search function
    defined in the previous cell, so running this cell replaces it.

    Args:
        text: Free-text description of the wanted item.

    Returns:
        A list of ``(PIL image, caption)`` tuples, best match first. The list
        is empty when ``text`` is ``None`` or blank.
    """
    if text is None or not text.strip():
        # a blank prompt carries no information to search with
        return []

    tokens = clip.tokenize([text]).to(device)
    with torch.no_grad():
        # the batch of one prompt gives a 2-D array with a single row,
        # cast to float32 before it is handed to the index
        test_embeddings = model.encode_text(tokens).numpy(force=True).astype("float32")

    k = 10  # number of recommendations
    square_distance, image_index = index.search(test_embeddings, k)
    print(image_index)
    print(square_distance)

    print("Opening Images...")
    # the index pads its answer with -1 when it holds fewer than k vectors;
    # those entries are skipped so that images_path[-1] is not shown by mistake
    recommended_images = [
        (
            Image.open(op.join(dataset_path, images_path[found])),
            f"Recommended Rank {rank}",
        )
        for rank, found in enumerate(image_index[0], start=1)
        if found >= 0
    ]
    return recommended_images


# example_path is not passed to the interface below
example_path = []
# Gradio UI: the typed text is handed to the function and the returned images
# are displayed in a gallery
demo = gr.Interface(
    fn=recommend_similar_image,
    inputs=gr.Textbox(),
    outputs=gr.Gallery(),
)

# start the web UI for the text search
demo.launch(debug=True)