# Task 1: ƒê·ªçc v√† ti·ªÅn x·ª≠ l√Ω d·ªØ li·ªáu

In [2]:
def load_conllu(file_path):
    """
    Tr·∫£ v·ªÅ: list_of_sentences, m·ªói sentence l√† list of (word, upos)
    """
    sentences = []
    with open(file_path, 'r', encoding='utf-8') as f:
        sent = []
        for line in f:
            line = line.strip()
            if not line:
                if sent:
                    sentences.append(sent)
                    sent = []
                continue
            if line.startswith('#'):
                continue
            parts = line.split('\t')
            # guard: ƒë√¥i khi c√≥ d√≤ng kh√¥ng chu·∫©n
            if len(parts) < 5:
                continue
            word = parts[1]
            upos = parts[3]
            sent.append((word, upos))
        if sent:
            sentences.append(sent)
    return sentences


In [6]:
from collections import Counter

def build_vocab(sentences, min_freq=1, lowercase=True):
    word_counter = Counter()
    tag_set = set()
    for sent in sentences:
        for w, t in sent:
            if lowercase:
                w = w.lower()
            word_counter[w] += 1
            tag_set.add(t)
    # special tokens
    word_to_ix = {'<PAD>':0, '<UNK>':1}
    for w, c in word_counter.items():
        if c >= min_freq:
            word_to_ix[w] = len(word_to_ix)
    tag_to_ix = {tag: idx for idx, tag in enumerate(sorted(tag_set))}
    
    tag_to_ix['<PAD>'] = len(tag_to_ix)
    return word_to_ix, tag_to_ix

train_sentences = load_conllu(r'E:\Nam4\NLP\level 1\NLP_22001295_HUS\data\UD_English-EWT\en_ewt-ud-train.conllu')
dev_sentences = load_conllu(r'E:\Nam4\NLP\level 1\NLP_22001295_HUS\data\UD_English-EWT\en_ewt-ud-dev.conllu')

word_to_ix, tag_to_ix = build_vocab(train_sentences, min_freq=1, lowercase=True)

print("Vocab size:", len(word_to_ix))
print("Num tags:", len(tag_to_ix))


Vocab size: 17115
Num tags: 19


# Task 2 ‚Äî Dataset & DataLoader (collate_fn)

In [7]:
import torch
from torch.utils.data import Dataset

class POSDataset(Dataset):
    def __init__(self, sentences, word_to_ix, tag_to_ix, lowercase=True):
        self.sentences = sentences
        self.w2i = word_to_ix
        self.t2i = tag_to_ix
        self.lowercase = lowercase
    
    def __len__(self):
        return len(self.sentences)
    
    def __getitem__(self, idx):
        sent = self.sentences[idx]
        words = []
        tags = []
        for w, t in sent:
            if self.lowercase:
                w = w.lower()
            w_idx = self.w2i.get(w, self.w2i.get('<UNK>'))
            t_idx = self.t2i[t]
            words.append(w_idx)
            tags.append(t_idx)
        return torch.tensor(words, dtype=torch.long), torch.tensor(tags, dtype=torch.long)


In [8]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    words = [item[0] for item in batch]
    tags = [item[1] for item in batch]
    lengths = torch.tensor([len(x) for x in words], dtype=torch.long)
    words_padded = pad_sequence(words, batch_first=True, padding_value=0)  # <PAD> index = 0
    tags_padded = pad_sequence(tags, batch_first=True, padding_value=-100)  
    
    return words_padded, tags_padded, lengths


In [13]:
from torch.utils.data import DataLoader

train_dataset = POSDataset(train_sentences, word_to_ix, tag_to_ix)
dev_dataset = POSDataset(dev_sentences, word_to_ix, tag_to_ix)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
dev_loader = DataLoader(dev_dataset, batch_size=64, shuffle=False, collate_fn=collate_fn)


# Task 3 ‚Äî X√¢y d·ª±ng m√¥ h√¨nh RNN

In [18]:
import torch.nn as nn

class SimpleRNNForTokenClassification(nn.Module):
    
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_tags, padding_idx=0):
        super(SimpleRNNForTokenClassification, self).__init__()
        
        # 1. L·ªõp Embedding: Chuy·ªÉn ƒë·ªïi ID c·ªßa t·ª´ th√†nh vector dense
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        
        # 2. L·ªõp RNN: X·ª≠ l√Ω chu·ªói vector v√† t·∫°o ra hidden state t·∫°i m·ªói b∆∞·ªõc
        self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)
        
        # 3. L·ªõp Linear: √Ånh x·∫° t·ª´ hidden state sang kh√¥ng gian s·ªë l∆∞·ª£ng nh√£n
        self.linear = nn.Linear(hidden_dim, num_tags)
    
    def forward(self, sentence):
        embeds = self.embedding(sentence)
        rnn_out, _ = self.rnn(embeds)
        tag_scores = self.linear(rnn_out)

        return tag_scores


# Task 4 ‚Äî Hu·∫•n luy·ªán M√¥ h√¨nh

**5 b∆∞·ªõc kinh ƒëi·ªÉn trong m·ªói batch:**
1. **X√≥a gradient c≈©**: `optimizer.zero_grad()`
2. **Forward pass**: T√≠nh to√°n output t·ª´ input
3. **T√≠nh loss**: So s√°nh output v·ªõi nh√£n th·∫≠t
4. **Backward pass**: T√≠nh gradient b·∫±ng lan truy·ªÅn ng∆∞·ª£c
5. **C·∫≠p nh·∫≠t tr·ªçng s·ªë**: `optimizer.step()`

In [20]:
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")

# 1. KH·ªûI T·∫†O M√î H√åNH

vocab_size = len(word_to_ix)
embedding_dim = 100
hidden_dim = 128
num_tags = len(tag_to_ix)

model = SimpleRNNForTokenClassification(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    num_tags=num_tags,
    padding_idx=0  # <PAD> token c√≥ index = 0
).to(device)

# Kh·ªüi t·∫°o optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Kh·ªüi t·∫°o loss function
criterion = nn.CrossEntropyLoss(ignore_index=-100)

Device: cpu


In [33]:
# 2. VI·∫æT H√ÄM HU·∫§N LUY·ªÜN

def train_epoch(model, loader, optimizer, criterion, device):

    model.train() 
    total_loss = 0.0
    num_batches = 0
    
    for batch_idx, (words_padded, tags_padded, lengths) in enumerate(loader):
        # Chuy·ªÉn d·ªØ li·ªáu sang device
        words_padded = words_padded.to(device)
        tags_padded = tags_padded.to(device)
        
        # 5 B∆Ø·ªöC KINH ƒêI·ªÇN:
        
        # B∆∞·ªõc 1: X√≥a gradient c≈©
        optimizer.zero_grad()
        
        # B∆∞·ªõc 2: Forward pass - T√≠nh to√°n output
        logits = model(words_padded)
        
        # B∆∞·ªõc 3: T√≠nh loss
        batch_size, seq_len, num_tags_out = logits.size()
        loss = criterion(logits.view(-1, num_tags_out), tags_padded.view(-1))
        
        # B∆∞·ªõc 4: Backward pass - T√≠nh gradient
        loss.backward()
        
        # B∆∞·ªõc 5: C·∫≠p nh·∫≠t tr·ªçng s·ªë
        optimizer.step()
        
        # L∆∞u loss
        total_loss += loss.item()
        num_batches += 1
        
        # In loss m·ªói 100 batches
        if (batch_idx + 1) % 100 == 0:
            print(f"  Batch {batch_idx + 1}/{len(loader)}: loss = {loss.item():.4f}")
    
    avg_loss = total_loss / num_batches
    return avg_loss


# Task 5 ‚Äî ƒê√°nh gi√° M√¥ h√¨nh

In [34]:
# 1. VI·∫æT H√ÄM ƒê√ÅNH GI√Å

def evaluate(model, loader, device, tag_pad_idx=-100):

    model.eval()  
    total_tokens = 0
    correct_predictions = 0
    
    # T·∫Øt vi·ªác t√≠nh to√°n gradient ƒë·ªÉ ti·∫øt ki·ªám b·ªô nh·ªõ
    with torch.no_grad():
        for words_padded, tags_padded, lengths in loader:
            # Chuy·ªÉn d·ªØ li·ªáu sang device
            words_padded = words_padded.to(device)
            tags_padded = tags_padded.to(device)
            
            # Forward pass
            logits = model(words_padded)  # (batch_size, seq_len, num_tags)
            
            # L·∫•y d·ª± ƒëo√°n b·∫±ng argmax tr√™n chi·ªÅu cu·ªëi c√πng
            predictions = torch.argmax(logits, dim=-1)  # (batch_size, seq_len)
            
            # T·∫°o mask ƒë·ªÉ b·ªè qua padding tokens
            mask = tags_padded != tag_pad_idx
            
            # ƒê·∫øm s·ªë token d·ª± ƒëo√°n ƒë√∫ng (kh√¥ng t√≠nh padding)
            correct_predictions += ((predictions == tags_padded) & mask).sum().item()
            total_tokens += mask.sum().item()
    
    # T√≠nh accuracy
    accuracy = correct_predictions / total_tokens if total_tokens > 0 else 0.0
    return accuracy


In [None]:
# 2. V√íNG L·∫∂P HU·∫§N LUY·ªÜN V√Ä B√ÅO C√ÅO K·∫æT QU·∫¢

print("="*80)
print("B·∫ÆT ƒê·∫¶U HU·∫§N LUY·ªÜN")
print("="*80)

num_epochs = 10
best_dev_acc = 0.0
best_epoch = 0

# L∆∞u l·ªãch s·ª≠ training
history = {
    'train_loss': [],
    'train_acc': [],
    'dev_acc': []
}

for epoch in range(1, num_epochs + 1):
    print(f"\nüìÖ Epoch {epoch}/{num_epochs}")
    print("-" * 80)
    
    # Hu·∫•n luy·ªán
    train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
    
    # ƒê√°nh gi√° tr√™n t·∫≠p train
    train_acc = evaluate(model, train_loader, device, tag_pad_idx=-100)
    
    # ƒê√°nh gi√° tr√™n t·∫≠p dev
    dev_acc = evaluate(model, dev_loader, device, tag_pad_idx=-100)
    
    # L∆∞u l·ªãch s·ª≠
    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['dev_acc'].append(dev_acc)
    
    # In k·∫øt qu·∫£
    print(f"\nüìä Results:")
    print(f"  Train Loss:     {train_loss:.4f}")
    print(f"  Train Accuracy: {train_acc:.4f} ({train_acc*100:.2f}%)")
    print(f"  Dev Accuracy:   {dev_acc:.4f} ({dev_acc*100:.2f}%)")
    
    # L∆∞u m√¥ h√¨nh t·ªët nh·∫•t
    if dev_acc > best_dev_acc:
        best_dev_acc = dev_acc
        best_epoch = epoch
        print(f"  üéâ New best model!")
        torch.save(model.state_dict(), 'best_pos_tagger.pt')

print("\n" + "="*80)
print("‚úÖ HO√ÄN TH√ÄNH HU·∫§N LUY·ªÜN")
print("="*80)
print(f"\nüèÜ K·∫æT QU·∫¢ CU·ªêI C√ôNG:")
print(f"  Best Dev Accuracy: {best_dev_acc:.4f} ({best_dev_acc*100:.2f}%)")
print(f"  Achieved at Epoch: {best_epoch}")
print("="*80)



B·∫ÆT ƒê·∫¶U HU·∫§N LUY·ªÜN

Epoch 1/10
--------------------------------------------------------------------------------
  Batch 100/392: loss = 0.0492
  Batch 100/392: loss = 0.0492
  Batch 200/392: loss = 0.0676
  Batch 200/392: loss = 0.0676
  Batch 300/392: loss = 0.0672
  Batch 300/392: loss = 0.0672

 ƒê√°nh gi√° tr√™n t·∫≠p train...

 ƒê√°nh gi√° tr√™n t·∫≠p train...
 ƒê√°nh gi√° tr√™n t·∫≠p dev...

Epoch 1 Results:
  ‚Ä¢ Train Loss:     0.0650
  ‚Ä¢ Train Accuracy: 0.9851 (98.51%)
  ‚Ä¢ Dev Accuracy:   0.8754 (87.54%)
New best model. Saving...

Epoch 2/10
--------------------------------------------------------------------------------
 ƒê√°nh gi√° tr√™n t·∫≠p dev...

Epoch 1 Results:
  ‚Ä¢ Train Loss:     0.0650
  ‚Ä¢ Train Accuracy: 0.9851 (98.51%)
  ‚Ä¢ Dev Accuracy:   0.8754 (87.54%)
New best model. Saving...

Epoch 2/10
--------------------------------------------------------------------------------
  Batch 100/392: loss = 0.0553
  Batch 100/392: loss = 0.0553
  Batch 200/

KeyboardInterrupt: 

In [None]:
# 3. V·∫º ƒê·ªí TH·ªä TRAINING HISTORY

import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Plot Loss
ax1.plot(range(1, num_epochs + 1), history['train_loss'], marker='o', label='Train Loss')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.set_title('Training Loss')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Plot Accuracy
ax2.plot(range(1, num_epochs + 1), history['train_acc'], marker='o', label='Train Accuracy')
ax2.plot(range(1, num_epochs + 1), history['dev_acc'], marker='s', label='Dev Accuracy')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy')
ax2.set_title('Training & Dev Accuracy')
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("üìà ƒê·ªì th·ªã training history ƒë√£ ƒë∆∞·ª£c v·∫Ω!")


In [None]:
# 4. H√ÄM D·ª∞ ƒêO√ÅN CHO C√ÇU M·ªöI (N√ÇNG CAO)

def predict_sentence(sentence_text, model, word_to_ix, tag_to_ix, device):
    """
    D·ª± ƒëo√°n POS tags cho m·ªôt c√¢u m·ªõi.
    
    Args:
        sentence_text: C√¢u c·∫ßn d·ª± ƒëo√°n (string), v√≠ d·ª•: "The cat is sleeping"
        model: M√¥ h√¨nh ƒë√£ train
        word_to_ix: Dict √°nh x·∫° word ‚Üí index
        tag_to_ix: Dict √°nh x·∫° tag ‚Üí index
        device: Device (CPU/GPU)
        
    Returns:
        predictions: List of (word, predicted_tag) tuples
    """
    model.eval()
    
    # T·∫°o reverse mapping: index ‚Üí tag
    ix_to_tag = {v: k for k, v in tag_to_ix.items()}
    
    # X·ª≠ l√Ω c√¢u input
    words = sentence_text.lower().split()
    
    # Chuy·ªÉn words th√†nh indices
    word_indices = []
    for word in words:
        # N·∫øu t·ª´ kh√¥ng c√≥ trong vocab, d√πng <UNK>
        word_idx = word_to_ix.get(word, word_to_ix.get('<UNK>', 1))
        word_indices.append(word_idx)
    
    # Chuy·ªÉn th√†nh tensor
    input_tensor = torch.LongTensor([word_indices]).to(device)  # Shape: (1, seq_len)
    
    # D·ª± ƒëo√°n
    with torch.no_grad():
        logits = model(input_tensor)  # Shape: (1, seq_len, num_tags)
        predictions = torch.argmax(logits, dim=-1)  # Shape: (1, seq_len)
    
    # Chuy·ªÉn predictions v·ªÅ CPU v√† l·∫•y list
    predicted_tags = predictions[0].cpu().tolist()
    
    # Decode v·ªÅ tag names
    predicted_tag_names = [ix_to_tag.get(tag_idx, '<UNK>') for tag_idx in predicted_tags]
    
    # T·∫°o k·∫øt qu·∫£
    result = list(zip(words, predicted_tag_names))
    
    return result

print("‚úÖ ƒê√£ ƒë·ªãnh nghƒ©a h√†m predict_sentence()")


In [None]:
# TEST H√ÄM D·ª∞ ƒêO√ÅN

print("\n" + "="*80)
print("TEST D·ª∞ ƒêO√ÅN TR√äN C√ÇU M·ªöI")
print("="*80)

# C√°c c√¢u test
test_sentences = [
    "The cat is sleeping on the couch",
    "I love programming in Python",
    "She quickly ran to the store",
    "The beautiful flowers are blooming in the garden"
]

for sent in test_sentences:
    print(f"\nüìù C√¢u: {sent}")
    predictions = predict_sentence(sent, model, word_to_ix, tag_to_ix, device)
    print(f"üè∑Ô∏è  D·ª± ƒëo√°n:")
    for word, tag in predictions:
        print(f"  {word:15s} ‚Üí {tag}")

print("\n" + "="*80)
