In [None]:
!pip install --upgrade vidore_benchmark==4.0.2
!pip install --upgrade pymupdf
!pip install openpyxl
! sudo apt install tesseract-ocr -y
!pip install pytesseract


In [None]:
from transformers import AutoConfig, AutoModel, AutoProcessor
from vidore_benchmark.retrievers.vision_retriever import VisionRetriever

import torch
from typing import List, Optional, Tuple, Union, TypeVar

from __future__ import annotations
import logging
from typing import ClassVar, List, Optional, Union, cast
import pandas as pd
from colpali_engine.models.qwen2_5.colqwen2_5.modeling_colqwen2_5 import ColQwen2_5
from colpali_engine.models.qwen2_5.colqwen2_5.processing_colqwen2_5 import ColQwen2_5_Processor
from colpali_engine.utils.torch_utils import get_torch_device
from dotenv import load_dotenv
from PIL import Image
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

# Generic element type carried by ListDataset.
T = TypeVar("T")


class ListDataset(Dataset[T]):
    """Expose an in-memory list through the ``torch.utils.data.Dataset`` interface."""

    def __init__(self, elements: List[T]):
        """Keep a reference to ``elements``; the list is stored as given, not copied."""
        self.elements = elements

    def __len__(self) -> int:
        """Return how many elements the wrapped list holds."""
        size = len(self.elements)
        return size

    def __getitem__(self, idx: int) -> T:
        """Return the element stored at position ``idx`` of the wrapped list."""
        item = self.elements[idx]
        return item


def averaged_st(models_list):
    """
    Average the weights of several models into a single state dict.

    Args:
        models_list: Non-empty, indexable collection of modules exposing ``state_dict()``.
            Every model must provide the keys found in the first model's state dict.

    Returns:
        A state dict with the same keys as ``models_list[0].state_dict()``.
        Floating-point and complex tensors hold the element-wise mean over all models,
        stored in the dtype of the first model's entry. Every other entry (integer or
        boolean tensors, non-tensor values) is copied from the first model, because a
        mean is not representable in those types.

    Raises:
        ValueError: If ``models_list`` is empty.
    """
    if len(models_list) == 0:
        raise ValueError("`models_list` must contain at least one model")

    state_dicts = [model.state_dict() for model in models_list]
    first_state = state_dicts[0]
    n_models = len(state_dicts)

    # The tensors returned by `state_dict()` share storage with the live parameters, so
    # accumulating into them in place would overwrite the first model's weights.
    # Start from a shallow copy and replace every value with a freshly allocated tensor.
    averaged = first_state.copy()
    metadata = getattr(first_state, "_metadata", None)
    if metadata is not None:
        # Keep the versioning metadata that `load_state_dict` reads from the state dict.
        averaged._metadata = metadata

    low_precision = (torch.float16, torch.bfloat16)
    for key, reference in first_state.items():
        if not isinstance(reference, torch.Tensor):
            averaged[key] = reference
            continue
        if not (reference.is_floating_point() or reference.is_complex()):
            averaged[key] = reference.clone()
            continue

        # Sum half-precision weights in float32 to limit rounding error, then cast back.
        acc_dtype = torch.float32 if reference.dtype in low_precision else reference.dtype
        total = reference.detach().to(dtype=acc_dtype, copy=True)
        for other_state in state_dicts[1:]:
            total += other_state[key].to(device=total.device, dtype=acc_dtype)
        total /= n_models
        averaged[key] = total.to(reference.dtype)

    return averaged


def score_multi_vector(
    qs: List[torch.Tensor],
    ps: List[torch.Tensor],
    batch_size: int = 128,
    device: Optional[Union[str, torch.device]] = None,
) -> torch.Tensor:
    """
    Score every query against every passage with the ColBERT-style MaxSim operator.

    Queries and passages are processed in chunks of ``batch_size``. Each chunk is
    zero-padded to a common number of vectors, moved to ``device``, and compared with a
    dot product between every query vector and every passage vector. For each query
    vector the best-matching passage vector is kept, and those maxima are summed.

    Args:
        qs: One tensor of embedding vectors per query.
        ps: One tensor of embedding vectors per passage.
        batch_size: Number of queries / passages handled per chunk.
        device: Device used for the computation; resolved with
            ``get_torch_device("auto")`` when falsy.

    Returns:
        A float32 CPU tensor with one row per query and one column per passage.

    Raises:
        ValueError: If ``qs`` or ``ps`` is empty.
    """
    if not device:
        device = get_torch_device("auto")

    if len(qs) == 0:
        raise ValueError("No queries provided")
    if len(ps) == 0:
        raise ValueError("No passages provided")

    pad = torch.nn.utils.rnn.pad_sequence
    score_rows: List[torch.Tensor] = []

    for q_start in range(0, len(qs), batch_size):
        q_chunk = pad(qs[q_start : q_start + batch_size], batch_first=True, padding_value=0).to(device)

        score_cols = []
        for p_start in range(0, len(ps), batch_size):
            p_chunk = pad(ps[p_start : p_start + batch_size], batch_first=True, padding_value=0).to(device)
            # (query, passage, query vector, passage vector) similarity grid.
            similarity = torch.einsum("bnd,csd->bcns", q_chunk, p_chunk)
            # Best passage vector per query vector, then sum over the query vectors.
            score_cols.append(similarity.max(dim=3)[0].sum(dim=2))

        score_rows.append(torch.cat(score_cols, dim=1).cpu())

    scores = torch.cat(score_rows, dim=0)
    assert scores.shape[0] == len(qs), f"Expected {len(qs)} scores, got {scores.shape[0]}"

    return scores.to(torch.float32)



#from vidore_benchmark.retrievers.registry_utils import register_vision_retriever

# Module-level logger; ColQwenRetriever reports the device it selects through it.
logger = logging.getLogger(__name__)

# Read variables from a `.env` file into the process environment.
# NOTE(review): `override=True` presumably lets `.env` values replace variables that are
# already set in the environment — confirm against the python-dotenv documentation.
load_dotenv(override=True)


#@register_vision_retriever("colqwen2")
class ColQwenRetriever(VisionRetriever):
    """
    Multi-vector retriever built around a caller-supplied model and processor.

    Queries and images are turned into model inputs with the processor's
    ``process_queries`` / ``process_images``, embedded with ``model(**batch)``, and
    scored with ``score_multi_vector``.
    """

    # NOTE(review): presumably the per-vector embedding width produced by the model;
    # neither constant is read inside this class — confirm against the base class.
    emb_dim_query: ClassVar[int] = 128
    emb_dim_doc: ClassVar[int] = 128

    def __init__(
        self,
        model,
        processor,
        device: str = "auto",
        query_num_workers: int = 8,
    ):
        """
        Args:
            model: Instantiated model, callable as ``model(**batch)``.
            processor: Object exposing ``process_images(images=...)`` and
                ``process_queries(queries=...)``.
            device: Device specifier resolved through ``get_torch_device``; input
                batches are moved to the resolved device before the forward pass.
            query_num_workers: ``num_workers`` given to the query ``DataLoader``.
        """
        super().__init__()
        self.device = get_torch_device(device)
        logger.info(f"Using device: {self.device}")
        self.model = model
        self.processor = processor
        self.query_num_workers = query_num_workers
        print("Loaded custom processor.\n")

    @property
    def use_visual_embedding(self) -> bool:
        """Always ``True`` for this retriever."""
        return True

    def process_images(self, images: List[Image.Image], **kwargs):
        """Collate a list of images into model inputs (extra keyword arguments are ignored)."""
        return self.processor.process_images(images=images)

    def process_queries(self, queries: List[str], **kwargs):
        """Collate a list of query strings into model inputs (extra keyword arguments are ignored)."""
        return self.processor.process_queries(queries=queries)

    def _embed_batches(self, dataloader: DataLoader, desc: str) -> List[torch.Tensor]:
        """Run the model over every batch of ``dataloader``; return one CPU tensor per item."""
        embeddings: List[torch.Tensor] = []
        for batch in tqdm(dataloader, desc=desc, leave=False):
            with torch.no_grad():
                batch = {k: v.to(self.device) for k, v in batch.items()}
                batch_embeddings = self.model(**batch)
            # Move results off the compute device right away and split along the batch axis.
            embeddings.extend(torch.unbind(batch_embeddings.to("cpu")))
        return embeddings

    def forward_queries(self, queries: List[str], batch_size: int, **kwargs) -> List[torch.Tensor]:
        """Embed ``queries`` in batches of ``batch_size``; returns one tensor per query."""
        dataloader = DataLoader(
            dataset=ListDataset[str](queries),
            batch_size=batch_size,
            shuffle=False,
            collate_fn=self.process_queries,
            num_workers=self.query_num_workers,
        )
        return self._embed_batches(dataloader, desc="Forward pass queries...")

    def forward_documents(self, passages: List[Image.Image], batch_size: int, **kwargs) -> List[torch.Tensor]:
        """Embed page images in batches of ``batch_size``; returns one tensor per image."""
        dataloader = DataLoader(
            dataset=ListDataset[Image.Image](passages),
            batch_size=batch_size,
            shuffle=False,
            collate_fn=self.process_images,
        )
        return self._embed_batches(dataloader, desc="Forward pass documents...")

    def get_scores(
        self,
        query_embeddings: Union[torch.Tensor, List[torch.Tensor]],
        passage_embeddings: Union[torch.Tensor, List[torch.Tensor]],
        batch_size: Optional[int] = 128,
    ) -> torch.Tensor:
        """
        Score all queries against all passages with ``score_multi_vector``.

        Raises:
            ValueError: If ``batch_size`` is ``None``.
        """
        if batch_size is None:
            raise ValueError("`batch_size` must be provided for ColQwenRetriever's scoring")
        return score_multi_vector(
            query_embeddings,
            passage_embeddings,
            batch_size=batch_size,
            device=self.device,
        )



In [2]:
from datasets import load_dataset
from dotenv import load_dotenv
import pandas as pd
from vidore_benchmark.evaluation.evaluate import evaluate_dataset
from vidore_benchmark.retrievers.colpali_retriever import ColPaliRetriever


load_dotenv(override=False)


def eval_model(model_path, dataset_path, model_type="colqwen"):
    """
    Load the model at ``model_path`` and evaluate it on the ``test`` split of a dataset.

    Args:
        model_path: Model name or path forwarded to ``create_retriever``.
        dataset_path: Dataset name or path forwarded to ``load_dataset``.
        model_type: Loader selector understood by ``create_retriever``: ``"colqwen"``,
            ``"colpali"``, or any other value for the ``AutoModel`` loader.
            NOTE(review): the default selects the ColQwen2.5 loader — confirm it
            matches the checkpoints being evaluated.

    Returns:
        The value returned by ``evaluate_dataset`` for this retriever and dataset.
    """
    # ColQwenRetriever needs an instantiated model and processor rather than a path,
    # so loading is delegated to the notebook's retriever factory.
    my_retriever = create_retriever(model_path, model_type)
    dataset = load_dataset(dataset_path, split="test")
    return evaluate_dataset(my_retriever, dataset, batch_query=32, batch_doc=128, batch_score=128)
load_dotenv(override=False)


def eval_retreiver(retriever,dataset_path):
    """
    Evaluate an already-built retriever on the ``test`` split of ``dataset_path``.

    Returns the value produced by ``evaluate_dataset`` with query, document and scoring
    batch sizes of 4, 32 and 128.
    """
    return evaluate_dataset(
        retriever,
        load_dataset(dataset_path, split="test"),
        batch_query=4,
        batch_doc=32,
        batch_score=128,
    )


def create_retriever(model_name, model_type):
    """
    Build a retriever for ``model_name``.

    Args:
        model_name: Model name or path handed to the selected loader.
        model_type: ``"colqwen"`` loads ``ColQwen2_5`` with its processor, ``"colpali"``
            builds a ``ColPaliRetriever``, and any other value loads the model through
            ``AutoModel`` / ``AutoProcessor`` with ``trust_remote_code=True``.

    Returns:
        A ``ColPaliRetriever`` for ``"colpali"``, otherwise a ``ColQwenRetriever``
        wrapping the loaded model (in eval mode) and its processor.
    """
    # Imported locally so the notebook's import cell does not need to change.
    from transformers.utils.import_utils import is_flash_attn_2_available

    if model_type == "colpali":
        return ColPaliRetriever(model_name=model_name)

    load_kwargs = dict(
        torch_dtype=torch.bfloat16,
        device_map="cuda",
        # FlashAttention 2 is an optional dependency; when it is missing, let
        # transformers choose its default attention implementation instead of failing.
        attn_implementation="flash_attention_2" if is_flash_attn_2_available() else None,
    )

    if model_type == "colqwen":
        model = ColQwen2_5.from_pretrained(model_name, **load_kwargs).eval()
        processor = ColQwen2_5_Processor.from_pretrained(model_name)
    else:
        # SECURITY: trust_remote_code=True executes Python code shipped in the model
        # repository. Only use this branch with repositories you trust.
        model = AutoModel.from_pretrained(model_name, trust_remote_code=True, **load_kwargs).eval()
        processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)

    return ColQwenRetriever(model=model, processor=processor)



# ViDoRe Benchmark

In [None]:

# Models to benchmark. `model_types` is index-aligned with `model_names` and selects
# the loader used by `create_retriever`.
model_names = [
    # 'Metric-AI/ColQwen2.5-3b-multilingual-v1.0',
    # 'vidore/colpali-v1.3',
    'Metric-AI/ColQwenStella-2b-multilingual'
]
model_types = [
    # "colqwen",
    # "colpali",
    "auto"
]
# zip() stops at the shorter list, so a mismatch would silently skip models.
assert len(model_names) == len(model_types), "model_names and model_types must have the same length"


dataset_names = [
    'vidore/arxivqa_test_subsampled',
    'vidore/docvqa_test_subsampled',
    'vidore/infovqa_test_subsampled', 
    'vidore/tabfquad_test_subsampled',
    'vidore/tatdqa_test',
    'vidore/shiftproject_test',
    'vidore/syntheticDocQA_artificial_intelligence_test',
    'vidore/syntheticDocQA_energy_test',
    'vidore/syntheticDocQA_government_reports_test',
    'vidore/syntheticDocQA_healthcare_industry_test'
]
# One row per model, one column per dataset (repository prefix stripped).
score_df = pd.DataFrame(index = model_names,columns=[i.split('/')[1] for i in dataset_names])


for model_name, model_type in zip(model_names, model_types):
    # Load each model once and reuse it for every dataset.
    retriever = create_retriever(model_name, model_type)
    for data in dataset_names:
        metrics = eval_retreiver(retriever,data)
        score_df.loc[model_name,data.split('/')[1]] = metrics['ndcg_at_5']
        # Rewrite the workbook after every dataset so partial results are kept.
        score_df.to_excel('./results-stella.xlsx')

# Mean over models of each dataset column, then mean over datasets.
print(score_df.mean().mean())



# VisRAG Benchmark

In [None]:
# Checkpoints compared on the VisRAG retrieval test sets.
model_names = [
    'vidore/colqwen2-v0.1',
    'models/colqwen2-mixed',
    'models/colqwen2-visrag',
]

dataset_names = [
    'Metric-AI/VisRAG-Ret-Test-ChartQA',
    'Metric-AI/VisRAG-Ret-Test-ArxivQA',
    'Metric-AI/VisRAG-Ret-Test-MP-DocVQA',
    'Metric-AI/VisRAG-Ret-Test-InfoVQA',
    'Metric-AI/VisRAG-Ret-Test-PlotQA',
    'Metric-AI/VisRAG-Ret-Test-SlideVQA',
]

# One row per model, one column per dataset (repository prefix stripped).
score_df = pd.DataFrame(
    index=model_names,
    columns=[name.split('/')[1] for name in dataset_names],
)


for data in dataset_names:
    column = data.split('/')[1]
    for model_name in model_names:
        metrics = eval_model(model_name, data)
        score_df.loc[model_name, column] = metrics['ndcg_at_5']
        # Rewrite the workbook after every evaluation so partial results are kept.
        score_df.to_excel('Visrag_bench.xlsx')

