In [1]:
import json
import numpy as np

In [2]:
# get required data per annotator
def extract_data_per_annotator(annotator_number):
    file_path = f'data_extraction/personal_annotation_{annotator_number}.json'

    with open(file_path, 'r') as json_file:
        data = json.load(json_file)

    return data

In [3]:
# outputs how many data items each annotator labelled as "None"
def output_number_of_none_arguments_per_annotator():
    annotators_none_values = []
    for i in range(0, 5):
        annotator_none_values = 0
        data = extract_data_per_annotator(i + 1).get("labels")

        for entry in data:
            if 'None' in entry:
                annotator_none_values += 1
        annotators_none_values.append(annotator_none_values)

    return annotators_none_values

print(output_number_of_none_arguments_per_annotator())

[13, 35, 13, 18, 22]


In [6]:
from sklearn.metrics import precision_score, recall_score, f1_score
from itertools import combinations

def tokenize(text):
    """
    Tokenizes the text into words and returns a list of tokens.
    """
    return text.split()

def span_to_token_set(span, text):
    """
    Converts a text span into a set of token positions.
    """
    tokens = tokenize(text)
    span_tokens = tokenize(span)
    token_positions = {i for i, token in enumerate(tokens) if token in span_tokens}
    return token_positions

def precision_recall_f1(set1, set2):
    """
    Calculates precision, recall, and F1 score between two sets of tokens.
    """
    true_positive = len(set1 & set2)
    false_positive = len(set1 - set2)
    false_negative = len(set2 - set1)

    precision = true_positive / (true_positive + false_positive) if (true_positive + false_positive) > 0 else 0
    recall = true_positive / (true_positive + false_negative) if (true_positive + false_negative) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    return precision, recall, f1

def average_f1_score(data, text):
    """
    Calculates the average F1 score for overlapping spans in the dataset.
    """
    total_f1 = 0
    count = 0
    for annotations in data:
        pairwise_f1 = []
        for (a, b) in combinations(annotations, 2):
            token_set_a = set().union(*[span_to_token_set(span, text) for span in a])
            token_set_b = set().union(*[span_to_token_set(span, text) for span in b])
            _, _, f1 = precision_recall_f1(token_set_a, token_set_b)
            pairwise_f1.append(f1)
        total_f1 += sum(pairwise_f1)
        count += len(pairwise_f1)
    return total_f1 / count if count > 0 else 0


number_of_annotations = 50 # change as needed
annotation_cumulative = []
for i in range(0, number_of_annotations):
    current_annotations = []
    for j in range (0, 5):
        if j == 0 or j == 2 or j == 1:
            continue
        data = extract_data_per_annotator(j + 1).get("labels")
        current_annotations.append(data[i])
    annotation_cumulative.append(current_annotations)

In [7]:
import re
import numpy as np
import json
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

In [8]:
def extract_data_per_annotator(annotator_number):
    file_path = f'data_extraction/personal_annotation_{annotator_number}.json'

    with open(file_path, 'r') as json_file:
        data = json.load(json_file)

    return data

In [9]:
%run process_answer.ipynb

def get_data(llm_method, annotator_1, annotator_2):
    # annotator_data = extract_data_per_annotator(annotator_number)
    llm_result_1 = process_and_store_llm_answers(f'llm_responses/{llm_method}_{annotator_1}.json')
    llm_result_2 = process_and_store_llm_answers(f'llm_responses/{llm_method}_{annotator_2}.json')

    return llm_result_1, llm_result_2

In [20]:
# pairwise cot
def compute_cosine_similarity(number_of_entries, samples_used_for_training, llm_method, annotator_1, annotator_2):
    dataset_average_cosine_similarity = 0
    llm_result_1, llm_result_2 = get_data(llm_method, annotator_1, annotator_2)

    average_similarity_all_runs = 0
    for j in range(0, 10):
        for i in range(0, number_of_entries):
            if i in samples_used_for_training:
                continue

            # llm response
            c = llm_result_1[j][i]
            if len(c) == 0:
                c.append('None')

            if len(c) >= 2:
                for response in c:
                    if response == "None" or response == "":
                        c.remove(response)

            d = llm_result_2[j][i]
            if len(d) == 0:
                d.append('None')

            if len(d) >= 2:
                for response in d:
                    if response == "None" or response == "":
                        d.remove(response)


            vectorizer = TfidfVectorizer().fit(c + d)
            tfidf_c = vectorizer.transform(c)
            tfidf_d = vectorizer.transform(d)

            similarity_matrix = cosine_similarity(tfidf_c, tfidf_d)

            # adjust similarities and compute the highest value for each row
            similarity_sum = 0
            for i, row in enumerate(similarity_matrix):
                adjusted_row = [sim if sim >= 0.2 else 0 for sim in row]
                highest_similarity = max(adjusted_row)
                similarity_sum += highest_similarity



            average_similarity = np.round(similarity_sum/(len(c)), 5)
            dataset_average_cosine_similarity += average_similarity


        dataset_average_cosine_similarity = np.round(dataset_average_cosine_similarity/(number_of_entries - len(set(samples_used_for_training))), 3)
        average_similarity_all_runs += dataset_average_cosine_similarity

    average_similarity_all_runs = np.round(average_similarity_all_runs/10, 2)
    return average_similarity_all_runs

In [21]:
# one shot cot
def one_shot_cot():
    number_of_entries = 50
    samples_used_for_training = [0]
    llm_method = 'one_shot_cot'

    cumulative_cosine_similarity = []
    for i in range(0, 5):
        current_annotator_cosine_similarity = []
        for j in range(0, 5):
            if j > i:
                current_annotator_cosine_similarity.append(compute_cosine_similarity(number_of_entries, samples_used_for_training, f'{llm_method}', i + 1, j + 1))
            elif i == j:
                current_annotator_cosine_similarity.append(1)
            else:
                current_annotator_cosine_similarity.append(0)

        cumulative_cosine_similarity.append(current_annotator_cosine_similarity)

    return cumulative_cosine_similarity

result = one_shot_cot()
for row in result:
    print(row)

[1, 0.56, 0.71, 0.71, 0.69]
[0, 1, 0.6, 0.59, 0.6]
[0, 0, 1, 0.68, 0.68]
[0, 0, 0, 1, 0.69]
[0, 0, 0, 0, 1]


In [22]:
def few_shot_cot():
    number_of_entries = 50
    samples_used_for_training = [0, 3, 12]
    llm_method = 'few_shot_cot'

    cumulative_cosine_similarity = []
    for i in range(0, 5):
        current_annotator_cosine_similarity = []
        for j in range(0, 5):
            if j > i:
                current_annotator_cosine_similarity.append(compute_cosine_similarity(number_of_entries, samples_used_for_training, f'{llm_method}', i + 1, j + 1))
            elif i == j:
                current_annotator_cosine_similarity.append(1)
            else:
                current_annotator_cosine_similarity.append(0)

        cumulative_cosine_similarity.append(current_annotator_cosine_similarity)

    return cumulative_cosine_similarity

result = few_shot_cot()
for row in result:
    print(row)

[1, 0.39, 0.57, 0.65, 0.64]
[0, 1, 0.42, 0.43, 0.43]
[0, 0, 1, 0.58, 0.61]
[0, 0, 0, 1, 0.64]
[0, 0, 0, 0, 1]
