## Evaluator

In [None]:
from typing import Dict, List, Callable, Tuple, Union, Callable, Literal
import string
import re
import numpy as np
from collections import Counter
import numpy as np
import pandas as pd

class Evaluator:
    """Answer-level QA metrics: normalized exact match and token-overlap F1.

    Predictions and references are compared after `normalize_answer`
    (lower-casing, punctuation removal, article removal, whitespace
    collapsing). A reference may be a single string or a collection of
    acceptable strings; the best score over all references is reported.
    """

    # Answers that only count when the whole normalized strings are equal;
    # f1_score never rewards a partial token overlap involving one of these.
    _SPECIAL_ANSWERS = ('yes', 'no', 'noanswer')
    # Built once at class creation instead of on every normalize_answer call.
    _ARTICLES_RE = re.compile(r'\b(a|an|the)\b')
    _PUNCT_TABLE = str.maketrans('', '', string.punctuation)

    @classmethod
    def normalize_answer(cls, s):
        """Return the canonical form of an answer string.

        Steps, in order: lower-case, drop every character in
        `string.punctuation`, replace the standalone words a/an/the with a
        space, then collapse runs of whitespace to single spaces.

        Args:
            s: The answer text. Any non-`str` value is treated as no answer.

        Returns:
            The normalized string, or "" when `s` is not a `str`.
        """
        if not isinstance(s, str):
            return ""
        text = s.lower().translate(cls._PUNCT_TABLE)
        text = cls._ARTICLES_RE.sub(' ', text)
        return ' '.join(text.split())

    @staticmethod
    def _as_answer_set(ground_truth):
        """Wrap a single reference string, or a collection of them, into a set."""
        return {ground_truth} if isinstance(ground_truth, str) else set(ground_truth)

    @classmethod
    def exact_match_score(
        cls,
        prediction: str,
        ground_truth: Union[str, List[str]],
    ):
        """Score whether the prediction equals any reference after normalization.

        Args:
            prediction: The predicted answer. A falsy value (e.g. "" or None)
                is always scored as incorrect.
            ground_truth: One reference string or a collection of references.

        Returns:
            A dict `{'correct': c, 'incorrect': 1 - c}` where `c` is a plain
            `int`, 1 if any reference matches and 0 otherwise. An empty
            collection of references yields `c == 0`.
        """
        if not prediction:
            return {'correct': 0, 'incorrect': 1}

        normalized_prediction = cls.normalize_answer(prediction)
        # any() is False for an empty reference collection, so that case is
        # scored as incorrect rather than raising like a max over no elements.
        correct = int(any(
            normalized_prediction == cls.normalize_answer(reference)
            for reference in cls._as_answer_set(ground_truth)
        ))
        return {'correct': correct, 'incorrect': 1 - correct}

    @classmethod
    def f1_score(
        cls,
        prediction: str,
        ground_truth: Union[str, List[str]],
    ):
        """Compute token-level F1, precision and recall against the references.

        Tokens are the whitespace-separated words of the normalized strings,
        and overlap is counted as a multiset intersection. If either side
        normalizes to one of 'yes', 'no' or 'noanswer', that reference only
        contributes when the two normalized strings are identical.

        Args:
            prediction: The predicted answer. A falsy value scores all zeros.
            ground_truth: One reference string or a collection of references.

        Returns:
            A dict with keys 'f1', 'precision' and 'recall'. Each value is the
            maximum of that metric over all references, taken independently
            per key (so the three values may come from different references).
            All three are 0 when nothing overlaps.
        """
        final_metric = {'f1': 0, 'precision': 0, 'recall': 0}

        if not prediction:
            return final_metric

        # The prediction does not change between references, so normalize and
        # tokenize it once up front.
        normalized_prediction = cls.normalize_answer(prediction)
        prediction_tokens = normalized_prediction.split()
        prediction_counts = Counter(prediction_tokens)

        for reference in cls._as_answer_set(ground_truth):
            normalized_reference = cls.normalize_answer(reference)
            involves_special = (
                normalized_prediction in cls._SPECIAL_ANSWERS
                or normalized_reference in cls._SPECIAL_ANSWERS
            )
            if involves_special and normalized_prediction != normalized_reference:
                continue

            reference_tokens = normalized_reference.split()
            common = prediction_counts & Counter(reference_tokens)
            num_same = sum(common.values())
            if num_same == 0:
                # No shared tokens; this also guards the divisions below.
                continue

            precision = num_same / len(prediction_tokens)
            recall = num_same / len(reference_tokens)
            f1 = (2 * precision * recall) / (precision + recall)

            scores = {'f1': f1, 'precision': precision, 'recall': recall}
            for key, value in scores.items():
                final_metric[key] = max(value, final_metric[key])
        return final_metric

    def eval_answer(self, results_df, answer_col="Final Answer"):
        """Print the mean exact-match and F1 scores over a results frame.

        Intended for datasets that do not have answer_ids or aliases: each
        row is scored against its 'ground_truth' value only.

        Args:
            results_df: Frame iterated with `iterrows()`; each row must
                provide `answer_col` and a 'ground_truth' entry.
            answer_col: Name of the column holding the predicted answer.

        Raises:
            ZeroDivisionError: If `results_df` has no rows.
        """
        em_list = []
        f1_list = []
        for _, row in results_df.iterrows():
            prediction = row[answer_col]
            ground_truth = row['ground_truth']
            em_list.append(self.exact_match_score(prediction, ground_truth)['correct'])
            f1_list.append(self.f1_score(prediction, ground_truth)['f1'])
        print(f"EM: {sum(em_list)/len(em_list):4f}\t F1: {sum(f1_list)/len(f1_list):4f}")

In [None]:
def my_eval(pred_list, ground_truths):
    """Print mean exact-match and F1 for paired predictions and references.

    Predictions and references are paired positionally with `zip`, so any
    surplus items in the longer input are ignored. Each pair is scored with
    `Evaluator.exact_match_score` and `Evaluator.f1_score`.

    Raises:
        ZeroDivisionError: If no pairs are produced.
    """
    scorer = Evaluator()
    # One (exact-match, f1) tuple per pair, scored in a single pass.
    per_example = [
        (
            scorer.exact_match_score(pred, truth)['correct'],
            scorer.f1_score(pred, truth)['f1'],
        )
        for pred, truth in zip(pred_list, ground_truths)
    ]
    total = len(per_example)
    em_mean = sum(em for em, _ in per_example) / total
    f1_mean = sum(f1 for _, f1 in per_example) / total
    print(f"EM: {em_mean:4f}\t F1: {f1_mean:4f}")

## Load Results

In [None]:
# Dataset selector; the annotation restricts it to 'nq', 'tq' or 'sq'.
dataset_name: Literal['nq', 'tq', 'sq'] = 'nq'

# Read the processed single-hop file for the selected dataset and keep its
# 'answer' column as the reference answers used for scoring.
raw_data = pd.read_json(f"./data/singlehop_data/processed_{dataset_name}.json")
ground_truth_list = raw_data['answer']

In [None]:
# Load the two model output files, one JSON record per line.
# NOTE(review): both paths are hard-coded to the "nq_llama2_7b" run and do not
# follow `dataset_name`; update them together when switching datasets.
direct_results = pd.read_json("outputs/nq_llama2_7b/direct.jsonl", lines=True) # replace with your output file
rag_results = pd.read_json("outputs/nq_llama2_7b/rag.jsonl", lines=True) # replace with your output file

In [None]:
# Sanity checks on the loaded files: exactly one direct result per reference
# answer, and exactly ten RAG results per reference answer.
assert len(ground_truth_list) == len(direct_results)
assert 10 * len(ground_truth_list) == len(rag_results)

## Evaluate

In [None]:
# Direct answers whose eigen_score is strictly below this value are kept;
# every other question falls back to its RAG candidates.
THRESHOLD = -6.0
answer_list = []
for i, direct_res in direct_results.iterrows():
    if direct_res['eigen_score'] < THRESHOLD:
        answer_list.append(direct_res['answer'])
        continue
    # RAG rows are taken in consecutive groups of ten, located positionally
    # from the row label `i` (assumes direct_results has the default 0..n-1
    # index - TODO confirm).
    start = 10 * i
    rag_batch = rag_results.iloc[start:start + 10]
    # Among the ten candidates, take the one with the smallest eigen_score.
    winner = rag_batch['eigen_score'].idxmin()
    answer_list.append(rag_batch.loc[winner]['answer'])
my_eval(answer_list, ground_truth_list)