In [None]:
################## Image Embeddings ##################

In [None]:
!pip install transformers sentence-transformers pillow

In [None]:
!wget http://images.cocodataset.org/zips/val2017.zip
!wget http://images.cocodataset.org/annotations/annotations_trainval2017.zip
!unzip val2017.zip
!unzip annotations_trainval2017.zip

In [None]:
import os
import json
import torch
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration
from sentence_transformers import SentenceTransformer

# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

def generate_caption(image_path, model, processor):
    """Generate a text caption for the image stored at ``image_path``.

    Args:
        image_path: Path of the image file to caption.
        model: Captioning model exposing ``generate(**inputs)``.
        processor: Callable that turns a PIL image into model inputs
            (``processor(images=..., return_tensors="pt")``) and decodes
            generated token ids through ``processor.decode``.

    Returns:
        The decoded caption of the first generated sequence, with special
        tokens stripped.
    """
    # Open inside a context manager so the file handle is always released,
    # even if decoding fails; convert() returns a separate RGB image that
    # stays usable after the source file has been closed.
    with Image.open(image_path) as raw_image:
        img = raw_image.convert("RGB")
    # Model inputs are moved to the module-level `device`.
    inputs = processor(images=img, return_tensors="pt").to(device)
    out = model.generate(**inputs)
    return processor.decode(out[0], skip_special_tokens=True)

def embed_text(text, embedder):
    """Encode ``text`` with ``embedder`` and return the normalized embedding.

    ``embedder`` must expose ``encode(text, normalize_embeddings=...)``; its
    result is returned unchanged.
    """
    vector = embedder.encode(text, normalize_embeddings=True)
    return vector

# Model identifiers for the image captioner and the sentence embedder.
caption_model_name = "Salesforce/blip-image-captioning-base"
embedder_model_name = "sentence-transformers/all-MiniLM-L6-v2"

# Load the BLIP processor and captioning model, then the sentence embedder.
# Both models are placed on `device`.
processor = BlipProcessor.from_pretrained(caption_model_name)
caption_model = BlipForConditionalGeneration.from_pretrained(caption_model_name).to(device)
embedder = SentenceTransformer(embedder_model_name, device=device)

In [None]:
# Caption every image in `input_dir`, embed each caption, and dump the
# (filename, caption, embedding) records to `output_json`.
#
# NOTE: this cell relies on the image-section definitions of `embed_text`
# and `processor`. The audio section later rebinds both names, so re-run the
# image setup cell before re-running this one in the same kernel.
input_dir = "/content/val2017"
output_json = "image_embeddings.json"
image_extensions = (".png", ".jpg", ".jpeg", ".bmp", ".gif")
results = []

# os.listdir() returns entries in arbitrary order; sorting makes the order of
# `results` (and therefore the JSON file) reproducible across runs.
for fname in sorted(os.listdir(input_dir)):
    if not fname.lower().endswith(image_extensions):
        continue
    path = os.path.join(input_dir, fname)
    caption = generate_caption(path, caption_model, processor)
    embedding = embed_text(caption, embedder)
    results.append({
        "filename": fname,
        "caption": caption,
        # tolist() converts the embedding into plain lists so json can serialize it.
        "embedding": embedding.tolist()
    })

with open(output_json, "w", encoding="utf-8") as f:
    json.dump(results, f, indent=2)

print(f"Processed {len(results)} images → {output_json}")

In [None]:
!pip install pycocotools git+https://github.com/salaniz/pycocoevalcap.git

In [None]:
from pycocotools.coco import COCO

ann_file = "/content/annotations/captions_val2017.json"
coco = COCO(ann_file)

# Look-up table from image file name to its COCO image id.
fname2id = {
    record['file_name']: record['id']
    for record in coco.loadImgs(coco.getImgIds())
}

# One prediction per captioned image, keyed by the COCO id of that image.
# A filename missing from the annotation file raises KeyError.
preds = [
    {
        "image_id": fname2id[entry['filename']],
        "caption": entry['caption']
    }
    for entry in results
]


In [None]:
from pycocoevalcap.eval import COCOEvalCap

# Score the generated captions against the COCO reference captions.
coco_dt = coco.loadRes(preds)
coco_eval = COCOEvalCap(coco, coco_dt)
# Evaluate on the image ids present in the predictions (must be set before
# evaluate()); presumably this keeps images without a prediction out of the
# scores. TODO confirm against pycocoevalcap.
coco_eval.params['image_id'] = coco_dt.getImgIds()
coco_eval.evaluate()

# Print every metric stored by the evaluator, rounded to three decimals.
for metric, score in coco_eval.eval.items():
    print(f"{metric}: {score:.3f}")


In [None]:
################## Audio Embeddings ##################

In [None]:
!pip install aac-datasets

In [None]:
import numpy as np
import torch
from transformers import ClapProcessor, ClapModel
from aac_datasets import Clotho
from sklearn.metrics.pairwise import cosine_similarity
import librosa

# NOTE: rebinds `device`, which the image section defined as a torch.device,
# to a plain string.
device = "cuda" if torch.cuda.is_available() else "cpu"

# NOTE: rebinds `processor`, which the image section bound to the BLIP
# processor. Image cells re-run after this point will see the CLAP processor
# unless the image setup cell is executed again first.
processor = ClapProcessor.from_pretrained("laion/clap-htsat-fused")
model = ClapModel.from_pretrained("laion/clap-htsat-fused").to(device)
# Switch the model to evaluation mode; it is only used for inference below.
model.eval()

# Clotho "val" subset, downloaded into the current directory when missing.
dataset = Clotho(root=".", subset="val", download=True)

def embed_audio(waveform: np.ndarray, orig_sr: int = 16000):
    """Return the L2-normalized CLAP audio embedding of ``waveform``.

    Args:
        waveform: Audio samples as a NumPy array.
        orig_sr: Sampling rate of ``waveform`` in Hz. Defaults to 16000 for
            backward compatibility; pass the clip's real rate, otherwise the
            audio is resampled by the wrong factor.

    Returns:
        Tensor of audio features, each row scaled to unit L2 norm.
    """
    target_sr = 48000  # rate handed to the CLAP processor below
    waveform = librosa.resample(waveform, orig_sr=orig_sr, target_sr=target_sr)
    inputs = processor(audios=waveform, sampling_rate=target_sr, return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}
    with torch.no_grad():
        feats = model.get_audio_features(**inputs)
    return feats / feats.norm(dim=-1, keepdim=True)

# NOTE: this redefines `embed_text`, which the image section declared as
# `embed_text(text, embedder)`. After this cell runs, the image cells need
# their own setup cell re-executed before they can be re-run.
def embed_text(captions: list[str]):
    """Return the L2-normalized CLAP text embeddings of ``captions``.

    Args:
        captions: Caption strings, tokenized together as one padded batch.

    Returns:
        Tensor of text features, each row scaled to unit L2 norm.
    """
    inputs = processor(text=captions, return_tensors="pt", padding=True)
    inputs = {k: v.to(device) for k, v in inputs.items()}
    with torch.no_grad():
        feats = model.get_text_features(**inputs)
    return feats / feats.norm(dim=-1, keepdim=True)

# Embed every clip and all of its captions. Caption embeddings are appended in
# clip order, so the captions of clip i occupy consecutive rows of T; the
# retrieval metrics below assume five captions per clip.
audio_embs, text_embs = [], []
for i, sample in enumerate(dataset):
    wav = sample["audio"].numpy().squeeze().astype(np.float32)
    # Use the sampling rate stored with the clip instead of assuming 16 kHz
    # (the aac-datasets item exposes it as "sr").
    audio_embs.append(embed_audio(wav, orig_sr=int(sample["sr"])).cpu().numpy()[0])
    text_embs.extend(embed_text(sample["captions"]).cpu().numpy())

    if (i + 1) % 200 == 0:
        print(f"Processed {i + 1} samples")

A = np.vstack(audio_embs)  # one row per audio clip
T = np.vstack(text_embs)   # one row per caption

def recall_at_k(A, T, K, group_size=5):
    """Fraction of rows of ``A`` that retrieve one of their own rows of ``T`` in the top ``K``.

    Rows ``group_size * i`` through ``group_size * i + group_size - 1`` of
    ``T`` are treated as the relevant items for row ``i`` of ``A``.

    Args:
        A: Query embeddings, one row per query.
        T: Candidate embeddings, ``group_size`` consecutive rows per query.
        K: Number of most similar candidates inspected per query.
        group_size: Number of relevant candidates per query (default 5).

    Returns:
        Share of queries with at least one relevant candidate among the ``K``
        most cosine-similar rows of ``T``; ``0.0`` when ``K`` is not positive.
    """
    # Guard: a slice of [-0:] would select the whole row and report a hit
    # for every query.
    if K <= 0:
        return 0.0
    sims = cosine_similarity(A, T)
    hits = 0
    for i, row in enumerate(sims):
        # Rank once per query instead of once per relevant index.
        top_k = np.argsort(row)[-K:]
        first = group_size * i
        last = first + group_size
        if any(first <= idx < last for idx in top_k):
            hits += 1
    return hits / sims.shape[0]

# Report, for several cut-offs K, the share of clips that have at least one of
# their own captions among the K most similar captions.
for k in (100, 500, 1000):
    print(f"Recall@{k}: {recall_at_k(A, T, k):.3f}")

In [None]:
# Results indicate that the model is accurate, but the ranking function needs to be
# improved, possibly by using a cross-encoder.

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# Same Recall@K report as the loop that follows the `recall_at_k` definition.
for k in (100, 500, 1000):
    print(f"Recall@{k}: {recall_at_k(A, T, k):.3f}")


# Cosine similarity between every row of A and every row of T; the ranking
# metrics below all read this matrix.
sims = cosine_similarity(A, T)
def mean_reciprocal_rank(sim_matrix, group_size=5):
    """Mean reciprocal rank of the best-ranked relevant column per row.

    Columns ``group_size * i`` through ``group_size * i + group_size - 1``
    are the relevant items for row ``i``; columns are ranked by descending
    similarity.

    Args:
        sim_matrix: 2-D similarity matrix (queries x candidates).
        group_size: Number of relevant columns per row (default 5).

    Returns:
        Mean over rows of ``1 / rank`` of the first relevant column
        (rank is 1-based).

    Raises:
        ValueError: If ``group_size`` is not positive or a row has no room
            for its ``group_size`` relevant columns.
    """
    if group_size < 1:
        raise ValueError("group_size must be a positive integer")
    rr = []
    for i, row in enumerate(sim_matrix):
        # The group offset follows `group_size` rather than a fixed 5.
        first = group_size * i
        last = first + group_size
        if last > len(row):
            raise ValueError(
                f"row {i} needs columns {first}..{last - 1} but only has {len(row)}"
            )
        ranked = np.argsort(row)[::-1]
        # Scan the ranking once and stop at the first relevant column.
        first_hit_rank = next(
            pos for pos, idx in enumerate(ranked) if first <= idx < last
        )
        rr.append(1.0 / (first_hit_rank + 1))
    return np.mean(rr)

def mean_average_precision(sim_matrix, group_size=5):
    """Mean average precision over the rows of ``sim_matrix``.

    Columns ``group_size * i`` through ``group_size * i + group_size - 1``
    are the relevant items for row ``i``; columns are ranked by descending
    similarity.

    Args:
        sim_matrix: 2-D similarity matrix (queries x candidates).
        group_size: Number of relevant columns per row (default 5).

    Returns:
        Mean over rows of the average precision, where each row's precision
        sum is divided by ``group_size``.
    """
    APs = []
    for i, row in enumerate(sim_matrix):
        ranked = np.argsort(row)[::-1]
        # The group offset follows `group_size` rather than a fixed 5.
        first = group_size * i
        last = first + group_size
        hits = 0
        score_sum = 0.0
        for rank_idx, idx in enumerate(ranked, start=1):
            if first <= idx < last:
                hits += 1
                # Precision at the rank where this relevant column appears.
                score_sum += hits / rank_idx
                if hits == group_size:
                    break  # every relevant column found; the rest adds nothing
        APs.append(score_sum / group_size)
    return np.mean(APs)

def ndcg_at_k(sim_matrix, K, group_size=5):
    """Mean nDCG@K with binary relevance over the rows of ``sim_matrix``.

    Columns ``group_size * i`` through ``group_size * i + group_size - 1``
    are the relevant items for row ``i``; columns are ranked by descending
    similarity and only the first ``K`` are scored.

    Args:
        sim_matrix: 2-D similarity matrix (queries x candidates) exposing
            ``.shape``.
        K: Ranking cut-off.
        group_size: Number of relevant columns per row (default 5).

    Returns:
        Total DCG over all rows divided by ``rows * IDCG``; ``0.0`` when
        ``K`` is not positive.
    """
    if K <= 0:
        return 0.0
    # The ideal ranking can place at most min(K, group_size) relevant items in
    # the top K, so the normalizer is capped at that count. This lets a
    # perfect ranking score 1.0 when K < group_size.
    n_ideal = min(K, group_size)
    idcg = sum(1.0 / np.log2(r + 2) for r in range(n_ideal))
    dcg = 0.0
    for i, row in enumerate(sim_matrix):
        ranked = np.argsort(row)[::-1][:K]
        # The group offset follows `group_size` rather than a fixed 5.
        first = group_size * i
        last = first + group_size
        for rank_pos, idx in enumerate(ranked):
            if first <= idx < last:
                dcg += 1.0 / np.log2(rank_pos + 2)
    return dcg / (sim_matrix.shape[0] * idcg)

# Compute the ranking metrics on the similarity matrix, using the default
# group size of 5 for each metric function.
mrr = mean_reciprocal_rank(sims)
map_score = mean_average_precision(sims)
ndcg5 = ndcg_at_k(sims, K=5)
ndcg10 = ndcg_at_k(sims, K=10)

# Print each metric rounded to three decimals.
print(f"MRR: {mrr:.3f}")
print(f"mAP: {map_score:.3f}")
print(f"nDCG@5: {ndcg5:.3f}")
print(f"nDCG@10: {ndcg10:.3f}")


In [None]:
################## Video Embeddings ##################

In [None]:
# TO DO