In [1]:
!pip install pdf2image

Collecting pdf2image
  Downloading pdf2image-1.17.0-py3-none-any.whl.metadata (6.2 kB)
Downloading pdf2image-1.17.0-py3-none-any.whl (11 kB)
Installing collected packages: pdf2image
Successfully installed pdf2image-1.17.0


In [2]:
!pip install pymilvus

Collecting pymilvus
  Downloading pymilvus-2.6.2-py3-none-any.whl.metadata (6.5 kB)
Collecting ujson>=2.0.0 (from pymilvus)
  Downloading ujson-5.11.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (9.4 kB)
Downloading pymilvus-2.6.2-py3-none-any.whl (258 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m258.8/258.8 kB[0m [31m11.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ujson-5.11.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (57 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.4/57.4 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: ujson, pymilvus
Successfully installed pymilvus-2.6.2 ujson-5.11.0


In [3]:
# System package providing the poppler command-line tools.
# NOTE(review): pdf2image presumably relies on these to rasterise PDF pages — confirm against the pdf2image docs.
!apt-get install -y poppler-utils

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following NEW packages will be installed:
  poppler-utils
0 upgraded, 1 newly installed, 0 to remove and 35 not upgraded.
Need to get 186 kB of archives.
After this operation, 697 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 poppler-utils amd64 22.02.0-2ubuntu0.10 [186 kB]
Fetched 186 kB in 1s (212 kB/s)
Selecting previously unselected package poppler-utils.
(Reading database ... 126435 files and directories currently installed.)
Preparing to unpack .../poppler-utils_22.02.0-2ubuntu0.10_amd64.deb ...
Unpacking poppler-utils (22.02.0-2ubuntu0.10) ...
Setting up poppler-utils (22.02.0-2ubuntu0.10) ...
Processing triggers for man-db (2.10.2-1) ...


In [8]:
!pip install pymilvus[milvus_lite]

Collecting milvus-lite>=2.4.0 (from pymilvus[milvus_lite])
  Downloading milvus_lite-2.5.1-py3-none-manylinux2014_x86_64.whl.metadata (10.0 kB)
Downloading milvus_lite-2.5.1-py3-none-manylinux2014_x86_64.whl (55.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m55.3/55.3 MB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: milvus-lite
Successfully installed milvus-lite-2.5.1


In [4]:
# Show the working directory: the relative paths used by later cells
# ("documents/chunking.pdf", "pages/", "milvus.db") are resolved against it.
!pwd

/content


In [6]:
from pdf2image import convert_from_path

import os

pdf_path = "documents/chunking.pdf"
pages_dir = "pages"

# Saving into a directory that does not exist fails, and nothing else in the
# notebook creates it, so make sure the output folder is there first.
os.makedirs(pages_dir, exist_ok=True)

# One entry per PDF page; each entry is saved below as its own PNG file.
images = convert_from_path(pdf_path)

# Pages are numbered from 1: pages/page_1.png, pages/page_2.png, ...
for page_number, image in enumerate(images, start=1):
    image.save(os.path.join(pages_dir, f"page_{page_number}.png"), "PNG")

In [9]:
from pymilvus import MilvusClient, DataType
import numpy as np
import concurrent.futures

# Where the Milvus data lives. A local file name is used here for testing with
# Milvus Lite; for production pass a server address instead, e.g.
# "http://your-milvus-server:19530".
MILVUS_URI = "milvus.db"

# Client handle shared by the cells below.
client = MilvusClient(uri=MILVUS_URI)

In [10]:
from pymilvus import MilvusClient, DataType
import numpy as np
import concurrent.futures

class MilvusColbertRetriever:
    """Multi-vector (ColBERT-style) retriever backed by a Milvus collection.

    Each document is stored as one row per embedding vector; all rows of a
    document share the same ``doc_id``. :meth:`search` first collects candidate
    documents with a per-query-vector similarity search and then re-scores
    every candidate with the MaxSim late-interaction score.

    Args:
        milvus_client: Client object exposing the ``MilvusClient`` methods used
            below (``has_collection``, ``search``, ``query``, ``insert``, ...).
        collection_name (str): Collection to read from / write to.
        dim (int): Dimensionality of the stored vectors.
    """

    def __init__(self, milvus_client, collection_name, dim=128):
        self.collection_name = collection_name
        self.client = milvus_client
        self.dim = dim
        # If the collection already exists, load it so it can be searched
        # without calling create_collection() first.
        if self.client.has_collection(collection_name=self.collection_name):
            self.client.load_collection(collection_name)

    def create_collection(self):
        """(Re)create the collection, dropping any existing one with the same name.

        Schema: auto-generated INT64 primary key ``pk``, FLOAT_VECTOR ``vector``
        of size ``self.dim``, INT16 ``seq_id``, INT64 ``doc_id`` and VARCHAR
        ``doc``.
        """
        if self.client.has_collection(collection_name=self.collection_name):
            self.client.drop_collection(collection_name=self.collection_name)

        schema = self.client.create_schema(
            auto_id=True,
            # pymilvus spells this option in the singular; the plural form used
            # previously is not the documented keyword.
            enable_dynamic_field=True,
        )
        schema.add_field(field_name="pk", datatype=DataType.INT64, is_primary=True)
        schema.add_field(
            field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=self.dim
        )
        schema.add_field(field_name="seq_id", datatype=DataType.INT16)
        schema.add_field(field_name="doc_id", datatype=DataType.INT64)
        schema.add_field(field_name="doc", datatype=DataType.VARCHAR, max_length=65535)

        self.client.create_collection(
            collection_name=self.collection_name, schema=schema
        )

    def create_index(self):
        """Release the collection and (re)build a FLAT inner-product index on ``vector``."""
        index_name = "vector_index"

        self.client.release_collection(collection_name=self.collection_name)
        # Drop the index under the same name it is created with below, so that
        # calling create_index() again really replaces the previous index.
        self.client.drop_index(
            collection_name=self.collection_name, index_name=index_name
        )

        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="vector",
            index_name=index_name,
            index_type="FLAT",
            metric_type="IP",
            params={},
        )

        self.client.create_index(
            collection_name=self.collection_name, index_params=index_params, sync=True
        )

    def insert(self, data):
        """Insert all embedding vectors of one document.

        Args:
            data (dict): Expected keys:
                ``"colbert_vecs"`` - iterable of vectors, one row is written
                per vector and ``seq_id`` is the vector's position;
                ``"doc_id"`` - identifier shared by all rows of the document;
                ``"filepath"`` - stored in the ``doc`` field.
        """
        rows = [
            {
                "vector": vec,
                "seq_id": seq_id,
                "doc_id": data["doc_id"],
                "doc": data["filepath"],  # the file path doubles as the document text for now
            }
            for seq_id, vec in enumerate(data["colbert_vecs"])
        ]
        self.client.insert(self.collection_name, rows)
        self.client.flush(self.collection_name)

    def _maxsim_score(self, doc_id, query_vecs, max_doc_vectors=1000):
        """Return ``(score, doc_id)`` with the MaxSim score of one document.

        Fetches at most ``max_doc_vectors`` stored vectors of ``doc_id``; for
        every query vector the largest dot product against those vectors is
        taken, and the maxima are summed.
        """
        rows = self.client.query(
            collection_name=self.collection_name,
            filter=f"doc_id in [{doc_id}]",
            output_fields=["seq_id", "vector", "doc"],
            limit=int(max_doc_vectors),
        )
        doc_vecs = np.vstack([row["vector"] for row in rows])
        score = np.dot(query_vecs, doc_vecs.T).max(1).sum()
        return (score, doc_id)

    def search(self, data, topk, candidate_limit=50, max_doc_vectors=1000, max_workers=300):
        """Retrieve the best-matching documents for a multi-vector query.

        Args:
            data: 2-D sequence/array of query vectors (one row per vector).
            topk (int): Maximum number of documents to return.
            candidate_limit (int): Hits requested per query vector in the
                candidate search.
            max_doc_vectors (int): Upper bound on the vectors fetched per
                candidate document for re-scoring. Documents with more stored
                vectors than this are scored on a truncated subset, so raise it
                if documents can be larger.
            max_workers (int): Upper bound on re-scoring threads.

        Returns:
            list[tuple]: ``(score, doc_id)`` pairs sorted by descending score,
            at most ``topk`` of them (empty if the candidate search finds
            nothing).
        """
        search_params = {"metric_type": "IP", "params": {}}
        results = self.client.search(
            self.collection_name,
            data,
            limit=int(candidate_limit),
            # Only doc_id is read from the hits; vectors are fetched per
            # document during re-scoring.
            output_fields=["doc_id"],
            search_params=search_params,
        )

        # Unique documents hit by any of the query vectors.
        doc_ids = {hit["entity"]["doc_id"] for hits in results for hit in hits}
        if not doc_ids:
            return []

        # Re-score the candidates in parallel. The instance's own client is
        # used, not a notebook-level global, so the class works on its own.
        workers = max(1, min(int(max_workers), len(doc_ids)))
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(self._maxsim_score, doc_id, data, max_doc_vectors)
                for doc_id in doc_ids
            ]
            scores = [
                future.result()
                for future in concurrent.futures.as_completed(futures)
            ]

        scores.sort(key=lambda pair: pair[0], reverse=True)
        return scores[:topk]

In [11]:
def search(self, data, topk):
    """Module-level copy of the retriever's two-stage search.

    This function is not attached to any class by this cell; ``self`` is any
    object that provides ``client`` and ``collection_name`` attributes.

    Args:
        self: Object with ``client`` (exposing ``search`` and ``query``) and
            ``collection_name``.
        data: 2-D sequence/array of query vectors (one row per vector).
        topk (int): Maximum number of documents to return.

    Returns:
        list[tuple]: ``(score, doc_id)`` pairs sorted by descending score, at
        most ``topk`` of them (empty if the candidate search finds nothing).
    """
    # Stage 1: per-query-vector similarity search to collect candidates.
    search_params = {"metric_type": "IP", "params": {}}
    results = self.client.search(
        self.collection_name,
        data,
        limit=int(50),
        output_fields=["vector", "seq_id", "doc_id"],
        search_params=search_params,
    )

    # Unique documents hit by any of the query vectors.
    doc_ids = {hit["entity"]["doc_id"] for hits in results for hit in hits}
    if not doc_ids:
        return []

    def rerank_single_doc(doc_id):
        """Return ``(MaxSim score, doc_id)`` for one candidate document."""
        # Uses the client of the object passed in, not a notebook-level
        # global, so the function does not depend on hidden kernel state.
        # At most 1000 stored vectors are fetched per document.
        doc_colbert_vecs = self.client.query(
            collection_name=self.collection_name,
            filter=f"doc_id in [{doc_id}]",
            output_fields=["seq_id", "vector", "doc"],
            limit=1000,
        )
        doc_vecs = np.vstack([row["vector"] for row in doc_colbert_vecs])

        # MaxSim: best-matching document vector per query vector, summed.
        score = np.dot(data, doc_vecs.T).max(1).sum()
        return (score, doc_id)

    # Stage 2: re-score the candidates in parallel.
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=min(300, len(doc_ids))
    ) as executor:
        futures = [executor.submit(rerank_single_doc, doc_id) for doc_id in doc_ids]
        scores = [
            future.result() for future in concurrent.futures.as_completed(futures)
        ]

    scores.sort(key=lambda pair: pair[0], reverse=True)
    return scores[:topk]

In [14]:
from colpali_engine.models import ColPali
from colpali_engine.models.paligemma.colpali.processing_colpali import ColPaliProcessor
from colpali_engine.utils.torch_utils import ListDataset, get_torch_device
from torch.utils.data import DataLoader
import torch
from tqdm import tqdm
from PIL import Image
import os

# NOTE: this cell needs the colpali_engine package; no cell visible in this
# notebook installs it, so make sure it is installed before running.

# Initialize the ColPali model.
# NOTE(review): "cuda" is requested explicitly; whether get_torch_device falls
# back to CPU when no GPU is present is not visible here — confirm.
device = get_torch_device("cuda")
model_name = "vidore/colpali-v1.2"

model = ColPali.from_pretrained(
    model_name,
    torch_dtype=torch.bfloat16,
    device_map=device,
).eval()

processor = ColPaliProcessor.from_pretrained(model_name)

# Collect the page images. os.listdir() returns entries in arbitrary order, so
# sort them to make the doc_id -> page mapping reproducible across runs.
# Sorting by (length, name) puts page_2.png before page_10.png.
pages_dir = "./pages"
image_files = sorted(
    (
        name
        for name in os.listdir(pages_dir)
        if os.path.isfile(os.path.join(pages_dir, name))
    ),
    key=lambda name: (len(name), name),
)
filepaths = [f"./pages/{name}" for name in image_files]
images = [Image.open(path) for path in filepaths]


# One page per batch, preprocessed by the ColPali processor.
dataloader = DataLoader(
    dataset=ListDataset[str](images),
    batch_size=1,
    shuffle=False,
    collate_fn=lambda x: processor.process_images(x),
)

# Embed every page; the per-page results are moved to the CPU and collected in
# the same order as `filepaths`.
document_embeddings = []
for batch_doc in tqdm(dataloader):
    with torch.no_grad():
        batch_doc = {k: v.to(model.device) for k, v in batch_doc.items()}
        embeddings_doc = model(**batch_doc)
    document_embeddings.extend(list(torch.unbind(embeddings_doc.to("cpu"))))

# Create and set up the Milvus collection (any existing "colpali" collection
# is dropped by create_collection()).
retriever = MilvusColbertRetriever(collection_name="colpali", milvus_client=client)
retriever.create_collection()
retriever.create_index()

# Insert embeddings into Milvus. doc_id is the index into `filepaths`, which is
# how later cells map search results back to a page file.
for doc_id, (filepath, page_embedding) in enumerate(zip(filepaths, document_embeddings)):
    retriever.insert(
        {
            # Converted to float32 before .numpy() (the model was loaded in bfloat16).
            "colbert_vecs": page_embedding.float().numpy(),
            "doc_id": doc_id,
            "filepath": filepath,
        }
    )

config.json: 0.00B [00:00, ?B/s]

model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/862M [00:00<?, ?B/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/4.99G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

adapter_model.safetensors:   0%|          | 0.00/78.6M [00:00<?, ?B/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


preprocessor_config.json:   0%|          | 0.00/700 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

tokenizer.json:   0%|          | 0.00/17.8M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/733 [00:00<?, ?B/s]

100%|██████████| 7/7 [00:16<00:00,  2.37s/it]


In [15]:
# Example questions to run against the indexed pages
queries = [
    "How does ColBERT perform end-to-end retrieval?",
    "Show me the performance comparison table for ColBERT",
]

# One query per batch, preprocessed by the ColPali processor
dataloader = DataLoader(
    dataset=ListDataset[str](queries),
    batch_size=1,
    shuffle=False,
    collate_fn=lambda batch: processor.process_queries(batch),
)

# Embed each query and keep the results on the CPU, in the order of `queries`
query_embeddings = []
for batch_query in dataloader:
    model_inputs = {
        key: tensor.to(model.device) for key, tensor in batch_query.items()
    }
    with torch.no_grad():
        embeddings_query = model(**model_inputs)
    query_embeddings += list(torch.unbind(embeddings_query.to("cpu")))

# Retrieve and print the three best pages for every query
for query, embedding in zip(queries, query_embeddings):
    query_embedding = embedding.float().numpy()
    results = retriever.search(query_embedding, topk=3)

    print(f"Query: {query}")
    for score, doc_id in results:
        # doc_id is the index of the page in `filepaths`
        print(f"  Score: {score:.4f}, Document: {filepaths[doc_id]}")
    print()

Query: How does ColBERT perform end-to-end retrieval?
  Score: 10.4207, Document: ./pages/page_1.png
  Score: 8.9985, Document: ./pages/page_2.png
  Score: 8.2815, Document: ./pages/page_3.png

Query: Show me the performance comparison table for ColBERT
  Score: 7.7777, Document: ./pages/page_1.png
  Score: 7.1935, Document: ./pages/page_3.png
  Score: 6.3882, Document: ./pages/page_4.png



In [18]:
# Question to look up in the indexed pages.
# Alternative question to try:
# question = "Why is chunking required? answer the 3 reasons mentioned in the document chunking.pdf"
question = "Explain Markdown text splitter technique mentioned in the document chunking.pdf"

# Prepare the query for the retriever (a single-item dataset, batch size 1)
dataloader = DataLoader(
    dataset=ListDataset[str]([question]),
    batch_size=1,
    shuffle=False,
    collate_fn=lambda x: processor.process_queries(x),
)

query_embedding = None
for batch_query in dataloader:
    with torch.no_grad():
        batch_query = {k: v.to(model.device) for k, v in batch_query.items()}
        embeddings_query = model(**batch_query)
    # Drop only the leading batch axis (batch_size is 1). A bare squeeze()
    # would also collapse any other axis of size 1 and hand the retriever an
    # array of the wrong rank.
    # Assumes the model output is laid out as (batch, tokens, dim) — TODO confirm.
    query_embedding = embeddings_query.to("cpu").squeeze(0).float().numpy()

# Search for relevant documents using the retriever
# Adjust topk as needed to get a sufficient number of relevant pages
retrieved_results = retriever.search(query_embedding, topk=5)

print(f"Question: {question}")
print("Retrieved Document Chunks (filepaths):")
for score, doc_id in retrieved_results:
    # doc_id is the index of the page in `filepaths`
    print(f"  Score: {score:.4f}, Document: {filepaths[doc_id]}")

# You can add code here to actually read and display the content of these files if needed

Question: Explain Markdown text splitter technique mentioned in the document chunking.pdf
Retrieved Document Chunks (filepaths):
  Score: 14.8118, Document: ./pages/page_6.png
  Score: 14.5495, Document: ./pages/page_4.png
  Score: 13.9419, Document: ./pages/page_5.png
  Score: 13.5041, Document: ./pages/page_3.png
  Score: 13.2204, Document: ./pages/page_2.png
