# Named Entity Extraction with BERT

## What is BERT?

Read the paper: https://arxiv.org/pdf/1810.04805

Read this blog to understand transformers: https://jalammar.github.io/illustrated-transformer/

## CoNLL dataset

Read more about it here: https://www.clips.uantwerpen.be/conll2003/ner/

In [None]:
%pip install transformers datasets

In [None]:
from datasets import load_dataset

dataset = load_dataset("conll2003")

In [6]:
# Let's inspect the dataset
sentence_0 = dataset["train"][0]
print(sentence_0)

{'id': '0', 'tokens': ['EU', 'rejects', 'German', 'call', 'to', 'boycott', 'British', 'lamb', '.'], 'pos_tags': [22, 42, 16, 21, 35, 37, 16, 21, 7], 'chunk_tags': [11, 21, 11, 12, 21, 22, 11, 12, 0], 'ner_tags': [3, 0, 7, 0, 0, 0, 7, 0, 0]}


Can you identify each of the keys?

Explain:
- id
- tokens
- pos_tags
- chunk_tags
- ner_tags



In [7]:
sentence_0_str = " ".join(dataset["train"][0]['tokens'])
sentence_0_str

'EU rejects German call to boycott British lamb .'

In [8]:
# map pos tag numbers to their labels
pos_tags = dataset["train"].features["pos_tags"].feature.names
chunk_tags = dataset["train"].features["chunk_tags"].feature.names
ner_tags = dataset["train"].features["ner_tags"].feature.names

print(pos_tags)
print(chunk_tags)
print(ner_tags)

['"', "''", '#', '$', '(', ')', ',', '.', ':', '``', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'NN|SYM', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB']
['O', 'B-ADJP', 'I-ADJP', 'B-ADVP', 'I-ADVP', 'B-CONJP', 'I-CONJP', 'B-INTJ', 'I-INTJ', 'B-LST', 'I-LST', 'B-NP', 'I-NP', 'B-PP', 'I-PP', 'B-PRT', 'I-PRT', 'B-SBAR', 'I-SBAR', 'B-UCP', 'I-UCP', 'B-VP', 'I-VP']
['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC', 'B-MISC', 'I-MISC']


In [9]:
print("POS Tags for sentence_0")
for i, pos_ in enumerate(sentence_0['pos_tags']):
    print(f"{pos_}  \t- {pos_tags[pos_]} \t- {sentence_0['tokens'][i]}")

POS Tags for sentence_0
22  	- NNP 	- EU
42  	- VBZ 	- rejects
16  	- JJ 	- German
21  	- NN 	- call
35  	- TO 	- to
37  	- VB 	- boycott
16  	- JJ 	- British
21  	- NN 	- lamb
7  	- . 	- .


In [10]:
print("Chunk Tags for sentence_0")
for i, chunk_ in enumerate(sentence_0['chunk_tags']):
    print(f"{chunk_} \t- {chunk_tags[chunk_]} \t- {sentence_0['tokens'][i]}")

Chunk Tags for sentence_0
11 	- B-NP 	- EU
21 	- B-VP 	- rejects
11 	- B-NP 	- German
12 	- I-NP 	- call
21 	- B-VP 	- to
22 	- I-VP 	- boycott
11 	- B-NP 	- British
12 	- I-NP 	- lamb
0 	- O 	- .


In [11]:
print("NER Tags for sentence_0")
for i, ner_ in enumerate(sentence_0['ner_tags']):
    print(f"{ner_} \t- {ner_tags[ner_]} \t\t- {sentence_0['tokens'][i]}")

NER Tags for sentence_0
3 	- B-ORG 		- EU
0 	- O 		- rejects
7 	- B-MISC 		- German
0 	- O 		- call
0 	- O 		- to
0 	- O 		- boycott
7 	- B-MISC 		- British
0 	- O 		- lamb
0 	- O 		- .


In [12]:
# Check the size of the dataset
print(f"Train size: {len(dataset['train'])}")
print(f"Validation size: {len(dataset['validation'])}")
print(f"Test size: {len(dataset['test'])}")

Train size: 14041
Validation size: 3250
Test size: 3453


## Dataset preparation for traning

## Implement the model

In [None]:
from transformers import AutoTokenizer
import torch
from torch.utils.data import DataLoader, Dataset

class NERDataset(Dataset):
    def __init__(self, tokenized_dataset):
        self.tokenized_dataset = tokenized_dataset

    def __len__(self):
        return len(self.tokenized_dataset)

    def __getitem__(self, idx):
        item = self.tokenized_dataset[idx]
        return {
            'input_ids': torch.tensor(item['input_ids'], dtype=torch.long),
            'attention_mask': torch.tensor(item['attention_mask'], dtype=torch.long),
            'labels': torch.tensor(item['labels'], dtype=torch.long)
        }

def prepare_data():
    # Load the CoNLL-2003 dataset
    dataset = load_dataset("conll2003")

    # Initialize tokenizer
    tokenizer = AutoTokenizer.from_pretrained('bert-base-cased')

    # Get label list from dataset
    label_list = dataset["train"].features["ner_tags"].feature.names

    def tokenize_and_align_labels(examples):
        tokenized_inputs = tokenizer(
            examples["tokens"],
            truncation=True,
            is_split_into_words=True,
            padding='max_length',
            max_length=128,
            return_tensors="pt"  # Return PyTorch tensors
        )

        labels = []
        for i, label in enumerate(examples["ner_tags"]):
            word_ids = tokenized_inputs.word_ids(batch_index=i)
            previous_word_idx = None
            label_ids = []
            for word_idx in word_ids:
                if word_idx is None:
                    label_ids.append(-100)
                elif word_idx != previous_word_idx:
                    label_ids.append(label[word_idx])
                else:
                    label_ids.append(-100)
                previous_word_idx = word_idx
            labels.append(label_ids)

        tokenized_inputs["labels"] = labels
        return tokenized_inputs

    # Tokenize datasets
    tokenized_datasets = dataset.map(
        tokenize_and_align_labels,
        batched=True,
        remove_columns=dataset["train"].column_names
    )

    return tokenized_datasets, len(label_list)


In [None]:
from transformers import BertForTokenClassification
import torch.nn as nn

class NERModel(nn.Module):
    def __init__(self, num_labels):
        super().__init__()
        self.bert = BertForTokenClassification.from_pretrained(
            'bert-base-cased',
            num_labels=num_labels
        )

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            labels=labels
        )
        return outputs

## Implement the traning loop

In [None]:
def train_model(model, train_dataloader, eval_dataloader, optimizer, scheduler, device, num_epochs=3):
    # Set the model in training mode
    model.train()

    # Early Stopping parameters
    patience = 2
    best_eval_loss = float('inf')
    epochs_without_improvement = 0

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        total_train_loss = 0

        for batch in train_dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()

            # Forward pass
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss  # Access the loss from the model output
            total_train_loss += loss.item()

            # Backward pass and optimization step
            loss.backward()
            optimizer.step()
            scheduler.step()  # Update learning rate

        # Calculate the average training loss
        avg_train_loss = total_train_loss / len(train_dataloader)
        print(f"Training loss: {avg_train_loss:.4f}")

        # Validation phase
        model.eval()  # Set the model to evaluation mode
        total_eval_loss = 0

        with torch.no_grad():  # Disable gradient computation for validation
            for batch in eval_dataloader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)

                # Forward pass (no backpropagation in evaluation mode)
                outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                total_eval_loss += loss.item()

        # Calculate the average validation loss
        eval_loss = total_eval_loss / len(eval_dataloader)
        print(f"Validation loss: {eval_loss:.4f}")

        # Early Stopping Check
        if eval_loss < best_eval_loss:
            best_eval_loss = eval_loss
            epochs_without_improvement = 0
            print("Model improved, saving best model...")
            model.save_pretrained("best_model")
            tokenizer.save_pretrained("best_model")
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                print("Early stopping triggered.")
                break

        # Return to training mode after evaluation
        model.train()

    print("Training complete!")

## Implement the evaluation function

In [None]:
%pip install seqeval

In [None]:
from seqeval.metrics import classification_report

def evaluate_model(model, test_dataloader, id2label, device):
    model.eval()  # Set the model to evaluation mode
    true_labels = []
    predicted_labels = []

    with torch.no_grad():  # Disable gradient computation for evaluation
        for batch in test_dataloader:
            # Move input data to the device
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)  # True labels for each token

            # Forward pass
            outputs = model(input_ids, attention_mask=attention_mask)
            logits = outputs.logits  # Raw prediction scores

            # Convert logits to label indices
            predictions = torch.argmax(logits, dim=-1)

            # Process each sentence in the batch
            for i in range(len(labels)):
                # Extract the true labels and predictions for each token
                true_label_ids = labels[i]
                pred_label_ids = predictions[i]

                # Convert label indices to label names
                true_label = [id2label[label_id.item()] for label_id in true_label_ids if label_id != -100]
                pred_label = [id2label[pred_id.item()] for pred_id, label_id in zip(pred_label_ids, true_label_ids) if label_id != -100]

                # Append to the lists for evaluation
                true_labels.append(true_label)
                predicted_labels.append(pred_label)

    # Print classification report
    print(classification_report(true_labels, predicted_labels))


## Implement the main() function

In [None]:
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup

def main():
    # Device configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Load and prepare dataset
    print("Loading and preparing dataset...")
    tokenized_datasets, num_labels = prepare_data()

    # Convert to custom Dataset objects
    train_dataset = NERDataset(tokenized_datasets["train"])
    eval_dataset = NERDataset(tokenized_datasets["validation"])
    test_dataset = NERDataset(tokenized_datasets["test"])

    # Create data loaders
    train_dataloader = DataLoader(
        train_dataset,
        batch_size=32,  # Increased batch size
        shuffle=True
    )
    eval_dataloader = DataLoader(
        eval_dataset,
        batch_size=32
    )
    test_dataloader = DataLoader(
        test_dataset,
        batch_size=32
    )

    # Model initialization
    model = NERModel(num_labels=num_labels)
    model.to(device)

    # Define optimizer with better parameters
    optimizer = AdamW(
        model.parameters(),
        lr=9e-5,              # Reduced learning rate
        weight_decay=0.02,    # Increased weight decay for regularization
        eps=1e-8              # Stability parameter
    )

    # Learning rate scheduler
    num_epochs = 3
    num_training_steps = num_epochs * len(train_dataloader)
    num_warmup_steps = num_training_steps // 10
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=num_warmup_steps,
        num_training_steps=num_training_steps
    )

    # Training
    print("Starting training...")
    train_model(model, train_dataloader, eval_dataloader, optimizer, scheduler, device, num_epochs=num_epochs)

    # Evaluation
    print("\nEvaluating on validation set...")
    id2label = {
        0: "O",
        1: "B-PER",
        2: "I-PER",
        3: "B-ORG",
        4: "I-ORG",
        5: "B-LOC",
        6: "I-LOC",
        7: "B-MISC",
        8: "I-MISC"
    }

    evaluate_model(model, eval_dataloader, id2label, device)

    return model

In [None]:
model = main()

Using device: cuda
Loading and preparing dataset...


Map:   0%|          | 0/3453 [00:00<?, ? examples/s]

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Starting training...
Epoch 1/3


## Implement inference function


In [None]:
from transformers import AutoTokenizer
import torch
import numpy as np

from transformers import AutoTokenizer
import torch
import numpy as np

def prepare_inference(model_path=None):
    """Initialize tokenizer and load model for inference"""
    tokenizer = AutoTokenizer.from_pretrained('bert-base-cased')

    # Load trained model if path provided, otherwise use existing model
    if model_path:
        model = torch.load(model_path)

    id2label = {
        0: "O",
        1: "B-PER",
        2: "I-PER",
        3: "B-ORG",
        4: "I-ORG",
        5: "B-LOC",
        6: "I-LOC",
        7: "B-MISC",
        8: "I-MISC"
    }

    return tokenizer, id2label

def inference(text, model, tokenizer, id2label, device='cuda'):
    """
    Perform NER inference on input text

    Args:
        text (str): Input text to analyze
        model: Trained NER model
        tokenizer: BERT tokenizer
        id2label (dict): Mapping from label ids to label names
        device (str): Device to run inference on ('cuda' or 'cpu')

    Returns:
        list: List of tuples containing (word, entity_label)
    """
    ## fill in your code

    # Ensure model is in evaluation mode


    # Tokenize the text

    # Perform inference


    # Convert predictions to labels


    # Align predictions with words




    # Combine words with their predicted labels


    return labeled_words

def print_entities(labeled_words):
    """Pretty print the labeled entities"""
    current_entity = None
    entity_text = []

    for word, label in labeled_words:
        if label == "O":
            if current_entity:
                print(f"{current_entity}: {' '.join(entity_text)}")
                current_entity = None
                entity_text = []
        elif label.startswith("B-"):
            if current_entity:
                print(f"{current_entity}: {' '.join(entity_text)}")
            current_entity = label[2:]  # Remove "B-" prefix
            entity_text = [word]
        elif label.startswith("I-"):
            if current_entity == label[2:]:  # If it's the same entity type
                entity_text.append(word)
            else:
                if current_entity:
                    print(f"{current_entity}: {' '.join(entity_text)}")
                current_entity = label[2:]
                entity_text = [word]

    if current_entity:  # Print last entity if exists
        print(f"{current_entity}: {' '.join(entity_text)}")

In [None]:
# First initialize
tokenizer, id2label = prepare_inference()

# Example texts to analyze
texts = [
    "John Smith works at Microsoft in Seattle and visited New York last summer.",
    "The European Union signed a trade deal with Japan in Brussels.",
    "Tesla CEO Elon Musk announced new features coming to their vehicles."
]

# Process each text
for text in texts:
    print("\nText:", text)
    print("Entities found:")
    results = inference(text, model, tokenizer, id2label)
    print_entities(results)

In [None]:
# save the model to drive

from google.colab import drive
drive.mount('/content/drive')



Mounted at /content/drive


In [None]:
save_path = "/content/drive/MyDrive/BERT_NER/bert_ner_model.pth"
torch.save(model, save_path)

In [None]:
model = torch.load(save_path)

  model = torch.load(save_path)
