In [None]:
import torch
import numpy as np
from transformers import AutoModelForQuestionAnswering, AutoTokenizer
from datasets import load_from_disk
import evaluate


# Tokenizers for the two model families referenced in this notebook.
# NOTE(review): "SpanBert/spanbert-base-cased" may resolve to a local directory
# or to a Hub repository id - confirm which one is intended.
tokenizer_spanbert = AutoTokenizer.from_pretrained("SpanBert/spanbert-base-cased")
tokenizer_roberta  = AutoTokenizer.from_pretrained("roberta-base")
# Location of the fine-tuned question-answering checkpoint loaded below.
model_directory = "spanbert_qa_finetuned_model"

# Load the fine-tuned question-answering model from the checkpoint location.
model = AutoModelForQuestionAnswering.from_pretrained(model_directory)
# Loading the tokenizer from the checkpoint is disabled; the tokenizers created
# above are used instead.
#tokenizer = AutoTokenizer.from_pretrained(model_directory)

# Use the GPU when CUDA is available, otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Sanity check that a model object was created.
print("Model is not empty: ", model is not None)
#print("Tokenizer is not empty: ", tokenizer is not None)

In [None]:
# load_from_disk expects the dataset directory written by save_to_disk (the one
# holding dataset_info.json / state.json), not the Arrow shard stored inside it.
train_dataset_roberta = load_from_disk("Roberta/roberta_preprocessed_train_dataset")
val_dataset_roberta = load_from_disk("Roberta/roberta_preprocessed_val_dataset")
test_dataset_roberta = load_from_disk("Roberta/roberta_preprocessed_test_dataset")

In [None]:
# load_from_disk expects the dataset directory written by save_to_disk (the one
# holding dataset_info.json / state.json), not the Arrow shard stored inside it.
# Each split is read from its own directory; previously the validation and test
# variables were both loaded from the training split.
# NOTE(review): the val/test directory names mirror the RoBERTa layout - confirm
# they match what was saved during preprocessing.
train_dataset_spanbert = load_from_disk("SpanBert/spanbert_preprocessed_train_dataset")
val_dataset_spanbert = load_from_disk("SpanBert/spanbert_preprocessed_val_dataset")
test_dataset_spanbert = load_from_disk("SpanBert/spanbert_preprocessed_test_dataset")

In [None]:
def predict(example, model, tokenizer, device):
    """Return the answer text the QA model extracts from one tokenized example.

    Args:
        example: Mapping with ``input_ids`` and ``attention_mask`` sequences
            and, optionally, ``token_type_ids``.
        model: Callable model whose output exposes ``start_logits`` and
            ``end_logits``.
        tokenizer: Object whose ``decode`` method turns token IDs into text.
        device: Torch device the input tensors are moved to before the
            forward pass.

    Returns:
        The decoded text of the tokens between the highest-scoring start
        position and the highest-scoring end position (inclusive). The two
        positions are chosen independently, so the span is empty when the
        end position precedes the start position.
    """
    # token_type_ids is only forwarded when the example provides it.
    feature_names = ["input_ids", "attention_mask"]
    if "token_type_ids" in example:
        feature_names.append("token_type_ids")

    # Each feature becomes a [1, seq_length] tensor on the target device.
    batch = {}
    for name in feature_names:
        batch[name] = torch.tensor(example[name]).unsqueeze(0).to(device)

    with torch.no_grad():
        qa_output = model(**batch)

    # Drop the batch dimension, then take the most likely boundaries.
    span_start = np.argmax(qa_output.start_logits.cpu().numpy()[0])
    span_end = np.argmax(qa_output.end_logits.cpu().numpy()[0])

    # Slice the answer tokens out of the original ID list and decode them.
    answer_ids = example["input_ids"][span_start : span_end + 1]
    return tokenizer.decode(answer_ids, skip_special_tokens=True)

In [None]:
def evaluation(predicted_answer, reference_answer, dataset, model, tokenizer, device):
    """Predict an answer for every example and score the predictions.

    Args:
        predicted_answer: Unused; kept so existing call sites keep working.
        reference_answer: Unused; kept so existing call sites keep working.
        dataset: Iterable of examples accepted by ``predict``; each example
            must also carry its reference text under the ``"answer"`` key.
        model: Question-answering model forwarded to ``predict``.
        tokenizer: Tokenizer forwarded to ``predict`` for decoding.
        device: Torch device forwarded to ``predict``.

    Returns:
        A tuple ``(acc, bleu, predicted_answers, reference_answers)`` where
        ``acc`` is ``{"accuracy": <fraction of predictions equal to their
        reference>}``, ``bleu`` is the result dict of the ``evaluate`` BLEU
        metric (or ``{"bleu": 0.0}`` when BLEU is undefined because the
        predictions or the references contain no tokens), and the two lists
        hold the predicted and reference strings in dataset order.
    """
    predicted_answers = []
    reference_answers = []

    for example in dataset:
        predicted_answers.append(predict(example, model, tokenizer, device))
        reference_answers.append(example["answer"])

    # The hub "accuracy" metric only accepts integer class labels and rejects
    # strings, so the string-equality accuracy is computed directly. The result
    # keeps the {"accuracy": value} shape that metric returns.
    matches = [pred == ref for pred, ref in zip(predicted_answers, reference_answers)]
    acc = {"accuracy": sum(matches) / len(matches) if matches else 0.0}

    # BLEU from `evaluate` takes raw strings and tokenizes them itself, so the
    # whitespace tokenizer is passed in rather than pre-splitting the inputs.
    has_pred_tokens = any(pred.split() for pred in predicted_answers)
    has_ref_tokens = any(ref.split() for ref in reference_answers)
    if has_pred_tokens and has_ref_tokens:
        bleu_metric = evaluate.load("bleu")
        bleu = bleu_metric.compute(
            predictions=predicted_answers,
            references=[[ref] for ref in reference_answers],
            tokenizer=str.split,
        )
    else:
        # With no tokens on one side the brevity-penalty ratio divides by zero.
        bleu = {"bleu": 0.0}

    return acc, bleu, predicted_answers, reference_answers



In [None]:
from collections import Counter

def compute_exact_match(pred, ref):
    """Return 1 when both answers are equal after trimming surrounding whitespace, else 0."""
    trimmed_pred = pred.strip()
    trimmed_ref = ref.strip()
    if trimmed_pred == trimmed_ref:
        return 1
    return 0

def compute_f1(pred, ref):
    """Token-level F1 between a predicted and a reference answer.

    Both strings are split on whitespace and compared as bags of tokens.

    Returns:
        1.0 when both answers have no tokens, 0.0 when exactly one of them has
        no tokens or when they share no token, otherwise the harmonic mean of
        token precision and recall.
    """
    predicted_words = pred.split()
    expected_words = ref.split()

    # Two empty answers agree perfectly; a single empty one cannot match.
    if not (predicted_words or expected_words):
        return 1.0
    if not (predicted_words and expected_words):
        return 0.0

    # Multiset intersection counts each shared token at most min(count) times.
    overlap = Counter(predicted_words) & Counter(expected_words)
    shared = sum(overlap.values())
    if shared == 0:
        return 0.0

    precision = shared / len(predicted_words)
    recall = shared / len(expected_words)
    return 2 * precision * recall / (precision + recall)


def eval_reccon_metrics(predicted_answers, reference_answers):
    """Aggregate exact-match and F1 scores overall and per example polarity.

    An example is "negative" when its reference answer is empty (ignoring
    whitespace, consistent with how exact match and F1 treat such strings)
    and "positive" otherwise.

    Args:
        predicted_answers: Sequence of predicted answer strings.
        reference_answers: Sequence of reference answer strings, aligned with
            ``predicted_answers``.

    Returns:
        Dict of floats with the keys ``overall_em``, ``overall_f1``,
        ``positive_em``, ``positive_f1``, ``negative_em`` and ``negative_f1``.
        A group without any example scores 0.0, including the overall group
        when the inputs are empty.
    """
    def _mean(values):
        # np.mean([]) yields nan with a RuntimeWarning; report 0.0 instead.
        return float(np.mean(values)) if values else 0.0

    groups = ("overall", "positive", "negative")
    em_scores = {group: [] for group in groups}
    f1_scores = {group: [] for group in groups}

    for pred, ref in zip(predicted_answers, reference_answers):
        em = compute_exact_match(pred, ref)
        f1_val = compute_f1(pred, ref)
        # Negative example: the expected prediction is an empty answer.
        polarity = "negative" if ref.strip() == "" else "positive"
        for group in ("overall", polarity):
            em_scores[group].append(em)
            f1_scores[group].append(f1_val)

    result = {}
    for group in groups:
        result[f"{group}_em"] = _mean(em_scores[group])
        result[f"{group}_f1"] = _mean(f1_scores[group])
    return result



In [None]:
# Score the fine-tuned model on the SpanBERT-preprocessed test split.
# NOTE(review): the original assignment had no right-hand side; the SpanBERT
# tokenizer and test split are assumed because the loaded checkpoint is the
# SpanBERT one - confirm this is the intended evaluation set.
predicted_answers = [
    predict(example, model, tokenizer_spanbert, device)
    for example in test_dataset_spanbert
]
reference_answers = [example["answer"] for example in test_dataset_spanbert]
result = eval_reccon_metrics(predicted_answers, reference_answers)

print("Overall Exact Match (EM):", result["overall_em"])
print("Overall F1 Score:", result["overall_f1"])
print("Positive Exact Match (EM):", result["positive_em"])
print("Positive F1 Score:", result["positive_f1"])
print("Negative Exact Match (EM):", result["negative_em"])
print("Negative F1 Score:", result["negative_f1"])