# Xây dụng mô hình RNN cho bài toán Part-of-Speech Tagging

## Task 1: Tải và tiền xử lý dữ liệu

1. Viết hàm đọc file `.conllu`

In [1]:
def load_conllu(file_path):
    """
    Đọc file .conllu và trả về danh sách các câu.
    Mỗi câu là một list các tuple (word, upos_tag)
    """
    sentences = []
    sentence = []

    with open(file_path, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line == "":  # kết thúc một câu
                if sentence:
                    sentences.append(sentence)
                    sentence = []
            elif line.startswith("#"):  # comment → bỏ qua
                continue
            else:
                parts = line.split("\t")
                if len(parts) >= 4:
                    word = parts[1]
                    upos = parts[3]
                    sentence.append((word, upos))
        # Thêm câu cuối nếu file không kết thúc bằng line trống
        if sentence:
            sentences.append(sentence)
    return sentences

train_path = "../Data/UD_English-EWT/en_ewt-ud-train.conllu"
dev_path = "../Data/UD_English-EWT/en_ewt-ud-dev.conllu"

train_sentences = load_conllu(train_path)
dev_sentences   = load_conllu(dev_path)


2. Xây dựng từ điển

In [2]:
# Từ điển word → index
word_to_ix = {}
for sent in train_sentences:
    for word, tag in sent:
        if word not in word_to_ix:
            word_to_ix[word] = len(word_to_ix)
# Thêm token <UNK> cho từ không có trong từ điển
word_to_ix["<UNK>"] = len(word_to_ix)

# Từ điển tag → index
tag_to_ix = {}
for sent in train_sentences:
    for word, tag in sent:
        if tag not in tag_to_ix:
            tag_to_ix[tag] = len(tag_to_ix)

# In ra kích thước từ điển
print("Kích thước của word_to_ix:", len(word_to_ix))
print("Số nhãn UPOS:", len(tag_to_ix))


Kích thước của word_to_ix: 20201
Số nhãn UPOS: 18


## Task 2: Tạo PyTorch Dataset và DataLoader

In [3]:
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

class POSDataset(Dataset):
    def __init__(self, sentences, word_to_ix, tag_to_ix):
        """
        sentences: Danh sách các câu đã xử lý.
        word_to_ix: Mapping của từ-> index
        tag_to_ix: Mapping của nhãn -> index
        """
        self.sentences = sentences
        self.word_to_ix = word_to_ix
        self.tag_to_ix = tag_to_ix
    
    def __len__(self):
        return len(self.sentences)
    
    def __getitem__(self, index):
        sentence = self.sentences[index]
        words = [word for word, tag in sentence]
        tags  = [tag for word, tag in sentence]
        
        # Chuyển sang index, dùng <UNK> nếu từ không có trong từ điển
        word_indices = [self.word_to_ix.get(w, self.word_to_ix["<UNK>"]) for w in words]
        tag_indices  = [self.tag_to_ix[t] for t in tags]
        
        return torch.tensor(word_indices, dtype=torch.long), torch.tensor(tag_indices, dtype=torch.long)

def collate_fn(batch):
    """
    batch: danh sách các (sentence_indices, tag_indices)
    Trả về:
        - sentences_padded: tensor (batch_size, max_len)
        - tags_padded: tensor (batch_size, max_len)
        - lengths: danh sách độ dài thực của từng câu
    """
    sentences, tags = zip(*batch)
    lengths = torch.tensor([len(s) for s in sentences], dtype=torch.long)
    
    # Pad sequences về cùng chiều dài
    sentences_padded = pad_sequence(sentences, batch_first=True, padding_value=0)
    tags_padded      = pad_sequence(tags, batch_first=True, padding_value=-100)  # -100 cho ignore_index
    
    return sentences_padded, tags_padded, lengths

train_dataset = POSDataset(train_sentences, word_to_ix, tag_to_ix)
dev_dataset   = POSDataset(dev_sentences, word_to_ix, tag_to_ix)

batch_size = 32

train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn
)

dev_loader = DataLoader(
    dev_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn
)

## Task 3: Xây dựng mô hình RNN

In [4]:
import torch
import torch.nn as nn

class SimpleRNNForTokenClassification(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_classes, padding_idx=0):
        """
        vocab_size: số lượng từ trong từ điển
        embedding_dim: kích thước embedding vector
        hidden_dim: số neuron ẩn của RNN
        num_classes: số nhãn POS (UPOS)
        padding_idx: chỉ số dùng để pad, Embedding sẽ bỏ qua
        """
        super(SimpleRNNForTokenClassification, self).__init__()
        
        # 1. Embedding layer
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=padding_idx
        )
        # 2. RNN layer
        self.rnn = nn.RNN(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
            bidirectional=False  # Nếu True: output hidden sẽ 2*hidden_dim
        )
        # 3. Linear layer: dự đoán nhãn cho mỗi token
        self.fc = nn.Linear(hidden_dim, num_classes)
    
    def forward(self, x, lengths=None):
        """
        x: Tensor (batch_size, seq_len) chứa index từ
        lengths: Tensor (batch_size,) độ dài thực của từng câu (nếu sử dụng packed sequence)
        """
        # x: (batch_size, seq_len) → embeddings: (batch_size, seq_len, embedding_dim)
        embedded = self.embedding(x)
        
        if lengths is not None:
            # Pack sequence để RNN bỏ qua padding
            packed_embedded = nn.utils.rnn.pack_padded_sequence(
                embedded, lengths.cpu(), batch_first=True, enforce_sorted=False
            )
            packed_output, hidden = self.rnn(packed_embedded)
            # Unpack để ra tensor bình thường (batch_size, seq_len, hidden_dim)
            output, _ = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
        else:
            # Nếu không dùng lengths → input bình thường
            output, hidden = self.rnn(embedded)  # output: (batch_size, seq_len, hidden_dim)
        
        # Dự đoán nhãn cho mỗi token
        logits = self.fc(output)  # logits: (batch_size, seq_len, num_classes)
        return logits


## Task 4: Huấn luyện mô hình

In [5]:
import torch.optim as optim

vocab_size = len(word_to_ix)
embedding_dim = 100
hidden_dim = 128
num_classes = len(tag_to_ix)
padding_idx = word_to_ix["<UNK>"]  # hoặc 0 nếu pad_value = 0 trong collate_fn

model = SimpleRNNForTokenClassification(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    num_classes=num_classes,
    padding_idx=padding_idx
)

optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=-100)  # bỏ qua padding


num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for batch_idx, (sentences, tags, lengths) in enumerate(train_loader):
        sentences = sentences
        tags = tags
        lengths = lengths

        optimizer.zero_grad()
        logits = model(sentences, lengths)  # (batch_size, seq_len, num_classes)

        # Flatten batch và seq_len để tính loss
        logits_flat = logits.view(-1, num_classes)   # (batch_size*seq_len, num_classes)
        tags_flat = tags.view(-1)                    # (batch_size*seq_len,)

        loss = criterion(logits_flat, tags_flat)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

Epoch 1/10, Loss: 1.1465
Epoch 2/10, Loss: 0.6471
Epoch 3/10, Loss: 0.4886
Epoch 4/10, Loss: 0.3894
Epoch 5/10, Loss: 0.3182
Epoch 6/10, Loss: 0.2646
Epoch 7/10, Loss: 0.2215
Epoch 8/10, Loss: 0.1871
Epoch 9/10, Loss: 0.1588
Epoch 10/10, Loss: 0.1339


In [6]:
def evaluate(model, data_loader, device="cpu"):
    """
    model: mô hình đã huấn luyện
    data_loader: DataLoader của tập dev/test
    device: 'cuda' hoặc 'cpu'
    
    Trả về: token-level accuracy
    """
    model.eval()
    correct = 0
    total = 0
    
    with torch.no_grad():
        for sentences, tags, lengths in data_loader:
            sentences = sentences.to(device)
            tags = tags.to(device)
            lengths = lengths.to(device)
            
            # Forward pass
            logits = model(sentences, lengths)  # (batch_size, seq_len, num_classes)
            preds = torch.argmax(logits, dim=-1)  # (batch_size, seq_len)
            
            # Flatten để so sánh
            preds_flat = preds.view(-1)
            tags_flat = tags.view(-1)
            
            # Chỉ tính các token không phải padding
            mask = tags_flat != -100
            correct += (preds_flat[mask] == tags_flat[mask]).sum().item()
            total += mask.sum().item()
    
    accuracy = correct / total if total > 0 else 0
    return accuracy

train_accuracy = evaluate(model, train_loader)
dev_accuracy = evaluate(model, dev_loader)

print(f"Độ chính xác trên Train: {train_accuracy*100:.2f}%")
print(f"Độ chính xác trên Dev: {dev_accuracy*100:.2f}%")



Độ chính xác trên Train: 96.62%
Độ chính xác trên Dev: 87.91%


In [9]:
def predict_sentence(sentence):
    # Chuyển câu thành index theo từ điển word_to_ix
    indices = [word_to_ix.get(w, word_to_ix["<UNK>"]) for w in s.split()]
    input_tensor = torch.tensor([indices], dtype=torch.long).to("cpu")  # batch_size=1
    lengths = torch.tensor([len(indices)], dtype=torch.long).to("cpu")

    # Dự đoán
    model.eval()
    with torch.no_grad():
        logits = model(input_tensor, lengths)
        preds = torch.argmax(logits, dim=-1).cpu().numpy()[0]

    # Chuyển từ index về nhãn
    pred_tags = [(w, list(tag_to_ix.keys())[list(tag_to_ix.values()).index(p)]) for w, p in zip(s.split(), preds)]
    
    return pred_tags

s = "I love NLP"
print(f"Câu: {s}")
print(f"Dự đoán:", predict_sentence(s))

Câu: I love NLP
Dự đoán: [('I', 'PRON'), ('love', 'VERB'), ('NLP', 'NOUN')]
