# Lab 5 : More on Recurrent neural networks (LSTM)
```
- [S25] Advanced Machine Learning, Innopolis University
- Teaching Assistant: Gcinizwe Dlamini
```
<hr>


```
Lab Plan
1. LSTM basics
2. Application of LSTM
3. Self practice tasks
```

<hr>


## 0. Recap

![](http://karpathy.github.io/assets/rnn/diags.jpeg)

## Sample Data

In [2]:
import torch
from torch import nn
import torch.nn.functional as F

simple_sequence = torch.Tensor([[0.3,1.9,4.5],[0.4,0.1,0.23],[0.7,0.91,0.43], [0.34,0.01,0.002]])
simple_sequence = simple_sequence.unsqueeze(0)
simple_sequence.shape

torch.Size([1, 4, 3])

## 1. LSTM basics

The `simple_sequence` variable represents a sequence of length 4, where each element (time-stamp) is represented by a feature vector of length 3. LSTM calculations are defined as:

![](https://media.licdn.com/dms/image/v2/C5612AQH5Im8XrvLmYQ/article-cover_image-shrink_600_2000/article-cover_image-shrink_600_2000/0/1564974698831?e=2147483647&v=beta&t=4sP9wrqZVaKsUt8NLXwuN4hfYc0m8RKI3a5g_jUW2xc)


$$i_{t} = \sigma\left(W_{ii}x_t + b_{ii} + W_{hi}h_{t-1} + b_{hi} \right)$$
$$f_t = \sigma \left( W_{if}x_t + b_{if} + W_{hf}h_{t-1} + b_{hf} \right)$$
$$g_t = tanh(W_{ig}x_t + b_{ig} + W_{hg}h_{t-1} + b_{hg})$$
$$o_t = \sigma \left( W_{io}x_t + b_{io} + W_{ho}h_{t-1} + h_{ho}\right)$$
$$c_t = f_t \odot c_{t-1} + i_t \odot g_t$$
$$h_t = o_t \odot tanh(c_t)$$

where $h_t$ represents the hidden state at time $t$; $c_t$ cell cell state at time $t$, $x_t$ is the input at time $t$, $h_{t-1}$ is the hidden state of the layer at time $t-1$ or the initial hidden state at time 0, and $i_t$, $f_t$, $g_t$, $o_t$ are the input, forget, cell, and output gates, respectively.

 <br>
Lets see whats inside Pytorch and compare with our theory

**Note:** For simplicity, the bias is set to zeros and weights set to ones

In [3]:
torch.manual_seed(20)
hidden_size = 1
simple_lstm_layer = torch.nn.LSTM(input_size=3, hidden_size=hidden_size, bidirectional=False, num_layers=1, batch_first=True)


share_weight = torch.randn(simple_lstm_layer.weight_ih_l0.shape, dtype = torch.float)
simple_lstm_layer.weight_ih_l0 = torch.nn.Parameter(share_weight)

# bias set to zeros
simple_lstm_layer.bias_ih_l0 = torch.nn.Parameter(torch.zeros(simple_lstm_layer.bias_ih_l0.shape))
simple_lstm_layer.bias_hh_l0 = torch.nn.Parameter(torch.zeros(simple_lstm_layer.bias_ih_l0.shape))

lstm_pytorch_output = simple_lstm_layer(simple_sequence[0][0].unsqueeze(dim=0).unsqueeze(dim=0))
simple_lstm_layer.state_dict()

OrderedDict([('weight_ih_l0',
              tensor([[-0.9475, -0.6130, -0.1291],
                      [-0.4107,  1.3931, -0.0984],
                      [ 1.6791, -0.9381, -0.4899],
                      [ 0.2811, -0.2813,  0.4779]])),
             ('weight_hh_l0',
              tensor([[ 0.8846],
                      [-0.4928],
                      [ 0.4776],
                      [ 0.0807]])),
             ('bias_ih_l0', tensor([0., 0., 0., 0.])),
             ('bias_hh_l0', tensor([0., 0., 0., 0.]))])

### Whole sequence output

In [4]:
output, (hidden, cell) = simple_lstm_layer(simple_sequence)
output, cell, hidden

(tensor([[[-0.0976],
          [ 0.0470],
          [ 0.0490],
          [ 0.1372]]], grad_fn=<TransposeBackward0>),
 tensor([[[0.2679]]], grad_fn=<StackBackward0>),
 tensor([[[0.1372]]], grad_fn=<StackBackward0>))

### 1.2 Extract / define the calculation variables (weights \& bias)

In [5]:
W_ii, W_if, W_ig, W_io = simple_lstm_layer.weight_ih_l0.split(hidden_size, dim=0)
b_ii, b_if, b_ig, b_io = simple_lstm_layer.bias_ih_l0.split(hidden_size, dim=0)

W_hi, W_hf, W_hg, W_ho = simple_lstm_layer.weight_hh_l0.split(hidden_size, dim=0)
b_hi, b_hf, b_hg, b_ho = simple_lstm_layer.bias_hh_l0.split(hidden_size, dim=0)

### 2.2 Calculations

$i_{t} = \sigma\left(W_{ii}x_t + b_{ii} + W_{hi}h_{t-1} + b_{hi} \right)$ <br>
$f_t = \sigma \left( W_{if}x_t + b_{if} + W_{hf}h_{t-1} + b_{hf} \right)$ <br>
$g_t = tanh(W_{ig}x_t + b_{ig} + W_{hg}h_{t-1} + b_{hg})$ <br>
$o_t = \sigma \left( W_{io}x_t + b_{io} + W_{ho}h_{t-1} + h_{ho}\right)$ <br>
$c_t = f_t \odot c_{t-1} + i_t \odot g_t$ <br>
$h_t = o_t \odot tanh(c_t)$ <br>

In [6]:
input_x = simple_sequence[0][0].unsqueeze(0)
prev_h = torch.zeros((1, hidden_size))
prev_c = torch.zeros((1, hidden_size))

i_t = torch.sigmoid(F.linear(input_x, W_ii, b_ii) +  F.linear(W_hi, prev_h, b_hi))
f_t = torch.sigmoid(F.linear(input_x, W_if, b_if) +  F.linear(prev_h, W_hf, b_hf))
g_t = torch.tanh(F.linear(input_x, W_ig, b_ii) +  F.linear(prev_h, W_hg, b_hg))
o_t = torch.sigmoid(F.linear(input_x, W_io, b_ii) +  F.linear(prev_h, W_ho, b_ho))
c_t = f_t * prev_c + i_t * g_t
h_t = o_t * torch.tanh(c_t)

In [7]:
i_t

tensor([[0.1161]], grad_fn=<SigmoidBackward0>)

In [8]:
o_t, c_t, h_t

(tensor([[0.8456]], grad_fn=<SigmoidBackward0>),
 tensor([[-0.1159]], grad_fn=<AddBackward0>),
 tensor([[-0.0976]], grad_fn=<MulBackward0>))

### 2.3 Comapre manual calculations with Pytorch implementation

In [9]:
output.squeeze(0)[0], h_t

(tensor([-0.0976], grad_fn=<SelectBackward0>),
 tensor([[-0.0976]], grad_fn=<MulBackward0>))

**Task:** Calculate the outputs for the rest of the full sentence -> `simple_sequence` manually and compare with PyTorch output

In [10]:
simple_sequence.squeeze(0).squeeze(0)[1]

tensor([0.4000, 0.1000, 0.2300])

In [11]:
prev_h = torch.zeros((1, hidden_size))
prev_c = torch.zeros((1, hidden_size))

for i in range(simple_sequence.shape[1]):
  input_x = simple_sequence[0][i].unsqueeze(0)
  i_t = torch.sigmoid(F.linear(input_x, W_ii, b_ii) +  F.linear(W_hi, prev_h, b_hi))
  f_t = torch.sigmoid(F.linear(input_x, W_if, b_if) +  F.linear(prev_h, W_hf, b_hf))
  g_t = torch.tanh(F.linear(input_x, W_ig, b_ii) +  F.linear(prev_h, W_hg, b_hg))
  o_t = torch.sigmoid(F.linear(input_x, W_io, b_ii) +  F.linear(prev_h, W_ho, b_ho))
  c_t = f_t * prev_c + i_t * g_t
  h_t = o_t * torch.tanh(c_t)
  prev_c = c_t
  prev_h = h_t
  print(h_t)


tensor([[-0.0976]], grad_fn=<MulBackward0>)
tensor([[0.0470]], grad_fn=<MulBackward0>)
tensor([[0.0490]], grad_fn=<MulBackward0>)
tensor([[0.1372]], grad_fn=<MulBackward0>)


In [12]:
output

tensor([[[-0.0976],
         [ 0.0470],
         [ 0.0490],
         [ 0.1372]]], grad_fn=<TransposeBackward0>)

## 2. Application of LSTM (Sentiment Analysis)

### 2.1 Dataset Description

[IMDb dataset](http://ai.stanford.edu/~amaas/data/sentiment/) having 50K movie reviews for natural language processing or Text analytics. This is a dataset for binary sentiment classification containing substantially more data than previous benchmark datasets.

In [48]:
import collections

import datasets
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import tqdm

### 2.2 Get Dataset and preprocess

In [49]:
train_data, test_data = datasets.load_dataset("imdb", split=["train", "test"])

### 2.3 Tokenize Dataset

In [50]:
max_text_length = 128


### 2.4 Create Vocabulary

In [51]:
import re
from collections import Counter


def tokenize(text):
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text) 
    return text.split()  


def build_vocab(dataset, min_freq=5):
    counter = Counter()
    for example in dataset:
        tokens = tokenize(example['text'])
        counter.update(tokens)

    # special_tokens = ["<unk>", "<pad>"]
    vocab = {'<unk>': 0, '<pad>': 1} 
    idx = 2
    for word, count in counter.items():
        if count >= min_freq:
            vocab[word] = idx
            idx += 1

    return vocab


vocab = build_vocab(train_data, min_freq = 5)

### 2.5 Encode Data

In [52]:

def encode_text(text, vocab):
    tokens = tokenize(text)
    return [vocab.get(token, vocab['<unk>']) for token in tokens]

def encode_data(dataset, vocab):
    encoded_data = []
   
    for example in dataset:
        input_ids = encode_text(example['text'], vocab)
        label = example['label']
        encoded_data.append({'input_ids': input_ids, 'label': label})
    return encoded_data

train_encoded = encode_data(train_data, vocab)
test_encoded = encode_data(test_data, vocab)

### 2.6 Creating Dataloaders


In [53]:
from torch.utils.data import Dataset, DataLoader

class ImdbDataset(Dataset):
    def __init__(self, data, vocab, max_len=256):
        self.data = data
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        example = self.data[idx]
        input_ids = example['input_ids']
        label = example['label']

        # Padding or truncating to max_len
        input_ids = input_ids[:self.max_len]  # Truncate to max_len
        padding_len = self.max_len - len(input_ids)
        input_ids = input_ids + [self.vocab['<pad>']] * padding_len  # Pad with <pad> token

        return torch.tensor(input_ids), torch.tensor(label)

# Create PyTorch datasets for train and test
train_dataset = ImdbDataset(train_encoded, vocab, max_len = 128)
test_dataset = ImdbDataset(test_encoded, vocab, max_len = 128)

# Create dataloaders
BATCH_SIZE = 64
train_data_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

### 2.7 Define LSTM model

In [54]:
class SentimentLSTM(nn.Module):
  def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, pad_index, n_layers=1, bidirectional=False):
    super().__init__()
    self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_index)
    self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers, batch_first=True)
    self.fc = nn.Linear(hidden_dim, output_dim)

  def forward(self, ids):
      embedded = self.embedding(ids)
      output, (hidden, cell) = self.lstm(embedded)
      prediction = self.fc(hidden[-1])
      return prediction

### 2.8 Model training parameters

In [55]:
vocab_size = len(vocab)
embedding_dim = 300
hidden_dim = 32
output_dim = 2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
lr = 5e-4
pad_index = 2

model = SentimentLSTM(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    output_dim=output_dim,
    pad_index=pad_index,
).to(device)

optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

### 2.9 Model Evaluation

In [None]:
def get_accuracy(preds, labels):
    if torch.is_tensor(preds):
        preds = preds.cpu().numpy()
    if torch.is_tensor(labels):
        labels = labels.cpu().numpy()
    correct = (preds == labels).sum()
    return correct / len(labels)


def evaluate(dataloader, model, criterion, device):
    model.eval()
    total_loss = 0.0
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for input_ids, label in dataloader:
            input_ids, label = input_ids.to(device), label.to(device)
            output = model(input_ids)
            loss = criterion(output, label)
            total_loss += loss.item()
            preds = torch.argmax(output, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(label.cpu().numpy())
    
    avg_loss = total_loss / len(dataloader)
    accuracy = get_accuracy(np.array(all_preds), np.array(all_labels))
    print(f'Accuracy: {accuracy:.4f}')
    return avg_loss, accuracy

### 2.10 Model training Loop

**Task** : Add model evaluation (use `test_data_loader`)

In [63]:
n_epochs = 10
for ep in range(n_epochs):
    model.train()
    epoch_loss = []
    epoch_acc = []
    
    for batch in tqdm.tqdm(train_data_loader, desc="training..."):
        optimizer.zero_grad()
        ids, label = batch[0], batch[1]
        ids, label = ids.to(device), label.to(device)
        
        prediction = model(ids)
        loss = criterion(prediction, label)
        
        preds = torch.argmax(prediction, dim=1)
        acc = get_accuracy(preds, label)
        
        loss.backward()
        optimizer.step()
        
        epoch_loss.append(loss.item())
        epoch_acc.append(acc)
    
    avg_train_loss = np.mean(epoch_loss)
    avg_train_acc = np.mean(epoch_acc)
    
    test_loss, test_acc = evaluate(test_data_loader, model, criterion=criterion, device=device)
    print(f'[Epoch {ep+1}] Train Loss: {avg_train_loss:.3f}, Train Acc: {avg_train_acc:.3f}, Test Loss: {test_loss:.3f}, Test Acc: {test_acc:.3f}')

training...: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 391/391 [00:05<00:00, 72.50it/s]


Accuracy: 0.5356
[Epoch 1] Train Loss: 0.694, Train Acc: 0.515, Test Loss: 0.690, Test Acc: 0.536


training...: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 391/391 [00:05<00:00, 74.29it/s]


Accuracy: 0.5692
[Epoch 2] Train Loss: 0.671, Train Acc: 0.596, Test Loss: 0.679, Test Acc: 0.569


training...: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 391/391 [00:05<00:00, 74.96it/s]


Accuracy: 0.7010
[Epoch 3] Train Loss: 0.587, Train Acc: 0.705, Test Loss: 0.590, Test Acc: 0.701


training...: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 391/391 [00:05<00:00, 74.98it/s]


Accuracy: 0.7479
[Epoch 4] Train Loss: 0.452, Train Acc: 0.798, Test Loss: 0.548, Test Acc: 0.748


training...: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 391/391 [00:05<00:00, 73.70it/s]


Accuracy: 0.7596
[Epoch 5] Train Loss: 0.348, Train Acc: 0.858, Test Loss: 0.543, Test Acc: 0.760


training...: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 391/391 [00:05<00:00, 75.90it/s]


Accuracy: 0.7578
[Epoch 6] Train Loss: 0.271, Train Acc: 0.897, Test Loss: 0.576, Test Acc: 0.758


training...: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 391/391 [00:05<00:00, 74.67it/s]


Accuracy: 0.7710
[Epoch 7] Train Loss: 0.214, Train Acc: 0.923, Test Loss: 0.617, Test Acc: 0.771


training...: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 391/391 [00:05<00:00, 67.26it/s]


Accuracy: 0.7684
[Epoch 8] Train Loss: 0.156, Train Acc: 0.947, Test Loss: 0.642, Test Acc: 0.768


training...: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 391/391 [00:06<00:00, 60.09it/s]


Accuracy: 0.7572
[Epoch 9] Train Loss: 0.143, Train Acc: 0.953, Test Loss: 0.673, Test Acc: 0.757


training...: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 391/391 [00:06<00:00, 60.95it/s]


Accuracy: 0.7612
[Epoch 10] Train Loss: 0.117, Train Acc: 0.962, Test Loss: 0.694, Test Acc: 0.761


## 3. Tasks

```
Task 1
Implement and train a LSTM neural network for sentiment analysis using IMDb dataset and the following architecture:
- LSTM should be bidirectional
- LSTM should be Multi-layered
- LSTM should be use Regularization (i.e Dropout)
```

<hr>

```
Task 2
Implement, train and test a LSTM model for Part-of-speech tagging task.
```

**Task 2 Datasets**: [Train](https://www.dropbox.com/s/x9n6f9o9jl7pno8/train_pos.txt?dl=1), [Test](https://www.dropbox.com/s/v8nccvq7jewcl8s/test_pos.txt?dl=1)


In [64]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import datasets
from collections import Counter
from sklearn.metrics import accuracy_score

In [65]:
class IMDBDataset(Dataset):
    def __init__(self, split='train', min_freq=2, max_len=256):
        self.max_len = max_len
        self.min_freq = min_freq

        data = list(datasets.load_dataset("imdb", split=split))
        self.texts = []
        self.labels = []  # 1 for positive, 0 for negative
        for label, text in data:
            self.texts.append(text)
            self.labels.append(int(label == 'pos'))
        
        self.vocab = self.build_vocab(self.texts, min_freq)
        self.encoded_texts = [self.encode_text(text) for text in self.texts]

    def tokenize(self, text):
        text = text.lower()
        text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
        return text.split()  # Tokenize by whitespace
    
    def build_vocab(self, texts, min_freq):
        counter = Counter()
        for text in texts:
            tokens = self.tokenize(text)
            counter.update(tokens)
        vocab = {'<pad>': 0, '<unk>': 1}
        idx = 2
        for token, count in counter.items():
            if count >= min_freq:
                vocab[token] = idx
                idx += 1
        return vocab
    
    def encode_text(self, text):
        tokens = self.tokenize(text)
        return [self.vocab.get(token, self.vocab['<unk>']) for token in tokens]
    
    def __len__(self):
        return len(self.encoded_texts)
    
    def __getitem__(self, idx):
        encoded = self.encoded_texts[idx]
        if len(encoded) < self.max_len:
            pad_len = self.max_len - len(encoded)
            encoded = encoded + [self.vocab['<pad>']] * pad_len
        else:
            encoded = encoded[:self.max_len]
        label = self.labels[idx]
        return torch.tensor(encoded, dtype=torch.long), torch.tensor(label, dtype=torch.long)

In [66]:
class SentimentLSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers, output_dim, dropout=0.5):
        super(SentimentLSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim,
                            hidden_size=hidden_dim,
                            num_layers=num_layers,
                            bidirectional=True,
                            dropout=dropout,
                            batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        # text: [batch_size, seq_len]
        embedded = self.embedding(text)           # [batch_size, seq_len, embedding_dim]
        embedded = self.dropout(embedded)
        output, (hidden, cell) = self.lstm(embedded)
        forward_hidden = hidden[-2, :, :]          # [batch_size, hidden_dim]
        backward_hidden = hidden[-1, :, :]         # [batch_size, hidden_dim]
        final_hidden = torch.cat((forward_hidden, backward_hidden), dim=1)  # [batch_size, hidden_dim*2]
        final_hidden = self.dropout(final_hidden)
        logits = self.fc(final_hidden)            # [batch_size, output_dim]
        return logits

In [67]:
class SentimentTrainer:
    def __init__(self, model, learning_rate):
        self.model = model
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        print(f"Using device: {self.device}")

    def train(self, dataset, num_epochs=5, batch_size=64):
        self.model.to(self.device)
        self.model.train()
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        for epoch in range(num_epochs):
            total_loss = 0.0
            for text, label in dataloader:
                text, label = text.to(self.device), label.to(self.device)
                self.optimizer.zero_grad()
                logits = self.model(text)  # [batch_size, output_dim]
                loss = self.criterion(logits, label)
                loss.backward()
                self.optimizer.step()
                total_loss += loss.item()
            print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}')

    def evaluate(self, dataset, batch_size=64):
        self.model.eval()
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
        total_correct = 0
        total = 0
        with torch.no_grad():
            for text, label in dataloader:
                text, label = text.to(self.device), label.to(self.device)
                logits = self.model(text)
                predictions = torch.argmax(logits, dim=1)
                total_correct += (predictions == label).sum().item()
                total += label.size(0)
        accuracy = total_correct / total
        print(f'IMDb Sentiment Accuracy: {accuracy:.4f}')

In [68]:
imdb_train = IMDBDataset(split='train', min_freq=2, max_len=256)
imdb_test = IMDBDataset(split='test', min_freq=2, max_len=256)

vocab_size_imdb = len(imdb_train.vocab)
output_dim = 2  # Binary classification (neg, pos)
embedding_dim = 150
hidden_dim = 128
num_layers = 2
dropout = 0.5
learning_rate = 0.0001

sentiment_model = SentimentLSTMModel(vocab_size_imdb, embedding_dim, hidden_dim, num_layers, output_dim, dropout)
print(sentiment_model)

sentiment_trainer = SentimentTrainer(sentiment_model, learning_rate)
sentiment_trainer.train(imdb_train, num_epochs=5, batch_size=128)
sentiment_trainer.evaluate(imdb_test, batch_size=128)

SentimentLSTMModel(
  (embedding): Embedding(3, 150)
  (lstm): LSTM(150, 128, num_layers=2, batch_first=True, dropout=0.5, bidirectional=True)
  (fc): Linear(in_features=256, out_features=2, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
)
Using device: cuda
Epoch 1/5, Loss: 0.0751
Epoch 2/5, Loss: 0.0004
Epoch 3/5, Loss: 0.0002
Epoch 4/5, Loss: 0.0001
Epoch 5/5, Loss: 0.0001
IMDb Sentiment Accuracy: 1.0000


## Task 2

In [None]:
class POSTagDataset(Dataset):
    def __init__(self, filepath, min_freq=1, max_len=50, vocab=None, tag_vocab=None):
        self.min_freq = min_freq
        self.max_len = max_len
        self.sentences, self.tags = self.read_file(filepath)
        if vocab is None:
            self.vocab = self.build_vocab(self.sentences, min_freq)
        else:
            self.vocab = vocab
        if tag_vocab is None:
            self.tag_vocab = self.build_tag_vocab(self.tags)
        else:
            self.tag_vocab = tag_vocab
        self.encoded_data = self.encode_data()

    def read_file(self, filepath):
        sentences = []
        tags = []
        current_sentence = []
        current_tags = []
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line == "":
                    if current_sentence:
                        sentences.append(current_sentence)
                        tags.append(current_tags)
                        current_sentence = []
                        current_tags = []
                else:
                    parts = line.split()
                    if len(parts) == 2:
                        word, tag = parts
                        current_sentence.append(word)
                        current_tags.append(tag)
        if current_sentence:
            sentences.append(current_sentence)
            tags.append(current_tags)
        return sentences, tags

    def build_vocab(self, sentences, min_freq):
        counter = Counter()
        for sent in sentences:
            counter.update(sent)
        vocab = {'<pad>': 0, '<unk>': 1}
        idx = 2
        for word, count in counter.items():
            if count >= min_freq:
                vocab[word] = idx
                idx += 1
        return vocab

    def build_tag_vocab(self, tag_lists):
        counter = Counter()
        for tag_list in tag_lists:
            counter.update(tag_list)
        tag_vocab = {'<pad>': 0}
        idx = 1
        for tag in sorted(counter.keys()):
            tag_vocab[tag] = idx
            idx += 1
        return tag_vocab

    def encode_sentence(self, sentence, vocab):
        return [vocab.get(word, vocab['<unk>']) for word in sentence]

    def encode_tags(self, tag_list, tag_vocab):
        return [tag_vocab[tag] for tag in tag_list]

    def encode_data(self):
        encoded_data = []
        for sent, tag_list in zip(self.sentences, self.tags):
            encoded_sent = self.encode_sentence(sent, self.vocab)
            encoded_tag = self.encode_tags(tag_list, self.tag_vocab)
            if len(encoded_sent) < self.max_len:
                pad_len = self.max_len - len(encoded_sent)
                encoded_sent = encoded_sent + [self.vocab['<pad>']] * pad_len
                encoded_tag = encoded_tag + [self.tag_vocab['<pad>']] * pad_len
            else:
                encoded_sent = encoded_sent[:self.max_len]
                encoded_tag = encoded_tag[:self.max_len]
            encoded_data.append({'input_ids': encoded_sent, 'tags': encoded_tag})
        return encoded_data

    def __len__(self):
        return len(self.encoded_data)

    def __getitem__(self, idx):
        example = self.encoded_data[idx]
        input_ids = torch.tensor(example['input_ids'], dtype=torch.long)
        tags = torch.tensor(example['tags'], dtype=torch.long)
        return input_ids, tags

In [40]:
class POSTagLSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers, num_tags, dropout=0.5):
        super(POSTagLSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim,
                            hidden_size=hidden_dim,
                            num_layers=num_layers,
                            bidirectional=True,
                            dropout=dropout,
                            batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, num_tags)

    def forward(self, input_ids):
        # input_ids: [batch_size, seq_len]
        embedded = self.embedding(input_ids)   # [batch_size, seq_len, embedding_dim]
        outputs, _ = self.lstm(embedded)          # [batch_size, seq_len, hidden_dim*2]
        logits = self.fc(outputs)                # [batch_size, seq_len, num_tags]
        return logits

In [41]:
class POSTrainer:
    def __init__(self, model, learning_rate):
        self.model = model
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.criterion = nn.CrossEntropyLoss(ignore_index=0)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        print(f"Using device: {self.device}")

    def train(self, dataset, num_epochs=5, batch_size=32):
        self.model.to(self.device)
        self.model.train()
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        for epoch in range(num_epochs):
            total_loss = 0.0
            for input_ids, tags in dataloader:
                input_ids, tags = input_ids.to(self.device), tags.to(self.device)
                self.optimizer.zero_grad()
                logits = self.model(input_ids)  # [batch_size, seq_len, num_tags]
                loss = self.criterion(logits.view(-1, logits.shape[-1]), tags.view(-1))
                loss.backward()
                self.optimizer.step()
                total_loss += loss.item()
            print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}')

    def evaluate(self, dataset, batch_size=32):
        self.model.eval()
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
        all_preds = []
        all_tags = []
        with torch.no_grad():
            for input_ids, tags in dataloader:
                input_ids, tags = input_ids.to(self.device), tags.to(self.device)
                logits = self.model(input_ids)  # [batch_size, seq_len, num_tags]
                preds = logits.argmax(dim=-1)
                mask = tags != 0
                all_preds.extend(preds[mask].cpu().numpy())
                all_tags.extend(tags[mask].cpu().numpy())
        accuracy = accuracy_score(all_tags, all_preds)
        print(f'POS Tagging Accuracy: {accuracy:.4f}')

In [73]:
train_pos_filepath = "Labs/lab5/train_pos.txt"
test_pos_filepath = "Labs/lab5/test_pos.txt"


pos_train_dataset = POSTagDataset(train_pos_filepath, min_freq=1, max_len=50)
pos_test_dataset = POSTagDataset(test_pos_filepath, min_freq=1, max_len=50,
                                    vocab=pos_train_dataset.vocab,
                                    tag_vocab=pos_train_dataset.tag_vocab)

vocab_size_pos = len(pos_train_dataset.vocab)
num_tags = len(pos_train_dataset.tag_vocab)
pos_embedding_dim = 120
pos_hidden_dim = 128
pos_num_layers = 2
pos_dropout = 0.5
pos_learning_rate = 0.001

pos_model = POSTagLSTMModel(vocab_size_pos, pos_embedding_dim, pos_hidden_dim, pos_num_layers, num_tags, pos_dropout)
pos_model

POSTagLSTMModel(
  (embedding): Embedding(19124, 120)
  (lstm): LSTM(120, 128, num_layers=2, batch_first=True, dropout=0.5, bidirectional=True)
  (fc): Linear(in_features=256, out_features=45, bias=True)
)

In [74]:
pos_trainer = POSTrainer(pos_model, pos_learning_rate)
pos_trainer.train(pos_train_dataset, num_epochs=5, batch_size=32)
pos_trainer.evaluate(pos_test_dataset, batch_size=32)

Using device: cuda
Epoch 1/5, Loss: 1.3322
Epoch 2/5, Loss: 0.4983
Epoch 3/5, Loss: 0.3163
Epoch 4/5, Loss: 0.2223
Epoch 5/5, Loss: 0.1635
POS Tagging Accuracy: 0.9261
