# L5 - Building Multi-modal RAG with ColPali

<p style="background-color:#fff6e4; padding:15px; border-width:3px; border-color:#f5ecda; border-style:solid; border-radius:6px"> ⏳ <b>Note <code>(Kernel Starting)</code>:</b> This notebook takes about 30 seconds to be ready to use. You may start and watch the video while you wait.</p>

<div style="background-color:#fff6ff; padding:13px; border-width:3px; border-color:#efe6ef; border-style:solid; border-radius:6px">
<p> üíª &nbsp; <b>Access <code>requirements.txt</code> and <code>helper.py</code> files:</b> 1) click on the <em>"File"</em> option on the top menu of the notebook and then 2) click on <em>"Open"</em>.

<p> ⬇ &nbsp; <b>Download Notebooks:</b> 1) click on the <em>"File"</em> option on the top menu of the notebook and then 2) click on <em>"Download as"</em> and select <em>"Notebook (.ipynb)"</em>.</p>

<p> üìí &nbsp; For more help, please see the <em>"Appendix ‚Äì Tips, Help, and Download"</em> Lesson.</p>

</div>

The following cell is not in the video and just ensures output later in this notebook will render properly.

In [1]:
import plotly.io as pio
# Make "notebook" the default Plotly renderer so that figures displayed with
# fig.show() in later cells render inside the notebook output.
pio.renderers.default = "notebook"

In [2]:
from IPython.display import Markdown, display
from dotenv import load_dotenv
from openai import OpenAI
import os

# Load variables from a .env file (if one is found) into the process environment
load_dotenv()

# The API key is read from the environment rather than hard-coded in the notebook;
# os.getenv returns None when OPENAI_API_KEY is not set.
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

#### Loading MUVERA

In [3]:
from fastembed.postprocess.muvera import Muvera

# Create MUVERA instance with same config from Lesson 4
# This one instance is handed to the collection builder below and is also used
# to encode queries in retrieve_with_two_stage, so both sides share the same
# parameters and random seed.
# NOTE(review): the resulting encoding size is presumably
# r_reps * 2**k_sim * dim_proj = 20 * 64 * 16 = 20,480 — TODO confirm.
muvera = Muvera(
    dim=128,  # ColPali embedding dimensionality
    k_sim=6,  # 64 clusters (2^6)
    dim_proj=16,  # Compress to 16 dimensions per cluster
    r_reps=20,  # 20 repetitions
    random_seed=42,  # fixed seed so repeated runs build the same instance
)

#### Creating Qdrant Collection

In [None]:
# This cell may take several minutes to finish
from qdrant_client import QdrantClient, models
from helper import recreate_colpali_optimizations_collection

# Collection from Lesson 3 + MUVERA
# (`collection_name` is read as a global by the retrieval helpers defined later)
collection_name = "colpali-optimizations"

# Connect to Qdrant
# NOTE(review): assumes a Qdrant server is already listening on localhost:6333
qdrant = QdrantClient("http://localhost:6333")

# Recreate the collection with all optimizations including MUVERA
# NOTE(review): "recreate" suggests an existing collection of this name is
# dropped first — confirm in helper.py before pointing at shared data.
recreate_colpali_optimizations_collection(
    qdrant, collection_name, muvera=muvera
)

Upserting embeddings: 0it [00:00, ?it/s]

#### Loading Precomputed Embeddings of Sample Queries

In [None]:
from helper import load_or_compute_rag_query_embeddings
import numpy as np

# Load precomputed RAG query embeddings
# embed_query() below reads the "query" and "query_embedding" columns of this
# DataFrame, so only the queries stored here can be embedded.
rag_queries_df = load_or_compute_rag_query_embeddings(load_precomputed=True)


def embed_query(query_text: str) -> np.ndarray:
    """
    Look up the precomputed ColPali embedding for a query string.

    The lookup is an exact string match against the "query" column of
    ``rag_queries_df``; no embedding is computed on the fly.

    Args:
        query_text: The query to look up.

    Returns:
        The "query_embedding" value of the first matching row, stacked
        into a single NumPy array.

    Raises:
        ValueError: If no row matches. The message lists every query
            that is available.
    """
    hits = rag_queries_df[rag_queries_df["query"] == query_text]

    # Known query: hand back the embedding stored on the first matching row
    if len(hits) > 0:
        stored_embedding = hits.iloc[0]["query_embedding"]
        return np.stack(stored_embedding)

    # Unknown query: fail loudly and show what can be asked instead
    known_queries = rag_queries_df["query"].tolist()
    bullet_list = "\n".join(f"  - {q}" for q in known_queries)
    raise ValueError(
        f"Query not found in precomputed embeddings.\n"
        f"Query: '{query_text}'\n"
        f"Available queries:\n" + bullet_list
    )

In [None]:
# Sanity check of embed_query: the string must match one of the precomputed
# queries exactly, otherwise a ValueError is raised. The bare name on the last
# line makes the notebook display the returned array.
test_query = "Describe the concept of the 'one learning algorithm'"
test_embedding = embed_query(test_query)
test_embedding

#### Creating Retrieval Helper Functions

In [None]:
def retrieve(
    query_text: str, using: str, top_k: int = 3
) -> list[tuple[str, float]]:
    """
    Run a single-stage search against one named vector of the collection.

    Args:
        query_text: Query to embed via ``embed_query``.
        using: Name of the vector in the collection to search with.
        top_k: Maximum number of hits to return.

    Returns:
        One ``(image_path, score)`` tuple per returned point, in the order
        Qdrant returned them.
    """
    query_vector = embed_query(query_text)

    # Quantization rescoring is switched off for this search
    search_options = models.SearchParams(
        quantization=models.QuantizationSearchParams(rescore=False)
    )

    response = qdrant.query_points(
        collection_name=collection_name,
        query=query_vector,
        using=using,
        search_params=search_options,
        limit=top_k,
    )

    # Keep only what callers need: the page image path and its score
    hits = []
    for hit in response.points:
        hits.append((hit.payload["image_path"], hit.score))
    return hits

In [None]:
def retrieve_with_two_stage(
    query_text: str,
    top_k: int = 3,
    prefetch_multiplier: int = 5,
    rerank_using: str = "original",
) -> list[tuple[str, float]]:
    """
    Two-stage search: MUVERA prefetch for candidates, then reranking.

    Args:
        query_text: Query to embed via ``embed_query``.
        top_k: Number of final hits to return.
        prefetch_multiplier: The prefetch stage asks for
            ``top_k * prefetch_multiplier`` candidates.
        rerank_using: Name of the vector used to rerank the candidates.

    Returns:
        One ``(image_path, score)`` tuple per returned point.

    NOTE(review): unlike ``retrieve``, no ``search_params`` are passed here,
    so quantization rescoring is left at its default — confirm this is intended.
    """
    # The ColPali embedding is used for reranking; its MUVERA encoding
    # drives the prefetch stage
    multi_vector_query = embed_query(query_text)
    fde_query = muvera.process_query(multi_vector_query)

    candidate_stage = models.Prefetch(
        query=fde_query,
        using="muvera_fde",
        limit=top_k * prefetch_multiplier,
    )

    # Both stages run in a single request
    response = qdrant.query_points(
        collection_name=collection_name,
        prefetch=[candidate_stage],
        query=multi_vector_query,
        using=rerank_using,
        limit=top_k,
    )

    ranked = []
    for hit in response.points:
        ranked.append((hit.payload["image_path"], hit.score))
    return ranked

In [None]:
from helper import display_retrieved_documents

# Test the original ColPali retriever
# `test_query` and `results` set here are reused as-is by the RAG demo cells
# further down, so re-run this cell before them after changing the query.
test_query = "How does the human brain work?"
results = retrieve(test_query, using="original", top_k=3)

# Display the retrieved images
fig = display_retrieved_documents(results)
fig.show()

#### RAG Pipeline Helper Function
Provides the retrieved document pages, together with the query, to a vision-language model (VLM) to generate a response. Retrieval itself is done by the helper functions defined above.

In [None]:
from PIL import Image
from helper import pil_image_to_base64


def generate_answer(
    query_text: str,
    image_paths: list[str],
    model: str = "gpt-4o",
    max_tokens: int = 500,
    max_images: int = 10,
) -> str:
    """
    Generate an answer from document page images using an OpenAI vision model.

    Args:
        query_text: The question, sent as the text part of the user message.
        image_paths: Paths of the page images to attach, in priority order.
        model: Name of the chat-completions model to call.
        max_tokens: Passed through as the completion's ``max_tokens``.
        max_images: Maximum number of images attached to the request; any
            further paths are ignored. Negative values are treated as 0.

    Returns:
        The text content of the first completion choice, or an empty string
        when the model returned no text content.
    """
    # Build the messages array
    messages = [
        {
            "role": "system",
            "content": (
                "You are a helpful assistant that "
                "answers questions based on the "
                "provided document images. "
                "Read the images carefully and "
                "provide accurate answers. "
                "Answer in Markdown and highlight "
                "the most important parts"
            ),
        },
    ]

    # Build user message with text and images
    user_content = [{"type": "text", "text": query_text}]

    # Clamp so a negative cap cannot turn into a "drop the last N" slice
    image_cap = max(max_images, 0)
    for image_path in image_paths[:image_cap]:
        # The context manager closes the underlying file as soon as the
        # image has been encoded, instead of leaving the handle open
        with Image.open(image_path) as img:
            base64_img = pil_image_to_base64(img)

        # NOTE(review): the data URL declares PNG; this assumes
        # pil_image_to_base64 encodes as PNG — confirm in helper.py
        user_content.append(
            {
                "type": "image_url",
                "image_url": {
                    "url": (f"data:image/png;base64,{base64_img}"),
                },
            }
        )

    messages.append(
        {
            "role": "user",
            "content": user_content,
        }
    )

    # Call OpenAI API
    response = openai_client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=max_tokens,
    )

    # `content` may be None (e.g. when the model returns no text), which
    # would break the declared `str` return type
    answer = response.choices[0].message.content
    return answer if answer is not None else ""

#### Demonstrating a Simple RAG Pipeline

In [None]:
# Uses `results` and `test_query` from the retrieval test cell above; only the
# image paths are passed on, the similarity scores are dropped.
image_paths = [path for path, _ in results]

print(f"Query: {test_query}\n")
answer = generate_answer(test_query, image_paths)
# Render the returned answer as Markdown rather than printing it as plain text
display(Markdown(answer))

In [None]:
# Queries for the RAG loop below. Each string has to match a precomputed query
# exactly, because embed_query() raises ValueError for anything else.
queries = [
    "Describe the concept of the 'one learning algorithm'",
    "Explain the size vs performance tradeoff",
    "What was the coffee mug example used to present?",
]

In [None]:
from helper import display_retrieved_documents

for query in queries:
    # Banner: the query text framed by two rules
    print("=" * 70, f"Query: {query}", "=" * 70, sep="\n")

    # Top-3 pages from the original ColPali vectors; only the image paths
    # are forwarded to the model
    results = retrieve(query, using="original", top_k=3)
    image_paths = [image_path for image_path, _score in results]

    # Ask the model and render its Markdown answer
    answer = generate_answer(query, image_paths)
    display(Markdown(answer))
    print()

    # Show the pages the answer was based on
    fig = display_retrieved_documents(results)
    fig.show()
    print()

#### Testing RAG Pipeline Using Different Optimizations

In [None]:
# Map optimization names to their vector names
# Each entry is (display label, value passed as `using` to retrieve()).
# Commented-out entries are skipped by the loop below; uncomment to include them.
# NOTE(review): presumably every name here exists as a named vector in the
# collection — verify against helper.py.
retrievers = [
    ("Original ColPali", "original"),
    # ("Scalar Quantized", "scalar_quantized"),
    ("Binary Quantized", "binary_quantized"),
    ("Hierarchical 2x", "hierarchical_2x"),
    # ("Hierarchical 4x", "hierarchical_4x"),
    # ("Row Pooled", "row_pooled"),
    # ("Column Pooled", "column_pooled"),
]

In [None]:
for retriever_name, using in retrievers:
    # Section header for this retriever
    print(f"\nRetriever: {retriever_name}", "-" * 50, sep="\n")

    # Same question for every retriever; only the named vector changes
    results = retrieve(test_query, using=using, top_k=3)
    image_paths = [image_path for image_path, _score in results]

    # Generate the answer from the retrieved pages and render it as Markdown
    answer = generate_answer(test_query, image_paths)
    display(Markdown(answer))

    # Show the pages this retriever returned
    fig = display_retrieved_documents(results)
    fig.show()
    print()

#### Comparing Two-Stage Retrieval with ColPali in the RAG Pipeline

In [None]:
# Compare two-stage retrieval with original ColPali.
# Both retrievals run right here, while the list is built; each entry pairs a
# display label with the hits that retriever returned.
comparison_retrievers = [
    ("Original ColPali", retrieve(test_query, using="original", top_k=3)),
    (
        "Two-Stage (MUVERA + ColPali)",
        retrieve_with_two_stage(test_query, rerank_using="original", top_k=3),
    ),
]

for retriever_name, results in comparison_retrievers:
    # Section header for this retriever
    print(f"\nRetriever: {retriever_name}", "=" * 60, sep="\n")

    # Only the image paths go to the model; scores are dropped
    image_paths = [image_path for image_path, _score in results]

    # Generate the answer and render it as Markdown
    answer = generate_answer(test_query, image_paths)
    display(Markdown(answer))
    print()

    # Visualize the pages behind the answer
    fig = display_retrieved_documents(results)
    fig.show()
    print()

#### Testing Different Reranking Strategies with the Same MUVERA Prefetch


In [None]:
# Test different reranking strategies with same MUVERA prefetch.
# Each entry is (display label, vector name passed as `rerank_using`).
reranking_strategies = [
    ("Original ColPali", "original"),
    ("Binary Quantized", "binary_quantized"),
    ("Hierarchical 2x", "hierarchical_2x"),
]

for strategy_name, rerank_using in reranking_strategies:
    # Section header for this strategy
    print(f"\nMUVERA + {strategy_name}", "=" * 60, sep="\n")

    # The prefetch stage is the same every time; only the reranking vector varies
    results = retrieve_with_two_stage(
        test_query, top_k=3, rerank_using=rerank_using
    )
    image_paths = [image_path for image_path, _score in results]

    # Generate the answer and render it as Markdown
    answer = generate_answer(test_query, image_paths)
    display(Markdown(answer))
    print()

    # Show the pages that survived reranking
    fig = display_retrieved_documents(results)
    fig.show()
    print()