## Load the data

In [6]:
import os
import json


def load_data_by_language(
    embeddings_directory: str, language: str = "english"
) -> tuple[list[dict], list[dict]]:
    """Load all chunks and questions for one language with globally unique chunk ids.

    Every JSON file selected by ``get_file_paths`` is expected to hold a
    ``"chunks"`` list (items with an ``"id"``) and a ``"question_answer_pairs"``
    list (items with a ``"chunk_id"``). Chunk ids are only unique within a file,
    so they are renumbered with a running counter across files and each
    question's ``"chunk_id"`` is rewritten to the new id.

    Args:
        embeddings_directory: Directory containing the embedding JSON files.
        language: Language selector forwarded to ``get_file_paths``.

    Returns:
        A ``(chunks, questions)`` tuple of the loaded dicts, mutated in place to
        carry the new ids. Questions whose ``"chunk_id"`` does not match any
        chunk of the same file are skipped.

    Raises:
        KeyError: If a file lacks ``"chunks"`` or ``"question_answer_pairs"``,
            or an item lacks its id field.
    """
    chunks, questions = [], []
    chunk_id_counter = 0
    # Sorted paths keep the renumbering deterministic across runs.
    file_paths = get_file_paths(embeddings_directory, language, sort=True)

    for file_path in file_paths:
        # Decode explicitly as UTF-8 so non-ASCII (e.g. Arabic) text does not
        # depend on the platform's default locale encoding.
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        # The map is per file because original ids restart in every file.
        old_to_new_id_map = {}
        for chunk in data["chunks"]:
            new_id = chunk_id_counter
            old_to_new_id_map[chunk["id"]] = new_id
            chunk["id"] = new_id
            chunks.append(chunk)
            chunk_id_counter += 1

        for question in data["question_answer_pairs"]:
            old_chunk_id = question["chunk_id"]
            if old_chunk_id in old_to_new_id_map:
                question["chunk_id"] = old_to_new_id_map[old_chunk_id]
                questions.append(question)

    print(
        f"{language.capitalize()} - Chunks: {len(chunks)}, Questions: {len(questions)}"
    )
    return chunks, questions


def get_file_paths(
    embeddings_directory: str, language: str, sort: bool = False
) -> list[str]:
    """Return the paths of the ``.json`` files in a directory for one language.

    English files are identified by exclusion: any ``.json`` file whose name
    does not contain a known non-English language tag (currently ``"arabic"``).
    For every other language, the file name must contain the language name.
    Both the language and the file names are compared case-insensitively.

    Args:
        embeddings_directory: Directory to scan (not recursive).
        language: Language name, in any letter case.
        sort: When true, return the paths in sorted order; otherwise the order
            is whatever ``os.listdir`` yields.

    Returns:
        Paths built by joining ``embeddings_directory`` with each matching file.
    """
    # Normalize once so "English" and "english" take the same branch.
    wanted_language = language.lower()
    non_english_tags = ("arabic",)

    file_paths = []
    for file_name in os.listdir(embeddings_directory):
        if not file_name.endswith(".json"):
            continue

        lowered_name = file_name.lower()
        if wanted_language == "english":
            is_match = not any(tag in lowered_name for tag in non_english_tags)
        else:
            is_match = wanted_language in lowered_name

        if is_match:
            file_paths.append(os.path.join(embeddings_directory, file_name))

    if sort:
        file_paths.sort()

    return file_paths


# Load the Arabic and English datasets from the same embeddings directory.
# Each call prints a one-line summary of how many chunks/questions were loaded.
embeddings_directory = "../data/embeddings/"
chunks_arabic, questions_arabic = load_data_by_language(
    embeddings_directory, language="arabic"
)
chunks_english, questions_english = load_data_by_language(
    embeddings_directory, language="english"
)

Arabic - Chunks: 120, Questions: 1209
English - Chunks: 120, Questions: 1209


## Benchmarks

In [None]:
import numpy as np
import string

from typing import Any

from rich.box import HEAVY
from rich.text import Text
from rich.table import Table
from rich.console import Console

from ranx import Run, Qrels, compare
from ranx.data_structures.report import Report
from sklearn.metrics.pairwise import cosine_similarity


def create_qrels_from_data(questions: list[dict]) -> Qrels:
    """Build relevance judgments with one relevant chunk per question.

    The question at position ``i`` becomes query ``"q_<i>"`` and its
    ``"chunk_id"`` (stringified) is recorded as the only relevant document,
    with relevance ``1``.

    Args:
        questions: Question dicts, each carrying a ``"chunk_id"`` key.

    Returns:
        A ``Qrels`` object built from the query-to-relevant-chunk mapping.
    """
    relevance_by_query = {
        f"q_{position}": {str(entry["chunk_id"]): 1}
        for position, entry in enumerate(questions)
    }
    return Qrels(relevance_by_query)


def create_run_from_data(
    chunks: list[dict], questions: list[dict], model_name: str
) -> Run:
    """Score every chunk against every question for one embedding model.

    Embeddings are read from ``item["embeddings"][model_name]`` on both chunks
    and questions, and the score of a (question, chunk) pair is their cosine
    similarity. The question at position ``i`` becomes query ``"q_<i>"``.

    Args:
        chunks: Chunk dicts with ``"id"`` and ``"embeddings"`` keys.
        questions: Question dicts with an ``"embeddings"`` key.
        model_name: Key selecting which embedding to use; also the run's name.

    Returns:
        A ``Run`` mapping each query id to ``{chunk id (str): score}``.
    """
    document_vectors = np.array([item["embeddings"][model_name] for item in chunks])
    query_vectors = np.array([item["embeddings"][model_name] for item in questions])

    # Rows follow the order of `questions`, columns the order of `chunks`.
    similarity_matrix = cosine_similarity(query_vectors, document_vectors)

    document_ids = [str(item["id"]) for item in chunks]
    run_dict = {
        f"q_{row_index}": dict(zip(document_ids, row_scores))
        for row_index, row_scores in enumerate(similarity_matrix)
    }

    return Run(run_dict, name=model_name)


def _significant_win_letters(
    report: Report, model_name: str, metric: str, model_to_letter: dict[str, str]
) -> str:
    """Return the sorted letters of the models that `model_name` significantly beats on `metric`."""
    own_score = report.results[model_name][metric]
    letters = []

    for other_model_name in report.model_names:
        if other_model_name == model_name:
            continue

        # A frozenset is hashable and order-independent, so the lookup does not
        # rely on the container converting a mutable set into a usable key.
        comparison_key = frozenset((model_name, other_model_name))
        p_value = report.comparisons[comparison_key][metric]["p_value"]
        other_score = report.results[other_model_name][metric]

        if own_score > other_score and p_value < report.max_p:
            letters.append(model_to_letter[other_model_name])

    return "".join(sorted(letters))


def print_benchmark_report(report: Report) -> None:
    """Print a comparison report as a colorized rich table.

    Each model is labelled with a letter (in ``report.model_names`` order) and
    rows are sorted by the model's mean score over ``report.metrics``, best
    first. Per metric, the highest score is shown in green and the lowest in
    red. The letters after a score name the models it beats with a higher score
    and a p-value below ``report.max_p``.

    Args:
        report: Report exposing ``model_names``, ``metrics``, ``results``,
            ``comparisons`` and ``max_p``.

    Side effects:
        Sets ``report.rounding_digits`` to 4 and prints to the console.

    Note:
        Letters come from ``string.ascii_lowercase``, so only the first 26
        models receive one.
    """
    # zip stops at the shorter input, hence the 26-model limit noted above.
    model_to_letter = dict(zip(report.model_names, string.ascii_lowercase))

    report.rounding_digits = 4
    digits = report.rounding_digits

    # Pre-render every cell so column widths can be sized before building the table.
    cell_data = {}
    max_widths = {metric: len(metric) for metric in report.metrics}
    for model_name in report.model_names:
        cell_data[model_name] = {}
        for metric in report.metrics:
            score_str = f"{report.results[model_name][metric]:.{digits}f}"
            superscripts = _significant_win_letters(
                report, model_name, metric, model_to_letter
            )
            cell_data[model_name][metric] = (score_str, superscripts)

            total_len = len(score_str)
            if superscripts:
                # One extra column for the space between score and letters.
                total_len += 1 + len(superscripts)
            max_widths[metric] = max(max_widths[metric], total_len)

    # Per-metric extremes, used to color the best and worst score.
    max_scores = {metric: -float("inf") for metric in report.metrics}
    min_scores = {metric: float("inf") for metric in report.metrics}
    for model_name in report.model_names:
        for metric in report.metrics:
            score = report.results[model_name][metric]
            if score > max_scores[metric]:
                max_scores[metric] = score
            if score < min_scores[metric]:
                min_scores[metric] = score

    console = Console()
    table = Table(show_header=True, header_style="bold bright_magenta", box=HEAVY)
    table.add_column("#", style="dim")
    table.add_column("Model", style="yellow")
    for metric in report.metrics:
        table.add_column(metric, justify="left", width=max_widths[metric])
    table.add_column("Average", justify="right", style="bold cyan")

    model_averages = {
        model_name: np.mean(
            [report.results[model_name][metric] for metric in report.metrics]
        )
        for model_name in report.model_names
    }
    sorted_models = sorted(model_averages.items(), key=lambda x: x[1], reverse=True)
    # Guard against a report without models instead of raising IndexError.
    best_model_name = sorted_models[0][0] if sorted_models else None

    for model_name, avg_score in sorted_models:
        model_name_cell_value = model_name
        if model_name == best_model_name:
            model_name_cell_value = (
                f"[bold bright_green]{model_name}[/bold bright_green]"
            )

        row_data: list[Any] = [
            f"[dim]{model_to_letter[model_name]}[/dim]",
            model_name_cell_value,
        ]

        for metric in report.metrics:
            cell_text = Text()
            score = report.results[model_name][metric]
            score_part, superscript_part = cell_data[model_name][metric]

            # The max check comes first, so a score that is both the best and
            # the worst (e.g. a single model) is shown as best.
            if score == max_scores[metric]:
                cell_text.append(score_part, style="bold bright_green")
            elif score == min_scores[metric]:
                cell_text.append(score_part, style="bold bright_red")
            else:
                cell_text.append(score_part)

            if superscript_part:
                cell_text.append(" ")
                cell_text.append(superscript_part, style="bold bright_cyan")

            row_data.append(cell_text)

        # Same precision as the per-metric cells.
        row_data.append(f"{avg_score:.{digits}f}")
        table.add_row(*row_data)

    console.print(table)

### 1. Questions (English) & Chunks (English)

In [22]:
# English questions retrieved against English chunks.
metrics = ["mrr", "recall@1", "recall@5", "ndcg@5"]
qrels_english = create_qrels_from_data(questions_english)

# One run per embedding model stored on the first English chunk.
models_to_benchmark = list(chunks_english[0]["embeddings"])
runs_english = []
for current_model_name in models_to_benchmark:
    run = create_run_from_data(
        questions=questions_english,
        chunks=chunks_english,
        model_name=current_model_name,
    )
    runs_english.append(run)

# Compare all runs on the chosen metrics and print the resulting table.
report_english_english = compare(
    runs=runs_english,
    qrels=qrels_english,
    stat_test="fisher",
    max_p=0.05,
    metrics=metrics,
)
print_benchmark_report(report_english_english)

### 2. Questions (Arabic) & Chunks (English)

In [23]:
# Arabic questions retrieved against English chunks.
metrics = ["mrr", "recall@1", "recall@5", "ndcg@5"]
qrels_arabic = create_qrels_from_data(questions_arabic)

# One run per embedding model stored on the first English chunk.
models_to_benchmark = list(chunks_english[0]["embeddings"])
runs_english = []
for current_model_name in models_to_benchmark:
    run = create_run_from_data(
        questions=questions_arabic,
        chunks=chunks_english,
        model_name=current_model_name,
    )
    runs_english.append(run)

# Compare all runs on the chosen metrics and print the resulting table.
report_arabic_english = compare(
    runs=runs_english,
    qrels=qrels_arabic,
    stat_test="fisher",
    max_p=0.05,
    metrics=metrics,
)
print_benchmark_report(report_arabic_english)

### 3. Questions (English) & Chunks (Arabic)

In [24]:
# English questions retrieved against Arabic chunks.
metrics = ["mrr", "recall@1", "recall@5", "ndcg@5"]
qrels_english = create_qrels_from_data(questions_english)

# NOTE(review): model names are read from the first English chunk although the
# Arabic chunks are scored here - presumably both carry the same models; confirm.
models_to_benchmark = list(chunks_english[0]["embeddings"])
runs_arabic = []
for current_model_name in models_to_benchmark:
    run = create_run_from_data(
        questions=questions_english,
        chunks=chunks_arabic,
        model_name=current_model_name,
    )
    runs_arabic.append(run)

# Compare all runs on the chosen metrics and print the resulting table.
report_english_arabic = compare(
    runs=runs_arabic,
    qrels=qrels_english,
    stat_test="fisher",
    max_p=0.05,
    metrics=metrics,
)
print_benchmark_report(report_english_arabic)

### 4. Questions (Arabic) & Chunks (Arabic)

In [25]:
# Arabic questions retrieved against Arabic chunks.
metrics = ["mrr", "recall@1", "recall@5", "ndcg@5"]
qrels_arabic = create_qrels_from_data(questions_arabic)

# NOTE(review): model names are read from the first English chunk although the
# Arabic chunks are scored here - presumably both carry the same models; confirm.
models_to_benchmark = list(chunks_english[0]["embeddings"])
runs_arabic = []
for current_model_name in models_to_benchmark:
    run = create_run_from_data(
        questions=questions_arabic,
        chunks=chunks_arabic,
        model_name=current_model_name,
    )
    runs_arabic.append(run)

# Compare all runs on the chosen metrics and print the resulting table.
report_arabic_arabic = compare(
    runs=runs_arabic,
    qrels=qrels_arabic,
    stat_test="fisher",
    max_p=0.05,
    metrics=metrics,
)
print_benchmark_report(report_arabic_arabic)