#### Libraries

In [None]:
%%capture
# must be imported to google colab L4 GPU
!pip install bitsandbytes
!pip install hf-xet
!pip install nltk

import nltk
nltk.download('punkt_tab')
nltk.download('stopwords')
from nltk.tokenize import sent_tokenize

import json
import os
import random
import re
import sys
import time
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from tqdm import tqdm

import numpy as np
import torch
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
from sklearn.metrics.pairwise import cosine_similarity
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    pipeline,
)
import pandas as pd
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import seaborn as sns
from sentence_transformers import CrossEncoder, SentenceTransformer
from pathlib import Path

# mount drive
from google.colab import drive
drive.mount('/content/drive/')

#### 1/ Configurations and storing the collection

In [None]:
# Path to the JSON topic collection; change it to evaluate a different file.
COLLECTION_PATH = 'project_collection_2024_25.json'

# Hugging Face access token used when loading the Llama checkpoint.
# It is read from the HF_TOKEN environment variable so that a real token never
# has to be pasted into (and committed with) the notebook; the original
# placeholder is kept as the fallback value.
HF_TOKEN = os.environ.get("HF_TOKEN", "YOUR_HF_API_TOKEN_HERE")
# personal token for 3.1--8B-instruct 3 models NO GO with current setup
# HF_TOKEN2   = token_path.read_text().strip()

# Extractive QA checkpoints, keyed by the short tag used in the result tables.
# The models can be swapped freely.
# NOTE(review): the "bert-large" tag points at a RoBERTa-large checkpoint.
BERT_MODELS = {
    "roberta": "deepset/roberta-base-squad2",
    "distilbert": "distilbert-base-cased-distilled-squad",
    "bert-small": "mrm8488/bert-small-finetuned-squadv2",
    "bert-large": "deepset/roberta-large-squad2",
}
# Generative model; author note: meta-llama/Llama-3.2-1B-Instruct (Llama 3
# family) was a no-go with the QA prompt used in this notebook.
LLAMA_MODEL = "meta-llama/Llama-2-7b-chat-hf"
# Hugging Face pipeline convention: GPU index 0 when CUDA is available, -1 for CPU.
DEVICE = 0 if torch.cuda.is_available() else -1

# seed every RNG used in the notebook for reproducibility
SEED = 2
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# get the collection strings {id, title text, and list of qa}
@dataclass
class QA: # define qa
    question: str
    answer: str
    type: Optional[str] = None
    entity: Optional[str] = None
@dataclass
class Topic:
    id: str
    title: str
    text: str
    qa_list: List[QA]


def load_collection(path: str|Path) -> List[Topic]:
    """Load the topic collection from a JSON file.

    The file must contain a list of objects. Each object provides its passage
    under ``text`` (or, failing that, ``context``) and may provide ``id``,
    ``title`` and a ``qa`` list whose items carry ``question``, ``answer``,
    ``type`` and ``entity``.

    Missing or ``null`` string fields are loaded as empty strings and
    non-string values (e.g. a numeric answer) are converted with ``str``, so
    downstream code can always treat question, answer, title and text as
    ``str``.

    Args:
        path: Location of the JSON file.

    Returns:
        One ``Topic`` per entry, in file order.
    """
    def clean(value) -> str:
        # Tolerate absent keys, JSON null and non-string scalars.
        return "" if value is None else str(value).strip()

    with open(path, 'r', encoding='utf-8') as f:
        raw_data = json.load(f)

    topics = []
    for entry in raw_data:
        # the passage lives under either 'text' or 'context'; never leave it as None
        content = entry.get('text') or entry.get('context') or ""

        # `or []` also covers an explicit "qa": null
        qa_items = [
            QA(
                question=clean(qa_dict.get('question')),
                answer=clean(qa_dict.get('answer')),
                type=qa_dict.get('type'),
                entity=qa_dict.get('entity'),
            )
            for qa_dict in entry.get('qa') or []
        ]

        topics.append(Topic(
            id=str(entry.get('id', "")),
            title=clean(entry.get('title')),
            text=content,
            qa_list=qa_items,
        ))

    return topics

#### 2/ Basic QA system with random word selection and evaluation metrics

In [None]:
def random_word(text: str) -> str:
    """Return one word picked uniformly at random from *text* (random baseline).

    A word is a maximal run of ``\\w`` characters. When *text* contains no
    word at all (empty or punctuation-only passage) an empty string is
    returned instead of letting ``random.choice`` raise ``IndexError``.
    """
    words = re.findall(r"\b\w+\b", text)
    if not words:
        return ""
    return random.choice(words)

def calculate_em_f1(prediction: str, reference: str) -> Tuple[int, float]:
    """Score *prediction* against *reference* with exact match and token F1.

    Both strings are stripped and lower-cased first. Tokens are obtained by
    splitting on whitespace and overlap is counted as a multiset intersection.

    Returns:
        ``(em, f1)`` where ``em`` is 1 when the normalized strings are equal
        and 0 otherwise, and ``f1`` is the harmonic mean of token precision
        and recall. When either side has no tokens, F1 equals EM: 1.0 if both
        are empty, 0.0 if only one is. This keeps the two metrics consistent
        and follows the convention of the SQuAD v2 evaluation script.
    """
    pred_norm = prediction.strip().lower()
    gold_norm = reference.strip().lower()

    em_score = int(pred_norm == gold_norm)

    pred_tokens = pred_norm.split()
    gold_tokens = gold_norm.split()

    # With an empty side, precision/recall are undefined; fall back to EM.
    if not pred_tokens or not gold_tokens:
        return em_score, float(em_score)

    # multiset intersection: each token counts at most as often as it occurs on both sides
    num_common = sum((Counter(pred_tokens) & Counter(gold_tokens)).values())
    if num_common == 0:
        return em_score, 0.0

    precision = num_common / len(pred_tokens)
    recall = num_common / len(gold_tokens)
    token_f1 = 2 * precision * recall / (precision + recall)

    return em_score, token_f1

### Core functions

#### 3/ Question type and entity type prediction

In [None]:
class QuestionClassifier:
    """Predict a question's type and, for factoid questions, its entity type.

    Questions are turned into sentence embeddings and fed to two random
    forests: one for the question type (factoid vs confirmation) and one,
    trained on factoid questions only, for the entity type.
    Author note: BERT + SVM or random forest worked best in assignment 2;
    TF-IDF features reached about 42% accuracy.
    """

    def __init__(self, sentence_embedder: SentenceTransformer):
        # embedding model shared with the caller; used for training and prediction
        self.sentence_embedder = sentence_embedder
        # question-type classifier
        self.type_classifier = RandomForestClassifier(random_state=SEED)
        # entity-type classifier, fitted on factoid questions only
        self.entity_classifier = RandomForestClassifier(random_state=SEED)

    def train(self, qa_items: List[QA]) -> None:
        """Fit both classifiers on the labelled questions in *qa_items*."""
        question_texts = []
        type_labels = []
        for item in qa_items:
            question_texts.append(item.question)
            type_labels.append(item.type)

        # one embedding row per question, in the same order as qa_items
        embeddings = self.sentence_embedder.encode(question_texts)
        self.type_classifier.fit(embeddings, type_labels)

        factoid_rows = [row for row, label in enumerate(type_labels) if label == "factoid"]
        if not factoid_rows:
            # nothing to train the entity classifier on
            return

        entity_labels = [qa_items[row].entity for row in factoid_rows]
        self.entity_classifier.fit(embeddings[factoid_rows], entity_labels)

    def predict(self, question: str) -> Tuple[str, Optional[str]]:
        """Return ``(question_type, entity_type)``; entity is ``None`` unless factoid."""
        features = self.sentence_embedder.encode([question])
        q_type = self.type_classifier.predict(features)[0]

        if q_type != "factoid":
            return q_type, None
        return q_type, self.entity_classifier.predict(features)[0]

#### 4/ Implement QA with different transformer models

In [None]:
class BERTpipeline:
    """Bundle one Hugging Face question-answering pipeline per entry of BERT_MODELS."""

    def __init__(self, device: int):
        # `device` follows the pipeline convention: GPU index, or -1 for CPU.
        self.pipelines: Dict[str, "transformers.Pipeline"] = {
            name: pipeline(
                task = "question-answering",
                model = path,
                tokenizer = path,
                device = device,
                # token = HF_TOKEN2 # can deactivate
            )
            for name, path in BERT_MODELS.items()
        }

    def answer(self, question: str, context: str) -> Dict[str, Tuple[str, float, float]]:
        """Ask every model the same question.

        Returns:
            ``{model_name: (answer, confidence, latency_seconds)}`` where the
            answer is the extracted span with surrounding whitespace removed
            (case is preserved).
        """
        res = {}
        for model_name, qa_pipe in self.pipelines.items():
            # perf_counter is monotonic and high resolution, so the latency is
            # not distorted by wall-clock adjustments the way time.time() can be
            start_time = time.perf_counter()
            output = qa_pipe(question=question, context=context)
            latency_seconds = time.perf_counter() - start_time

            answer_text = output.get("answer", "").strip()
            confidence_score = output.get("score", 0.0)

            res[model_name] = (answer_text, confidence_score, latency_seconds)

        return res

#### 5/ Llama QA

In [None]:
# Prompt template for the Llama QA model, written with the Llama-2 chat markers
# ([INST], <<SYS>>). `{context}` and `{question}` are filled in with str.format.
# Every line inside the literal carries its four leading spaces, which are sent
# to the model as written. The template ends with a "##ANSWER" line (no colon)
# placed after the closing [/INST].
LLAMA_QA_PROMPT = (
    """<s>[INST] <<SYS>>
    You are an AI that answers questions from texts.
    <</SYS>>
    Given the following report, answer the question with the shortest possible phrase.
    ## REPORT
    {context}
    ## QUESTION
    {question} [/INST]
    ##ANSWER
    """
)

class LlamaModelBase:
    def __init__(self, model_path: str, hf_token: str | None, device: int):
        self.enabled = torch.cuda.is_available() and bool(hf_token)
        self.model = None
        self.tokenizer = None

        quant_cfg = BitsAndBytesConfig(
            load_in_4bit = True,
            bnb_4bit_quant_type = "nf4",
            bnb_4bit_use_double_quant = True,
            bnb_4bit_compute_dtype = torch.bfloat16
        )

        self.tokenizer = AutoTokenizer.from_pretrained(
            model_path,
            token = hf_token
        )

        self.model = AutoModelForCausalLM.from_pretrained(
            model_path,
            device_map = "auto",
            quantization_config = quant_cfg,
            token = hf_token, 
            trust_remote_code = True
        ).eval()

    def _generate_text(self, prompt: str, max_new_tokens: int) -> str:
        if not self.enabled or self.model is None:
            return ""

        inputs = self.tokenizer(prompt, return_tensors="pt")
        inputs = {k: v.to(self.model.device) for k, v in inputs.items()}

        generated = self.model.generate(
            **inputs,
            max_new_tokens = max_new_tokens,
            do_sample = False,
            eos_token_id = self.tokenizer.eos_token_id
        )

        output_text = self.tokenizer.decode(
            generated[0],
            skip_special_tokens=True
        )
        return output_text


class LlamaQA(LlamaModelBase):
    """Question answering with the Llama chat model configured in LLAMA_MODEL."""

    def __init__(self, hf_token: str | None, device: int):
        super().__init__(LLAMA_MODEL, hf_token, device)

    def answer(self, question: str, context: str, max_new_tokens: int = 128) -> str:
        """Return the model's answer to *question* given *context*.

        The answer is the last non-empty line of the text that follows the
        final ``##ANSWER`` marker. If the marker is missing from the decoded
        output, the last non-empty line of the whole output is used. An empty
        string is returned when the model produced nothing (or is disabled),
        rather than echoing the prompt's own marker line as the answer.
        """
        prompt_text = LLAMA_QA_PROMPT.format(context=context, question=question)
        output_text = self._generate_text(prompt_text, max_new_tokens)

        # The prompt ends with "##ANSWER" (no colon), so split on that exact
        # marker; rpartition leaves the full text in `completion` when absent.
        _, _, completion = output_text.rpartition("##ANSWER")
        # tolerate a model that writes "##ANSWER: ..." itself
        completion = completion.lstrip().lstrip(":")

        lines = [line.strip() for line in completion.splitlines() if line.strip()]
        return lines[-1] if lines else ""

#### 7/ Confirmation: yes or no

In [None]:
class ConfirmationAnswerer:
    """Answer yes/no (confirmation) questions with a cross-encoder score and a threshold."""

    def __init__(self, threshold: float = 0.5, device: int = DEVICE):
        self.threshold = threshold
        # `device` follows the pipeline convention (GPU index, -1 for CPU) but
        # CrossEncoder expects a torch device spec, and a negative index is not
        # a valid one. Translate ints; pass strings / None through untouched.
        if isinstance(device, int):
            device = "cpu" if device < 0 else f"cuda:{device}"
        self.cross_encoder = CrossEncoder("cross-encoder/stsb-distilroberta-base", device=device) #best for this task
        # alternatives tried: cross-encoder/qnli-distilroberta-base cross-encoder/ms-marco-MiniLM-L6-v2
        # Optional embedder used to pick the most relevant sentence of a topic;
        # injected later through set_sentence_embedder.
        self.sentence_embedder = None

    def set_sentence_embedder(self, embedder: SentenceTransformer):
        """Attach the sentence embedder used to narrow the context to one sentence."""
        self.sentence_embedder = embedder

    def answer(self, question: str, full_topic_text: str) -> str:
        """Return ``"Yes"`` or ``"No"`` for *question* given the topic text.

        With an embedder attached, the sentence most similar to the question
        (cosine similarity, top-1) is scored by the cross-encoder; without one
        the full text is scored. The result is ``"Yes"`` when the score reaches
        ``self.threshold``. Text without any sentence yields ``"No"``.
        """
        sentences = [s.strip() for s in sent_tokenize(full_topic_text) if s.strip()]

        if not sentences:
            return "No" # no context, so nothing can be confirmed

        if self.sentence_embedder:
            question_embedding = self.sentence_embedder.encode([question])
            sentence_embeddings = self.sentence_embedder.encode(sentences)

            # similarity of the question to every sentence; keep the best one
            similarities = cosine_similarity(question_embedding, sentence_embeddings)[0]
            context_for_cross_encoder = sentences[np.argmax(similarities)]
        else:
            context_for_cross_encoder = full_topic_text

        score = self.cross_encoder.predict([(question, context_for_cross_encoder)])[0]

        return "Yes" if score >= self.threshold else "No"

#### 9/ Passage retrieval

In [None]:
class PassageRetriever:
    """Pick the topic most relevant to a question with a cross-encoder."""

    def __init__(self, device: int = DEVICE):
        # `device` follows the pipeline convention (GPU index, -1 for CPU) but
        # CrossEncoder expects a torch device spec, and a negative index is not
        # a valid one. Translate ints; pass strings / None through untouched.
        if isinstance(device, int):
            device = "cpu" if device < 0 else f"cuda:{device}"
        self.model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L6-v2", device=device) # best so far
        # alternative tried: cross-encoder/qnli-distilroberta-base

    def best_topic(self, question: str, topics: List[Topic]) -> int:
        """Return the index in *topics* of the passage scored most relevant.

        Raises:
            ValueError: if *topics* is empty.
        """
        if not topics:
            raise ValueError("topics must not be empty")

        # score the question against every topic text
        pairs = [(question, topic.text) for topic in topics]
        scores = self.model.predict(pairs)

        # index of the highest relevance score
        return int(np.argmax(scores))

def majority_vote(predictions: List[str]) -> str:
    """Return the most frequent prediction.

    Ties are resolved in favour of the prediction that appears first.
    An empty list yields an empty string instead of raising ``IndexError``.
    """
    if not predictions:
        return ""
    return Counter(predictions).most_common(1)[0][0]


#### eval

In [None]:
def _annotate_bars(ax, value_format: str, fontsize: int) -> None:
    """Label every bar on *ax* with its height, rendered with *value_format*."""
    for patch in ax.patches:
        height = patch.get_height()
        ax.annotate(
            format(height, value_format),
            (patch.get_x() + patch.get_width() / 2., height),
            ha='center', va='center', xytext=(0, 5),
            textcoords='offset points', fontsize=fontsize,
        )


def evaluate(topics: List[Topic],
             hf_token: str | None = HF_TOKEN,
             device: int = DEVICE,
             show_progress: bool = True) -> None:
    """Run the full QA evaluation over *topics*, then print and plot the results.

    Steps:
      1. Train the question/entity classifier on 80% of all questions and
         measure it on the remaining 20%.
      2. Sweep the confirmation threshold (0.10 to 0.95) and keep the most
         accurate one.
      3. For every question: passage retrieval, random-word baseline, then
         either the confirmation answerer (confirmation questions) or the
         BERT models, Llama, the ensemble and the semantic re-rank.
      4. Print summary tables and draw the bar charts. Sections without data
         (e.g. no confirmation questions) are reported as such and skipped.

    Args:
        topics: Collection to evaluate.
        hf_token: Hugging Face token forwarded to the Llama loader.
        device: GPU index, or -1 for CPU; forwarded to every model.
        show_progress: Wrap the topic loop in a tqdm progress bar.

    NOTE(review): the confirmation threshold is selected on the same
    confirmation questions it is later reported on, so that accuracy is
    optimistic.
    """
    ##### init all reusable components ####
    bert_pipelines = BERTpipeline(device)
    llama_qa = LlamaQA(hf_token, device)
    passage_retriever = PassageRetriever(device=device)

    # lightweight sentence-embedding model shared by the question classifier,
    # the confirmation answerer and the semantic re-rank
    sentence_embedder = SentenceTransformer(
        'sentence-transformers/all-MiniLM-L6-v2',
        device=f"cuda:{device}" if device != -1 else "cpu",
    )
    question_classifier = QuestionClassifier(sentence_embedder)
    confirmation_model = ConfirmationAnswerer(device=device)
    confirmation_model.set_sentence_embedder(sentence_embedder)

    # classifier training and evaluation on a held-out split (no data leakage)
    all_questions_list = [qa for topic in topics for qa in topic.qa_list]

    train_classifier_qa_items, test_classifier_qa_items = train_test_split(
        all_questions_list,
        test_size=0.2,
        random_state=SEED,
    )

    question_classifier.train(train_classifier_qa_items)

    q_type_correct_eval = 0
    q_type_total_eval = 0
    entity_correct_eval = 0
    entity_total_eval = 0

    print("Evaluating Question/Entity type classifier on test set")
    for qa_item in test_classifier_qa_items:
        predicted_q_type, predicted_entity = question_classifier.predict(qa_item.question)
        q_type_total_eval += 1
        if predicted_q_type == qa_item.type:
            q_type_correct_eval += 1

        # entity accuracy is measured on gold factoid questions only
        if qa_item.type == "factoid":
            entity_total_eval += 1
            if predicted_entity == qa_item.entity:
                entity_correct_eval += 1

    # threshold search for the confirmation answerer
    thresholds = np.arange(0.1, 1.0, 0.05) # 0.10 to 0.95
    best_confirm_accuracy = -1
    optimal_threshold = 0.5
    LOW_CONFIDENCE_THRESHOLD = 0.35 # below this BERT confidence the semantic re-rank kicks in

    # Map each QA object to its own topic text once (by identity), instead of
    # scanning every topic with equality comparisons for every question.
    parent_text_by_qa = {id(qa): topic.text for topic in topics for qa in topic.qa_list}
    confirm_cases = [
        (qa, parent_text_by_qa[id(qa)])
        for qa in all_questions_list
        if qa.type == "confirmation" and parent_text_by_qa[id(qa)]
    ]

    if confirm_cases:
        # Reuse the single confirmation model and only vary its threshold, so
        # the cross-encoder weights are not reloaded for every candidate.
        for threshold in thresholds:
            confirmation_model.threshold = threshold
            correct = sum(
                int(confirmation_model.answer(qa.question, text).lower() == qa.answer.lower())
                for qa, text in confirm_cases
            )
            current_accuracy = correct / len(confirm_cases)
            if current_accuracy > best_confirm_accuracy:
                best_confirm_accuracy = current_accuracy
                optimal_threshold = threshold
        print(f"Optimal confirmation threshold: {optimal_threshold:.2f} with accuracy: {best_confirm_accuracy:.2%}")
    else:
        print(f"No confirmation questions found; keeping default threshold {optimal_threshold:.2f}")
    confirmation_model.threshold = optimal_threshold

    # running totals: [EM_sum, F1_sum, latency_sum, count] per model
    per_model_scores = defaultdict(lambda: [0.0, 0.0, 0.0, 0])
    # running totals: [EM_sum, F1_sum, count] per entity type
    per_entity_scores = defaultdict(lambda: [0.0, 0.0, 0])

    def record_model_score(tag: str, em: int, f1: float, latency: float = 0.0) -> None:
        totals = per_model_scores[tag]
        totals[0] += em
        totals[1] += f1
        totals[2] += latency
        totals[3] += 1

    baseline_em_sum = 0.0
    baseline_f1_sum = 0.0
    baseline_count = 0

    confirm_correct = 0.0
    confirm_total = 0
    retrieval_correct = 0.0
    retrieval_total = 0

    # iterate over every question of every topic
    if show_progress:
        topic_iter = tqdm(enumerate(topics), desc="Topics", total=len(topics))
    else:
        topic_iter = enumerate(topics)

    for topic_index, topic in topic_iter:
        for qa in topic.qa_list:

            # passage retrieval: is the question's own topic ranked first?
            retrieval_total += 1
            if passage_retriever.best_topic(qa.question, topics) == topic_index:
                retrieval_correct += 1

            # random word as baseline
            random_guess = random_word(topic.text)
            em, f1 = calculate_em_f1(random_guess, qa.answer)
            baseline_em_sum += em
            baseline_f1_sum += f1
            baseline_count += 1

            # confirmation questions are scored by accuracy only
            if qa.type == "confirmation":
                prediction = confirmation_model.answer(qa.question, topic.text)
                confirm_correct += int(prediction.lower() == qa.answer.lower())
                confirm_total += 1
                continue  # no EM/F1 for confirmation questions for BERT/LLaMA

            # BERT
            bert_outputs = bert_pipelines.answer(qa.question, topic.text)
            for tag, (answer, _confidence, latency) in bert_outputs.items():
                em, f1 = calculate_em_f1(answer, qa.answer)
                record_model_score(tag, em, f1, latency)

            # LLAMA-2
            llama_start_time = time.perf_counter()
            llama_answer = llama_qa.answer(qa.question, topic.text)
            llama_latency = time.perf_counter() - llama_start_time
            em, f1 = calculate_em_f1(llama_answer, qa.answer)
            record_model_score("llama", em, f1, llama_latency)

            # highest-confidence BERT answer
            best_bert_answer = ""
            best_bert_confidence = 0.0
            if bert_outputs:
                best_bert_answer, best_bert_confidence, _ = max(
                    bert_outputs.values(), key=lambda result: result[1]
                )

            # ensemble: vote between the best BERT answer and Llama.
            # NOTE(review): with two voters, any disagreement is a tie and the
            # tie goes to the first entry, i.e. the BERT answer.
            ensemble_answer_orig = majority_vote([best_bert_answer, llama_answer or best_bert_answer])
            em, f1 = calculate_em_f1(ensemble_answer_orig, qa.answer)
            record_model_score("ensemble", em, f1)

            # semantic re-ranking: when BERT is unsure, keep the candidate
            # whose embedding is closest to the question
            semantic_rerank_answer = best_bert_answer
            if best_bert_confidence < LOW_CONFIDENCE_THRESHOLD:
                # ordered de-duplication keeps tie-breaking deterministic
                candidate_answers = [
                    ans for ans in dict.fromkeys((best_bert_answer, llama_answer))
                    if ans.strip()
                ]
                if candidate_answers:
                    question_embedding = sentence_embedder.encode([qa.question])
                    candidate_embeddings = sentence_embedder.encode(candidate_answers)
                    similarities = cosine_similarity(question_embedding, candidate_embeddings)[0]
                    semantic_rerank_answer = candidate_answers[int(np.argmax(similarities))]

            em, f1 = calculate_em_f1(semantic_rerank_answer, qa.answer)
            record_model_score("ensemble_rerank", em, f1)

            # entity level: scored with the highest-confidence BERT answer
            if qa.entity:
                em, f1 = calculate_em_f1(best_bert_answer, qa.answer)
                entity_totals = per_entity_scores[qa.entity]
                entity_totals[0] += em
                entity_totals[1] += f1
                entity_totals[2] += 1

    ### Summary and visualization ###
    model_performance_data = [
        {
            "Model": tag,
            "EM": em_sum / n,
            "F1": f1_sum / n,
            "Avg Latency (s)": latency_sum / n,
            "N_Questions": n,
        }
        for tag, (em_sum, f1_sum, latency_sum, n) in per_model_scores.items()
        if n > 0
    ]
    df_models = None
    if model_performance_data:
        df_models = pd.DataFrame(model_performance_data).sort_values(by="F1", ascending=False)
        best_overall_model_name = df_models.iloc[0]['Model']
        best_overall_model_f1 = df_models.iloc[0]['F1']
        print(f"Overall best performing model: {best_overall_model_name} (F1: {best_overall_model_f1:.2%})")
    else:
        print("Overall best performing model: N/A")

    # overall performance
    print("Overall Performance")
    overall_data = {
        "Metric": ["Random-Word Baseline", "Question Type Prediction", "Factoid Entity Type Prediction",
                   "Passage Retrieval Accuracy"],
        "Value": [
            (f"{baseline_em_sum / baseline_count:.2%} EM / {baseline_f1_sum / baseline_count:.2%} F1"
             if baseline_count > 0 else "N/A"),
            f"{q_type_correct_eval / q_type_total_eval:.2%}" if q_type_total_eval > 0 else "N/A",
            f"{entity_correct_eval / entity_total_eval:.2%}" if entity_total_eval > 0 else "N/A",
            f"{retrieval_correct / retrieval_total:.2%}" if retrieval_total > 0 else "N/A",
        ],
        "N_Questions": [
            baseline_count,
            q_type_total_eval,
            entity_total_eval,
            retrieval_total,
        ]
    }
    df_overall = pd.DataFrame(overall_data)
    print(df_overall.to_string(index=False))

    # confirmation question yes/no
    print("Confirmation question performance")
    if confirm_total > 0:
        confirm_accuracy = confirm_correct / confirm_total
        print(f"Confirmation question accuracy using optimal threshold: {confirm_accuracy:.2%} (N={confirm_total})")

        plt.figure(figsize=(6, 4))
        sns.barplot(x=['Confirmation Questions'], y=[confirm_accuracy])
        plt.title('Confirmation question accuracy with optimal threshold')
        plt.ylabel('Accuracy')
        plt.ylim(0, 1)
        plt.gca().yaxis.set_major_formatter(plt.FuncFormatter(lambda y, _: '{:.0%}'.format(y)))
        _annotate_bars(plt.gca(), '.2%', 10)
        plt.tight_layout()
        plt.show()
    else:
        print("No confirmation questions were evaluated")

    ## Per model performance
    print("Per model performance for factoid questions only")
    if df_models is not None:
        print(df_models.to_string(index=False, formatters={"EM": "{:.2%}".format, "F1": "{:.2%}".format, "Avg Latency (s)": "{:.5f}".format}))

        fig, axes = plt.subplots(1, 2, figsize=(16, 6))
        fig.suptitle('Per model performance: EM, F1-Score, and Average latency', fontsize=16)

        # EM and F1 side by side for each model
        df_melted_em_f1 = df_models.melt(id_vars=['Model'], value_vars=['EM', 'F1'], var_name='Metric type', value_name='Score')
        sns.barplot(x='Model', y='Score', hue='Metric type', data=df_melted_em_f1, ax=axes[0])
        axes[0].set_title('EM and F1')
        axes[0].set_ylabel('Score')
        axes[0].set_ylim(0, 1) # scores are between 0 and 1
        axes[0].tick_params(axis='x', rotation=45)
        _annotate_bars(axes[0], '.3f', 8)

        # average latency, slowest model first (the ensembles record no latency)
        sns.barplot(x='Model', y='Avg Latency (s)', data=df_models.sort_values(by="Avg Latency (s)", ascending=False), ax=axes[1])
        axes[1].set_title('Average latency per model')
        axes[1].set_ylabel('seconds')
        axes[1].tick_params(axis='x', rotation=45)
        _annotate_bars(axes[1], '.3f', 8)

        plt.tight_layout()
        plt.show()
    else:
        print("No factoid questions were evaluated")

    ## Entity level performance (highest-confidence BERT answer per question)
    print("Entity level performance best overall")
    entity_performance_data = [
        {
            "Entity Type": ent,
            "EM": em_sum / n,
            "F1": f1_sum / n,
            "N_Questions": n,
        }
        for ent, (em_sum, f1_sum, n) in per_entity_scores.items()
        if n > 0
    ]
    if entity_performance_data:
        df_entities = pd.DataFrame(entity_performance_data).sort_values(by="F1", ascending=False)
        print(df_entities.to_string(index=False, formatters={"EM": "{:.2%}".format, "F1": "{:.2%}".format}))

        plt.figure(figsize=(12, 8))
        sns.barplot(x='Entity Type', y='F1', data=df_entities)
        plt.title(f'F1 score per entity type from answers from best confidence BERT per question')
        plt.ylabel('F1')
        plt.xlabel('Entity type')
        plt.xticks(rotation=45, ha='right')
        _annotate_bars(plt.gca(), '.3f', 8)
        plt.tight_layout()
        plt.show()
    else:
        print("No questions with an entity type were evaluated")

### Main code

In [None]:
# Entry point: load the collection from COLLECTION_PATH and run the full
# evaluation on it with the configured Hugging Face token.
topics = load_collection(COLLECTION_PATH)
evaluate(topics = topics, hf_token = HF_TOKEN)