<a href="https://colab.research.google.com/github/Chandana-Malgireddy/MLProject/blob/main/Final_Multimodal_Semantic_Search.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import files
# Opens Colab's file-upload widget. Pick the Kaggle API token (kaggle.json) here:
# the next cell moves a file with exactly that name into ~/.kaggle/.
uploaded = files.upload()  # Upload kaggle.json here


In [None]:
# Put the uploaded token into ~/.kaggle/ for the kaggle CLI used in the download cell.
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
# Owner read/write only, so the credential file is not readable by other users.
!chmod 600 ~/.kaggle/kaggle.json


In [None]:
!mkdir -p /content/data
!kaggle datasets download -d vikashrajluhaniwal/fashion-images -p /content/data
!unzip -q "/content/data/*.zip" -d /content/data/fashion-images


In [None]:
!pip install -q kaggle transformers datasets torch torchvision pillow faiss-cpu gradio pandas


In [None]:
import os
import pandas as pd
import numpy as np
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import CLIPModel, CLIPProcessor, get_linear_schedule_with_warmup


In [None]:
# --- Data location -----------------------------------------------------------
# Root folder of the extracted dataset (contains fashion.csv and the image tree).
BASE_DIR = "/content/data/fashion-images/data"

# --- Fine-tuning hyperparameters ---------------------------------------------
NUM_EPOCHS = 5        # full passes over the training split
BATCH_SIZE = 64       # GPU-friendly
LR = 1e-5             # AdamW learning rate
WARMUP_RATIO = 0.05   # fraction of all optimizer steps spent in linear warm-up


In [None]:
# Load the product metadata; the CSV sits at the root of BASE_DIR, so derive the
# path from the config constant instead of repeating the literal directory.
df = pd.read_csv(os.path.join(BASE_DIR, "fashion.csv"))

# Build absolute paths:
#   <BASE_DIR>/<Category>/<Gender>/Images/images_with_product_ids/<Image>
# Zipping the three columns avoids the much slower row-wise DataFrame.apply(axis=1).
df["image_path"] = [
    os.path.join(BASE_DIR, category, gender, "Images", "images_with_product_ids", image)
    for category, gender, image in zip(df["Category"], df["Gender"], df["Image"])
]

# Last expression of the cell -> rendered with the rich DataFrame display.
df.head()


In [None]:
# Keep only the rows whose image file is actually present on disk, then
# renumber the rows so positions and index labels agree again.
image_exists = df["image_path"].apply(os.path.exists)
df = df[image_exists].reset_index(drop=True)
print("Images available:", len(df))


In [None]:
# (label shown in the caption, source column) in the order they are emitted.
caption_fields = (
    ("Gender", "Gender"),
    ("Color", "Colour"),
    ("Category", "Category"),
    ("ProductType", "ProductType"),
    ("SubCategory", "SubCategory"),
    ("Usage", "Usage"),
)

# Produces "Gender: <..>; Color: <..>; Category: <..>; ProductType: <..>; SubCategory: <..>; Usage: <..>"
caption = None
for label, column in caption_fields:
    piece = label + ": " + df[column]
    caption = piece if caption is None else caption + "; " + piece

df["caption"] = caption


In [None]:
class FashionDataset(Dataset):
    """Map-style dataset yielding one (image, caption) pair per DataFrame row.

    The frame must provide an ``image_path`` and a ``caption`` column. Rows are
    addressed by position (``iloc``), so the frame's index labels do not matter.
    """

    def __init__(self, df):
        # Keep a reference to the caller's frame (no copy is made).
        self.df = df

    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, idx):
        record = self.df.iloc[idx]
        picture = Image.open(record["image_path"]).convert("RGB")
        return {"image": picture, "text": record["caption"]}


In [None]:
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
# What the processor does when called from collate_fn below.
#
# For images:
#   - resize/crop to CLIP's expected size (e.g. 224x224),
#   - normalize with CLIP's mean/std,
#   - stack into a tensor `pixel_values` of shape (batch_size, 3, H, W).
#
# For texts:
#   - tokenize the strings and add special tokens,
#   - padding=True    -> pad all sequences in the batch to the same length,
#   - truncation=True -> cut off texts that are too long.
def collate_fn(batch):
    """Turn a list of ``{"image", "text"}`` samples into one processor encoding.

    Images and captions are collected in sample order and passed through the
    shared ``processor`` with padding and truncation enabled; the result is
    returned as PyTorch tensors.
    """
    images, texts = [], []
    for sample in batch:
        images.append(sample["image"])
        texts.append(sample["text"])

    return processor(
        text=texts,
        images=images,
        padding=True,
        truncation=True,
        return_tensors="pt",
    )


In [None]:
!pip install imagehash


In [None]:
import imagehash
from PIL import Image
def compute_phash(img_path):
    """Return the perceptual hash of an image as a string, or None if it cannot be read.

    Args:
        img_path: Path of the image file to hash.

    Returns:
        ``str(imagehash.phash(...))`` of the RGB-converted image, or ``None``
        when opening, decoding or hashing the file fails.
    """
    try:
        # Context manager releases the file handle deterministically.
        with Image.open(img_path) as img:
            return str(imagehash.phash(img.convert("RGB")))
    except Exception:
        # Best-effort: unreadable files are reported as None. `Exception`
        # (instead of a bare except) lets KeyboardInterrupt/SystemExit through.
        return None

df["phash"] = df["image_path"].apply(compute_phash)

# compute_phash returns None for files it could not read. drop_duplicates treats
# all missing values as equal and would keep the first such row, leaving an
# unreadable image in the data, so drop those rows explicitly first.
unreadable = df["phash"].isna()
if unreadable.any():
    print("Dropping unreadable images:", int(unreadable.sum()))

df = df[~unreadable].drop_duplicates(subset=["phash"]).reset_index(drop=True)
print("Unique images after pHash:", len(df))

In [None]:
from sklearn.model_selection import train_test_split


# ------------------------------------------------------
# 80/20 train-test split (fixed seed, shuffled)
# ------------------------------------------------------
train_df, test_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
    shuffle=True,
)

print("Train samples:", len(train_df))
print("Test samples:", len(test_df))

train_dataset, test_dataset = FashionDataset(train_df), FashionDataset(test_df)

# Both loaders share batch size and collate function; only shuffling differs.
loader_kwargs = dict(batch_size=BATCH_SIZE, collate_fn=collate_fn)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)



In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
DEVICE = torch.device("cuda") if use_cuda else torch.device("cpu")

model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
model = model.to(DEVICE)

# Freeze every parameter of the vision tower so it is excluded from training.
for vision_param in model.vision_model.parameters():
    vision_param.requires_grad_(False)

print("Using:", DEVICE)


In [None]:
# Smoke test: encode one short string and make sure the text features are finite.
test = processor(text=["hello"], return_tensors="pt").to(DEVICE)
# Inference only: without no_grad the global `out` would keep an autograd graph
# (and its activations) alive for the rest of the session.
with torch.no_grad():
    out = model.get_text_features(**test)
print("NaN in output:", torch.isnan(out).any())
print("Output sample:", out[0][:5])



In [None]:
# Re-assert the vision-tower freeze. Idempotent: the same freeze is already
# applied right after the model is loaded.
for vision_param in model.vision_model.parameters():
    vision_param.requires_grad_(False)


In [None]:
# Only parameters that still require gradients are handed to the optimizer.
trainable_params = list(filter(lambda param: param.requires_grad, model.parameters()))
optimizer = torch.optim.AdamW(trainable_params, lr=LR)

# One scheduler step per batch: linear warm-up, then linear decay to zero.
num_steps = len(train_loader) * NUM_EPOCHS
warmup_steps = int(num_steps * WARMUP_RATIO)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=num_steps,
)


In [None]:
from tqdm.auto import tqdm
import time

MAX_GRAD_NORM = 1.0  # gradient-norm clipping threshold


def train_epoch(epoch):
    """Run one training pass over ``train_loader``.

    Args:
        epoch: Zero-based epoch index; used only for the progress/log labels.

    Returns:
        The mean training loss over this epoch's batches, or ``float("nan")``
        if a non-finite loss made the epoch stop early. (Mirrors ``validate``,
        which also returns its average loss.)
    """
    model.train()
    total_loss = 0.0

    for batch in tqdm(train_loader, desc=f"Epoch {epoch+1} TRAIN"):
        batch = {k: v.to(DEVICE) for k, v in batch.items()}
        out = model(**batch, return_loss=True)
        loss = out.loss
        # Catch Inf as well as NaN: back-propagating either would corrupt the weights.
        if not torch.isfinite(loss):
            print(" NaN/Inf detected — abort epoch")
            return float("nan")
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(trainable_params, MAX_GRAD_NORM)
        optimizer.step()
        scheduler.step()
        total_loss += loss.item()

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    avg = total_loss / max(len(train_loader), 1)
    print(f"Epoch {epoch+1} avg loss: {avg:.4f}")
    return avg

@torch.no_grad()
def validate():
    """Compute, print and return the mean contrastive loss over ``test_loader``."""
    model.eval()
    batch_losses = []

    for batch in tqdm(test_loader, desc="Test"):
        on_device = {name: tensor.to(DEVICE) for name, tensor in batch.items()}
        batch_losses.append(model(**on_device, return_loss=True).loss.item())

    avg = sum(batch_losses) / len(test_loader)
    print(f"Test Loss: {avg:.4f}")
    return avg



In [None]:
# Main fine-tuning loop: one training pass followed by one evaluation pass on the
# held-out loader per epoch. The return values of both calls are not used here.
for epoch in range(NUM_EPOCHS):
    train_epoch(epoch)
    validate()


In [None]:
from torch.utils.data import DataLoader
@torch.no_grad()
def compute_image_embeddings(model, processor, df, batch_size=32):
    """Encode every image listed in ``df["image_path"]`` into a unit-norm vector.

    Args:
        model: CLIP model providing ``get_image_features``.
        processor: Matching CLIP processor used to build ``pixel_values``.
        df: DataFrame with an ``image_path`` column.
        batch_size: Number of images encoded per forward pass (default 32).

    Returns:
        Array with one L2-normalised FP32 row per input row, in ``df`` order.

    Raises:
        ValueError: If ``df`` has no rows.
    """
    model.eval()
    paths = df["image_path"].tolist()
    if not paths:
        raise ValueError("df contains no image paths to embed")

    # Follow the model that was passed in rather than a notebook-level global.
    device = model.device
    all_embs = []
    for start in range(0, len(paths), batch_size):
        imgs = []
        for p in paths[start:start + batch_size]:
            # convert() returns a fully loaded copy, so the file can be closed.
            with Image.open(p) as im:
                imgs.append(im.convert("RGB"))
        enc = processor(images=imgs, return_tensors="pt").to(device)
        # NO .half() here: keep inputs in the model's own precision.
        feats = model.get_image_features(**enc).float()
        feats = feats / feats.norm(dim=-1, keepdim=True)
        all_embs.append(feats.cpu().numpy())
    return np.vstack(all_embs)


image_embs = compute_image_embeddings(model, processor, df)
print("image_embs:", image_embs.shape)


In [None]:
@torch.no_grad()
def compute_text_embeddings(model, processor, df, batch_size=32):
    """Encode every caption in ``df["caption"]`` into a unit-norm vector.

    Args:
        model: CLIP model providing ``get_text_features``.
        processor: Matching CLIP processor used for tokenisation.
        df: DataFrame with a ``caption`` column.
        batch_size: Number of captions encoded per forward pass (default 32).

    Returns:
        Array with one L2-normalised FP32 row per input row, in ``df`` order.

    Raises:
        ValueError: If ``df`` has no rows.
    """
    model.eval()
    caps = df["caption"].tolist()
    if not caps:
        raise ValueError("df contains no captions to embed")

    # Follow the model that was passed in rather than a notebook-level global.
    device = model.device
    all_embs = []
    for start in range(0, len(caps), batch_size):
        enc = processor(
            text=caps[start:start + batch_size],
            return_tensors="pt",
            padding=True,
            truncation=True,
        ).to(device)
        feats = model.get_text_features(
            input_ids=enc["input_ids"],
            attention_mask=enc["attention_mask"],
        ).float()
        feats = feats / feats.norm(dim=-1, keepdim=True)
        all_embs.append(feats.cpu().numpy())
    return np.vstack(all_embs)
# Caption embeddings for every row of df, in df order.
text_embs = compute_text_embeddings(model, processor, df)
print("text_embs:", text_embs.shape)


In [None]:
!pip install -q faiss-cpu gradio


In [None]:
import faiss
import numpy as np

# Exact inner-product index; on unit-norm vectors the inner product is the cosine similarity.
image_embs_f32 = image_embs.astype(np.float32)
D = image_embs_f32.shape[-1]

index = faiss.IndexFlatIP(D)
index.add(image_embs_f32)

print("FAISS index size:", index.ntotal)




In [None]:
from typing import List
from PIL import Image

@torch.no_grad()
def encode_text_query(query: str) -> np.ndarray:
    """Embed a single text query with the CLIP text encoder.

    Returns a float32 array holding one L2-normalised row.
    """
    model.eval()
    tokens = processor(
        text=[query],
        padding=True,
        truncation=True,
        return_tensors="pt",
    )
    input_ids = tokens["input_ids"].to(DEVICE)
    attention_mask = tokens["attention_mask"].to(DEVICE)

    raw = model.get_text_features(input_ids=input_ids, attention_mask=attention_mask)
    unit = raw / raw.norm(dim=-1, keepdim=True)
    return unit.cpu().numpy().astype("float32")



def search_text_to_image(query: str, top_k: int = 5):
    """Return up to ``top_k`` images that best match a text query.

    Args:
        query: Free-text search string.
        top_k: Maximum number of results; values <= 0 yield an empty list.

    Returns:
        List of ``(image_path, caption, score)`` tuples, best match first,
        with at most one entry per perceptual hash.
    """
    top_k = int(top_k)
    if top_k <= 0:
        return []

    q = encode_text_query(query)
    scores, idxs = index.search(q, top_k * 10)  # fetch more, then de-duplicate

    seen = set()
    results = []

    for score, idx in zip(scores[0], idxs[0]):
        # FAISS pads with -1 when it has fewer than k vectors; df.iloc[-1]
        # would silently return the last row instead of "no result".
        if idx < 0:
            continue

        row = df.iloc[idx]
        img_hash = row["phash"]

        if img_hash in seen:
            continue

        seen.add(img_hash)
        results.append((row["image_path"], row["caption"], float(score)))

        if len(results) == top_k:
            break

    return results

# quick test: print score, caption and path of the five best matches
results = search_text_to_image("men red sneakers", top_k=5)
for path, caption, score in results:
    print(score, "->", caption, "|", path)



In [None]:
# Text-to-image cosine similarities (N x N): rows are captions, columns are images.
# Both embedding matrices are L2-normalised, so a plain matrix product suffices.
sims = np.matmul(text_embs, image_embs.T)
print("Similarity matrix:", sims.shape)


In [None]:
import numpy as np


# Score retrieval on the held-out split only: text_embs/image_embs cover all of
# df, including the rows the model was fine-tuned on, which would inflate the
# metrics. df's index was reset before the split and the embeddings follow df
# order, so test_df's index labels are the matching embedding row positions.
# NOTE(review): captions are built from categorical attributes, so distinct
# images may share a caption — that would make the ranks pessimistic; confirm.
test_idx = test_df.index.to_numpy()
sims = text_embs[test_idx] @ image_embs[test_idx].T   # (N_test x N_test)
print("Similarity matrix computed:", sims.shape)



def compute_ranks(sim_matrix):
    """Zero-based rank of the matching item (the diagonal entry) in every row.

    Row ``i`` holds the similarities of query ``i`` to all candidates, and
    candidate ``i`` is its ground-truth match. The rank is the number of
    candidates scoring strictly higher than the match, so ties are resolved
    deterministically in favour of the match (an argsort-based rank depends
    on the sort's arbitrary tie order).

    Args:
        sim_matrix: 2-D array of similarities with at least as many columns as rows.

    Returns:
        Integer array with one rank per row; 0 means the match was retrieved first.
    """
    sim_matrix = np.asarray(sim_matrix)
    n = sim_matrix.shape[0]
    diag = np.arange(n)
    matched = sim_matrix[diag, diag]
    # Vectorised count replaces one full argsort per row.
    return (sim_matrix > matched[:, None]).sum(axis=1)


def recall_at_k(ranks, k):
    """Fraction of queries whose zero-based rank is below ``k``."""
    within_k = ranks < k
    return np.mean(within_k)


def mean_average_precision(ranks):
    """Mean of the per-query average precision.

    With a single relevant item per query, AP is the reciprocal of the
    one-based rank, i.e. ``1 / (rank + 1)``.
    """
    reciprocal_ranks = np.reciprocal(ranks + 1.0)
    return float(np.mean(reciprocal_ranks))



ranks = compute_ranks(sims)

# Recall at the three usual cut-offs, then rank statistics and mAP.
R1, R5, R10 = (recall_at_k(ranks, cutoff) for cutoff in (1, 5, 10))
MedR, MnR = np.median(ranks), np.mean(ranks)
mAP = mean_average_precision(ranks)

report = [
    f"Total samples evaluated: {len(ranks)}\n",
    f" Recall@1   : {R1:.4f}",
    f" Recall@5   : {R5:.4f}",
    f" Recall@10  : {R10:.4f}\n",
    f" Median Rank: {MedR:.2f}",
    f" Mean Rank  : {MnR:.2f}\n",
    f" mAP        : {mAP:.4f}",
]
for line in report:
    print(line)



In [None]:
def gradio_search(query: str, top_k: int = 5):
    """Gradio callback: run a text search and format it for the UI.

    Args:
        query: Text typed by the user.
        top_k: Requested number of results. UI sliders may deliver a float or
            0, so the value is coerced to an integer of at least 1.

    Returns:
        ``(image_paths, text)`` where ``text`` lists one
        ``"<caption> (score=<s>)"`` entry per result, separated by blank lines.
    """
    top_k = max(1, int(top_k))
    res = search_text_to_image(query, top_k=top_k)
    images = [path for path, _, _ in res]
    captions = [f"{caption} (score={score:.3f})" for _, caption, score in res]
    return images, "\n\n".join(captions)


In [None]:
@torch.no_grad()
def encode_single_image(path: str) -> np.ndarray:
    """Embed one image file with the CLIP image encoder.

    Args:
        path: Path of the image to encode.

    Returns:
        float32 array holding one L2-normalised row.
    """
    model.eval()
    with Image.open(path) as img:
        rgb = img.convert("RGB")
    enc = processor(images=[rgb], return_tensors="pt").to(DEVICE)
    # No .half() on the input: the model is loaded in full precision, and FP16
    # pixel_values against FP32 weights raises a dtype-mismatch error on CUDA.
    feats = model.get_image_features(**enc).float()
    feats = feats / feats.norm(dim=-1, keepdim=True)
    return feats.cpu().numpy().astype("float32")


def search_text_to_image(query: str, top_k: int = 5):
    """Return up to ``top_k`` distinct images that best match a text query.

    Args:
        query: Free-text search string.
        top_k: Maximum number of results (default 5); values <= 0 yield an
            empty list.

    Returns:
        List of ``(image_path, caption, score)`` tuples, best match first,
        with each image path appearing at most once.
    """
    top_k = int(top_k)
    if top_k <= 0:
        return []

    q = encode_text_query(query)
    scores, idxs = index.search(q, top_k * 3)  # fetch more, then de-duplicate

    seen = set()
    results = []

    for score, idx in zip(scores[0], idxs[0]):
        # FAISS pads with -1 when it has fewer than k vectors; df.iloc[-1]
        # would silently return the last row instead of "no result".
        if idx < 0:
            continue

        row = df.iloc[idx]
        img_path = row["image_path"]

        if img_path in seen:
            continue  # skip duplicates

        seen.add(img_path)
        results.append((img_path, row["caption"], float(score)))

        if len(results) >= top_k:
            break

    return results


In [None]:
import gradio as gr

with gr.Blocks() as demo:
    gr.Markdown("##  Fashion CLIP Search ")

    with gr.Row():
        query = gr.Textbox(label="Text query", value="boys tee", lines=1)
        # Explicit bounds: the slider's default minimum is 0, and a Top K of 0
        # is not a valid number of neighbours to request from the index.
        topk  = gr.Slider(label="Top K", minimum=1, maximum=100, value=5, step=1)

    search_btn = gr.Button("Search")

    # Fixed gallery (no .style())
    gallery = gr.Gallery(
        label="Results",
        columns=5,
        height="auto"
    )

    captions_box = gr.Textbox(
        label="Captions & Scores",
        lines=10
    )

    # Clicking the button runs the search and fills both output widgets.
    search_btn.click(
        fn=gradio_search,
        inputs=[query, topk],
        outputs=[gallery, captions_box]
    )

# share=True also exposes the demo through a temporary public URL.
demo.launch(share=True)
