# Assignment Week 5: Named Entity Extraction with BERT

In [1]:
# Load the Drive helper and mount Google Drive at /content/drive.
# The google.colab package is only importable inside a Colab runtime.
# NOTE(review): no cell visible in this notebook reads from or writes to the
# mounted drive — confirm the mount is still required.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Install the necessary libraries


In [2]:
%pip install torch



In [3]:
%pip install transformers datasets



In [4]:
%pip install seqeval



In [5]:
%pip install transformers



Import the Libraries

In [6]:
import torch
import torch.nn as nn
from seqeval.metrics import classification_report
from torch.optim import AdamW
from torch.utils.data import DataLoader, Dataset
from transformers import (
    AutoModelForTokenClassification,
    AutoTokenizer,
    BertForTokenClassification,
    get_linear_schedule_with_warmup,
)

In [7]:
from datasets import load_dataset

# Load CoNLL-2003 for the exploratory cells below.
# prepare_data() calls load_dataset("conll2003") again on its own, so training
# does not depend on this notebook-level `dataset` variable.
dataset = load_dataset("conll2003")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [8]:
# Display the splits with their column names and row counts.
dataset

DatasetDict({
    train: Dataset({
        features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
        num_rows: 14041
    })
    validation: Dataset({
        features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
        num_rows: 3250
    })
    test: Dataset({
        features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
        num_rows: 3453
    })
})

In [9]:
# Join the tokens of the first training example with spaces to read it as a sentence.
sentence_0_str = " ".join(dataset["train"][0]['tokens'])
sentence_0_str

'EU rejects German call to boycott British lamb .'

In [10]:
# Fetch the tag-name lists for the POS, chunk and NER columns.
# The position of a name in each list is the integer id used in the dataset,
# so these lists map tag numbers back to their labels.
pos_tags = dataset["train"].features["pos_tags"].feature.names
chunk_tags = dataset["train"].features["chunk_tags"].feature.names
ner_tags = dataset["train"].features["ner_tags"].feature.names

print(pos_tags)
print(chunk_tags)
print(ner_tags)

['"', "''", '#', '$', '(', ')', ',', '.', ':', '``', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'NN|SYM', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB']
['O', 'B-ADJP', 'I-ADJP', 'B-ADVP', 'I-ADVP', 'B-CONJP', 'I-CONJP', 'B-INTJ', 'I-INTJ', 'B-LST', 'I-LST', 'B-NP', 'I-NP', 'B-PP', 'I-PP', 'B-PRT', 'I-PRT', 'B-SBAR', 'I-SBAR', 'B-UCP', 'I-UCP', 'B-VP', 'I-VP']
['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC', 'B-MISC', 'I-MISC']


Data preparation

In [11]:
class NERDataset(Dataset):
    """Torch ``Dataset`` view over an indexable collection of tokenized examples.

    Every example must provide ``input_ids``, ``attention_mask`` and ``labels``
    entries; each is converted to a ``torch.long`` tensor when accessed.
    """

    # Entries copied from each example, in the order they appear in the result.
    _FIELDS = ('input_ids', 'attention_mask', 'labels')

    def __init__(self, tokenized_dataset):
        self.tokenized_dataset = tokenized_dataset

    def __len__(self):
        """Number of examples in the wrapped collection."""
        return len(self.tokenized_dataset)

    def __getitem__(self, idx):
        """Return example ``idx`` as a dict of ``torch.long`` tensors."""
        example = self.tokenized_dataset[idx]
        return {
            field: torch.tensor(example[field], dtype=torch.long)
            for field in self._FIELDS
        }

In [12]:
def prepare_data(model_name='bert-base-cased', max_length=128):
    """Load CoNLL-2003 and tokenize it for token classification.

    Args:
        model_name: Checkpoint whose tokenizer is used. Defaults to
            ``'bert-base-cased'``, the value that was previously hard-coded.
        max_length: Every example is truncated/padded to this many tokens.
            Defaults to ``128``, the value that was previously hard-coded.

    Returns:
        A tuple ``(tokenized_datasets, num_labels, label_list)`` where
        ``tokenized_datasets`` holds the tokenized splits (the original
        columns are removed), ``label_list`` is the list of NER tag names and
        ``num_labels`` is its length.
    """
    # Load the CoNLL-2003 dataset
    dataset = load_dataset("conll2003")
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    def tokenize_and_align_labels(examples):
        """Tokenize a batch of pre-split sentences and align word labels to tokens."""
        # No return_tensors here: the result goes through Dataset.map, and a
        # plain Python `labels` list is added next to it, so plain lists are
        # the consistent representation.
        tokenized_inputs = tokenizer(
            examples["tokens"],
            truncation=True,
            is_split_into_words=True,
            padding='max_length',
            max_length=max_length,
        )

        labels = []
        for i, word_labels in enumerate(examples["ner_tags"]):
            word_ids = tokenized_inputs.word_ids(batch_index=i)
            previous_word_idx = None
            label_ids = []
            for word_idx in word_ids:
                if word_idx is None or word_idx == previous_word_idx:
                    # Special/padding tokens and continuation sub-tokens get
                    # -100, the same marker evaluate_model() filters out.
                    label_ids.append(-100)
                else:
                    # First sub-token of a word carries the word's label.
                    label_ids.append(word_labels[word_idx])
                previous_word_idx = word_idx
            labels.append(label_ids)

        tokenized_inputs["labels"] = labels
        return tokenized_inputs

    # Tokenize datasets
    tokenized_datasets = dataset.map(
        tokenize_and_align_labels,
        batched=True,
        remove_columns=dataset["train"].column_names,
    )
    label_list = dataset["train"].features["ner_tags"].feature.names

    return tokenized_datasets, len(label_list), label_list

Model Implementation

In [13]:
class NERModel(nn.Module):
    """Thin ``nn.Module`` wrapper around a pretrained token-classification model.

    The wrapped model is stored as ``self.bert``, so every key of this
    module's ``state_dict()`` starts with ``"bert."``.

    Args:
        num_labels: Number of output classes of the classification head.
        model_name: Checkpoint to start from. Defaults to
            ``'bert-base-cased'``, the value that was previously hard-coded.
    """

    def __init__(self, num_labels, model_name='bert-base-cased'):
        super().__init__()
        self.bert = AutoModelForTokenClassification.from_pretrained(model_name, num_labels=num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the wrapped model and return its output object unchanged.

        The training and evaluation loops read ``.loss`` and ``.logits``
        from the returned object.
        """
        # Keyword arguments keep the call independent of the wrapped model's
        # positional parameter order.
        return self.bert(input_ids=input_ids, attention_mask=attention_mask, labels=labels)

In [14]:
def train_model(model, train_dataloader, eval_dataloader, optimizer, scheduler, device, num_epochs=3, patience=2):
    """Train ``model`` with early stopping on the validation loss.

    Args:
        model: Module called as ``model(input_ids, attention_mask, labels=...)``
            whose output exposes ``.loss``.
        train_dataloader: Iterable of batches with ``input_ids``,
            ``attention_mask`` and ``labels`` tensors.
        eval_dataloader: Batches used to compute the validation loss after
            every epoch.
        optimizer: Optimizer updating the model parameters.
        scheduler: Learning-rate scheduler, or ``None``. A
            ``ReduceLROnPlateau`` instance is stepped once per epoch with the
            validation loss; any other scheduler is stepped after every
            optimizer step, which is what a schedule sized in training steps
            (``num_epochs * len(train_dataloader)``) requires.
        device: Device the batches are moved to.
        num_epochs: Maximum number of epochs.
        patience: Stop after this many consecutive epochs without an
            improvement of the validation loss.
    """
    # Only ReduceLROnPlateau consumes a metric; passing val_loss to any other
    # scheduler would be interpreted as an epoch index and corrupt the schedule.
    metric_driven = isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau)

    best_val_loss = float("inf")
    epochs_no_improve = 0

    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0.0
        for batch in train_dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask, labels=labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()
            if scheduler is not None and not metric_driven:
                # Advance the per-step schedule right after the weight update.
                scheduler.step()
            total_train_loss += loss.item()

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
        avg_train_loss = total_train_loss / max(len(train_dataloader), 1)
        val_loss = evaluate_model_loss(model, eval_dataloader, device)
        if metric_driven:
            scheduler.step(val_loss)
        print(f"Epoch {epoch + 1}/{num_epochs} - Train Loss: {avg_train_loss:.4f} - Val Loss: {val_loss:.4f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print("Early stopping triggered.")
                break

In [15]:
def evaluate_model_loss(model, eval_dataloader, device):
    """Return the mean of the per-batch losses of ``model`` on ``eval_dataloader``.

    The model is switched to eval mode and gradients are disabled. The mean
    is taken over batches (``len(eval_dataloader)``), not over examples.
    """
    model.eval()
    batch_losses = []

    with torch.no_grad():
        for batch in eval_dataloader:
            result = model(
                batch['input_ids'].to(device),
                attention_mask=batch['attention_mask'].to(device),
                labels=batch['labels'].to(device),
            )
            batch_losses.append(result.loss.item())

    return sum(batch_losses) / len(eval_dataloader)

In [16]:
def evaluate_model(model, test_dataloader, id2label, device):
    """Print a seqeval classification report for ``model`` on ``test_dataloader``.

    Positions whose gold label is -100 are dropped from both the gold and the
    predicted sequence before ids are mapped to names through ``id2label``.
    Nothing is returned; the report is printed.
    """
    model.eval()
    gold_sequences = []
    predicted_sequences = []

    with torch.no_grad():
        for batch in test_dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            gold_batch = batch['labels'].to(device)
            logits = model(input_ids, attention_mask=attention_mask).logits
            predicted_batch = logits.argmax(dim=-1)

            for gold_row, predicted_row in zip(gold_batch.tolist(), predicted_batch.tolist()):
                # Keep (gold, predicted) pairs together so both sequences are
                # filtered by the same positions and always have equal length.
                kept = [(gold, pred) for gold, pred in zip(gold_row, predicted_row) if gold != -100]
                gold_sequences.append([id2label[gold] for gold, _ in kept])
                predicted_sequences.append([id2label[pred] for _, pred in kept])

    print(classification_report(gold_sequences, predicted_sequences))

Train the model

In [17]:

from transformers import AutoModelForTokenClassification

# Define your model class
class NERModel(nn.Module):
    """Thin ``nn.Module`` wrapper around a pretrained token-classification model.

    The wrapped model is stored as ``self.bert``, so every key of this
    module's ``state_dict()`` starts with ``"bert."``.

    Args:
        num_labels: Number of output classes of the classification head.
        model_name: Checkpoint to start from. Defaults to
            ``'bert-base-cased'``, the value that was previously hard-coded.
    """

    def __init__(self, num_labels, model_name='bert-base-cased'):
        super().__init__()
        self.bert = AutoModelForTokenClassification.from_pretrained(model_name, num_labels=num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the wrapped model and return its output object unchanged.

        The training and evaluation loops read ``.loss`` and ``.logits``
        from the returned object.
        """
        return self.bert(input_ids=input_ids, attention_mask=attention_mask, labels=labels)


In [None]:
def main():
    """Fine-tune the NER model, save its weights and report test-set metrics."""
    # Use the GPU when torch can see one, otherwise fall back to the CPU.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Tokenized splits plus the label vocabulary.
    tokenized_datasets, num_labels, label_list = prepare_data()
    id2label = dict(enumerate(label_list))

    batch_size = 16
    train_dataloader = DataLoader(
        NERDataset(tokenized_datasets["train"]), batch_size=batch_size, shuffle=True
    )
    eval_dataloader = DataLoader(
        NERDataset(tokenized_datasets["validation"]), batch_size=batch_size
    )
    test_dataloader = DataLoader(
        NERDataset(tokenized_datasets["test"]), batch_size=batch_size
    )

    # Model and optimizer; the schedule is sized in optimizer steps with the
    # first tenth of them used as warmup.
    num_epochs = 3
    model = NERModel(num_labels=num_labels).to(device)
    optimizer = AdamW(model.parameters(), lr=5e-5, weight_decay=0.01, eps=1e-8)
    total_steps = num_epochs * len(train_dataloader)
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=total_steps // 10,
        num_training_steps=total_steps,
    )

    print("Starting training...")
    train_model(
        model,
        train_dataloader,
        eval_dataloader,
        optimizer,
        scheduler,
        device,
        num_epochs=num_epochs,
        patience=2,
    )

    # Persist the state dict of the NERModel wrapper (not the whole module).
    model_save_path = "ner_model.pth"
    torch.save(model.state_dict(), model_save_path)
    print(f"Model saved to {model_save_path}")

    print("\nEvaluating on test set...")
    evaluate_model(model, test_dataloader, id2label, device)

# Run main function
if __name__ == "__main__":
    main()

Using device: cpu


Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Starting training...


Inference

In [None]:
import torch
from google.colab import drive
from transformers import AutoTokenizer, AutoModelForTokenClassification

# Path the fine-tuned weights are loaded from
model_path = '/content/ner_model.pth'

def prepare_inference(model_path=model_path):
    """Initialize the tokenizer and load the fine-tuned model for inference.

    Args:
        model_path: File holding a state dict. Both layouts are accepted: a
            state dict saved from the ``NERModel`` wrapper (every key prefixed
            with ``"bert."`` because the wrapped model lives in ``self.bert``)
            and a state dict saved from the bare token-classification model.

    Returns:
        ``(tokenizer, model, id2label)``.

    Raises:
        RuntimeError: from ``load_state_dict`` if the file does not match the
            model architecture. Loading is strict on purpose so a mismatch
            cannot silently leave the model with untrained weights.
    """
    tokenizer = AutoTokenizer.from_pretrained('bert-base-cased')

    # Define the label mapping
    id2label = {
        0: "O",
        1: "B-PER",
        2: "I-PER",
        3: "B-ORG",
        4: "I-ORG",
        5: "B-LOC",
        6: "I-LOC",
        7: "B-MISC",
        8: "I-MISC"
    }

    # Load the model architecture with the matching number of labels
    model = AutoModelForTokenClassification.from_pretrained('bert-base-cased', num_labels=len(id2label))

    # map_location lets weights saved on a GPU load on a CPU-only machine.
    state_dict = torch.load(model_path, map_location="cpu", weights_only=True)

    # A state dict saved from the NERModel wrapper names the head
    # "bert.classifier.weight"; the bare model names it "classifier.weight".
    # Strip the wrapper prefix so the keys line up with the bare model.
    wrapper_prefix = "bert."
    if "classifier.weight" not in state_dict and wrapper_prefix + "classifier.weight" in state_dict:
        state_dict = {
            key[len(wrapper_prefix):]: value
            for key, value in state_dict.items()
            if key.startswith(wrapper_prefix)
        }

    model.load_state_dict(state_dict, strict=True)

    return tokenizer, model, id2label

In [None]:
def inference(text, model, tokenizer, id2label, device=None):
    """Perform NER inference on ``text`` and return one label per word.

    Sub-tokens are grouped into words with the tokenizer's ``word_ids``. A
    word takes the label predicted for its first sub-token, which is the only
    sub-token that receives a label during training, and its text is the
    slice of ``text`` covered by its sub-tokens.

    Args:
        text: Input string.
        model: Token-classification model whose output exposes ``.logits``.
        tokenizer: Fast tokenizer (offset mapping and ``word_ids`` are used).
        id2label: Mapping from class id to label name.
        device: Device to run on. Defaults to the CPU.

    Returns:
        List of ``(word, label)`` tuples in text order.
    """
    device = torch.device("cpu") if device is None else torch.device(device)
    model.to(device)
    model.eval()

    # Tokenize the text with offsets
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, return_offsets_mapping=True)
    input_ids = inputs["input_ids"].to(device)
    attention_mask = inputs["attention_mask"].to(device)
    offsets = inputs["offset_mapping"][0].tolist()
    word_ids = inputs.word_ids(0)

    with torch.no_grad():
        logits = model(input_ids, attention_mask=attention_mask).logits

    predictions = torch.argmax(logits, dim=-1)[0].tolist()

    # One [start, end, label] entry per word; `end` grows while sub-tokens of
    # the same word keep arriving.
    word_spans = []
    previous_word_id = None
    for token_index, word_id in enumerate(word_ids):
        if word_id is None:
            # Special tokens ([CLS], [SEP], padding) belong to no word.
            previous_word_id = None
            continue
        start, end = offsets[token_index]
        if word_id != previous_word_id:
            word_spans.append([start, end, id2label[predictions[token_index]]])
        else:
            word_spans[-1][1] = end
        previous_word_id = word_id

    return [(text[start:end], label) for start, end, label in word_spans]

In [None]:
def print_entities(labeled_words):
    """Print each entity found in ``labeled_words`` as ``TYPE: text``.

    ``labeled_words`` is a sequence of ``(word, label)`` pairs. A ``B-``
    label starts a new entity, an ``I-`` label of the same type extends the
    open one (or starts a new one when the type differs), and ``O`` closes
    the open entity. Labels of any other form are ignored.
    """
    open_type = None
    pieces = []

    for token, tag in labeled_words:
        if tag == "O":
            if open_type:
                print(f"{open_type}: {' '.join(pieces)}")
                open_type, pieces = None, []
            continue

        prefix, entity_type = tag[:2], tag[2:]
        if prefix not in ("B-", "I-"):
            continue

        # Continuation of the entity that is currently open.
        if prefix == "I-" and open_type == entity_type:
            pieces.append(token)
            continue

        # Anything else starts a new entity; emit the open one first.
        if open_type:
            print(f"{open_type}: {' '.join(pieces)}")
        open_type, pieces = entity_type, [token]

    if open_type:  # Print last entity if exists
        print(f"{open_type}: {' '.join(pieces)}")

In [None]:
# Demo: prepare_inference() is called without arguments, so it loads the
# weights from its default model_path, then a sample sentence is tagged and
# the entities are printed.
tokenizer, model, id2label = prepare_inference()
text = "John Smith works at Microsoft in Seattle and visited New York last summer."
print("Entities found:")
labeled_words = inference(text, model, tokenizer, id2label)
print_entities(labeled_words)

Inference with a pre-trained model

In [None]:
from transformers import AutoTokenizer, AutoModelForTokenClassification
import torch

def prepare_inference(model_path=None):
    """Initialize the tokenizer and load a model for inference.

    Args:
        model_path: Optional file holding a fine-tuned state dict for
            ``bert-base-cased`` with 9 labels (either saved from the bare
            model or from a wrapper that stores it under ``.bert``). When it
            is falsy, a ready-made CoNLL-2003 checkpoint is downloaded
            instead.

    Returns:
        ``(tokenizer, model, id2label)``. For the downloaded checkpoint the
        mapping is taken from the model's own config, because its class order
        is defined by that checkpoint and need not match the dataset order.
    """
    tokenizer = AutoTokenizer.from_pretrained('bert-base-cased')

    # Label order of the CoNLL-2003 dataset, used by models fine-tuned in this notebook
    conll_id2label = {
        0: "O",
        1: "B-PER",
        2: "I-PER",
        3: "B-ORG",
        4: "I-ORG",
        5: "B-LOC",
        6: "I-LOC",
        7: "B-MISC",
        8: "I-MISC"
    }

    # Load trained model if path provided, otherwise use a pre-trained model for NER
    if model_path:
        model = AutoModelForTokenClassification.from_pretrained('bert-base-cased', num_labels=len(conll_id2label))
        # The file is a state dict, not a pickled module. weights_only=True
        # avoids executing arbitrary pickled code; map_location allows
        # GPU-saved weights to load on a CPU-only machine.
        state_dict = torch.load(model_path, map_location="cpu", weights_only=True)
        # Strip the "bert." prefix added when the weights were saved from a
        # wrapper module that holds the model in a `.bert` attribute.
        wrapper_prefix = "bert."
        if "classifier.weight" not in state_dict and wrapper_prefix + "classifier.weight" in state_dict:
            state_dict = {
                key[len(wrapper_prefix):]: value
                for key, value in state_dict.items()
                if key.startswith(wrapper_prefix)
            }
        model.load_state_dict(state_dict, strict=True)
        id2label = conll_id2label
    else:
        model = AutoModelForTokenClassification.from_pretrained('dbmdz/bert-large-cased-finetuned-conll03-english')
        # Use the label mapping shipped with the checkpoint.
        id2label = {int(class_id): name for class_id, name in model.config.id2label.items()}

    return tokenizer, model, id2label

In [None]:

def inference(text, model, tokenizer, id2label, device=None):
    """Perform NER inference on ``text`` and return one label per word.

    Sub-tokens are grouped into words with the tokenizer's ``word_ids``. A
    word takes the label predicted for its first sub-token and its text is
    the slice of ``text`` covered by its sub-tokens, so no ``##`` word-piece
    fragments appear in the result.

    Args:
        text: Input string.
        model: Token-classification model whose output exposes ``.logits``.
        tokenizer: Fast tokenizer (offset mapping and ``word_ids`` are used).
        id2label: Mapping from class id to label name.
        device: Device to run on. Defaults to the CPU.

    Returns:
        List of ``(word, label)`` tuples in text order.
    """
    device = torch.device("cpu") if device is None else torch.device(device)
    model.to(device)
    model.eval()

    # Tokenize the text with offsets
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, return_offsets_mapping=True)
    input_ids = inputs["input_ids"].to(device)
    attention_mask = inputs["attention_mask"].to(device)
    offsets = inputs["offset_mapping"][0].tolist()
    word_ids = inputs.word_ids(0)

    with torch.no_grad():
        logits = model(input_ids, attention_mask=attention_mask).logits

    predictions = torch.argmax(logits, dim=-1)[0].tolist()

    # One [start, end, label] entry per word; `end` grows while sub-tokens of
    # the same word keep arriving.
    word_spans = []
    previous_word_id = None
    for token_index, word_id in enumerate(word_ids):
        if word_id is None:
            # Special tokens ([CLS], [SEP], padding) belong to no word.
            previous_word_id = None
            continue
        start, end = offsets[token_index]
        if word_id != previous_word_id:
            word_spans.append([start, end, id2label[predictions[token_index]]])
        else:
            word_spans[-1][1] = end
        previous_word_id = word_id

    return [(text[start:end], label) for start, end, label in word_spans]

In [None]:
def print_entities(labeled_words):
    """Print each entity found in ``labeled_words`` as ``TYPE: text``.

    ``labeled_words`` is a sequence of ``(word, label)`` pairs. A ``B-``
    label starts a new entity, an ``I-`` label of the same type extends the
    open one (or starts a new one when the type differs), and ``O`` closes
    the open entity. Labels of any other form are ignored.
    """
    open_type = None
    pieces = []

    for token, tag in labeled_words:
        if tag == "O":
            if open_type:
                print(f"{open_type}: {' '.join(pieces)}")
                open_type, pieces = None, []
            continue

        prefix, entity_type = tag[:2], tag[2:]
        if prefix not in ("B-", "I-"):
            continue

        # Continuation of the entity that is currently open.
        if prefix == "I-" and open_type == entity_type:
            pieces.append(token)
            continue

        # Anything else starts a new entity; emit the open one first.
        if open_type:
            print(f"{open_type}: {' '.join(pieces)}")
        open_type, pieces = entity_type, [token]

    if open_type:
        print(f"{open_type}: {' '.join(pieces)}")

In [None]:
# Demo: no model_path is passed, so prepare_inference() takes its
# "pre-trained model" branch; the same sample sentence is then tagged and
# the entities are printed.
tokenizer, model, id2label = prepare_inference()
text = "John Smith works at Microsoft in Seattle and visited New York last summer."
print("Entities found:")
labeled_words = inference(text, model, tokenizer, id2label)
print_entities(labeled_words)