In [1]:
from typing import List
import numpy as np
from scipy.stats import mode
import pandas as pd



def benchmark_labelers(
        labelers_high_precision_pos: List[int],
        labelers_high_precision_neg: List[int],
        labels: List[List[int]],
        consider_disagreeing_benchmark_labels: bool = True,
        pos_benchmark_by_majority: bool = False,
        neg_benchmark_by_majority: bool = False,
        labeler_names_to_benchmark: List[str] = None, 
        labeler_names: List[str] = None):
    """
    given labels and the ids of labelers with trusty positive labels and the ids of labelers with trusty negative labels, 
    this function estimates ground truth labels and provides metrics for estimating the reliability of the labelers.

    background:
    In many use cases, some labelers are known to be more trustworthy, while the performance of others is unknown.
    This is especially the case if some labels are costly assigned by human annotators and others by cheap heuristics.
    Due to vast amounts of data to be labeled, even manually evaluating labels provdied by all labelers can be out of scope. 
    Goal of this micro project is to provide a practical heuristic for a heuristic initial evaluation of labelers.


    


    parameters:
    labelers_high_precision_pos: List[int]
    ids of labelers that are expected to make few mistakes when predicting positive labels

    labelers_high_precision_neg: List[int]
    ids of labelers that are expected to make few mistakes when predicting negative labels

    labels: List[List[int]]
    labels of all labelers including the high precision labelers for both classes.
    labels[i][j] = label assigned by labeler <i> to data point <j>

    consider_disagreeing_benchmark_labels: bool = True
    for data point i, labelers_high_precision_pos predict a positive label and labelers_high_precision_neg predict a negative label.
    if consider_disagreeing_benchmark_labels is True, both labels are considered "correct" in the benchmark.
    if consider_disagreeing_benchmark_labels is False, the data point is considered to be abstain.

    pos_benchmark_by_majority: bool = False
    if True: the benchmark label is considered to be positive, if the majority vote of the labelers_high_precision_pos is positive
    if False: the benchmark label is considered to be positive, if any of the labelers_high_precision_pos predicts positive

    neg_benchmark_by_majority: bool = False
    if True: the benchmark label is considered to be negative, if the majority vote of the labelers_high_precision_neg is negative
    if False: the benchmark label is considered to be negative, if any of the labelers_high_precision_neg predicts negative

    labeler_names: List[str] = None
    names of the labelers which should be benchmarked against the estimated benchmark labels

    labeler_names_to_benchmark: List[str] = None
    names/string_ids of the labelers


    


    Result interpretation (for positive labeling):
    Labeler has: high Precision + high Recall + P_Additional > 0: 
    Labeler performs similar to the high precision benchmark labelers on the benchmarked observations and is thus likely trustworthy.
    The additional positive lables are thus more likely to be trusteds.

    Labeler has: high Precision + low Recall + P_Additional > 0:
    Labeler precisely predicts positive labels on benchmarked observations and thus provides positive labels that are likely trustworthy.
    The additional positive labels are thus more likely to be trusteds.
    Additional negative labels should not be trusted.

    Labeler has: low Precision + high Recall + P_Additional > 0:
    Labeler often predicts positive label but produces many false positives.
    If a data point is labeled as negative, it is unlikely that the true label is postive.
    The positive labels from this labeler on the other hand don't provide much information.

    The higher P_Abstain, the less reliable are the labelers Precision and Recall.
    The higher N_Abstain, the less reliable is the labelers Recall.

    note: 
    Precision = TP / (TP + FP)
    Recall = TP / (TP + FN) 


    


    example usage:
    labeler_names = [
        "Labeler with High Precision Pos 1", 
        "Labeler with High Precision Pos 2",  
        "Labeler with High Precision Neg 1", 
        "Labeler with High Precision Neg 2", 
        "Labeler with Perfect Labels", 
        "Labeler with Completely Wrong Labels", 
        "Unknwon Labeler 1",
        "Unknwon Labeler 2",
        "Unknwon Labeler 3",
        "Unknwon Labeler 4",
        "Unknwon Labeler 5"] 

    labeler_names_to_benchmark = [
        "Labeler with Perfect Labels", 
        "Labeler with Completely Wrong Labels", 
        "Unknwon Labeler 1",
        "Unknwon Labeler 2",
        "Unknwon Labeler 3",
        "Unknwon Labeler 4",
        "Unknwon Labeler 5"]

    # unknown ground truth labels = [1, 0, 1, 1, 0, 0, 1]

    labels = [[1, 0, -1, 1, -1, 1, 0],  #  high precision pos 1
            [-1, -1, 1, -1, -1, -1, 1],  # high precision pos 2
            [-1, 0, -1, -1, 0, -1, -1],  # high precision neg 1
            [-1, 0, -1, -1, -1, 0, -1],  # high precision neg 2
            [1, 0, 1, 1, 0, 0, 1],  # perfect
            [0, 1, 0, 0, 1, 1, 0],  # completely wrong

            [1, -1, -1, 1, 0, 1, 1],  # unknown
            [-1, -1, -1, 1, -1, -1, 1],  # unknown
            [-1, -1, 1, 0, 0, 0, 1],  # unkown
            [1, 0, 1, 0, 0, 0, 0],  # unknown
            [1, -1, 1, 1, 0, 1, 0]  # unknown
            ]  

    out = benchmark_labelers(
        labelers_high_precision_pos=[0, 1],
        labelers_high_precision_neg=[2, 3],
        labels=labels,
        labeler_names=labeler_names,
        labeler_names_to_benchmark=labeler_names_to_benchmark)
    benchmark_results = out["benchmark_result"]
    """
    labels = np.array(labels)

    # benchmark 
    # pos
    if pos_benchmark_by_majority:
        benchmark_pos = np.where((labels != -1)[labelers_high_precision_pos], labels[labelers_high_precision_pos], np.nan)
        benchmark_pos = mode(benchmark_pos, nan_policy="omit")[0]
        benchmark_pos = [int(i) for i in np.where(benchmark_pos)[0]]
    else:
        benchmark_pos = [i for i, val in enumerate(np.any(labels[labelers_high_precision_pos] == 1, axis=0)) if val]
    # neg
    if neg_benchmark_by_majority:
        benchmark_neg = np.where((labels != -1)[labelers_high_precision_neg], labels[labelers_high_precision_neg], np.nan)
        benchmark_neg = mode(benchmark_neg, nan_policy="omit")[0]
        benchmark_neg = [int(i) for i in np.where(benchmark_neg)[0]]
    else:
        benchmark_neg = [i for i, val in enumerate(np.any(labels[labelers_high_precision_neg] == 0, axis=0)) if val]

    disagreement = list(set(benchmark_pos) & set(benchmark_neg))
    benchmark_labels = len(set(benchmark_pos) | set(benchmark_neg))
    
    print("benchmark labels:", benchmark_labels, "/", len(labels[0]))
    print("benchmark disagrees in", len(disagreement), "label(s)")

    if not consider_disagreeing_benchmark_labels:
        benchmark_pos = [i for i in benchmark_pos if i not in disagreement]
        benchmark_neg = [i for i in benchmark_neg if i not in disagreement]
    df_result_rows = []
    for i in range(len(labels)):
        # skip labelers with high precision if wanted
        if labeler_names_to_benchmark is not None:
            if labeler_names[i] in labeler_names_to_benchmark:
                tp = 0
                fp = 0
                tn = 0
                fn = 0
                abstain_pos = 0
                abstain_neg = 0
                additional_pos = 0
                additional_neg = 0
                additional_abstain = 0
                for j in range(len(labels[i])):
                    if (j in benchmark_pos):
                        if labels[i][j] == 1:
                            tp += 1
                        elif labels[i][j] == -1:
                            abstain_pos +=1
                        else:
                            fp += 1
                    if (j in benchmark_neg):
                        if labels[i][j] == 0:
                            tn += 1
                        elif labels[i][j] == -1:
                            abstain_neg += 1
                        else:
                            fn += 1
                    if (j not in benchmark_pos) and (j not in benchmark_neg):
                        if labels[i][j] == 1:
                            additional_pos += 1
                        elif labels[i][j] == 0:
                            additional_neg += 1
                        else:
                            additional_abstain += 1
                # accuracy
                if tp + fp + tn + fn == 0:
                    acc = 0
                else:
                    acc = int(np.round((tp + tn) / (tp + fp + tn + fn) * 100, 0))
                # precision
                if tp + fp == 0:
                    precision = 0
                else:
                    precision = int(np.round(tp / (tp + fp) * 100, 0))
                # recall
                if tp + fn == 0:
                    recall = 0
                else:
                    recall = int(np.round(tp / (tp + fn) * 100, 0))
                df_result_rows.append([acc, precision, recall, tp, fp, abstain_pos, additional_pos, tn, fn, abstain_neg, additional_neg, additional_abstain])

    df_result = pd.DataFrame(data=df_result_rows, columns=["Accuracy (ignoring Abstain)", "Precision (Ignoring Abstain)", "Recall (Ignoring Abstain)", "TP", "FP", "P_Abstain", "P_Additional", "TN", "FN", "N_Abstain", "N_Additional", "Abstain_Additional"])
    if labeler_names is not None:
        df_result.index = labeler_names_to_benchmark
    return {"benchmark_result": df_result, "benchmark_pos": benchmark_pos, "benchmark_neg": benchmark_neg}

In [2]:
labeler_names = [
    "Labeler with High Precision Pos 1", 
    "Labeler with High Precision Pos 2",  
    "Labeler with High Precision Neg 1", 
    "Labeler with High Precision Neg 2", 
    "Labeler with Perfect Labels", 
    "Labeler with Completely Wrong Labels", 
    "Unknwon Labeler 1",
    "Unknwon Labeler 2",
    "Unknwon Labeler 3",
    "Unknwon Labeler 4",
    "Unknwon Labeler 5"] 

labeler_names_to_benchmark = [
    "Labeler with Perfect Labels", 
    "Labeler with Completely Wrong Labels", 
    "Unknwon Labeler 1",
    "Unknwon Labeler 2",
    "Unknwon Labeler 3",
    "Unknwon Labeler 4",
    "Unknwon Labeler 5"]

# unknown ground truth labels = [1, 0, 1, 1, 0, 0, 1]

labels = [[1, 0, -1, 1, -1, 1, 0],  #  high precision pos 1
        [-1, -1, 1, -1, -1, -1, 1],  # high precision pos 2
        [-1, 0, -1, -1, 0, -1, -1],  # high precision neg 1
        [-1, 0, -1, -1, -1, 0, -1],  # high precision neg 2
        [1, 0, 1, 1, 0, 0, 1],  # perfect
        [0, 1, 0, 0, 1, 1, 0],  # completely wrong

        [1, -1, -1, 1, 0, 1, 1],  # unknown
        [-1, -1, -1, 1, -1, -1, 1],  # unknown
        [-1, -1, 1, 0, 0, 0, 1],  # unkown
        [1, 0, 1, 0, 0, 0, 0],  # unknown
        [1, -1, 1, 1, 0, 1, 0]  # unknown
        ]  

out = benchmark_labelers(
    labelers_high_precision_pos=[0, 1],
    labelers_high_precision_neg=[2, 3],
    labels=labels,
    labeler_names=labeler_names,
    labeler_names_to_benchmark=labeler_names_to_benchmark,
    consider_disagreeing_benchmark_labels=False)
benchmark_results = out["benchmark_result"]
benchmark_results

benchmark labels: 7 / 7
benchmark disagrees in 1 label(s)


Unnamed: 0,Accuracy (ignoring Abstain),Precision (Ignoring Abstain),Recall (Ignoring Abstain),TP,FP,P_Abstain,P_Additional,TN,FN,N_Abstain,N_Additional,Abstain_Additional
Labeler with Perfect Labels,100,100,100,4,0,0,0,2,0,0,1,0
Labeler with Completely Wrong Labels,0,0,0,0,4,0,1,0,2,0,0,0
Unknwon Labeler 1,100,100,100,3,0,1,1,1,0,1,0,0
Unknwon Labeler 2,100,100,100,2,0,2,0,0,0,2,0,1
Unknwon Labeler 3,75,67,100,2,1,1,0,1,0,1,1,0
Unknwon Labeler 4,67,50,100,2,2,0,0,2,0,0,1,0
Unknwon Labeler 5,80,75,100,3,1,0,1,1,0,1,0,0
