Evaluating and Extending an RNN-based Part-of-Speech Tagger


For this assignment, you start with a working pipeline for PoS tagging using LSTMs and mini-batch training. The task is to evaluate the model on a number of sources in different languages and to extend it with new functionality. First, the neural model represents the tokens as embedding vectors that are learned from scratch (not pretrained). The sequences of embedding vectors are then passed through an LSTM layer. Finally, the outputs of the recurrent layer are transformed into probabilities over PoS tags by passing them through a fully connected layer and a softmax. You will have to refactor (i.e. rearrange) the code and extend the model with some commonly used features, e.g. bi-directionality (see the list of suggestions below).



### Extensions

1. There are other types of RNN layers commonly used in NLP. Add the option to use a GRU layer instead of an LSTM layer, and include this in your performance comparison.
2. The given implementation only allows for dependencies from left to right. Add the option to use a bi-directional RNN layer.
3. Use PyTorch's `Dataset` and `DataLoader` classes for loading the data. [This tutorial](https://pytorch.org/tutorials/beginner/data_loading_tutorial.html) is a good starting point. Proper data loaders make it easier to loop over data sets.
4. Try some type of data augmentation in your training data (e.g. masking random tokens). This should theoretically increase the generalizability of your model. How much augmentation is too much?
5. Implement some level of regularization in your model. This can be implemented in several ways, e.g. dropout or weight decay. Briefly argue for the choices you made and show how network performance might change with the rate of regularization.
6. Add more sources. These can come from different genres and/or additional languages. A suggestion is to test whether languages with very different levels of morphological richness (e.g. English vs. Finnish) require more or less training data, or how well a model trained on academic English does on news text. The total number of sources should be above 10.
7. Compare performance using UD's universal vs. language-specific tag sets.
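
For extensions 1 and 2, one possible shape of the change is sketched below. `BiRNNTagger` and its parameter names are hypothetical and not part of the given code. The detail worth noticing is that with `bidirectional=True` the recurrent layer returns `2 * hidden_dim` features per token, so the fully connected layer has to be sized accordingly.

```python
import torch
import torch.nn as nn


class BiRNNTagger(nn.Module):
    """Hypothetical tagger with a selectable, optionally bi-directional RNN layer."""

    def __init__(self, vocabulary_size, tagset_size, pad_idx,
                 embedding_dim=32, hidden_dim=64,
                 rnn_type="lstm", bidirectional=True):
        super().__init__()
        rnn_classes = {"lstm": nn.LSTM, "gru": nn.GRU}
        if rnn_type not in rnn_classes:
            raise ValueError("rnn_type must be 'lstm' or 'gru'")
        self.pad_idx = pad_idx
        self.embedding = nn.Embedding(vocabulary_size, embedding_dim,
                                      padding_idx=pad_idx)
        self.rnn = rnn_classes[rnn_type](embedding_dim, hidden_dim,
                                         batch_first=True,
                                         bidirectional=bidirectional)
        # A bi-directional layer concatenates forward and backward states.
        out_dim = hidden_dim * (2 if bidirectional else 1)
        self.fc = nn.Linear(out_dim, tagset_size)

    def forward(self, padded_sentences):
        # Sentence lengths are recovered by counting non-padding positions.
        lengths = (padded_sentences != self.pad_idx).sum(dim=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            self.embedding(padded_sentences), lengths,
            batch_first=True, enforce_sorted=False)
        rnn_out, _ = self.rnn(packed)
        out, _ = nn.utils.rnn.pad_packed_sequence(
            rnn_out, batch_first=True,
            total_length=padded_sentences.size(1))
        # Log-probabilities over tags for every token position.
        return torch.log_softmax(self.fc(out), dim=-1)


# Tiny smoke test with made-up indices; index 9 plays the role of <PAD>.
demo_batch = torch.tensor([[1, 2, 3, 4], [5, 6, 9, 9]])
demo_model = BiRNNTagger(vocabulary_size=10, tagset_size=5, pad_idx=9,
                         rnn_type="gru")
demo_scores = demo_model(demo_batch)
print(demo_scores.shape)  # torch.Size([2, 4, 5])
```

Applying the softmax over the last dimension avoids having to flatten and reshape the output, which is a design choice of this sketch rather than something the assignment prescribes.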

*Note that fully evaluating all combinations of the extensions is not required. It is enough to do some structured testing of extensions and then, for example, go on to compare the tag sets on the best model configuration. This is an exercise in extending and evaluating a model, not in finding the patience to wait for your computer to finish grid searching over alternatives.*
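
For extension 3, a minimal sketch of a `Dataset` with a padding `collate_fn` could look as follows. The class `TaggedSentences`, the helper `make_collate_fn`, and the toy vocabulary are assumptions made up for illustration; only `Dataset`, `DataLoader` and `pad_sequence` are actual PyTorch names.

```python
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence


class TaggedSentences(Dataset):
    """Hypothetical dataset wrapping tokenised sentences and their tags."""

    def __init__(self, sentences, labels, token2idx, tag2idx):
        assert len(sentences) == len(labels)
        self.sentences, self.labels = sentences, labels
        self.token2idx, self.tag2idx = token2idx, tag2idx

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, i):
        # Unknown tokens are mapped to the <UNK> index.
        unk = self.token2idx["<UNK>"]
        tokens = [self.token2idx.get(t, unk) for t in self.sentences[i]]
        tags = [self.tag2idx[t] for t in self.labels[i]]
        return torch.tensor(tokens), torch.tensor(tags)


def make_collate_fn(token_pad_idx, tag_pad_idx):
    """Build a collate function that pads each batch to its longest sentence."""
    def collate(batch):
        tokens, tags = zip(*batch)
        return (pad_sequence(list(tokens), batch_first=True,
                             padding_value=token_pad_idx),
                pad_sequence(list(tags), batch_first=True,
                             padding_value=tag_pad_idx))
    return collate


# Toy vocabulary and data, made up for illustration only.
toy_token2idx = {"<UNK>": 0, "the": 1, "dog": 2, "barks": 3, "<PAD>": 4}
toy_tag2idx = {"DET": 0, "NOUN": 1, "VERB": 2, "<PAD>": 3}
toy_X = [["the", "dog", "barks"], ["the", "cat"]]
toy_y = [["DET", "NOUN", "VERB"], ["DET", "NOUN"]]

toy_loader = DataLoader(
    TaggedSentences(toy_X, toy_y, toy_token2idx, toy_tag2idx),
    batch_size=2, shuffle=False,
    collate_fn=make_collate_fn(toy_token2idx["<PAD>"], toy_tag2idx["<PAD>"]))
toy_inputs, toy_targets = next(iter(toy_loader))
print(toy_inputs)   # "cat" is unseen, so it maps to <UNK>
print(toy_targets)
```

With a loader like this, shuffling the training data each epoch only requires `shuffle=True`, which the hand-written batch iterator does not offer.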


In [None]:
# Our standard imports for maths and basic methodology
import numpy as np
from sklearn.model_selection import train_test_split

# For user feedback
from tqdm import tqdm
import matplotlib.pyplot as plt

# Imports for pytorch
import torch
import torch.nn as nn

Let's see if we have a GPU.

In [None]:
if torch.cuda.is_available():
  for i in range(torch.cuda.device_count()):
    print(torch.cuda.get_device_name(i))
else:
  print("No GPU available")

## Load tagging data

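The following cells load the data. The first downloads the training splits of four Universal Dependencies treebanks (English, Spanish, Chinese and German) and prints a few tokens from each. The second downloads the [Brown corpus](https://en.wikipedia.org/wiki/Brown_Corpus), which is only here to demonstrate the network below.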

In [None]:
import requests
from collections import Counter
import matplotlib.pyplot as plt
import numpy as np


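def parse_conllu_from_url(url):
    """Download a CoNLL-U file and return its word forms as a flat list."""
    response = requests.get(url)
    response.raise_for_status()

    words = []
    for line in response.text.split('\n'):
        line = line.strip()
        # Skip blank lines (sentence boundaries) and comment lines.
        if not line or line.startswith('#'):
            continue
        columns = line.split('\t')
        if len(columns) > 1:
            # Skip multiword-token ranges (ID like "1-2") and empty nodes
            # (ID like "1.1"), so that each syntactic word is counted once.
            if '-' in columns[0] or '.' in columns[0]:
                continue
            words.append(columns[1])  # the second column (FORM) is the word form
    return words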


english_url = "https://raw.githubusercontent.com/UniversalDependencies/UD_English-EWT/master/en_ewt-ud-train.conllu"
spanish_url = "https://raw.githubusercontent.com/UniversalDependencies/UD_Spanish-GSD/master/es_gsd-ud-train.conllu"
chinese_url = "https://raw.githubusercontent.com/UniversalDependencies/UD_Chinese-GSDSimp/master/zh_gsdsimp-ud-train.conllu"
german_url = "https://raw.githubusercontent.com/UniversalDependencies/UD_German-GSD/master/de_gsd-ud-train.conllu"

english_words = parse_conllu_from_url(english_url)
spanish_words = parse_conllu_from_url(spanish_url)
chinese_words = parse_conllu_from_url(chinese_url)
german_words = parse_conllu_from_url(german_url)

print("English sample:", english_words[:20])
print("Spanish sample:", spanish_words[:20])
print("Chinese sample:", chinese_words[:20])
print("German sample:", german_words[:20])

In [None]:
import nltk
nltk.download('brown')
from nltk.corpus import brown
nltk.download('universal_tagset')

sentences = brown.tagged_sents(tagset='universal')
sentences = [sentence for sentence in sentences if len(sentence) > 2]

print("Loaded %i sentences" % len(sentences))
print(sentences[0])

Preprocessing for the Brown corpus. This splits the data into our standard X and y format.

In [None]:
X = [[token for token, tag in sentence] for sentence in sentences]
y = [[tag for token, tag in sentence] for sentence in sentences]

assert len(X) == len(y)

print(X[0])
print(y[0])

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=0)

assert len(X_train) == len(y_train)
assert len(X_test) == len(y_test)
assert len(X_train)+len(X_test) == len(X)

print("The training set includes %i sentences" % len(X_train))
print("The test set includes %i sentences" % len(X_test))

Most sentences are short, but some are very long.

In [None]:
l = np.asarray([len(x) for x in X], dtype=int)
plt.figure(figsize=(8, 4))
x = np.unique(l)
plt.bar(x, [np.sum(l==e) for e in x], width=1)
plt.xlabel("Sentence length")
plt.ylabel("# sentences")
plt.show()

## Data encoding and padding

In [None]:
tokens = {token for sentence in X_train for token in sentence}
idx2token = list(tokens)
idx2token.insert(0, '<UNK>')
idx2token.append('<PAD>')
token2idx = {token:idx for idx, token in enumerate(idx2token)}

tags = {tag for tags in y_train for tag in tags}
idx2tag = list(tags)
idx2tag.append('<PAD>')
tag2idx = {tag:idx for idx, tag in enumerate(idx2tag)}

print(idx2token[:15])
print(idx2tag)

In [None]:
def pad_and_encode(sentences, labels):
  assert len(sentences)==len(labels)
  assert np.all([len(sentence)==len(tags) for sentence, tags in zip(sentences, labels)])
  max_sentence_length = np.max([len(sentence) for sentence in sentences])
  padded_sentences = torch.zeros(len(sentences), max_sentence_length,
                                 dtype=torch.long)
  padded_sentences[:] = token2idx['<PAD>']
  padded_labels = torch.zeros(len(sentences), max_sentence_length,
                              dtype=torch.long)
  padded_labels[:] = tag2idx['<PAD>']
  for i, (sentence, tags) in enumerate(zip(sentences, labels)):
    for j, token in enumerate(sentence):
      if token in token2idx.keys():
        padded_sentences[i, j] = token2idx[token]
      else:
        padded_sentences[i, j] = token2idx['<UNK>']
    for j, tag in enumerate(tags):
      padded_labels[i, j] = tag2idx[tag]
  return padded_sentences, padded_labels

a, b = pad_and_encode(X_train[:5], y_train[:5])
print(a)
print(b)

In [None]:
def batch_iterator(sentences, labels, batch_size=64):
  assert len(sentences) == len(labels)
  for i in range(0, len(sentences), batch_size):
    X, y = pad_and_encode(sentences[i:min(i+batch_size, len(sentences))],
                          labels[i:min(i+batch_size, len(sentences))])
    if torch.cuda.is_available():
      yield (X.cuda(), y.cuda())
    else:
      yield (X, y)

next(batch_iterator(X_train, y_train, batch_size=5))

## Model

In [None]:
class LSTMTagger(nn.Module):
  def __init__(self, word_embedding_dim, lstm_hidden_dim, vocabulary_size, tagset_size):
    super(LSTMTagger, self).__init__()
    self.lstm_hidden_dim_ = lstm_hidden_dim
    self.vocabulary_size_ = vocabulary_size
    self.tagset_size_ = tagset_size

    self._word_embedding = nn.Embedding(num_embeddings=vocabulary_size,
                                         embedding_dim=word_embedding_dim,
                                         padding_idx=token2idx['<PAD>'])
    self._lstm = nn.LSTM(input_size=word_embedding_dim,
                         hidden_size=lstm_hidden_dim,
                         batch_first=True)
    self._fc = nn.Linear(lstm_hidden_dim, tagset_size)
    self._softmax = nn.LogSoftmax(dim=1)

    self.training_loss_ = list()
    self.training_accuracy_ = list()

    if torch.cuda.is_available():
      self.cuda()

  def forward(self, padded_sentences):
    """The forward pass through the network"""
    batch_size, max_sentence_length = padded_sentences.size()

    embedded_sentences = self._word_embedding(padded_sentences)

    sentence_lengths = (padded_sentences!=token2idx['<PAD>']).sum(dim=1)
    sentence_lengths = sentence_lengths.long().cpu()
    X = nn.utils.rnn.pack_padded_sequence(embedded_sentences, sentence_lengths,
                                          batch_first=True, enforce_sorted=False)
    lstm_out, _ = self._lstm(X)
    X, _ = nn.utils.rnn.pad_packed_sequence(lstm_out, batch_first=True)

    X = X.contiguous().view(-1, X.shape[2])
    tag_space = self._fc(X)
    tag_scores = self._softmax(tag_space)
    return tag_scores.view(batch_size, max_sentence_length, self.tagset_size_)


model = LSTMTagger(word_embedding_dim=32,
                   lstm_hidden_dim=64,
                   vocabulary_size=len(token2idx),
                   tagset_size=len(tag2idx)-1)
print(model)

## Network training

In [None]:
loss_function = nn.NLLLoss(ignore_index=tag2idx['<PAD>'])

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

batch_size = 256
for epoch in range(5):
  with tqdm(batch_iterator(X_train, y_train, batch_size=batch_size),
            total=len(X_train)//batch_size+1, unit="batch", desc="Epoch %i" % epoch) as batches:
    for inputs, targets in batches:
      model.zero_grad()
      scores = model(inputs)
      loss = loss_function(scores.view(-1, model.tagset_size_),
                           targets.view(-1))
      loss.backward()
      optimizer.step()
      # argmax over the tag dimension; no squeeze, so a batch of size 1 keeps its shape
      predictions = scores.argmax(dim=2)
      mask = targets!=tag2idx['<PAD>']
      correct = (predictions[mask] == targets[mask]).sum().item()
      accuracy = correct / mask.sum().item()*100
      model.training_accuracy_.append(accuracy)
      model.training_loss_.append(loss.item())
      batches.set_postfix(loss=loss.item(), accuracy=accuracy)

We can plot the stored loss over epochs.
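
Because `training_loss_` and `training_accuracy_` hold one value per batch, the plotted curves can be noisy. As a sketch, the per-batch values could be averaged per epoch before plotting. `epoch_means` is a hypothetical helper, and the loss values below are made up.

```python
def epoch_means(values, batches_per_epoch):
    """Average a flat list of per-batch values over consecutive epochs."""
    if batches_per_epoch <= 0:
        raise ValueError("batches_per_epoch must be positive")
    means = []
    for start in range(0, len(values), batches_per_epoch):
        chunk = values[start:start + batches_per_epoch]
        means.append(sum(chunk) / len(chunk))
    return means


# Made-up losses for two epochs of three batches each.
toy_losses = [2.0, 1.5, 1.0, 0.9, 0.6, 0.3]
toy_epoch_losses = epoch_means(toy_losses, batches_per_epoch=3)
print(toy_epoch_losses)
```

With the notebook's variables, `batches_per_epoch` would be `math.ceil(len(X_train) / batch_size)`, the number of batches that `batch_iterator` yields per pass.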

In [None]:
fig = plt.figure(figsize=(6, 4))
ax = plt.subplot()
ax.set_title("Plot of the (hopefully) decreasing loss over epochs")
ax.plot(model.training_loss_, 'b-')
ax.set_ylabel("Training Loss", color='b')
ax.set_xlabel("Epoch")
# ax.set_yscale('log')
ax.tick_params(axis='y', labelcolor='b')
ax = ax.twinx()
ax.plot(model.training_accuracy_, 'r-')
ax.set_ylabel("Accuracy [%]", color='r')
ax.tick_params(axis='y', labelcolor='r')
a = list(ax.axis())
a[2] = 0
a[3] = 100
ax.axis(a)
t = np.arange(0, len(model.training_accuracy_), len(X_train)//batch_size+1)
ax.set_xticks(ticks=t)
ax.set_xticklabels(labels=np.arange(len(t)))
fig.tight_layout()
plt.show()

## Test data accuracy

This shows that the model sort of works. Per-sentence accuracy would be a more informative metric than the token-level accuracy computed here, and being able to analyse the predictions qualitatively would be better still.
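
A per-sentence view could be sketched as follows, assuming gold and predicted tags are available as plain lists per sentence with padding already removed. `per_sentence_report` is a hypothetical helper and the tags are made up.

```python
def per_sentence_report(gold_tags, predicted_tags):
    """Return (accuracy per sentence, fraction of sentences tagged without error).

    Assumes every sentence is non-empty, as in this notebook, where short
    sentences are filtered out.
    """
    assert len(gold_tags) == len(predicted_tags)
    accuracies = []
    for gold, pred in zip(gold_tags, predicted_tags):
        assert len(gold) == len(pred)
        correct = sum(g == p for g, p in zip(gold, pred))
        accuracies.append(correct / len(gold))
    exact_match = sum(a == 1.0 for a in accuracies) / len(accuracies)
    return accuracies, exact_match


# Made-up gold and predicted tags for two short sentences.
toy_gold = [["DET", "NOUN", "VERB"], ["PRON", "VERB"]]
toy_pred = [["DET", "NOUN", "VERB"], ["PRON", "NOUN"]]
toy_acc, toy_exact = per_sentence_report(toy_gold, toy_pred)
print(toy_acc, toy_exact)  # [1.0, 0.5] 0.5
```

To feed this from the model, the predicted indices would first be trimmed to the non-padding positions of each sentence and decoded with `idx2tag`; sorting sentences by accuracy then gives a starting point for qualitative inspection.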

In [None]:
with torch.no_grad():
  n_correct = 0
  n_total = 0
  for inputs, targets in batch_iterator(X_test, y_test, batch_size=batch_size):
    scores = model(inputs)
    # argmax over the tag dimension; no squeeze, so a batch of size 1 keeps its shape
    predictions = scores.argmax(dim=2)
    mask = targets!=tag2idx['<PAD>']
    n_correct += (predictions[mask] == targets[mask]).sum().item()
    n_total += mask.sum().item()
print("Test accuracy %.1f%%" % (100*n_correct/n_total))

### Assignment 2: PoS tagger

In [None]:
!pip install datasets
!pip install conllu

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import matplotlib.pyplot as plt

class POSTagger:
    def __init__(self, word_embedding_dim=32, lstm_hidden_dim=64, batch_size=256, learning_rate=0.01, epochs=5):
        self.word_embedding_dim = word_embedding_dim
        self.lstm_hidden_dim = lstm_hidden_dim
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.epochs = epochs

        self.X = []
        self.y = []
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

        self.token2idx = None
        self.idx2token = None
        self.tag2idx = None
        self.idx2tag = None

        self.model = None

    def load_data(self):
        # Load the dataset
        dataset = load_dataset("universal_dependencies", "en_ewt")

        print(dataset)

        for sentence in dataset['train']:
            tokens = sentence["tokens"]
            tags = sentence["upos"]
            self.X.append(tokens)
            self.y.append(tags)

        assert len(self.X) == len(self.y)

        print(self.X[0])
        print(self.y[0])


        sentences = [sentence for sentence in zip(self.X, self.y) if len(sentence[0]) > 2]
        self.X = [s[0] for s in sentences]
        self.y = [s[1] for s in sentences]

        print("Loaded %i sentences" % len(sentences))
        print(sentences[0])

        # Hold out 10% of the sentences as a test set
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            self.X, self.y, test_size=0.1, random_state=0
        )

        assert len(self.X_train) == len(self.y_train)
        assert len(self.X_test) == len(self.y_test)
        assert len(self.X_train) + len(self.X_test) == len(self.X)

        print("The training set includes %i sentences" % len(self.X_train))
        print("The test set includes %i sentences" % len(self.X_test))

    def create_vocabulary(self):
        tokens = set()
        upos = set()


        for sentence in self.X_train:
            for token in sentence:
                tokens.add(token)

        for tags in self.y_train:
            for tag in tags:
                upos.add(tag)

        # Build the index mappings: <UNK> gets index 0 and <PAD> the last index
        self.idx2token = list(tokens)
        self.idx2token.insert(0, '<UNK>')
        self.idx2token.append('<PAD>')
        self.token2idx = {token: idx for idx, token in enumerate(self.idx2token)}

        self.idx2tag = list(upos)
        self.idx2tag.append('<PAD>')
        self.tag2idx = {tag: idx for idx, tag in enumerate(self.idx2tag)}


        print("Token to Index Mapping:", self.token2idx)
        print("Index to Token Mapping:", self.idx2token)
        print("Tag to Index Mapping:", self.tag2idx)
        print("Index to Tag Mapping:", self.idx2tag)

    def pad_and_encode(self, sentences, labels):
        assert len(sentences) == len(labels)
        assert np.all([len(sentence) == len(tags) for sentence, tags in zip(sentences, labels)])

        max_sentence_length = np.max([len(sentence) for sentence in sentences])
        padded_sentences = torch.zeros(len(sentences), max_sentence_length,
                                     dtype=torch.long)
        padded_sentences[:] = self.token2idx['<PAD>']
        padded_labels = torch.zeros(len(sentences), max_sentence_length,
                                  dtype=torch.long)
        padded_labels[:] = self.tag2idx['<PAD>']

        for i, (sentence, tags) in enumerate(zip(sentences, labels)):
            for j, token in enumerate(sentence):
                if token in self.token2idx.keys():
                    padded_sentences[i, j] = self.token2idx[token]
                else:
                    padded_sentences[i, j] = self.token2idx['<UNK>']
            for j, tag in enumerate(tags):
                padded_labels[i, j] = self.tag2idx[tag]

        return padded_sentences, padded_labels

    def batch_iterator(self, sentences, labels, batch_size=64):
        """Helper function for iterating over batches of the data"""
        assert len(sentences) == len(labels)
        for i in range(0, len(sentences), batch_size):
            X, y = self.pad_and_encode(
                sentences[i:min(i+batch_size, len(sentences))],
                labels[i:min(i+batch_size, len(sentences))]
            )
            if torch.cuda.is_available():
                yield (X.cuda(), y.cuda())
            else:
                yield (X, y)

    def initialize_model(self):
        self.model = LSTMTagger(
            word_embedding_dim=self.word_embedding_dim,
            lstm_hidden_dim=self.lstm_hidden_dim,
            vocabulary_size=len(self.token2idx),
            tagset_size=len(self.tag2idx)-1,
            token2idx=self.token2idx,
            tag2idx=self.tag2idx
        )
        print(self.model)

    def train(self):
        loss_function = nn.NLLLoss(ignore_index=self.tag2idx['<PAD>'])
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)

        for epoch in range(self.epochs):
            with tqdm(
                self.batch_iterator(self.X_train, self.y_train, batch_size=self.batch_size),
                total=len(self.X_train)//self.batch_size+1, unit="batch", desc="Epoch %i" % epoch
            ) as batches:
                for inputs, targets in batches:
                    self.model.zero_grad()
                    scores = self.model(inputs)
                    loss = loss_function(
                        scores.view(-1, self.model.tagset_size_),
                        targets.view(-1)
                    )
                    loss.backward()
                    optimizer.step()

                    # argmax over the tag dimension; no squeeze, so a batch of size 1 keeps its shape
                    predictions = scores.argmax(dim=2)
                    mask = targets != self.tag2idx['<PAD>']
                    correct = (predictions[mask] == targets[mask]).sum().item()
                    accuracy = correct / mask.sum().item() * 100
                    self.model.training_accuracy_.append(accuracy)
                    self.model.training_loss_.append(loss.item())
                    batches.set_postfix(loss=loss.item(), accuracy=accuracy)

    def plot_training_progress(self):
        fig = plt.figure(figsize=(6, 4))
        ax = plt.subplot()
        ax.set_title("Plot of the (hopefully) decreasing loss over epochs")
        ax.plot(self.model.training_loss_, 'b-')
        ax.set_ylabel("Training Loss", color='b')
        ax.set_xlabel("Epoch")
        ax.tick_params(axis='y', labelcolor='b')
        ax = ax.twinx()
        ax.plot(self.model.training_accuracy_, 'r-')
        ax.set_ylabel("Accuracy [%]", color='r')
        ax.tick_params(axis='y', labelcolor='r')
        a = list(ax.axis())
        a[2] = 0
        a[3] = 100
        ax.axis(a)
        t = np.arange(0, len(self.model.training_accuracy_), len(self.X_train)//self.batch_size+1)
        ax.set_xticks(ticks=t)
        ax.set_xticklabels(labels=np.arange(len(t)))
        fig.tight_layout()
        plt.show()

    def evaluate(self):
        with torch.no_grad():
            n_correct = 0
            n_total = 0
            for inputs, targets in self.batch_iterator(self.X_test, self.y_test, batch_size=self.batch_size):
                scores = self.model(inputs)
                # argmax over the tag dimension; no squeeze, so a batch of size 1 keeps its shape
                predictions = scores.argmax(dim=2)
                mask = targets != self.tag2idx['<PAD>']
                n_correct += (predictions[mask] == targets[mask]).sum().item()
                n_total += mask.sum().item()
        print("Test accuracy %.1f%%" % (100*n_correct/n_total))

    def run(self):
        self.load_data()
        self.create_vocabulary()
        self.initialize_model()
        self.train()
        self.plot_training_progress()
        self.evaluate()


class LSTMTagger(nn.Module):
    def __init__(self, word_embedding_dim, lstm_hidden_dim, vocabulary_size, tagset_size, token2idx, tag2idx):

        super(LSTMTagger, self).__init__()
        self.lstm_hidden_dim_ = lstm_hidden_dim
        self.vocabulary_size_ = vocabulary_size
        self.tagset_size_ = tagset_size
        self.token2idx = token2idx

        self._word_embedding = nn.Embedding(num_embeddings=vocabulary_size,
                                           embedding_dim=word_embedding_dim,
                                           padding_idx=token2idx['<PAD>'])
        self._lstm = nn.LSTM(input_size=word_embedding_dim,
                           hidden_size=lstm_hidden_dim,
                           batch_first=True)
        self._fc = nn.Linear(lstm_hidden_dim, tagset_size)
        self._softmax = nn.LogSoftmax(dim=1)

        self.training_loss_ = list()
        self.training_accuracy_ = list()

        if torch.cuda.is_available():
            self.cuda()

    def forward(self, padded_sentences):
        """The forward pass through the network"""
        batch_size, max_sentence_length = padded_sentences.size()

        embedded_sentences = self._word_embedding(padded_sentences)

        sentence_lengths = (padded_sentences!=self.token2idx['<PAD>']).sum(dim=1)
        sentence_lengths = sentence_lengths.long().cpu()
        X = nn.utils.rnn.pack_padded_sequence(embedded_sentences, sentence_lengths,
                                            batch_first=True, enforce_sorted=False)
        lstm_out, _ = self._lstm(X)
        X, _ = nn.utils.rnn.pad_packed_sequence(lstm_out, batch_first=True)

        X = X.contiguous().view(-1, X.shape[2])
        tag_space = self._fc(X)
        tag_scores = self._softmax(tag_space)
        return tag_scores.view(batch_size, max_sentence_length, self.tagset_size_)


# Example usage:
if __name__ == "__main__":
    pos_tagger = POSTagger(
        word_embedding_dim=32,
        lstm_hidden_dim=64,
        batch_size=256,
        learning_rate=0.01,
        epochs=5
    )
    pos_tagger.run()

### Use a GRU layer

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import matplotlib.pyplot as plt

class POSTagger:
    def __init__(self, word_embedding_dim=32, hidden_dim=64, batch_size=256, learning_rate=0.01, epochs=5, rnn_type='lstm'):
        self.word_embedding_dim = word_embedding_dim
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.rnn_type = rnn_type.lower()

        if self.rnn_type not in ['lstm', 'gru']:
            raise ValueError("rnn_type must be either 'lstm' or 'gru'")

        self.X = []
        self.y = []
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

        self.token2idx = None
        self.idx2token = None
        self.tag2idx = None
        self.idx2tag = None

        self.model = None

    def compute_baseline_accuracy(self):
        from collections import defaultdict, Counter

        token2tags = defaultdict(Counter)
        for sentence, tags in zip(self.X_train, self.y_train):
            for tk, tg in zip(sentence, tags):
                token2tags[tk][tg] += 1

        # Most frequent tag per training token, plus the overall most frequent
        # tag as a fallback for tokens never seen in training.
        token2best = {}
        global_tag_counter = Counter()
        for tk, c in token2tags.items():
            best_tag, _ = c.most_common(1)[0]
            token2best[tk] = best_tag
            # Add the per-tag occurrence counts (not just the tag names), so
            # that the fallback reflects how often each tag actually occurs.
            global_tag_counter.update(c)
        global_most_common_tag, _ = global_tag_counter.most_common(1)[0]

        n_total = 0
        n_correct = 0
        for sentence, tags in zip(self.X_test, self.y_test):
            for tk, tg in zip(sentence, tags):
                if tk == "<PAD>":
                    continue
                pred = token2best.get(tk, global_most_common_tag)
                if pred == tg:
                    n_correct += 1
                n_total += 1

        baseline_acc = 100.0 * n_correct / n_total if n_total > 0 else 0.0
        print(f"Baseline accuracy: {baseline_acc:.1f}%")
        return baseline_acc


    def load_data(self, language="en_ewt"):
        print(f"Loading dataset [universal_dependencies] with config = '{language}'")
        dataset = load_dataset("universal_dependencies", language)

        print(dataset)

        for sentence in dataset['train']:
            tokens = sentence["tokens"]
            tags = sentence["upos"]
            self.X.append(tokens)
            self.y.append(tags)

        assert len(self.X) == len(self.y)

        print(self.X[0])
        print(self.y[0])


        sentences = [sentence for sentence in zip(self.X, self.y) if len(sentence[0]) > 2]
        self.X = [s[0] for s in sentences]
        self.y = [s[1] for s in sentences]

        print("Loaded %i sentences" % len(sentences))
        print(sentences[0])


        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            self.X, self.y, test_size=0.1, random_state=0
        )

        assert len(self.X_train) == len(self.y_train)
        assert len(self.X_test) == len(self.y_test)
        assert len(self.X_train) + len(self.X_test) == len(self.X)

        print("The training set includes %i sentences" % len(self.X_train))
        print("The test set includes %i sentences" % len(self.X_test))

    def create_vocabulary(self):

        tokens = set()
        upos = set()


        for sentence in self.X_train:
            for token in sentence:
                tokens.add(token)

        for tags in self.y_train:
            for tag in tags:
                upos.add(tag)


        self.idx2token = list(tokens)
        self.idx2token.insert(0, '<UNK>')
        self.idx2token.append('<PAD>')
        self.token2idx = {token: idx for idx, token in enumerate(self.idx2token)}

        self.idx2tag = list(upos)
        self.idx2tag.append('<PAD>')
        self.tag2idx = {tag: idx for idx, tag in enumerate(self.idx2tag)}


        print("Token to Index Mapping:", self.token2idx)
        print("Index to Token Mapping:", self.idx2token)
        print("Tag to Index Mapping:", self.tag2idx)
        print("Index to Tag Mapping:", self.idx2tag)

    def pad_and_encode(self, sentences, labels):
        assert len(sentences) == len(labels)
        assert all(len(s) == len(l) for s, l in zip(sentences, labels))
        max_sentence_length = max(len(sentence) for sentence in sentences)

        # Pre-fill with the <PAD> indices, so that padded positions are excluded
        # from the sentence lengths in the model and from the loss and accuracy.
        padded_sentences = torch.full((len(sentences), max_sentence_length),
                                      self.token2idx['<PAD>'], dtype=torch.long)
        padded_labels = torch.full((len(sentences), max_sentence_length),
                                   self.tag2idx['<PAD>'], dtype=torch.long)

        for i, (sentence, tags) in enumerate(zip(sentences, labels)):
            for j, token in enumerate(sentence):
                # Out-of-vocabulary tokens fall back to the <UNK> index.
                padded_sentences[i, j] = self.token2idx.get(token, self.token2idx['<UNK>'])
            for j, tag in enumerate(tags):
                padded_labels[i, j] = self.tag2idx[tag]

        return padded_sentences, padded_labels


    def batch_iterator(self, sentences, labels, batch_size=64):
        assert len(sentences) == len(labels)
        for i in range(0, len(sentences), batch_size):
            X, y = self.pad_and_encode(
                sentences[i:min(i+batch_size, len(sentences))],
                labels[i:min(i+batch_size, len(sentences))]
            )
            if torch.cuda.is_available():
                yield (X.cuda(), y.cuda())
            else:
                yield (X, y)

    def initialize_model(self):
        self.model = RNNTagger(
            word_embedding_dim=self.word_embedding_dim,
            hidden_dim=self.hidden_dim,
            vocabulary_size=len(self.token2idx),
            tagset_size=len(self.tag2idx)-1,
            token2idx=self.token2idx,
            tag2idx=self.tag2idx,
            rnn_type=self.rnn_type
        )
        print(self.model)

    def train(self):
        loss_function = nn.NLLLoss(ignore_index=self.tag2idx['<PAD>'])
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)

        for epoch in range(self.epochs):
            with tqdm(
                self.batch_iterator(self.X_train, self.y_train, batch_size=self.batch_size),
                total=len(self.X_train)//self.batch_size+1, unit="batch", desc=f"Epoch {epoch} ({self.rnn_type.upper()})"
            ) as batches:
                for inputs, targets in batches:
                    self.model.zero_grad()
                    scores = self.model(inputs)
                    loss = loss_function(
                        scores.view(-1, self.model.tagset_size_),
                        targets.view(-1)
                    )
                    loss.backward()
                    optimizer.step()

                    # argmax over the tag dimension; no squeeze, so a batch of size 1 keeps its shape
                    predictions = scores.argmax(dim=2)
                    mask = targets != self.tag2idx['<PAD>']
                    correct = (predictions[mask] == targets[mask]).sum().item()
                    accuracy = correct / mask.sum().item() * 100
                    self.model.training_accuracy_.append(accuracy)
                    self.model.training_loss_.append(loss.item())
                    batches.set_postfix(loss=loss.item(), accuracy=accuracy)

    def plot_training_progress(self):
        fig = plt.figure(figsize=(6, 4))
        ax = plt.subplot()
        ax.set_title(f"Training Progress with {self.rnn_type.upper()}")
        ax.plot(self.model.training_loss_, 'b-')
        ax.set_ylabel("Training Loss", color='b')
        ax.set_xlabel("Epoch")
        ax.tick_params(axis='y', labelcolor='b')
        ax = ax.twinx()
        ax.plot(self.model.training_accuracy_, 'r-')
        ax.set_ylabel("Accuracy [%]", color='r')
        ax.tick_params(axis='y', labelcolor='r')
        a = list(ax.axis())
        a[2] = 0
        a[3] = 100
        ax.axis(a)
        t = np.arange(0, len(self.model.training_accuracy_), len(self.X_train)//self.batch_size+1)
        ax.set_xticks(ticks=t)
        ax.set_xticklabels(labels=np.arange(len(t)))
        fig.tight_layout()
        plt.show()

    def evaluate(self):
        baseline_acc = self.compute_baseline_accuracy()
        with torch.no_grad():
            n_correct = 0
            n_total = 0
            for inputs, targets in self.batch_iterator(self.X_test, self.y_test, batch_size=self.batch_size):
                scores = self.model(inputs)
                # argmax over the tag dimension; no squeeze, so a batch of size 1 keeps its shape
                predictions = scores.argmax(dim=2)
                mask = targets != self.tag2idx['<PAD>']
                n_correct += (predictions[mask] == targets[mask]).sum().item()
                n_total += mask.sum().item()
        print(f"Test accuracy with {self.rnn_type.upper()}: {100*n_correct/n_total:.1f}%")
        print(f"Baseline Accuracy is: {baseline_acc:.1f}%")

    def run(self, language="en_ewt"):
        self.load_data(language=language)
        self.create_vocabulary()
        self.initialize_model()
        self.train()
        self.plot_training_progress()
        self.evaluate()


class RNNTagger(nn.Module):
    def __init__(self, word_embedding_dim, hidden_dim, vocabulary_size, tagset_size, token2idx, tag2idx, rnn_type='lstm'):
        super(RNNTagger, self).__init__()
        self.hidden_dim_ = hidden_dim
        self.vocabulary_size_ = vocabulary_size
        self.tagset_size_ = tagset_size
        self.token2idx = token2idx
        self.rnn_type = rnn_type.lower()

        self._word_embedding = nn.Embedding(
            num_embeddings=vocabulary_size,
            embedding_dim=word_embedding_dim,
            padding_idx=token2idx['<PAD>']
        )


        if self.rnn_type == 'lstm':
            self._rnn = nn.LSTM(
                input_size=word_embedding_dim,
                hidden_size=hidden_dim,
                batch_first=True
            )
        elif self.rnn_type == 'gru':
            self._rnn = nn.GRU(
                input_size=word_embedding_dim,
                hidden_size=hidden_dim,
                batch_first=True
            )

        self._fc = nn.Linear(hidden_dim, tagset_size)
        self._softmax = nn.LogSoftmax(dim=1)

        self.training_loss_ = list()
        self.training_accuracy_ = list()

        if torch.cuda.is_available():
            self.cuda()

    def forward(self, padded_sentences):
        """The forward pass through the network"""
        batch_size, max_sentence_length = padded_sentences.size()

        embedded_sentences = self._word_embedding(padded_sentences)

        sentence_lengths = (padded_sentences != self.token2idx['<PAD>']).sum(dim=1)
        sentence_lengths = sentence_lengths.long().cpu()

        X = nn.utils.rnn.pack_padded_sequence(
            embedded_sentences,
            sentence_lengths,
            batch_first=True,
            enforce_sorted=False
        )

        rnn_out, _ = self._rnn(X)
        X, _ = nn.utils.rnn.pad_packed_sequence(rnn_out, batch_first=True)

        X = X.contiguous().view(-1, X.shape[2])
        tag_space = self._fc(X)
        tag_scores = self._softmax(tag_space)

        return tag_scores.view(batch_size, max_sentence_length, self.tagset_size_)



if __name__ == "__main__":
    rnn_type = 'gru'

    languages = ["en_ewt", "es_gsd", "zh_gsdsimp", "de_gsd"]
    for lang in languages:
        print(f"=== Extension1: RNN={rnn_type}, Language={lang} ===")
        pos_tagger = POSTagger(
            word_embedding_dim=32,
            hidden_dim=64,
            batch_size=256,
            learning_rate=0.01,
            epochs=5,
            rnn_type=rnn_type
        )
        pos_tagger.run(language=lang)
        print("\n")

The training curves and test results show that the GRU models converge well on all four languages (English, Spanish, Chinese, and German): the training loss drops quickly as training progresses, while the accuracy curve rises steeply.

The differences in accuracy between the languages may be related to language structure and corpus size. Spanish and English have relatively regular morphology and syntax, which may make it easier for the model to learn stable part-of-speech cues and leads to higher accuracy. Chinese, in contrast, lacks morphological markers and depends on word segmentation, so the model needs more contextual information to distinguish part-of-speech categories, which results in slightly lower accuracy. German is closely related to English, but its complex inflectional morphology places higher demands on the memory and generalization capabilities of an RNN model.
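
One way to check which part-of-speech categories drive the gap between languages is to break the accuracy down by gold tag. The following is a minimal sketch in plain Python, assuming the gold and predicted tags are available as flat lists of tag names. The function `per_tag_accuracy` and the toy data are illustrative and not part of the pipeline above.

```python
from collections import Counter


def per_tag_accuracy(gold_tags, predicted_tags):
    """Return {tag: (accuracy in %, number of gold tokens)} for each gold tag."""
    if len(gold_tags) != len(predicted_tags):
        raise ValueError("gold and predicted sequences must have the same length")

    total = Counter()
    correct = Counter()
    for gold, pred in zip(gold_tags, predicted_tags):
        total[gold] += 1
        if gold == pred:
            correct[gold] += 1

    return {tag: (100.0 * correct[tag] / n, n) for tag, n in total.items()}


# Toy example (made-up tags, not results from the experiments above).
gold = ["NOUN", "VERB", "NOUN", "ADJ", "NOUN", "VERB"]
pred = ["NOUN", "VERB", "VERB", "ADJ", "NOUN", "NOUN"]

for tag, (acc, n) in sorted(per_tag_accuracy(gold, pred).items()):
    print(f"{tag:5s} {acc:5.1f}%  (n={n})")
```

Running such a breakdown per language would show whether a lower overall accuracy is spread over all tags or concentrated in a few categories.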



### Use a bi-directional RNN layer

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import matplotlib.pyplot as plt

class POSTagger:
    def __init__(self, word_embedding_dim=32, hidden_dim=64, batch_size=256, learning_rate=0.01, epochs=5, rnn_type='lstm', bidirectional=True):
        self.word_embedding_dim = word_embedding_dim
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.rnn_type = rnn_type.lower()
        self.bidirectional = bidirectional
        if self.rnn_type not in ['lstm', 'gru']:
            raise ValueError("rnn_type must be either 'lstm' or 'gru'")

        self.X = []
        self.y = []
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

        self.token2idx = None
        self.idx2token = None
        self.tag2idx = None
        self.idx2tag = None

        self.model = None

    def compute_baseline_accuracy(self):
        """Most-frequent-tag baseline: tag each token with its most common training tag."""
        from collections import defaultdict, Counter

        # Count how often each token occurs with each tag in the training data.
        token2tags = defaultdict(Counter)
        for sentence, tags in zip(self.X_train, self.y_train):
            for tk, tg in zip(sentence, tags):
                token2tags[tk][tg] += 1

        token2best = {}
        global_tag_counter = Counter()
        for tk, c in token2tags.items():
            best_tag, _ = c.most_common(1)[0]
            token2best[tk] = best_tag
            # Add the tag frequencies themselves; update(c.keys()) would only
            # count how many distinct tokens occur with each tag.
            global_tag_counter.update(c)

        # Fallback for tokens never seen in training: the overall most frequent tag.
        global_most_common_tag, _ = global_tag_counter.most_common(1)[0]

        n_total = 0
        n_correct = 0
        for sentence, tags in zip(self.X_test, self.y_test):
            for tk, tg in zip(sentence, tags):
                if tk == "<PAD>":
                    continue
                pred = token2best.get(tk, global_most_common_tag)
                if pred == tg:
                    n_correct += 1
                n_total += 1

        baseline_acc = 100.0 * n_correct / n_total if n_total > 0 else 0.0
        print(f"Baseline accuracy: {baseline_acc:.1f}%")
        return baseline_acc

    def load_data(self, language="en_ewt"):
        print(f"Loading dataset [universal_dependencies] with config = '{language}'")
        dataset = load_dataset("universal_dependencies", language)

        print(dataset)

        for sentence in dataset['train']:
            tokens = sentence["tokens"]
            tags = sentence["upos"]
            self.X.append(tokens)
            self.y.append(tags)

        assert len(self.X) == len(self.y)

        print(self.X[0])
        print(self.y[0])


        sentences = [sentence for sentence in zip(self.X, self.y) if len(sentence[0]) > 2]
        self.X = [s[0] for s in sentences]
        self.y = [s[1] for s in sentences]

        print("Loaded %i sentences" % len(sentences))
        print(sentences[0])


        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            self.X, self.y, test_size=0.1, random_state=0
        )

        assert len(self.X_train) == len(self.y_train)
        assert len(self.X_test) == len(self.y_test)
        assert len(self.X_train) + len(self.X_test) == len(self.X)

        print("The training set includes %i sentences" % len(self.X_train))
        print("The test set includes %i sentences" % len(self.X_test))

    def create_vocabulary(self):

        tokens = set()
        upos = set()


        for sentence in self.X_train:
            for token in sentence:
                tokens.add(token)

        for tags in self.y_train:
            for tag in tags:
                upos.add(tag)


        self.idx2token = list(tokens)
        self.idx2token.insert(0, '<UNK>')
        self.idx2token.append('<PAD>')
        self.token2idx = {token: idx for idx, token in enumerate(self.idx2token)}

        self.idx2tag = list(upos)
        self.idx2tag.append('<PAD>')
        self.tag2idx = {tag: idx for idx, tag in enumerate(self.idx2tag)}


        print("Token to Index Mapping:", self.token2idx)
        print("Index to Token Mapping:", self.idx2token)
        print("Tag to Index Mapping:", self.tag2idx)
        print("Index to Tag Mapping:", self.idx2tag)

    def pad_and_encode(self, sentences, labels):
        assert all(len(s) == len(l) for s, l in zip(sentences, labels))
        max_sentence_length = max(len(sentence) for sentence in sentences)

        # Fill with the <PAD> indices so that padding can be told apart from real
        # tokens and tags (index 0 is <UNK> for tokens and a real tag for labels).
        padded_sentences = torch.full(
            (len(sentences), max_sentence_length), self.token2idx['<PAD>'], dtype=torch.long
        )
        padded_labels = torch.full(
            (len(sentences), max_sentence_length), self.tag2idx['<PAD>'], dtype=torch.long
        )

        for i, (sentence, tags) in enumerate(zip(sentences, labels)):
            for j, token in enumerate(sentence):
                # Out-of-vocabulary (OOV) tokens fall back to the <UNK> index.
                padded_sentences[i, j] = self.token2idx.get(token, self.token2idx["<UNK>"])

            for j, tag in enumerate(tags):
                padded_labels[i, j] = self.tag2idx[tag]

        return padded_sentences, padded_labels

    def batch_iterator(self, sentences, labels, batch_size=64):
        assert len(sentences) == len(labels)
        for i in range(0, len(sentences), batch_size):
            X, y = self.pad_and_encode(
                sentences[i:min(i+batch_size, len(sentences))],
                labels[i:min(i+batch_size, len(sentences))]
            )
            if torch.cuda.is_available():
                yield (X.cuda(), y.cuda())
            else:
                yield (X, y)

    def initialize_model(self):
        self.model = RNNTagger(
            word_embedding_dim=self.word_embedding_dim,
            hidden_dim=self.hidden_dim,
            vocabulary_size=len(self.token2idx),
            tagset_size=len(self.tag2idx)-1,
            token2idx=self.token2idx,
            tag2idx=self.tag2idx,
            rnn_type=self.rnn_type,
            bidirectional=self.bidirectional
        )
        print(self.model)

    def train(self):
        loss_function = nn.NLLLoss(ignore_index=self.tag2idx['<PAD>'])
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)

        for epoch in range(self.epochs):
            with tqdm(
                self.batch_iterator(self.X_train, self.y_train, batch_size=self.batch_size),
                total=len(self.X_train)//self.batch_size+1, unit="batch",
                desc=f"Epoch {epoch} ({self.rnn_type.upper()}, {'Bi' if self.bidirectional else 'Uni'}directional)"
            ) as batches:
                for inputs, targets in batches:
                    self.model.zero_grad()
                    scores = self.model(inputs)
                    loss = loss_function(
                        scores.view(-1, self.model.tagset_size_),
                        targets.view(-1)
                    )
                    loss.backward()
                    optimizer.step()

                    # Shape (batch, seq_len); squeeze() would break on a batch of size 1.
                    predictions = scores.argmax(dim=2)
                    mask = targets != self.tag2idx['<PAD>']
                    correct = (predictions[mask] == targets[mask]).sum().item()
                    accuracy = correct / mask.sum().item() * 100
                    self.model.training_accuracy_.append(accuracy)
                    self.model.training_loss_.append(loss.item())
                    batches.set_postfix(loss=loss.item(), accuracy=accuracy)

    def plot_training_progress(self):
        fig = plt.figure(figsize=(6, 4))
        ax = plt.subplot()
        direction = 'Bidirectional' if self.bidirectional else 'Unidirectional'
        ax.set_title(f"Training Progress with {self.rnn_type.upper()} ({direction})")
        ax.plot(self.model.training_loss_, 'b-')
        ax.set_ylabel("Training Loss", color='b')
        ax.set_xlabel("Epoch")
        ax.tick_params(axis='y', labelcolor='b')
        ax = ax.twinx()
        ax.plot(self.model.training_accuracy_, 'r-')
        ax.set_ylabel("Accuracy [%]", color='r')
        ax.tick_params(axis='y', labelcolor='r')
        a = list(ax.axis())
        a[2] = 0
        a[3] = 100
        ax.axis(a)
        t = np.arange(0, len(self.model.training_accuracy_), len(self.X_train)//self.batch_size+1)
        ax.set_xticks(ticks=t)
        ax.set_xticklabels(labels=np.arange(len(t)))
        fig.tight_layout()
        plt.show()

    def evaluate(self):
        baseline_acc = self.compute_baseline_accuracy()
        with torch.no_grad():
            n_correct = 0
            n_total = 0
            for inputs, targets in self.batch_iterator(self.X_test, self.y_test, batch_size=self.batch_size):
                scores = self.model(inputs)
                # Shape (batch, seq_len); squeeze() would break on a batch of size 1.
                predictions = scores.argmax(dim=2)
                mask = targets != self.tag2idx['<PAD>']
                n_correct += (predictions[mask] == targets[mask]).sum().item()
                n_total += mask.sum().item()

        direction = 'Bidirectional' if self.bidirectional else 'Unidirectional'
        print(f"Test accuracy with {self.rnn_type.upper()} ({direction}): {100*n_correct/n_total:.1f}%")
        print(f"Baseline Accuracy is: {baseline_acc:.1f}%")

    def run(self, language="en_ewt"):
        self.load_data(language=language)
        self.create_vocabulary()
        self.initialize_model()
        self.train()
        self.plot_training_progress()
        self.evaluate()


class RNNTagger(nn.Module):
    def __init__(self, word_embedding_dim, hidden_dim, vocabulary_size, tagset_size, token2idx, tag2idx, rnn_type='lstm', bidirectional=True):
        super(RNNTagger, self).__init__()
        self.hidden_dim_ = hidden_dim
        self.vocabulary_size_ = vocabulary_size
        self.tagset_size_ = tagset_size
        self.token2idx = token2idx
        self.rnn_type = rnn_type.lower()
        self.bidirectional = bidirectional

        self._word_embedding = nn.Embedding(
            num_embeddings=vocabulary_size,
            embedding_dim=word_embedding_dim,
            padding_idx=token2idx['<PAD>']
        )


        self.num_directions = 2 if bidirectional else 1


        if self.rnn_type == 'lstm':
            self._rnn = nn.LSTM(
                input_size=word_embedding_dim,
                hidden_size=hidden_dim,
                batch_first=True,
                bidirectional=bidirectional
            )
        elif self.rnn_type == 'gru':
            self._rnn = nn.GRU(
                input_size=word_embedding_dim,
                hidden_size=hidden_dim,
                batch_first=True,
                bidirectional=bidirectional
            )


        rnn_output_dim = hidden_dim * self.num_directions

        self._fc = nn.Linear(rnn_output_dim, tagset_size)
        self._softmax = nn.LogSoftmax(dim=1)

        self.training_loss_ = list()
        self.training_accuracy_ = list()

        if torch.cuda.is_available():
            self.cuda()

    def forward(self, padded_sentences):
        batch_size, max_sentence_length = padded_sentences.size()

        embedded_sentences = self._word_embedding(padded_sentences)

        sentence_lengths = (padded_sentences != self.token2idx['<PAD>']).sum(dim=1)
        sentence_lengths = sentence_lengths.long().cpu()

        X = nn.utils.rnn.pack_padded_sequence(
            embedded_sentences,
            sentence_lengths,
            batch_first=True,
            enforce_sorted=False
        )

        rnn_out, _ = self._rnn(X)
        X, _ = nn.utils.rnn.pad_packed_sequence(rnn_out, batch_first=True)

        X = X.contiguous().view(-1, X.shape[2])
        tag_space = self._fc(X)
        tag_scores = self._softmax(tag_space)

        return tag_scores.view(batch_size, max_sentence_length, self.tagset_size_)


if __name__ == "__main__":
    rnn_type = 'lstm'
    bidirectional = True

    languages = ["en_ewt", "es_gsd", "zh_gsdsimp", "de_gsd"]
    for lang in languages:
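        print(f"=== Extension 2: RNN={rnn_type}, Bidirectional={bidirectional}, Language={lang} ===")
        pos_tagger = POSTagger(
            word_embedding_dim=32,
            hidden_dim=64,
            batch_size=256,
            learning_rate=0.01,
            epochs=5,
            rnn_type=rnn_type,
            bidirectional=bidirectional  # pass the flag explicitly instead of relying on the default
        )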
        pos_tagger.run(language=lang)
        print("\n")

In this experiment, a bidirectional LSTM (BiLSTM) was used for part-of-speech tagging on English, Spanish, Chinese, and German. English and Spanish achieve relatively higher accuracy, which may be attributed to their more regular morphology and word order, making it easier for the model to extract relevant features. German also performs well, but because of its complex inflectional morphology the model has to capture case inflections and word-order information.

The accuracy for Chinese is noticeably lower. Although the BiLSTM sees context in both directions, it still struggles with Chinese, particularly because of the absence of explicit morphological markers and the ambiguity introduced by word segmentation. Differences in dataset size and annotation standards across languages also affect how well the model learns.
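
The remark about dataset size can be made more concrete by measuring how many test tokens never occur in the training data, since the tagger maps all of them to `<UNK>`. The sketch below assumes that sentences are lists of token strings, as in `X_train` and `X_test`. The function name `oov_rate` and the toy sentences are illustrative only.

```python
def oov_rate(train_sentences, test_sentences):
    """Percentage of test tokens that do not appear in any training sentence."""
    vocabulary = {token for sentence in train_sentences for token in sentence}
    n_total = 0
    n_unknown = 0
    for sentence in test_sentences:
        for token in sentence:
            n_total += 1
            if token not in vocabulary:
                n_unknown += 1
    return 100.0 * n_unknown / n_total if n_total > 0 else 0.0


# Toy data for illustration (not taken from the corpora used above).
train = [["the", "dog", "barks"], ["a", "cat", "sleeps"]]
test = [["the", "cat", "barks"], ["a", "bird", "sings"]]

print(f"OOV rate: {oov_rate(train, test):.1f}%")
```

Comparing this number across languages, for example right after the train/test split, would indicate whether a lower accuracy coincides with a larger share of unseen word forms.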

### Use PyTorch's Dataset and DataLoader

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import matplotlib.pyplot as plt


class POSDataset(Dataset):
    def __init__(self, sentences, tags, token2idx, tag2idx):
        self.sentences = sentences
        self.tags = tags
        self.token2idx = token2idx
        self.tag2idx = tag2idx

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        sentence = self.sentences[idx]
        tag_seq = self.tags[idx]


        token_indices = []
        for token in sentence:
            if token in self.token2idx:
                token_indices.append(self.token2idx[token])
            else:
                token_indices.append(self.token2idx['<UNK>'])


        tag_indices = [self.tag2idx[tag] for tag in tag_seq]

        return {
            'sentence': sentence,
            'tag_seq': tag_seq,
            'token_indices': token_indices,
            'tag_indices': tag_indices,
            'length': len(sentence),
            # Carried along so that collate_fn can pad with the <PAD> indices.
            'token_pad_idx': self.token2idx['<PAD>'],
            'tag_pad_idx': self.tag2idx['<PAD>']
        }


def collate_fn(batch):
    # Sort longest first, as required by pack_padded_sequence(enforce_sorted=True).
    batch = sorted(batch, key=lambda x: x['length'], reverse=True)

    max_len = batch[0]['length']
    batch_size = len(batch)

    # Pad with the <PAD> indices rather than zeros: index 0 is <UNK> for tokens
    # and a real tag for labels, so zero padding would be counted as data.
    token_indices_batch = torch.full((batch_size, max_len), batch[0]['token_pad_idx'], dtype=torch.long)
    tag_indices_batch = torch.full((batch_size, max_len), batch[0]['tag_pad_idx'], dtype=torch.long)
    lengths = torch.tensor([item['length'] for item in batch], dtype=torch.long)

    for i, item in enumerate(batch):
        tokens = torch.tensor(item['token_indices'], dtype=torch.long)
        tags = torch.tensor(item['tag_indices'], dtype=torch.long)
        token_indices_batch[i, :len(tokens)] = tokens
        tag_indices_batch[i, :len(tags)] = tags

    return {
        'token_indices': token_indices_batch,
        'tag_indices': tag_indices_batch,
        'lengths': lengths,
        'sentences': [item['sentence'] for item in batch],
        'tag_seqs': [item['tag_seq'] for item in batch]
    }


class POSTagger:
    def __init__(self, word_embedding_dim=32, hidden_dim=64, batch_size=256, learning_rate=0.01, epochs=5, rnn_type='lstm', bidirectional=False):
        self.word_embedding_dim = word_embedding_dim
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.rnn_type = rnn_type.lower()
        self.bidirectional = bidirectional

        if self.rnn_type not in ['lstm', 'gru']:
            raise ValueError("rnn_type must be either 'lstm' or 'gru'")

        self.X = []
        self.y = []
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

        self.token2idx = None
        self.idx2token = None
        self.tag2idx = None
        self.idx2tag = None

        self.model = None


        self.train_dataset = None
        self.test_dataset = None
        self.train_loader = None
        self.test_loader = None

    def load_data(self, language="en_ewt"):
        print(f"Loading dataset [universal_dependencies] with config = '{language}'")
        dataset = load_dataset("universal_dependencies", language)

        print(dataset)

        for sentence in dataset['train']:
            tokens = sentence["tokens"]
            tags = sentence["upos"]
            self.X.append(tokens)
            self.y.append(tags)

        assert len(self.X) == len(self.y)

        print(self.X[0])
        print(self.y[0])

        sentences = [sentence for sentence in zip(self.X, self.y) if len(sentence[0]) > 2]
        self.X = [s[0] for s in sentences]
        self.y = [s[1] for s in sentences]

        print("Loaded %i sentences" % len(sentences))
        print(sentences[0])

        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            self.X, self.y, test_size=0.1, random_state=0
        )

        assert len(self.X_train) == len(self.y_train)
        assert len(self.X_test) == len(self.y_test)
        assert len(self.X_train) + len(self.X_test) == len(self.X)

        print("The training set includes %i sentences" % len(self.X_train))
        print("The test set includes %i sentences" % len(self.X_test))

    def create_vocabulary(self):
        tokens = set()
        upos = set()

        for sentence in self.X_train:
            for token in sentence:
                tokens.add(token)

        for tags in self.y_train:
            for tag in tags:
                upos.add(tag)

        self.idx2token = list(tokens)
        self.idx2token.insert(0, '<UNK>')
        self.idx2token.append('<PAD>')
        self.token2idx = {token: idx for idx, token in enumerate(self.idx2token)}

        self.idx2tag = list(upos)
        self.idx2tag.append('<PAD>')
        self.tag2idx = {tag: idx for idx, tag in enumerate(self.idx2tag)}

        print("Token to Index Mapping:", self.token2idx)
        print("Index to Token Mapping:", self.idx2token)
        print("Tag to Index Mapping:", self.tag2idx)
        print("Index to Tag Mapping:", self.idx2tag)

    def setup_dataloaders(self):
        self.train_dataset = POSDataset(
            self.X_train,
            self.y_train,
            self.token2idx,
            self.tag2idx
        )

        self.test_dataset = POSDataset(
            self.X_test,
            self.y_test,
            self.token2idx,
            self.tag2idx
        )

        self.train_loader = DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=True,
            collate_fn=collate_fn,
            num_workers=0
        )

        self.test_loader = DataLoader(
            self.test_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            collate_fn=collate_fn,
            num_workers=0
        )

        print(f"Created DataLoaders - Training batches: {len(self.train_loader)}, Test batches: {len(self.test_loader)}")

    def initialize_model(self):
        self.model = RNNTagger(
            word_embedding_dim=self.word_embedding_dim,
            hidden_dim=self.hidden_dim,
            vocabulary_size=len(self.token2idx),
            tagset_size=len(self.tag2idx)-1,
            token2idx=self.token2idx,
            tag2idx=self.tag2idx,
            rnn_type=self.rnn_type,
            bidirectional=self.bidirectional
        )
        print(self.model)

    def train(self):
        loss_function = nn.NLLLoss(ignore_index=self.tag2idx['<PAD>'])
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)

        for epoch in range(self.epochs):
            self.model.train()
            running_loss = 0.0
            running_accuracy = 0.0
            total_predictions = 0

            progress_bar = tqdm(
                self.train_loader,
                desc=f"Epoch {epoch+1}/{self.epochs} ({self.rnn_type.upper()}, {'Bi' if self.bidirectional else 'Uni'}directional)",
                unit="batch"
            )

            for batch in progress_bar:
                token_indices = batch['token_indices']
                tag_indices = batch['tag_indices']
                lengths = batch['lengths']

                if torch.cuda.is_available():
                    token_indices = token_indices.cuda()
                    tag_indices = tag_indices.cuda()


                optimizer.zero_grad()


                scores = self.model(token_indices, lengths)


                loss = loss_function(
                    scores.view(-1, self.model.tagset_size_),
                    tag_indices.view(-1)
                )


                loss.backward()
                optimizer.step()


                # Shape (batch, seq_len); squeeze() would break on a batch of size 1.
                predictions = scores.argmax(dim=2)
                mask = tag_indices != self.tag2idx['<PAD>']
                correct = (predictions[mask] == tag_indices[mask]).sum().item()
                total = mask.sum().item()


                running_loss += loss.item()
                running_accuracy += correct
                total_predictions += total


                batch_accuracy = 100 * correct / total if total > 0 else 0
                progress_bar.set_postfix(
                    loss=f"{loss.item():.4f}",
                    accuracy=f"{batch_accuracy:.2f}%"
                )


                self.model.training_loss_.append(loss.item())
                self.model.training_accuracy_.append(batch_accuracy)


            epoch_loss = running_loss / len(self.train_loader)
            epoch_accuracy = 100 * running_accuracy / total_predictions if total_predictions > 0 else 0
            print(f"Epoch {epoch+1}/{self.epochs} - Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%")

    def plot_training_progress(self):
        fig = plt.figure(figsize=(6, 4))
        ax = plt.subplot()
        direction = 'Bidirectional' if self.bidirectional else 'Unidirectional'
        ax.set_title(f"Training Progress with {self.rnn_type.upper()} ({direction})")
        ax.plot(self.model.training_loss_, 'b-')
        ax.set_ylabel("Training Loss", color='b')
        ax.set_xlabel("Batch")
        ax.tick_params(axis='y', labelcolor='b')
        ax = ax.twinx()
        ax.plot(self.model.training_accuracy_, 'r-')
        ax.set_ylabel("Accuracy [%]", color='r')
        ax.tick_params(axis='y', labelcolor='r')
        a = list(ax.axis())
        a[2] = 0
        a[3] = 100
        ax.axis(a)
        fig.tight_layout()
        plt.show()

    def evaluate(self):
        self.model.eval()
        n_correct = 0
        n_total = 0

        with torch.no_grad():
            for batch in tqdm(self.test_loader, desc="Evaluating", unit="batch"):

                token_indices = batch['token_indices']
                tag_indices = batch['tag_indices']
                lengths = batch['lengths']

                if torch.cuda.is_available():
                    token_indices = token_indices.cuda()
                    tag_indices = tag_indices.cuda()


                scores = self.model(token_indices, lengths)


                # Shape (batch, seq_len); squeeze() would break on a batch of size 1.
                predictions = scores.argmax(dim=2)
                mask = tag_indices != self.tag2idx['<PAD>']
                n_correct += (predictions[mask] == tag_indices[mask]).sum().item()
                n_total += mask.sum().item()

        accuracy = 100 * n_correct / n_total if n_total > 0 else 0
        direction = 'Bidirectional' if self.bidirectional else 'Unidirectional'
        print(f"Test accuracy with {self.rnn_type.upper()} ({direction}): {accuracy:.2f}%")

        return accuracy

    def run(self, language="en_ewt"):
        self.load_data(language=language)
        self.create_vocabulary()
        self.setup_dataloaders()
        self.initialize_model()
        self.train()
        self.plot_training_progress()
        self.evaluate()


class RNNTagger(nn.Module):
    def __init__(self, word_embedding_dim, hidden_dim, vocabulary_size, tagset_size, token2idx, tag2idx, rnn_type='lstm', bidirectional=False):
        super(RNNTagger, self).__init__()
        self.hidden_dim_ = hidden_dim
        self.vocabulary_size_ = vocabulary_size
        self.tagset_size_ = tagset_size
        self.token2idx = token2idx
        self.rnn_type = rnn_type.lower()
        self.bidirectional = bidirectional

        self._word_embedding = nn.Embedding(
            num_embeddings=vocabulary_size,
            embedding_dim=word_embedding_dim,
            padding_idx=token2idx['<PAD>']
        )


        self.num_directions = 2 if bidirectional else 1


        if self.rnn_type == 'lstm':
            self._rnn = nn.LSTM(
                input_size=word_embedding_dim,
                hidden_size=hidden_dim,
                batch_first=True,
                bidirectional=bidirectional
            )
        elif self.rnn_type == 'gru':
            self._rnn = nn.GRU(
                input_size=word_embedding_dim,
                hidden_size=hidden_dim,
                batch_first=True,
                bidirectional=bidirectional
            )


        rnn_output_dim = hidden_dim * self.num_directions

        self._fc = nn.Linear(rnn_output_dim, tagset_size)
        self._softmax = nn.LogSoftmax(dim=1)

        self.training_loss_ = list()
        self.training_accuracy_ = list()

        if torch.cuda.is_available():
            self.cuda()

    def forward(self, padded_sentences, lengths):
        """The forward pass through the network"""
        batch_size, max_sentence_length = padded_sentences.size()

        embedded_sentences = self._word_embedding(padded_sentences)


        X = nn.utils.rnn.pack_padded_sequence(
            embedded_sentences,
            lengths,
            batch_first=True,
            enforce_sorted=True
        )


        rnn_out, _ = self._rnn(X)


        X, _ = nn.utils.rnn.pad_packed_sequence(rnn_out, batch_first=True)

        X = X.contiguous().view(-1, X.shape[2])
        tag_space = self._fc(X)
        tag_scores = self._softmax(tag_space)

        return tag_scores.view(batch_size, max_sentence_length, self.tagset_size_)



if __name__ == "__main__":
    rnn_type = 'lstm'
    bidirectional = True

    languages = ["en_ewt", "es_gsd", "zh_gsdsimp", "de_gsd"]
    for lang in languages:
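        print(f"=== Extension 3: RNN={rnn_type}, Bidirectional={bidirectional}, Language={lang} ===")
        pos_tagger = POSTagger(
            word_embedding_dim=32,
            hidden_dim=64,
            batch_size=256,
            learning_rate=0.01,
            epochs=5,
            rnn_type=rnn_type,
            bidirectional=bidirectional  # without this the flag above is never used (default is False)
        )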
        pos_tagger.run(language=lang)
        print("\n")

After switching to PyTorch's Dataset and DataLoader for data loading, training behaved well for all four languages: English and Spanish reached a final accuracy of over 97%, German over 96%, and Chinese was slightly lower at around 92%.

Overall, the training loss (blue line) decreased rapidly over the batches while the accuracy (red line) continued to rise, indicating that the model converged stably during mini-batch training. English and Spanish appear more predictable morphologically and syntactically, which makes them easier for the model to learn. Chinese lacks morphological markers and involves word segmentation ambiguities, leading to slightly lower accuracy. German falls between the two but still achieves strong results.

This experiment shows that the model still achieves high accuracy on multilingual part-of-speech tagging when the data is fed through a standard Dataset/DataLoader pipeline.
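
The core of the collation step is independent of PyTorch: sort the batch by length, pad every sequence to the longest one, and keep a mask so that padded positions are excluded from the loss and the accuracy. A minimal sketch with plain lists follows. The function `pad_batch` and the reserved padding index `PAD = -1` are assumptions made for the illustration, not names from the code above.

```python
PAD = -1  # hypothetical padding index, chosen so it cannot clash with a real index


def pad_batch(sequences, pad_value=PAD):
    """Sort sequences longest-first and pad them to a common length.

    Returns (padded, lengths, mask) where mask[i][j] is True for real tokens.
    """
    ordered = sorted(sequences, key=len, reverse=True)
    lengths = [len(seq) for seq in ordered]
    max_len = lengths[0] if lengths else 0

    padded = [seq + [pad_value] * (max_len - len(seq)) for seq in ordered]
    mask = [[value != pad_value for value in row] for row in padded]
    return padded, lengths, mask


batch = [[4, 7], [3, 9, 2, 5], [8]]
padded, lengths, mask = pad_batch(batch)

print(padded)   # rows padded to the length of the longest sequence
print(lengths)  # original lengths, longest first
print(sum(sum(row) for row in mask), "real tokens")
```

The important detail is that the padding value must differ from every real index. Padding with an index that is also used for a token or a tag would make the mask count padded positions as data.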

### Model Complexity Comparison

In [None]:
model_complexities = [
    {"name": "LowComplex", "embedding_dim": 32, "hidden_dim": 64},
    {"name": "HighComplex", "embedding_dim": 64, "hidden_dim": 128},
]

languages = ["en_ewt", "es_gsd", "zh_gsdsimp", "de_gsd"]

for mc in model_complexities:
    print(f"=== Model Complexity: {mc['name']} (emb={mc['embedding_dim']}, hid={mc['hidden_dim']}) ===")
    for lang in languages:
        print(f"--- Language: {lang} ---")

        tagger = POSTagger(
            word_embedding_dim=mc["embedding_dim"],
            hidden_dim=mc["hidden_dim"],
            batch_size=256,
            learning_rate=0.01,
            epochs=5,
            rnn_type='lstm'
        )
        tagger.run(language=lang)
        print()


With the low-complexity model, the accuracy for the four languages mostly falls between 90% and 97%. With the high-complexity model, accuracy generally improves further, with some languages reaching around 98% to 99%.

Overall, increasing the embedding and hidden dimensions lets the model capture morphological, word-order, and contextual features better across languages, leading to faster convergence or higher final accuracy. For some languages the improvement is relatively limited, which suggests that for this task and dataset size a smaller network is already capable of learning the key patterns. Larger models also tend to reach high accuracy more quickly in the early training stages, but they carry a greater risk of overfitting, which should be taken into account.
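
To put the two configurations in perspective, the number of trainable parameters can be estimated from the layer sizes. The sketch below uses the parameter layout of a single-layer PyTorch LSTM or GRU (four or three gates, each with input weights, recurrent weights, and two bias vectors). The vocabulary size of 20,000 is a made-up placeholder, 17 is the size of the Universal Dependencies UPOS tag set, and `count_parameters` is an illustrative helper rather than part of the code above.

```python
def count_parameters(vocab_size, embedding_dim, hidden_dim, tagset_size,
                     rnn_type="lstm", bidirectional=False):
    """Estimate trainable parameters of an embedding -> RNN -> linear tagger."""
    gates = {"lstm": 4, "gru": 3}[rnn_type]
    directions = 2 if bidirectional else 1

    embedding = vocab_size * embedding_dim
    # Per gate: input weights, recurrent weights, and two bias vectors.
    per_gate = embedding_dim * hidden_dim + hidden_dim * hidden_dim + 2 * hidden_dim
    rnn = directions * gates * per_gate
    # The linear layer reads the concatenated outputs of all directions.
    linear = (directions * hidden_dim) * tagset_size + tagset_size
    return {"embedding": embedding, "rnn": rnn, "linear": linear,
            "total": embedding + rnn + linear}


VOCAB_SIZE = 20_000  # placeholder, the real value depends on the corpus
TAGSET_SIZE = 17     # size of the UPOS tag set

for name, emb, hid in [("LowComplex", 32, 64), ("HighComplex", 64, 128)]:
    counts = count_parameters(VOCAB_SIZE, emb, hid, TAGSET_SIZE)
    print(f"{name}: {counts}")
```

Under these assumptions the embedding table dominates the total in both configurations, so most of the added capacity of the larger model sits in the word embeddings rather than in the recurrent layer.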