# Model Evaluation
Please make sure to change the path when loading data and model.

In [13]:
import torch
import json
import requests
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast, BertForQuestionAnswering
from tqdm import tqdm

## Load Data

In [14]:
def read_data(path):
    """
    Read SQuAD data from a JSON file.

    Parameters:
    - path: Path to the JSON file containing SQuAD data

    Returns:
    - contexts: List of contexts (passages)
    - questions: List of questions
    - answers: List of answers
    """
    # Open the JSON file and load the data
    with open(path, 'r', encoding='utf-8') as f:
        squad = json.load(f)

    # Initialize lists to store contexts, questions, and answers
    contexts = []
    questions = []
    answers = []

    # Iterate over groups in the SQuAD data
    for group in squad.get('data', []):
        # Iterate over paragraphs in the group
        for passage in group.get('paragraphs', []):
            # Get the context (passage)
            context = passage.get('context', '')
            # Iterate over questions and answers in the paragraph
            for qa in passage.get('qas', []):
                # Get the question
                question = qa.get('question', '')
                # Iterate over answers for the question
                for answer in qa.get('answers', []):
                    # Append context, question, and answer to their respective lists
                    contexts.append(context)
                    questions.append(question)
                    answers.append(answer)

    # Return the lists of contexts, questions, and answers
    return contexts, questions, answers

def add_end_index(answers, contexts):
    for answer, context in zip(answers, contexts):
        gold_text = answer['text']
        start_idx = answer['answer_start']
        end_idx = start_idx + len(gold_text)

        # Check if the answer is correctly positioned
        for offset in [0, -1, -2]:
            if context[start_idx + offset:end_idx + offset] == gold_text:
                # Update answer start and end indices
                answer['answer_start'] = start_idx + offset
                answer['answer_end'] = end_idx + offset
                break  # Break loop once correct offset is found

def add_token_positions(encodings, answers):
    """
    Adds token positions for answers to encodings.

    Parameters:
    - encodings: Encodings object containing tokenized inputs
    - answers: List of dictionaries containing answer positions

    Returns:
    None (modifies encodings in place)
    """
    start_positions = []
    end_positions = []

    # Loop through each answer
    for i, answer in enumerate(answers):
        # Convert character positions to token positions
        start_positions.append(encodings.char_to_token(i, answer['answer_start']))
        end_positions.append(encodings.char_to_token(i, answer['answer_end'] - 1))

        # Handle cases where answer passage has been truncated
        if start_positions[-1] is None:
            start_positions[-1] = tokenizer.model_max_length
        if end_positions[-1] is None:
            end_positions[-1] = tokenizer.model_max_length

    # Update encodings with start and end positions
    encodings.update({'start_positions': start_positions, 'end_positions': end_positions})

class SQuAD_Dataset(torch.utils.data.Dataset):
    """
    Custom dataset class for SQuAD.

    Parameters:
    - encodings: Encodings object containing tokenized inputs
    """
    def __init__(self, encodings):
        self.encodings = encodings

    def __getitem__(self, idx):
        """
        Retrieves an item from the dataset.

        Parameters:
        - idx: Index of the item to retrieve

        Returns:
        Dictionary containing tensors for each key in the encodings
        """
        return {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}

    def __len__(self):
        """
        Returns the length of the dataset.

        Returns:
        Integer representing the length of the dataset
        """
        return len(self.encodings.input_ids)


In [15]:
# Read training data
contexts, questions, answers = read_data('/accounts/grad/fangyuan_li/259/data/train-v2.0.json')
# Read validation data
valid_contexts, valid_questions, valid_answers = read_data('/accounts/grad/fangyuan_li/259/data/val-v2.0.json')
# Split train-v2.0 into train and test sets
train_contexts = contexts[5000:]
train_questions = questions[5000:]
train_answers = answers[5000:]

test_contexts = contexts[:5000]
test_questions = questions[:5000]
test_answers = answers[:5000]

# Add indexes
add_end_index(train_answers, train_contexts)
add_end_index(valid_answers, valid_contexts)
add_end_index(test_answers, test_contexts)

# Initialize tokenizer
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')
train_encodings = tokenizer(train_contexts, train_questions, truncation=True, padding=True)
valid_encodings = tokenizer(valid_contexts, valid_questions, truncation=True, padding=True)
test_encodings = tokenizer(test_contexts, test_questions, truncation=True, padding=True)

# Add token positions for training data
add_token_positions(train_encodings, train_answers)
# Add token positions for validation data
add_token_positions(valid_encodings, valid_answers)
# Add token positions for test data
add_token_positions(test_encodings, test_answers)

# Create training dataset
train_dataset = SQuAD_Dataset(train_encodings)
# Create validation dataset
valid_dataset = SQuAD_Dataset(valid_encodings)
# Create test dataset
test_dataset = SQuAD_Dataset(test_encodings)

# Define the dataloaders
# train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
# valid_loader = DataLoader(valid_dataset, batch_size=16)
# test_loader = DataLoader(test_dataset, batch_size=16)

## Load Model

In [16]:
model = BertForQuestionAnswering.from_pretrained("./full_data/QLoRA2/")

Some weights of BertForQuestionAnswering were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['qa_outputs.bias', 'qa_outputs.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [17]:
# Check the available device and use GPU if available, otherwise use CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# Print the device being used
print(f'Working on {device}')

Working on cuda


## Model Evaluation

### F1 Score

In [18]:
def get_input(text):
    input_ids = text['input_ids']
    start_true = text['start_positions']
    end_true = text['end_positions'] + 1
    answer = tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(input_ids[start_true:end_true]))
    return answer

def get_prediction(text, device='cuda'):
    input_ids = text['input_ids'].unsqueeze(0).to(device)
    attention_mask = text['attention_mask'].unsqueeze(0).to(device)
    # Forward pass through the model
    outputs = model(input_ids, attention_mask=attention_mask)
    # Get predicted start and end positions
    start_pred = torch.argmax(outputs['start_logits'], dim=1)
    end_pred = torch.argmax(outputs['end_logits'], dim=1) + 1
    answer = tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(input_ids[0][start_pred:end_pred]))
    return answer

def normalize_text(s):
    """
    Normalize text by removing articles, punctuation, and standardizing whitespace.

    Parameters:
    - s: Input text to be normalized

    Returns:
    - Normalized text
    """
    import string, re

    # Function to remove articles from text
    def remove_articles(text):
        regex = re.compile(r"\b(a|an|the)\b", re.UNICODE)
        return re.sub(regex, " ", text)

    # Function to fix white space in text
    def white_space_fix(text):
        return " ".join(text.split())

    # Function to remove punctuation from text
    def remove_punc(text):
        exclude = set(string.punctuation)
        return "".join(ch for ch in text if ch not in exclude)

    # Function to convert text to lowercase
    def lower(text):
        return text.lower()

    # Apply text normalization steps
    return white_space_fix(remove_articles(remove_punc(lower(s))))

def exact_match(prediction, truth):
    """
    Compute exact match between predicted answer and true answer.

    Parameters:
    - prediction: Predicted answer
    - truth: True answer

    Returns:
    - Boolean indicating whether the prediction exactly matches the truth
    """
    return bool(normalize_text(prediction) == normalize_text(truth))

def compute_f1(prediction, truth):
    """
    Compute F1 score between predicted answer and true answer.

    Parameters:
    - prediction: Predicted answer
    - truth: True answer

    Returns:
    - F1 score
    """
    pred_tokens = normalize_text(prediction).split()
    truth_tokens = normalize_text(truth).split()

    # If either the prediction or the truth is no-answer then F1 score is 1 if they agree, 0 otherwise
    if len(pred_tokens) == 0 or len(truth_tokens) == 0:
        return int(pred_tokens == truth_tokens)

    common_tokens = set(pred_tokens) & set(truth_tokens)

    # If there are no common tokens then F1 score is 0
    if len(common_tokens) == 0:
        return 0

    prec = len(common_tokens) / len(pred_tokens)
    rec = len(common_tokens) / len(truth_tokens)

    return round(2 * (prec * rec) / (prec + rec), 2)

In [19]:
model.eval()
model = model.to(device)

f1 = []
em = []

for i, text in enumerate(test_dataset):
    # Get the true answer for the question
    answer = get_input(text)
    # Get the predicted answer for the question
    prediction = get_prediction(text)
    # Compute exact match score
    em_score = exact_match(prediction, answer)
    # Compute F1 score
    f1_score = compute_f1(prediction, answer)

    em.append(em_score)
    f1.append(f1_score)
    if (i % 500)==0:
        print('num:', i, 'f1:', sum(f1) / len(f1), 'em: ', sum(em) / len(em))

avg_f1 = sum(f1) / len(f1)
exact_match = sum(em) / len(em)
print('f1: ', avg_f1)
print('em: ', exact_match)

num: 0 f1: 0.16 em:  0.0
num: 500 f1: 0.8056087824351299 em:  0.7145708582834331
num: 1000 f1: 0.8194405594405583 em:  0.7242757242757243
num: 1500 f1: 0.8169420386409052 em:  0.72618254497002
num: 2000 f1: 0.8089205397301347 em:  0.7126436781609196
num: 2500 f1: 0.8037624950019994 em:  0.7085165933626549
num: 3000 f1: 0.7995434855048351 em:  0.6917694101966011
num: 3500 f1: 0.8031248214795813 em:  0.6966580976863753
num: 4000 f1: 0.8080754811297219 em:  0.7048237940514871
num: 4500 f1: 0.8048189291268663 em:  0.6965118862475006
f1:  0.8011140000000058
em:  0.693


### Accuracy

In [20]:
# Define the dataloaders
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=16)
test_loader = DataLoader(test_dataset, batch_size=16)

In [21]:
# Set the model to evaluation mode
model.eval()

# Initialize a list to store accuracy values
acc = []

# Iterate over batches in the validation data
for batch in tqdm(test_loader):
    with torch.no_grad():
        # Move input tensors to the appropriate device
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        start_true = batch['start_positions'].to(device)
        end_true = batch['end_positions'].to(device)

        # Forward pass through the model
        outputs = model(input_ids, attention_mask=attention_mask)

        # Get predicted start and end positions
        start_pred = torch.argmax(outputs['start_logits'], dim=1)
        end_pred = torch.argmax(outputs['end_logits'], dim=1)

        # Compute accuracy for start positions and end positions
        acc.append(((start_pred == start_true).sum() / len(start_pred)).item())
        acc.append(((end_pred == end_true).sum() / len(end_pred)).item())

# Compute the average accuracy
acc = sum(acc) / len(acc)
acc

100%|██████████| 313/313 [00:46<00:00,  6.71it/s]


0.7167531948881789