In [1]:
import warnings
from collections import defaultdict
from typing import Iterable, Literal, Mapping

import numpy as np
from nltk import word_tokenize
from nltk.translate.bleu_score import sentence_bleu
from rouge import Rouge
from tqdm import tqdm


# Silence every warning for the rest of the session. The motivation is the
# UserWarning BLEU emits when the n-gram overlap is 0, but note that this
# filter is not limited to that warning.
warnings.simplefilter("ignore")


class QAEvaluation:
    """Answer-quality metrics (exact match, BLEU-n, ROUGE) with multiple references.

    ``preds[i]`` is the predicted answer for ``sources[i]`` and ``labels[i]`` is
    the list of reference answers accepted for it. Every metric is computed per
    example and then averaged over all examples.
    """

    def __init__(self, sources: list, preds: list, labels: list):
        """
        sources: one entry per example (stored, not used by the metrics below)
        preds: one predicted answer string per example
        labels: one list of reference answer strings per example

        Raises AssertionError when the three lists differ in length, are empty,
        or when ``labels`` is not a list of lists.
        """
        # Check the lengths before peeking at labels[0], so that empty or
        # misaligned input fails with a readable message instead of an IndexError.
        assert len(sources) == len(preds) == len(labels), \
            "sources, preds and labels must have the same length"
        assert len(labels) > 0, "at least one example is required"
        assert isinstance(labels[0], list), \
            "labels must hold one list of reference answers per example"
        self.sources = sources
        self.preds = preds
        self.labels = labels

    def exact_match(self) -> float:
        """Fraction of predictions that are identical to one of their references."""
        hits = sum(1 for pred, refs in zip(self.preds, self.labels) if pred in refs)
        return hits / len(self.preds)

    def bleu(self, n: Literal[1, 2, 3, 4]) -> float:
        """
        Individual (non-cumulative) BLEU n-gram score, averaged over examples.

        n: n-gram order, 1 to 4. All the weight is put on that single order.
        """
        # Validate first so that a bad order fails before any tokenization work.
        assert 1 <= n <= 4
        weights = tuple(1 if order == n else 0 for order in range(1, 5))

        pred_tokens = [word_tokenize(pred) for pred in self.preds]
        label_tokens = [[word_tokenize(ref) for ref in refs] for refs in self.labels]

        # sentence_bleu takes (references, hypothesis).
        return sum(sentence_bleu(refs_tok, pred_tok, weights=weights)
                   for pred_tok, refs_tok in zip(pred_tokens, label_tokens)) / len(self.preds)

    def rouge(self, n: Literal[1, 2, 3, 4, 5, "l"], t: Literal["n", "l", "w"] = "n",
              stats: Literal["p", "r", "f"] = "p") -> float:
        """
        ROUGE score, taking the best-scoring reference per example and averaging
        over examples.

        n: n-gram order (1 to 5) for ROUGE-N, or "l" to request ROUGE-L.
           Ignored when t is "l" or "w".
        stats: "p": precision; "r": recall; "f": f1
        t: Rouge type:
            ROUGE-N: Overlap of N-grams between the system and reference summaries.
            ROUGE-L: Longest Common Subsequence (LCS) based statistics. Longest common
                        subsequence problem takes into account sentence level structure
                        similarity naturally and identifies longest co-occurring in
                        sequence n-grams automatically.
            ROUGE-W: Weighted LCS-based statistics that favors consecutive LCSes.
        """
        assert n in {1, 2, 3, 4, 5, "l"}
        if t == "n" and n != "l":
            # N-gram variant: the evaluator is asked for orders up to n and the
            # requested order is read back under "rouge-<n>".
            score_key = f"rouge-{n}"
            evaluator = Rouge(metrics=["rouge-n"], max_n=n)
        else:
            # LCS-based variants have no n-gram order: the result is stored under
            # the metric's own name, so it must not be looked up under "rouge-<n>".
            # n == "l" with the default t is treated as a request for ROUGE-L.
            variant = "l" if t == "n" else t
            score_key = f"rouge-{variant}"
            evaluator = Rouge(metrics=[score_key])
        return sum(max(evaluator.get_scores(pred, ref)[score_key][stats] for ref in refs)
                   for pred, refs in zip(self.preds, self.labels)) / len(self.preds)
    
    

In [2]:
# Functions for computing human performance

import pandas as pd
import numpy as np
from typing import Iterable, Literal, Mapping
from collections import defaultdict

def evaluate_preds(preds: Iterable[str], labels: Iterable[str], maxs:bool=True) -> [float]:
    """
    Score all the answers given by one person against a shared set of references.
    Input:
        preds: the person's answers, [pred1, pred2, ...]
        labels: the reference answers, [label1, label2, ...]
        maxs: bool: (True: keep the best answer's score for each metric;
                     False: average the scores over the answers)
    Return:
        [exact_match, BLEU1, BLEU2, BLEU3, ROUGE1, ROUGE2, ROUGE3]
    """
    # One single-example evaluator per answer, all sharing the same references.
    evaluators = [QAEvaluation(["-"], [answer], [labels]) for answer in preds]

    # One list of per-answer scores for each of the seven metrics, in output order.
    per_metric = [[ev.exact_match() for ev in evaluators]]
    for order in (1, 2, 3):
        per_metric.append([ev.bleu(order) for ev in evaluators])
    for order in (1, 2, 3):
        per_metric.append([ev.rouge(order) for ev in evaluators])

    combine = max if maxs else np.average
    return [combine(answer_scores) for answer_scores in per_metric]


def evaluate_one_question(predss: Iterable[Iterable[str]],stdAns:str=None,maxs:bool=True) -> [[float],[float]]:
    '''
    Score every human's answers to one question, both against the standard
    answer and against the other humans' answers (leave-one-human-out).
    Input:
        predss: humans' answers for one question, one collection of answers per human
        stdAns: standard answer for this question. A missing (None) or empty
                standard answer disables the standard-answer scores.
        maxs: bool: (True: return maximum; False: return average)
                    score for each metric
    Return:
        (stdScores, leavOneScores)
        based on stdAnswer (an empty list per person when stdAns is not given):
        [person 1: [exact_match, BLEU1, BLEU2, BLEU3, ROUGE1, ROUGE2, ROUGE3]
         person 2: [exact_match, BLEU1, BLEU2, BLEU3, ROUGE1, ROUGE2, ROUGE3]
         ...]
        based on leave-one-human-out (the references are all the other humans'
        answers plus stdAns when given; [[]] when there is only one person):
        [person 1: [exact_match, BLEU1, BLEU2, BLEU3, ROUGE1, ROUGE2, ROUGE3]
         person 2: [exact_match, BLEU1, BLEU2, BLEU3, ROUGE1, ROUGE2, ROUGE3]
         ...]
    Raises:
        AssertionError if neither a standard answer nor at least 2 humans'
        answers are provided.
    '''
    # predss is measured, indexed and re-read once per human below, so copy it
    # (and each person's answers) into lists; this lets callers pass any iterable.
    predss = [list(preds) for preds in predss]

    assert stdAns or len(predss) >= 2, "You have to provide either the standard answer " +\
                                            "or at least 2 humans' answers for evaluation."

    if len(predss) == 1: # only one person: nobody to compare with, standard answer only
        return [evaluate_preds(predss[0],[stdAns],maxs=maxs)],[[]]

    stdScores = [] # scores against the standard answer
    leavOneScores = [] # leave-one-human-out scores
    for i, own_answers in enumerate(predss): # for each human
        # References for this human: everything the other humans answered.
        leavOneLabel = [pred for j, others in enumerate(predss) if j != i for pred in others]
        if stdAns:
            stdScores.append(evaluate_preds(own_answers,[stdAns],maxs=maxs))
            leavOneLabel.append(stdAns)
        else:
            stdScores.append([])
        leavOneScores.append(evaluate_preds(own_answers,leavOneLabel,maxs=maxs))
    return stdScores, leavOneScores


def evaluate_persons(answerss:Iterable[Iterable[Iterable[str]]],workerss:Iterable[Iterable[str]],
                     stdAnswers:Iterable[str]=None, maxs:bool=True) -> [[float],[float]]:
    '''
    Average human performance over all questions and all workers.
    Input:
        answerss: for each question, the answers of every human who answered it
        workerss: for each question, the worker ids, in the same order as the
                  humans in the matching answerss entry
        stdAnswers: the standard answer of each question (optional)
        maxs: bool: (True: use each person's maximum; False: use the average)
                    score for each metric on a question
    Return:
        [aveStdScore, aveLeaScore]: the scores are first averaged per worker
        over that worker's questions, then averaged over the workers. Either
        entry is [] when no score of that kind was produced.
    '''

    def eva_scores(stdLeaScores:[[[float],[float]]]) -> [[float],[float]]:
        '''
        Average scores for one person / for all persons
        Input:
            stdLeaScores: [[stdScores1,leaOneScores1],[stdScores2,leaOneScores2],..]
        Output:
            [aveStdScore,aveLeaScore]
        '''
        averaged = []
        for i in (0, 1): # 0: standard-answer scores, 1: leave-one-out scores
            # Entries equal to [] mean "not available" and are left out of the mean.
            present = [pair[i] for pair in stdLeaScores if pair[i] != []]
            averaged.append(list(np.average(present, axis=0)) if present else [])
        return averaged

    # Materialise the questions first: building the default stdAnswers by
    # iterating answerss would otherwise use up a one-shot iterable before the
    # main loop runs.
    answerss = list(answerss)
    if stdAnswers is None:
        stdAnswers = [None] * len(answerss)

    person_scores = defaultdict(list)
    for stdAns, answers, workers in zip(stdAnswers,answerss,workerss):
        std_s, lvo_s = evaluate_one_question(answers,stdAns=stdAns,maxs=maxs)
        # TODO: handle and test the case of only one person & no stdAns
        for stds,lvos,worker in zip(std_s,lvo_s,workers):
            person_scores[worker].append([stds,lvos])

    per_person = [eva_scores(scores) for scores in person_scores.values()]
    return eva_scores(per_person)


In [3]:
# NOTE(review): eval() executes whatever expression is stored in the CSV cells.
# Only run this on a trusted file; if the cells are plain list literals,
# ast.literal_eval would be the safer parser.
ann_results = pd.read_csv(
    "crowd_student_result.csv",
    converters={"answers": lambda x: eval(x), "workerids": lambda x: eval(x)},
)

# The same (question, video, stdAnswer) can show up in several HITs: gather the
# per-HIT lists of every group ...
org_ann = ann_results.groupby(["question", "video", "stdAnswer"]).agg(
    {"answers": list, "workerids": list}
)

# ... then merge them row by row. Series.map works on one cell at a time;
# DataFrame.apply would hand the lambda a whole column and flatten across
# questions, which only lines up when every group holds exactly one HIT.
for col in ("answers", "workerids"):
    org_ann[col] = org_ann[col].map(lambda hits: [item for hit in hits for item in hit])

In [4]:
# Human performance over the whole annotation set. The standard answers are
# the third level of the group index; maxs=False averages each person's
# answers instead of keeping the best one.
evaluate_persons(
    org_ann["answers"],
    org_ann["workerids"],
    stdAnswers=list(org_ann.index.get_level_values("stdAnswer")),
    maxs=False,
)

[[0.0,
  0.18653746709917995,
  0.028589166522782587,
  0.007142857142857141,
  0.2500413503045082,
  0.028162393162393162,
  0.008333333333333333],
 [0.0,
  0.3933792034752316,
  0.17679080031589456,
  0.08567650441210481,
  0.4043737722685091,
  0.22641025641025642,
  0.1261904761904762]]