
# ✅ ColPali Search - Verified Clean Version  
Notebook validated for use with:
- ColQwen 2.5 (tsystems/colqwen2.5-3b-multilingual-v1.0)
- Elasticsearch Cloud
- RVL-CDIP dataset (scientific report, scientific publication, presentation)


In [None]:
!pip install -r requirements.txt

In [None]:

import os
import torch
import numpy as np
from elasticsearch import Elasticsearch
from colpali_engine.models import ColQwen2_5, ColQwen2_5_Processor
from PIL import Image
from dotenv import load_dotenv
from datasets import load_dataset
from tqdm import tqdm
import shutil
from IPython.display import display, HTML


In [None]:

# Read the Elasticsearch endpoint and API key from elastic.env. Any double
# quotes wrapped around a value are stripped off.
load_dotenv("elastic.env")
ELASTIC_HOST, ELASTIC_API_KEY = (
    os.getenv(var_name, "").strip('"')
    for var_name in ("ELASTIC_HOST", "ELASTIC_API_KEY")
)

# Fail fast when either setting is missing or empty.
if not (ELASTIC_HOST and ELASTIC_API_KEY):
    raise ValueError("Please set ELASTIC_HOST and ELASTIC_API_KEY in elastic.env")


In [None]:

# Checkpoint identifier passed to `from_pretrained` for both the ColQwen 2.5
# model and its processor.
MODEL_NAME = "tsystems/colqwen2.5-3b-multilingual-v1.0"


In [None]:

# Pick the compute device in order of preference: Apple MPS first, then CUDA,
# and finally the CPU when no accelerator is available.
device = torch.device(
    "mps" if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available()
    else "cpu"
)
print("Using device:", device)


In [None]:

# Load the ColQwen 2.5 weights in bfloat16, then switch the model to
# evaluation mode and move it onto the selected device.
model = ColQwen2_5.from_pretrained(MODEL_NAME, torch_dtype=torch.bfloat16)
model = model.eval().to(device)

# The processor from the same checkpoint prepares the model inputs.
processor = ColQwen2_5_Processor.from_pretrained(MODEL_NAME)


In [None]:

# Elasticsearch client for the configured host, authenticated with the API key
# read from elastic.env.
es = Elasticsearch(hosts=[ELASTIC_HOST], api_key=ELASTIC_API_KEY)


In [None]:

INDEX_NAME = "colpali_docs"

# Width of each individual embedding vector produced by the ColQwen 2.5 model.
# The model projects every token to 128 dimensions; 2048 is the hidden size of
# the underlying language model, not the embedding size, and declaring it here
# makes Elasticsearch reject the vectors at index time.
# NOTE(review): confirm against `model.dim` for the loaded checkpoint.
EMBEDDING_DIM = 128

# Mapping: searchable title, exact-match category, and one multi-vector
# ("rank_vectors") field holding the per-token embeddings of a document.
index_body = {
    "mappings": {
        "properties": {
            "title": {"type": "text"},
            "category": {"type": "keyword"},
            "rank_vectors": {
                "type": "rank_vectors",
                "dims": EMBEDDING_DIM,
                "element_type": "float",
            },
        },
    },
}

# Create the index only when it is missing. An index that already exists keeps
# its previous mapping, so delete it manually if it was created with the wrong
# vector width.
if not es.indices.exists(index=INDEX_NAME):
    es.indices.create(index=INDEX_NAME, body=index_body)


In [None]:

def embed_text(text):
    """Embed a single text string with the ColQwen model.

    Args:
        text: The string to embed. It is sent through the processor's query
            pipeline as a batch of one.

    Returns:
        A float32 NumPy array containing the model output for ``text`` (the
        first and only item of the batch).
    """
    inputs = processor.process_queries([text]).to(device)
    with torch.no_grad():
        outputs = model(**inputs)
    # The model is loaded in bfloat16, a dtype NumPy cannot represent, so
    # calling .numpy() directly raises TypeError. Upcast to float32 first
    # (a no-op if the tensor is already float32).
    return outputs[0].to(torch.float32).cpu().numpy()


In [None]:

# RVL-CDIP class ids kept for this demo, mapped to readable category names.
label_to_category = {
    5: 'scientific report',
    6: 'scientific publication',
    12: 'presentation',
}

# Number of pages that are written to disk and later indexed.
SAMPLE_SIZE = 30

print("Downloading RVL-CDIP dataset...")
dataset = load_dataset('rvl_cdip', split='train')


def normalize_category(example):
    """Attach a readable ``category`` to a dataset row based on its ``label``.

    Labels that are not listed in ``label_to_category`` are tagged ``'other'``.
    Returns the same row with the extra key set.
    """
    example['category'] = label_to_category.get(example['label'], 'other')
    return example


# Select rows by their integer label only. Passing `input_columns` keeps the
# predicate from touching the image column, so the full training split is not
# decoded just to keep a handful of pages.
filtered = dataset.filter(
    lambda label: label in label_to_category,
    input_columns=['label'],
)

# Only the rows that are actually used get the `category` column; the range is
# clamped so a short filtered set does not raise.
sample = filtered.select(range(min(SAMPLE_SIZE, len(filtered)))).map(normalize_category)

# Start from an empty output directory on every run.
output_dir = "rvl_cdip_filtered"
shutil.rmtree(output_dir, ignore_errors=True)
os.makedirs(output_dir, exist_ok=True)

# Save each sampled page as a PNG and record its metadata for indexing.
documents = []
for i, example in enumerate(sample):
    img = example['image']
    label = example['category']
    text = f"Category: {label} document describing scientific content."
    img_path = f"{output_dir}/{label}_{i}.png"
    img.save(img_path)
    documents.append({"title": f"{label.capitalize()} Document {i}", "text": text, "category": label, "path": img_path})


In [None]:

# Embed every prepared document and store it under a stable id
# ("doc_<position in documents>").
# NOTE(review): the vectors are computed from the templated caption in
# doc["text"], not from the page image saved at doc["path"] - confirm that
# this is intended.
for i, doc in enumerate(documents):
    patch_vectors = embed_text(doc["text"])
    body = {
        "title": doc["title"],
        "category": doc["category"],
        "rank_vectors": patch_vectors.tolist()
    }
    es.index(index=INDEX_NAME, id=f"doc_{i}", document=body)

# Newly indexed documents only become searchable after a refresh. Force one so
# that a search issued right after this cell sees all of them.
es.indices.refresh(index=INDEX_NAME)


In [None]:

# Sample search queries keyed by language code ("en", "ko"); each list is
# looked up by its key when the queries are executed.
queries = {
    "en": [
        "What are deep learning methods for describing images?",
        "What was shared in the latest physics event?",
        "How do proteins fold in biological systems?"
    ],
    "ko": [
        "이미지를 설명하는 딥러닝 방법은?",
        "최근 물리학 컨퍼런스에서 발표된 내용은?",
        "단백질이 어떻게 접히는지에 대한 연구는?"
    ]
}


In [None]:

def search_and_render(query):
    """Search the index for ``query`` and display the best hits as images.

    The query is embedded with ``embed_text``; every indexed document is then
    scored with ``maxSimDotProduct`` between the query vectors and its
    ``rank_vectors`` field. The five best hits are rendered inline as an HTML
    strip of images. Hits whose id does not correspond to an entry of the
    in-memory ``documents`` list are skipped.

    Args:
        query: The search text.

    Returns:
        None. The result is shown through ``IPython.display``.
    """
    q_vec = embed_text(query)
    es_query = {
        "_source": False,
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "maxSimDotProduct(params.query_vector, 'rank_vectors')",
                    "params": {"query_vector": q_vec.tolist()}
                }
            }
        },
        "size": 5
    }
    results = es.search(index=INDEX_NAME, body=es_query)

    # Ids were assigned as "doc_<position>" at indexing time. Resolve them by
    # position through one dict instead of calling documents.index() per hit,
    # which is quadratic and compares by equality rather than by position.
    docs_by_id = {f"doc_{pos}": doc for pos, doc in enumerate(documents)}

    parts = ["<div style='display: flex; flex-wrap: wrap; align-items: flex-start;'>"]
    for hit in results["hits"]["hits"]:
        doc_id = hit["_id"]
        match = docs_by_id.get(doc_id)
        if not match:
            continue
        image_path = match["path"]
        parts.append(
            f'<img src="{image_path}" alt="{doc_id}" style="max-width:300px; height:auto; margin:10px;">'
        )
    parts.append("</div>")
    display(HTML("".join(parts)))


In [None]:

# Run every sample query, grouped by language, and render the hits for each.
for heading, lang in (("English", "en"), ("Korean", "ko")):
    print(f"\n--- {heading} Queries ---")
    for q in queries[lang]:
        print(f"\nQuery: {q}")
        search_and_render(q)


In [None]:

# We kill the kernel forcefully to free up the memory from the ColQwen model.
import os
# os._exit() terminates the process immediately without flushing stdio
# buffers, so flush explicitly to make sure the message is emitted first.
print("Shutting down the kernel to free memory...", flush=True)
os._exit(0)
