### Setup and Imports 

This section initializes the environment, sets up necessary constants, and imports required libraries and modules.

#### Environment Setup
Load environment variables from the `.env` file.

```python
import os

from dotenv import load_dotenv

# Load the variables defined in the .env file into the process environment.
load_dotenv()


# Path of the ANTIQUE dataset file; None when the variable is not set.
ANTIQUE_DATASET_PATH = os.getenv('ANTIQUE_DATASET_PATH')
# Integer threshold read from the environment; falls back to 10 when unset.
RECALL_PRECISION_THRESHOLD = int(os.getenv('RECALL_PRECISION_THRESHOLD', 10))

```

In [None]:
from typing import Optional
from abc import ABC, abstractmethod
from typing import  Dict
from dotenv import load_dotenv
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from database.mongo_helper import MongoDBConnection
from collections import defaultdict
import re
from overrides import overrides
import os
from typing import List
import numpy as np
from gensim.models import Word2Vec
from numpy import ndarray
from common.constants import Locations
from common.file_utilities import FileUtilities
from database.chroma_helper import ChromaHelper
from text_processors.base_text_processor import BaseTextProcessor
from text_processors.antique_text_processor import AntiqueTextProcessor
from tabulate import tabulate

# Load the variables defined in the .env file before any setting is read.
load_dotenv()

# Path handed to AntiqueReader; None when the variable is not set.
ANTIQUE_DATASET_PATH = os.getenv('ANTIQUE_DATASET_PATH')
# Value passed as `k` to EvaluationManager.evaluate; falls back to 10 when unset.
RECALL_PRECISION_THRESHOLD = int(os.getenv('RECALL_PRECISION_THRESHOLD', 10))


Define Evaluation Metric Calculators

In [None]:
def _relevant_doc_ids(query_id: str, qrels: Dict[str, Dict[str, int]]) -> set:
    """Return the ids judged relevant (grade > 0) for ``query_id``; empty when unjudged."""
    return {doc_id for doc_id, grade in qrels.get(query_id, {}).items() if grade > 0}


def _ranked_doc_ids(retrieved_docs, k: Optional[int]) -> list:
    """Return the ids of the first ``k`` ranking entries (all of them when ``k`` is None).

    Each entry may be a bare document id or a dict carrying the id under ``'doc_id'``.
    """
    cutoff = retrieved_docs if k is None else retrieved_docs[:k]
    return [doc['doc_id'] if isinstance(doc, dict) else doc for doc in cutoff]


class MetricCalculator(ABC):
    """Interface of a per-query retrieval metric.

    Implementations share these conventions:
      * ``retrieved_docs`` is the ranking, best match first; entries are
        document ids (dicts with a ``'doc_id'`` key are accepted as well).
      * ``qrels`` maps query id -> {document id -> relevance grade}; a document
        counts as relevant only when its grade is greater than zero.
      * ``k`` limits the evaluation to the first ``k`` ranks; ``None`` means
        the whole ranking.
    """

    @abstractmethod
    def calculate(self, query_id: str, retrieved_docs: List[str], qrels: Dict[str, Dict[str, int]], k: Optional[int] = None) -> float:
        """Return the metric value for one query."""
        pass


class AveragePrecisionCalculator(MetricCalculator):
    """Average precision: mean of precision@rank over the ranks holding a relevant document."""

    def calculate(self, query_id: str, retrieved_docs: List[str], qrels: Dict[str, Dict[str, int]], k: Optional[int] = None) -> float:
        """Return the average precision of the ranking, or 0.0 when the query has no relevant documents.

        The sum of precisions is divided by the number of documents judged
        relevant for the query, so relevant documents that were never
        retrieved lower the score.
        """
        relevant = _relevant_doc_ids(query_id, qrels)
        if not relevant:
            return 0.0

        hits = 0
        sum_precisions = 0.0
        for rank, doc_id in enumerate(_ranked_doc_ids(retrieved_docs, k), start=1):
            if doc_id in relevant:
                hits += 1
                sum_precisions += hits / rank

        # Normalise by ALL relevant documents, not only by the retrieved ones.
        return sum_precisions / len(relevant)


class PrecisionCalculator(MetricCalculator):
    """Precision: share of the evaluated ranks that hold a relevant document."""

    def calculate(self, query_id: str, retrieved_docs: List[str], qrels: Dict[str, Dict[str, int]], k: Optional[int] = None) -> float:
        """Return relevant-retrieved / evaluated-retrieved, or 0.0 for an empty ranking."""
        ranked = _ranked_doc_ids(retrieved_docs, k)
        if not ranked:
            return 0.0

        relevant = _relevant_doc_ids(query_id, qrels)
        hits = sum(1 for doc_id in ranked if doc_id in relevant)
        # len(ranked) equals min(len(retrieved_docs), k) and is also defined when k is None.
        return hits / len(ranked)


class RecallCalculator(MetricCalculator):
    """Recall: share of the relevant documents found within the evaluated ranks."""

    def calculate(self, query_id: str, retrieved_docs: List[str], qrels: Dict[str, Dict[str, int]], k: Optional[int] = None) -> float:
        """Return relevant-retrieved / total-relevant, or 0.0 when nothing is relevant."""
        relevant = _relevant_doc_ids(query_id, qrels)
        if not relevant:
            return 0.0

        # Count distinct documents so a repeated id cannot push recall above 1.
        found = relevant.intersection(_ranked_doc_ids(retrieved_docs, k))
        # The denominator is the NUMBER of relevant documents, not the sum of their grades.
        return len(found) / len(relevant)


class ReciprocalRankCalculator(MetricCalculator):
    """Reciprocal rank: 1 / rank of the first relevant document."""

    def calculate(self, query_id: str, retrieved_docs: List[str], qrels: Dict[str, Dict[str, int]], k: Optional[int] = None) -> float:
        """Return 1 / rank of the first relevant document, or 0.0 when none is found."""
        relevant = _relevant_doc_ids(query_id, qrels)

        for rank, doc_id in enumerate(_ranked_doc_ids(retrieved_docs, k), start=1):
            if doc_id in relevant:
                return 1.0 / rank
        return 0.0


Define Evaluation Manager

In [None]:
class EvaluationManager:
    """Run a matcher over a set of queries and score every ranking with each metric calculator."""

    def __init__(self, metric_calculators: List[MetricCalculator], matcher):
        self.matcher = matcher
        self.metric_calculators = metric_calculators

    def evaluate(self, queries: Dict[str, str], qrels: Dict[str, Dict[str, int]], k: Optional[int] = None) -> Dict[str, Dict[str, float]]:
        """Return ``{query_id: {calculator class name: metric value}}``.

        ``k`` is handed to ``matcher.match`` and to the calculators that are
        fed plain document ids; every other calculator receives the matcher's
        raw result entries and no ``k``.
        """
        # Calculators that receive document ids (plus the cut-off) instead of raw entries.
        id_based_metrics = ("AveragePrecisionCalculator", "RecallCalculator", "PrecisionCalculator")

        results_by_query = {}
        for query_id, query_text in queries.items():
            ranking = self.matcher.match(query_text, k)

            scores = {}
            for calculator in self.metric_calculators:
                name = calculator.__class__.__name__
                if name not in id_based_metrics:
                    scores[name] = calculator.calculate(query_id, ranking, qrels)
                    continue
                doc_ids = [entry['doc_id'] for entry in ranking]
                scores[name] = calculator.calculate(query_id, doc_ids, qrels, k=k)

            results_by_query[query_id] = scores

        return results_by_query


Load Data

In [None]:
class DatasetReader(ABC):
    """Abstract base for dataset readers.

    Inheriting from ``ABC`` makes the ``@abstractmethod`` markers effective:
    the class itself, and any subclass that leaves one of the three readers
    unimplemented, cannot be instantiated.
    """

    def __init__(self, file_path: str):
        # Path of the dataset file that ``load_as_dict`` implementations read.
        self.file_path = file_path

    @abstractmethod
    def load_as_dict(self) -> dict:
        """Return the dataset as a dictionary."""
        pass

    @abstractmethod
    def read_queries(self) -> dict:
        """Return the queries as a dictionary."""
        pass

    @abstractmethod
    def read_qrels(self) -> defaultdict:
        """Return the relevance judgments."""
        pass
    
class AntiqueReader(DatasetReader):
    """Reader for the ANTIQUE files: ``key<TAB>value`` files and a four-column qrels file."""

    @staticmethod
    def _env_path(variable: str) -> str:
        """Return the path stored in environment variable ``variable``.

        Raises:
            ValueError: if the variable is unset or empty.
        """
        path = os.environ.get(variable)
        if not path:
            raise ValueError(f"Environment variable {variable} is not set")
        return path

    @staticmethod
    def _read_tsv_pairs(path: str) -> dict:
        """Parse a UTF-8 ``key<TAB>value`` file into a dict.

        Blank lines are skipped. Only the first tab separates key and value,
        so a value may itself contain tabs.

        Raises:
            ValueError: if a non-blank line contains no tab.
        """
        pairs = {}
        with open(path, 'r', encoding='utf-8') as file:
            for line_num, line in enumerate(file, 1):
                record = line.strip()
                if not record:
                    continue
                key, separator, value = record.partition('\t')
                if not separator:
                    raise ValueError(f"{path}:{line_num}: expected '<key>\\t<value>', got {record!r}")
                pairs[key] = value
        return pairs

    @overrides
    def load_as_dict(self) -> dict:
        """Return ``{key: value}`` read from ``self.file_path``."""
        return self._read_tsv_pairs(self.file_path)

    @overrides
    def read_queries(self) -> dict:
        """Return ``{query_id: query_text}`` read from the file named by ``ANTIQUE_QUERIES_PATH``."""
        return self._read_tsv_pairs(self._env_path('ANTIQUE_QUERIES_PATH'))

    @overrides
    def read_qrels(self) -> defaultdict:
        """Return ``{query_id: {doc_id: relevance}}`` read from the file named by ``ANTIQUE_QRELS_PATH``.

        Each non-blank line must hold four whitespace-separated fields:
        query id, an ignored field, document id and an integer relevance.

        Raises:
            ValueError: if a line has a different number of fields or a
                non-integer relevance.
        """
        qrels_path = self._env_path('ANTIQUE_QRELS_PATH')
        qrels = defaultdict(dict)
        with open(qrels_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                parts = line.split()
                if not parts:
                    continue
                if len(parts) != 4:
                    raise ValueError(
                        f"{qrels_path}:{line_num}: expected 4 whitespace-separated fields, got {len(parts)}"
                    )

                query_id, _, doc_id, relevance = parts
                qrels[query_id][doc_id] = int(relevance)

        return qrels
    
def load_antique_data():
    """Read the ANTIQUE relevance judgments and queries.

    Returns:
        A ``(qrels, queries)`` tuple as produced by ``AntiqueReader.read_qrels``
        and ``AntiqueReader.read_queries``; the judgments are read first.
    """
    antique_reader = AntiqueReader(ANTIQUE_DATASET_PATH)
    loaded_qrels = antique_reader.read_qrels()
    loaded_queries = antique_reader.read_queries()
    return loaded_qrels, loaded_queries

# Load the relevance judgments and queries once; both evaluation runs below reuse them.
qrels, queries = load_antique_data()

Define Matchers

In [None]:
class QueryMatcher:
    """Rank stored documents against a query by cosine similarity.

    The document matrix and the fitted vectorizer are loaded from the paths
    that ``Locations`` generates for ``model_name``; the documents themselves
    are fetched from the MongoDB collection of the same name.
    """

    def __init__(self, model_name: str):
        matrix_path: str = Locations.generate_matrix_path(model_name)
        self.matrix = FileUtilities.load_file(matrix_path)

        model_path: str = Locations.generate_model_path(model_name)
        self.model: TfidfVectorizer = FileUtilities.load_file(model_path)

        # Minimum cosine similarity a document needs to count as a match.
        self.threshold = float(os.environ.get('SIMILARITY_THRESHOLD', 0.5))

        self.db_collection = MongoDBConnection.get_instance().get_collection(model_name)

    def __vectorize_query(self, query: str):
        """Project the raw query text into the vectorizer's space."""
        return self.model.transform([query])

    def match(self, query: str, n=None):
        """Return the matching documents, most similar first.

        Args:
            query: raw query text.
            n: maximum number of documents to return; ``None`` returns every
                document whose similarity reaches the threshold.

        Returns:
            The documents found in the collection for the matching indices,
            ordered by decreasing similarity.
        """
        print(f"Query: {query}")

        query_vector = self.__vectorize_query(query)

        cos_similarities = cosine_similarity(self.matrix, query_vector)

        # Matrix row positions ordered from most to least similar.
        sorted_indices = np.argsort(cos_similarities, axis=0)[::-1].flatten()

        # Stored documents are looked up by ``index`` = row position + 1
        # (presumably a 1-based index in the collection - confirm against the indexer).
        matching_docs_indices = [
            int(i) + 1
            for i in sorted_indices
            if cos_similarities[i].item() >= self.threshold
        ]

        # Honour the requested cut-off before hitting the database.
        if n is not None:
            matching_docs_indices = matching_docs_indices[:max(n, 0)]
        if not matching_docs_indices:
            return []

        matching_results = self.db_collection.find({"index": {"$in": matching_docs_indices}})

        # ``$in`` does not preserve order, so restore the similarity ranking;
        # a rank lookup table avoids a linear ``list.index`` scan per document.
        rank_by_index = {doc_index: rank for rank, doc_index in enumerate(matching_docs_indices)}
        return sorted(matching_results, key=lambda doc: rank_by_index[doc['index']])
    

class AntiqueMatcher(QueryMatcher):
    """QueryMatcher configured with the ANTIQUE collection name from ``Locations``."""
    def __init__(self):
        # The collection name is the model name the base class uses to locate its matrix, model and collection.
        super().__init__(Locations.ANTIQUE_COLLECTION_NAME)

Evaluate Without Embedding

In [None]:
# Baseline run: rank with the vectorizer-based matcher.
matcher = AntiqueMatcher()

# One instance of every metric; EvaluationManager keys its results by the class names.
metric_calculators = [
    AveragePrecisionCalculator(),
    PrecisionCalculator(),
    RecallCalculator(),
    ReciprocalRankCalculator()
]

evaluation_manager = EvaluationManager(metric_calculators, matcher)
# RECALL_PRECISION_THRESHOLD is passed as the cut-off k.
evaluation_results_no_embedding = evaluation_manager.evaluate(queries, qrels, RECALL_PRECISION_THRESHOLD)

def calculate_average_metrics(evaluation_results):
    """Average the per-query average precision and reciprocal rank.

    Args:
        evaluation_results: mapping of query id to a metrics dict holding the
            keys ``"AveragePrecisionCalculator"`` and ``"ReciprocalRankCalculator"``.

    Returns:
        ``(mean average precision, mean reciprocal rank)``; ``(0.0, 0.0)`` when
        there are no queries.
    """
    query_count = len(evaluation_results)
    if query_count <= 0:
        return 0.0, 0.0

    per_query_metrics = list(evaluation_results.values())
    map_sum = sum(metrics["AveragePrecisionCalculator"] for metrics in per_query_metrics)
    mrr_sum = sum(metrics["ReciprocalRankCalculator"] for metrics in per_query_metrics)

    return map_sum / query_count, mrr_sum / query_count

# Aggregate the per-query average precision and reciprocal rank over all queries.
average_map_no_embedding, average_mrr_no_embedding = calculate_average_metrics(evaluation_results_no_embedding)
print(f"Average MAP without embedding: {average_map_no_embedding:.6f}")
print(f"Average MRR without embedding: {average_mrr_no_embedding:.6f}")

# One table row per query: id, precision and recall formatted to six decimals.
table = []
for query_id, metrics in evaluation_results_no_embedding.items():
    row = [
        query_id,
        f"{metrics['PrecisionCalculator']:.6f}",
        f"{metrics['RecallCalculator']:.6f}"
    ]
    table.append(row)

print("\nDetailed Results Without Embedding:")
print(tabulate(table, headers=["Query ID", "Precision", "Recall"], tablefmt="pretty"))

Improving Precision With Embedding

Embedding Matcher

In [None]:
class BaseEmbeddingMatcher:
    """Match queries against a Chroma collection using averaged word embeddings.

    The query is tokenised by ``text_processor``, each known token is looked
    up in the loaded embedding model, and the mean vector is sent to the
    vector collection as the query embedding.
    """

    def __init__(self, model_name: str, text_processor: BaseTextProcessor):
        self.vector_collection = ChromaHelper.get_instance().get_or_create_collection(model_name)
        # Length of the zero vector used when no query word is in the model's vocabulary.
        self.vector_size = int(os.environ.get("VECTOR_SIZE", 500))
        self.model: Word2Vec = self.__load_model(model_name)
        self.text_processor = text_processor
        self.model_name = model_name

    def match(self, text: str, top: int = 10):
        """Return up to ``top`` result entries for ``text``.

        Args:
            text: raw query text.
            top: number of results to request; ``None`` falls back to the
                default of 10, so callers holding an optional cut-off (as
                ``EvaluationManager.evaluate`` does) can pass it straight through.

        Returns:
            A list of dicts with the keys ``'doc_id'``, ``'doc_content'`` and
            ``'similarity'``, in the order returned by the collection. The
            ``'similarity'`` value is taken from the ``'distances'`` field of
            the query result.
        """
        if top is None:
            top = 10

        processed_query: List[str] = self.text_processor.process_query(text)

        query_embeddings: List = self.vectorize_query(processed_query).tolist()

        result = self.vector_collection.query(
            query_embeddings=query_embeddings,
            n_results=top,
        )

        # The result holds one list per query embedding; only one was sent.
        ids = result.get('ids', [[]])[0]
        documents = result.get('documents', [[]])[0]
        distances = result.get('distances', [[]])[0]

        return [
            {
                'doc_id': doc_id,
                'doc_content': doc_content,
                'similarity': doc_similarity,
            }
            for doc_id, doc_content, doc_similarity in zip(ids, documents, distances)
        ]

    def vectorize_query(self, query_words: List[str]) -> ndarray:
        """Return the mean embedding of the words known to the model.

        Words missing from the model's vocabulary are ignored; when none is
        known, a zero vector of length ``self.vector_size`` is returned.
        """
        query_vectors = [self.model.wv[word] for word in query_words if word in self.model.wv]

        if query_vectors:
            return np.mean(query_vectors, axis=0)
        return np.zeros(self.vector_size)

    @staticmethod
    def __load_model(model_name: str):
        """Load the embedding model from the path ``Locations`` generates for ``model_name``."""
        return FileUtilities.load_file(
            file_path=Locations.generate_embeddings_model_path(model_name)
        )



class AntiqueEmbeddingMatcher(BaseEmbeddingMatcher):
    """BaseEmbeddingMatcher configured with the 'antique' model name and an AntiqueTextProcessor."""
    def __init__(self):
        super().__init__(
            model_name='antique',
            text_processor=AntiqueTextProcessor()
        )

Evaluate With Embedding

In [None]:

# Second run: same queries, judgments and metrics, but ranked by the embedding matcher.
matcher = AntiqueEmbeddingMatcher()
evaluation_manager = EvaluationManager(metric_calculators, matcher)
evaluation_results_with_embedding = evaluation_manager.evaluate(queries, qrels, RECALL_PRECISION_THRESHOLD)

# Aggregate the per-query average precision and reciprocal rank over all queries.
average_map_with_embedding, average_mrr_with_embedding = calculate_average_metrics(evaluation_results_with_embedding)
print(f"Average MAP with embedding: {average_map_with_embedding:.6f}")
print(f"Average MRR with embedding: {average_mrr_with_embedding:.6f}")

# One table row per query: id, precision and recall formatted to six decimals.
table = []
for query_id, metrics in evaluation_results_with_embedding.items():
    row = [
        query_id,
        f"{metrics['PrecisionCalculator']:.6f}",
        f"{metrics['RecallCalculator']:.6f}"
    ]
    table.append(row)

print("\nDetailed Results With Embedding:")
print(tabulate(table, headers=["Query ID", "Precision", "Recall"], tablefmt="pretty"))