# lab6_rnn_for_pos_tagging
- Task 1: Tải và Tiền xử lý Dữ liệu
- Task 2: Tạo PyTorch Dataset và DataLoader
- Task 3: Xây dựng Mô hình RNN
- Task 4: Huấn luyện Mô hình
- Task 5: Đánh giá Mô hình

In [1]:
import os
import re
import math
import random
from typing import List, Tuple, Dict
from collections import Counter


import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

# Chọn device, nếu có GPU thì dùng GPU, không thì dùng CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

**Task 1: Tải và Tiền xử lý Dữ liệu**

In [2]:
# Task 1.1: Hàm đọc file .conllu -> danh sách các câu [(word, upos), ...]
def load_conllu(file_path: str) -> List[List[Tuple[str, str]]]:
    sentences = []
    current = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            # Loại bỏ khoảng trắng thừa 
            line = line.strip()

            # Nếu gặp dòng trống, kết thúc câu hiện tại
            if not line:
                if current:
                    sentences.append(current)
                    current = []
                continue

            # Bỏ qua các dòng chú thích
            if line.startswith('#'):
                continue

            # Phân tách cột theo tab
            cols = line.split('\t')

            # Bỏ qua các dòng không đủ cột
            if len(cols) < 4:
                continue

            # Lấy ID
            tok_id = cols[0]

            # Bỏ qua multi-word tokens (vd: 3-4) hoặc empty nodes (3.1)
            if '-' in tok_id or '.' in tok_id:
                continue

            # Lấy FORM và UPOS
            form = cols[1]
            upos = cols[3]

            # Thêm (form, upos) vào câu hiện tại
            current.append((form, upos))
    
    # Thêm câu cuối cùng nếu có
    if current:
        sentences.append(current)
    return sentences

# Đường dẫn dữ liệu UD English-EWT trong workspace
DATA_DIR = os.path.join('..', 'UD_English-EWT') if os.path.isdir('UD_English-EWT') == False else 'UD_English-EWT'
train_path = os.path.join(DATA_DIR, 'en_ewt-ud-train.conllu')
dev_path   = os.path.join(DATA_DIR, 'en_ewt-ud-dev.conllu')

train_sents = load_conllu(train_path)
dev_sents   = load_conllu(dev_path)

print(f"Train sentences: {len(train_sents)} | Dev sentences: {len(dev_sents)}")
print('Ví dụ câu (train)[0]:', train_sents[0][:10])

Train sentences: 12544 | Dev sentences: 2001
Ví dụ câu (train)[0]: [('Al', 'PROPN'), ('-', 'PUNCT'), ('Zaman', 'PROPN'), (':', 'PUNCT'), ('American', 'ADJ'), ('forces', 'NOUN'), ('killed', 'VERB'), ('Shaikh', 'PROPN'), ('Abdullah', 'PROPN'), ('al', 'PROPN')]


In [3]:
# Task 1.2: Xây dựng vocabulary từ train
PAD_TOKEN = '<PAD>'
UNK_TOKEN = '<UNK>'
PAD_TAG   = '<PAD>'

def build_vocabs(sentences: List[List[Tuple[str,str]]], min_freq=3):
    word_counter = Counter()
    tag_set = set()

    # Đếm tần suất từ
    for sent in sentences:
        for w, t in sent:
            word_counter[w] += 1
            tag_set.add(t)

    # Vocab từ
    word_to_ix = {PAD_TOKEN: 0, UNK_TOKEN: 1}
    for w, freq in word_counter.items():
        if freq >= min_freq:        # giữ từ có tần suất >= 3
            word_to_ix[w] = len(word_to_ix)

    # Vocab tag
    tag_to_ix = {PAD_TAG: 0}
    for t in sorted(tag_set):
        tag_to_ix[t] = len(tag_to_ix)

    return word_to_ix, tag_to_ix


word_to_ix, tag_to_ix = build_vocabs(train_sents)

# Inverse mapping từ ix -> tag
ix_to_tag = {v: k for k, v in tag_to_ix.items()}

print('Kích thước word_to_ix:', len(word_to_ix))
print('Kích thước tag_to_ix :', len(tag_to_ix))
print('Ví dụ 10 từ đầu:', list(word_to_ix.items())[:10])
print('Ví dụ 10 tag đầu:', list(tag_to_ix.items())[:10])


Kích thước word_to_ix: 6733
Kích thước tag_to_ix : 18
Ví dụ 10 từ đầu: [('<PAD>', 0), ('<UNK>', 1), ('Al', 2), ('-', 3), ('Zaman', 4), (':', 5), ('American', 6), ('forces', 7), ('killed', 8), ('Shaikh', 9)]
Ví dụ 10 tag đầu: [('<PAD>', 0), ('ADJ', 1), ('ADP', 2), ('ADV', 3), ('AUX', 4), ('CCONJ', 5), ('DET', 6), ('INTJ', 7), ('NOUN', 8), ('NUM', 9)]


**Task 2: Tạo PyTorch Dataset và DataLoader**

In [4]:
# Task 2.1: POSDataset và collate_fn
class POSDataset(Dataset):
    def __init__(self, sentences: List[List[Tuple[str,str]]], word_to_ix: Dict[str,int], tag_to_ix: Dict[str,int]):
        self.sentences = sentences
        self.word_to_ix = word_to_ix
        self.tag_to_ix = tag_to_ix

    # Trả về số lượng câu trong dataset
    def __len__(self):
        return len(self.sentences)
    
    # Trả về tensor word_ids và tag_ids cho câu thứ idx
    def __getitem__(self, idx):
        sent = self.sentences[idx]

        # Lấy words và tags
        words = [w for w, _ in sent]
        tags  = [t for _, t in sent]

        # Chuyển words và tags thành IDs
        word_ids = [self.word_to_ix.get(w, self.word_to_ix[UNK_TOKEN]) for w in words]
        tag_ids  = [self.tag_to_ix[t] for t in tags]
        return torch.tensor(word_ids, dtype=torch.long), torch.tensor(tag_ids, dtype=torch.long)

def collate_fn(batch):
    # batch: list of (word_ids_tensor, tag_ids_tensor)
    word_seqs, tag_seqs = zip(*batch)

    # Tính độ dài mỗi câu
    lengths = torch.tensor([len(x) for x in word_seqs], dtype=torch.long)

    # Pad sequences về cùng độ dài
    padded_words = pad_sequence(word_seqs, batch_first=True, padding_value=word_to_ix[PAD_TOKEN])

    # Pad tag sequences về cùng độ dài
    padded_tags  = pad_sequence(tag_seqs,  batch_first=True, padding_value=tag_to_ix[PAD_TAG])
    return padded_words, padded_tags, lengths

In [5]:
# Task 2.2: Tạo DataLoader cho train và dev
BATCH_SIZE = 64

# Tạo dataset và dataloader
train_ds = POSDataset(train_sents, word_to_ix, tag_to_ix)
dev_ds   = POSDataset(dev_sents,   word_to_ix, tag_to_ix)

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
dev_loader   = DataLoader(dev_ds,   batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

len(train_ds), len(dev_ds)

(12544, 2001)

**Task 3: Xây dựng Mô hình RNN**

In [6]:
# Task 3: Mô hình RNN đơn giản cho token classification
class SimpleRNNForTokenClassification(nn.Module):
    def __init__(self, vocab_size: int, tag_size: int, emb_dim: int = 100, hidden_dim: int = 128, num_layers: int = 1, bidirectional: bool = False, dropout: float = 0.1):
        super().__init__()

        # Các lớp của mô hình
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=word_to_ix[PAD_TOKEN])
        self.rnn = nn.RNN(input_size=emb_dim, hidden_size=hidden_dim, num_layers=num_layers, batch_first=True, bidirectional=bidirectional)
        out_dim = hidden_dim * (2 if bidirectional else 1)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(out_dim, tag_size)
    
    # Định nghĩa hàm forward
    def forward(self, input_ids: torch.Tensor, lengths: torch.Tensor):
        # input_ids: (B, T)
        emb = self.embedding(input_ids)  # (B, T, E)
        # Sử dụng pack để RNN bỏ qua padding
        packed = pack_padded_sequence(emb, lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_out, _ = self.rnn(packed)
        out, _ = pad_packed_sequence(packed_out, batch_first=True)  # (B, T, H) B là batch size, T là độ dài câu, H là hidden size
        out = self.dropout(out)
        logits = self.classifier(out)  # (B, T, C) B là batch size, T là độ dài câu, C là số tags
        return logits

**Task 4: Huấn luyện Mô hình**

In [7]:
# Task 4: Khởi tạo mô hình, optimizer, loss

# Thiết lập siêu tham số
vocab_size = len(word_to_ix)
tag_size   = len(tag_to_ix)
emb_dim    = 100 # kích thước embedding
hidden_dim = 128 # kích thước hidden state
bidirectional = True # sử dụng RNN hai chiều

model = SimpleRNNForTokenClassification(vocab_size, tag_size, emb_dim=emb_dim, hidden_dim=hidden_dim, bidirectional=bidirectional).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss(ignore_index=tag_to_ix[PAD_TAG])

model

SimpleRNNForTokenClassification(
  (embedding): Embedding(6733, 100, padding_idx=0)
  (rnn): RNN(100, 128, batch_first=True, bidirectional=True)
  (dropout): Dropout(p=0.1, inplace=False)
  (classifier): Linear(in_features=256, out_features=18, bias=True)
)

In [8]:
# Task 4: Hàm train và evaluate

# Hàm tính accuracy với mask
def masked_accuracy(logits: torch.Tensor, gold: torch.Tensor, pad_idx: int) -> float:
    # logits: (B,T,C), gold: (B,T) 
    preds = logits.argmax(dim=-1)  # (B,T) lấy nhãn dự đoán
    mask = gold.ne(pad_idx)  # tạo mask cho các vị trí không phải padding
    correct = (preds.eq(gold) & mask).sum().item() # số dự đoán đúng có mask
    total = mask.sum().item()
    if total == 0:
        return 0.0
    return correct / total

# Hàm train một epoch
def train_one_epoch(model, loader, optimizer, criterion, pad_idx):
    model.train()
    total_loss = 0.0
    total_tokens = 0

    # Vòng lặp qua từng batch
    for batch in loader:
        # Lấy dữ liệu từ batch
        words, tags, lengths = batch
        words = words.to(device)
        tags  = tags.to(device)
        lengths = lengths.to(device)

        # Forward + Backward + Optimize
        optimizer.zero_grad()
        logits = model(words, lengths)
        # Flatten để tính CE: (B*T, C) vs (B*T)
        loss = criterion(logits.view(-1, logits.size(-1)), tags.view(-1))
        loss.backward()
        optimizer.step()
        # stats
        n_tokens = tags.ne(pad_idx).sum().item()
        total_tokens += n_tokens
        total_loss += loss.item() * tags.numel()  # scale theo số phần tử
    avg_loss = total_loss / (len(loader.dataset) if len(loader.dataset) > 0 else 1)
    return avg_loss

# Hàm evaluate mô hình
def evaluate(model, loader, criterion, pad_idx):
    model.eval()
    total_loss = 0.0
    total_tokens = 0
    total_correct = 0

    # Đánh giá không cần tính gradient
    with torch.no_grad():

        # Vòng lặp qua từng batch
        for batch in loader:
            # Lấy dữ liệu từ batch
            words, tags, lengths = batch
            words = words.to(device)
            tags  = tags.to(device)
            lengths = lengths.to(device)

            # Forward pass
            logits = model(words, lengths)
            loss = criterion(logits.view(-1, logits.size(-1)), tags.view(-1))
            total_loss += loss.item() * tags.numel()
            # accuracy
            acc = masked_accuracy(logits, tags, pad_idx)
            mask = tags.ne(pad_idx)
            total_correct += (logits.argmax(-1).eq(tags) & mask).sum().item()
            total_tokens += mask.sum().item()
    avg_loss = total_loss / (len(loader.dataset) if len(loader.dataset) > 0 else 1)
    avg_acc = (total_correct / total_tokens) if total_tokens > 0 else 0.0
    return avg_loss, avg_acc

In [9]:
# Task 4: Vòng lặp huấn luyện + báo cáo accuracy train/dev
EPOCHS = 20
best_dev_acc = 0.0
best_state = None

# Vòng lặp huấn luyện
for epoch in range(1, EPOCHS+1):
    # Huấn luyện một epoch
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion, pad_idx=tag_to_ix[PAD_TAG])
    dev_loss, dev_acc = evaluate(model, dev_loader, criterion, pad_idx=tag_to_ix[PAD_TAG])
    print(f"Epoch {epoch:02d} | train_loss={train_loss:.4f} | dev_loss={dev_loss:.4f} | dev_acc={dev_acc:.4f}")
    
    # Lưu mô hình tốt nhất theo dev accuracy
    if dev_acc > best_dev_acc:
        best_dev_acc = dev_acc
        best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}

if best_state is not None:
    model.load_state_dict(best_state)
print(f"Best dev accuracy: {best_dev_acc:.4f}")

Epoch 01 | train_loss=67.9458 | dev_loss=33.3044 | dev_acc=0.7603
Epoch 02 | train_loss=36.7162 | dev_loss=25.0620 | dev_acc=0.8155
Epoch 02 | train_loss=36.7162 | dev_loss=25.0620 | dev_acc=0.8155
Epoch 03 | train_loss=27.3685 | dev_loss=20.9647 | dev_acc=0.8466
Epoch 03 | train_loss=27.3685 | dev_loss=20.9647 | dev_acc=0.8466
Epoch 04 | train_loss=21.4835 | dev_loss=18.4810 | dev_acc=0.8632
Epoch 04 | train_loss=21.4835 | dev_loss=18.4810 | dev_acc=0.8632
Epoch 05 | train_loss=17.7275 | dev_loss=16.6868 | dev_acc=0.8767
Epoch 05 | train_loss=17.7275 | dev_loss=16.6868 | dev_acc=0.8767
Epoch 06 | train_loss=15.0122 | dev_loss=15.5063 | dev_acc=0.8876
Epoch 06 | train_loss=15.0122 | dev_loss=15.5063 | dev_acc=0.8876
Epoch 07 | train_loss=12.7403 | dev_loss=14.9340 | dev_acc=0.8937
Epoch 07 | train_loss=12.7403 | dev_loss=14.9340 | dev_acc=0.8937
Epoch 08 | train_loss=11.3818 | dev_loss=14.6995 | dev_acc=0.8957
Epoch 08 | train_loss=11.3818 | dev_loss=14.6995 | dev_acc=0.8957
Epoch 09 |

**Task 5: Đánh giá Mô hình**

In [10]:
# Task 5: Hàm dự đoán câu mới

# Hàm mã hóa câu thành tensor word IDs
def encode_sentence(words: List[str], word_to_ix: Dict[str,int]):
    return torch.tensor([word_to_ix.get(w, word_to_ix[UNK_TOKEN]) for w in words], dtype=torch.long)

# Hàm dự đoán nhãn cho câu mới
def predict_sentence(model, sentence: str, word_to_ix, ix_to_tag):
    model.eval()

    # Tách câu thành từ và mã hóa thành tensor word IDs
    words = sentence.strip().split()
    x = encode_sentence(words, word_to_ix).unsqueeze(0)  # (1, T)
    lengths = torch.tensor([x.size(1)], dtype=torch.long)
    x = x.to(device)
    lengths = lengths.to(device)

    # Dự đoán nhãn
    with torch.no_grad():
        logits = model(x, lengths)  # (1, T, C)
        pred_ids = logits.argmax(dim=-1).squeeze(0).tolist()
    pred_tags = [ix_to_tag[i] for i in pred_ids]
    return list(zip(words, pred_tags))

In [11]:
# Demo nhanh
predict_sentence(model, "I love NLP .", word_to_ix, ix_to_tag)

[('I', 'PRON'), ('love', 'VERB'), ('NLP', 'NOUN'), ('.', 'PUNCT')]