# Hugging Face Transformers

## 1. Library Imports

In [None]:
import json
import numpy as np
import pandas as pd
import torch
import os
from transformers import DebertaV2Tokenizer, DebertaV2ForTokenClassification
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from transformers import (
    AutoTokenizer, 
    AutoModelForTokenClassification, 
    Trainer, 
    TrainingArguments,
    DataCollatorForTokenClassification,
    EarlyStoppingCallback,
    
    get_linear_schedule_with_warmup
)

from datasets import Dataset as HFDataset, DatasetDict

from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from tqdm.auto import tqdm


## 3. Constant Definition 

In this cell, I'll document the entity types and their corresponding colors.

In [None]:
# Define entity types and their descriptions.
# Maps each entity label to a human-readable description; in this cell the
# mapping is only printed.
# NOTE(review): presumably these labels match the ones used in the annotation
# file loaded later -- confirm against the data.
ENTITY_TYPES = {
    "ACTION": "Direct commands or actions mentioned in the message",
    "SITUATION": "Racing context or circumstance descriptions",
    "INCIDENT": "Accidents or on-track events",
    "STRATEGY_INSTRUCTION": "Strategic directives",
    "POSITION_CHANGE": "References to overtakes or positions",
    "PIT_CALL": "Specific calls for pit stops",
    "TRACK_CONDITION": "Mentions of the track's state",
    "TECHNICAL_ISSUE": "Mechanical or car-related problems",
    "WEATHER": "References to weather conditions"
}

# Color scheme for entity visualization.
# Maps each entity label to a hex color (one entry per ENTITY_TYPES key).
ENTITY_COLORS = {
    "ACTION": "#4e79a7",           # Blue
    "SITUATION": "#f28e2c",         # Orange
    "INCIDENT": "#e15759",          # Red
    "STRATEGY_INSTRUCTION": "#76b7b2", # Teal
    "POSITION_CHANGE": "#59a14f",   # Green
    "PIT_CALL": "#edc949",          # Yellow
    "TRACK_CONDITION": "#af7aa1",   # Purple
    "TECHNICAL_ISSUE": "#ff9da7",   # Pink
    "WEATHER": "#9c755f"            # Brown
}

# Print one line per entity type as a quick visual check of the definitions.
print("Entity types defined:")
for entity, description in ENTITY_TYPES.items():
    print(f"  - {entity}: {description}")

## 4. Load and Explore Data

In [None]:
# Load F1 radio data from JSON file
def _print_sample_record(sample):
    """Print a short preview of one record; never raises on odd shapes."""
    if not isinstance(sample, dict):
        return

    print("\nSample record structure:")
    print(f"  Driver: {sample.get('driver', 'N/A')}")

    raw_message = sample.get('radio_message', 'N/A')
    message = raw_message if isinstance(raw_message, str) else 'N/A'
    print(f"  Radio message: {message[:100]}...")

    # Entities are expected in the second element of 'annotations'.
    annotations = sample.get('annotations')
    if not isinstance(annotations, (list, tuple)) or len(annotations) <= 1:
        return
    if not isinstance(annotations[1], dict) or 'entities' not in annotations[1]:
        return

    entities = annotations[1]['entities'] or []
    print(f"  Number of entities: {len(entities)}")
    if len(entities) > 0 and len(entities[0]) >= 3:
        entity = entities[0]
        # Only slice the text when the record really has a message; a record
        # without 'radio_message' must not abort the whole load.
        has_text = isinstance(raw_message, str)
        entity_text = raw_message[entity[0]:entity[1]] if has_text else ''
        print(f"  Sample entity: [{entity[0]}, {entity[1]}, '{entity_text}', '{entity[2]}']")


def load_f1_radio_data(json_file):
    """Load F1 radio data from a JSON file and print a preview of the first record.

    Args:
        json_file: Path of the JSON file to read.

    Returns:
        The parsed JSON content, unchanged.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Explicit encoding so the result does not depend on the platform default.
    with open(json_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    print(f"Loaded {len(data)} messages from {json_file}")

    # Show sample structure (preview only; the data itself is not modified).
    if isinstance(data, list) and len(data) > 0:
        _print_sample_record(data[0])

    return data



In [None]:
# Read the annotated radio messages from disk
json_file_path = "f1_radio_entity_annotations.json"
f1_data = load_f1_radio_data(json_file_path)

# Tally how often each entity label occurs across all records.
# Records whose 'annotations' field lacks a dict with an 'entities' key in
# position 1 are ignored.
entity_counts = {}
for item in f1_data:
    if 'annotations' not in item or len(item['annotations']) <= 1:
        continue
    second_annotation = item['annotations'][1]
    if not isinstance(second_annotation, dict):
        continue
    if 'entities' not in second_annotation:
        continue
    for _, _, entity_type in second_annotation['entities']:
        if entity_type in entity_counts:
            entity_counts[entity_type] += 1
        else:
            entity_counts[entity_type] = 1

# Report labels from most to least frequent
print("\nEntity type distribution in dataset:")
ranked_counts = sorted(entity_counts.items(), key=lambda pair: pair[1], reverse=True)
for entity_type, count in ranked_counts:
    print(f"  - {entity_type}: {count}")

## 5. Preprocessing F1 Radio Data

In [None]:
def preprocess_f1_data(data):
    """Extract the messages that carry usable entity annotations.

    A record is kept when it has a non-blank string 'radio_message' and its
    'annotations' list holds, at index 1, a dict with a non-empty 'entities'
    value. Everything else is counted as skipped.

    Args:
        data: Iterable of record dicts as loaded from the annotation JSON.

    Returns:
        A list of dicts with keys 'text', 'entities' and 'driver'
        ('driver' is None when the record has no such key).
    """
    processed_data = []
    skipped_count = 0

    for item in data:
        if 'radio_message' not in item or 'annotations' not in item:
            skipped_count += 1
            continue

        text = item['radio_message']

        # Skip items with empty, blank or non-text messages
        if not isinstance(text, str) or not text.strip():
            skipped_count += 1
            continue

        # Entities live in the second annotation element when present
        annotation_list = item['annotations']
        if (
            not isinstance(annotation_list, (list, tuple))
            or len(annotation_list) <= 1
            or not isinstance(annotation_list[1], dict)
        ):
            skipped_count += 1
            continue

        entities = annotation_list[1].get('entities')
        if not entities:
            skipped_count += 1
            continue

        processed_data.append({
            'text': text,
            'entities': entities,
            'driver': item.get('driver', None)
        })

    print(f"Processed {len(processed_data)} messages with valid annotations")
    print(f"Skipped {skipped_count} messages with missing or invalid annotations")

    # Show a sample of processed data. The preview prefers item 10 but falls
    # back to the last item so small datasets do not raise IndexError.
    if processed_data:
        sample = processed_data[min(10, len(processed_data) - 1)]
        print("\nSample processed message:")
        print(f"Text: {sample['text']}")
        print("Entities:")
        for start, end, entity_type in sample['entities']:
            entity_text = sample['text'][start:end]
            print(f"  - [{start}, {end}] '{entity_text}' ({entity_type})")

    return processed_data



In [None]:
# Preprocess the loaded data: keep only the messages that carry entity
# annotations (the function also prints how many were kept and skipped).
processed_f1_data = preprocess_f1_data(f1_data)

## 6. Convert to BIO tagging format

More information about the BIO tagging format can be found [here](https://en.wikipedia.org/wiki/Inside–outside–beginning_(tagging)).

### BIO Format Explanation

The **BIO format** is a way to label words in a sentence to indicate if they are part of a named entity, and if so, where in the entity they belong. It uses three types of labels:

- **B- (Beginning)**: The first word in an entity.
- **I- (Inside)**: Any word inside the entity that isn't the first one.
- **O (Outside)**: Words that are not part of any entity.

---

### Example Radio

Here is an example of a radio message from Max Verstappen's track engineer: 

**Text:**  
*"Max, we've currently got yellows in turn 7. Ferrari in the wall, no? Yes, that's Charles stopped. We are expecting the potential of an aborted start, but just keep to your protocol at the moment."*

Here are the entities mentioned in the message:

1. **'keep to your protocol at the moment'** (ACTION)
2. **'we've currently got yellows in turn 7'** (SITUATION)
3. **'We are expecting the potential of an aborted start'** (SITUATION)
4. **'Ferrari in the wall'** (INCIDENT)
5. **'that's Charles stopped'** (INCIDENT)

---

### Breaking the Sentence

We break the sentence into words and then tag them as follows:

| Word            | BIO Tag          |
|-----------------|------------------|
| Max,            | O                |
| we've           | B-SITUATION      |
| currently       | I-SITUATION      |
| got             | I-SITUATION      |
| yellows         | I-SITUATION      |
| in              | I-SITUATION      |
| turn            | I-SITUATION      |
| 7.              | I-SITUATION      |
| Ferrari         | B-INCIDENT       |
| in              | I-INCIDENT       |
| the             | I-INCIDENT       |
| wall,           | I-INCIDENT       |
| no?             | O                |
| Yes,            | O                |
| that's          | B-INCIDENT       |
| Charles         | I-INCIDENT       |
| stopped.        | I-INCIDENT       |
| We              | B-SITUATION      |
| are             | I-SITUATION      |
| expecting       | I-SITUATION      |
| the             | I-SITUATION      |
| potential       | I-SITUATION      |
| of              | I-SITUATION      |
| an              | I-SITUATION      |
| aborted         | I-SITUATION      |
| start,          | I-SITUATION      |
| but             | O                |
| just            | O                |
| keep            | B-ACTION         |
| to              | I-ACTION         |
| your            | I-ACTION         |
| protocol        | I-ACTION         |
| at              | I-ACTION         |
| the             | I-ACTION         |
| moment.         | I-ACTION         |




In [None]:
def create_ner_tags(text, entities):
    """Convert character-based entity spans to word-level BIO tags.

    The text is split on whitespace. Every word that overlaps an entity span
    is tagged: the first overlapping word gets ``B-<type>``, the following
    ones ``I-<type>``. Words outside any entity keep the tag ``"O"``. When
    spans overlap, later entities overwrite earlier tags.

    Args:
        text: The original message the character offsets refer to.
        entities: Iterable of ``(start_char, end_char, entity_type)`` with
            ``end_char`` exclusive.

    Returns:
        A tuple ``(words, tags)`` of two lists of equal length.
    """
    words = text.split()
    tags = ["O"] * len(words)

    # Locate the real [start, end) offsets of every word in ``text``.
    # Searching from the end of the previous word keeps the offsets right
    # even with leading whitespace, repeated spaces, tabs or newlines
    # (assuming a single space between words would shift them).
    word_spans = []
    cursor = 0
    for word in words:
        word_start = text.index(word, cursor)
        cursor = word_start + len(word)
        word_spans.append((word_start, cursor))

    # Apply entity tags
    for start_char, end_char, entity_type in entities:
        # Skip invalid spans
        if (
            start_char < 0
            or start_char >= len(text)
            or end_char > len(text)
            or start_char >= end_char
        ):
            continue

        # Words overlapping the span; a span may start or end on whitespace.
        covered = [
            word_idx
            for word_idx, (word_start, word_end) in enumerate(word_spans)
            if word_start < end_char and word_end > start_char
        ]
        if not covered:
            continue

        tags[covered[0]] = f"B-{entity_type}"
        for word_idx in covered[1:]:
            tags[word_idx] = f"I-{entity_type}"

    return words, tags





In [None]:
def convert_to_bio_format(processed_data):
    """Convert processed messages to word-level BIO tagging format.

    Args:
        processed_data: List of dicts with keys 'text' and 'entities'
            (and optionally 'driver').

    Returns:
        A list of dicts with keys 'tokens', 'ner_tags' and 'driver', one per
        input message and in the same order.
    """
    bio_data = []
    mapping_errors = 0

    for item in processed_data:
        text = item['text']
        entities = item['entities']

        # Convert to BIO tags
        words, tags = create_ner_tags(text, entities)

        # A message that has entities but ended up all-"O" means none of its
        # spans could be mapped onto words.
        if len(entities) > 0 and all(tag == "O" for tag in tags):
            mapping_errors += 1

        bio_data.append({
            "tokens": words,
            "ner_tags": tags,
            "driver": item.get('driver', None)
        })

    print(f"Converted {len(bio_data)} messages to BIO format")
    print(f"Mapping errors: {mapping_errors} (messages where no entities were mapped)")

    # Show an example. The preview prefers item 10 but falls back to the last
    # item so datasets with fewer than 11 messages do not raise IndexError.
    if bio_data:
        sample = bio_data[min(10, len(bio_data) - 1)]
        print("\nSample BIO tagging:")
        print(f"Original text: {' '.join(sample['tokens'])}")
        for token, tag in zip(sample['tokens'], sample['ner_tags']):
            print(f"  {token} -> {tag}")

    return bio_data

In [None]:
# Convert processed data to BIO format: one dict per message with the keys
# 'tokens', 'ner_tags' and 'driver'.
bio_data = convert_to_bio_format(processed_f1_data)

### What the Function Does

The function `create_ner_tags` takes the text and entities and converts them into BIO format. It starts by splitting the text into words. 

Then, it maps each word to a tag: "O" for words that are not part of an entity, "B-" for the first word of an entity, and "I-" for subsequent words inside the entity. 

The function also uses the character positions of the entities to determine which words they correspond to. Once the tags are assigned, the function returns the words and their BIO tags, ready for use in training a Named Entity Recognition (NER) model.

## 7. Create tag mappings and prepare datasets.

### 7.1 `create_tag_mappings`

This function creates mappings between NER (Named Entity Recognition) tags and unique IDs. It does this by:

1. Collecting all unique NER tags from the `bio_data`.
2. Sorting and assigning each unique tag an ID.
3. Creating two mappings:
   - `tag2id`: Maps each tag to its corresponding ID.
   - `id2tag`: Maps each ID back to its corresponding tag.

It then prints out the mappings and returns the two dictionaries: `tag2id` and `id2tag`.

**What it does:**
- Converts NER tags into unique IDs for easier processing in machine learning models.
- Helps with transforming the tags when working with model inputs and outputs.

In [None]:
def create_tag_mappings(bio_data):
    """Build the tag -> id and id -> tag lookup tables.

    Ids are assigned by enumerating the distinct tags found in the
    ``"ner_tags"`` lists of ``bio_data`` in sorted order, starting at 0.

    Returns:
        A tuple ``(tag2id, id2tag)`` of two dicts.
    """
    # Gather the distinct tags, then fix their order by sorting.
    ordered_tags = sorted({tag for record in bio_data for tag in record["ner_tags"]})

    tag2id = dict(zip(ordered_tags, range(len(ordered_tags))))
    id2tag = dict(enumerate(ordered_tags))

    print(f"Created mappings for {len(tag2id)} unique tags:")
    for position, name in enumerate(ordered_tags):
        print(f"  {name}: {position}")

    return tag2id, id2tag

In [None]:
# Create tag mappings: tag2id (tag string -> integer id) and id2tag (the
# inverse), built from the tags that occur in bio_data.
tag2id, id2tag = create_tag_mappings(bio_data)

---

### 7.2 `prepare_datasets`

This function prepares the dataset for training a model by splitting it into training, validation, and test sets using the Hugging Face library. Here's what it does:

1. Converts the input `bio_data` into a Hugging Face `Dataset`.
2. Splits the data into two parts: training + validation, and test.
3. Further splits the training data into training and validation sets based on the specified sizes (`test_size` and `val_size`).
4. Returns a `DatasetDict` containing the `train`, `validation`, and `test` sets.

**What it does:**
- Converts the data into a format suitable for machine learning.
- Splits the data into three parts: training, validation, and test sets for model evaluation.

In [None]:
def prepare_datasets(bio_data, test_size=0.1, val_size=0.1, seed=42):
    """Build train / validation / test splits as a Hugging Face DatasetDict.

    Args:
        bio_data: List of example dicts to wrap in a Hugging Face dataset.
        test_size: Fraction of all examples reserved for the test split.
        val_size: Fraction of all examples reserved for the validation split.
        seed: Seed passed to both split operations.

    Returns:
        A DatasetDict with the keys "train", "validation" and "test".
    """
    full_dataset = HFDataset.from_list(bio_data)

    # Carve the test split off first.
    outer_split = full_dataset.train_test_split(test_size=test_size, seed=seed)

    # val_size is relative to the full dataset, so rescale it to the part
    # that remains after the test split has been removed.
    inner_split = outer_split["train"].train_test_split(
        test_size=val_size / (1 - test_size),
        seed=seed,
    )

    splits = DatasetDict({
        "train": inner_split["train"],
        "validation": inner_split["test"],
        "test": outer_split["test"],
    })

    print("Prepared datasets with:")
    for title, key in (("Train", "train"), ("Validation", "validation"), ("Test", "test")):
        print(f"  - {title}: {len(splits[key])} examples")

    return splits

In [None]:
# Split the BIO data into train / validation / test using the default sizes
# (test_size=0.1, val_size=0.1) and seed 42.
datasets = prepare_datasets(bio_data)

---

## 8. Calling Up the Model 

In [None]:
# Fix the PyTorch RNG seed before any model or data-loader work.
torch.manual_seed(42)
# Cell 2: Initialize the tokenizer for DeBERTa v3 large
model_name = "microsoft/deberta-v3-large"
tokenizer = DebertaV2Tokenizer.from_pretrained(model_name)

# Check if it loaded correctly
print(f"Tokenizer loaded: {tokenizer.__class__.__name__}")
print(f"Vocabulary size: {len(tokenizer)}")

---

## 9. Custom Dataset for Deberta-v3 Tokenization

In [None]:
class F1RadioNERDataset(torch.utils.data.Dataset):
    """Token-classification dataset that tokenizes word by word.

    Each item of ``hf_dataset`` must provide ``"tokens"`` (a list of words)
    and ``"ner_tags"`` (one tag per word, either a string key of ``tag2id``
    or an already numeric id). Every sub-token inherits the tag of the word
    it came from; special and padding positions are labelled -100.
    """

    def __init__(self, hf_dataset, tokenizer, tag2id, max_len=128):
        """Store the data source, the tokenizer, the tag mapping and the fixed length."""
        self.dataset = hf_dataset
        self.tokenizer = tokenizer
        self.tag2id = tag2id
        self.max_len = max_len

    def __len__(self):
        """Return the number of examples."""
        return len(self.dataset)

    def _tag_to_id(self, tag):
        """Return the numeric id of ``tag``; numeric tags pass through unchanged."""
        if not isinstance(tag, str):
            return tag
        # Unknown tags fall back to the id of "O". Id 0 is only used when the
        # mapping has no "O" entry: with an alphabetically sorted mapping,
        # id 0 is a "B-..." tag, not "O".
        return self.tag2id.get(tag, self.tag2id.get("O", 0))

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for one example.

        ``labels`` always has length ``max_len``.
        """
        item = self.dataset[idx]
        tokens = item["tokens"]
        tags = item["ner_tags"]

        # Tokenize each word on its own and remember which word every
        # sub-token belongs to.
        word_ids = []
        all_tokens = []
        for word_idx, word in enumerate(tokens):
            word_tokens = self.tokenizer.tokenize(word)
            if not word_tokens:
                # Keep one placeholder so the word still occupies a position.
                word_tokens = [self.tokenizer.unk_token]
            word_ids.extend([word_idx] * len(word_tokens))
            all_tokens.extend(word_tokens)

        # Truncate if necessary, leaving room for the two special tokens.
        budget = self.max_len - 2
        if len(all_tokens) > budget:
            all_tokens = all_tokens[:budget]
            word_ids = word_ids[:budget]

        # Add special tokens and pad to max_len. The input is already a list
        # of sub-tokens, hence is_split_into_words=False.
        encoded_input = self.tokenizer.encode_plus(
            all_tokens,
            is_split_into_words=False,
            add_special_tokens=True,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )

        # Start with every position ignored (-100), then fill in the
        # sub-token positions. Position 0 is the leading special token.
        labels = torch.full((self.max_len,), -100, dtype=torch.long)
        for i, word_idx in enumerate(word_ids):
            position = i + 1
            if position < self.max_len - 1:  # keep the final slot free
                labels[position] = self._tag_to_id(tags[word_idx])

        return {
            "input_ids": encoded_input["input_ids"].flatten(),
            "attention_mask": encoded_input["attention_mask"].flatten(),
            "labels": labels
        }



---
## 10. Pytorch Setup

### 10.1 Creating Pytorch Datasets

In [None]:
# Create PyTorch datasets - now pass the tag2id mapping
# One dataset per split, all sharing the same tokenizer and tag mapping
# (max_len is left at its default).
train_dataset = F1RadioNERDataset(datasets["train"], tokenizer, tag2id)
val_dataset = F1RadioNERDataset(datasets["validation"], tokenizer, tag2id)
test_dataset = F1RadioNERDataset(datasets["test"], tokenizer, tag2id)



### 10.2 Creating Dataloaders

In [None]:
# Create DataLoaders
# Only the training loader shuffles; validation and test keep dataset order.
batch_size = 8  # Reduced batch size due to model size
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)



### 10.3 Validating Samples

In [None]:
# Report the size of each split.
print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Test samples: {len(test_dataset)}")

# Optional: Check a sample to verify everything is working
# Fetches the first training item and prints the shape of each tensor.
sample = train_dataset[0]
print(f"Sample input shape: {sample['input_ids'].shape}")
print(f"Sample attention mask shape: {sample['attention_mask'].shape}")
print(f"Sample labels shape: {sample['labels'].shape}")

---
## 11. Initializing Deberta

In [None]:
# Cell 4: Initialize the DeBERTa v3 large model
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# The classification head gets one output per entry of tag2id.
num_labels = len(tag2id)  # Use your existing tag2id mapping
model = DebertaV2ForTokenClassification.from_pretrained(
    model_name, 
    num_labels=num_labels
)
model.to(device)

print(f"Model loaded: {model_name}")
print(f"Number of labels: {num_labels}")

---
## 12. Set Up the Training Configuration

In [None]:
# Cell 5: Training configuration
# NOTE(review): the next cell repeats these statements and then also builds
# the loss, optimizer and scheduler, so this cell looks redundant.
from sklearn.utils import compute_class_weight


epochs = 10
# Collect every label of the training set that is not the ignore index.
train_labels = []
for batch in train_loader:
    labels = batch['labels']
    # Filter ignored tokens
    mask = labels != -100
    train_labels.extend(labels[mask].numpy())

# Calculate weights per class
# Only classes that occur in train_labels receive a weight here.
class_weights = compute_class_weight(
    'balanced', 
    classes=np.unique(train_labels), 
    y=train_labels
)

In [None]:
# Cell 5: Training configuration
from sklearn.utils import compute_class_weight


epochs = 10

# Collect every label of the training set that is not the ignore index.
train_labels = []
for batch in train_loader:
    labels = batch['labels']
    # Filter ignored tokens
    mask = labels != -100
    train_labels.extend(labels[mask].numpy())

# Calculate balanced weights for the classes that occur in the training labels
present_classes = np.unique(train_labels)
balanced_weights = compute_class_weight(
    'balanced',
    classes=present_classes,
    y=train_labels
)

# CrossEntropyLoss needs exactly one weight per model output (num_labels) and
# indexes the weights by class id. A tag that exists in tag2id but never
# appears in the training split would otherwise make the vector too short and
# misaligned, so absent classes get a neutral weight of 1.0.
full_class_weights = np.ones(num_labels, dtype=np.float32)
full_class_weights[present_classes] = balanced_weights
class_weights = torch.FloatTensor(full_class_weights).to(device)

# Class-weighted cross entropy as the training loss; positions labelled -100
# are ignored.
loss_fn = torch.nn.CrossEntropyLoss(weight=class_weights, ignore_index=-100)

# Small learning rate for gentler fine-tuning
learning_rate = 1e-5  # Reduced from 2e-5 to 1e-5

# Total number of optimizer steps over the whole run
total_steps = len(train_loader) * epochs
# Warm up over the first 10% of the steps to stabilise training
warmup_steps = int(0.1 * total_steps)

# AdamW optimizer with weight decay
optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)

# Linear warmup followed by linear decay of the learning rate
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps
)



In [None]:
# Token-level evaluation metrics
def compute_metrics(preds, labels):
    """Compute accuracy and weighted precision / recall / F1 over labelled tokens.

    Args:
        preds: Array of class scores; the predicted class is the argmax over
            axis 2.
        labels: Array of gold label ids; positions equal to -100 are excluded.

    Returns:
        A dict with the keys 'accuracy', 'precision', 'recall' and 'f1'.
    """
    gold = labels.flatten()
    predicted = np.argmax(preds, axis=2).flatten()

    # Keep only the positions that carry a real label
    keep = gold != -100
    gold = gold[keep]
    predicted = predicted[keep]

    scores = {'accuracy': accuracy_score(gold, predicted)}
    prf = precision_recall_fscore_support(gold, predicted, average='weighted')
    scores['precision'] = prf[0]
    scores['recall'] = prf[1]
    scores['f1'] = prf[2]
    return scores

---

## 13. Training and Evaluation Functions

In [None]:
# One training pass using the custom class-weighted loss
def train_epoch():
    """Run one pass over ``train_loader`` and return the mean batch loss.

    Uses the notebook-level ``model``, ``optimizer``, ``scheduler``,
    ``loss_fn``, ``device`` and ``num_labels``. The loss comes from
    ``loss_fn`` rather than from the model's own loss output.
    """
    model.train()
    running_loss = 0

    for batch in tqdm(train_loader, desc="Training"):
        optimizer.zero_grad()

        batch_ids = batch['input_ids'].to(device)
        batch_mask = batch['attention_mask'].to(device)
        batch_labels = batch['labels'].to(device)

        # Forward pass without labels: the loss is computed below
        logits = model(input_ids=batch_ids, attention_mask=batch_mask).logits

        # Flatten to (tokens, classes) / (tokens,) and make sure every
        # ignored position carries the loss function's ignore index
        flat_labels = batch_labels.view(-1)
        flat_labels = flat_labels.masked_fill(flat_labels == -100, loss_fn.ignore_index)
        loss = loss_fn(logits.view(-1, num_labels), flat_labels)
        running_loss += loss.item()

        # Backward pass with gradient clipping, then optimizer and LR steps
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

    return running_loss / len(train_loader)

In [None]:
def evaluate(data_loader):
    """Evaluate the notebook-level ``model`` on ``data_loader``.

    The reported loss is the model's own loss output (labels are passed to
    the model), not the class-weighted ``loss_fn`` used in ``train_epoch``.

    Returns:
        The dict produced by ``compute_metrics`` plus a 'loss' entry holding
        the mean batch loss.
    """
    model.eval()
    loss_sum = 0
    logit_chunks = []
    label_chunks = []

    with torch.no_grad():
        for batch in tqdm(data_loader, desc="Evaluating"):
            gold = batch['labels'].to(device)
            outputs = model(
                input_ids=batch['input_ids'].to(device),
                attention_mask=batch['attention_mask'].to(device),
                labels=gold,
            )

            loss_sum += outputs.loss.item()

            # Move results to the CPU so they can be stacked with NumPy
            logit_chunks.append(outputs.logits.detach().cpu().numpy())
            label_chunks.append(gold.detach().cpu().numpy())

    stacked_logits = np.concatenate(logit_chunks, axis=0)
    stacked_labels = np.concatenate(label_chunks, axis=0)

    metrics = compute_metrics(stacked_logits, stacked_labels)
    metrics['loss'] = loss_sum / len(data_loader)
    return metrics

---
## 14. Training Loop

In [None]:
# Cell 7: Main training loop
# Trains for `epochs` epochs, evaluates on the validation loader after each
# one and keeps a checkpoint of the weights with the best validation F1.
best_f1 = 0

for epoch in range(epochs):
    print(f"\n{'='*50}")
    print(f"Epoch {epoch+1}/{epochs}")
    print(f"{'='*50}")
    
    train_loss = train_epoch()
    print(f"Training loss: {train_loss:.4f}")
    
    val_metrics = evaluate(val_loader)
    print(f"Validation loss: {val_metrics['loss']:.4f}")
    print(f"Validation metrics: accuracy={val_metrics['accuracy']:.4f}, precision={val_metrics['precision']:.4f}, "
          f"recall={val_metrics['recall']:.4f}, f1={val_metrics['f1']:.4f}")
    
    # Save best model
    # Only a strict improvement of the validation F1 overwrites the checkpoint.
    if val_metrics['f1'] > best_f1:
        best_f1 = val_metrics['f1']
        torch.save(model.state_dict(), 'best_deberta_ner_model.pt')
        print(f"New best model saved with F1: {best_f1:.4f}")

print("\nTraining complete!")



In [None]:
from sklearn.metrics import classification_report

# Evaluate on validation set 
# Uses the model as it is in memory after the training loop; the saved best
# checkpoint is not reloaded in this cell.
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for batch in val_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        preds = torch.argmax(outputs.logits, dim=2)
        
        # Drop the positions labelled -100 (ignored tokens)
        active_mask = labels != -100
        true = labels[active_mask].cpu().numpy()
        pred = preds[active_mask].cpu().numpy()
        
        all_labels.extend(true)
        all_preds.extend(pred)

# Convert label ids to tag names
true_tags = [id2tag[l] for l in all_labels]
pred_tags = [id2tag[p] for p in all_preds]

# Print the classification report
print(classification_report(true_tags, pred_tags))

---
## 14.1 Test Set Evaluation

In [None]:
# Evaluate on test set
# Same metrics as for validation, computed once on the held-out test loader.
print("\nEvaluating on test set...")
test_metrics = evaluate(test_loader)
print(f"Test loss: {test_metrics['loss']:.4f}")
print(f"Test metrics: accuracy={test_metrics['accuracy']:.4f}, precision={test_metrics['precision']:.4f}, "
      f"recall={test_metrics['recall']:.4f}, f1={test_metrics['f1']:.4f}")

----


----

----

In [177]:
from transformers import BertTokenizerFast, BertForTokenClassification
from torch.utils.data import DataLoader
import torch

In [178]:
# Initialise the tokenizer for BERT large pre-trained on NER
# NOTE: this rebinds `model_name` and `tokenizer`, replacing the DeBERTa ones.
torch.manual_seed(42)
model_name = "dbmdz/bert-large-cased-finetuned-conll03-english"
tokenizer = BertTokenizerFast.from_pretrained(model_name)

# Check if it loaded correctly
print(f"Tokenizer loaded: {tokenizer.__class__.__name__}")
print(f"Vocabulary size: {len(tokenizer)}")

Tokenizer loaded: BertTokenizerFast
Vocabulary size: 28996


In [179]:
# Initialise the BERT large model
# NOTE: this rebinds `model`, replacing the DeBERTa model defined earlier.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

num_labels = len(tag2id)
model = BertForTokenClassification.from_pretrained(
    model_name, 
    num_labels=num_labels,
    id2label={i: l for l, i in tag2id.items()},
    label2id=tag2id,
    ignore_mismatched_sizes=True  # To handle the size difference in the final layer
)
model.to(device)

print(f"Model loaded: {model_name}")
print(f"Number of labels: {num_labels}")

Using device: cuda


Some weights of the model checkpoint at dbmdz/bert-large-cased-finetuned-conll03-english were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForTokenClassification were not initialized from the model checkpoint at dbmdz/bert-large-cased-finetuned-conll03-english and are newly initialized because the shapes did not match:
- classifier.bias: found shape torch.Size([9]) in the checkpoint and torch.Size([19]) in the model instanti

Model loaded: dbmdz/bert-large-cased-finetuned-conll03-english
Number of labels: 19


In [182]:
class F1RadioNERDataset(torch.utils.data.Dataset):
    """Token-classification dataset that relies on the tokenizer's word alignment.

    Each item of ``hf_dataset`` must provide ``"tokens"`` (a list of words)
    and ``"ner_tags"`` (one tag per word, as a ``tag2id`` key or a numeric
    id). Every sub-token of a word receives that word's label; positions
    without a word (reported as ``None`` by ``word_ids``) stay at -100.
    """

    def __init__(self, hf_dataset, tokenizer, tag2id, max_len=128):
        """Store the data source, the tokenizer, the tag mapping and the fixed length."""
        self.dataset = hf_dataset
        self.tokenizer = tokenizer
        self.tag2id = tag2id
        self.max_len = max_len

    def __len__(self):
        """Return the number of examples."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for one example."""
        record = self.dataset[idx]

        # String tags are looked up in tag2id; numeric tags are kept as-is
        label_ids = [
            self.tag2id[tag] if isinstance(tag, str) else tag
            for tag in record["ner_tags"]
        ]

        # Tokenize the pre-split words, padded / truncated to max_len
        encoding = self.tokenizer(
            record["tokens"],
            is_split_into_words=True,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        # Every position starts out ignored (-100)
        labels = torch.full((self.max_len,), -100, dtype=torch.long)

        # First and continuation sub-tokens are treated alike: each one gets
        # the label of the word it belongs to
        for position, word_idx in enumerate(encoding.word_ids(batch_index=0)):
            if word_idx is None or word_idx >= len(label_ids):
                continue
            labels[position] = label_ids[word_idx]

        return {
            "input_ids": encoding["input_ids"].flatten(),
            "attention_mask": encoding["attention_mask"].flatten(),
            "labels": labels,
        }

# Create the datasets
# Rebuilt with the BERT tokenizer; these rebind the names used for DeBERTa.
train_dataset = F1RadioNERDataset(datasets["train"], tokenizer, tag2id)
val_dataset = F1RadioNERDataset(datasets["validation"], tokenizer, tag2id)
test_dataset = F1RadioNERDataset(datasets["test"], tokenizer, tag2id)

# Create the dataloaders (only the training loader shuffles)
batch_size = 8
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [None]:
# Keep the same training configuration as for the previous model
from sklearn.utils import compute_class_weight

epochs = 10

# Collect every label of the training set that is not the ignore index.
train_labels = []
for batch in train_loader:
    labels = batch['labels']
    # Filter ignored tokens
    mask = labels != -100
    train_labels.extend(labels[mask].numpy())

# Calculate balanced weights for the classes that occur in the training labels
present_classes = np.unique(train_labels)
balanced_weights = compute_class_weight(
    'balanced',
    classes=present_classes,
    y=train_labels
)

# CrossEntropyLoss needs exactly one weight per model output (num_labels) and
# indexes the weights by class id. A tag that exists in tag2id but never
# appears in the training split would otherwise make the vector too short and
# misaligned, so absent classes get a neutral weight of 1.0.
full_class_weights = np.ones(num_labels, dtype=np.float32)
full_class_weights[present_classes] = balanced_weights
class_weights = torch.FloatTensor(full_class_weights).to(device)

# Defining CrossEntropyLoss with class weights
loss_fn = torch.nn.CrossEntropyLoss(weight=class_weights, ignore_index=-100)

# Reduced learning rate (previously 2e-5)
learning_rate = 1e-5

# Total optimizer steps, with warmup over the first 10% of them
total_steps = len(train_loader) * epochs
warmup_steps = int(0.1 * total_steps)

# AdamW optimizer with weight decay
optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)

# Linear warmup followed by linear decay of the learning rate
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps
)

In [None]:
# Use the same training loop
# train_epoch() and evaluate() read the notebook-level model, loaders,
# optimizer and scheduler, so they now operate on the BERT objects.
best_f1 = 0

for epoch in range(epochs):
    print(f"\n{'='*50}")
    print(f"Epoch {epoch+1}/{epochs}")
    print(f"{'='*50}")
    
    train_loss = train_epoch()  # The train_epoch() function defined earlier
    print(f"Training loss: {train_loss:.4f}")
    
    val_metrics = evaluate(val_loader)  # The evaluate() function defined earlier
    print(f"Validation loss: {val_metrics['loss']:.4f}")
    print(f"Validation metrics: accuracy={val_metrics['accuracy']:.4f}, precision={val_metrics['precision']:.4f}, "
          f"recall={val_metrics['recall']:.4f}, f1={val_metrics['f1']:.4f}")
    
    # Save best model
    # Only a strict improvement of the validation F1 overwrites the checkpoint.
    if val_metrics['f1'] > best_f1:
        best_f1 = val_metrics['f1']
        torch.save(model.state_dict(), 'best_bert_large_ner_model.pt')
        print(f"New best model saved with F1: {best_f1:.4f}")

print("\nTraining complete!")


Epoch 1/10


Training:   0%|          | 0/40 [00:00<?, ?it/s]

Training loss: 3.0243


Evaluating:   0%|          | 0/5 [00:00<?, ?it/s]

Validation loss: 2.8536
Validation metrics: accuracy=0.0814, precision=0.2006, recall=0.0814, f1=0.0800
New best model saved with F1: 0.0800

Epoch 2/10


Training:   0%|          | 0/40 [00:00<?, ?it/s]

Training loss: 2.7945


Evaluating:   0%|          | 0/5 [00:00<?, ?it/s]

Validation loss: 2.8052
Validation metrics: accuracy=0.1265, precision=0.3278, recall=0.1265, f1=0.1148
New best model saved with F1: 0.1148

Epoch 3/10


Training:   0%|          | 0/40 [00:00<?, ?it/s]

Training loss: 2.4414


Evaluating:   0%|          | 0/5 [00:00<?, ?it/s]

Validation loss: 2.6126
Validation metrics: accuracy=0.1823, precision=0.4099, recall=0.1823, f1=0.1751
New best model saved with F1: 0.1751

Epoch 4/10


Training:   0%|          | 0/40 [00:00<?, ?it/s]

Training loss: 2.0613


Evaluating:   0%|          | 0/5 [00:00<?, ?it/s]

Validation loss: 2.5291
Validation metrics: accuracy=0.1796, precision=0.3414, recall=0.1796, f1=0.1577

Epoch 5/10


Training:   0%|          | 0/40 [00:00<?, ?it/s]

In [None]:
# Final evaluation with a classification report
from sklearn.metrics import classification_report

# Evaluate on test set
print("\nEvaluating on test set...")
test_metrics = evaluate(test_loader)
print(f"Test loss: {test_metrics['loss']:.4f}")
print(f"Test metrics: accuracy={test_metrics['accuracy']:.4f}, precision={test_metrics['precision']:.4f}, "
      f"recall={test_metrics['recall']:.4f}, f1={test_metrics['f1']:.4f}")

# Detailed classification report
# Uses the model as it is in memory; the saved best checkpoint is not
# reloaded in this cell.
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        preds = torch.argmax(outputs.logits, dim=2)
        
        # Drop the positions labelled -100 (ignored tokens)
        active_mask = labels != -100
        true = labels[active_mask].cpu().numpy()
        pred = preds[active_mask].cpu().numpy()
        
        all_labels.extend(true)
        all_preds.extend(pred)

# Convert label ids to tag names
true_tags = [id2tag[l] for l in all_labels]
pred_tags = [id2tag[p] for p in all_preds]

# Print the classification report
print(classification_report(true_tags, pred_tags))