# PyTorch POS Tagging

## Requirements
- PyTorch
- huggingface datasets
- huggingface_hub
- tqdm
- spacy

In [None]:
# Install the packages listed under "Requirements" that may be missing from the kernel's
# environment (`%pip` installs into the environment of the running kernel)
%pip install spacy
# %conda install spacy # or install using conda
%pip install datasets

In [None]:
# Download spaCy's English resources (the `en_core_web_sm` package)
# `%run -m` runs the module from within the notebook; when working in a shell,
# replace `%run` with `python`
%run -m spacy download en_core_web_sm

In [41]:
import os
import zipfile
import random
from functools import partial

from tqdm import tqdm
import torch
from datasets import load_dataset
from huggingface_hub import hf_hub_download
from torch.utils.data import DataLoader
from tqdm import trange

print("Torch Version: ", torch.__version__)

Torch Version:  2.5.1+cu124


Loads the POS tagging dataset from the Hugging Face hub and prepares it for further processing.

In [5]:
# Fetch the "batterydata/pos_tagging" dataset; the cells below access its "train" and "test" splits
dataset = load_dataset("batterydata/pos_tagging")

Displays the loaded dataset followed by its training and test splits.

In [None]:
# Overview of the whole dataset first, then each split on its own
print(dataset)
for split_name in ("train", "test"):
    print(dataset[split_name])

### Some global settings

In [7]:
# Paths are relative to the working directory; `expanduser` only rewrites a leading `~`
EMB_CACHE = os.path.expanduser("./glove/")
DATASET_ROOT = os.path.expanduser("./")

# Make sure that batches fit into your device's memory, but note that the batch size
# influences your training (it is a hyperparameter)
BATCH_SIZE = 16

# 'cuda' for GPU (optionally specify a device id) and 'cpu' for CPU
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")

## Our neural network consists of one fully connected linear layer

The softmax is part of the loss function in PyTorch, so you can omit this in the forward function.

The embedding layer
- maps from indices to vectors
- is not trained (frozen)

In [9]:
class Net(torch.nn.Module):
    """Per-token classifier: a frozen pretrained embedding followed by one linear layer.

    Every token of every sequence in the batch is embedded and classified on its own;
    no information is exchanged between positions.
    """

    def __init__(self, embedding_vectors, num_classes):
        """Build the two layers.

        Args:
            embedding_vectors: 2-D tensor used as the embedding weight matrix; its second
                dimension is the input size of the linear layer.
            num_classes: output size of the linear layer (one score per class).
        """
        super().__init__()
        embedding_dim = embedding_vectors.size(1)
        # freeze=True tells PyTorch not to train this layer, i.e. its weights stay untouched
        self.embedding = torch.nn.Embedding.from_pretrained(embedding_vectors, freeze=True)
        # fully connected layer mapping each embedded token to `num_classes` scores
        self.fc = torch.nn.Linear(embedding_dim, num_classes)

    def forward(self, inputs):
        """Map token indices to unnormalised class scores (one score vector per token)."""
        # PyTorch records the operations for the backward pass on its own
        return self.fc(self.embedding(inputs))

### GloVe
GloVe embeddings were trained with a special objective.
For word pairs that share the same underlying relation (e.g. man–woman and king–queen), the vector differences should be roughly equal.

<img src="https://nlp.stanford.edu/projects/glove/images/man_woman.jpg" width=500/>\
source: https://nlp.stanford.edu/projects/glove/



### Download the pretrained GloVe embeddings and load them into a vocabulary and an embedding matrix

In [None]:
# Download the GloVe archive from the Hugging Face hub; `glove` is the local path of the zip file
glove = hf_hub_download(repo_id="stanfordnlp/glove", filename="glove.6B.zip")

# List the files contained in the archive
with zipfile.ZipFile(glove, "r") as archive:
    print(archive.namelist())

Open the 'glove.6B.300d.txt' file from the downloaded GloVe archive and print the first few lines for inspection.

In [None]:
# The zip archive holds one file per feature dimensionality: 50d, 100d, 200d, 300d
filename = "glove.6B.300d.txt"
with zipfile.ZipFile(glove, "r") as archive:
    # Show only the first six raw (bytes) lines of the selected file
    for _, raw_line in zip(range(6), archive.open(filename)):
        print(raw_line)

Unpack GloVe embeddings from a zip file, build a word-to-index dictionary, and store each word's embedding vector in a list.

In [13]:
# Parse the selected GloVe file: each line holds a word followed by its vector components
word_to_index = {}
embeddings = []

with zipfile.ZipFile(glove, "r") as archive:
    for row, raw_line in enumerate(archive.open(filename)):
        fields = raw_line.split()
        token = fields[0].decode("utf-8")
        vector = torch.tensor([float(component) for component in fields[1:]])
        # the line number doubles as the word's position in `embeddings`
        word_to_index[token] = row
        embeddings.append(vector)

Add a `<pad>` token with an all-zero embedding to the vocabulary, look up the ids of the `<pad>` and `<unk>` tokens, and stack the list of embedding vectors into a single tensor.

In [None]:
# Last token in the vocabulary is '<unk>' which is used for out-of-vocabulary words
# We also add a '<pad>' token (with an all-zero vector) to the vocabulary for padding sequences.
# The guard keeps this cell idempotent: re-running it must neither give '<pad>' a new id
# nor add a second padding row to the embeddings.
if "<pad>" not in word_to_index:
    word_to_index["<pad>"] = len(word_to_index)
    embeddings = [*embeddings, torch.zeros(embeddings[0].shape)]

padding_token_id = word_to_index["<pad>"]
unk_token_id = word_to_index["<unk>"]

# Convert the list of tensors to a single tensor (on a re-run it already is one)
if isinstance(embeddings, list):
    embeddings = torch.stack(embeddings)

# The padding id is used as an index into the embedding matrix, so it must address a row
assert padding_token_id < embeddings.size(0), "'<pad>' id lies outside the embedding matrix"

print(f"Embedding shape: {embeddings.size(1)}")
print(f"Padding token id: {padding_token_id}")
print(f"Unknown token id: {unk_token_id}")

Create dictionaries to map labels to indices and vice versa, and print the number of unique classes.

In [None]:
# Collect every label that occurs in the training split. Sorting makes the
# label -> index mapping deterministic; the iteration order of a set of strings
# may differ from one kernel session to the next.
labels_unique = sorted(
    {label for sample_labels in dataset["train"]["labels"] for label in sample_labels}
)
print(labels_unique)
print(f"Number of classes: {len(labels_unique)}")
# ctoi: label -> class index, itoc: class index -> label
ctoi = {label: idx for idx, label in enumerate(labels_unique)}
itoc = {idx: label for label, idx in ctoi.items()}
print(ctoi)
print(itoc)

Create functions to tokenize text, map tokens and labels to indices, and prepare the dataset for training.

In [None]:
def tokenize(text: str):
    """Lower-case `text` and split it on whitespace into a list of tokens."""
    lowered = text.lower()
    return lowered.split()


def map_token_to_index(token):
    """Return the vocabulary index of `token`, or the '<unk>' index for unknown tokens."""
    if token in word_to_index:
        return word_to_index[token]
    # token is not in the vocabulary
    return unk_token_id


def map_text_to_indices(text: "str | list[str]"):
    """Map a text to a list of vocabulary indices.

    Args:
        text: either an already tokenized text (an iterable of token strings, as stored
            in the dataset's "words" column) or a raw string, which is tokenized with
            `tokenize` first. Without that step a raw string would be looked up
            character by character.

    Returns:
        One index per token; tokens are lower-cased before the lookup and unknown
        tokens map to the '<unk>' index (see `map_token_to_index`).
    """
    tokens = tokenize(text) if isinstance(text, str) else text
    return [map_token_to_index(token.lower()) for token in tokens]


def map_labels_to_indices(labels: list):
    """Translate each label into its class index via `ctoi` (KeyError for an unknown label)."""
    indices = []
    for label in labels:
        indices.append(ctoi[label])
    return indices


def prepare_dataset(dataset):
    """Return `dataset` extended by a "token_ids" and a "label_ids" column.

    "token_ids" is derived from each sample's "words", "label_ids" from its "labels".
    Both mapping passes run with four worker processes.
    """

    def add_token_ids(example):
        return {"token_ids": map_text_to_indices(example["words"])}

    def add_label_ids(example):
        return {"label_ids": map_labels_to_indices(example["labels"])}

    with_token_ids = dataset.map(add_token_ids, num_proc=4)
    return with_token_ids.map(add_label_ids, num_proc=4)


# Index the words and labels of both splits
dataset_train_tokenized, dataset_valid_tokenized = (
    prepare_dataset(dataset[split]) for split in ("train", "test")
)

# Print the column names of the first sample in the tokenized training dataset
print(dataset_train_tokenized[0].keys())

Define a collate function that pads every sequence of a batch to the length of the longest one, create data loaders that return (shuffled) batches, and print one training batch.

In [None]:
def pad_inputs(batch, keys_to_pad=["token_ids", "label_ids"], padding_value=-1):
    """Collate a list of samples into a dict of tensors.

    Args:
        batch: list of samples (dicts); all samples are expected to share the keys of
            the first one.
        keys_to_pad: keys whose list values are right-padded to the longest list found
            for that key in the batch.
        padding_value: value appended to the shorter lists (used for every padded key).

    Returns:
        Dict with one tensor per key: the padded keys first, then all remaining keys.
    """
    collated = {}
    for key in keys_to_pad:
        sequences = [sample[key] for sample in batch]
        longest = max(len(sequence) for sequence in sequences)
        rows = []
        for sequence in sequences:
            missing = longest - len(sequence)
            rows.append(sequence + [padding_value] * missing)
        collated[key] = torch.tensor(rows)

    # Keys that need no padding are converted as they are
    for key in batch[0]:
        if key in keys_to_pad:
            continue
        collated[key] = torch.tensor([sample[key] for sample in batch])
    return collated


def get_dataloader(dataset, batch_size=32, shuffle=False):
    """Wrap `dataset` in a DataLoader whose batches are padded with `padding_token_id`."""
    collate = partial(pad_inputs, padding_value=padding_token_id)
    loader_options = {
        "batch_size": batch_size,
        "shuffle": shuffle,
        "collate_fn": collate,
    }
    return DataLoader(dataset, **loader_options)


# We select the columns that we want to keep in the dataset.
# Each split is formatted on its own: a chained assignment here would rebind the
# training variable to the validation split and the model would train on test data.
model_columns = ["token_ids", "label_ids"]
dataset_train_tokenized = dataset_train_tokenized.with_format(columns=model_columns)
dataset_valid_tokenized = dataset_valid_tokenized.with_format(columns=model_columns)

# Create the DataLoaders with the batch size from the global settings.
# Only the training data is shuffled; accuracy does not depend on the order of the batches.
dataloader_train = get_dataloader(dataset_train_tokenized, batch_size=BATCH_SIZE, shuffle=True)
dataloader_valid = get_dataloader(dataset_valid_tokenized, batch_size=BATCH_SIZE, shuffle=False)

# Look at one padded training batch
for batch in dataloader_train:
    token_ids = batch["token_ids"]
    labels = batch["label_ids"]
    print(token_ids)
    print(labels)
    break

## Set up model, loss and optimizer
- Cross Entropy is Softmax + Negative Log Likelihood
- As optimizer we use Adam (adapts the learning rate per weight)

(run this only once as Jupyter keeps the model (including the weights) and the optimizer in memory)

In [18]:
# set up model, loss and optimizer
model = Net(embedding_vectors=embeddings, num_classes=len(ctoi))
model = model.to(DEVICE)
# label positions holding `padding_token_id` do not contribute to the loss
criterion = torch.nn.CrossEntropyLoss(ignore_index=padding_token_id, reduction="mean")
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
# placeholders until the first loss / accuracy values are available
metric_dict = {"loss": "------", "accuracy": "------"}

## Evaluation function comparing prediction with gold label

In [19]:
def evaluate(data_iter, net, padding_id=None, device=None):
    """Compute the token-level accuracy of `net` over all batches of `data_iter`.

    Args:
        data_iter: iterable of batches; each batch is a dict with the tensors
            "token_ids" and "label_ids" of identical shape (batch, sequence).
        net: model mapping token ids to class scores of shape (batch, sequence, classes).
        padding_id: token id that marks padded positions; those positions are excluded
            from the accuracy. Defaults to the global `padding_token_id`.
        device: device the batches are moved to. Defaults to the global `DEVICE`.

    Returns:
        Fraction of non-padding tokens whose predicted class equals the gold label,
        as a Python float; 0.0 if there is no token to evaluate.
    """
    if padding_id is None:
        padding_id = padding_token_id
    if device is None:
        device = DEVICE

    correct_count = 0
    total_count = 0

    # predict in evaluation mode and restore the previous mode afterwards
    was_training = net.training
    net.eval()
    try:
        with torch.no_grad():
            for batch in data_iter:
                # extract input and labels
                token_ids = batch["token_ids"].to(device=device)
                labels = batch["label_ids"].to(device=device)

                # predicted class per token
                predictions = net(token_ids).argmax(dim=2)

                # sequence lengths within the batch might differ, so padded
                # positions are masked out before counting
                is_token = token_ids != padding_id
                correct_count += ((predictions == labels) & is_token).sum().item()
                total_count += is_token.sum().item()
    finally:
        net.train(was_training)

    if total_count == 0:
        return 0.0
    return correct_count / total_count

## The actual training loop

- runs several epochs
- in each epoch
 - forward the batch
 - computes the loss for the output of the whole batch
 - reduces (e.g. average, sum) the loss
 - computes derivatives of weights by backpropagation
 - optimizer updates weights
 - evaluate on validation/development dataset

In [None]:
NUM_EPOCHS = 5

# a nice progress bar to make the waiting time much better; it counts batches and is
# closed by the context manager when training finishes or fails
with tqdm(total=NUM_EPOCHS * len(dataloader_train), postfix=metric_dict) as pbar:
    # run for NUM_EPOCHS epochs
    for epoch in range(NUM_EPOCHS):
        pbar.set_description(f"Epoch {epoch + 1}/{NUM_EPOCHS}")
        model.train()

        # run for every batch of our iterator
        for batch in dataloader_train:
            # extract input and labels
            token_ids = batch["token_ids"].to(device=DEVICE)
            labels = batch["label_ids"].to(device=DEVICE)

            # zero the parameter gradients, then forward + backward + optimize
            optimizer.zero_grad()
            outputs = model(token_ids)

            # the loss function expects the input as (batch, classes, sequence) and the
            # target as (batch, sequence) containing the class index
            loss = criterion(outputs.permute(0, 2, 1), labels)
            loss.backward()
            optimizer.step()

            # the bar's total is measured in batches, so advance it by one per batch
            pbar.update(1)
            metric_dict.update({'loss': f'{loss.item():6.3f}'})
            pbar.set_postfix(metric_dict)

        # evaluate on validation set after each epoch
        metric_dict.update({'accuracy': f'{100*evaluate(dataloader_valid, model):6.2f}%'})
        pbar.set_postfix(metric_dict)

## Randomly predict sample from test set

In [23]:
def map_list(list_: list, mapping: dict):
    """Return `mapping[item]` for every item of `list_`, keeping the order."""
    mapped = []
    for item in list_:
        mapped.append(mapping[item])
    return mapped

def tokens_to_index(tokens: list, vocabulary: dict):
    """Look up every token in `vocabulary` (KeyError for a token that is not contained)."""
    return [vocabulary[token] for token in tokens]

def indices_to_class(indices: list, classes: dict):
    """Translate class indices into class labels.

    Args:
        indices: list of integer class indices, or a 1-D tensor/array of them (anything
            offering `tolist()`). Tensor elements cannot be used as dictionary keys
            directly, so such inputs are converted to plain Python integers first.
        classes: mapping from class index to class label.

    Returns:
        List with the label of each index, in order.

    Raises:
        KeyError: if an index is not contained in `classes`.
    """
    if hasattr(indices, "tolist"):
        indices = indices.tolist()
    return [classes[index] for index in indices]

In [None]:
# Prepare the test split again so that the sample provides the raw "words" and "labels"
# next to the indexed columns
dataset_valid_tokenized = prepare_dataset(dataset["test"])
# randrange draws from 0 .. len-1; randint(1, len) could return len (out of range)
# and would never select the first sample
sample_idx = random.randrange(len(dataset_valid_tokenized))
sample = dataset_valid_tokenized[sample_idx]
# build input vector and add batch dimension
sample_tensor = torch.tensor(sample["token_ids"]).unsqueeze(dim=0).to(DEVICE)

# forward / predict
with torch.no_grad():
    # get rid of batch dimension (is set to 1)
    outputs = model(sample_tensor).squeeze(dim=0)

# most probable class per token, translated back to its label
predictions = [itoc[class_idx] for class_idx in outputs.argmax(dim=1).tolist()]
print("Input:", ' '.join(sample["words"]))
print(f"Prediction:   {predictions}")
# single quotes inside the f-string keep the cell valid on Python versions before 3.12
print(f"Ground truth: {sample['labels']}")
num_correct = sum(pred == gt for pred, gt in zip(predictions, sample["labels"]))
# max(..., 1) avoids a division by zero for an empty sample
accuracy = num_correct / max(len(sample["labels"]), 1)
print(f"Accuracy: {accuracy*100:.2f}%")

### Interactive prediction

In [None]:
text = input("Please enter your text: ")

# split the text into lower-cased tokens
tokens = tokenize(text)

if not tokens:
    # an empty input would produce an empty tensor the model cannot embed
    print("No tokens to tag.")
else:
    # map tokens to indices using the GloVe vocabulary (unknown tokens map to '<unk>')
    tokens_indexed = map_text_to_indices(tokens)
    # build input vector and add batch dimension
    tensor = torch.tensor(tokens_indexed).unsqueeze(dim=0).to(DEVICE)

    # forward / predict
    with torch.no_grad():
        # get rid of batch dimension (is set to 1)
        outputs = model(tensor).squeeze(dim=0)

    # convert the predicted indices to plain ints before looking up their labels
    predicted_labels = [itoc[class_idx] for class_idx in outputs.argmax(dim=1).tolist()]
    print("Input:", tokens)
    print("Prediction:", predicted_labels)

# Training Frameworks

* Until now: we wrote the training and evaluation loops ourselves
* However, a lot of the code is repetitive
* Frameworks help you avoid repetitive code and some solve common problems like
    * Training and evaluation loops
    * Multi-GPU / Multi-Node training
    * Early stopping
    * Creating model checkpoints
    * Hyper parameter search
    * ...
    

* Some examples:
    * [Pytorch Ignite](https://pytorch.org/ignite/index.html) ("high-level library to help with training and evaluating neural networks ")
    * [Pytorch Lightning](https://www.pytorchlightning.ai) ("The ultimate PyTorch research framework")
    * [Skorch](https://github.com/skorch-dev/skorch) ("scikit-learn compatible neural network library that wraps PyTorch")
    * Huggingface libraries ("State-of-the-art Natural Language Processing"): [Transformers](https://huggingface.co/transformers/index.html), [Datasets](https://huggingface.co/docs/datasets/)

# Example: Pytorch Ignite (taken from [website](https://pytorch.org/ignite/index.html))

```python
from ignite.engine import Events, create_supervised_trainer, create_supervised_evaluator
from ignite.metrics import Accuracy, Loss

model = Net()
train_loader, val_loader = get_data_loaders(train_batch_size, val_batch_size)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.8)
criterion = nn.NLLLoss()

trainer = create_supervised_trainer(model, optimizer, criterion)

val_metrics = {
    "accuracy": Accuracy(),
    "nll": Loss(criterion)
}
evaluator = create_supervised_evaluator(model, metrics=val_metrics)

@trainer.on(Events.ITERATION_COMPLETED(every=log_interval))
def log_training_loss(trainer):
    print(f"Epoch[{trainer.state.epoch}] Loss: {trainer.state.output:.2f}")

@trainer.on(Events.EPOCH_COMPLETED)
def log_training_results(trainer):
    evaluator.run(train_loader)
    metrics = evaluator.state.metrics
    print(f"Training Results - Epoch: {trainer.state.epoch}  Avg accuracy: {metrics['accuracy']:.2f} Avg loss: {metrics['nll']:.2f}")

@trainer.on(Events.EPOCH_COMPLETED)
def log_validation_results(trainer):
    evaluator.run(val_loader)
    metrics = evaluator.state.metrics
    print(f"Validation Results - Epoch: {trainer.state.epoch}  Avg accuracy: {metrics['accuracy']:.2f} Avg loss: {metrics['nll']:.2f}")

trainer.run(train_loader, max_epochs=100)

```

# Tips  & Tricks

* Log the magnitude of your gradients. If you encounter problems with exploding gradients, you can try to clip the gradient values to a specific range using [torch.nn.utils.clip_grad_value_](https://pytorch.org/docs/stable/generated/torch.nn.utils.clip_grad_value_.html) or the norm over all parameters using [torch.nn.utils.clip_grad_norm_](https://pytorch.org/docs/stable/generated/torch.nn.utils.clip_grad_norm_.html#torch.nn.utils.clip_grad_norm_) 
* [Dropout](https://pytorch.org/docs/stable/generated/torch.nn.Dropout.html)
* Ensembling
* Try [more sophisticated initialisations](https://pytorch.org/docs/stable/nn.init.html), e.g. Xavier 
* [Batch normalization](https://pytorch.org/docs/stable/nn.html#normalization-layers) (can reduce dependence on initialization, higher learning rates)
* Dataset augmentation (e.g. preprocess pictures with rotations, color shifts, mirroring, ...)
* Look for existing datasets, architectures and pre-trained models (github, pytorch model zoo, huggingface model hub, fastai, ...)
* Transfer-learning / fine-tuning pretrained models for your data