Artem Chernitsa, B20-AI-01, a.chernitsa@innopolis.university

# POS tagging

During the labs, we covered HMM, LSTM, and BERT models. The goal of this assignment is to evaluate and compare these models on the POS tagging task. The input is a line of text, and the output is a POS tag for every word (token) in that line.

- You should already have the code for the models from labs
- Use validation split to decide when to stop training
- Evaluate all models on test data
- You can use any PoS tagging dataset to train and test your models

Refer to:
- Lab 4 - HMM for Tagging
- Lab 10 - LSTM for Tagging
- Lab 5 - Hugging Face and BERT fine-tuning
- [Datasets](https://universaldependencies.org/)
- [Dataset from Labs 4 and 10](https://raw.githubusercontent.com/Gci04/AML-DS-2021/main/data/PosTagging/train_pos.txt)


Grading:
- 30 points - HMM
- 30 points - BiLSTM
- 30 points - BERT (for masters only)
- 40 points - Evaluation and conclusions 


Remarks: 
- Use Python 3
- Max is 100 points for bachelors, 130 points for masters

In [None]:
import requests
import numpy as np

from functools import partial

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from tqdm.notebook import tqdm
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.optim.lr_scheduler import LambdaLR

# Hidden Markov Models for POS Tagging

You can use the Viterbi algorithm implementation from Lab 4.

In [None]:
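def data_preparing(data: str) -> tuple[dict, list, set, list]:
    """Parse "word tag" lines into a vocabulary, sentences, and a tag set.

    Sentences are separated by blank lines.
    """
    tokens: list[list[str]] = [line.split() for line in data.split("\n")]

    tags: set[str] = set()
    vocab: dict[str, int] = {}
    sent_tags: list[list[str]] = []         # [word, tag] pairs of the current sentence
    sents_tags: list[list[list[str]]] = []  # all completed sentences

    for token in tokens:
        if token:
            word = token[0].lower()
            if word not in vocab:
                vocab[word] = len(vocab)
            sent_tags.append(token)
            tags.add(token[1])
        else:
            # A blank line closes the current sentence.
            if sent_tags:
                sents_tags.append(sent_tags)
            sent_tags = []

    # Keep the last sentence if the data does not end with a blank line.
    if sent_tags:
        sents_tags.append(sent_tags)

    # Each sentence becomes a pair of parallel tuples: (words, tags).
    sents = [tuple(zip(*sent)) for sent in sents_tags]

    return vocab, sents, tags, sents_tags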
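The parsing above unpacks each non-empty line into a word and a tag and treats blank lines as sentence boundaries. As a minimal, standalone sketch of that layout (the two-sentence `sample` string is made up for illustration and is not taken from the dataset):

```python
# Hypothetical sample in the assumed "word tag" layout, one token per line.
sample = "The DT\ndog NN\nbarks VBZ\n\nIt PRP\nruns VBZ\n"

sentences, current = [], []
for line in sample.split("\n"):
    fields = line.split()
    if fields:
        current.append((fields[0], fields[1]))
    elif current:
        # A blank line closes the current sentence.
        sentences.append(current)
        current = []
if current:  # flush a final sentence that lacks a trailing blank line
    sentences.append(current)

# Split each sentence into parallel (words, tags) tuples.
pairs = [tuple(zip(*sent)) for sent in sentences]
print(pairs[0])  # (('The', 'dog', 'barks'), ('DT', 'NN', 'VBZ'))
```

The `(words, tags)` pairs have the same shape as the `sents` list that the data loaders consume later in the notebook.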


In [None]:
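TRAIN_URL = "https://raw.githubusercontent.com/" \
    "Gci04/AML-DS-2021/main/data/PosTagging/train_pos.txt"
TEST_URL = "https://raw.githubusercontent.com/" \
    "Gci04/AML-DS-2021/main/data/PosTagging/test_pos.txt"


def download_text(url: str) -> str:
    """Download a text file and fail loudly on HTTP errors."""
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.text


TRAIN_DATA = download_text(TRAIN_URL)
TEST_DATA = download_text(TEST_URL)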

In [None]:
# data preparing
train_vocab, train_sents, train_tags, train_sents_tags = \
    data_preparing(TRAIN_DATA)

test_vocab, test_sents, test_tags, test_sents_tags = \
    data_preparing(TEST_DATA)

len(train_sents), len(test_sents)

(8936, 2012)

In [None]:
TAGS = train_tags | test_tags
VOCAB = set((train_vocab | test_vocab).keys())

TAGS_DICT = {k: v for v, k in enumerate(TAGS)}
VOCAB_DICT = {k: v for v, k in enumerate(VOCAB)}

In [None]:
# viterbi
def viterbi(y, A, B, Pi=None):
    """
    Return the MAP estimate of state trajectory of Hidden Markov Model.

    Parameters
    ----------
    y : array (T,)
        Observation state sequence. int dtype.
    A : array (K, K)
        State transition matrix. See HiddenMarkovModel.state_transition  for
        details.
    B : array (K, M)
        Emission matrix. See HiddenMarkovModel.emission for details.
    Pi: optional, (K,)
        Initial state probabilities: Pi[i] is the probability x[0] == i. If
        None, uniform initial distribution is assumed (Pi[:] == 1/K).

    Returns
    -------
    x : array (T,)
        Maximum a posteriori probability estimate of hidden state trajectory,
        conditioned on observation sequence y under the model parameters A, B,
        Pi.
    T1: array (K, T)
        the probability of the most likely path so far
    T2: array (K, T)
        the x_j-1 of the most likely path so far
    """
    # Cardinality of the state space
    K = A.shape[0]
    # Initialize the priors with default (uniform dist) if not given by caller
    Pi = Pi if Pi is not None else np.full(K, 1 / K)
    T = len(y)
    T1 = np.empty((K, T), 'd')
    T2 = np.empty((K, T), 'B')

    # Initialize the tracking tables from the first observation
    T1[:, 0] = Pi * B[:, y[0]]
    T2[:, 0] = 0

    # Iterate through the observations, updating the tracking tables
    for i in range(1, T):
        T1[:, i] = np.max(T1[:, i - 1] * A.T * B[np.newaxis, :, y[i]].T, 1)
        T2[:, i] = np.argmax(T1[:, i - 1] * A.T, 1)

    # Build the output, optimal model trajectory
    x = np.empty(T, 'B')
    x[-1] = np.argmax(T1[:, T - 1])
    for i in reversed(range(1, T)):
        x[i - 1] = T2[x[i], i]

    return x, T1, T2
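The implementation above multiplies raw probabilities, so path scores shrink at every step and can underflow to zero on long sentences. A common alternative is to run the same recursion on log-probabilities. The sketch below is an illustration under that assumption, not the Lab 4 code; the function name `viterbi_log` and the toy parameters are hypothetical.

```python
import numpy as np


def viterbi_log(y, A, B, Pi):
    """Viterbi decoding with log-probabilities (illustrative sketch)."""
    K, T = A.shape[0], len(y)
    with np.errstate(divide="ignore"):  # log(0) = -inf is intended here
        logA, logB, logPi = np.log(A), np.log(B), np.log(Pi)

    score = np.empty((K, T))            # best log-probability of a path ending in state k at step t
    back = np.zeros((K, T), dtype=int)  # best predecessor of state k at step t

    score[:, 0] = logPi + logB[:, y[0]]
    for t in range(1, T):
        # cand[i, j]: best path ending in state i at t-1, then moving to state j
        cand = score[:, t - 1][:, None] + logA
        back[:, t] = np.argmax(cand, axis=0)
        score[:, t] = cand[back[:, t], np.arange(K)] + logB[:, y[t]]

    # Follow the back-pointers from the best final state.
    path = np.empty(T, dtype=int)
    path[-1] = np.argmax(score[:, -1])
    for t in range(T - 1, 0, -1):
        path[t - 1] = back[path[t], t]
    return path


# Toy model: 2 hidden states, 3 observation symbols.
Pi_toy = np.array([0.6, 0.4])
A_toy = np.array([[0.7, 0.3],
                  [0.4, 0.6]])
B_toy = np.array([[0.5, 0.4, 0.1],
                  [0.1, 0.3, 0.6]])
print(viterbi_log(np.array([0, 1, 2]), A_toy, B_toy, Pi_toy).tolist())  # [0, 0, 1]
```

Sums of logarithms replace products, so the scores stay in a representable range even for sentences with hundreds of tokens.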

In [None]:
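def viterbi_decode(tokens, A, B, Pi, vocab_dict, tags):
    """Tag a tokenized sentence with the HMM and return a list of tag names.

    Words missing from vocab_dict are dropped before decoding, so the result
    can be shorter than tokens. Returns None if no word is known.
    """
    indexes = [vocab_dict.get(word.lower()) for word in tokens]
    indexes = [i for i in indexes if i is not None]  # remove unknown words
    if not indexes:
        return None
    x, _, _ = viterbi(np.array(indexes), A, B, Pi)
    return [tags[tag_num] for tag_num in x if tag_num < len(tags)]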

In [None]:
# matrices

In [None]:
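# A[i, j]: number of times tag i is followed by tag j in the training data
A = np.zeros((len(TAGS), len(TAGS)))

for sentence in train_sents_tags:
    prev_tag = None
    for word, tag in sentence:
        if prev_tag is not None:
            A[TAGS_DICT[prev_tag]][TAGS_DICT[tag]] += 1
        prev_tag = tag

# Normalize each row into a transition distribution.
A /= A.sum(axis=1, keepdims=True)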

print(A[0])

[2.97464739e-02 5.27263940e-04 3.03176765e-03 6.21292675e-02
 4.39386616e-04 1.88936245e-03 3.66887825e-02 1.80148513e-03
 3.51509293e-04 1.44997583e-03 1.80148513e-03 4.39386616e-04
 3.08010018e-02 3.26244563e-01 0.00000000e+00 2.77252955e-02
 1.49830836e-02 0.00000000e+00 1.71360780e-03 1.36737115e-01
 1.11560262e-01 1.31815985e-03 5.84384200e-03 0.00000000e+00
 2.41662639e-03 0.00000000e+00 1.23028253e-03 4.39386616e-04
 1.75754647e-04 2.78571115e-02 3.25146096e-03 7.90895909e-04
 4.48174349e-03 0.00000000e+00 5.36051672e-03 1.75754647e-04
 8.41864757e-02 4.83325278e-04 4.39386616e-05 2.19693308e-04
 6.14701876e-02 3.42721561e-03 7.03018586e-04 6.06353530e-03]


In [None]:
# B[i, w]: number of times tag i emits word w in the training data
B = np.zeros((len(TAGS), len(VOCAB)))

for sentence in train_sents_tags:
    for word, tag in sentence:
        B[TAGS_DICT[tag]][VOCAB_DICT[word.lower()]] += 1

B /= B.sum(axis=1, keepdims=True)

print(B)

[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
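Because `VOCAB` merges the train and test vocabularies while the counts above come from training sentences only, any word that occurs only in the test data gets an all-zero column in `B`. In the `viterbi` function, multiplying by such a column sets every path probability to zero from that step on, which may explain degenerate predictions on test sentences. One common remedy is add-k smoothing. The sketch below assumes a hypothetical helper `smoothed_emissions` and an arbitrary value of `k`; it is not part of the lab code.

```python
import numpy as np


def smoothed_emissions(counts, k=1e-3):
    """Add-k smoothing: every (tag, word) pair receives a pseudo-count k."""
    smoothed = counts + k
    # Normalize each row so it is a probability distribution over words.
    return smoothed / smoothed.sum(axis=1, keepdims=True)


# Toy counts: 2 tags x 3 words; the last word never appears in training.
counts = np.array([[3.0, 1.0, 0.0],
                   [0.0, 2.0, 0.0]])
B_smooth = smoothed_emissions(counts)
print(B_smooth.min() > 0)  # True
```

With smoothing, an unseen word no longer rules out every tag, so the transition probabilities can still decide which tag is most plausible in that position.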


In [None]:
Pi = np.zeros(len(TAGS))
for sentence in train_sents_tags:
    tag = sentence[0][1]  # tag of the first token in the sentence
    Pi[TAGS_DICT[tag]] += 1

Pi /= len(train_sents_tags)

Pi

array([1.28916741e-01, 4.47627574e-04, 4.92390331e-03, 1.62264996e-02,
       3.13339302e-03, 2.23813787e-04, 8.61683080e-03, 5.61772605e-02,
       3.35720680e-04, 3.24529991e-03, 0.00000000e+00, 4.81199642e-03,
       5.83034915e-02, 2.12399284e-01, 0.00000000e+00, 1.34288272e-02,
       6.03178156e-02, 0.00000000e+00, 6.37869293e-03, 1.91920322e-01,
       4.52103850e-02, 1.90241719e-03, 5.25962399e-03, 6.71441361e-04,
       3.24529991e-03, 1.11906893e-04, 0.00000000e+00, 2.12623098e-03,
       1.11906893e-03, 6.71441361e-04, 7.83348254e-04, 0.00000000e+00,
       0.00000000e+00, 4.47627574e-04, 1.11906893e-03, 5.59534467e-04,
       4.48746643e-02, 2.12623098e-03, 1.11906893e-04, 0.00000000e+00,
       4.36436885e-02, 3.35720680e-03, 1.11906893e-04, 7.27394808e-02])

# LSTM for POS Tagging

Use a 2-layer BiLSTM from PyTorch, as we did in Lab 10:
- nn.LSTM(..., num_layers=2, bidirectional=True)

In [None]:
# data preparing
# A tuple keeps the insertion order (and therefore the indices) deterministic.
for tag in ("<UNK>", "<PAD>"):
    if tag not in TAGS_DICT:
        TAGS_DICT[tag] = len(TAGS_DICT)
    if tag not in VOCAB_DICT:
        VOCAB_DICT[tag] = len(VOCAB_DICT)

TAGS_DICT["<UNK>"], VOCAB_DICT["<UNK>"]

(44, 19460)

In [None]:
# batching
MAX_LEN = 20  # sentences are truncated or padded to this many tokens


def collate_fn(batch):
    """Convert (words, tags) pairs into fixed-length index tensors."""
    batch_input, batch_output = [], []

    for words, tags in batch:
        input_ = [
            VOCAB_DICT.get(word.lower(), VOCAB_DICT['<UNK>'])
            for word in words[:MAX_LEN]
        ]
        output = [
            TAGS_DICT.get(tag, TAGS_DICT['<UNK>'])
            for tag in tags[:MAX_LEN]
        ]

        # Right-pad short sentences so every row has MAX_LEN entries.
        input_ += [VOCAB_DICT['<PAD>']] * (MAX_LEN - len(input_))
        output += [TAGS_DICT['<PAD>']] * (MAX_LEN - len(output))

        batch_input.append(input_)
        batch_output.append(output)

    return (
        torch.tensor(batch_input, dtype=torch.long),
        torch.tensor(batch_output, dtype=torch.long),
    )

In [None]:
VALIDATION_RATIO = 0.1
BATCH_SIZE = 128

stop_validation = int(len(train_sents) * VALIDATION_RATIO)

train_dataloader = DataLoader(
    train_sents[stop_validation:],
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_fn,
)

validation_dataloader = DataLoader(
    train_sents[:stop_validation],
    batch_size=BATCH_SIZE,
    shuffle=False,  # order does not affect evaluation
    collate_fn=collate_fn,
)

test_dataloader = DataLoader(
    test_sents,
    batch_size=BATCH_SIZE,
    shuffle=False,  # order does not affect evaluation
    collate_fn=collate_fn,
)

In [None]:
class LSTMTagger(nn.Module):
    def __init__(self, 
                 embed_dim, 
                 hidden_dim, 
                 num_layers, 
                 is_bidirectional, 
                 vocab_size, 
                 tagset_size):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.word_embeds = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(
            embed_dim,
            hidden_dim,
            batch_first=True,
            num_layers=num_layers,
            bidirectional=is_bidirectional
        )
        # A bidirectional LSTM concatenates forward and backward hidden states.
        lin_dim = (2 if is_bidirectional else 1) * hidden_dim
        self.hidden2tag = nn.Linear(lin_dim, tagset_size)

    def forward(self, sent):
        embeds = self.word_embeds(sent)
        lstm_out, _ = self.lstm(embeds)
        tag_space = self.hidden2tag(lstm_out)
        tag_scores = F.log_softmax(tag_space, dim=2)
        return tag_scores

In [None]:
# training

EMBEDDING_DIM = 256
HIDDEN_DIM = 256
NUM_LAYERS = 2
BIDIRECTIONAL = True
VOCAB_SIZE = len(VOCAB_DICT)
TARGET_SIZE = len(TAGS_DICT)


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

model = LSTMTagger(
    EMBEDDING_DIM,
    HIDDEN_DIM,
    NUM_LAYERS,
    BIDIRECTIONAL,
    VOCAB_SIZE,
    TARGET_SIZE
)

optimizer = optim.Adam(model.parameters())
criterion = nn.NLLLoss()

model = model.to(device)
criterion = criterion.to(device)

cuda


In [None]:
def accuracy_calculator(preds, y):
    """Fraction of positions where the prediction matches the target."""
    return (preds == y).sum() / y.shape[0]


def train(model, dataloader, optimizer, criterion, device):
    epoch_loss = 0
    epoch_acc = 0

    model.train()

    for text, tags in dataloader:
        text = text.to(device)
        tags = tags.long().to(device)

        # reset gradients accumulated in the previous step
        optimizer.zero_grad()

        # predict tags and flatten to (batch * seq_len, num_tags)
        predictions = model(text)
        predictions = predictions.view(-1, predictions.shape[-1])
        tags = tags.view(-1)

        # calculate loss and token-level accuracy (padding included)
        loss = criterion(predictions, tags)
        acc = accuracy_calculator(torch.argmax(predictions, dim=1), tags)

        # backpropagate loss and update weights
        loss.backward()
        optimizer.step()

        # loss is a per-token mean, so this accumulates the summed batch loss
        epoch_loss += loss.item() * tags.shape[0]
        epoch_acc += acc.item()

    # both values are averaged over batches, not over tokens
    return epoch_loss / len(dataloader), epoch_acc / len(dataloader)

In [None]:
def evaluate_model(model, data_batches, criterion, device):
    eval_loss = 0
    eval_acc = 0

    model.eval()
    with torch.no_grad():
        for text, tags in data_batches:
            text = text.to(device)
            tags = tags.long().to(device)

            # flatten to (batch * seq_len, num_tags) and (batch * seq_len,)
            predictions = model(text)
            predictions = predictions.view(-1, predictions.shape[-1])
            tags = tags.view(-1)

            loss = criterion(predictions, tags)
            acc = accuracy_calculator(torch.argmax(predictions, dim=1), tags)

            # loss is a per-token mean, so this accumulates the summed batch loss
            eval_loss += loss.item() * tags.shape[0]
            eval_acc += acc.item()

    # both values are averaged over batches, not over tokens
    return eval_loss / len(data_batches), eval_acc / len(data_batches)
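Both `train` and `evaluate_model` score every position, including the `<PAD>` positions appended by `collate_fn`, so the reported accuracy may be higher than the accuracy on real tokens. The sketch below shows one way to exclude padding from the metric. The helper `masked_accuracy` and the padding index `45` are assumptions made for illustration, and NumPy arrays stand in for tensors.

```python
import numpy as np


def masked_accuracy(preds, gold, pad_id):
    """Token accuracy over positions whose gold tag is not padding."""
    preds, gold = np.asarray(preds), np.asarray(gold)
    mask = gold != pad_id  # True for real tokens only
    return float((preds[mask] == gold[mask]).mean())


PAD_ID = 45  # hypothetical index of the padding tag
gold = [3, 7, PAD_ID, PAD_ID]
preds = [3, 2, PAD_ID, PAD_ID]

print(masked_accuracy(preds, gold, PAD_ID))  # 0.5
print(float((np.asarray(preds) == np.asarray(gold)).mean()))  # 0.75 when padding is counted
```

On the loss side, `nn.NLLLoss` accepts an `ignore_index` argument that skips targets with a given index, which serves the same purpose during training.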

In [None]:
PROGRESS_EPOCH = 5
epochs = 100
no_inc = 0
best_valid_acc = 0

for epoch in range(epochs):
    tr_loss, tr_acc = train(model, train_dataloader, optimizer, criterion, device)
    val_loss, val_acc = evaluate_model(model, validation_dataloader, criterion, device)
    print(f"Epoch: {epoch+1}")
    print(f"Train Loss:  {tr_loss:.3f}\t\tTrain Accuracy: {tr_acc*100:.2f}%")
    print(f"Validation Loss:  {val_loss:.3f}\tValidation Accuracy: {val_acc*100:.2f}%\n")

    if val_acc > best_valid_acc:
        best_valid_acc = val_acc
        no_inc = 0
    else:
        no_inc += 1
        if no_inc == PROGRESS_EPOCH:
            break

Epoch: 1
Train Loss:  4457.457		Train Accuracy: 54.44%
Validation Loss:  1991.179	Validation Accuracy: 76.93%

Epoch: 2
Train Loss:  1403.806		Train Accuracy: 83.79%
Validation Loss:  1126.834	Validation Accuracy: 86.75%

Epoch: 3
Train Loss:  787.530		Train Accuracy: 90.95%
Validation Loss:  836.890	Validation Accuracy: 90.25%

Epoch: 4
Train Loss:  450.680		Train Accuracy: 95.00%
Validation Loss:  719.180	Validation Accuracy: 91.60%

Epoch: 5
Train Loss:  244.373		Train Accuracy: 97.53%
Validation Loss:  675.168	Validation Accuracy: 92.47%

Epoch: 6
Train Loss:  124.109		Train Accuracy: 98.94%
Validation Loss:  689.650	Validation Accuracy: 92.77%

Epoch: 7
Train Loss:  60.558		Train Accuracy: 99.62%
Validation Loss:  708.833	Validation Accuracy: 93.07%

Epoch: 8
Train Loss:  29.924		Train Accuracy: 99.88%
Validation Loss:  729.245	Validation Accuracy: 93.26%

Epoch: 9
Train Loss:  16.474		Train Accuracy: 99.96%
Validation Loss:  745.157	Validation Accuracy: 93.41%

Epoch: 10
Train Lo
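The loop above stops once validation accuracy has not improved for `PROGRESS_EPOCH` epochs, but the model that is evaluated afterwards is the one from the last epoch, not necessarily the best one. A small helper can keep a copy of the best state as well. The class `EarlyStopping` below is a hypothetical sketch in plain Python, with a dictionary standing in for the model state.

```python
import copy


class EarlyStopping:
    """Track the best validation score and keep a copy of the best state."""

    def __init__(self, patience=5):
        self.patience = patience
        self.best_score = float("-inf")
        self.best_state = None
        self.bad_epochs = 0

    def step(self, score, state):
        """Record one epoch; return True when training should stop."""
        if score > self.best_score:
            self.best_score = score
            self.best_state = copy.deepcopy(state)
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
        return self.bad_epochs >= self.patience


# Made-up validation accuracies, one per epoch.
stopper = EarlyStopping(patience=2)
for epoch, val_acc in enumerate([0.80, 0.85, 0.84, 0.83, 0.90], start=1):
    if stopper.step(val_acc, {"epoch": epoch}):
        break

print(epoch, stopper.best_score, stopper.best_state)  # 4 0.85 {'epoch': 2}
```

With a PyTorch model, the state passed to `step` could be `model.state_dict()`, and the best weights could be restored with `model.load_state_dict(stopper.best_state)` before testing.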

# BERT for POS Tagging

You can fine-tune a pretrained model from HuggingFace or train a model from zero. You **don't** need to implement the model from scratch.

Refer to the fine-tuning BERT part of Lab 5.

In [None]:
# data preparing

In [None]:
# model

In [None]:
# training

# Evaluation

In [83]:
# evaluate HMM
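def evaluate_hmm(test_words, test_tags):
    """Return token-level accuracy and the indices of mistagged sentences."""
    correct, total = 0, 0
    incorrect_ids = []
    tag_list = list(TAGS)  # same iteration order that built TAGS_DICT

    for i, test_sent in enumerate(test_words):
        # viterbi_decode returns None when no word is known
        pred_tags = viterbi_decode(
            test_sent, A, B, Pi, VOCAB_DICT, tag_list
        ) or []
        correct_tags = test_tags[i]

        # zip stops at the shorter sequence; missing predictions count as errors
        correct += sum(p == c for p, c in zip(pred_tags, correct_tags))
        total += len(correct_tags)

        if pred_tags != correct_tags:
            incorrect_ids.append(i)

    accuracy = correct / total
    return accuracy, incorrect_ids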

In [None]:
train_word_list = []
train_tag_list = []

for sentence in train_sents_tags:
    s_words, s_tags = zip(*sentence)
    train_word_list.append(list(s_words))
    train_tag_list.append(list(s_tags))

In [76]:
hmm_acc_train, hmm_ids_train = evaluate_hmm(train_word_list, train_tag_list)
print(f'Accuracy on train data for HMM: {hmm_acc_train*100:.2f}%')

Accuracy on train data for HMM: 97.44%


In [None]:
test_word_list = []
test_tag_list = []

for sentence in test_sents_tags:
    s_words, s_tags = zip(*sentence)
    test_word_list.append(list(s_words))
    test_tag_list.append(list(s_tags))

In [84]:
hmm_acc_test, hmm_ids_test = evaluate_hmm(test_word_list, test_tag_list)
print(f'Accuracy on test data for HMM: {hmm_acc_test*100:.2f}%')

Accuracy on test data for HMM: 51.21%


In [65]:
# evaluate LSTM
test_loss, test_acc = evaluate_model(model, test_dataloader, criterion, device)
print(f'Accuracy on test data for LSTM: {test_acc*100:.2f}%')

Accuracy on test data for LSTM: 92.79%


In [None]:
# evaluate BERT

In [88]:
# print some samples where the models make mistakes
print("HMM mistakes:\n")
for idx in hmm_ids_test[:20]:
    result = viterbi_decode(
        test_word_list[idx], A, B, Pi, VOCAB_DICT, list(TAGS)
    )
    print(*zip(test_word_list[idx], result, test_tag_list[idx]))
    print("======")

HMM mistakes:

('Rockwell', 'IN', 'NNP') ('International', 'IN', 'NNP') ('Corp.', 'IN', 'NNP') ("'s", 'IN', 'POS') ('Tulsa', 'IN', 'NNP') ('unit', 'IN', 'NN') ('said', 'IN', 'VBD') ('it', 'IN', 'PRP') ('signed', 'IN', 'VBD') ('a', 'IN', 'DT') ('tentative', 'IN', 'JJ') ('agreement', 'IN', 'NN') ('extending', 'IN', 'VBG') ('its', 'IN', 'PRP$') ('contract', 'IN', 'NN') ('with', 'IN', 'IN') ('Boeing', 'IN', 'NNP') ('Co.', 'IN', 'NNP') ('to', 'IN', 'TO') ('provide', 'IN', 'VB') ('structural', 'IN', 'JJ') ('parts', 'IN', 'NNS') ('for', 'IN', 'IN') ('Boeing', 'IN', 'NNP') ("'s", 'IN', 'POS') ('747', 'IN', 'CD') ('jetliners', 'IN', 'NNS') ('.', 'IN', '.')
('Rockwell', 'IN', 'NNP') ('said', 'IN', 'VBD') ('the', 'IN', 'DT') ('agreement', 'IN', 'NN') ('calls', 'IN', 'VBZ') ('for', 'IN', 'IN') ('it', 'IN', 'PRP') ('to', 'IN', 'TO') ('supply', 'IN', 'VB') ('200', 'IN', 'CD') ('additional', 'IN', 'JJ') ('so-called', 'IN', 'JJ') ('shipsets', 'IN', 'NNS') ('for', 'IN', 'IN') ('the', 'IN', 'DT') ('plan

In [104]:
inv_vocab = {v: k for k, v in VOCAB_DICT.items()}
inv_tags_vocab = {v: k for k, v in TAGS_DICT.items()}

def show_model_issues(model, data_batches, device):
    """Print (token, predicted tag, gold tag) triples for imperfect batches.

    Expects batch_size=1: words are read from the first sentence only.
    """
    model.eval()
    with torch.no_grad():
        for text, tags in data_batches:
            text = text.to(device)
            tags = tags.long().to(device).view(-1)
            predictions = model(text)
            pred_tags = torch.argmax(
                predictions.view(-1, predictions.shape[-1]), dim=1
            )
            if accuracy_calculator(pred_tags, tags) < 1:
                words = [inv_vocab[p.item()] for p in text[0]]
                output = [
                    (
                        words[idx],
                        inv_tags_vocab[pred_tags[idx].item()],
                        inv_tags_vocab[tags[idx].item()],
                    )
                    for idx in range(len(tags))
                ]
                print(*output)
                print("======")

In [106]:
test_dataloader_eval = DataLoader(
    test_sents[:20],
    batch_size = 1,
    collate_fn=collate_fn)
show_model_issues(model, test_dataloader_eval, device)

('rockwell', 'NNP', 'NNP') ('international', 'NNP', 'NNP') ('corp.', 'NNP', 'NNP') ("'s", 'POS', 'POS') ('tulsa', 'JJ', 'NNP') ('unit', 'NN', 'NN') ('said', 'VBD', 'VBD') ('it', 'PRP', 'PRP') ('signed', 'VBD', 'VBD') ('a', 'DT', 'DT') ('tentative', 'JJ', 'JJ') ('agreement', 'NN', 'NN') ('extending', 'NN', 'VBG') ('its', 'PRP$', 'PRP$') ('contract', 'NN', 'NN') ('with', 'IN', 'IN') ('boeing', 'NNP', 'NNP') ('co.', 'NNP', 'NNP') ('to', 'TO', 'TO') ('provide', 'VB', 'VB')
('rockwell', 'NNP', 'NNP') ('said', 'VBD', 'VBD') ('the', 'DT', 'DT') ('agreement', 'NN', 'NN') ('calls', 'VBZ', 'VBZ') ('for', 'IN', 'IN') ('it', 'PRP', 'PRP') ('to', 'TO', 'TO') ('supply', 'VB', 'VB') ('200', 'CD', 'CD') ('additional', 'JJ', 'JJ') ('so-called', 'JJ', 'JJ') ('shipsets', 'NN', 'NNS') ('for', 'IN', 'IN') ('the', 'DT', 'DT') ('planes', 'NNS', 'NNS') ('.', '.', '.') ('<PAD>', '<PAD>', '<PAD>') ('<PAD>', '<PAD>', '<PAD>') ('<PAD>', '<PAD>', '<PAD>')
('these', 'DT', 'DT') ('include', 'VBP', 'VBP') (',', ',', 
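Printed samples show individual mistakes, but a count of which gold tags are confused with which predicted tags gives a quicker overview. The sketch below assumes a hypothetical helper `top_confusions`; the toy sequences reuse three errors visible in the LSTM output above (`NNP` tagged as `JJ`, `VBG` as `NN`, `NNS` as `NN`).

```python
from collections import Counter


def top_confusions(gold_seqs, pred_seqs, n=3):
    """Count (gold, predicted) pairs that disagree, most frequent first."""
    errors = Counter()
    for gold, pred in zip(gold_seqs, pred_seqs):
        for g, p in zip(gold, pred):
            if g != p:
                errors[(g, p)] += 1
    return errors.most_common(n)


# Toy gold and predicted tag sequences for two short sentences.
gold = [["NNP", "VBG", "NNS"], ["NNP", "NNS"]]
pred = [["JJ", "NN", "NN"], ["NNP", "NN"]]

print(top_confusions(gold, pred, n=1))  # [(('NNS', 'NN'), 2)]
```

The same function works for any model that yields one predicted tag per gold tag, so it can be applied to the HMM and LSTM outputs alike.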

# Conclusion

Write your opinions and conclusions about the application of HMM, LSTM and BERT to PoS Tagging
- discuss the results
- pros and cons of each model
- 4-6 sentences

Answer:

The evaluation shows that the LSTM model performs much better than the HMM on the PoS tagging task: test accuracy is 92.79% for the LSTM versus 51.21% for the HMM. The LSTM's accuracy is consistent across the validation and test sets, at around 93%. In contrast, the HMM reaches a high training accuracy of 97.44% but generalizes far worse. One limitation of the HMM is its **inability** to predict tags for unknown words, whereas the LSTM can use **context** to make predictions.

The HMM has some advantages, such as **quick training** and the ability to perform well with a **large corpus**. However, it captures very little context (only the previous tag), which hurts its generalization. The LSTM, on the other hand, has a great **generalization ability**, which makes it more suitable for practical use. Its ability to capture context is still limited compared to BERT. Nonetheless, the LSTM remains an effective model for PoS tagging, although its training **requires more resources** than the HMM's.


