In [None]:
from typing import Dict, List, Callable, Tuple, Union, Callable
import string
import os
import json
import re
import numpy as np
from collections import Counter
from tqdm import tqdm
import numpy as np

class MultiHopEvaluator:
    """Exact-match and token-level F1 scoring for question-answering predictions.

    Both scorers compare a prediction against one or more acceptable
    ground-truth strings after normalizing them with ``normalize_answer``.
    Subclasses can override ``get_all_alias`` to expand a ground-truth id into
    additional acceptable answers.
    """

    # Normalized answers that only earn F1 credit when prediction and ground
    # truth are identical after normalization (no partial token overlap).
    _SPECIAL_ANSWERS = ('yes', 'no', 'noanswer')

    @classmethod
    def get_all_alias(cls, ground_truth_id: str) -> List[str]:
        """Return the alias strings for ``ground_truth_id``.

        The base evaluator knows no aliases, so this is always an empty list.
        """
        return []

    @classmethod
    def normalize_answer(cls, s):
        """Canonicalize an answer string for comparison.

        Lower-cases the text, strips ASCII punctuation, removes the articles
        "a", "an" and "the", and collapses runs of whitespace. Any non-string
        input (e.g. ``None`` or a float) normalizes to the empty string.
        """
        def remove_articles(text):
            return re.sub(r'\b(a|an|the)\b', ' ', text)
        def white_space_fix(text):
            return ' '.join(text.split())
        def remove_punc(text):
            exclude = set(string.punctuation)
            return ''.join(ch for ch in text if ch not in exclude)
        def lower(text):
            return text.lower()
        if not isinstance(s, str):
            return ""
        return white_space_fix(remove_articles(remove_punc(lower(s))))

    @classmethod
    def _collect_ground_truths(cls, ground_truth, ground_truth_id=None) -> set:
        """Build the set of acceptable answers: the given ground truth(s) plus aliases.

        Aliases are only looked up when ``ground_truth_id`` is a non-empty string.
        """
        candidates = {ground_truth} if isinstance(ground_truth, str) else set(ground_truth)
        if ground_truth_id and isinstance(ground_truth_id, str):
            candidates.update(cls.get_all_alias(ground_truth_id))
        return candidates

    @classmethod
    def exact_match_score(
        cls,
        prediction: str,
        ground_truth: Union[str, List[str]],
        ground_truth_id: Union[str, List[str]] = None
    ):
        """Score whether the prediction exactly matches any acceptable answer.

        Args:
            prediction: Predicted answer text. A falsy prediction is scored as incorrect.
            ground_truth: One acceptable answer, or a collection of them.
            ground_truth_id: Optional id passed to ``get_all_alias`` to add aliases.

        Returns:
            ``{'correct': c, 'incorrect': 1 - c}`` where ``c`` is the plain int
            1 if the normalized prediction equals any normalized acceptable
            answer, else 0. An empty ground-truth collection scores 0.
        """
        if not prediction:
            return {'correct': 0, 'incorrect': 1}
        candidates = cls._collect_ground_truths(ground_truth, ground_truth_id)

        # Normalize the prediction once instead of once per candidate.
        normalized_prediction = cls.normalize_answer(prediction)
        # any() is safe on an empty candidate set, and int() keeps the result a
        # built-in int rather than a numpy scalar.
        correct = int(any(
            normalized_prediction == cls.normalize_answer(candidate)
            for candidate in candidates
        ))
        return {'correct': correct, 'incorrect': 1 - correct}

    @classmethod
    def f1_score(
        cls,
        prediction: str,
        ground_truth: Union[str, List[str]],
        ground_truth_id: Union[str, List[str]] = None
    ):
        """Compute token-overlap F1, precision and recall against the acceptable answers.

        Args:
            prediction: Predicted answer text. A falsy prediction scores all zeros.
            ground_truth: One acceptable answer, or a collection of them.
            ground_truth_id: Optional id passed to ``get_all_alias`` to add aliases.

        Returns:
            ``{'f1': ..., 'precision': ..., 'recall': ...}``. Each entry is the
            maximum of that metric over all acceptable answers, taken
            independently per metric; entries stay 0 when nothing overlaps.
        """
        final_metric = {'f1': 0, 'precision': 0, 'recall': 0}

        if not prediction:
            return final_metric
        candidates = cls._collect_ground_truths(ground_truth, ground_truth_id)

        # The prediction side is loop-invariant, so prepare it once.
        normalized_prediction = cls.normalize_answer(prediction)
        prediction_tokens = normalized_prediction.split()
        prediction_counts = Counter(prediction_tokens)

        for candidate in candidates:
            normalized_ground_truth = cls.normalize_answer(candidate)
            # yes / no / noanswer get no partial credit: unless both sides are
            # identical, a special answer on either side contributes nothing.
            if normalized_prediction != normalized_ground_truth and (
                normalized_prediction in cls._SPECIAL_ANSWERS
                or normalized_ground_truth in cls._SPECIAL_ANSWERS
            ):
                continue

            ground_truth_tokens = normalized_ground_truth.split()
            # Multiset intersection counts each shared token at most as often
            # as it appears on both sides.
            num_same = sum((prediction_counts & Counter(ground_truth_tokens)).values())
            if num_same == 0:
                continue

            precision = 1.0 * num_same / len(prediction_tokens)
            recall = 1.0 * num_same / len(ground_truth_tokens)
            scores = {
                'f1': (2 * precision * recall) / (precision + recall),
                'precision': precision,
                'recall': recall,
            }
            for key, value in scores.items():
                final_metric[key] = max(value, final_metric[key])
        return final_metric

    def eval_answer(self, results_df, answer_col="Final Answer"):
        """Print mean exact match and mean F1 over the rows of ``results_df``.

        Intended for datasets without answer ids / aliases: every row is scored
        with ``ground_truth_id=None``.

        Args:
            results_df: Object exposing ``iterrows()`` whose rows provide
                ``answer_col`` and ``'ground_truth'``.
            answer_col: Column holding the prediction to score.

        Raises:
            ZeroDivisionError: If ``results_df`` has no rows.
        """
        em_list = []
        f1_list = []
        for _, row in results_df.iterrows():
            prediction = row[answer_col]
            ground_truth = row['ground_truth']
            em_list.append(self.exact_match_score(prediction, ground_truth, None)['correct'])
            f1_list.append(self.f1_score(prediction, ground_truth, None)['f1'])
        # NOTE(review): ':4f' is a minimum field width of 4 with the default six
        # decimals; '.4f' (four decimals) may have been intended - confirm
        # before changing the printed format.
        print(f"EM: {sum(em_list)/len(em_list):4f}\t F1: {sum(f1_list)/len(f1_list):4f}")

In [None]:
import pandas as pd

## TwoWikiHop

In [None]:
class WikiMultiHopEvaluator(MultiHopEvaluator):
    """Evaluator that scores predictions against ground truths plus id-based aliases.

    On construction it reads ``dev.json`` (examples with ``_id``, ``question``,
    ``answer`` and ``answer_id``) and ``id_aliases.json`` (one JSON object per
    line with ``Q_id`` and ``aliases``) from ``data_path``.
    """

    # Class-level default so get_all_alias() returns [] instead of raising
    # AttributeError when it is called before init_id_aliases() has run.
    id_alias: Dict[str, List[str]] = {}

    def __init__(self, data_path: str="data/multihop_data/2wikimultihopqa"):
        """Load the dev examples and the alias table found under ``data_path``.

        Sets ``self.dataset`` (list of example dicts) and
        ``self.dataset_from_qid`` (the same dicts keyed by ``qid``), then
        populates the class-level alias table.
        """
        dataset = []
        # JSON is read as UTF-8 explicitly rather than with the platform default encoding.
        with open(os.path.join(data_path, 'dev.json'), 'r', encoding='utf-8') as fin:
            for example in tqdm(json.load(fin)):
                dataset.append({
                    'qid': example['_id'],
                    'question': example['question'],
                    'answer': example['answer'],
                    'answer_id': example['answer_id'],
                })
        self.dataset = dataset
        self.dataset_from_qid = {entry['qid']: entry for entry in self.dataset}
        self.init_id_aliases(data_path)

    @classmethod
    def init_id_aliases(cls, data_path):
        """(Re)build the class-level ``id_alias`` table from ``id_aliases.json``.

        The file is read as JSON lines; blank lines are skipped. The table is
        stored on the class, so it is shared by all instances.
        """
        aliases: Dict[str, List[str]] = {}
        with open(os.path.join(data_path, 'id_aliases.json'), 'r', encoding='utf-8') as fin:
            for line in fin:
                if not line.strip():
                    # Tolerate blank lines, which json.loads would reject.
                    continue
                record = json.loads(line)
                aliases[record['Q_id']] = record['aliases']
        cls.id_alias = aliases

    @classmethod
    def get_all_alias(cls, ground_truth_id: str) -> List[str]:
        """Return the aliases recorded for ``ground_truth_id``.

        Returns an empty list when the id is falsy or not in the alias table.
        """
        if ground_truth_id and ground_truth_id in cls.id_alias:
            return cls.id_alias[ground_truth_id]
        return []

    def get_real_prediction(self, pred):
        """Extract the answer text that follows "the answer is" in ``pred``.

        If the marker is absent, ``pred`` is returned unchanged. Otherwise the
        text after the first occurrence of the marker (plus one separator
        character) is kept, and a trailing "</s>", then a trailing
        "<|endoftext|>", then a trailing "." are each removed once, in that order.
        """
        marker = "the answer is"
        if marker not in pred:
            return pred
        # +1 skips the single character that follows the marker.
        pred = pred[pred.find(marker) + len(marker) + 1:]
        for suffix in ("</s>", "<|endoftext|>", "."):
            if pred.endswith(suffix):
                pred = pred[:len(pred) - len(suffix)]
        return pred

    def eval_answer(self, results_df, answer_col="Final Answer"):
        """Print mean exact match and mean F1 over ``results_df``, honoring aliases.

        Each row's ``qid`` is looked up in ``self.dataset_from_qid`` to obtain
        the ``answer_id`` used for alias expansion.

        Args:
            results_df: Object exposing ``iterrows()`` whose rows provide
                ``answer_col``, ``'ground_truth'`` and ``'qid'``.
            answer_col: Column holding the prediction to score.

        Raises:
            KeyError: If a row's ``qid`` is not in the loaded dataset.
            ZeroDivisionError: If ``results_df`` has no rows.
        """
        em_list = []
        f1_list = []
        for _, row in results_df.iterrows():
            prediction = row[answer_col]
            ground_truth = row['ground_truth']
            ground_truth_id = self.dataset_from_qid[row['qid']]['answer_id']
            em_list.append(self.exact_match_score(prediction, ground_truth, ground_truth_id)['correct'])
            f1_list.append(self.f1_score(prediction, ground_truth, ground_truth_id)['f1'])
        print(f"EM: {sum(em_list)/len(em_list):4f}\t F1: {sum(f1_list)/len(f1_list):4f}")

In [None]:

# Stack the rows of both JSON-lines result files into one frame and report the row count.
result_files = (
    "outputs/twowikihop_finished.jsonl",
    "outputs/twowikihop_llama3_rerun_failed_0612_0652/results.jsonl",
)
results_df = pd.concat([pd.read_json(path, lines=True) for path in result_files])
print(len(results_df))


In [None]:
# Placeholder: replace with the JSON-lines results file to score.
results_path = "__YOUR_RESULT.JSONL_PATH__"
results_df = pd.read_json(results_path, lines=True)

# Built with the evaluator's default data_path.
twowikihop_evaluator = WikiMultiHopEvaluator()

# Score each answer column in turn, printing its name before its metrics.
answer_columns = ("Final Answer", "Final Step Answer", "Final Read Answer")
for column_name in answer_columns:
    print(column_name)
    twowikihop_evaluator.eval_answer(results_df, answer_col=column_name)

## Other Datasets

In [None]:
# Placeholder: replace with the JSON-lines results file to score.
results_path = "__YOUR_RESULT.JSONL_PATH__"
results_df = pd.read_json(results_path, lines=True)

# Base evaluator: rows are scored without answer ids or aliases.
evaluator = MultiHopEvaluator()

# Score each answer column in turn, printing its name before its metrics.
answer_columns = ("Final Answer", "Final Step Answer", "Final Read Answer")
for column_name in answer_columns:
    print(column_name)
    evaluator.eval_answer(results_df, answer_col=column_name)