In [None]:
from itertools import combinations
from statistics import mean
import json

### Inter Annotator Agreement

We use the _positive specific agreement_ (equal to the _F-measure_) as described in Hripcsak and Rothschild (2005), since negative cases, which are needed for _Cohen's Kappa_, are poorly defined for span labeling. For our definition of agreement, we consider only _exact span matches_ (as used in Wang et al. 2021). This leads to the definition:

$$
p_{pos} = \frac{2a}{2a + b + c} 
$$

where $a$ is the number of spans both annotators agree on exactly, $b$ is the number of spans identified by annotator 1 but not by annotator 2, and $c$ is the number of spans identified by annotator 2 but not by annotator 1. This measure can, of course, only accommodate two annotators, so we measure the pairwise agreement for each pair of annotators.

In [None]:
# relative path to the annotation data; the trailing slash is kept because
# file paths are built from it by plain string concatenation
ANNOTATION_FOLDER = '../../../data/interim/annotation/'

# one JSON-lines file per annotator
ANNOTATED_FILES = ['bogdan.jsonl', 'gino.jsonl', 'veron.jsonl']

In [None]:
def p_pos(a, b, c):
    '''
    returns the positive specific agreement 2a / (2a + b + c)

    a: number of spans both annotators marked
    b: number of spans only the first annotator marked
    c: number of spans only the second annotator marked

    if no spans were marked at all (a = b = c = 0) the ratio is undefined;
    1.0 is returned in that case, since there is nothing to disagree on,
    instead of failing with a ZeroDivisionError
    '''
    denominator = 2 * a + b + c

    if denominator == 0:
        return 1.0

    return (2 * a) / denominator

def get_agreement_datapoint(datapoint_1, datapoint_2):
    '''
    counts exact span matches between two annotators on a single datapoint

    returns a tuple (a, b, c):
        a: spans marked by both annotators
        b: spans marked only by annotator 1
        c: spans marked only by annotator 2

    only the first two entries of each label (span start and end) are compared,
    so the label class is ignored (there is assumed to be just one)

    raises an Exception if the two datapoints do not carry the same text
    '''

    # differing texts mean the two annotation lists are not aligned
    if datapoint_1['text'] != datapoint_2['text']:
        raise Exception("Text not equal!")

    # (start, end) pairs per annotator; a set, so repeated spans count once
    first_spans = {(label[0], label[1]) for label in datapoint_1['label']}
    second_spans = {(label[0], label[1]) for label in datapoint_2['label']}

    shared = len(first_spans & second_spans)
    only_first = len(first_spans) - shared
    only_second = len(second_spans) - shared

    return shared, only_first, only_second

def get_agreement(annotator_1, annotator_2):
    '''
    returns p_pos over all datapoints of two annotators

    both annotators are sorted by 'id' and compared datapoint by datapoint;
    the counts a, b and c are summed over all datapoints before p_pos is
    computed once on the totals

    raises a ValueError if the annotators do not cover the same ids, since a
    positional pairing would otherwise silently drop or mismatch datapoints
    '''
    annotator_1 = sorted(annotator_1, key=lambda x: x['id'])
    annotator_2 = sorted(annotator_2, key=lambda x: x['id'])

    # zip() stops at the shorter list, so a length mismatch must be caught here
    if len(annotator_1) != len(annotator_2):
        raise ValueError(
            f"Annotators have a different number of datapoints: "
            f"{len(annotator_1)} vs. {len(annotator_2)}"
        )

    a = 0
    b = 0
    c = 0

    for datapoint_1, datapoint_2 in zip(annotator_1, annotator_2):
        # equal lengths do not guarantee equal ids, so check every pair
        if datapoint_1['id'] != datapoint_2['id']:
            raise ValueError(
                f"Datapoint ids do not line up: "
                f"{datapoint_1['id']!r} vs. {datapoint_2['id']!r}"
            )

        a_i, b_i, c_i = get_agreement_datapoint(datapoint_1, datapoint_2)

        a += a_i
        b += b_i
        c += c_i

    return p_pos(a, b, c)


def pairwise_agreement(annotators):
    '''
    returns the agreement for every combination of two annotators

    the result is a list of ((i, j), agreement) tuples, where i and j are the
    1-based positions of the two compared annotators in the input
    '''
    # 1-based numbering, attached to each annotator before pairing them up
    numbering = range(1, len(annotators) + 1)

    results = []
    for (data_1, index_1), (data_2, index_2) in combinations(zip(annotators, numbering), 2):
        results.append(((index_1, index_2), get_agreement(data_1, data_2)))

    return results

In [None]:
# runs above calculations on our data

annotator_lists = []

for annotator_file in ANNOTATED_FILES:
    with open(ANNOTATION_FOLDER + 'annotations/' + annotator_file, 'r', encoding='utf-8') as file:
        # one JSON object per line; blank lines (e.g. a trailing newline at the
        # end of the file) are skipped instead of crashing json.loads
        annotator_list = [
            json.loads(line)
            for line in file
            if line.strip()
        ]
        annotator_lists.append(sorted(annotator_list, key=lambda x: x['id']))


# compute the pairwise scores once and reuse them for both outputs
agreements = pairwise_agreement(annotator_lists)

print(agreements)
print(mean(x[1] for x in agreements))