# In [None]:
import json
import torch
from transformers import AlbertTokenizer, AlbertForQuestionAnswering
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score

# Module-level tokenizer shared by the preprocessing helpers below and saved
# alongside the model at the end of the script.
# NOTE(review): `use_fast` is usually an AutoTokenizer option; the class used
# here is the explicitly imported AlbertTokenizer — confirm the flag has any
# effect, or whether a fast tokenizer class was intended.
tokenizer = AlbertTokenizer.from_pretrained('albert-base-v2', use_fast=True)

def load_spoken_squad_data(file_path):
    """Load contexts and answer spans from a SQuAD-style JSON file.

    The file is expected to have the layout
    ``data -> paragraphs -> (context, qas -> answers -> (text, answer_start))``.
    One ``(context, answer)`` pair is produced per answer, so a context is
    repeated once for every answer that refers to it.

    Args:
        file_path: Path to the JSON file.

    Returns:
        A ``(contexts, answers)`` tuple of equal-length lists. Each answer is a
        dict with ``text``, ``answer_start`` (character offset into the
        context) and ``answer_end`` (exclusive end offset, computed as
        ``answer_start + len(text)``).
    """
    # JSON text is UTF-8 by specification; do not depend on the locale default.
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    contexts = []
    answers = []

    for item in data['data']:
        for paragraph in item['paragraphs']:
            context = paragraph['context']
            for qa in paragraph['qas']:
                for answer in qa['answers']:
                    contexts.append(context)
                    answers.append({
                        'text': answer['text'],
                        'answer_start': answer['answer_start'],
                        'answer_end': answer['answer_start'] + len(answer['text']),
                    })

    return contexts, answers

# Load the training split: parallel lists of context strings and answer-span
# dicts. The path is relative to the current working directory.
train_contexts, train_answers = load_spoken_squad_data('Spoken-SQuAD-master/spoken_train-v1.1.json')

def tokenize_and_align_labels(contexts, answers):
    """Tokenize the contexts and attach answer token positions.

    The contexts are encoded in a single call to the module-level
    ``tokenizer`` with truncation and padding enabled and PyTorch tensors
    requested. ``add_token_positions`` then adds ``start_positions`` and
    ``end_positions`` to that same encoding object.

    Args:
        contexts: Context strings, one per answer.
        answers: Answer dicts aligned index-for-index with ``contexts``.

    Returns:
        The tokenizer output, updated in place with the position labels.
    """
    tokenizer_options = {
        "truncation": True,
        "padding": True,
        "return_tensors": "pt",
    }
    batch = tokenizer(contexts, **tokenizer_options)
    add_token_positions(batch, answers)
    return batch

def add_token_positions(encodings, answers):
    """Add answer start/end token indices to ``encodings`` in place.

    For every example the token sequence is walked while counting how many
    context characters have been consumed, and the first tokens that cover the
    answer's first and last characters are recorded. The mapping is
    approximate: it assumes the tokenizer's text normalisation does not change
    the character count of the context.

    Args:
        encodings: Tokenizer output whose ``'input_ids'`` row ``i`` belongs to
            ``answers[i]``. Updated with ``'start_positions'`` and
            ``'end_positions'`` lists.
        answers: Dicts with ``'answer_start'`` (inclusive) and ``'answer_end'``
            (exclusive) character offsets into the context.

    Answers that cannot be located in the token sequence (for example because
    truncation cut them off) are labelled ``(0, 0)`` rather than ``None``, so
    that the labels can always be converted to tensors.
    """
    # SentencePiece marks word starts with U+2581. The marker stands in for the
    # preceding space, except on the very first piece where no space exists in
    # the text. NOTE(review): assumes the ALBERT vocabulary uses the default
    # dummy-prefix behaviour — confirm.
    word_start_marker = '\u2581'
    # Index used when the answer is not inside the (possibly truncated) input.
    no_answer_index = 0
    # Tokens inserted by the tokenizer that consume no characters of the context.
    non_text_tokens = {tokenizer.cls_token, tokenizer.sep_token, tokenizer.pad_token}

    start_positions = []
    end_positions = []

    for i, answer in enumerate(answers):
        start_char = answer['answer_start']
        end_char = answer['answer_end']

        # Convert input ids to tokens
        context_tokens = tokenizer.convert_ids_to_tokens(encodings['input_ids'][i])
        start_token = None
        end_token = None

        # char_idx is the exclusive end offset, in context characters, of the
        # token that has just been consumed.
        char_idx = 0
        seen_text_token = False
        for idx, token in enumerate(context_tokens):
            if token in non_text_tokens:
                continue
            char_idx += len(token)
            if not seen_text_token:
                seen_text_token = True
                if token.startswith(word_start_marker):
                    # The leading marker of the first piece maps to no character.
                    char_idx -= 1
            # A token covers character c when c < char_idx; strict comparison
            # avoids picking the previous token on an exact boundary.
            if start_token is None and start_char < char_idx:
                start_token = idx
            # The last answer character is end_char - 1, hence `<=` here.
            if start_token is not None and end_char <= char_idx:
                end_token = idx
                break

        if end_token is None:
            # Answer (or its tail) lies outside the tokenized context.
            start_token = end_token = no_answer_index

        start_positions.append(start_token)
        end_positions.append(end_token)

    encodings.update({'start_positions': start_positions, 'end_positions': end_positions})

# Tokenize every training context and attach the start/end token labels;
# runs eagerly over the whole training set at import time.
train_encodings = tokenize_and_align_labels(train_contexts, train_answers)

# Define a custom Dataset class
class QA_Dataset(Dataset):
    """Map-style dataset over a dict-like collection of per-example fields.

    ``encodings`` maps field names (e.g. ``'input_ids'``) to indexable
    containers holding one entry per example. The dataset length is taken
    from the ``'input_ids'`` field.
    """

    def __init__(self, encodings):
        self.encodings = encodings

    def __len__(self):
        return len(self.encodings['input_ids'])

    def __getitem__(self, idx):
        """Return example ``idx`` as a dict of freshly allocated tensors."""
        item = {}
        for key, val in self.encodings.items():
            value = val[idx]
            if isinstance(value, torch.Tensor):
                # torch.tensor(tensor) emits a copy-construct UserWarning on
                # every access; detach().clone() is the documented equivalent.
                item[key] = value.detach().clone()
            else:
                item[key] = torch.tensor(value)
        return item

# Wrap the tokenized training data and batch it (8 examples per batch,
# reshuffled by the loader on each pass).
train_dataset = QA_Dataset(train_encodings)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)

# Start from the pretrained ALBERT base checkpoint with a question-answering head.
model = AlbertForQuestionAnswering.from_pretrained('albert-base-v2')


# Train on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)


# Plain Adam over all model parameters with a fixed learning rate.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

# Function to calculate F1 score
def calculate_f1(start_preds, end_preds, start_trues, end_trues):
    """Return the mean token-overlap F1 between predicted and true spans.

    Each span is the inclusive token index range ``[start, end]``. For one
    example, precision is ``overlap / predicted_length`` and recall is
    ``overlap / true_length``; F1 is their harmonic mean. An exact match
    scores 1.0, and disjoint or inverted (``end < start``) spans score 0.0.

    Args:
        start_preds, end_preds: Predicted start/end token indices.
        start_trues, end_trues: Reference start/end token indices.
            Elements may be plain ints or integer scalars convertible with
            ``int()`` (e.g. NumPy scalars or 0-d tensors).

    Returns:
        The average F1 over all examples, or 0.0 when there are none.
    """
    f1_scores = []
    for sp, ep, st, et in zip(start_preds, end_preds, start_trues, end_trues):
        pred_start, pred_end = int(sp), int(ep)
        true_start, true_end = int(st), int(et)

        if (pred_start, pred_end) == (true_start, true_end):
            f1_scores.append(1.0)  # Perfect match
            continue

        pred_len = pred_end - pred_start + 1
        true_len = true_end - true_start + 1
        overlap = min(pred_end, true_end) - max(pred_start, true_start) + 1
        if pred_len <= 0 or true_len <= 0 or overlap <= 0:
            f1_scores.append(0.0)
            continue

        precision = overlap / pred_len
        recall = overlap / true_len
        f1_scores.append(2 * precision * recall / (precision + recall))

    return sum(f1_scores) / len(f1_scores) if f1_scores else 0.0

EPOCHS = 3
for epoch in range(EPOCHS):
    # ---- Training pass ----
    model.train()
    total_loss = 0.0
    num_batches = 0
    for batch in train_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        start_positions = batch['start_positions'].to(device)
        end_positions = batch['end_positions'].to(device)

        optimizer.zero_grad()

        # Forward pass; passing the positions makes the model return a loss.
        outputs = model(input_ids=input_ids, attention_mask=attention_mask,
                        start_positions=start_positions, end_positions=end_positions)

        loss = outputs.loss
        loss.backward()

        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Report the mean loss over the epoch rather than only the last batch;
    # this also avoids a NameError when the loader yields no batches.
    mean_loss = total_loss / num_batches if num_batches else float('nan')

    # ---- Evaluation for F1 score ----
    # NOTE: this scores the model on the training loader, so it measures fit
    # to the training data, not generalisation.
    model.eval()
    start_preds, end_preds = [], []
    start_trues, end_trues = [], []

    with torch.no_grad():
        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            start_logits = outputs.start_logits
            end_logits = outputs.end_logits

            # Highest-scoring start/end token per example.
            start_preds.extend(torch.argmax(start_logits, dim=1).cpu().numpy())
            end_preds.extend(torch.argmax(end_logits, dim=1).cpu().numpy())
            start_trues.extend(batch['start_positions'].cpu().numpy())
            end_trues.extend(batch['end_positions'].cpu().numpy())

    f1 = calculate_f1(start_preds, end_preds, start_trues, end_trues)
    print(f"Epoch {epoch + 1}: Loss = {mean_loss}, F1 Score = {f1}")


# Save model and tokenizer into the SAME directory so the checkpoint can be
# reloaded from a single path; previously the tokenizer went to a different
# 'model_save/' directory.
OUTPUT_DIR = 'model/albert-finetuned-spoken-squad'
model.save_pretrained(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)