# BERT Model for Named Entity Recognition (NER)


In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertForTokenClassification
from sklearn.metrics import accuracy_score, classification_report
import re
from seqeval.metrics import f1_score, precision_score, recall_score, classification_report
from tqdm import tqdm

In [None]:
# Read the dataset CSV into a DataFrame and preview its first rows.
dataset_csv = '../../Datasets/FINAL/DATASET_BERT_CHUNKED.csv'
df = pd.read_csv(dataset_csv)
df.head()

In [None]:
# Summarise the loaded frame: dimensions, column names, and the number of
# missing values per column.
frame_shape = df.shape
print(f"Dataset shape: {frame_shape}")

column_names = df.columns.tolist()
print("\nColumns:", column_names)

null_counts = df.isnull().sum()
print("\nNull values:\n", null_counts)

In [None]:
# Collect the tag vocabulary: every whitespace-separated tag that occurs in
# the 'labels' column. Rows whose 'labels' cell is missing are skipped here;
# without dropna() the loop receives a float NaN and set.update() raises
# TypeError.
unique_labels = set()
for labels in df['labels'].dropna().str.split():
    unique_labels.update(labels)
# Sorting gives the tags a stable order, so the ids assigned below are the
# same on every run over the same data.
unique_labels = sorted(unique_labels)
print(f"Unique labels: {unique_labels}")

# Tag -> integer id, and the exact inverse for decoding predictions.
label_to_id = {label: i for i, label in enumerate(unique_labels)}
id_to_label = {i: label for label, i in label_to_id.items()}

print(f"\nLabel to ID mapping: {label_to_id}")

In [None]:
# Create dataset class
class NERDataset(Dataset):
    """Dataset pairing raw texts with one tag per whitespace-separated word.

    Every item is a dict of three tensors of length ``max_len``:
    ``input_ids``, ``attention_mask`` and ``labels``. All word pieces of a
    word receive that word's tag id; every other position is ``-100``.

    Args:
        texts: Sequence of raw strings.
        labels: Sequence of strings, parallel to ``texts``, each holding the
            whitespace-separated tags of the matching text.
        tokenizer: Callable tokenizer that also exposes ``tokenize``.
            Assumed to frame a sequence with exactly one leading and one
            trailing special token (BERT-style) - confirm if another
            tokenizer family is used.
        max_len: Fixed length every item is padded or truncated to.
        label_map: Optional tag -> id mapping. When omitted, the
            module-level ``label_to_id`` is looked up at item access time.
    """

    # Label value that marks a position as "no tag here".
    # -100 is ignored by PyTorch loss functions.
    IGNORE_INDEX = -100

    def __init__(self, texts, labels, tokenizer, max_len=512, label_map=None):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.label_map = label_map

    def __len__(self):
        return len(self.texts)

    def _word_ids(self, text, limit):
        """Return the source-word index of each of the first *limit* word pieces.

        Words are tokenised one at a time so that every piece a word
        produces is attributed to that word. Looking only at the ``##``
        prefix is not enough: punctuation is split off into pieces that do
        not carry the prefix, which would shift every later tag.
        """
        word_ids = []
        for word_idx, word in enumerate(text.split()):
            if len(word_ids) >= limit:
                break
            pieces = self.tokenizer.tokenize(word)
            word_ids.extend([word_idx] * len(pieces))
        return word_ids[:limit]

    def __getitem__(self, idx):
        text = self.texts[idx]
        tags = self.labels[idx].split()
        tag_to_id = label_to_id if self.label_map is None else self.label_map

        encoding = self.tokenizer(
            text,
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
            return_tensors='pt'
        )

        # Slot 0 holds the leading special token and, when the text is
        # truncated, the last slot holds the trailing one; neither may
        # receive a tag, so at most max_len - 2 pieces are labelled.
        label_ids = [self.IGNORE_INDEX] * self.max_len
        piece_budget = max(self.max_len - 2, 0)
        for position, word_idx in enumerate(self._word_ids(text, piece_budget), start=1):
            # Words beyond the end of the tag list stay ignored.
            if word_idx < len(tags):
                label_ids[position] = tag_to_id[tags[word_idx]]

        return {
            # Drop only the batch dimension added by return_tensors='pt'.
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'labels': torch.tensor(label_ids, dtype=torch.long)
        }

In [None]:
# Load pretrained tokenizer (using correct Indonesian BERT model)
pretrained_name = 'indobenchmark/indobert-base-p1'
tokenizer = BertTokenizer.from_pretrained(pretrained_name)

# Load base BERT model and add new classification head for our labels
from transformers import BertConfig
config = BertConfig.from_pretrained(pretrained_name)
config.num_labels = len(label_to_id)
# Store the real tag names in the config as well, so a checkpoint written
# with save_pretrained() can be decoded without this notebook's mapping.
# These are set after num_labels so the names assigned here are the ones kept.
config.id2label = dict(id_to_label)
config.label2id = dict(label_to_id)

# ignore_mismatched_sizes lets the classification head be re-created when
# its size differs from the one stored in the checkpoint.
model = BertForTokenClassification.from_pretrained(
    pretrained_name,
    config=config,
    ignore_mismatched_sizes=True
)

# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
print(f"Using device: {device}")
print(f"Model will be trained for {len(label_to_id)} labels")

In [None]:
# Hold out 20% of the rows for testing (fixed seed for a repeatable split),
# then wrap both parts as datasets and batch them.
from sklearn.model_selection import train_test_split

train_df, test_df = train_test_split(df, random_state=42, test_size=0.2)
print(f"Training set size: {len(train_df)}")
print(f"Test set size: {len(test_df)}")

# One NERDataset per split, built the same way from its text/labels columns.
train_dataset, test_dataset = (
    NERDataset(split['text'].tolist(), split['labels'].tolist(), tokenizer)
    for split in (train_df, test_df)
)

# Only the training batches are shuffled; test order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

In [None]:
# Training function
def train(model, dataloader, optimizer, device, epoch):
    """Run one optimisation pass over *dataloader* and return the mean batch loss.

    Args:
        model: Model called with ``input_ids``, ``attention_mask`` and
            ``labels``; its output must expose ``.loss``.
        dataloader: Iterable of batches, each a mapping with those three keys.
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.
        epoch: Zero-based epoch index, used only for the progress-bar title.

    Returns:
        The sum of the per-batch losses divided by ``len(dataloader)``.
    """
    model.train()
    running_loss = 0
    batches = tqdm(dataloader, desc=f"Epoch {epoch+1}")

    for batch in batches:
        optimizer.zero_grad()

        # Move exactly the tensors the model consumes onto the target device.
        model_inputs = {
            name: batch[name].to(device)
            for name in ('input_ids', 'attention_mask', 'labels')
        }
        loss = model(**model_inputs).loss

        # Read the scalar once; it feeds both the running sum and the bar.
        batch_loss = loss.item()
        running_loss += batch_loss

        loss.backward()
        optimizer.step()

        batches.set_postfix({'loss': f"{batch_loss:.4f}"})

    return running_loss / len(dataloader)

In [None]:
# Evaluation function
def evaluate(model, dataloader, device):
    """Predict tags for every batch and return gold and predicted tag sequences.

    Args:
        model: Model called with ``input_ids`` and ``attention_mask``; its
            output must expose ``.logits`` with the tag scores on the last
            dimension.
        dataloader: Iterable of batches, each a mapping with ``input_ids``,
            ``attention_mask`` and ``labels``.
        device: Device the model inputs are moved to.

    Returns:
        A pair ``(true_labels, predicted_labels)``. Each is a list with one
        list of tag names per example, restricted to the positions whose
        gold label is not ``-100``.
    """
    model.eval()

    true_labels = []
    predicted_labels = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )

            # Convert each tensor to plain Python lists once per batch
            # instead of reading one element at a time with .item(). The
            # gold labels are only compared here, so they are not moved to
            # the device.
            predicted_ids = torch.argmax(outputs.logits, dim=2).tolist()
            gold_ids = batch['labels'].tolist()

            for gold_row, predicted_row in zip(gold_ids, predicted_ids):
                true_seq = []
                pred_seq = []
                for gold_id, predicted_id in zip(gold_row, predicted_row):
                    # -100 marks positions that carry no tag.
                    if gold_id != -100:
                        true_seq.append(id_to_label[gold_id])
                        pred_seq.append(id_to_label[predicted_id])

                true_labels.append(true_seq)
                predicted_labels.append(pred_seq)

    return true_labels, predicted_labels

In [None]:
# Fine-tune for a fixed number of epochs with AdamW, reporting the mean
# training loss after each pass.
num_epochs = 3
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

for epoch in range(num_epochs):
    avg_loss = train(
        model=model,
        dataloader=train_loader,
        optimizer=optimizer,
        device=device,
        epoch=epoch,
    )
    print(f"Epoch {epoch+1}/{num_epochs} - Average loss: {avg_loss:.4f}")

In [None]:
# Score the held-out split: collect gold and predicted tag sequences, then
# print the summary metrics followed by the full report.
true_labels, predicted_labels = evaluate(
    model=model,
    dataloader=test_loader,
    device=device,
)

for heading, metric in (
    ("F1 Score:", f1_score),
    ("Precision:", precision_score),
    ("Recall:", recall_score),
):
    print(heading, metric(true_labels, predicted_labels))

print("\nClassification Report:")
print(classification_report(true_labels, predicted_labels))

In [None]:
# Write the fine-tuned model and its tokenizer into the same directory.
model_save_path = './models/bert_ner_model'
for artifact in (model, tokenizer):
    artifact.save_pretrained(model_save_path)
print(f"Model saved to {model_save_path}")

In [None]:
# Function to predict NER for new text
def predict_ner(text, model, tokenizer, device):
    """Tag *text* and return one ``(token, label)`` pair per word piece.

    Args:
        text: Raw string to tag. Input longer than 512 tokens is truncated.
        model: Model called with ``input_ids`` and ``attention_mask``; its
            output must expose ``.logits``.
        tokenizer: Tokenizer used to encode the text and to map ids back to
            tokens.
        device: Device the inputs are moved to.

    Returns:
        A list of ``(token, label)`` tuples in token order. The tokenizer's
        CLS, SEP and PAD tokens are left out. Label names come from the
        module-level ``id_to_label`` mapping.
    """
    model.eval()

    # Only one sequence is encoded, so nothing needs to be padded to a
    # common length; padding it to 512 made every call run a full-length
    # forward pass. Truncation is still applied.
    encoded_input = tokenizer(
        text,
        truncation=True,
        max_length=512,
        return_tensors='pt'
    )

    input_ids = encoded_input['input_ids'].to(device)
    attention_mask = encoded_input['attention_mask'].to(device)

    # Make prediction
    with torch.no_grad():
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

    # Highest-scoring label id per position of the single sequence.
    label_ids = torch.argmax(outputs.logits, dim=2)[0].tolist()
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0].tolist())

    skipped_tokens = {tokenizer.cls_token, tokenizer.sep_token, tokenizer.pad_token}
    return [
        (token, id_to_label[label_id])
        for token, label_id in zip(tokens, label_ids)
        if token not in skipped_tokens
    ]

In [None]:
# Try the model on a short, hand-written document.
sample_text = """
PUTUSAN Nomor 123/Pid. B/2020/PN Jkt DEMI KEADILAN BERDASARKAN KETUHANAN YANG MAHA ESA
Pengadilan Negeri Jakarta yang mengadili perkara pidana dengan acara pemeriksaan biasa dalam 
tingkat pertama menjatuhkan putusan sebagai berikut dalam perkara Terdakwa:
Nama lengkap : Budi Santoso;
Tempat lahir : Jakarta;
Umur/tanggal lahir : 35 Tahun/10 Januari 1985;
Jenis kelamin : Laki-laki;
Kebangsaan : Indonesia;
"""

predicted_entities = predict_ner(
    text=sample_text,
    model=model,
    tokenizer=tokenizer,
    device=device,
)

# Print every token that was tagged as part of an entity; 'O' tokens are
# skipped.
for token, label in predicted_entities:
    if label == 'O':
        continue
    print(f"{token}: {label}")

In [None]:
# Visualize the entities in a more structured way
from IPython.display import display, HTML
import pandas as pd

def visualize_entities(text, model, tokenizer, device):
    """Predict the entities in *text* and return them as a DataFrame.

    Token-level predictions are obtained from ``predict_ner`` (which puts
    the model in eval mode and drops the CLS/SEP/PAD tokens) and are then
    merged into entity spans using the ``B_`` / ``I_`` tag prefixes.

    Args:
        text: Raw string to tag.
        model: Token-classification model, passed through to ``predict_ner``.
        tokenizer: Tokenizer, passed through to ``predict_ner``.
        device: Device the inputs are moved to.

    Returns:
        A DataFrame with columns ``Entity`` (the entity text, words joined
        by single spaces) and ``Type`` (the tag with its two-character
        prefix removed), one row per entity in order of appearance.
    """
    token_predictions = predict_ner(text, model, tokenizer, device)

    entities = []
    current_entity = None
    current_type = None

    for token, label in token_predictions:
        if token.startswith('##'):
            # A '##' piece continues the previous word, so it follows the
            # decision made for that word's first piece: glue it on without
            # a space if the word is in the open entity, otherwise drop it.
            if current_entity is not None:
                current_entity += token[2:]
            continue

        if (
            label.startswith('I_')
            and current_entity is not None
            and label[2:] == current_type
        ):
            # Next word of the entity that is currently open.
            current_entity += " " + token
            continue

        # Any other tag ('O', a new 'B_', or an 'I_' that does not match the
        # open entity) ends the open entity.
        if current_entity is not None:
            entities.append((current_entity, current_type))
            current_entity = None
            current_type = None

        if label.startswith('B_'):
            current_entity = token
            current_type = label[2:]  # Remove B_ prefix

    # Flush an entity that runs to the end of the text.
    if current_entity is not None:
        entities.append((current_entity, current_type))

    return pd.DataFrame(entities, columns=['Entity', 'Type'])

# Show the grouped entities found in the sample text.
result = visualize_entities(
    text=sample_text,
    model=model,
    tokenizer=tokenizer,
    device=device,
)
display(result)

In [None]:
# Tag a full document by splitting it into fixed-size word chunks
def process_document(text, model, tokenizer, device, chunk_size=400):
    """Extract entities from *text* chunk by chunk and return them in one frame.

    The text is split on whitespace and regrouped into pieces of at most
    ``chunk_size`` words. Each piece is passed to ``visualize_entities`` on
    its own, so an entity that straddles a chunk boundary is not merged.

    Args:
        text: Raw document string.
        model: Passed through to ``visualize_entities``.
        tokenizer: Passed through to ``visualize_entities``.
        device: Passed through to ``visualize_entities``.
        chunk_size: Maximum number of whitespace-separated words per chunk.

    Returns:
        The per-chunk result frames concatenated with a fresh index, or an
        empty frame with columns ``Entity`` and ``Type`` when the text
        contains no words.
    """
    tokens = text.split()

    frames = []
    for start in range(0, len(tokens), chunk_size):
        piece = ' '.join(tokens[start:start + chunk_size])
        frames.append(visualize_entities(piece, model, tokenizer, device))

    if not frames:
        return pd.DataFrame(columns=['Entity', 'Type'])
    return pd.concat(frames, ignore_index=True)

# Take the first text in the dataset as the document to tag.
document_text = df['text'].iat[0]

# Run it through the chunked pipeline and show the entities found.
document_entities = process_document(
    text=document_text,
    model=model,
    tokenizer=tokenizer,
    device=device,
)
display(document_entities)