In [None]:
# Load the ColQWEN model
import torch
from colpali_engine.models import ColQwen2, ColQwen2Processor
import os

# Get rid of process forking deadlock warnings by setting the
# TOKENIZERS_PARALLELISM environment variable to "false" for this process.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

def get_device():
    """Pick the torch device string the model should be loaded on.

    Returns:
        "cuda:0" when CUDA is available, otherwise "mps" when the MPS
        backend is available, otherwise "cpu".
    """
    if torch.cuda.is_available():
        return "cuda:0"
    # MPS is only probed once CUDA has been ruled out.
    return "mps" if torch.backends.mps.is_available() else "cpu"

# A convenience class to wrap the functionality we will use from
# https://huggingface.co/vidore/colqwen2-v1.0
class Colqwen:
    """Convenience wrapper around a ColQwen2 model and its processor.

    Loads the checkpoint once and exposes helpers that embed a page image or
    a query string as a multi-vector, and that score a query against an image.
    """

    # Checkpoint loaded when the caller does not name one.
    DEFAULT_MODEL_NAME = "vidore/colqwen2-v1.0"

    def __init__(self, model_name=DEFAULT_MODEL_NAME, device=None):
        """Load the model and processor from huggingface.

        Args:
            model_name: Identifier passed to both ``from_pretrained`` calls,
                so the model and the processor always come from the same
                checkpoint. Defaults to ``DEFAULT_MODEL_NAME``.
            device: Value passed as ``device_map``. When ``None`` the device
                is picked by ``get_device()``.
        """
        # The default checkpoint is about a 5 GB download with similar memory usage.
        self.model = ColQwen2.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            device_map=get_device() if device is None else device,
            attn_implementation="eager",  # or "flash_attention_2" if available
        ).eval()
        self.processor = ColQwen2Processor.from_pretrained(model_name)

    def _embed(self, batch):
        """Run one processed batch through the model; return its first embedding."""
        batch = batch.to(self.model.device)
        # Inference only: no gradients are needed.
        with torch.no_grad():
            return self.model(**batch)[0]

    # A batch size of one appears to be most performant when running on an M4.
    # Note: Reducing the image resolution speeds up the vectorizer and produces
    # fewer multi-vectors.
    def multi_vectorize_image(self, img):
        """Return the multi-vector embedding of the supplied PIL image."""
        return self._embed(self.processor.process_images([img]))

    def multi_vectorize_text(self, query):
        """Return the multi-vector embedding of the query text string."""
        return self._embed(self.processor.process_queries([query]))

    def maxsim(self, query_embedding, image_embedding):
        """Compute the MaxSim between the query and image multi-vectors.

        Both embeddings are wrapped in single-element lists for the
        processor's scorer and the single resulting score is returned as a
        Python number via ``.item()``.
        """
        return self.processor.score_multi_vector(
            [query_embedding], [image_embedding]
        ).item()


# Instantiate the model to be used below.
# Constructing Colqwen loads the model and processor from huggingface, so the
# cells below share this single instance.
colqwen = Colqwen()

In [None]:
import torch
import os
import json
from PIL import Image

def get_embedding(image_path):
    """Return a generated multi vector embedding for a given image.

    Args:
        image_path: Path of the image file to embed.

    Returns:
        The multi-vector embedding as a float32 NumPy array.
    """
    # Image.open leaves the underlying file open until the image is closed.
    # The context manager releases the handle as soon as the model is done
    # with the image, so embedding many pages does not leak file descriptors.
    with Image.open(image_path) as image:
        embedding = colqwen.multi_vectorize_image(image)
    # Move to CPU and upcast to float32 before converting to NumPy.
    return embedding.detach().cpu().to(dtype=torch.float32).numpy()

def generate_embeddings(pages_folder="./pdf/pages"):
    """Generate multi vector embeddings for all PDF pages.

    Args:
        pages_folder: Directory containing one ``.png`` file per page.
            Defaults to ``"./pdf/pages"``.

    Returns:
        A list of ``(file_name, embedding)`` tuples, ordered by file name.
    """
    # os.listdir returns entries in arbitrary order; sorting makes the output
    # (and anything cached from it) reproducible across runs and machines.
    png_files = sorted(f for f in os.listdir(pages_folder) if f.endswith('.png'))
    return [
        (file_name, get_embedding(f"{pages_folder}/{file_name}"))
        for file_name in png_files
    ]

def generate_or_load_cached_embeddings(force_generate = False, cache_path = "./pdf/multi_vector_embeddings.json"):
    """Get the cached multi vector embeddings or generate them if they are not present.

    Args:
        force_generate: When True, ignore any existing cache file and
            regenerate (and re-cache) the embeddings.
        cache_path: Location of the JSON cache file.

    Returns:
        A list of dicts with keys ``"paper_number"`` (the page file name) and
        ``"embedding"`` (the multi-vector as nested lists).

    Raises:
        Exception: Any error raised while loading, generating or caching is
            printed and then re-raised, so callers never receive ``None``.
    """
    try:
        if os.path.exists(cache_path) and not force_generate:
            with open(cache_path, "r") as f:
                return json.load(f)

        data = [
            {
                "paper_number": file_name,
                "embedding": embedding.tolist(),  # Convert NumPy array to list for JSON
            }
            for file_name, embedding in generate_embeddings()
        ]
        # Write to a sibling temp file and rename it into place, so an
        # interrupted dump cannot leave a truncated cache behind that a
        # later run would fail to load.
        tmp_path = f"{cache_path}.tmp"
        with open(tmp_path, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, cache_path)
        return data
    except Exception as e:
        print(f"Error: {e}")
        # Re-raise instead of implicitly returning None, which would only
        # surface later as a confusing TypeError in the calling cell.
        raise

In [None]:
# Start Weaviate v1.31.0 in the background using docker
# Ports 8080 and 50051 are published to the same port numbers on the host.
!docker run --detach -p 8080:8080 -p 50051:50051 cr.weaviate.io/semitechnologies/weaviate:1.31.0

In [31]:
import weaviate
from weaviate.classes.config import Property, DataType, Configure
from weaviate.util import generate_uuid5

In [None]:
# Connect to the local Weaviate instance; `client` is reused by the cells below.
client = weaviate.connect_to_local()

In [None]:
# Remove any existing "NvidiaPDF" collection before it is created below.
client.collections.delete("NvidiaPDF")

In [None]:
# Build the vector configuration step by step, innermost piece first.
# Multi-vector support with MUVERA encoding, attached to an HNSW index.
muvera_encoding = Configure.VectorIndex.MultiVector.Encoding.muvera()
multi_vector_config = Configure.VectorIndex.MultiVector.multi_vector(
    encoding=muvera_encoding,
)
# Named vector "colqwen" with no vectorizer module: the embeddings are
# supplied by this notebook when objects are imported.
colqwen_vector = Configure.NamedVectors.none(
    name="colqwen",
    vector_index_config=Configure.VectorIndex.hnsw(
        multi_vector=multi_vector_config,
    ),
)

# Create the collection with a single text property and the named vector.
nvidia_pdf = client.collections.create(
    name="NvidiaPDF",
    properties=[Property(name="paper_number", data_type=DataType.TEXT)],
    vectorizer_config=[colqwen_vector],
)

In [None]:
# Load the embeddings from the JSON cache when it exists, otherwise generate
# them. Pass force_generate=True to ignore the cache and recompute.
embeddings = generate_or_load_cached_embeddings(force_generate=False)

In [None]:
# Sanity check: print each page's file name with the length of its embedding
# list, then the total number of pages.
# Single quotes are used inside the f-string because reusing the enclosing
# double quote is a SyntaxError before Python 3.12.
for emb in embeddings:
    print(f"page_number: {emb['paper_number']} embedding: {len(emb['embedding'])}")
print(f"Total: {len(embeddings)}")

In [None]:
# Import every page into the "NvidiaPDF" collection. The UUID is derived
# from the file name, so a given page always maps to the same object id.
collection = client.collections.get("NvidiaPDF")
with collection.batch.dynamic() as batch:
    for emb in embeddings:
        batch.add_object(
            uuid=generate_uuid5(emb["paper_number"]),
            properties={"paper_number": emb["paper_number"]},
            vector={"colqwen": emb["embedding"]},
        )
# Leaving the `with` block flushes the remaining objects, so no explicit
# flush call is needed.

# Per-object import errors do not raise; they are collected on the batch.
# Surface them so a partially imported collection does not go unnoticed.
failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")

In [None]:
from PIL import Image

def get_query_embedding(query: str):
    """Embed a query string as a multi-vector.

    Returns:
        The query's multi-vector embedding as a float32 NumPy array.
    """
    token_vectors = colqwen.multi_vectorize_text(query)
    # Move to CPU, upcast to float32, then hand the data to NumPy.
    on_cpu = token_vectors.cpu()
    as_float32 = on_cpu.to(dtype=torch.float32)
    return as_float32.numpy()

def perform_query_and_show_first_result(query):
    """A helper method which performs a vector search and shows the first result.

    The query string is embedded, the collection is searched for the single
    nearest object, and the page image named by that object's
    ``paper_number`` property is opened from ``./pdf/pages`` and shown.

    Args:
        query: Query text to search for.

    Returns:
        None. When the search returns no objects a message is printed and
        nothing is shown.
    """
    res = collection.query.near_vector(
        near_vector=get_query_embedding(query),
        limit=1
    )
    # An empty collection yields no objects; report that instead of failing
    # with an IndexError on res.objects[0].
    if not res.objects:
        print(f"No results found for query: {query!r}")
        return

    pages_folder = "./pdf/pages"
    file_name = res.objects[0].properties["paper_number"]
    # Close the image file once it has been handed to the viewer.
    with Image.open(f"{pages_folder}/{file_name}") as image:
        image.show()

In [None]:
# Example query: show the page that best matches a request for a list of countries.
perform_query_and_show_first_result("List of countries using AI")

In [None]:
# Example query phrased as a question; shows the top-ranked page.
perform_query_and_show_first_result("What is Nvidia's infrastructure roadmap")

In [None]:
# Example query asking for charts rather than text; shows the top-ranked page.
perform_query_and_show_first_result("revenue and income charts")

In [None]:
# Example short keyword query; shows the top-ranked page.
perform_query_and_show_first_result("NVIDIA CUDA speedup")

In [None]:
# Example question about dividends; shows the top-ranked page.
perform_query_and_show_first_result("What are the plans for dividends?")