In [None]:
import pandas as pd
from pathlib import Path
from sklearn.metrics import classification_report, f1_score
import numpy as np
import collections

# Absolute path of the directory one level above the current working directory;
# the other cells build their file paths relative to it.
current_path = Path.cwd().resolve().parent
print(current_path)

In [8]:
def parse_file(file_path):
    """Read a token/label/prediction file into a list of instances.

    Every non-blank line must hold exactly three whitespace-separated fields
    (token, gold label, predicted label). A blank line closes the current
    instance. An instance still open when the file ends is also returned, so
    files that do not finish with a blank line no longer lose their last
    instance.

    Args:
        file_path: Path of the file to read (opened as UTF-8).

    Returns:
        A list of ``(tokens, labels, predictions)`` tuples, each element being
        a list of strings. A missing file yields an empty list; a malformed
        line yields the instances completed before it. In both cases a message
        is printed instead of raising.
    """
    instances = []
    try:
        with open(file_path, encoding='utf-8') as nf:
            lines = nf.readlines()
            tokens, labels, predictions = [], [], []
            for line in lines:
                if line.strip():
                    # Raises ValueError when the line does not have exactly 3 fields.
                    tk, lb, pr = line.split()
                    tokens.append(tk)
                    labels.append(lb)
                    predictions.append(pr)
                else:
                    instances.append((tokens, labels, predictions))
                    tokens, labels, predictions = [], [], []

            # Flush the trailing instance when the file lacks a final blank line.
            if tokens:
                instances.append((tokens, labels, predictions))
    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except ValueError:
        print(f"Invalid file format: {file_path}")

    return instances

def get_arguments_indices(sequence):
    """Turn a label sequence into a list of ``(start, end)`` spans.

    A label ``'1'`` opens a new span (closing any span that is still open),
    a label ``'0'`` closes the open span, and any other label leaves the
    current state untouched. ``end`` is exclusive; a span still open at the
    end of the sequence is closed at ``len(sequence)``.

    Args:
        sequence: Sized iterable of label strings.

    Returns:
        List of ``(start, end)`` index tuples in order of appearance.
    """
    spans = []
    open_at = None

    for position, tag in enumerate(sequence):
        opens_span = tag == '1'
        closes_span = opens_span or tag == '0'

        # Both '1' and '0' terminate whatever span is currently open.
        if closes_span and open_at is not None:
            spans.append((open_at, position))
            open_at = None

        if opens_span:
            open_at = position

    if open_at is not None:
        spans.append((open_at, len(sequence)))

    return spans

def compute_r_matrix(gold_indices, predicted_indices):
    """Build the gold-by-predicted overlap-ratio matrix.

    Entry ``[i][j]`` is the length of the intersection of gold span ``i`` and
    predicted span ``j`` divided by the length of the longer of the two spans,
    rounded to 3 decimals.

    Args:
        gold_indices: Sequence of ``(start, end)`` gold spans.
        predicted_indices: Sequence of ``(start, end)`` predicted spans.

    Returns:
        Float ``numpy`` array of shape
        ``(len(gold_indices), len(predicted_indices))``.
    """

    def overlap_ratio(gold_span, pred_span):
        g_lo, g_hi = gold_span
        p_lo, p_hi = pred_span

        lo = max(g_lo, p_lo)
        hi = min(g_hi, p_hi)

        # Disjoint spans produce hi < lo; treat that as no overlap.
        shared = (hi - lo) if hi >= lo else 0
        longest = max(g_hi - g_lo, p_hi - p_lo)
        return round((shared / longest), 3)

    ratios = np.zeros((len(gold_indices), len(predicted_indices)), dtype=float)

    for row, gold_span in enumerate(gold_indices):
        for col, pred_span in enumerate(predicted_indices):
            ratios[row][col] = overlap_ratio(gold_span, pred_span)

    return ratios

def categorize(matrix, tau = 0.7, omega = 0.3):
    """Assign a category label to each predicted column and unmatched gold row.

    For predicted column ``j`` (key ``'P{j}'``), looking at its positive entries:
      * none                          -> ``'MU'``
      * exactly one, equal to 1       -> ``'PM'``
      * exactly one, not equal to 1   -> ``'SP'`` if the matching gold row has
        more than one positive entry and those entries pass the thresholds,
        otherwise ``'DISP'``
      * more than one                 -> ``'MG'`` if they pass the thresholds,
        otherwise ``'DISP'``
    A gold row ``i`` with no positive entry gets key ``'G{i}'`` = ``'UNR'``.

    "Pass the thresholds" means the values sum to at least ``tau`` and each
    value is at least ``omega``.

    Args:
        matrix: 2-D numpy array, rows indexed by gold units and columns by
            predicted units (as produced by ``compute_r_matrix``).
        tau: Minimum sum of the overlapping values.
        omega: Minimum value of each individual overlap.

    Returns:
        Dict holding the six category names mapped to ``0`` plus one
        ``'P{j}'`` / ``'G{i}'`` entry per labelled unit.
    """

    def meets_thresholds(overlaps):
        return (sum(overlaps) >= tau) and all(value >= omega for value in overlaps)

    def is_split(matrix, column):
        matched_gold = np.where(column > 0)[0]
        gold_overlaps = matrix[matched_gold, :]
        parts = gold_overlaps[gold_overlaps > 0].tolist()
        return len(parts) > 1 and meets_thresholds(parts)

    categorization = dict.fromkeys(('PM', 'DISP', 'SP', 'MG', 'MU', 'UNR'), 0)

    for j in range(matrix.shape[1]):
        column = matrix[:, j]
        hits = column[column > 0].tolist()

        if not hits:
            label = 'MU'
        elif len(hits) == 1:
            if hits[0] == 1:
                label = 'PM'
            else:
                label = 'SP' if is_split(matrix, column) else 'DISP'
        else:
            label = 'MG' if meets_thresholds(hits) else 'DISP'

        categorization[f'P{j}'] = label

    for i in range(matrix.shape[0]):
        # A gold row without any positive overlap was matched by no prediction.
        if not (matrix[i, :] > 0).any():
            categorization[f'G{i}'] = 'UNR'

    return categorization

def get_best_run_number(row):
    """Return the position (0-9) of the largest value among ``run0`` .. ``run9``.

    Args:
        row: Label-indexed row (e.g. a pandas Series) containing the ten
            ``run{i}`` entries.

    Returns:
        Index of the maximum as returned by ``np.argmax`` (first one on ties).
    """
    run_columns = ['run' + str(run_id) for run_id in range(10)]
    return np.argmax(row[run_columns].values)

def r_average(counter, total_gold_argument_units):
    """Average the second element of every entry in ``counter``.

    Args:
        counter: Sized collection of indexable entries; element ``[1]`` of
            each entry is the value being summed.
        total_gold_argument_units: Denominator for the second average.

    Returns:
        Tuple ``(sum / len(counter), sum / total_gold_argument_units,
        len(counter))`` with both averages rounded to 2 decimals.
    """
    r_total = sum(entry[1] for entry in counter)
    n_entries = len(counter)

    mean_over_entries = round(r_total / n_entries, 2)
    mean_over_gold = round(r_total / total_gold_argument_units, 2)
    return mean_over_entries, mean_over_gold, n_entries

In [None]:
# Load the per-run results table and store, for each row, the position of its
# highest run0..run9 value in a new 'best_run' column.
f1_results = pd.read_csv(current_path / 'data.csv')
f1_results['best_run'] = f1_results.apply(get_best_run_number, axis=1)

print(f1_results.shape)
f1_results.head()

In [14]:
def get_number_btas(lista):
    """Count how many labels equal to ``'1'`` occur across all sequences.

    Args:
        lista: Iterable of label sequences (each an iterable of strings).

    Returns:
        Total number of ``'1'`` labels; ``0`` when there are none.
    """
    return sum(1 for sequence in lista for tag in sequence if tag == '1')

In [None]:
# Values matched against the 'train', 'test' and 'model' columns of f1_results;
# every (test, train, model) combination below produces one result row.
train_set = ['pe', 'we', 'abam', 'mix1']
test_set = ['pe', 'we', 'abam']
models = ['bert', 'bert_crf', 'distilbert', 'distilbert_crf', 'distilbert_bilstmcrf', 'bert_bilstmcrf']

# Accumulates one list per combination; converted to a DataFrame in a later cell.
info_df_results = []

for test in test_set:
    for elem in train_set:
        for model in models:
            # 'best_run' of the first f1_results row matching this combination.
            # NOTE: .values[0] raises IndexError when no row matches.
            best_run = f1_results.loc[(f1_results['train'] == elem) & (f1_results['test'] == test) & (f1_results['model'] == model)].best_run.values[0]
        
            # Per-combination tally of category labels across all instances.
            total_counter_categorization = collections.Counter({'PM':0, 'DISP': 0, 'SP': 0, 'MG': 0, 'MU': 0, 'UNR': 0})

            # Results live under results/BIO/<train>_<model>/; the file name uses
            # the model name with '_' replaced by '-' and ends with the best run index.
            folder_dir = f"{elem}_{model}"
            model_name = "-".join(model.split("_"))
            file_name = f"results-{elem}-{test}-{model_name}-{best_run}.txt"
            file_path = (current_path / 'results' / 'BIO' / folder_dir / file_name)
            
            total_gold_argument_units, total_pred_argument_units = 0, 0

            # parse_file prints a message and returns an empty/partial list on
            # a missing or malformed file, so such a combination still gets a row.
            instances = parse_file(file_path)

            # Number of '1' labels in the gold and predicted sequences.
            # These two values are not used anywhere else in this cell.
            btags_gold = get_number_btas([x[1] for x in instances])
            btags_pred = get_number_btas([x[2] for x in instances])

            for (tokens, labels, predictions) in instances:
                # (start, end) spans extracted from gold labels and from predictions.
                arg_component_indices = get_arguments_indices(labels)
                arg_predicted_indices = get_arguments_indices(predictions)

                total_gold_argument_units += len(arg_component_indices)
                total_pred_argument_units += len(arg_predicted_indices)

                # Skip instances that have neither gold nor predicted spans.
                if (len(arg_component_indices) > 0) or (len(arg_predicted_indices) > 0):

                    R_matrix = compute_r_matrix(arg_component_indices, arg_predicted_indices)

                    # tau=0.7, omega=0.35 (omega differs from categorize's default of 0.3).
                    categorization = categorize(R_matrix, 0.7, 0.35)

                    # The dict returned by categorize also carries six seed entries
                    # with value 0, so this Counter gains an integer key 0; only the
                    # six category names are read when the row is built below.
                    total_counter_categorization.update(collections.Counter(categorization.values()))


            # Row layout: train, test, model, #gold spans, #predicted spans, then category counts.
            row_df_results = [elem, test, model, total_gold_argument_units, total_pred_argument_units] + [total_counter_categorization[x] for x in ['PM', 'DISP', 'SP', 'MG', 'MU', 'UNR']]

            info_df_results.append(row_df_results)
        

In [None]:
# One row per (train, test, model) combination: span totals followed by the
# count of each category label.
errors_results_df = pd.DataFrame(
    info_df_results,
    columns=(
        ['train', 'test', 'model', 'gold_arguments', 'predicted_arguments']
        + ['PM', 'DISP', 'SP', 'MG', 'MU', 'UNR']
    ),
)

errors_results_df