# Evaluation of different embedding models on biomedical QA
Author: Marie Corradi

## Define evaluator class

In [1]:
from sentence_transformers import SentenceTransformer
from embeddings_training import ContrastiveAutoencoder
import numpy as np 
from sklearn.metrics.pairwise import cosine_similarity 
import json
import torch

class EmbeddingEvaluator:
    def __init__(self, model_type, model_path=None, embedding_model_name=None, k=5):
        """
        Initialize the evaluator with the specified model type.
        :param model_type: Either 'custom' for a trained model or 'pretrained' for a SentenceTransformer model.
        :param model_path: Path to the trained model (for custom model).
        :param embedding_model_name: Name of the pre-trained SentenceTransformer model (for pretrained model).
        :param k: Number of top passages to retrieve.
        """
        self.model_type = model_type
        self.k = k
        self.metrics_store = []
        
        # Load the appropriate model based on model type
        if model_type == "pretrained":
            self.embedding_model_name = embedding_model_name
            self.model = SentenceTransformer(embedding_model_name)  # Load SentenceTransformer model
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(device)

    def load_dataset(self, file_path):
        """
        Load dataset from JSON file and return questions and answers.
        :param file_path: Path to the JSON dataset file.
        :return: Tuple of (questions, answers)
        """
        with open(file_path, 'r') as file:
            data = json.load(file)

        questions = []
        answers = []

        for entry in data.values():
            questions.append(entry["question"])
            answers.extend(entry["answers"])

        return questions, answers

    def generate_embeddings(self, texts):
        """
        Generate embeddings for a list of texts using the pre-trained embedding model or custom model.
        :param texts: List of text strings.
        :return: Embeddings as tensors.
        """
        if self.model_type == "pretrained":
            return self.model.encode(texts, convert_to_tensor=True)  # SentenceTransformer's encode method

    def retrieve_top_k(self, question_embeddings, answer_embeddings):
        """
        Retrieve the top-k most relevant answers based on cosine similarity.
        :param question_embeddings: List of question embeddings.
        :param answer_embeddings: List of answer embeddings.
        :return: Top-k indices for each question.
        """
        # Check if the embeddings are on GPU and move them to CPU if necessary
        if question_embeddings.device != torch.device('cpu'):
            question_embeddings = question_embeddings.cpu().detach().numpy()
        if answer_embeddings.device != torch.device('cpu'):
            answer_embeddings = answer_embeddings.cpu().detach().numpy()
    
        # Convert to NumPy arrays for cosine similarity calculation
        #question_embeddings = question_embeddings.numpy()
        #answer_embeddings = answer_embeddings.numpy()

        cosine_sim = cosine_similarity(question_embeddings, answer_embeddings)
        top_k_indices = np.argsort(cosine_sim, axis=1)[:, ::-1][:, :self.k]
        return top_k_indices

    def compute_metrics(self, dataset_name, relevant_indices, top_k_indices):
        """
        Compute MRR, Precision@k, and Recall@k for the current dataset.
        :param dataset_name: Name or path of the dataset being evaluated.
        :param relevant_indices: List of relevant indices (correct answers) for each question.
        :param top_k_indices: List of top-k retrieved indices for each question.
        :return: Tuple of MRR, Precision@k, and Recall@k.
        """
        mrr = self.mean_reciprocal_rank(relevant_indices, top_k_indices)
        precision = self.precision_at_k(relevant_indices, top_k_indices)
        recall = self.recall_at_k(relevant_indices, top_k_indices)

        # Store the metrics for this dataset
        self.metrics_store.append({
            "Model": self.model_type if self.model_type == "custom" else self.embedding_model_name,
            "Dataset": dataset_name,
            "MRR": mrr,
            "Precision@k": precision,
            "Recall@k": recall
        })

        return mrr, precision, recall

    def mean_reciprocal_rank(self, relevant_indices, top_k_indices):
        # Rank of the first relevant result returned
        mrr_total = 0.0
        for i, relevant in enumerate(relevant_indices):
            for rank, retrieved in enumerate(top_k_indices[i]):
                if retrieved in relevant:
                    mrr_total += 1.0 / (rank + 1)
                    break
        return mrr_total / len(relevant_indices)

    def precision_at_k(self, relevant_indices, top_k_indices):
        # How many of the top k retrieved results are relevant
        precision_total = 0.0
        for i, relevant in enumerate(relevant_indices):
            retrieved_set = set(top_k_indices[i][:self.k])
            relevant_set = set(relevant)
            precision_total += len(retrieved_set.intersection(relevant_set)) / self.k
        return precision_total / len(relevant_indices)

    def recall_at_k(self, relevant_indices, top_k_indices):
        # How many of the relevant items are retrieved within the top k results
        recall_total = 0.0
        for i, relevant in enumerate(relevant_indices):
            retrieved_set = set(top_k_indices[i][:self.k])
            relevant_set = set(relevant)
            recall_total += len(retrieved_set.intersection(relevant_set)) / len(relevant_set)
        return recall_total / len(relevant_indices)

    def evaluate(self, dataset_file_paths):
        """
        Evaluate the embedding model on multiple datasets and compute metrics for each.
        :param dataset_file_paths: List of dataset file paths.
        """
        for dataset_file_path in dataset_file_paths:
            questions, answers = self.load_dataset(dataset_file_path)

            # Generate embeddings for questions and answers
            question_embeddings = self.generate_embeddings(questions)
            answer_embeddings = self.generate_embeddings(answers)

            # Retrieve the top-k results for each question
            top_k_indices = self.retrieve_top_k(question_embeddings, answer_embeddings)

            # Generate relevant indices 
            relevant_indices = self.get_relevant_indices(dataset_file_path, answers)

            # Compute and return the evaluation metrics for the current dataset
            mrr, precision, recall = self.compute_metrics(dataset_file_path, relevant_indices, top_k_indices)
            print(f"Dataset: {dataset_file_path}, MRR: {mrr:.3f}, Precision@{self.k}: {precision:.3f}, Recall@{self.k}: {recall:.3f}")

    def get_relevant_indices(self, dataset_file_path, answers):
        """
        Get the relevant indices (correct answers) for each question.
        :param dataset_file_path: Path to the dataset file.
        :param answers: List of all possible answers.
        :return: List of relevant indices for each question.
        """
        with open(dataset_file_path, 'r') as file:
            data = json.load(file)

        relevant_indices = []
        for entry in data.values():
            relevant = [answers.index(ans) for ans in entry["answers"]]
            relevant_indices.append(relevant)

        return relevant_indices

    def get_metrics_summary(self):
        """
        Retrieve the stored metrics across all evaluated datasets.
        :return: List of metric dictionaries for each dataset.
        """
        return self.metrics_store


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Evaluator usage function
def evaluate_model(datasets,model_type, model_path=None, embedding_model_name=None, k=5):
    # Initialize the evaluator with a pre-trained Sentence-BERT model
    evaluator = EmbeddingEvaluator(model_type, model_path=model_path, embedding_model_name=embedding_model_name, k=k)

    # Evaluate the model on all datasets
    evaluator.evaluate(datasets)

    # Retrieve stored metrics
    return(evaluator.get_metrics_summary())


## Evaluate embedding models

In [3]:
datasets = ['/home/mcorradi/researchdrive/BetterEmbedData/bioasq_test_set.json', '/home/mcorradi/researchdrive/BetterEmbedData/pubmedqa_test_set.json']

### General model

In [4]:
metrics_general = evaluate_model(datasets,model_type='pretrained',embedding_model_name = 'BAAI/bge-base-en-v1.5')

Dataset: /home/mcorradi/researchdrive/BetterEmbedData/bioasq_test_set.json, MRR: 0.932, Precision@5: 0.720, Recall@5: 0.549
Dataset: /home/mcorradi/researchdrive/BetterEmbedData/pubmedqa_test_set.json, MRR: 0.925, Precision@5: 0.192, Recall@5: 0.958


In [5]:
metrics_general_1 = evaluate_model(datasets,model_type='pretrained',embedding_model_name = 'BAAI/bge-base-en-v1.5', k=1)

Dataset: /home/mcorradi/researchdrive/BetterEmbedData/bioasq_test_set.json, MRR: 0.895, Precision@1: 0.895, Recall@1: 0.199
Dataset: /home/mcorradi/researchdrive/BetterEmbedData/pubmedqa_test_set.json, MRR: 0.901, Precision@1: 0.901, Recall@1: 0.901


### Nomic model

In [None]:
metrics_nomic = evaluate_model(datasets,model_type='pretrained',embedding_model_name = 'BAAI/bge-base-en-v1.5')

### PubMed model

In [6]:
metrics_pubmed = evaluate_model(datasets,model_type='pretrained',embedding_model_name = 'neuml/pubmedbert-base-embeddings')

Dataset: /home/mcorradi/researchdrive/BetterEmbedData/bioasq_test_set.json, MRR: 0.809, Precision@5: 0.590, Recall@5: 0.421
Dataset: /home/mcorradi/researchdrive/BetterEmbedData/pubmedqa_test_set.json, MRR: 0.936, Precision@5: 0.195, Recall@5: 0.973


In [7]:
metrics_pubmed_1 = evaluate_model(datasets,model_type='pretrained',embedding_model_name = 'neuml/pubmedbert-base-embeddings', k=1)

Dataset: /home/mcorradi/researchdrive/BetterEmbedData/bioasq_test_set.json, MRR: 0.766, Precision@1: 0.766, Recall@1: 0.157
Dataset: /home/mcorradi/researchdrive/BetterEmbedData/pubmedqa_test_set.json, MRR: 0.910, Precision@1: 0.910, Recall@1: 0.910


### MedEmbed model

In [8]:
metrics_med = evaluate_model(datasets,model_type='pretrained',embedding_model_name = 'abhinand/MedEmbed-base-v0.1')

You try to use a model that was created with version 3.2.0, however, your version is 2.7.0. This might cause unexpected behavior or errors. In that case, try to update to the latest version.





Dataset: /home/mcorradi/researchdrive/BetterEmbedData/bioasq_test_set.json, MRR: 0.928, Precision@5: 0.720, Recall@5: 0.546
Dataset: /home/mcorradi/researchdrive/BetterEmbedData/pubmedqa_test_set.json, MRR: 0.955, Precision@5: 0.195, Recall@5: 0.976


In [10]:
metrics_med_1 = evaluate_model(datasets,model_type='pretrained',embedding_model_name = 'abhinand/MedEmbed-base-v0.1', k=1)

You try to use a model that was created with version 3.2.0, however, your version is 2.7.0. This might cause unexpected behavior or errors. In that case, try to update to the latest version.





Dataset: /home/mcorradi/researchdrive/BetterEmbedData/bioasq_test_set.json, MRR: 0.891, Precision@1: 0.891, Recall@1: 0.198
Dataset: /home/mcorradi/researchdrive/BetterEmbedData/pubmedqa_test_set.json, MRR: 0.939, Precision@1: 0.939, Recall@1: 0.939


In [1]:
from sentence_transformers import SentenceTransformer
from embeddings_training import ContrastiveAutoencoder
from colbert import Indexer, Searcher
from colbert.infra import Run, ColBERTConfig
import numpy as np
import json
import torch

class EmbeddingEvaluator:
    def __init__(self, model_type, model_path=None, embedding_model_name=None, k=5, index_name="default"):
        self.model_type = model_type
        self.k = k
        self.index_name = index_name
        self.metrics_store = []

        # Initialize models based on model type
        if model_type == "custom":
            if model_path is None:
                raise ValueError("model_path must be provided for a custom model.")
            self.model = ContrastiveAutoencoder.from_pretrained(model_path)
        elif model_type == "pretrained":
            if embedding_model_name is None:
                raise ValueError("embedding_model_name must be provided for a pre-trained model.")
            self.model = SentenceTransformer(embedding_model_name)
        elif model_type == "colbert":
            if embedding_model_name is None:
                raise ValueError("embedding_model_name must be provided for a ColBERT model.")
            self.config = ColBERTConfig(doc_maxlen=512, nbits=2)
            self.indexer = Indexer(checkpoint=embedding_model_name, config=self.config)
            self.searcher = None  # Initialize later after indexing answers
        else:
            raise ValueError(f"Unsupported model type: {model_type}")
        
        if model_type in ["custom", "pretrained"]:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            self.model.to(device)

    def load_dataset(self, file_path):
        with open(file_path, 'r') as file:
            data = json.load(file)
        questions, answers = [], []
        for entry in data.values():
            questions.append(entry["question"])
            answers.extend(entry["answers"])
        return questions, answers

    def index_answers(self, answers):
        """Indexes the answers once for efficient retrieval in ColBERT."""
        if self.model_type == "colbert":
            self.indexer.index(name=self.index_name, collection=answers, overwrite="force_silent_overwrite")
            self.searcher = Searcher(index=self.index_name, config=self.config)

    def generate_question_embeddings(self, questions):
        """Generates embeddings for questions based on model type."""
        if self.model_type in ["custom", "pretrained"]:
            return self.model.encode(questions, convert_to_tensor=True)
        elif self.model_type == "colbert":
            # For ColBERT, retrieve based on query without generating embeddings directly
            return questions

    def retrieve_top_k(self, questions, answer_embeddings=None):
        """Retrieve the top-k most relevant answers for each question."""
        if self.model_type == "colbert":
            top_k_docs = [self.searcher.search(query, k=self.k) for query in questions]
            return [[doc[0] for doc in docs] for docs in top_k_docs]
        else:
            # For custom and pretrained models, use cosine similarity
            if questions.device != torch.device('cpu'):
                questions = questions.cpu().detach().numpy()
            if answer_embeddings.device != torch.device('cpu'):
                answer_embeddings = answer_embeddings.cpu().detach().numpy()
            cosine_sim = cosine_similarity(questions, answer_embeddings)
            return np.argsort(cosine_sim, axis=1)[:, ::-1][:, :self.k]

    def compute_metrics(self, dataset_name, relevant_indices, top_k_indices):
        mrr = self.mean_reciprocal_rank(relevant_indices, top_k_indices)
        precision = self.precision_at_k(relevant_indices, top_k_indices)
        recall = self.recall_at_k(relevant_indices, top_k_indices)
        self.metrics_store.append({
            "Model": self.model_type if self.model_type == "custom" else self.embedding_model_name,
            "Dataset": dataset_name,
            "MRR": mrr,
            "Precision@k": precision,
            "Recall@k": recall
        })
        return mrr, precision, recall

    def evaluate(self, dataset_file_paths):
        """Evaluate the embedding model on multiple datasets."""
        for dataset_file_path in dataset_file_paths:
            questions, answers = self.load_dataset(dataset_file_path)

            # ColBERT: Index answers once and reuse
            if self.model_type == "colbert":
                self.index_answers(answers)

            # Generate question embeddings or use questions as is for ColBERT
            question_embeddings = self.generate_question_embeddings(questions)
            answer_embeddings = self.generate_embeddings(answers) if self.model_type != "colbert" else answers

            # Retrieve the top-k results for each question
            top_k_indices = self.retrieve_top_k(question_embeddings, answer_embeddings)

            # Generate relevant indices
            relevant_indices = self.get_relevant_indices(dataset_file_path, answers)

            # Compute and display metrics
            mrr, precision, recall = self.compute_metrics(dataset_file_path, relevant_indices, top_k_indices)
            print(f"Dataset: {dataset_file_path}, MRR: {mrr:.3f}, Precision@{self.k}: {precision:.3f}, Recall@{self.k}: {recall:.3f}")

    def get_relevant_indices(self, dataset_file_path, answers):
        with open(dataset_file_path, 'r') as file:
            data = json.load(file)
        relevant_indices = []
        for entry in data.values():
            relevant = [answers.index(ans) for ans in entry["answers"]]
            relevant_indices.append(relevant)
        return relevant_indices

    def get_metrics_summary(self):
        return self.metrics_store


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Evaluator usage function
def evaluate_model(datasets,model_type, model_path=None, embedding_model_name=None, k=5):
    # Initialize the evaluator with a pre-trained Sentence-BERT model
    evaluator = EmbeddingEvaluator(model_type, model_path=model_path, embedding_model_name=embedding_model_name, k=k)

    # Evaluate the model on all datasets
    evaluator.evaluate(datasets)

    # Retrieve stored metrics
    return(evaluator.get_metrics_summary())

In [3]:
datasets = ['/home/mcorradi/researchdrive/BetterEmbedData/bioasq_test_set.json', '/home/mcorradi/researchdrive/BetterEmbedData/pubmedqa_test_set.json']

In [4]:
metrics_general = evaluate_model(datasets,model_type='pretrained',embedding_model_name = 'BAAI/bge-base-en-v1.5')

AttributeError: 'EmbeddingEvaluator' object has no attribute 'generate_embeddings'

In [5]:
metrics_pubmed = evaluate_model(datasets,model_type='pretrained',embedding_model_name = 'neuml/pubmedbert-base-embeddings')

Dataset: /home/mcorradi/researchdrive/BetterEmbedData/bioasq_test_set.json, MRR: 0.809, Precision@5: 0.590, Recall@5: 0.421
Dataset: /home/mcorradi/researchdrive/BetterEmbedData/pubmedqa_test_set.json, MRR: 0.936, Precision@5: 0.195, Recall@5: 0.973


In [None]:
metrics_colbert = evaluate_model(datasets,model_type='colbert',embedding_model_name = 'answerdotai/answerai-colbert-small-v1')



[Oct 30, 15:17:14] #> Note: Output directory .ragatouille/colbert/indexes/default already exists


#> Starting...


  self.scaler = torch.cuda.amp.GradScaler()
Process Process-4:
Traceback (most recent call last):
  File "/home/mcorradi/.conda/envs/embeddings/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/mcorradi/.conda/envs/embeddings/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/mcorradi/.conda/envs/embeddings/lib/python3.10/site-packages/colbert/infra/launcher.py", line 134, in setup_new_process
    return_val = callee(config, *args)
  File "/home/mcorradi/.conda/envs/embeddings/lib/python3.10/site-packages/colbert/indexing/collection_indexer.py", line 33, in encode
    encoder.run(shared_lists)
  File "/home/mcorradi/.conda/envs/embeddings/lib/python3.10/site-packages/colbert/indexing/collection_indexer.py", line 63, in run
    self.setup() # Computes and saves plan for whole collection
  File "/home/mcorradi/.conda/envs/embeddings/lib/python3.10/site-packages/colbert/ind

nranks = 1 	 num_gpus = 1 	 device=0
{
    "query_token_id": "[unused0]",
    "doc_token_id": "[unused1]",
    "query_token": "[Q]",
    "doc_token": "[D]",
    "ncells": null,
    "centroid_score_threshold": null,
    "ndocs": null,
    "load_index_with_mmap": false,
    "index_path": null,
    "index_bsize": 64,
    "nbits": 2,
    "kmeans_niters": 4,
    "resume": false,
    "pool_factor": {
        "val": 1
    },
    "clustering_mode": {
        "val": "hierarchical"
    },
    "protected_tokens": {
        "val": 0
    },
    "similarity": "cosine",
    "bsize": 64,
    "accumsteps": 1,
    "lr": 1e-5,
    "maxsteps": 15626,
    "save_every": null,
    "warmup": 781,
    "warmup_bert": null,
    "relu": false,
    "nway": 32,
    "use_ib_negatives": false,
    "reranker": false,
    "distillation_alpha": 1.0,
    "ignore_scores": false,
    "model_name": "answerdotai\/AnswerAI-ColBERTv2.5-small",
    "query_maxlen": 32,
    "attend_to_mask_tokens": false,
    "interaction": "colb

