In [None]:

import os
from typing import List, Dict, Any

import numpy as np
import torch
from bert_score import score
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from nltk.translate.bleu_score import SmoothingFunction, sentence_bleu
from PIL import Image
from qdrant_client import QdrantClient, models
from qwen_vl_utils import process_vision_info
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics import precision_score, recall_score
from transformers import Qwen2_5_VLForConditionalGeneration, AutoTokenizer, AutoProcessor

In [None]:
# Central configuration read by the helper functions and driver cells below.
CONFIG = {
    # Model id handed to HuggingFaceEmbedding in initialize_embedding_model().
    "embedding_model_name": "llamaindex/vdr-2b-multi-v1",
    # Checkpoint loaded by initialize_vlm_model().
    "vlm_model_name": "Qwen/Qwen2.5-VL-7B-Instruct",
    # Processor checkpoint; kept identical to the model id (an earlier revision
    # apparently used the "-AWQ" variant here).
    "vlm_processor_name": "Qwen/Qwen2.5-VL-7B-Instruct",  # Corrected from -AWQ
    # Directory scanned by load_documents() for page images.
    "data_dir": "./data_images",
    # Name of the Qdrant collection that stores the text and image vectors.
    "collection_name": "llama-multi",
    # Device for the embedding model.
    "device_embedding": "cuda:1",
    # Device for the VLM; also the target of the processor inputs in process_vlm_query().
    "device_vlm": "cuda:0",
    # File suffixes that load_documents() accepts as images.
    "image_extensions": [".jpg", ".png"],
    # Generation budget passed to vlm.generate().
    "max_new_tokens": 512,
    # Size passed to Image.resize() when query_image(..., resize=True) is used.
    "image_resize": (850, 850)
}


# Ground truth for retrieval: maps each query to the file names of the pages
# considered relevant. Text queries are keyed by the query string itself; the
# image query is keyed by the base name of the query image file, because
# evaluate_retrieval() looks it up via os.path.basename(image_query_path).
GROUND_TRUTH = {
    "How is the scaled dot-product attention calculated?": ["page4.png"],
    "What is the BLEU score of the model in English to German translation EN-DE?": ["page8.png"],
    "How long were the base and big models trained?": ["page7.png"],
    "Which optimizer was used when training the models?": ["page7.png"],
    "Show me a picture that shows the difference between Scaled Dot-Product Attention and Multi-Head Attention.": ["page4.png"],
    "similar.png": ["page4.png"]
}

# Reference answers for generation: keyed by the prompt that is sent to the VLM
# (evaluate_generation() looks entries up with the VLM query), so the image
# query's reference is stored under its VLM question rather than its file name.
# Prompts without an entry here are skipped when generation metrics are averaged.
REFERENCES = {
    "How is the scaled dot-product attention calculated?": (
        "Scaled dot-product attention is calculated as follows: Given queries (Q), keys (K), and values (V) matrices "
        "of dimensions d_k and d_v, compute the dot product of Q and K, divide by the square root of d_k, apply a softmax "
        "function to obtain attention weights, and multiply by V. The formula is: Attention(Q, K, V) = softmax(QK^T / √d_k)V."
    ),
    "What is the BLEU score of the model in English to German translation EN-DE?": (
        "The Transformer base model achieves a BLEU score of 27.3, and the big model achieves a BLEU score of 28.4 "
        "on the WMT 2014 English-to-German translation task."
    ),
    "How long were the base and big models trained?": (
        "The base Transformer models were trained for 100,000 steps, approximately 12 hours, on 8 NVIDIA P100 GPUs. "
        "The big Transformer models were trained for 300,000 steps, approximately 3.5 days."
    ),
    "Which optimizer was used when training the models?": (
        "The Transformer models were trained using the Adam optimizer with β1 = 0.9, β2 = 0.98, and ε = 10^-9."
    ),
    "Show me a picture that shows the difference between Scaled Dot-Product Attention and Multi-Head Attention.": (
        "Figure 2 in the Transformer paper illustrates the difference: Scaled Dot-Product Attention computes attention "
        "using a single query-key-value operation scaled by √d_k, while Multi-Head Attention performs multiple such operations "
        "in parallel, projecting queries, keys, and values into h subspaces, concatenating the results, and applying a final projection."
    ),
    "What is the name of the attention function of this image?": (
        "The image depicts Scaled Dot-Product Attention and Multi-Head Attention."
    )
}


In [None]:

def initialize_embedding_model() -> HuggingFaceEmbedding:
    """Build the HuggingFaceEmbedding instance described by CONFIG.

    Returns:
        The embedding model named by CONFIG["embedding_model_name"], created
        on CONFIG["device_embedding"] with remote code execution enabled.
    """
    embedder_options = {
        "model_name": CONFIG["embedding_model_name"],
        "device": CONFIG["device_embedding"],
        "trust_remote_code": True,
    }
    return HuggingFaceEmbedding(**embedder_options)

def initialize_vlm_model() -> tuple[Qwen2_5_VLForConditionalGeneration, AutoProcessor]:
    """Load the Qwen2.5-VL checkpoint and its processor as configured in CONFIG.

    Returns:
        A ``(model, processor)`` pair. The model is loaded in float16 with
        CONFIG["device_vlm"] as its device map; the processor is loaded from
        CONFIG["vlm_processor_name"] with the fast implementation requested.
    """
    model_kwargs = {
        "torch_dtype": torch.float16,
        "device_map": CONFIG["device_vlm"],
    }
    model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        CONFIG["vlm_model_name"], **model_kwargs
    )
    vlm_processor = AutoProcessor.from_pretrained(CONFIG["vlm_processor_name"], use_fast=True)
    return model, vlm_processor

def initialize_sbert_model() -> SentenceTransformer:
    """Load the ``paraphrase-MiniLM-L12-v2`` SentenceTransformer model."""
    sbert_checkpoint = 'paraphrase-MiniLM-L12-v2'
    return SentenceTransformer(sbert_checkpoint)

def load_documents(data_dir: str) -> List[Dict[str, str]]:
    """Load image documents from the data directory.

    Args:
        data_dir: Directory whose entries are scanned (non-recursively).

    Returns:
        One ``{"caption": <file name>, "image": <path under data_dir>}`` dict
        per entry whose name ends with one of CONFIG["image_extensions"].
        Matching ignores case, and the result is sorted by file name so the
        document order (and therefore the point ids assigned at upload time)
        is the same on every run.

    Raises:
        OSError: Propagated from ``os.listdir`` if ``data_dir`` cannot be read.
    """
    # str.endswith accepts a tuple of suffixes; lower-casing both sides makes
    # "page1.PNG" match ".png".
    extensions = tuple(ext.lower() for ext in CONFIG["image_extensions"])
    return [
        {"caption": filename, "image": os.path.join(data_dir, filename)}
        # os.listdir returns entries in arbitrary order; sort for reproducibility.
        for filename in sorted(os.listdir(data_dir))
        if filename.lower().endswith(extensions)
    ]

def generate_embeddings(
    model: HuggingFaceEmbedding,
    documents: List[Dict[str, str]]
) -> tuple[List, List]:
    """Embed every document twice: once by caption text, once by image.

    Args:
        model: Embedding model exposing ``get_text_embedding_batch`` and
            ``get_image_embedding_batch``.
        documents: Dicts carrying a ``"caption"`` string and an ``"image"`` value.

    Returns:
        ``(text_embeddings, image_embeddings)``, each in document order.
    """
    captions = [entry["caption"] for entry in documents]
    caption_vectors = model.get_text_embedding_batch(captions)

    image_refs = [entry["image"] for entry in documents]
    image_vectors = model.get_image_embedding_batch(image_refs)

    return caption_vectors, image_vectors

def initialize_vector_store() -> QdrantClient:
    """Create a Qdrant client using the ``":memory:"`` location."""
    location = ":memory:"
    return QdrantClient(location)

def setup_collection(
    client: QdrantClient,
    collection_name: str,
    text_embeddings: List,
    image_embeddings: List
) -> None:
    """Create the named-vector collection unless it is already present.

    The collection gets two cosine-distance vectors, ``"image"`` and
    ``"text"``, each sized from the first embedding of the matching list.

    Args:
        client: Qdrant client to create the collection on.
        collection_name: Name of the collection.
        text_embeddings: Text embeddings; only the first one's length is used.
        image_embeddings: Image embeddings; only the first one's length is used.
    """
    if client.collection_exists(collection_name):
        return

    image_params = models.VectorParams(
        size=len(image_embeddings[0]),
        distance=models.Distance.COSINE,
    )
    text_params = models.VectorParams(
        size=len(text_embeddings[0]),
        distance=models.Distance.COSINE,
    )
    client.create_collection(
        collection_name=collection_name,
        vectors_config={"image": image_params, "text": text_params},
    )

def upload_documents(
    client: QdrantClient,
    collection_name: str,
    documents: List[Dict[str, str]],
    text_embeddings: List,
    image_embeddings: List
) -> None:
    """Push every document to the collection as one point with two named vectors.

    Args:
        client: Qdrant client that receives the points.
        collection_name: Target collection.
        documents: Document dicts; each is stored unchanged as the point payload.
        text_embeddings: Text vectors, indexed in parallel with ``documents``.
        image_embeddings: Image vectors, indexed in parallel with ``documents``.
    """
    points = []
    for position, document in enumerate(documents):
        named_vectors = {
            "text": text_embeddings[position],
            "image": image_embeddings[position],
        }
        # The document's position doubles as its point id.
        points.append(
            models.PointStruct(id=position, vector=named_vectors, payload=document)
        )
    client.upload_points(collection_name=collection_name, points=points)

def query_image(
    client: QdrantClient,
    collection_name: str,
    model: HuggingFaceEmbedding,
    query: str | Image.Image,
    vector_type: str = "image",
    limit: int = 1,
    resize: bool = False
) -> Any:
    """Query the vector store for images based on text or image input.

    Args:
        client: Qdrant client to search.
        collection_name: Collection to search in.
        model: Embedding model; ``get_query_embedding`` is used for string
            queries and ``get_image_embedding`` for anything else.
        query: Text query or query image.
        vector_type: Named vector to search against (``"image"`` or ``"text"``).
        limit: Maximum number of points to request.
        resize: Only honoured when ``limit == 1``. If true, the hit's
            ``payload["image"]`` is replaced by the image loaded from that
            path and resized to CONFIG["image_resize"], so it is no longer a
            path string.

    Returns:
        The single best point when ``limit == 1``; otherwise the list of
        returned points (possibly empty).

    Raises:
        IndexError: If ``limit == 1`` and the search returned no points.
    """
    if isinstance(query, str):
        query_vector = model.get_query_embedding(query)
    else:
        query_vector = model.get_image_embedding(query)

    points = client.query_points(
        collection_name=collection_name,
        query=query_vector,
        using=vector_type,
        with_payload=["image", "caption"],
        limit=limit
    ).points

    if limit != 1:
        return points

    if not points:
        # Same exception type as indexing an empty list, but with a message
        # that says what actually went wrong.
        raise IndexError(
            f"No points returned from collection {collection_name!r} "
            f"using vector {vector_type!r}"
        )

    top_hit = points[0]
    if resize:
        # resize() returns an independent image, so the source file handle can
        # be released as soon as the resized copy exists.
        with Image.open(top_hit.payload["image"]) as source_image:
            top_hit.payload["image"] = source_image.resize(CONFIG["image_resize"])
    return top_hit


def process_vlm_query(
    vlm: Qwen2_5_VLForConditionalGeneration,
    processor: AutoProcessor,
    image: Image.Image,
    query: str
) -> str:
    """Ask the vision-language model a question about a single image.

    Args:
        vlm: Model used for generation.
        processor: Processor providing the chat template, tensor preparation
            and decoding.
        image: Image placed in the user message.
        query: Text placed after the image in the user message.

    Returns:
        The decoded completion for the single prompt, with the prompt tokens
        removed and special tokens skipped.
    """
    image_part = {"type": "image", "image": image}
    text_part = {"type": "text", "text": query}
    conversation = [{"role": "user", "content": [image_part, text_part]}]

    prompt = processor.apply_chat_template(
        conversation, tokenize=False, add_generation_prompt=True
    )
    image_inputs, video_inputs = process_vision_info(conversation)
    model_inputs = processor(
        text=[prompt],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    )
    model_inputs = model_inputs.to(CONFIG["device_vlm"])

    output_ids = vlm.generate(**model_inputs, max_new_tokens=CONFIG["max_new_tokens"])

    # generate() returns prompt + completion; drop the prompt prefix per sequence.
    completions = []
    for prompt_ids, full_ids in zip(model_inputs.input_ids, output_ids):
        completions.append(full_ids[len(prompt_ids):])

    decoded = processor.batch_decode(
        completions,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )
    return decoded[0]


In [None]:

def evaluate_retrieval(
    client: QdrantClient,
    collection_name: str,
    model: HuggingFaceEmbedding,
    queries: List[Any],
    ground_truth: Dict[str, List[str]],
    image_query_path: str,
    k: int = 1
) -> Dict[str, float]:
    """Evaluate retrieval performance for queries.

    For every query the top-``k`` points are retrieved from the ``"image"``
    vector and compared, by file base name, with the relevant pages listed in
    ``ground_truth``.

    Args:
        client: Qdrant client to search.
        collection_name: Collection to search in.
        model: Embedding model used to embed the queries.
        queries: Text queries (``str``) and/or image queries (anything else).
        ground_truth: Maps a query key to the file names of its relevant
            pages. String queries are their own key; non-string queries use
            ``os.path.basename(image_query_path)``.
        image_query_path: Path whose base name keys every non-string query.
        k: Number of results requested per query.

    Returns:
        ``{"Precision@k": ..., "Recall@k": ...}`` averaged over all queries
        (both 0 when ``queries`` is empty). Per query, precision is the share
        of retrieved points that are relevant, and recall is the share of the
        relevant pages that were retrieved (0 when the query has no
        ground-truth entry).
    """
    precisions, recalls = [], []
    for query in queries:
        query_key = query if isinstance(query, str) else os.path.basename(image_query_path)
        result = query_image(client, collection_name, model, query, vector_type="image", limit=k)
        # query_image returns a bare point for limit == 1 and a list otherwise.
        hits = [result] if k == 1 else list(result)
        retrieved_names = [os.path.basename(point.payload["image"]) for point in hits]
        relevant_names = set(ground_truth.get(query_key, []))

        relevant_hits = sum(1 for name in retrieved_names if name in relevant_names)
        precisions.append(relevant_hits / len(retrieved_names) if retrieved_names else 0.0)

        # Recall must be measured against ALL relevant pages, not only against
        # the retrieved ones; scoring the retrieved list against an all-ones
        # prediction can only ever yield 0 or 1.
        found = relevant_names.intersection(retrieved_names)
        recalls.append(len(found) / len(relevant_names) if relevant_names else 0.0)
    return {
        f"Precision@{k}": sum(precisions) / len(precisions) if precisions else 0,
        f"Recall@{k}": sum(recalls) / len(recalls) if recalls else 0
    }


In [None]:


def evaluate_generation(
    vlm: Qwen2_5_VLForConditionalGeneration,
    processor: AutoProcessor,
    client: QdrantClient,
    collection_name: str,
    model: HuggingFaceEmbedding,
    queries: List[Any],
    vlm_queries: List[str],
    references: Dict[str, str],
    sbert_model: SentenceTransformer
) -> Dict[str, float]:
    """Evaluate VLM generation performance.

    Each retrieval query is paired positionally with a VLM prompt. The top
    retrieved image is opened, the VLM answers the prompt about it, and the
    answer is scored against the reference stored under that prompt.

    Args:
        vlm: Vision-language model used for generation.
        processor: Processor matching ``vlm``.
        client: Qdrant client to search.
        collection_name: Collection to search in.
        model: Embedding model used to embed the retrieval queries.
        queries: Retrieval queries (text or image).
        vlm_queries: Prompts for the VLM, aligned with ``queries``; extra
            entries in the longer list are ignored by ``zip``.
        references: Reference answers keyed by VLM prompt.
        sbert_model: SentenceTransformer used for the cosine-similarity score.

    Returns:
        ``{"Average BLEU": ..., "Average SBERT Score": ...}``. Pairs whose
        generated text or reference is empty are left out of both averages;
        each average is 0 when nothing was scored.
    """
    bleu_scores, sbert_scores = [], []
    smoothie = SmoothingFunction().method4  # Use method4 for robustness
    for query, vlm_query in zip(queries, vlm_queries):
        result = query_image(client, collection_name, model, query, vector_type="image", limit=1)
        # Keep the file open only while the VLM consumes it, so handles do not
        # pile up across iterations.
        with Image.open(result.payload["image"]) as image:
            generated_text = process_vlm_query(vlm, processor, image, vlm_query)
        reference = references.get(vlm_query, "")
        if not (generated_text.strip() and reference.strip()):  # Ensure non-empty texts
            continue
        # BLEU score with smoothing; the weights restrict it to uni- and bigrams.
        bleu_scores.append(sentence_bleu(
            [reference.split()],
            generated_text.split(),
            weights=(0.5, 0.5, 0, 0),
            smoothing_function=smoothie
        ))
        # SBERT score
        embeddings = sbert_model.encode([generated_text, reference], convert_to_tensor=True)
        sbert_scores.append(util.cos_sim(embeddings[0], embeddings[1]).item())
    return {
        "Average BLEU": sum(bleu_scores) / len(bleu_scores) if bleu_scores else 0,
        "Average SBERT Score": sum(sbert_scores) / len(sbert_scores) if sbert_scores else 0
    }

In [None]:
# Discover the corpus first: an empty or misconfigured data directory should
# fail here with a clear message, not later as an IndexError when the
# collection is sized from the first embedding, and not after loading the models.
documents = load_documents(CONFIG["data_dir"])
if not documents:
    raise FileNotFoundError(
        f"No images with extensions {CONFIG['image_extensions']} found in {CONFIG['data_dir']!r}"
    )

embedding_model = initialize_embedding_model()
vlm, vlm_processor = initialize_vlm_model()
sbert_model = initialize_sbert_model()

# Embed captions and images, then index both as named vectors in Qdrant.
text_embeddings, image_embeddings = generate_embeddings(embedding_model, documents)

client = initialize_vector_store()
setup_collection(client, CONFIG["collection_name"], text_embeddings, image_embeddings)
upload_documents(client, CONFIG["collection_name"], documents, text_embeddings, image_embeddings)


In [8]:

text_queries = [
    "How is the scaled dot-product attention calculated?",
    "What is the BLEU score of the model in English to German translation EN-DE?",
    "How long were the base and big models trained?",
    "Which optimizer was used when training the models?",
    "Show me a picture that shows the difference between Scaled Dot-Product Attention and Multi-Head Attention."
]
image_query_path = "./images/similar.png"
# The image query is None when the file is missing, so it can be skipped below.
queries = text_queries + [Image.open(image_query_path) if os.path.exists(image_query_path) else None]
vlm_queries = text_queries + ["What is the name of the attention function of this image?"]  # VLM query for image

# Qualitative pass: retrieve the best page for each query and print the VLM's answer.
for query, vlm_query in zip(queries, vlm_queries):
    if query is None:
        print(f"Image Query Skipped: {image_query_path} not found\n")
        continue
    result = query_image(client, CONFIG["collection_name"], embedding_model, query, resize=False)
    # Release the page image's file handle as soon as the VLM has answered.
    with Image.open(result.payload["image"]) as image:
        output = process_vlm_query(vlm, vlm_processor, image, vlm_query)
    if isinstance(query, str):
        print(f"Text Query: {query}\nVLM Output: {output}\n")
    else:
        print(f"Image Query: {image_query_path}\nVLM Query: {vlm_query}\nVLM Output: {output}\n")

# Quantitative pass. Filter queries and prompts together: filtering only the
# queries keeps the pairing correct solely when the skipped entry is the last one.
available_pairs = [
    (query, vlm_query)
    for query, vlm_query in zip(queries, vlm_queries)
    if query is not None
]
eval_queries = [query for query, _ in available_pairs]
eval_vlm_queries = [vlm_query for _, vlm_query in available_pairs]

retrieval_metrics = evaluate_retrieval(
    client, CONFIG["collection_name"], embedding_model, eval_queries, GROUND_TRUTH, image_query_path, k=1
)
generation_metrics = evaluate_generation(
    vlm, vlm_processor, client, CONFIG["collection_name"], embedding_model,
    eval_queries, eval_vlm_queries, REFERENCES, sbert_model
)
print("Retrieval Metrics:", retrieval_metrics)
print("Generation Metrics:", generation_metrics)


Text Query: How is the scaled dot-product attention calculated?
VLM Output: The scaled dot-product attention is calculated by first computing the dot products of the query with all keys, dividing each by \(\sqrt{d_k}\), and then applying a softmax function to obtain the weights on the values. The formula for this is given by:

\[
\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V
\]

Here:
- \(Q\) is the query matrix.
- \(K\) is the key matrix.
- \(V\) is the value matrix.
- \(d_k\) is the dimensionality of the keys and queries.
- \(\sqrt{d_k}\) is used to scale the dot products to prevent the softmax function from having extremely small gradients when \(d_k\) is large.

The softmax function ensures that the weights sum to 1, making them valid probabilities. These weights are then applied to the values to compute the final output of the attention mechanism.

Text Query: What is the BLEU score of the model in English to German translation EN-DE?
VLM Output: 