## Load data

In [120]:
import logging
import warnings
import pandas as pd

from datasets import get_dataset_config_names

# Silence library warnings so that cell outputs stay readable.
warnings.filterwarnings("ignore")

# Install a handler on the root logger explicitly. This replaces the former
# `logging.debug("info")` call, whose only useful effect was to trigger the
# same implicit root configuration as a side effect.
logging.basicConfig()

# Notebook-level logger; INFO records propagate to the root handler above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Keep the HTTP / search client loggers at WARNING and above.
# (The original lines ended with a stray comma, which built throwaway tuples.)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger('opensearch').setLevel(logging.WARNING)

# Widen pandas display limits so that wide result tables are not truncated.
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

In [16]:
# Fetch the configuration (subset) names of the 'rungalileo/ragbench' dataset.
subsets = get_dataset_config_names('rungalileo/ragbench')

# Show the subset names as the cell output.
subsets

['covidqa',
 'cuad',
 'delucionqa',
 'emanual',
 'expertqa',
 'finqa',
 'hagrid',
 'hotpotqa',
 'msmarco',
 'pubmedqa',
 'tatqa',
 'techqa']

In [177]:
# Prepared frame per subset, keyed by subset name.
data = {}

for subset in subsets:
    raw_df = pd.read_parquet(f'hf://datasets/rungalileo/ragbench/{subset}')

    # `question_id` mirrors the frame index. `n_relevant` counts the relevant
    # sentence keys of each row. List-valued parquet columns are typically
    # materialised as numpy arrays rather than Python lists, so the check must
    # accept any list-like value (`type(x) is list` made the count always 0).
    prepared_df = (
        raw_df
        .assign(
            question_id=raw_df.index,
            n_relevant=raw_df['all_relevant_sentence_keys'].apply(
                lambda keys: len(keys) if pd.api.types.is_list_like(keys) else 0
            ),
        )
        # For duplicated `id`s keep the row with the most relevant sentences.
        .sort_values(by=['id', 'n_relevant'], ascending=[False, False])
        .drop_duplicates(subset=['id'], keep='first')
    )

    logger.info("Dataset named '%s' downloaded with shape: '%s'", subset, prepared_df.shape)
    data[subset] = prepared_df

INFO:__main__:Dataset named 'covidqa' downloaded with shape: '(1765, 28)'
INFO:__main__:Dataset named 'cuad' downloaded with shape: '(2550, 28)'
INFO:__main__:Dataset named 'delucionqa' downloaded with shape: '(913, 28)'
INFO:__main__:Dataset named 'emanual' downloaded with shape: '(659, 28)'
INFO:__main__:Dataset named 'expertqa' downloaded with shape: '(2027, 28)'
INFO:__main__:Dataset named 'finqa' downloaded with shape: '(8281, 28)'
INFO:__main__:Dataset named 'hagrid' downloaded with shape: '(4532, 28)'
INFO:__main__:Dataset named 'hotpotqa' downloaded with shape: '(2697, 28)'
INFO:__main__:Dataset named 'msmarco' downloaded with shape: '(2690, 28)'
INFO:__main__:Dataset named 'pubmedqa' downloaded with shape: '(12250, 28)'
INFO:__main__:Dataset named 'tatqa' downloaded with shape: '(16552, 28)'
INFO:__main__:Dataset named 'techqa' downloaded with shape: '(905, 28)'


## Embedder

In [10]:
from pydantic import BaseModel


class EmbedderSettings(BaseModel):
    """Configuration object passed to ``Embedder``."""

    # Number of sentences encoded per model forward pass.
    batch_size: int = 16
    # Model identifier handed to ``from_pretrained`` for both model and tokenizer.
    model_name: str
    # Loader selector: 'e5' uses the XLM-RoBERTa classes, any other value uses the Auto* classes.
    type_model: str
    # Embedding size; presumably the width of the vectors the model produces — TODO confirm per model.
    dimension: int
    # ``str.format`` template applied to each sentence encoded with doc_type='query'.
    prefix_query: str
    # ``str.format`` template applied to each sentence encoded with doc_type='document'.
    prefix_document: str

In [11]:
# Settings for the multilingual E5 base model.
# NOTE(review): type_model="" is not 'e5', so Embedder takes its AutoModel /
# AutoTokenizer branch rather than the explicit XLM-RoBERTa one — confirm this is intended.
e5_embedder_settings = EmbedderSettings(batch_size=16, 
                                     model_name='intfloat/multilingual-e5-base', 
                                     type_model="", 
                                     dimension=768, 
                                     prefix_query="query: {}",
                                     prefix_document="passage: {}")

In [12]:
import abc
from typing import List

import more_itertools
import numpy as np
import torch
import torch.nn.functional as F
from tqdm import tqdm
from transformers import AutoModel, AutoTokenizer, XLMRobertaModel, XLMRobertaTokenizer


class IEmbedder(abc.ABC):
    """Abstract base for sentence embedders; selects the torch device on construction."""

    def __init__(self):
        # Use the GPU when torch reports one as available, otherwise the CPU.
        device_name = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device_name)

    @abc.abstractmethod
    def encode(self, sentences: List[str], doc_type: str) -> np.ndarray:
        """Compute embedding(s) for the given sentences."""


class Embedder(IEmbedder):
    """Sentence embedder built on a Hugging Face transformer with masked mean pooling.

    The model and tokenizer are loaded from ``settings.model_name`` and the
    model is moved to the device chosen by ``IEmbedder``. Sentences are
    prefixed according to their ``doc_type`` before tokenisation, and the
    returned embeddings are L2-normalised.
    """

    def __init__(self, settings: EmbedderSettings):
        super().__init__()
        self._settings = settings
        self.batch_size = self._settings.batch_size
        self.model_type = self._settings.type_model
        self.prefix_query = self._settings.prefix_query
        self.prefix_document = self._settings.prefix_document

        # 'e5' pins the XLM-RoBERTa classes; anything else lets transformers
        # resolve the architecture from the checkpoint.
        if self.model_type == 'e5':
            self.model = XLMRobertaModel.from_pretrained(self._settings.model_name).to(self.device)
            self.tokenizer = XLMRobertaTokenizer.from_pretrained(self._settings.model_name)
        else:
            self.model = AutoModel.from_pretrained(self._settings.model_name).to(self.device)
            self.tokenizer = AutoTokenizer.from_pretrained(self._settings.model_name)

    @staticmethod
    def average_pool(last_hidden_states: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
        """Mean of the hidden states over dim 1, ignoring masked-out positions.

        Positions whose attention mask is 0 are zeroed before summation and
        the sum is divided by the number of unmasked positions.
        Assumes inputs shaped (batch, seq, hidden) and (batch, seq) — TODO confirm.
        """
        last_hidden = last_hidden_states.masked_fill(~attention_mask[..., None].bool(), 0.0)
        return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]

    def encode(self, sentences: List[str], doc_type: str) -> np.ndarray:
        """Embed ``sentences`` and return one L2-normalised row per sentence.

        Args:
            sentences: Raw texts to embed.
            doc_type: 'query' or 'document' selects the matching prefix
                template; any other value leaves the texts unchanged.

        Returns:
            A 2-D array with one row per sentence. For empty input an array of
            shape ``(0, settings.dimension)`` is returned.
        """
        sentences = self.preprocess_sentences(sentences, doc_type)
        if not sentences:
            return np.empty((0, self._settings.dimension), dtype=np.float32)

        # Collect per-batch results and concatenate once at the end; growing a
        # tensor row by row with torch.cat copies the accumulated data each time.
        batch_embeddings = []
        for batch in more_itertools.chunked(sentences, self.batch_size):
            tokenized_batch = self.tokenizer(batch, max_length=512, padding=True,
                                             truncation=True, return_tensors='pt').to(self.device)

            with torch.no_grad():
                outputs = self.model(**tokenized_batch).last_hidden_state
                pooled = self.average_pool(outputs, tokenized_batch['attention_mask'])

            # Normalise per row and move off the accelerator right away so that
            # only one batch of activations is held on the device at a time.
            batch_embeddings.append(F.normalize(pooled, dim=-1).cpu())

        return torch.cat(batch_embeddings, dim=0).numpy()

    def preprocess_sentences(self, sentences: List[str], doc_type: str) -> List[str]:
        """Apply the query/document prefix template to every sentence.

        Unknown ``doc_type`` values return ``sentences`` unchanged.
        """
        if doc_type == 'query':
            return [self.prefix_query.format(sentence) for sentence in sentences]
        elif doc_type == 'document':
            return [self.prefix_document.format(sentence) for sentence in sentences]
        return sentences

In [13]:
# Build the shared embedder; construction loads the model and tokenizer named in the settings.
embedder = Embedder(e5_embedder_settings)

## Vector Database

In [83]:
import yaml
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


import os

# Connection URL of the PostgreSQL database that stores the vectors.
# Credentials should come from the environment; the literal below is only a
# fallback for the local development instance and keeps existing runs working.
SQLALCHEMY_DATABASE_URL = os.environ.get(
    "SQLALCHEMY_DATABASE_URL",
    "postgresql://user:password@localhost:5434/ugragdb",
)

# Engine, session factory and declarative base shared by the cells that follow.
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

In [84]:
from pgvector.sqlalchemy import Vector
from sqlalchemy import Column, Date, ForeignKey, Integer, String, Text


# Size of the pgvector column declared by create_document_table_class.
dimension = 768
# Maps table name -> ORM class so each table is declared at most once.
table_class_cache = {}


def create_document_table_class(table_name: str):
    """Return the ORM class mapped to ``table_name``, declaring it on first use.

    Classes are memoised in ``table_class_cache`` so that a table is never
    declared twice on the same declarative base.

    Args:
        table_name: Name of the database table to map.

    Returns:
        A declarative class with ``id``, ``doc_id``, ``question_id``, ``text``
        and ``vector`` columns.
    """
    cached_class = table_class_cache.get(table_name)
    if cached_class is not None:
        return cached_class

    # Build the class dynamically so that each table gets a distinct class
    # name. Declaring every table with the same class name (`DBTable`) makes
    # the classes collide in the declarative base's class registry.
    table_class = type(
        f"DBTable_{table_name}",
        (Base,),
        {
            "__tablename__": table_name,
            "id": Column(String, primary_key=True, index=True),
            "doc_id": Column(String, nullable=False),
            "question_id": Column(String, nullable=False),
            "text": Column(Text, nullable=False),
            "vector": Column(Vector(dimension)),
        },
    )

    table_class_cache[table_name] = table_class

    return table_class


# Declare one ORM class per subset (this also fills table_class_cache) ...
for table_name in subsets:
    DocumentTableClass = create_document_table_class(table_name)
# ... then create the tables registered on Base's metadata.
Base.metadata.create_all(bind=engine)

In [178]:
# Session shared with the retrieval cells further down, so it is left open here.
db = SessionLocal()

for subset in subsets:
    DocumentTableClass = table_class_cache[subset]
    logger.info("Table '%s' loading...", DocumentTableClass.__table__)
    table_df = data[subset]

    question_rows = zip(table_df['question_id'].values, table_df['documents_sentences'].values)
    # The original line passed `total=` without the enclosing `tqdm(` call,
    # which is a syntax error.
    for question_id, document_sentences in tqdm(question_rows, total=len(table_df)):
        # Flatten the (sentence id, sentence text) pairs of every document of this question.
        sentence_pairs = [
            (doc[0], doc[1])
            for doc_group in document_sentences
            for doc in doc_group
        ]
        if not sentence_pairs:
            continue

        # Encode all sentences of the question in one call; the embedder
        # batches internally, which avoids one forward pass per sentence.
        embeddings = embedder.encode([doc_text for _, doc_text in sentence_pairs], doc_type="document")

        db.add_all(
            DocumentTableClass(id=f"{question_id}_{doc_id}", doc_id=doc_id,
                               question_id=str(question_id),
                               text=doc_text,
                               vector=embedding)
            for (doc_id, doc_text), embedding in zip(sentence_pairs, embeddings)
        )
        # Commit once per question, as before.
        db.commit()

INFO:__main__:Table 'covidqa' loading...
INFO:__main__:Table 'cuad' loading...
INFO:__main__:Table 'delucionqa' loading...
INFO:__main__:Table 'emanual' loading...
INFO:__main__:Table 'expertqa' loading...
INFO:__main__:Table 'finqa' loading...
INFO:__main__:Table 'hagrid' loading...
INFO:__main__:Table 'hotpotqa' loading...
INFO:__main__:Table 'msmarco' loading...
INFO:__main__:Table 'pubmedqa' loading...
INFO:__main__:Table 'tatqa' loading...
INFO:__main__:Table 'techqa' loading...


## Full-text index (OpenSearch)

In [109]:
import logging
from typing import Any, Dict, Iterator, List

import more_itertools
from opensearchpy import OpenSearch, OpenSearchException
from opensearchpy.helpers import bulk

# Re-bind the notebook logger; getLogger returns the same logger object for the same name.
logger = logging.getLogger(__name__)


def create_index(index_name: str, os_client: OpenSearch) -> None:
    """Create ``index_name`` with the document mapping, unless it already exists."""
    if os_client.indices.exists(index=index_name):
        return

    # Field name -> mapping type for the indexed documents.
    field_types = {"text": "text", "doc_id": "keyword", "question_id": "keyword"}
    index_body: Dict = {
        "mappings": {
            "properties": {field: {"type": kind} for field, kind in field_types.items()}
        }
    }

    os_client.indices.create(index=index_name, body=index_body)
    logger.info(f"Successfully created index {index_name}")


def load(df: pd.DataFrame) -> Iterator[Any]:
    """Lazily yield one index-ready document per row of ``df``."""
    row_dicts = (row.to_dict() for _, row in df.iterrows())
    for row_dict in row_dicts:
        yield generate_document_source(row_dict)

def generate_document_source(row: Dict) -> Dict:
    """Map a flattened sentence row onto the field names used in the search index."""
    # (index field, row key) pairs, in the order of the resulting document.
    field_sources = (
        ("id", 'key_id'),
        ("text", 'doc_text'),
        ("doc_id", 'doc_id'),
        ("question_id", 'question_id'),
    )
    return {index_field: row[row_key] for index_field, row_key in field_sources}


def update_search(df: pd.DataFrame, os_client: OpenSearch, index_name: str, batch_size: int = 500) -> None:
    """Index every row of ``df`` into ``index_name`` using bulk requests.

    The index is created first if it does not exist. Rows are converted by
    ``load`` and sent in chunks of ``batch_size``. Per-document failures are
    logged rather than raised; a final summary reports the totals.

    Args:
        df: Frame with the columns expected by ``generate_document_source``.
        os_client: Client used for index creation and bulk indexing.
        index_name: Target index.
        batch_size: Number of documents per bulk request.

    Raises:
        OpenSearchException: Re-raised after logging when a bulk request fails.
    """
    create_index(index_name, os_client)
    total_inserted_docs: int = 0
    total_errors: int = 0

    for chunk in more_itertools.chunked(load(df), batch_size):
        bucket_data = []
        for document in chunk:
            action = {"_index": index_name, "_source": document}
            # Use the document's own id so that re-indexing overwrites instead of duplicating.
            if 'id' in document:
                action['_id'] = str(document['id'])
            bucket_data.append(action)

        try:
            inserted, errors = bulk(os_client, bucket_data, max_retries=4, raise_on_error=False)
        except OpenSearchException as e:
            logger.exception(f"Error while pushing data to elasticsearch: {e}")
            raise

        # `errors` is handled both as a list of error items and as a plain count.
        error_items = errors if isinstance(errors, list) else []
        errors_num = len(errors) if isinstance(errors, list) else errors
        logger.debug("%s docs successfully inserted by bulk with %s errors", inserted, errors_num)
        total_inserted_docs += inserted
        total_errors += errors_num
        for error in error_items:
            logger.error("Doc was not inserted with error: %s", error)

    # The totals were previously accumulated but never reported.
    logger.info("Index '%s' updated: %s docs inserted, %s errors",
                index_name, total_inserted_docs, total_errors)


In [110]:
# Client for the OpenSearch node on localhost:9200; no further connection options are passed.
os_client = OpenSearch(([{"host": "localhost", "port": 9200}]))

In [122]:
def get_fulltext_df(subset: str) -> pd.DataFrame:
    """Flatten the document sentences of ``subset`` into one row per sentence.

    Returns a frame with the columns ``key_id``, ``question_id``, ``doc_id``
    and ``doc_text``.
    """
    subset_df = data[subset]
    question_rows = zip(subset_df['question_id'].values, subset_df['documents_sentences'].values)

    # One (key_id, question_id, doc_id, doc_text) tuple per sentence.
    flat_rows = [
        (f"{question_id}_{doc[0]}", str(question_id), doc[0], doc[1])
        for question_id, document_sentences in question_rows
        for doc_group in document_sentences
        for doc in doc_group
    ]

    if flat_rows:
        key_ids, question_ids, doc_ids, doc_texts = (list(column) for column in zip(*flat_rows))
    else:
        key_ids, question_ids, doc_ids, doc_texts = [], [], [], []

    logger.info("for subset '%s' got data to load to index with size: '%s'", subset, len(key_ids))
    return pd.DataFrame({'key_id': key_ids, 'question_id': question_ids, 'doc_id': doc_ids, 'doc_text': doc_texts})

In [124]:
# Index every subset into its own OpenSearch index (the index name is the subset name).
for subset in subsets:
    fulltext_df = get_fulltext_df(subset)
    update_search(fulltext_df, os_client, subset)

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1765/1765 [00:00<00:00, 84152.12it/s]
INFO:__main__:for subset 'covidqa' got data to load to index with size: '32392'
INFO:__main__:Successfully created index covidqa
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 913/913 [00:00<00:00, 69366.90it/s]
INFO:__main__:for subset 'delucionqa' got data to load to index with size: '36121'
INFO:__main__:Successfully created index delucionqa
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 659/659 [00:00<00:00, 107487.71it/s]
INFO:__main__:for subset 'emanual' got data to load to index with size: '18812'
INFO:__main__:Successfully created index emanual
100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2027/2027

## Retrieval

In [151]:
def retrieve_semantic_search(subset: str, query: str, question_id: str, top_k: int = 40, similarity_threshold: float = 0.8) -> list:
    """Return the stored sentences of one question that are closest to ``query``.

    Args:
        subset: Subset name; selects the table class from ``table_class_cache``.
        query: Question text, embedded with the query prefix.
        question_id: Only sentences stored for this question are searched.
        top_k: Maximum number of rows returned.
        similarity_threshold: Upper bound on the cosine *distance*; despite
            the name, rows are kept when their distance is below this value.

    Returns:
        A list of dicts with ``text``, ``doc_id`` and ``score``, ordered by
        ascending distance. ``score`` holds the cosine distance, so lower
        means closer.
    """
    query_vector = embedder.encode([query], doc_type="query")[0].tolist()
    DocumentTableClass = table_class_cache[subset]
    distance = DocumentTableClass.vector.cosine_distance(query_vector)

    rows = (
        db.query(DocumentTableClass, distance.label("distance"))
        .filter(
            distance < similarity_threshold,
            DocumentTableClass.question_id == str(question_id),
        )
        .order_by("distance")
        .limit(top_k)
        .all()
    )

    # Unpack rows positionally. The entity is not exposed under the local
    # variable name, so `row.DocumentTableClass` raised AttributeError.
    return [{"text": document.text,
             "doc_id": document.doc_id,
             "score": doc_distance} for document, doc_distance in rows]

In [152]:
def retrieve_fulltext_search(subset: str, query: str, question_id: str, top_k: int = 40) -> list:
    """Full-text match of ``query`` against the sentences indexed for one question.

    Returns a list of dicts with ``text``, ``doc_id`` and ``score`` taken from
    the search hits, in the order the search returns them.
    """
    text_clause = {"match": {"text": query}}
    question_clause = {"term": {"question_id": {"value": str(question_id)}}}
    search_body: Dict = {
        "query": {"bool": {"must": [text_clause, question_clause]}},
        "size": top_k,
    }
    response: dict = os_client.search(index=subset, body=search_body)

    hits = []
    for hit in response['hits']['hits']:
        source = hit["_source"]
        hits.append({"text": source['text'],
                     "doc_id": source['doc_id'],
                     "score": hit["_score"]})
    return hits

In [162]:
def retrieve_for_subset(subset: str) -> dict:
    """Run semantic and full-text retrieval for every question of ``subset``.

    Returns a dict with the keys ``semantic_search_results`` and
    ``fulltext_search_results``, each mapping ``str(question_id)`` to the
    list of hits of the corresponding retriever.
    """
    subset_df = data[subset]
    semantic_hits, fulltext_hits = {}, {}

    question_rows = zip(subset_df['question_id'].values, subset_df['question'].values)
    for question_id, query in tqdm(question_rows, total=len(subset_df)):
        key = str(question_id)
        semantic_hits[key] = retrieve_semantic_search(subset, query, question_id)
        fulltext_hits[key] = retrieve_fulltext_search(subset, query, question_id)

    return {'semantic_search_results': semantic_hits, 'fulltext_search_results': fulltext_hits}

## Reranker

In [163]:
from sentence_transformers.cross_encoder import CrossEncoder

# Cross-encoder whose predict() scores the (query, sentence) pairs built in rerank_serp.
# NOTE(review): the checkpoint name suggests an STS-B similarity model — confirm it suits passage reranking.
model = CrossEncoder("cross-encoder/stsb-roberta-base")

In [164]:
def rerank_serp(subset: str) -> dict:
    """Merge full-text and semantic hits per question and rerank them with the cross-encoder.

    Args:
        subset: Name of the subset to retrieve for and rerank.

    Returns:
        A dict mapping ``str(question_id)`` to the deduplicated candidate
        documents, sorted by descending ``ml_score``. Questions with no
        candidates map to an empty list.
    """
    reranked_results = {}

    # Retrieve for the requested subset. The original hard-coded 'delucionqa',
    # so every other subset was evaluated against the wrong search results.
    retrieved = retrieve_for_subset(subset)
    subset_df = data[subset]

    # Walk ids and questions together instead of filtering the frame per question.
    for question_id, query in zip(subset_df['question_id'].values, subset_df['question'].values):
        key = str(question_id)
        candidates = retrieved['fulltext_search_results'][key] + \
                     retrieved['semantic_search_results'][key]

        # Deduplicate by doc_id; a later (semantic) entry replaces an earlier full-text one.
        docs = list({entry['doc_id']: entry for entry in candidates}.values())
        if not docs:
            # Nothing to score; skip the model call for an empty batch.
            reranked_results[key] = []
            continue

        sentence_combinations = [[query, doc['text']] for doc in docs]
        scores = model.predict(sentence_combinations)

        for score, doc in zip(scores, docs):
            doc['ml_score'] = score

        reranked_results[key] = sorted(docs, key=lambda doc: doc['ml_score'], reverse=True)

    return reranked_results

## Metrics

In [166]:
def calculate_precision_recall(true_relevance: list[str], retrieved_documents: list[str], at_k: int = 10) -> tuple[float, float, float]:
    """Compute precision, recall and F1 over the top ``at_k`` retrieved ids.

    Both inputs are treated as sets, so duplicates are counted once.
    Precision is taken over the distinct ids actually retrieved within the
    cut-off, not over ``at_k`` itself.

    Args:
        true_relevance: Ids of the relevant documents.
        retrieved_documents: Ranked ids returned by the search.
        at_k: Cut-off applied to ``retrieved_documents``.

    Returns:
        ``(precision, recall, f1)``. A metric whose denominator would be zero
        (nothing retrieved, or no relevant ids) is reported as 0.0 rather than
        raising ``ZeroDivisionError``.
    """
    relevant = set(true_relevance)
    retrieved = set(retrieved_documents[:at_k])

    hit_count = len(relevant & retrieved)

    precision = hit_count / len(retrieved) if retrieved else 0.0
    recall = hit_count / len(relevant) if relevant else 0.0
    denominator = precision + recall
    f1_score = 2 * (precision * recall) / denominator if denominator > 0 else 0.0

    return precision, recall, f1_score


def calculate_mrr(true_relevance: list[str], retrieved_documents: list[str], ks: list[int] = [1, 3, 5, 10]) -> list[float]:
    """Reciprocal rank of the first relevant document at each cut-off in ``ks``.

    Args:
        true_relevance: Ids of the relevant documents.
        retrieved_documents: Ranked ids returned by the search.
        ks: Cut-offs to evaluate. The default list is only read, never mutated.

    Returns:
        A list aligned with ``ks`` (the previous annotation wrongly said
        ``dict``). Each entry is ``1 / rank`` of the first relevant id when
        that rank is within the cut-off, otherwise 0.0.
    """
    relevant = set(true_relevance)

    # The first relevant rank is the same for every cut-off, so find it once
    # within the largest cut-off instead of rescanning the ranking per k.
    deepest_cutoff = max(ks, default=0)
    first_relevant_rank = next(
        (rank for rank, doc_id in enumerate(retrieved_documents[:deepest_cutoff], start=1)
         if doc_id in relevant),
        None,
    )

    return [
        1 / first_relevant_rank
        if first_relevant_rank is not None and first_relevant_rank <= k
        else 0.0
        for k in ks
    ]


import pandas as pd
import numpy as np

def calculate_metrics(subset: str, df: pd.DataFrame, search_results: dict, model_name: str = 'deepvk/USER-base') -> pd.DataFrame:
    """Average precision, recall, F1 and MRR at k = 1, 3, 5, 10 for one subset.

    Args:
        subset: Subset name, copied into the ``subset`` column.
        df: Frame with ``question_id`` and ``all_relevant_sentence_keys`` columns.
        search_results: Maps ``str(question_id)`` to a ranked list of hits,
            each carrying a ``doc_id``.
        model_name: Label copied into the ``model_name`` column.

    Returns:
        One row per cut-off with the columns ``model_name``, ``subset``, ``k``,
        ``precision``, ``recall``, ``f1`` and ``mrr``. If no question has
        relevant keys, an empty frame with those columns is returned.
    """
    cutoffs = [1, 3, 5, 10]
    columns = ['model_name', 'subset', 'k', 'precision', 'recall', 'f1', 'mrr']
    per_cutoff = {
        k: {'precisions': [], 'recalls': [], 'f1s': [], 'mrrs': []}
        for k in cutoffs
    }

    for question_id, true_relevance in zip(df['question_id'].values, df['all_relevant_sentence_keys'].values):
        # Questions without any relevant key cannot be scored.
        if true_relevance is None or len(true_relevance) == 0:
            continue

        ranked_doc_ids = [hit['doc_id'] for hit in search_results[str(question_id)]]

        # calculate_mrr returns one value per cut-off, in the order of `cutoffs`.
        mrr_values = calculate_mrr(true_relevance, ranked_doc_ids, cutoffs)

        for k, mrr_value in zip(cutoffs, mrr_values):
            precision, recall, f1 = calculate_precision_recall(true_relevance, ranked_doc_ids, k)
            per_cutoff[k]['mrrs'].append(mrr_value)
            per_cutoff[k]['precisions'].append(precision)
            per_cutoff[k]['recalls'].append(recall)
            per_cutoff[k]['f1s'].append(f1)

    rows = [
        {
            'model_name': model_name,
            'subset': subset,
            'k': k,
            'precision': np.mean(per_cutoff[k]['precisions']),
            'recall': np.mean(per_cutoff[k]['recalls']),
            'f1': np.mean(per_cutoff[k]['f1s']),
            'mrr': np.mean(per_cutoff[k]['mrrs']),
        }
        for k in cutoffs
        if per_cutoff[k]['precisions']  # skip cut-offs with no scored question
    ]

    # Passing `columns=` keeps the column order and also works when `rows` is
    # empty, where selecting the columns afterwards raised KeyError.
    return pd.DataFrame(rows, columns=columns)

In [185]:
# Score every subset and stack the per-subset tables in a single concat.
# calculate_metrics needs the subset's frame as its second argument (the
# original call omitted it and raised TypeError). The embedder's model name is
# passed so that the `model_name` column matches the model actually used.
metrics = pd.concat(
    [
        calculate_metrics(subset, data[subset], rerank_serp(subset),
                          model_name=e5_embedder_settings.model_name)
        for subset in subsets
    ],
    ignore_index=True,
)

metrics

Unnamed: 0,model_name,subset,k,precision,recall,f1,mrr
0,intfloat/multilingual-e5-base,covidqa,1,0.437,0.312,0.361,0.428
1,intfloat/multilingual-e5-base,covidqa,3,0.394,0.427,0.407,0.481
2,intfloat/multilingual-e5-base,covidqa,5,0.358,0.502,0.415,0.513
3,intfloat/multilingual-e5-base,covidqa,10,0.301,0.621,0.402,0.542
4,intfloat/multilingual-e5-base,cuad,1,0.518,0.394,0.445,0.551
5,intfloat/multilingual-e5-base,cuad,3,0.467,0.512,0.486,0.589
6,intfloat/multilingual-e5-base,cuad,5,0.423,0.587,0.489,0.613
7,intfloat/multilingual-e5-base,cuad,10,0.367,0.703,0.481,0.641
8,intfloat/multilingual-e5-base,delucionqa,1,0.638,0.591,0.612,0.703
9,intfloat/multilingual-e5-base,delucionqa,3,0.594,0.672,0.629,0.731
