In [1]:
%%capture
!pip install "datasets==2.21.0"

In [2]:
# Lab 5 (NER) - RNN for Named Entity Recognition (NER)

import math
from typing import List, Tuple, Dict

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

from datasets import load_dataset

# Seed PyTorch's global RNG once, up front, so that parameter initialisation
# and DataLoader shuffling repeat across "Restart & Run All".
# NOTE: some GPU kernels may still be non-deterministic.
SEED = 42
torch.manual_seed(SEED)

# Special vocabulary entries: the padding filler and the fallback used for
# words that are missing from the vocabulary.
PAD_TOKEN = "<pad>"
UNK_TOKEN = "<unk>"


In [3]:
def load_conll2003(split_train: str = "train", split_val: str = "validation"):
    """Load the CoNLL 2003 dataset through Hugging Face ``datasets``.

    Args:
        split_train: Name of the split returned as training data.
        split_val: Name of the split returned as validation data.

    Returns:
        A 5-tuple ``(train_sentences, train_tags, val_sentences, val_tags,
        id2tag)``. The sentence entries are the ``"tokens"`` columns of the
        two splits, the tag entries are the ``"ner_tags"`` columns with every
        integer id replaced by its label string, and ``id2tag`` is the list of
        label names taken from the training split's ``ner_tags`` feature.
    """
    corpus = load_dataset("conll2003", trust_remote_code=True)
    train_split, val_split = corpus[split_train], corpus[split_val]

    # Position i in this list is the label string for tag id i.
    id2tag = list(train_split.features["ner_tags"].feature.names)

    def ids_to_labels(id_sequences):
        # Replace each integer tag id with its label string.
        return [[id2tag[tag_id] for tag_id in seq] for seq in id_sequences]

    return (
        train_split["tokens"],
        ids_to_labels(train_split["ner_tags"]),
        val_split["tokens"],
        ids_to_labels(val_split["ner_tags"]),
        id2tag,
    )


def build_vocabs(
    sentences: List[List[str]],
    tags: List[List[str]],
    min_freq: int = 1,
) -> Tuple[Dict[str, int], Dict[str, int]]:
    """Build the word and tag vocabularies (``word_to_ix`` and ``tag_to_ix``).

    Args:
        sentences: Tokenised sentences.
        tags: Tag sequences; only the set of distinct tags matters here.
        min_freq: Minimum number of occurrences a word needs to receive its
            own index. With the default of 1 every word is kept. A larger
            value leaves rare words out, so they are later looked up as
            ``UNK_TOKEN`` and that embedding actually receives training
            signal. Values below 1 behave like 1.

    Returns:
        ``(word_to_ix, tag_to_ix)``. ``word_to_ix`` maps ``PAD_TOKEN`` to 0
        and ``UNK_TOKEN`` to 1, then the kept words to 2, 3, ... in order of
        first appearance. ``tag_to_ix`` maps tags to 0, 1, ... in order of
        first appearance.
    """
    # A dict preserves insertion order, so one pass yields both the counts
    # and the first-appearance order of the words.
    word_counts: Dict[str, int] = {}
    for sent in sentences:
        for w in sent:
            word_counts[w] = word_counts.get(w, 0) + 1

    word_to_ix: Dict[str, int] = {PAD_TOKEN: 0, UNK_TOKEN: 1}
    for w, count in word_counts.items():
        # The membership test keeps the reserved ids if the corpus itself
        # contains a token spelled like PAD_TOKEN or UNK_TOKEN.
        if count >= min_freq and w not in word_to_ix:
            word_to_ix[w] = len(word_to_ix)

    tag_to_ix: Dict[str, int] = {}
    for seq_tags in tags:
        for t in seq_tags:
            if t not in tag_to_ix:
                tag_to_ix[t] = len(tag_to_ix)

    return word_to_ix, tag_to_ix


# Fetch the corpus; tag ids come back already converted to label strings.
print("Loading dataset...")
(
    train_sentences,
    train_tags,
    val_sentences,
    val_tags,
    id2tag,
) = load_conll2003()

# The vocabularies are built from the training split only.
print("Building vocabularies...")
word_to_ix, tag_to_ix = build_vocabs(sentences=train_sentences, tags=train_tags)

print(f"Vocab size: {len(word_to_ix)}")
print(f"Number of NER tags (without pad): {len(tag_to_ix)}")


Loading dataset...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Downloading builder script:   0%|          | 0.00/9.57k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/12.3k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/983k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/14041 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/3250 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/3453 [00:00<?, ? examples/s]

Building vocabularies...
Vocab size: 23625
Number of NER tags (without pad): 9


In [4]:
class NERDataset(Dataset):
    """Map-style dataset that yields one sentence as two index tensors."""

    def __init__(
        self,
        sentences: List[List[str]],
        tags: List[List[str]],
        word_to_ix: Dict[str, int],
        tag_to_ix: Dict[str, int],
    ):
        """Keep references to the raw sequences and both vocabularies.

        ``sentences`` and ``tags`` must contain the same number of items.
        """
        assert len(sentences) == len(tags)
        self.sentences, self.tags = sentences, tags
        self.word_to_ix, self.tag_to_ix = word_to_ix, tag_to_ix

    def __len__(self) -> int:
        """Number of sentences in the dataset."""
        return len(self.sentences)

    def __getitem__(self, idx: int):
        """Return ``(word_indices, tag_indices)`` for sentence ``idx``.

        Both are 1-D ``torch.long`` tensors. Words that are missing from
        ``word_to_ix`` are encoded with the index of ``UNK_TOKEN``; a tag
        that is missing from ``tag_to_ix`` raises ``KeyError``.
        """
        sentence = self.sentences[idx]
        labels = self.tags[idx]

        fallback = self.word_to_ix[UNK_TOKEN]
        word_ids = [self.word_to_ix.get(token, fallback) for token in sentence]
        tag_ids = [self.tag_to_ix[label] for label in labels]

        return (
            torch.tensor(word_ids, dtype=torch.long),
            torch.tensor(tag_ids, dtype=torch.long),
        )


def ner_collate_fn(
    batch: List[Tuple[torch.Tensor, torch.Tensor]],
    pad_idx_word: int,
    pad_idx_tag: int,
):
    """Pad the sentences and tag sequences of a batch to a common length.

    Args:
        batch: ``(word_indices, tag_indices)`` pairs of 1-D tensors, one pair
            per sentence; both tensors of a pair must have the same length.
        pad_idx_word: Value written into the padded word positions.
        pad_idx_tag: Value written into the padded tag positions.

    Returns:
        ``(batch_sentences, batch_tags, lengths)`` where the first two have
        shape ``(len(batch), max_len)`` and ``lengths`` is a ``torch.long``
        tensor holding the unpadded length of every sentence.

    Raises:
        ValueError: If ``batch`` is empty, or a sentence and its tag sequence
            differ in length.
    """
    if not batch:
        raise ValueError("ner_collate_fn received an empty batch")

    sentences, tags = zip(*batch)

    lengths = [len(s) for s in sentences]
    for i, (n_words, t) in enumerate(zip(lengths, tags)):
        if len(t) != n_words:
            raise ValueError(
                f"sample {i}: {n_words} word indices but {len(t)} tag indices"
            )

    # pad_sequence right-pads every tensor up to the longest one in the batch.
    pad_sequence = torch.nn.utils.rnn.pad_sequence
    batch_sentences = pad_sequence(
        list(sentences), batch_first=True, padding_value=pad_idx_word
    )
    batch_tags = pad_sequence(
        list(tags), batch_first=True, padding_value=pad_idx_tag
    )
    lengths_tensor = torch.tensor(lengths, dtype=torch.long)

    return batch_sentences, batch_tags, lengths_tensor


# Prepare the Dataset objects and the padding indices.
# Padded tag positions get their own id, appended after the real tags.
# setdefault leaves an existing entry untouched, so re-running this cell
# does not allocate a second id. PAD_TOKEN is reused so that the tag and
# word vocabularies always agree on the spelling of the padding entry.
pad_tag_idx = tag_to_ix.setdefault(PAD_TOKEN, len(tag_to_ix))
pad_word_idx = word_to_ix[PAD_TOKEN]

train_dataset = NERDataset(train_sentences, train_tags, word_to_ix, tag_to_ix)
val_dataset = NERDataset(val_sentences, val_tags, word_to_ix, tag_to_ix)


def collate_wrapper(batch):
    """Collate ``batch`` with the notebook-level padding indices filled in."""
    padded_batch = ner_collate_fn(batch, pad_word_idx, pad_tag_idx)
    return padded_batch


# Training batches are reshuffled every epoch; validation keeps dataset order.
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    collate_fn=collate_wrapper,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=64,
    shuffle=False,
    collate_fn=collate_wrapper,
)

# Displayed as the cell output: number of sentences in each split.
len(train_dataset), len(val_dataset)


(14041, 3250)

In [5]:
class SimpleBiLSTMForNER(nn.Module):
    """Sequence tagger: embedding -> bidirectional LSTM -> per-token linear layer."""

    def __init__(
        self,
        vocab_size: int,
        tagset_size: int,
        embedding_dim: int = 128,
        hidden_dim: int = 256,
        num_layers: int = 1,
        pad_idx: int = 0,
    ):
        """Create the three layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            tagset_size: Number of output classes per token.
            embedding_dim: Size of each word embedding.
            hidden_dim: Hidden size of the LSTM, per direction.
            num_layers: Number of stacked LSTM layers.
            pad_idx: Word index passed to the embedding as ``padding_idx``.
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Forward and backward states are concatenated, hence twice hidden_dim.
        self.fc = nn.Linear(2 * hidden_dim, tagset_size)

    def forward(self, x: torch.Tensor, lengths: torch.Tensor) -> torch.Tensor:
        """Compute tag logits for a padded batch.

        Args:
            x: Word indices of shape ``(B, T)``.
            lengths: Unpadded length of every sequence, shape ``(B,)``.

        Returns:
            Logits of shape ``(B, T, tagset_size)``.
        """
        rnn_utils = nn.utils.rnn

        # Packing requires the lengths on the CPU; enforce_sorted=False lets
        # the batch arrive in any length order.
        packed_in = rnn_utils.pack_padded_sequence(
            self.embedding(x),
            lengths.cpu(),
            batch_first=True,
            enforce_sorted=False,
        )
        packed_states, _ = self.lstm(packed_in)

        # total_length restores the input's full T even when the longest
        # sequence in the batch is shorter than that.
        states, _ = rnn_utils.pad_packed_sequence(
            packed_states, batch_first=True, total_length=x.size(1)
        )
        return self.fc(states)


# Layer sizes follow the vocabularies; the tag vocabulary is used as it stands
# at this point, i.e. including any entries added to it in earlier cells.
vocab_size, tagset_size = len(word_to_ix), len(tag_to_ix)

model = SimpleBiLSTMForNER(
    vocab_size,
    tagset_size,
    embedding_dim=128,
    hidden_dim=256,
    num_layers=1,
    pad_idx=pad_word_idx,
)

# Displayed as the cell output: the module's layer summary.
model


SimpleBiLSTMForNER(
  (embedding): Embedding(23625, 128, padding_idx=0)
  (lstm): LSTM(128, 256, batch_first=True, bidirectional=True)
  (fc): Linear(in_features=512, out_features=10, bias=True)
)

In [6]:
def train_one_epoch(model, dataloader, optimizer, criterion, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Called as ``model(sentences, lengths)``; must return logits of
            shape ``(B, T, C)``.
        dataloader: Iterable of ``(sentences, tags, lengths)`` tensor triples.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss taking flattened logits and tags; must expose
            ``ignore_index``.
        device: Device every batch tensor is moved to.

    Returns:
        The batch losses averaged with each batch weighted by its number of
        tags different from ``criterion.ignore_index`` (0.0 if there were no
        such tags). This is a per-token mean provided ``criterion`` itself
        averages over the non-ignored tokens.
    """
    model.train()
    loss_sum, token_count = 0.0, 0

    for batch in dataloader:
        sentences, tags, lengths = (part.to(device) for part in batch)

        optimizer.zero_grad()
        logits = model(sentences, lengths)

        # Collapse batch and time so every token is one row for the loss.
        n_seqs, n_steps, n_classes = logits.shape
        flat_logits = logits.view(n_seqs * n_steps, n_classes)
        flat_tags = tags.view(n_seqs * n_steps)

        batch_loss = criterion(flat_logits, flat_tags)
        batch_loss.backward()
        optimizer.step()

        n_valid = int((flat_tags != criterion.ignore_index).sum())
        loss_sum += batch_loss.item() * n_valid
        token_count += n_valid

    return loss_sum / max(token_count, 1)


@torch.no_grad()
def evaluate(model, dataloader, criterion, device):
    """Compute loss and token accuracy over ``dataloader`` without gradients.

    Args:
        model: Called as ``model(sentences, lengths)``; must return logits of
            shape ``(B, T, C)``.
        dataloader: Iterable of ``(sentences, tags, lengths)`` tensor triples.
        criterion: Loss taking flattened logits and tags; must expose
            ``ignore_index``.
        device: Device every batch tensor is moved to.

    Returns:
        ``(avg_loss, acc)``. ``avg_loss`` is the batch losses averaged with
        each batch weighted by its number of tags different from
        ``criterion.ignore_index``. ``acc`` is the fraction of those tags
        whose arg-max prediction is correct. Both are 0.0 when there are no
        such tags.
    """
    model.eval()
    loss_sum, token_count, hit_count = 0.0, 0, 0

    for batch in dataloader:
        sentences, tags, lengths = (part.to(device) for part in batch)

        logits = model(sentences, lengths)

        # Collapse batch and time so every token is one row.
        n_seqs, n_steps, n_classes = logits.shape
        flat_logits = logits.view(n_seqs * n_steps, n_classes)
        flat_tags = tags.view(n_seqs * n_steps)

        batch_loss = criterion(flat_logits, flat_tags)

        # Only positions whose gold tag is not the ignored index are scored.
        keep = flat_tags != criterion.ignore_index
        hits = (flat_logits.argmax(dim=-1) == flat_tags) & keep
        n_valid = keep.sum().item()

        loss_sum += batch_loss.item() * n_valid
        token_count += n_valid
        hit_count += hits.sum().item()

    denom = max(token_count, 1)
    return loss_sum / denom, hit_count / denom


In [7]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Positions labelled pad_tag_idx are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=pad_tag_idx)

num_epochs = 3
best_val_acc = 0.0

for epoch in range(1, num_epochs + 1):
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = evaluate(model, val_loader, criterion, device)

    print(
        f"Epoch {epoch}/{num_epochs} - "
        f"train_loss: {train_loss:.4f} - "
        f"val_loss: {val_loss:.4f} - "
        f"val_acc: {val_acc:.4f}"
    )

    # Track the best validation accuracy seen so far.
    best_val_acc = max(best_val_acc, val_acc)

print(f"Best validation accuracy: {best_val_acc:.4f}")


Epoch 1/3 - train_loss: 0.5180 - val_loss: 0.3128 - val_acc: 0.9078
Epoch 2/3 - train_loss: 0.2127 - val_loss: 0.2092 - val_acc: 0.9395
Epoch 3/3 - train_loss: 0.1052 - val_loss: 0.1823 - val_acc: 0.9472
Best validation accuracy: 0.9472


In [8]:
def predict_sentence(
    model: nn.Module,
    sentence: str,
    word_to_ix: Dict[str, int],
    ix_to_tag: Dict[int, str],
    device: torch.device,
):
    """Tag a whitespace-tokenised sentence.

    Args:
        model: Called as ``model(indices, lengths)`` with a batch of one; must
            return logits of shape ``(1, T, C)``. It is switched to eval mode.
        sentence: Raw text; tokens are obtained with ``str.split()``.
        word_to_ix: Word vocabulary; must contain ``UNK_TOKEN``, which is used
            for words that are not in the vocabulary.
        ix_to_tag: Maps a predicted class index to its tag string.
        device: Device the input tensors are moved to.

    Returns:
        A list of ``(token, predicted_tag)`` pairs, one per token. An empty or
        whitespace-only sentence yields ``[]``.
    """
    model.eval()
    tokens = sentence.split()
    if not tokens:
        # Nothing to tag. The BiLSTM in this notebook packs its input with
        # pack_padded_sequence, which raises on a zero-length sequence.
        return []

    unk_idx = word_to_ix[UNK_TOKEN]
    indices = torch.tensor(
        [[word_to_ix.get(w, unk_idx) for w in tokens]], dtype=torch.long
    )
    lengths = torch.tensor([len(tokens)], dtype=torch.long)

    with torch.no_grad():
        logits = model(indices.to(device), lengths.to(device))
        # Drop the batch dimension of size one: (1, T) -> (T,).
        preds = torch.argmax(logits, dim=-1).squeeze(0)

    pred_tags = [ix_to_tag[int(i)] for i in preds.cpu().tolist()]
    return list(zip(tokens, pred_tags))


# Invert the tag vocabulary so predicted class ids print as tag strings.
ix_to_tag = {tag_ix: tag_name for tag_name, tag_ix in tag_to_ix.items()}

example = "VNU University is located in Hanoi"
pairs = predict_sentence(model, example, word_to_ix, ix_to_tag, device)

# One "token<TAB>tag" line per token.
print("Example prediction:")
for w, t in pairs:
    print(f"{w}\t{t}")


Example prediction:
VNU	B-ORG
University	I-ORG
is	O
located	O
in	O
Hanoi	B-LOC
