In [3]:
import torch
import numpy as np
from gensim.models import Word2Vec
import gensim
from nltk.tokenize import sent_tokenize, word_tokenize
from conllu import parse
import torch.nn as nn
from torch.utils.data import Dataset
from torchtext.vocab import build_vocab_from_iterator, Vocab
from torch.nn.utils.rnn import pad_sequence
from torchmetrics.classification import MulticlassF1Score as multiclass_f1_score

In [4]:
train_file = "./ud-treebanks-v2.13/UD_English-Atis/en_atis-ud-train.conllu"
dev_file = "./ud-treebanks-v2.13/UD_English-Atis/en_atis-ud-dev.conllu"
test_file = "./ud-treebanks-v2.13/UD_English-Atis/en_atis-ud-test.conllu"

def read_conllu(file):
    with open(file, "r") as f:
        data = f.read()
    return parse(data)

train_data = read_conllu(train_file)
dev_data = read_conllu(dev_file)
test_data = read_conllu(test_file)

In [3]:
def get_sentences(data):
    sentences = []
    for sentence in data:
        sentence = [word['form'] for word in sentence]
        sentences.append(sentence)
    return sentences

train_sentences = get_sentences(train_data)
dev_sentences = get_sentences(dev_data)
test_sentences = get_sentences(test_data)


tags_to_num = {
    "ADJ": 0,
    "ADP": 1,
    "ADV": 2,
    "AUX": 3,
    "CCONJ": 4,
    "DET": 5,
    "INTJ": 6,
    "NOUN": 7,
    "NUM": 8,
    "PART": 9,
    "PRON": 10,
    "PROPN": 11,
    "PUNCT": 12,
    "SCONJ": 13,
    "SYM": 14,
    "VERB": 15,
    "X": 16,
    "PAD": 17,
}

num_to_tag = {
    0: "ADJ",
    1: "ADP",
    2: "ADV",
    3: "AUX",
    4: "CCONJ",
    5: "DET",
    6: "INTJ",
    7: "NOUN",
    8: "NUM",
    9: "PART",
    10: "PRON",
    11: "PROPN",
    12: "PUNCT",
    13: "SCONJ",
    14: "SYM",
    15: "VERB",
    16: "X",
    17: "PAD",

}

def get_tags(data):
    tags = []
    for sentence in data:
        sentence = [tags_to_num[word['upostag']] for word in sentence]
        tags.append(sentence)
    return tags

train_tags = get_tags(train_data)
dev_tags = get_tags(dev_data)
test_tags = get_tags(test_data)




In [32]:
START_TOKEN = "<s>"
END_TOKEN = "</s>"
UNKNOWN_TOKEN = "<unk>"
PAD_TOKEN = "<pad>"

class POSDataset(Dataset):
  def __init__(self, data: list[tuple[list[str], list[int]]], vocabulary:Vocab|None=None):
    """Initialize the dataset. Setup Code goes here"""
    self.sentences = [i[0] for i in data]
    self.labels = [i[1] for i in data]


    if vocabulary is None:
      self.vocabulary = build_vocab_from_iterator(self.sentences, specials=[START_TOKEN, END_TOKEN, UNKNOWN_TOKEN, PAD_TOKEN]) # use min_freq for handling unkown words better
      self.vocabulary.set_default_index(self.vocabulary[UNKNOWN_TOKEN])
    else:
      # if vocabulary provided use that
      self.vocabulary = vocabulary

    self.sentences = []
    self.labels = []
    for j,(sentence, label) in enumerate(data):
      sentence = [START_TOKEN] + sentence + [END_TOKEN]
      label = [tags_to_num["PAD"]] + label + [tags_to_num["PAD"]]

      # split into p+s+1 chunks
      self.sentences.append(sentence)
      self.labels.append(torch.nn.functional.one_hot(torch.tensor(label), num_classes=len(tags_to_num)))


  def __len__(self) -> int:
    """Returns number of datapoints."""
    return len(self.sentences)

  def __getitem__(self, index: int) -> tuple[torch.Tensor, torch.Tensor]:
    """Get the datapoint at `index`."""
    return torch.tensor(self.vocabulary.lookup_indices(self.sentences[index])), torch.tensor(self.labels[index])

  def collate(self, batch: list[tuple[torch.Tensor, torch.Tensor]]) -> tuple[torch.Tensor, torch.Tensor]:
    """Given a list of datapoints, batch them together"""
    sentences = [i[0] for i in batch]
    labels = [i[1] for i in batch]
    padded_sentences = pad_sequence(sentences, batch_first=True, padding_value=self.vocabulary[PAD_TOKEN]) # pad sentences with pad token id
    padded_labels = pad_sequence(labels, batch_first=True, padding_value=torch.tensor(tags_to_num["PAD"])) # pad labels with 17

    return padded_sentences, padded_labels

In [59]:
class RNN_POS_Tagger(nn.Module):
    def __init__(self, vocabulary_size: int):
        super().__init__()
        self.embedding = nn.Embedding(vocabulary_size, 100)
        self.lstm = nn.LSTM(100, 100, batch_first=True)
        self.linear = nn.Sequential(
            nn.Linear(100, 18),
            nn.LogSoftmax(dim=2)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.embedding(x)
        x, _ = self.lstm(x)
        x = self.linear(x)
        return x

In [60]:

train_dataset = POSDataset( list(zip(train_sentences, train_tags)))
dev_dataset = POSDataset(list(zip(dev_sentences, dev_tags)), vocabulary=train_dataset.vocabulary)
test_dataset = POSDataset(list(zip(test_sentences, test_tags)), vocabulary=train_dataset.vocabulary)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, collate_fn=train_dataset.collate, shuffle=True)
dev_loader = torch.utils.data.DataLoader(dev_dataset, batch_size=32, collate_fn=dev_dataset.collate)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, collate_fn=test_dataset.collate)

In [65]:
rnn = RNN_POS_Tagger(len(train_dataset.vocabulary))
criterion = nn.NLLLoss()
optimizer = torch.optim.Adam(rnn.parameters(), lr=0.001)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# rnn = torch.nn.DataParallel(rnn)
rnn.to(device)

from tqdm import tqdm

for epoch in range(10):
    for step, (word, tag) in enumerate(train_loader):
        word, tag = word.to(device), tag.to(device)
        optimizer.zero_grad()
        output = rnn(word)
        tag = tag.float()
        loss = 0
        for i in range(tag.shape[1]):
            loss += criterion(output[:,i,:], torch.argmax(tag[:,i,:], dim=1))
        loss.backward()
        optimizer.step()

        if step%1000 == 0:
            print(f"Epoch {epoch} Step {step} Loss: {loss.item():.3f}")

    correct = 0
    total = 0
    with torch.no_grad():
        dev_predictions = []
        dev_labels = []
        for word, tag in dev_loader:
            word, tag = word.to(device), tag.to(device)
            output = rnn(word)
            output = torch.argmax(output, dim=2)
            tag = torch.argmax(tag, dim=2)
            for i in range(tag.shape[1]):
                correct += (output[:,i] == tag[:,i]).sum()
                dev_predictions.extend(output[:,i].tolist())
                dev_labels.extend(tag[:,i].tolist())
                total += tag.shape[0]

        # caclulate f1 score
        dev_f1_score = multiclass_f1_score(torch.tensor(dev_predictions), torch.tensor(dev_labels), num_classes=len(tags_to_num), average='macro')



    print()
    print(f"Epoch {epoch} Accuracy: {correct/total:.3f} Dev F1 Score: {dev_f1_score:.3f}")
    print()
        

correct = 0
total = 0
with torch.no_grad(): 
    test_predictions = []
    test_labels = []
    for word, tag in test_loader:
        word, tag = word.to(device), tag.to(device)
        output = rnn(word)
        output = torch.argmax(output, dim=2)
        tag = torch.argmax(tag, dim=2)
        for i in range(tag.shape[1]):
            correct += (output[:,i] == tag[:,i]).sum()
            test_predictions.extend(output[:,i].tolist())
            test_labels.extend(tag[:,i].tolist())
            total += tag.shape[0]

    # caclulate f1 score
    test_f1_score = multiclass_f1_score(torch.tensor(test_predictions), torch.tensor(test_labels),num_classes=len(tags_to_num), average='macro')



print()
print(f"Test Accuracy: {correct/total:.3f} Test F1 Score: {test_f1_score:.3f}")

  return torch.tensor(self.vocabulary.lookup_indices(self.sentences[index])), torch.tensor(self.labels[index])


Epoch 0 Step 0 Loss: 60.592





Epoch 0 Accuracy: 0.934 Dev F1 Score: 0.602

Epoch 1 Step 0 Loss: 5.027





Epoch 1 Accuracy: 0.975 Dev F1 Score: 0.854

Epoch 2 Step 0 Loss: 2.526





Epoch 2 Accuracy: 0.981 Dev F1 Score: 0.871

Epoch 3 Step 0 Loss: 1.735





Epoch 3 Accuracy: 0.983 Dev F1 Score: 0.880

Epoch 4 Step 0 Loss: 0.784





Epoch 4 Accuracy: 0.982 Dev F1 Score: 0.882

Epoch 5 Step 0 Loss: 0.866





Epoch 5 Accuracy: 0.985 Dev F1 Score: 0.893

Epoch 6 Step 0 Loss: 0.684





Epoch 6 Accuracy: 0.986 Dev F1 Score: 0.896

Epoch 7 Step 0 Loss: 0.859





Epoch 7 Accuracy: 0.987 Dev F1 Score: 0.898

Epoch 8 Step 0 Loss: 0.831





Epoch 8 Accuracy: 0.987 Dev F1 Score: 0.898

Epoch 9 Step 0 Loss: 0.629





Epoch 9 Accuracy: 0.987 Dev F1 Score: 0.899


Test Accuracy: 0.989 Test F1 Score: 0.962


In [72]:
sentence = "An apple a day keeps the doctor away"
sentence = sentence.lower()
sentence = word_tokenize(sentence)
sentence = test_sentences[1]
sentence = [START_TOKEN] + sentence + [END_TOKEN]

sentence = torch.tensor(train_dataset.vocabulary.lookup_indices(sentence)).to(device)
# split into chunks of p+s+1
tags = [tags_to_num["PAD"]] + test_tags[1] + [tags_to_num["PAD"]]
output = rnn(sentence.unsqueeze(0))
output = torch.argmax(output, dim=2)
for i in range(output.shape[1]):
    print(train_dataset.vocabulary.get_itos()[sentence[i].item()], num_to_tag[output[0,i].item()], num_to_tag[tags[i]])

<s> PAD PAD
i PRON PRON
want VERB VERB
a DET DET
flight NOUN NOUN
from ADP ADP
nashville PROPN PROPN
to ADP ADP
seattle PROPN PROPN
that ADP ADP
arrives VERB VERB
no DET DET
later ADJ ADJ
than ADP ADP
3 NUM NUM
pm NOUN NOUN
</s> PAD PAD
