# Natural Language Processing
**Assignment 2**

*Ali Mehrabi - 9912045*

Part-of-Speech Tagging

In [None]:
# import block

import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import lightning as pl
from sklearn.preprocessing import LabelEncoder
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px

# Seed NumPy's global RNG so the random sample index drawn with
# np.random.randint further down is repeatable between runs.
np.random.seed(43)
# torch.manual_seed seeds PyTorch and returns a torch.Generator; the handle is
# kept so it can be passed as `generator=` to the DataLoaders created later.
sgen = torch.manual_seed(43)

In [None]:
def _read_split(json_path):
    """Load one POS split from *json_path*.

    Returns a tuple ``(frame, sentences, tags)`` where ``frame`` is the raw
    DataFrame, ``sentences`` is the "sentences" column as a list and ``tags``
    is the "pos_tags" column with each tag sequence joined by single spaces.
    """
    frame = pd.read_json(json_path)
    sentences = frame["sentences"].tolist()
    joined_tags = [" ".join(tag_seq) for tag_seq in frame["pos_tags"].tolist()]
    return frame, sentences, joined_tags


train_raw, train_sentences, train_tags = _read_split("./data/POS/pos_train.json")
val_raw, val_sentences, val_tags = _read_split("./data/POS/pos_val.json")
test_raw, test_sentences, test_tags = _read_split("./data/POS/pos_test.json")

print(f"Train: {len(train_raw)} Val: {len(val_raw)} Test: {len(test_raw)}")

# Show one randomly chosen training sentence, one token/tag pair per line.
i = np.random.randint(0, len(train_raw))
print(f"Random Sample ({i}):")
sample_tokens = train_sentences[i].split(" ")
sample_tags = train_tags[i].split(" ")
for token, tag in zip(sample_tokens, sample_tags):
    print(f"{token:-<20}{tag}")

In [None]:
from pos_dataset import POSDataset


def build_vocab(sequences):
    """Map every distinct space-separated token in *sequences* to an integer id.

    Tokens receive ids 1..N in order of first appearance, ``"PAD"`` is pinned
    to id 0 and ``"UNK"`` takes the next free id after the largest one in use.
    For data that does not itself contain the literal tokens ``"PAD"`` or
    ``"UNK"``, the ids therefore form the contiguous range
    ``0 .. len(vocab) - 1``, so a lookup table with ``len(vocab)`` rows covers
    every id.  A literal ``"PAD"``/``"UNK"`` token in the data is overwritten
    by the reserved entry.

    Args:
        sequences: iterable of strings whose tokens are separated by single
            spaces.

    Returns:
        dict mapping token -> id, including the ``"PAD"`` and ``"UNK"`` entries.
    """
    vocab = {}
    for seq in sequences:
        for word in seq.split(" "):
            if word not in vocab:
                # Id 0 is reserved for padding, so real tokens start at 1.
                vocab[word] = len(vocab) + 1
    vocab["PAD"] = 0
    # Take the id right after the largest one in use. Using len(vocab) + 1
    # here would skip an id (PAD is already counted in len) and place UNK at
    # len(final vocab), i.e. one past the last row of a len(vocab)-sized table.
    vocab["UNK"] = max(vocab.values()) + 1
    return vocab


BATCH_SIZE = 32
NUM_WORKERS = 7

# Vocabularies are built from the training split only; the inverse maps are
# used for printing ids back as text.
word2idx = build_vocab(train_sentences)
idx2word = {idx: word for word, idx in word2idx.items()}
tag2idx = build_vocab(train_tags)
idx2tag = {idx: tag for tag, idx in tag2idx.items()}

# Sequence length is measured in tokens, using the same " " split as
# build_vocab. len(seq) on the raw string counts characters, which inflates
# the padded length several-fold. The maximum is taken over every split so
# that no validation/test sentence is longer than max_len.
# NOTE(review): assumes POSDataset pads/truncates to `max_len` tokens — confirm
# against pos_dataset.py.
max_len = max(
    len(seq.split(" "))
    for split in (train_sentences, val_sentences, test_sentences)
    for seq in split
)

train_dataset = POSDataset(train_sentences, train_tags, word2idx, tag2idx, max_len)
val_dataset = POSDataset(val_sentences, val_tags, word2idx, tag2idx, max_len)
test_dataset = POSDataset(test_sentences, test_tags, word2idx, tag2idx, max_len)

# Settings shared by all three loaders. persistent_workers is only valid when
# worker processes are actually used, so it follows NUM_WORKERS.
_loader_kwargs = {
    "num_workers": NUM_WORKERS,
    "persistent_workers": NUM_WORKERS > 0,
    "generator": sgen,
}

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    **_loader_kwargs,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    **_loader_kwargs,
)
# One sentence per batch so that test batch k corresponds to test sentence k.
test_loader = DataLoader(
    test_dataset,
    batch_size=1,
    shuffle=False,
    **_loader_kwargs,
)

In [None]:
def print_batch(dataloader, batch_num):
    """Print every (token, tag) pair of batch number *batch_num*.

    Iterates *dataloader* until the requested batch is reached, decodes the
    ids with the module-level ``idx2word`` / ``idx2tag`` maps, prints one
    pair per line with a separator after each sentence, then stops.
    Nothing is printed if the loader has fewer than ``batch_num + 1`` batches.
    """
    for b, (word_batch, tag_batch) in enumerate(dataloader):
        if b != batch_num:
            continue
        print(f"Batch {b}:")
        for sentence_ids, sentence_tags in zip(word_batch, tag_batch):
            for token, tags in zip(sentence_ids, sentence_tags):
                print(f"{idx2word[int(token)]:<15}{idx2tag[int(tags)]}")
            print("=====================================")
        # Requested batch handled; no need to load the rest.
        break


print_batch(train_loader, 0)

In [None]:
class BiRNNPOSTagger(pl.LightningModule):
    """Bidirectional LSTM tagger producing one tag distribution per token.

    Token ids are embedded, passed through a batch-first bidirectional LSTM,
    then through dropout and a linear layer that yields ``tag_size`` logits
    per position.  Training and validation use cross-entropy in which targets
    equal to ``padding_idx`` are ignored.

    Bookkeeping kept on the module:
        training_log / validation_log: lists of ``(epoch, batch_idx, loss)``.
        test_logs: dict with flattened per-batch ``"preds"`` and ``"targets"``
            NumPy arrays.
        last_epoch: epoch of the most recent progress print, used to decide
            whether to overwrite the current console line or start a new one.
    """

    def __init__(
        self,
        vocab_size,
        tag_size,
        embedding_dim,
        hidden_units,
        num_layers=2,
        padding_idx=0,
        learning_rate=1e-3,
        dropout=0.2,
    ):
        super().__init__()
        # Stores the constructor arguments under self.hparams.
        self.save_hyperparameters()

        self.embedding = torch.nn.Embedding(
            vocab_size, embedding_dim, padding_idx=padding_idx
        )
        # Make sure the padding row is all zeros.
        self.embedding.weight.data[padding_idx].zero_()

        self.birnn = torch.nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_units,
            num_layers=num_layers,
            bidirectional=True,
            batch_first=True,
        )
        # Forward and backward hidden states are concatenated, hence "* 2".
        self.fc = torch.nn.Linear(hidden_units * 2, tag_size)
        self.dropout = torch.nn.Dropout(dropout)

        self.training_log = []
        self.validation_log = []
        self.test_logs = {"preds": [], "targets": []}
        self.last_epoch = 0

    def forward(self, x):
        """Return per-position tag logits for the token-id tensor *x*."""
        embedded = self.embedding(x)
        rnn_out, _ = self.birnn(embedded)
        return self.fc(self.dropout(rnn_out))

    def _masked_loss(self, batch):
        """Cross-entropy over all positions of *batch*, skipping padding targets."""
        x, y = batch
        logits = self.forward(x)
        flat_logits = logits.view(-1, logits.shape[-1])
        flat_targets = y.view(-1)
        return torch.nn.functional.cross_entropy(
            flat_logits,
            flat_targets,
            ignore_index=self.hparams.padding_idx,
        )

    def training_step(self, batch, batch_idx):
        """Compute, record and print the loss for one training batch."""
        self.train()
        loss = self._masked_loss(batch)
        self.training_log.append((self.current_epoch, batch_idx, loss.item()))

        # A new epoch starts a fresh console line; otherwise the current
        # line is overwritten in place.
        if self.last_epoch != self.current_epoch:
            print(
                f"\nTraining Epoch {self.current_epoch} Batch {batch_idx} Loss: {loss.item()}      ",
                end="",
            )
        else:
            print(
                f"\rTraining Epoch {self.current_epoch} Batch {batch_idx} Loss: {loss.item()}     ",
                end="",
            )
        self.last_epoch = self.current_epoch

        return loss

    def validation_step(self, batch, batch_idx):
        """Compute, record and print the loss for one validation batch."""
        self.eval()
        loss = self._masked_loss(batch)
        self.validation_log.append((self.current_epoch, batch_idx, loss.item()))

        if self.last_epoch != self.current_epoch:
            print(
                f"\nValidation Epoch {self.current_epoch} Batch {batch_idx} Loss: {loss.item()}     ",
                end="",
            )
        else:
            print(
                f"\rValidation Epoch {self.current_epoch} Batch {batch_idx} Loss: {loss.item()}     ",
                end="",
            )
        self.last_epoch = self.current_epoch

        return loss

    def test_step(self, batch, batch_idx):
        """Store flattened arg-max predictions and targets for one test batch."""
        self.eval()
        with torch.inference_mode():
            x, y = batch
            predicted = self.forward(x).argmax(dim=-1)
            self.test_logs["preds"].append(predicted.view(-1).cpu().numpy())
            self.test_logs["targets"].append(y.view(-1).cpu().numpy())

    def configure_optimizers(self):
        """Return an Adam optimizer using the stored learning rate."""
        optimizer = torch.optim.Adam(
            self.parameters(), lr=self.hparams.learning_rate
        )
        return optimizer

In [None]:
MAX_EPOCHS = 2
# Size the embedding table and the output layer from the largest id actually
# present, not from the number of dictionary entries. The two differ whenever
# the id space has a gap, and an id >= table size makes the embedding lookup
# (or the cross-entropy target check) fail with an index error.
vocab_size = max(word2idx.values()) + 1
tag_size = max(tag2idx.values()) + 1
model = BiRNNPOSTagger(vocab_size, tag_size, embedding_dim=8, hidden_units=2)
# Checkpointing and the default logger are switched off; losses are collected
# on the model itself (training_log / validation_log).
trainer = pl.Trainer(max_epochs=MAX_EPOCHS, enable_checkpointing=False, logger=False)
trainer.fit(model, train_loader, val_loader)
trainer.test(model, test_loader)

In [None]:
def _show_loss_curve(points, title, epochs, marks):
    """Draw a loss-vs-global-batch line with one dashed vertical line per epoch.

    Args:
        points: list of ``(global_batch_index, loss)`` tuples.
        title: figure title.
        epochs: epoch numbers, in ascending order.
        marks: x-position (global batch index) at which each epoch starts,
            aligned with *epochs*.

    Returns:
        The plotly figure, after it has been shown.
    """
    figure = px.line(
        pd.DataFrame(points, columns=["batch", "loss"]),
        x="batch",
        y="loss",
        title=title,
        labels={"batch": "Batch", "loss": "Loss"},
    )
    # Iterate over the epochs that were actually logged rather than over
    # range(MAX_EPOCHS), so a run that stopped early does not raise IndexError.
    for epoch, mark in zip(epochs, marks):
        figure.add_vline(
            x=mark,
            line_dash="dash",
            line_color="red",
            annotation_text=f"Epoch {epoch}",
        )
    figure.show()
    return figure


# Log entries are (epoch, batch_idx, loss). Epochs are sorted explicitly
# because set iteration order is not guaranteed.
train_epochs = sorted({log[0] for log in model.training_log})
val_epochs = sorted({log[0] for log in model.validation_log})
train_epoch_marks = [epoch * len(train_loader) for epoch in train_epochs]
val_epoch_marks = [epoch * len(val_loader) for epoch in val_epochs]

train_loss = [
    (log[0] * len(train_loader) + log[1], log[2]) for log in model.training_log
]
# The first two validation entries are dropped.
# NOTE(review): presumably these are Lightning's pre-training sanity-check
# batches (Trainer num_sanity_val_steps defaults to 2) — confirm.
val_loss = [
    (log[0] * len(val_loader) + log[1], log[2]) for log in model.validation_log[2:]
]

fig = _show_loss_curve(train_loss, "Training Loss", train_epochs, train_epoch_marks)
fig = _show_loss_curve(val_loss, "Validation Loss", val_epochs, val_epoch_marks)

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

# Id used for padded target positions.
PAD_TAG_IDX = tag2idx["PAD"]

# One flattened array of predictions / targets per test batch.
all_preds = model.test_logs["preds"]
all_labels = model.test_logs["targets"]

# Drop every padded position, decided by the *target* only. Also requiring
# the prediction to be PAD would keep padded positions whenever the model
# predicts a real tag there, which pollutes the metrics.
filtered_preds = []
filtered_labels = []
for preds, labels in zip(all_preds, all_labels):
    preds = np.asarray(preds)
    labels = np.asarray(labels)
    keep = labels != PAD_TAG_IDX
    filtered_preds.append(preds[keep])
    filtered_labels.append(labels[keep])

print("Filtered predictions and labels:")
print(f'{"PRED":<10}{"TRUE":<10}{"TOKEN"}')
# Show the first two test batches next to their source tokens. Batch idx is
# used as the row index into test_raw, which lines up when each test batch
# holds exactly one sentence.
for idx, (pred, labels) in enumerate(zip(filtered_preds[:2], filtered_labels[:2])):
    sample = test_raw["sentences"].iloc[idx].split(" ")
    for p, gold, s in zip(pred, labels, sample):
        # .get guards against a predicted id that has no tag name.
        pred_tag = idx2tag.get(int(p), "<?>")
        gold_tag = idx2tag.get(int(gold), "<?>")
        print(f"{pred_tag:<10}{gold_tag:<10}{s}")
    print("=======================")


print("Metrics:")
# The per-sentence arrays have different lengths, so they are concatenated
# into flat 1-D arrays. np.array(...).flatten() does not flatten ragged input,
# and sklearn's metrics expect 1-D label vectors.
flat_labels = np.concatenate(filtered_labels)
flat_preds = np.concatenate(filtered_preds)
report = classification_report(
    flat_labels,
    flat_preds,
    target_names=list(tag2idx.keys()),
    labels=list(tag2idx.values()),
)
print(report)
cm = confusion_matrix(flat_labels, flat_preds, labels=list(tag2idx.values()))
cm