# POS Tagging (PyTorch)

In [2]:
# Imports
import os
from typing import List, Tuple
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import numpy as np

# Configuration: dataset paths and the compute device used by the cells below.
# DATA_DIR is a relative path, so it is resolved against the current working directory.
DATA_DIR = "data/UD_English-EWT"   
TRAIN_FILE = os.path.join(DATA_DIR, "en_ewt-ud-train.conllu")  # training split
DEV_FILE   = os.path.join(DATA_DIR, "en_ewt-ud-dev.conllu")    # development split

# Use CUDA when it is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", DEVICE)


Device: cpu


# Hàm đọc file .conllu

In [3]:
def load_conllu(file_path: str) -> List[List[Tuple[str, str]]]:
    """
    Read a CoNLL-U file and return its sentences as lists of (word, upos) pairs.

    Sentences are separated by blank lines. Comment lines (starting with "#"),
    rows with fewer than five tab-separated fields, and rows whose ID contains
    "-" or "." (multiword ranges such as "1-2", empty nodes such as "3.1") are
    skipped. The word is taken from column 2 (FORM) and the tag from column 4
    (UPOS).

    Args:
        file_path: path of the UTF-8 encoded .conllu file.

    Returns:
        One list per non-empty sentence, in file order.
    """
    corpus = []
    with open(file_path, "r", encoding="utf-8") as handle:
        current = []
        for raw in handle:
            row = raw.strip()
            if row == "":
                # A blank line closes the sentence collected so far (if any).
                if current:
                    corpus.append(current)
                    current = []
            elif not row.startswith("#"):
                fields = row.split("\t")
                if len(fields) >= 5:
                    tok_id = fields[0]
                    # Keep only plain integer IDs: no ranges, no empty nodes.
                    if "-" not in tok_id and "." not in tok_id:
                        current.append((fields[1], fields[3]))
        # The file may end without a trailing blank line.
        if current:
            corpus.append(current)
    return corpus

# Load both splits; each is a list of sentences, each sentence a list of (word, upos) pairs.
# load_conllu opens the files directly, so a missing path raises FileNotFoundError here.
train_sents = load_conllu(TRAIN_FILE)
dev_sents   = load_conllu(DEV_FILE)

print("Train sentences:", len(train_sents))
print("Dev sentences:  ", len(dev_sents))
# Show the first (up to) ten (word, tag) pairs of the first training sentence.
print(train_sents[0][:10])


FileNotFoundError: [Errno 2] No such file or directory: 'data/UD_English-EWT\\en_ewt-ud-train.conllu'

# Xây dựng vocab cho từ & tag

In [None]:
from collections import Counter, defaultdict

# Special tokens: PAD fills the tail of shorter sequences in a batch,
# UNK stands in for any word that has no entry in the vocabulary.
PAD_TOKEN = "<PAD>"
UNK_TOKEN = "<UNK>"

def build_vocabs(sentences, min_freq=1):
    """
    Build the word and tag index mappings from tagged sentences.

    Args:
        sentences: iterable of sentences, each an iterable of (word, tag) pairs.
        min_freq: minimum number of occurrences a word needs to get its own
            index. Rarer words are left out and therefore map to UNK_TOKEN at
            lookup time. The default of 1 keeps every word.

    Returns:
        (word_to_ix, tag_to_ix). word_to_ix has PAD_TOKEN at 0 and UNK_TOKEN
        at 1, followed by the kept words in descending frequency order.
        tag_to_ix has PAD_TOKEN at 0 followed by the tags in sorted order.
    """
    word_counter = Counter()
    tag_set = set()
    # Single pass so that one-shot iterables (generators) are supported.
    for sent in sentences:
        for w, t in sent:
            word_counter[w] += 1
            tag_set.add(t)

    word_to_ix = {PAD_TOKEN: 0, UNK_TOKEN: 1}
    for word, count in word_counter.most_common():
        if count < min_freq:
            # most_common() is sorted by descending count: nothing later qualifies.
            break
        if word in word_to_ix:
            # A corpus token spelled like a special token must not take over
            # the reserved index 0 / 1.
            continue
        word_to_ix[word] = len(word_to_ix)

    # Pad label index = 0; the loss is configured elsewhere to ignore it.
    tag_to_ix = {PAD_TOKEN: 0}
    for tag in sorted(tag_set):
        if tag in tag_to_ix:
            # Same protection for a tag literally named like the pad token.
            continue
        tag_to_ix[tag] = len(tag_to_ix)
    return word_to_ix, tag_to_ix

# Vocabularies are built from the training split only.
word_to_ix, tag_to_ix = build_vocabs(train_sents)
# Reverse mapping (index -> tag string), used to decode predictions.
ix_to_tag = {v:k for k,v in tag_to_ix.items()}

# Both sizes include the special tokens added by build_vocabs.
print("Vocab size (words):", len(word_to_ix))
print("Num tags:", len(tag_to_ix))
print("Some tags:", list(tag_to_ix.items())[:10])


# Dataset và collate_fn (pad per-batch)

In [None]:
class POSDataset(Dataset):
    """Map-style dataset that turns tagged sentences into index tensors."""

    def __init__(self, sentences, word_to_ix, tag_to_ix):
        """Keep references to the sentences and to both index mappings."""
        self.sentences = sentences
        self.word_to_ix = word_to_ix
        self.tag_to_ix = tag_to_ix

    def __len__(self):
        """Number of sentences."""
        return len(self.sentences)

    def __getitem__(self, idx):
        """
        Return (word_ids, tag_ids) for sentence `idx` as two 1-D long tensors.

        Words missing from word_to_ix are mapped to the UNK_TOKEN index; a tag
        missing from tag_to_ix raises KeyError.
        """
        pairs = self.sentences[idx]
        word_ids = [
            self.word_to_ix.get(word, self.word_to_ix[UNK_TOKEN])
            for word, _ in pairs
        ]
        tag_ids = [self.tag_to_ix[tag] for _, tag in pairs]
        word_tensor = torch.tensor(word_ids, dtype=torch.long)
        tag_tensor = torch.tensor(tag_ids, dtype=torch.long)
        return word_tensor, tag_tensor

# collate_fn: pads sequences in a batch
def collate_fn(batch):
    """
    Collate variable-length samples into right-padded batch tensors.

    batch: list of (word_tensor, tag_tensor) tuples of varying length.
    returns: (padded words (B, L), padded tags (B, L), lengths (B,)), where L
    is the longest sequence in this batch and lengths holds the true
    (unpadded) length of every sample.
    """
    word_seqs, tag_seqs = zip(*batch)
    word_pad_ix = word_to_ix[PAD_TOKEN]
    tag_pad_ix = tag_to_ix[PAD_TOKEN]
    # Record the true lengths before padding hides them.
    seq_lengths = torch.tensor([seq.shape[0] for seq in word_seqs], dtype=torch.long)
    padded_words = pad_sequence(word_seqs, batch_first=True, padding_value=word_pad_ix)
    padded_tags = pad_sequence(tag_seqs, batch_first=True, padding_value=tag_pad_ix)
    return padded_words, padded_tags, seq_lengths

# Create datasets and loaders.
# Both splits use the vocabularies built from the training data, so dev words
# that are not in word_to_ix are mapped to UNK by POSDataset.__getitem__.
train_dataset = POSDataset(train_sents, word_to_ix, tag_to_ix)
dev_dataset   = POSDataset(dev_sents,   word_to_ix, tag_to_ix)

BATCH_SIZE = 64

# Only the training loader shuffles; padding is done per batch by collate_fn.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
dev_loader   = DataLoader(dev_dataset,   batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

# Quick check: print the shapes of the first training batch, then stop.
for xb, yb, lengths in train_loader:
    print("batch shapes:", xb.shape, yb.shape, "lengths:", lengths[:5])
    break


# Mô hình RNN (Embedding + RNN + Linear)

In [None]:
class SimpleRNNTagger(nn.Module):
    """
    Sequence tagger: Embedding -> tanh RNN -> per-token Linear classifier.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: size of each word embedding.
        hidden_dim: RNN hidden size (per direction).
        num_tags: number of output classes per token.
        n_layers: number of stacked RNN layers.
        bidirectional: if True, run the RNN in both directions; the classifier
            then receives 2 * hidden_dim features per token.
        dropout: dropout between RNN layers. It is only forwarded to nn.RNN
            when n_layers > 1; for a single layer it has no effect.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_tags, n_layers=1, bidirectional=False, dropout=0.0):
        super().__init__()
        # NOTE: the padding index is read from the module-level word_to_ix /
        # PAD_TOKEN, so those must be defined before the model is constructed.
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim, padding_idx=word_to_ix[PAD_TOKEN])
        self.rnn = nn.RNN(input_size=embedding_dim,
                          hidden_size=hidden_dim,
                          num_layers=n_layers,
                          batch_first=True,
                          bidirectional=bidirectional,
                          nonlinearity='tanh',
                          dropout=dropout if n_layers>1 else 0.0)
        self.bidirectional = bidirectional
        rnn_output_dim = hidden_dim * (2 if bidirectional else 1)
        self.classifier = nn.Linear(rnn_output_dim, num_tags)

    def forward(self, x, lengths=None):
        """
        Compute per-token tag logits.

        Args:
            x: (B, L) long tensor of word indices, right-padded.
            lengths: optional true lengths of the B sequences (tensor or
                list, every entry >= 1). When given, the batch is packed so
                the RNN never reads padding; this matters for the backward
                direction of a bidirectional RNN, which would otherwise start
                from the pad positions. When None, the padded batch is fed to
                the RNN as-is.

        Returns:
            (B, L, num_tags) logits. With `lengths`, the RNN output at padded
            positions is zero, so the logits there equal the classifier bias.
        """
        emb = self.embedding(x)  # (B, L, E)
        if lengths is None:
            rnn_out, _ = self.rnn(emb)  # (B, L, H*directions)
        else:
            # pack_padded_sequence requires the lengths on the CPU.
            cpu_lengths = torch.as_tensor(lengths, dtype=torch.long).cpu()
            packed = nn.utils.rnn.pack_padded_sequence(
                emb, cpu_lengths, batch_first=True, enforce_sorted=False
            )
            packed_out, _ = self.rnn(packed)
            # total_length keeps the output aligned with x (and the labels)
            # even if the longest sequence is shorter than L.
            rnn_out, _ = nn.utils.rnn.pad_packed_sequence(
                packed_out, batch_first=True, total_length=x.size(1)
            )
        logits = self.classifier(rnn_out)  # (B, L, num_tags)
        return logits

# Instantiate the model.
# Embedding rows and output classes are sized from the vocabularies, so both
# counts include the special tokens (PAD/UNK for words, PAD for tags).
vocab_size = len(word_to_ix)
embedding_dim = 100
hidden_dim = 128
num_tags = len(tag_to_ix)

# With n_layers=1 the dropout value is not applied: SimpleRNNTagger only
# forwards it to nn.RNN when n_layers > 1.
model = SimpleRNNTagger(vocab_size=vocab_size, embedding_dim=embedding_dim,
                        hidden_dim=hidden_dim, num_tags=num_tags,
                        n_layers=1, bidirectional=False, dropout=0.1)
model.to(DEVICE)
print(model)


# Loss, optimizer, helper functions

In [None]:
# Loss: ignore_index = tag_to_ix[PAD_TOKEN], so padded label positions do not
# contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tag_to_ix[PAD_TOKEN])
# Adam over all model parameters with a fixed learning rate of 1e-3.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

def compute_accuracy(preds, labels, ignore_index=tag_to_ix[PAD_TOKEN]):
    """
    Token-level accuracy over the positions whose label is not `ignore_index`.

    preds: (B, L) long tensor of predicted tag indices.
    labels: (B, L) long tensor of gold tag indices.
    ignore_index: label value to exclude (defaults to the PAD tag index,
        captured when this function is defined).
    returns: fraction of counted positions where preds == labels, or 0.0 when
        no position is counted.
    """
    valid = labels != ignore_index
    n_valid = valid.sum().item()
    if n_valid == 0:
        return 0.0
    n_hits = ((preds == labels) & valid).sum().item()
    return n_hits / n_valid

# For convenience: function to convert indices -> tags
def idxs_to_tags(idxs_batch):
    """
    Convert tag indices to tag strings using the module-level ix_to_tag.

    idxs_batch: 1-D or 2-D tensor (or array-like) of tag indices.
    returns: a list of tags for 1-D input, otherwise a list of lists (one per
        row). Indices without an entry in ix_to_tag become "UNK".
    """
    def decode(seq):
        return [ix_to_tag.get(int(ix), "UNK") for ix in seq]

    if isinstance(idxs_batch, torch.Tensor):
        arr = idxs_batch.cpu().numpy()
    else:
        arr = np.array(idxs_batch)
    if arr.ndim != 1:
        return [decode(row) for row in arr]
    return decode(arr)


# Vòng huấn luyện & đánh giá

In [None]:
from tqdm import tqdm

def evaluate(model, data_loader):
    """
    Evaluate `model` on `data_loader` and return (avg_loss, accuracy).

    Both metrics are token-level: every non-PAD token carries the same weight
    regardless of which batch it is in. (Averaging per-batch values instead
    would over-weight batches that contain fewer tokens, e.g. the last one.)

    Args:
        model: module called as model(xb, lengths), returning (B, L, C) logits.
        data_loader: yields (words (B, L), tags (B, L), lengths) batches.

    Returns:
        (avg_loss, avg_acc) as floats; (0.0, 0.0) if there are no real tokens.

    Side effects:
        Puts the model in eval mode and leaves it there.
    """
    model.eval()
    pad_ix = tag_to_ix[PAD_TOKEN]
    loss_sum = 0.0   # sum of per-token losses
    n_correct = 0
    n_tokens = 0
    with torch.no_grad():
        for xb, yb, lengths in data_loader:
            xb = xb.to(DEVICE)
            yb = yb.to(DEVICE)
            logits = model(xb, lengths)  # (B, L, num_tags)
            mask = yb != pad_ix
            batch_tokens = mask.sum().item()
            if batch_tokens == 0:
                # Nothing to score; the mean-reduced loss would be NaN here.
                continue
            # Reshape for the loss: (B*L, num_tags) logits, (B*L,) labels.
            C = logits.size(-1)
            loss = criterion(logits.reshape(-1, C), yb.reshape(-1))
            # The module-level criterion averages over non-PAD tokens, so
            # multiplying by the token count recovers the batch's loss sum.
            loss_sum += loss.item() * batch_tokens
            preds = torch.argmax(logits, dim=-1)
            n_correct += ((preds == yb) & mask).sum().item()
            n_tokens += batch_tokens
    if n_tokens == 0:
        return 0.0, 0.0
    avg_loss = loss_sum / n_tokens
    avg_acc = n_correct / n_tokens
    return avg_loss, avg_acc

# Training loop: train for NUM_EPOCHS, evaluate on dev after every epoch and
# keep a snapshot of the weights with the best dev accuracy.
NUM_EPOCHS = 10
best_dev_acc = 0.0
best_state = None

for epoch in range(1, NUM_EPOCHS+1):
    model.train()
    running_loss = 0.0
    for xb, yb, lengths in tqdm(train_loader, desc=f"Epoch {epoch}"):
        xb = xb.to(DEVICE)
        yb = yb.to(DEVICE)
        optimizer.zero_grad()
        logits = model(xb, lengths)  # (B, L, num_tags)
        B, L, C = logits.shape
        # Flatten to (B*L, C) / (B*L,) as expected by CrossEntropyLoss.
        logits_flat = logits.reshape(-1, C)
        labels_flat = yb.reshape(-1)
        loss = criterion(logits_flat, labels_flat)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    # max(..., 1) avoids a ZeroDivisionError when the loader is empty.
    avg_train_loss = running_loss / max(len(train_loader), 1)
    dev_loss, dev_acc = evaluate(model, dev_loader)
    print(f"Epoch {epoch} | Train loss: {avg_train_loss:.4f} | Dev loss: {dev_loss:.4f} | Dev acc: {dev_acc:.4f}")
    # save best
    if dev_acc > best_dev_acc:
        best_dev_acc = dev_acc
        # state_dict() tensors share storage with the live parameters, and
        # .cpu() is a no-op for tensors already on the CPU. Without clone()
        # the "best" snapshot would silently follow later optimizer steps.
        best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
        print(" -> New best dev acc:", best_dev_acc)
# load best
if best_state is not None:
    model.load_state_dict(best_state)
print("Training finished. Best dev acc:", best_dev_acc)


# Hàm predict_sentence

In [None]:
from torch.nn.utils.rnn import pad_sequence
import re

def simple_tokenize(text: str):
    """Split `text` on runs of whitespace and return the resulting tokens.

    Leading and trailing whitespace produce no empty tokens. This is a
    placeholder that can be swapped for a better tokenizer.
    """
    # str.split() without arguments already ignores surrounding whitespace.
    tokens = text.split()
    return tokens

def predict_sentence(model, sentence: str, word_to_ix, idx_to_tag, device=DEVICE, max_len=None):
    """
    Tag a raw sentence and return a list of (token, predicted_tag) pairs.

    Args:
        model: tagger called as model(tensor) with a (1, L) long tensor and
            returning (1, L, num_tags) logits.
        sentence: raw text; it is split with simple_tokenize.
        word_to_ix: word -> index mapping; unknown words use the UNK_TOKEN index.
        idx_to_tag: index -> tag mapping; unknown indices are reported as "UNK".
        device: device the input tensor is created on (defaults to DEVICE as
            it was when this function was defined).
        max_len: accepted for compatibility; currently not used.

    Returns:
        One (token, tag) pair per token, or [] when the sentence has no tokens.

    Side effects:
        Puts the model in eval mode and leaves it there.
    """
    model.eval()
    tokens = simple_tokenize(sentence)
    if not tokens:
        # Avoid feeding a zero-length (1, 0) sequence to the model.
        return []
    unk_ix = word_to_ix[UNK_TOKEN]
    idxs = [word_to_ix.get(w, unk_ix) for w in tokens]
    tensor = torch.tensor(idxs, dtype=torch.long).unsqueeze(0)  # (1, L)
    tensor = tensor.to(device)
    with torch.no_grad():
        logits = model(tensor)  # (1, L, num_tags)
        preds = torch.argmax(logits, dim=-1).squeeze(0)  # (L,)
    pred_tags = [idx_to_tag.get(int(i), "UNK") for i in preds.tolist()]
    return list(zip(tokens, pred_tags))

# Example predictions on a few hand-written sentences.
# predict_sentence splits on whitespace, so punctuation attached to a word
# would be treated as part of that word.
examples = [
    "I love NLP",
    "She is reading the book",
    "The quick brown fox jumps over the lazy dog"
]

# Print each sentence followed by its list of (token, predicted tag) pairs.
for s in examples:
    print(s, "->", predict_sentence(model, s, word_to_ix, ix_to_tag))


# In báo cáo chi tiết: accuracy, vài ví dụ dev

In [None]:

# Final metrics on the dev set, using whatever weights the model currently holds.
dev_loss, dev_acc = evaluate(model, dev_loader)
print("Final dev loss:", dev_loss, "dev acc:", dev_acc)

# Show predictions on the first 10 dev sentences. Slicing (instead of
# indexing 0..9) also works when the dev set has fewer than 10 sentences.
for sent in dev_sents[:10]:
    # The sentence is re-joined with spaces because predict_sentence takes raw
    # text and tokenizes on whitespace.
    words = " ".join(w for w,t in sent)
    preds = predict_sentence(model, words, word_to_ix, ix_to_tag)
    gold = [t for w,t in sent]
    print("SENT:", words)
    print("PRED:", preds)
    print("GOLD:", gold)
    print("-"*60)
