In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

import spacy
import numpy as np
from collections import Counter
from tqdm import tqdm
import time
import random
import os
import requests
import zipfile
from typing import List, Dict, Tuple, Optional

# ---- Hyper-parameters and run configuration --------------------------------
SEED = 1234            # seed handed to random, NumPy and PyTorch below
BATCH_SIZE = 128       # sentences per mini-batch in every DataLoader
EMBEDDING_DIM = 100    # size of each word embedding vector
HIDDEN_DIM = 128       # LSTM hidden size (per direction)
N_LAYERS = 2           # number of stacked LSTM layers
DROPOUT = 0.25         # dropout probability passed to the tagger
N_EPOCHS = 15          # number of passes over the training set
MIN_FREQ = 2           # minimum corpus frequency for a word to enter the vocabulary
MAX_SENT_LEN = 100     # default truncation length used by PoSDataset

# Seed every random number generator the script uses so runs are repeatable.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
# Request deterministic cuDNN kernels (only relevant when running on CUDA).
torch.backends.cudnn.deterministic = True

# Prefer the GPU when one is available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

def download_udpos_data():
    """Download the UD English-EWT train/dev/test CoNLL-U files from GitHub.

    Each file is fetched over HTTP, written to the current working directory
    under its upstream filename, and also kept in memory.

    Returns:
        Dict mapping the split name ('train', 'valid', 'test') to the raw
        text of the corresponding file.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.RequestException: on connection failures or timeouts.
    """
    url = "https://raw.githubusercontent.com/UniversalDependencies/UD_English-EWT/master/"
    files = {
        'train': 'en_ewt-ud-train.conllu',
        'valid': 'en_ewt-ud-dev.conllu',
        'test': 'en_ewt-ud-test.conllu'
    }

    data = {}
    for split, filename in files.items():
        # A timeout keeps the script from hanging forever on a stalled socket.
        response = requests.get(url + filename, timeout=60)
        # Fail loudly instead of saving an HTML/plain-text error page as if it
        # were a corpus file (which would later be treated as "already
        # downloaded" and parsed into garbage).
        response.raise_for_status()
        data[split] = response.text
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(response.text)

    return data

def parse_conllu(content: str) -> List[Dict]:
    """Parse CoNLL-U text into a list of sentences.

    Sentences are separated by blank lines; lines starting with '#' are
    comments. Only syntactic-word rows (plain integer ID) are kept:
    multiword-token range rows (ID such as ``1-2``) and empty-node rows
    (ID such as ``8.1``) are skipped, because they carry no POS tag of their
    own and would otherwise inject a spurious ``_`` tag and duplicate surface
    forms into the data.

    Args:
        content: Full text of a ``.conllu`` file.

    Returns:
        A list of dicts, one per sentence, each with the parallel lists
        ``'tokens'`` (word forms), ``'upos'`` (universal POS tags) and
        ``'xpos'`` (language-specific POS tags).
    """
    sentences = []
    current_sent = {'tokens': [], 'upos': [], 'xpos': []}

    for line in content.split('\n'):
        line = line.strip()

        if not line:
            # A blank line closes the sentence being accumulated (if any).
            if current_sent['tokens']:
                sentences.append(current_sent)
                current_sent = {'tokens': [], 'upos': [], 'xpos': []}
            continue

        if line.startswith('#'):
            continue

        parts = line.split('\t')
        if len(parts) < 4:
            continue

        token_id = parts[0]
        # '-' marks a multiword-token range, '.' marks an empty node.
        if '-' in token_id or '.' in token_id:
            continue

        current_sent['tokens'].append(parts[1])  # word form
        current_sent['upos'].append(parts[3])    # universal POS
        # Always append so 'xpos' stays index-aligned with 'tokens';
        # '_' is the CoNLL-U placeholder for an unspecified value.
        current_sent['xpos'].append(parts[4] if len(parts) > 4 else '_')

    # Input that does not end with a blank line still has a pending sentence.
    if current_sent['tokens']:
        sentences.append(current_sent)

    return sentences

def load_udpos_data() -> Tuple[List, List, List]:
    """Return the parsed (train, valid, test) splits of the UD English-EWT corpus.

    The three ``.conllu`` files are looked up in the current working
    directory; if any of them is missing, all of them are downloaded first.
    The size of each split is printed before returning.
    """
    print("Loading UDPOS dataset...")

    corpus_files = (
        'en_ewt-ud-train.conllu',
        'en_ewt-ud-dev.conllu',
        'en_ewt-ud-test.conllu',
    )

    # Fetch the corpus only when at least one file is absent locally.
    if any(not os.path.exists(path) for path in corpus_files):
        download_udpos_data()

    # Read and parse the files in train / dev / test order.
    parsed = []
    for path in corpus_files:
        with open(path, 'r', encoding='utf-8') as handle:
            parsed.append(parse_conllu(handle.read()))
    train_data, valid_data, test_data = parsed

    print(f"Training examples: {len(train_data)}")
    print(f"Validation examples: {len(valid_data)}")
    print(f"Test examples: {len(test_data)}")

    return train_data, valid_data, test_data

class Vocabulary:
    """Bidirectional mapping between string tokens and integer ids.

    Special tokens are placed first (in the order given), followed by corpus
    tokens in descending frequency order, subject to ``min_freq``.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, min_freq: int = 1, specials: Optional[List[str]] = None):
        """Create an empty vocabulary.

        Args:
            min_freq: Minimum number of occurrences a token needs in order to
                receive its own id.
            specials: Tokens that always get the first ids. Defaults to
                ``['<pad>', '<unk>']``. The list is copied, so neither the
                caller's list nor other instances are affected by later
                mutation.
        """
        self.min_freq = min_freq
        # A fresh list per instance avoids the shared-mutable-default pitfall.
        self.specials = ['<pad>', '<unk>'] if specials is None else list(specials)
        self.itos: List[str] = []          # id -> token
        self.stoi: Dict[str, int] = {}     # token -> id
        self.freqs: Counter = Counter()    # raw token counts seen so far

    def build_vocab(self, sentences: List[List[str]]):
        """Count tokens in ``sentences`` and assign ids to the frequent ones.

        Safe to call more than once: counts accumulate, ids that were already
        assigned never change, and special tokens are not duplicated.

        Args:
            sentences: Iterable of token lists.
        """
        for sent in sentences:
            self.freqs.update(sent)

        for special in self.specials:
            # Guard keeps repeated calls from re-appending the specials and
            # silently moving the pad/unk indices.
            if special not in self.stoi:
                self.itos.append(special)
                self.stoi[special] = len(self.itos) - 1

        for word, freq in self.freqs.most_common():
            if freq >= self.min_freq and word not in self.stoi:
                self.itos.append(word)
                self.stoi[word] = len(self.itos) - 1

    def numericalize(self, tokens: List[str], max_len: Optional[int] = None) -> List[int]:
        """Convert tokens to ids, mapping unseen tokens to ``'<unk>'``.

        Args:
            tokens: Tokens to convert.
            max_len: If not ``None``, only the first ``max_len`` tokens are
                converted (``0`` yields an empty list).

        Returns:
            List of integer ids, one per (kept) token.

        Raises:
            KeyError: if ``'<unk>'`` is not part of the vocabulary.
        """
        # Compare against None explicitly so that max_len=0 is honoured.
        if max_len is not None:
            tokens = tokens[:max_len]
        unk_idx = self.stoi['<unk>']
        return [self.stoi.get(token, unk_idx) for token in tokens]

    def __len__(self) -> int:
        """Return the number of ids in the vocabulary (specials included)."""
        return len(self.itos)

# ==================== DATASET ====================
class PoSDataset(Dataset):
    """Map-style dataset that serves numericalized, truncated POS-tagged sentences.

    Each underlying record is a dict holding at least the parallel lists
    ``'tokens'`` and ``'upos'``.
    """

    def __init__(self, data: List[Dict], word_vocab: Vocabulary, tag_vocab: Vocabulary, max_len: int = MAX_SENT_LEN):
        """Keep references to the raw sentences, both vocabularies and the length cap."""
        self.data = data
        self.word_vocab = word_vocab
        self.tag_vocab = tag_vocab
        self.max_len = max_len

    def __len__(self) -> int:
        """Number of sentences available."""
        return len(self.data)

    def __getitem__(self, idx: int) -> Dict:
        """Return sentence ``idx`` as a dict of raw tokens, id tensors and length."""
        record = self.data[idx]
        limit = self.max_len

        # Truncate words and tags with the same cap so they stay aligned.
        words = record['tokens'][:limit]
        labels = record['upos'][:limit]

        word_ids = self.word_vocab.numericalize(words)
        label_ids = self.tag_vocab.numericalize(labels)

        sample = {
            'tokens': words,
            'token_ids': torch.tensor(word_ids),
            'tag_ids': torch.tensor(label_ids),
            'length': len(word_ids),
        }
        return sample

def collate_fn(batch: List[Dict]) -> Dict:
    """Pad a list of dataset samples into sequence-first batch tensors.

    Reads the module-level ``word_vocab``, ``tag_vocab`` and ``device``.

    Returns:
        Dict with ``'text'`` and ``'tags'`` (both ``[max_len, batch_size]``
        long tensors moved to ``device``) and ``'lengths'`` (un-padded length
        of every sample, left on the CPU).
    """
    lengths = [sample['length'] for sample in batch]
    longest = max(lengths)
    shape = (longest, len(batch))

    # Start from tensors filled with the padding ids, then overwrite the real part.
    token_tensor = torch.full(shape, word_vocab.stoi['<pad>'], dtype=torch.long)
    tag_tensor = torch.full(shape, tag_vocab.stoi['<pad>'], dtype=torch.long)

    for column, (sample, n_tokens) in enumerate(zip(batch, lengths)):
        token_tensor[:n_tokens, column] = sample['token_ids']
        tag_tensor[:n_tokens, column] = sample['tag_ids']

    return {
        'text': token_tensor.to(device),
        'tags': tag_tensor.to(device),
        'lengths': torch.tensor(lengths),
    }

class BiLSTMPOSTagger(nn.Module):
    """Embedding -> (Bi)LSTM -> linear tagger producing per-token tag scores."""

    def __init__(self,
                 input_dim: int,
                 embedding_dim: int,
                 hidden_dim: int,
                 output_dim: int,
                 n_layers: int,
                 bidirectional: bool,
                 dropout: float,
                 pad_idx: int):
        """Build the layers.

        Args:
            input_dim: Number of rows in the embedding table.
            embedding_dim: Width of each embedding vector.
            hidden_dim: LSTM hidden size per direction.
            output_dim: Number of scores produced for every token.
            n_layers: Number of stacked LSTM layers.
            bidirectional: Whether the LSTM runs in both directions.
            dropout: Dropout probability (embeddings, LSTM outputs, and
                between LSTM layers when there is more than one).
            pad_idx: Index passed to the embedding layer as ``padding_idx``.
        """
        super().__init__()

        # Inter-layer LSTM dropout is only requested for stacked LSTMs.
        lstm_dropout = dropout if n_layers > 1 else 0
        # A bidirectional LSTM concatenates both directions' hidden states.
        fc_in_features = hidden_dim * 2 if bidirectional else hidden_dim

        self.embedding = nn.Embedding(input_dim, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=lstm_dropout,
            batch_first=False,
        )
        self.fc = nn.Linear(fc_in_features, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text: torch.Tensor) -> torch.Tensor:
        """Score every tag for every token.

        Args:
            text: Token ids, ``[seq_len, batch_size]``.

        Returns:
            Scores of shape ``[seq_len, batch_size, output_dim]``.
        """
        # [seq_len, batch_size] -> [seq_len, batch_size, emb_dim]
        embedded = self.embedding(text)
        embedded = self.dropout(embedded)

        # -> [seq_len, batch_size, hid_dim * n_directions]; final states unused
        outputs, _ = self.lstm(embedded)

        # -> [seq_len, batch_size, output_dim]
        return self.fc(self.dropout(outputs))

def init_weights(m: nn.Module):
    """Initialise parameters: weights ~ N(0, 0.1), everything else 0.

    Intended for use with ``model.apply(init_weights)``. After the random
    initialisation, the ``padding_idx`` row of every ``nn.Embedding`` under
    ``m`` is reset to zero: that row receives no gradient, so without this
    step the padding token would keep a random, never-trained vector.

    Args:
        m: Module whose parameters (including those of sub-modules) are
            initialised in place.
    """
    for name, param in m.named_parameters():
        if 'weight' in name:
            nn.init.normal_(param.data, mean=0, std=0.1)
        else:
            nn.init.constant_(param.data, 0)

    # m.modules() includes m itself, so this also works when apply() visits
    # the parent module last and re-randomises the embedding table.
    for module in m.modules():
        if isinstance(module, nn.Embedding) and module.padding_idx is not None:
            with torch.no_grad():
                module.weight[module.padding_idx].fill_(0)

def count_parameters(model: nn.Module) -> int:
    """Return the total number of scalar values in the model's trainable parameters."""
    total = 0
    for param in model.parameters():
        # Frozen parameters (requires_grad=False) are not counted.
        if param.requires_grad:
            total += param.numel()
    return total

def categorical_accuracy(preds: torch.Tensor, y: torch.Tensor, tag_pad_idx: int) -> torch.Tensor:
    """Fraction of non-padding positions whose arg-max prediction is correct.

    Args:
        preds: Scores of shape ``[n_positions, n_classes]``.
        y: Gold class ids of shape ``[n_positions]``.
        tag_pad_idx: Class id marking padding; those positions are ignored.

    Returns:
        A float tensor of shape ``[1]`` on the same device as ``preds``.
        When every position is padding the result is ``0.0`` rather than NaN.
    """
    keep = y != tag_pad_idx
    predicted = preds.argmax(dim=1)

    n_correct = (predicted[keep] == y[keep]).sum()
    # clamp(min=1) avoids 0/0 for an all-padding batch without forcing a
    # host/device synchronisation.
    n_scored = keep.sum().clamp(min=1)

    # All operands already live on the inputs' device, so no global `device`
    # is needed; reshape keeps the original one-element return shape.
    return (n_correct.float() / n_scored.float()).reshape(1)

def epoch_time(start_time: float, end_time: float) -> Tuple[int, int]:
    """Split the interval between two timestamps into whole minutes and seconds.

    Both parts are truncated to integers; fractional seconds are discarded.
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - 60 * whole_minutes)
    return whole_minutes, leftover_seconds

def train(model: nn.Module,
          dataloader: DataLoader,
          optimizer: optim.Optimizer,
          criterion: nn.Module,
          tag_pad_idx: int) -> Tuple[float, float]:
    """Run one optimisation pass over ``dataloader``.

    Returns:
        ``(mean_loss, mean_accuracy)``, each averaged over the batches.
    """
    model.train()

    running_loss = 0
    running_acc = 0

    for batch in tqdm(dataloader, desc='Training'):
        text = batch['text']
        tags = batch['tags']

        optimizer.zero_grad()

        scores = model(text)

        # Flatten the sequence and batch axes so every token is one row.
        n_classes = scores.shape[-1]
        flat_scores = scores.view(-1, n_classes)
        flat_tags = tags.view(-1)

        loss = criterion(flat_scores, flat_tags)
        acc = categorical_accuracy(flat_scores, flat_tags, tag_pad_idx)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        running_acc += acc.item()

    n_batches = len(dataloader)
    return running_loss / n_batches, running_acc / n_batches

def evaluate(model: nn.Module,
             dataloader: DataLoader,
             criterion: nn.Module,
             tag_pad_idx: int) -> Tuple[float, float]:
    """Measure loss and accuracy over ``dataloader`` without updating the model.

    Returns:
        ``(mean_loss, mean_accuracy)``, each averaged over the batches.
    """
    model.eval()

    running_loss = 0
    running_acc = 0

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for batch in tqdm(dataloader, desc='Evaluating'):
            text = batch['text']
            tags = batch['tags']

            scores = model(text)

            # Flatten the sequence and batch axes so every token is one row.
            n_classes = scores.shape[-1]
            flat_scores = scores.view(-1, n_classes)
            flat_tags = tags.view(-1)

            loss = criterion(flat_scores, flat_tags)
            acc = categorical_accuracy(flat_scores, flat_tags, tag_pad_idx)

            running_loss += loss.item()
            running_acc += acc.item()

    n_batches = len(dataloader)
    return running_loss / n_batches, running_acc / n_batches

def tag_sentence(model: nn.Module,
                 sentence: str,
                 word_vocab: Vocabulary,
                 tag_vocab: Vocabulary,
                 device: torch.device) -> Tuple[List[str], List[str], List[str]]:
    """Tokenize a raw sentence with spaCy and predict one tag per token.

    Tokens keep their original casing: the word vocabulary in this file is
    built from the corpus word forms as-is, so lower-casing at inference time
    would turn known words (e.g. proper nouns) into ``<unk>``.

    Args:
        model: Tagger taking a ``[seq_len, batch_size]`` id tensor and
            returning ``[seq_len, batch_size, n_tags]`` scores.
        sentence: Raw text to tag.
        word_vocab: Vocabulary used to numericalize the tokens.
        tag_vocab: Vocabulary used to map predicted ids back to tag strings.
        device: Device the input tensor is moved to.

    Returns:
        ``(tokens, predicted_tags, unks)`` where ``unks`` lists the tokens
        that were mapped to ``<unk>``. All three are empty when the sentence
        contains no tokens.

    Raises:
        subprocess.CalledProcessError: if the spaCy model is missing and the
            download command fails.
    """
    import subprocess
    import sys

    model.eval()

    try:
        nlp = spacy.load('en_core_web_sm')
    except OSError:
        # spaCy signals a missing model with OSError. Install it with the
        # interpreter that is running this code (a bare `python` may resolve
        # to a different environment) and fail loudly if that does not work.
        subprocess.run(
            [sys.executable, '-m', 'spacy', 'download', 'en_core_web_sm'],
            check=True,
        )
        nlp = spacy.load('en_core_web_sm')

    tokens = [token.text for token in nlp(sentence)]
    if not tokens:
        # Nothing to tag; an empty id tensor would fail inside the model.
        return [], [], []

    unk_idx = word_vocab.stoi['<unk>']
    token_ids = word_vocab.numericalize(tokens)
    unks = [t for t, idx in zip(tokens, token_ids) if idx == unk_idx]

    # [seq_len] -> [seq_len, 1]: a batch holding this single sentence.
    token_tensor = torch.tensor(token_ids, dtype=torch.long).unsqueeze(-1).to(device)

    with torch.no_grad():
        predictions = model(token_tensor)

    # Drop only the batch axis so a one-token sentence still yields a list.
    predicted_indices = predictions.argmax(-1).squeeze(1).tolist()

    predicted_tags = [tag_vocab.itos[idx] for idx in predicted_indices]

    return tokens, predicted_tags, unks

# Script entry point: load data, build vocabularies, train the tagger, report
# test metrics for the best checkpoint, and tag one example sentence.
if __name__ == "__main__":
    print("=" * 50)
    print("BiLSTM for Part-of-Speech Tagging")
    print("=" * 50)

    # 1. Load data
    train_data_raw, valid_data_raw, test_data_raw = load_udpos_data()

    # 2. Build vocabularies
    print("\nBuilding vocabularies...")

    # Word vocabulary (built from the training split only).
    # NOTE: `word_vocab` and `tag_vocab` must be module-level names because
    # collate_fn looks them up as globals.
    word_vocab = Vocabulary(min_freq=MIN_FREQ)
    word_vocab.build_vocab([sent['tokens'] for sent in train_data_raw])

    # Tag vocabulary (UD tags)
    tag_vocab = Vocabulary(min_freq=1)  # Include all tags
    # build_vocab expects a list of token lists, so each tag is wrapped in its
    # own one-element list.
    tag_vocab.build_vocab([[tag] for sent in train_data_raw for tag in sent['upos']])

    print(f"Word vocabulary size: {len(word_vocab)}")
    print(f"Tag vocabulary size: {len(tag_vocab)}")
    print(f"Top 10 most common words: {word_vocab.freqs.most_common(10)}")
    print(f"Tags: {tag_vocab.itos}")

    # 3. Create datasets
    train_dataset = PoSDataset(train_data_raw, word_vocab, tag_vocab)
    valid_dataset = PoSDataset(valid_data_raw, word_vocab, tag_vocab)
    test_dataset = PoSDataset(test_data_raw, word_vocab, tag_vocab)

    # 4. Create data loaders (only the training data is shuffled)
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE,
                             shuffle=True, collate_fn=collate_fn)
    valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE,
                             shuffle=False, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE,
                            shuffle=False, collate_fn=collate_fn)

    # 5. Initialize model
    print("\nInitializing model...")
    INPUT_DIM = len(word_vocab)
    OUTPUT_DIM = len(tag_vocab)
    PAD_IDX = word_vocab.stoi['<pad>']
    TAG_PAD_IDX = tag_vocab.stoi['<pad>']

    model = BiLSTMPOSTagger(INPUT_DIM,
                           EMBEDDING_DIM,
                           HIDDEN_DIM,
                           OUTPUT_DIM,
                           N_LAYERS,
                           True,  # bidirectional
                           DROPOUT,
                           PAD_IDX)

    model.apply(init_weights)
    model = model.to(device)

    print(f"Model has {count_parameters(model):,} trainable parameters")

    # 6. Setup training
    optimizer = optim.Adam(model.parameters())
    # Padding positions in the targets do not contribute to the loss.
    criterion = nn.CrossEntropyLoss(ignore_index=TAG_PAD_IDX)

    # 7. Training loop
    print("\nStarting training...")
    best_valid_loss = float('inf')

    for epoch in range(N_EPOCHS):
        start_time = time.time()

        train_loss, train_acc = train(model, train_loader, optimizer, criterion, TAG_PAD_IDX)
        valid_loss, valid_acc = evaluate(model, valid_loader, criterion, TAG_PAD_IDX)

        end_time = time.time()
        epoch_mins, epoch_secs = epoch_time(start_time, end_time)

        # Save best model (checkpoint whenever validation loss improves)
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), 'best_pos_model.pt')
            print(f"  -> New best model saved!")

        print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
        print(f'  Train Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
        print(f'  Val Loss: {valid_loss:.3f} | Val Acc: {valid_acc*100:.2f}%')

    # 8. Test evaluation (reload the best checkpoint, not the last epoch)
    print("\n" + "=" * 50)
    print("Testing best model...")
    model.load_state_dict(torch.load('best_pos_model.pt'))
    test_loss, test_acc = evaluate(model, test_loader, criterion, TAG_PAD_IDX)
    print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

    # 9. Inference example
    print("\n" + "=" * 50)
    print("Inference example:")
    test_sentence = "The Queen will deliver a speech about the conflict in North Korea at 1pm tomorrow."
    tokens, tags, unks = tag_sentence(model, test_sentence, word_vocab, tag_vocab, device)

    print(f"\nSentence: {test_sentence}")
    print(f"Unknown tokens: {unks}")
    print("\nPredictions:")
    for token, tag in zip(tokens, tags):
        print(f"  {tag:10} → {token}")

    print("\n" + "=" * 50)
    print("Training completed successfully!")

Using device: cpu
BiLSTM for Part-of-Speech Tagging
Loading UDPOS dataset...
Training examples: 12544
Validation examples: 2001
Test examples: 2077

Building vocabularies...
Word vocabulary size: 10099
Tag vocabulary size: 20
Top 10 most common words: [('.', 8640), ('the', 8151), (',', 7021), ('to', 5076), ('and', 4855), ('a', 3609), ('of', 3589), ('I', 3123), ('in', 2911), ('is', 2154)]
Tags: ['<pad>', '<unk>', 'NOUN', 'PUNCT', 'VERB', 'PRON', 'ADP', 'DET', 'ADJ', 'AUX', 'PROPN', 'ADV', 'CCONJ', 'PART', 'NUM', 'SCONJ', '_', 'SYM', 'INTJ', 'X']

Initializing model...
Model has 1,645,824 trainable parameters

Starting training...


Training: 100%|██████████| 98/98 [01:42<00:00,  1.05s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.53it/s]


  -> New best model saved!
Epoch: 01 | Time: 1m 46s
  Train Loss: 1.764 | Train Acc: 45.47%
  Val Loss: 0.765 | Val Acc: 76.41%


Training: 100%|██████████| 98/98 [01:42<00:00,  1.04s/it]
Evaluating: 100%|██████████| 16/16 [00:04<00:00,  3.73it/s]


  -> New best model saved!
Epoch: 02 | Time: 1m 46s
  Train Loss: 0.549 | Train Acc: 83.34%
  Val Loss: 0.382 | Val Acc: 88.27%


Training: 100%|██████████| 98/98 [01:40<00:00,  1.03s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.59it/s]


  -> New best model saved!
Epoch: 03 | Time: 1m 43s
  Train Loss: 0.306 | Train Acc: 90.86%
  Val Loss: 0.318 | Val Acc: 90.11%


Training: 100%|██████████| 98/98 [01:40<00:00,  1.03s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.62it/s]


  -> New best model saved!
Epoch: 04 | Time: 1m 44s
  Train Loss: 0.226 | Train Acc: 93.22%
  Val Loss: 0.291 | Val Acc: 91.06%


Training: 100%|██████████| 98/98 [01:43<00:00,  1.06s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.72it/s]


  -> New best model saved!
Epoch: 05 | Time: 1m 47s
  Train Loss: 0.186 | Train Acc: 94.32%
  Val Loss: 0.275 | Val Acc: 91.62%


Training: 100%|██████████| 98/98 [01:40<00:00,  1.03s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.62it/s]


  -> New best model saved!
Epoch: 06 | Time: 1m 44s
  Train Loss: 0.164 | Train Acc: 94.95%
  Val Loss: 0.272 | Val Acc: 91.58%


Training: 100%|██████████| 98/98 [01:40<00:00,  1.02s/it]
Evaluating: 100%|██████████| 16/16 [00:04<00:00,  3.82it/s]


  -> New best model saved!
Epoch: 07 | Time: 1m 44s
  Train Loss: 0.146 | Train Acc: 95.41%
  Val Loss: 0.267 | Val Acc: 91.91%


Training: 100%|██████████| 98/98 [01:40<00:00,  1.02s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.62it/s]


Epoch: 08 | Time: 1m 43s
  Train Loss: 0.133 | Train Acc: 95.80%
  Val Loss: 0.271 | Val Acc: 91.82%


Training: 100%|██████████| 98/98 [01:40<00:00,  1.03s/it]
Evaluating: 100%|██████████| 16/16 [00:04<00:00,  3.95it/s]


Epoch: 09 | Time: 1m 44s
  Train Loss: 0.121 | Train Acc: 96.16%
  Val Loss: 0.267 | Val Acc: 92.08%


Training: 100%|██████████| 98/98 [01:41<00:00,  1.03s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.54it/s]


Epoch: 10 | Time: 1m 44s
  Train Loss: 0.109 | Train Acc: 96.54%
  Val Loss: 0.273 | Val Acc: 91.86%


Training: 100%|██████████| 98/98 [01:40<00:00,  1.02s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.53it/s]


Epoch: 11 | Time: 1m 43s
  Train Loss: 0.100 | Train Acc: 96.84%
  Val Loss: 0.274 | Val Acc: 92.01%


Training: 100%|██████████| 98/98 [01:42<00:00,  1.05s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.64it/s]


Epoch: 12 | Time: 1m 45s
  Train Loss: 0.092 | Train Acc: 97.06%
  Val Loss: 0.278 | Val Acc: 92.27%


Training: 100%|██████████| 98/98 [01:42<00:00,  1.04s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.56it/s]


Epoch: 13 | Time: 1m 45s
  Train Loss: 0.085 | Train Acc: 97.31%
  Val Loss: 0.286 | Val Acc: 92.05%


Training: 100%|██████████| 98/98 [01:40<00:00,  1.03s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.04it/s]


Epoch: 14 | Time: 1m 44s
  Train Loss: 0.078 | Train Acc: 97.53%
  Val Loss: 0.296 | Val Acc: 92.07%


Training: 100%|██████████| 98/98 [01:41<00:00,  1.03s/it]
Evaluating: 100%|██████████| 16/16 [00:03<00:00,  4.73it/s]


Epoch: 15 | Time: 1m 44s
  Train Loss: 0.071 | Train Acc: 97.72%
  Val Loss: 0.299 | Val Acc: 92.10%

Testing best model...


Evaluating: 100%|██████████| 17/17 [00:04<00:00,  4.21it/s]


Test Loss: 0.254 | Test Acc: 91.91%

Inference example:

Sentence: The Queen will deliver a speech about the conflict in North Korea at 1pm tomorrow.
Unknown tokens: ['korea']

Predictions:
  DET        → the
  NOUN       → queen
  AUX        → will
  VERB       → deliver
  DET        → a
  NOUN       → speech
  ADP        → about
  DET        → the
  NOUN       → conflict
  ADP        → in
  PROPN      → north
  PROPN      → korea
  ADP        → at
  NUM        → 1
  NOUN       → pm
  NOUN       → tomorrow
  PUNCT      → .

Training completed successfully!
