In [47]:
import torch
import torchtext
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torchtext import data, datasets
import spacy
import numpy as np
import time
import random
import csv
from torchtext.datasets import UDPOS
import spacy_udpipe
from torch.utils.data import IterableDataset
import torchdata
from torchdata.datapipes.iter import IterableWrapper
from torchtext.vocab import GloVe
from nltk.tokenize import word_tokenize
from nltk.corpus import treebank
from nltk.tag.util import untag
from conllu import parse
import spacy
spacy.load('en_core_web_sm')
from spacy.lang.en import English
from functools import partial
from torch.nn import init
import re

In [2]:
#loading data
train_data = open('en_ewt-ud-train.conllu', 'r', encoding='utf-8').read().strip().split('\n\n')
test_data = open('en_ewt-ud-test.conllu', 'r', encoding='utf-8').read().strip().split('\n\n')
dev_data = open('en_ewt-ud-dev.conllu', 'r', encoding='utf-8').read().strip().split('\n\n')

In [3]:
en_core_web_sm = spacy.load('en_core_web_sm')

TEXT = torchtext.data.Field(tokenize=partial(en_core_web_sm.tokenizer, keep_spacy_tokens=True))
UD_TAGS = torchtext.data.Field(unk_token=None)

# Load the CoNLL-U data
train_data, valid_data, test_data = torchtext.datasets.UDPOS.splits(fields=(('text', TEXT), ('udtags', UD_TAGS)))


In [4]:
# building vocabulary
# using GloVe prebuilt embeddings to initialize (they perform better than ones we initialize)
freq = 5
TEXT.build_vocab(train_data,
                 min_freq=freq,
                 vectors=GloVe(name='6B', dim=100),
                 unk_init=torch.Tensor.normal_)


UD_TAGS.build_vocab(train_data)


In [5]:
BATCH_SIZE = 128

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data), batch_size = BATCH_SIZE, device = device)


In [6]:
class POS_TAGGER(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim, pad_idx):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, embedding_dim, padding_idx = pad_idx)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers = 2)
        self.fc = nn.Linear(hidden_dim, output_dim)
        # if bidirectional:
        #     self.fc = nn.Linear(hidden_dim * 2, output_dim)
        # else:
        #     self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(0.5)
        
    def forward(self, text):        
        embedded = self.dropout(self.embedding(text))
        outputs, (hidden, cell) = self.lstm(embedded)
        hidden = self.dropout(outputs)
        return self.fc(hidden)

    
    def init_weights(self):
        for name, param in self.named_parameters():
            if 'embedding' not in name:
                nn.init.normal_(param.data, mean=0, std=0.1)

In [7]:
model = POS_TAGGER(len(TEXT.vocab), 100, 128, len(UD_TAGS.vocab), TEXT.vocab.stoi[TEXT.pad_token])
model.init_weights()
model.embedding.weight.data.copy_(TEXT.vocab.vectors)
model.embedding.weight.data[TEXT.vocab.stoi[TEXT.pad_token]] = torch.zeros(100)
optimizer = optim.Adam(model.parameters())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

padding_idx = UD_TAGS.vocab['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index = padding_idx)
criterion = criterion.to(device)

In [8]:
def accuracy_(pred, targ, tag_pad_idx):
    # Step 1: Flatten the inputs
    pred = pred.view(-1, pred.shape[-1])
    targ = targ.view(-1)  
    
    # Step 2: Ignore the pad elements
    non_pad_elements = targ != tag_pad_idx
    
    # Step 3: Get the index of the max probability
    max_preds = pred.argmax(dim=1) 
    
    # Step 4: Count the number of correct predictions
    correct = max_preds[non_pad_elements].eq(targ[non_pad_elements])
    num_correct = correct.sum().item()
    
    # Step 5: Calculate the accuracy
    num_total = non_pad_elements.sum().item()
    acc = num_correct / num_total
    
    return torch.tensor(acc)

In [9]:
def train(model, iterator, optimizer, criterion, padding_idx):
    epoch_loss = 0
    epoch_acc = 0
    model.train()
    
    for batch in iterator: 
        text = batch.text
        tags = batch.udtags
        
        # Step 6.1: zero the gradients
        optimizer.zero_grad()
        # Step 6.2: insert the batch of text into the model to get predictions
        predictions = model(text)
        # Step 6.3: reshape the predictions
        predictions = predictions.view(-1, predictions.shape[-1])
        tags = tags.view(-1)
        # Step 6.4: calculate loss and accuracy between the predicted tags and actual tags
        loss = criterion(predictions, tags)
        acc = accuracy_(predictions, tags, padding_idx)
        # Step 6.5: call backward to calculate the gradients of the parameters w.r.t. the loss        
        loss.backward()
        # Step 6.6: optimizer step to update the parameters
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [48]:
def eval_(model, iterator, criterion, padding_idx):
    # model (nn.Module): The model to evaluate.
    # iterator (torchtext.data.Iterator): The data iterator to evaluate on.
    # criterion (nn.Module): The loss function to use for evaluation.
    # padding_idx(int): The padding index for the tags.
    epoch_loss = 0
    epoch_acc = 0
    model.eval()
    true_labels = []
    predicted_labels = []

    with torch.no_grad():
        for batch in iterator:
            text, tags = batch.text, batch.udtags
            predictions = model(text)
            batch_size, seq_len, n_tags = predictions.shape
            predictions = predictions.view(-1, n_tags)
            tags = tags.view(-1)
            loss = criterion(predictions, tags)
            acc = accuracy_(predictions, tags, padding_idx)
            epoch_loss += loss.item() * batch.batch_size
            epoch_acc += acc.item() * batch.batch_size
            predicted_labels.extend(torch.argmax(predictions, dim=-1).tolist())
            true_labels.extend(tags.tolist())

        target_names = [UD_TAGS.vocab.itos[i] for i in range(len(UD_TAGS.vocab))]
        report = classification_report(true_labels, predicted_labels, target_names=target_names, output_dict=True)



    return epoch_loss / len(iterator), epoch_acc / len(iterator), report

In [49]:
# N_EPOCHS = 10

# best_valid_loss = float('inf')

# for epoch in range(N_EPOCHS):
#     train_loss, train_acc = train(model, train_iterator, optimizer, criterion, padding_idx)
#     valid_loss, valid_acc, REPORT_ = eval_(model, valid_iterator, criterion, padding_idx)
    
#     if valid_loss < best_valid_loss:
#         best_valid_loss = valid_loss
#         torch.save(model.state_dict(), 'trained_model.pt')

    
#     print(f'Epoch: {epoch+1:02}')
#     print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
#     print(f'\tVal. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc*100:.2f}%')

  _warn_prf(average, modifier, msg_start, len(result))


Epoch: 01
	Train Loss: 0.381 | Train Acc: 87.53%
	Val. Loss: 67.372 | Val. Acc: 10408.52%
Epoch: 02
	Train Loss: 0.370 | Train Acc: 87.87%
	Val. Loss: 67.012 | Val. Acc: 10512.13%
Epoch: 03
	Train Loss: 0.363 | Train Acc: 88.05%
	Val. Loss: 67.032 | Val. Acc: 10382.02%
Epoch: 04
	Train Loss: 0.355 | Train Acc: 88.33%
	Val. Loss: 66.682 | Val. Acc: 10443.39%
Epoch: 05
	Train Loss: 0.349 | Train Acc: 88.49%
	Val. Loss: 67.038 | Val. Acc: 10423.22%
Epoch: 06
	Train Loss: 0.345 | Train Acc: 88.54%
	Val. Loss: 66.265 | Val. Acc: 10517.83%
Epoch: 07
	Train Loss: 0.337 | Train Acc: 88.77%
	Val. Loss: 65.761 | Val. Acc: 10536.38%
Epoch: 08
	Train Loss: 0.331 | Train Acc: 89.00%
	Val. Loss: 65.682 | Val. Acc: 10540.01%
Epoch: 09
	Train Loss: 0.327 | Train Acc: 89.10%
	Val. Loss: 65.414 | Val. Acc: 10459.50%
Epoch: 10
	Train Loss: 0.322 | Train Acc: 89.22%
	Val. Loss: 65.546 | Val. Acc: 10545.77%


In [50]:
# print(REPORT_)

{'<pad>': {'precision': 0.0, 'recall': 0.0, 'f1-score': 0.0, 'support': 4938}, 'NOUN': {'precision': 0.5814134495641345, 'recall': 0.8899213724088635, 'f1-score': 0.7033236041803974, 'support': 4197}, 'PUNCT': {'precision': 0.9931707317073171, 'recall': 0.990593577684074, 'f1-score': 0.991880480675544, 'support': 3083}, 'VERB': {'precision': 0.7133333333333334, 'recall': 0.8129522431259045, 'f1-score': 0.7598917822117011, 'support': 2764}, 'PRON': {'precision': 0.6624843161856964, 'recall': 0.9522091974752029, 'f1-score': 0.781354051054384, 'support': 2218}, 'ADP': {'precision': 0.8756428237494156, 'recall': 0.925852694018784, 'f1-score': 0.9000480538202789, 'support': 2023}, 'DET': {'precision': 0.956386292834891, 'recall': 0.9720316622691293, 'f1-score': 0.9641455116461659, 'support': 1895}, 'PROPN': {'precision': 0.38359543632439097, 'recall': 0.6624068157614483, 'f1-score': 0.48584260886545594, 'support': 1878}, 'ADJ': {'precision': 0.765695067264574, 'recall': 0.7635550586920067, 

In [51]:
def tag_sentence(model, device, sentence, text_field, tag_field):
    model.eval()
    tokens = sentence.split()
    token_emb = [text_field.vocab.stoi[t] for t in tokens]
    token_tensor = (torch.LongTensor(token_emb)).unsqueeze(-1).to(device)         
    predictions = (model(token_tensor)).argmax(-1)    
    predicted_tags = [tag_field.vocab.itos[t.item()] for t in predictions]
    
    return tokens, predicted_tags

In [55]:
input_sent = input("input sentence: \n")
input_sent = input_sent.lower()
text = input_sent
# Replace multiple spaces with a single space
text = re.sub(r'\s+', ' ', text)

# replace numbers with <NUM>
text = re.sub("^\d+\s|\s\d+\s|\s\d+$", " <NUM> ", text)
text = re.sub(r'\b\d+\w*\b', '<NUM>', text)
text = re.sub(r'\w*\d\w*', '<NUM>', text)

# contractions
text = re.sub(r"can't", "can not", text)
text = re.sub(r"won't", "will not", text)

# hypens and underscore characters at beginning and end of words
text = re.sub(r'(\b|\-|_)(\w+)\-?(\b|\-|_)', r'\2 ', text)
text = re.sub(r'(\b|\-|_)(\w+)\-?(\b|\-|_)', r'\2 ', text)

# Ensure that there's a space between punctuation and words
text = re.sub(r'(\w)([.,!?])', r'\1 \2', text)
text = re.sub(r'([.,!?])(\w)', r'\1 \2', text)

# Replace URLs with <URL>
text = re.sub(r'(http|ftp|https)://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-])?', '<URL>', text)

# Replace hashtags with <HASHTAG>
text = re.sub(r'#\w+', '<HASHTAG>', text)

# Replace mentions with <MENTION>
text = re.sub(r'@\w+', '<MENTION>', text)

# Replace with <PERCENT>
text = re.sub(r'(\d+(\.\d+)?%)', "<PERCENT>", text)


In [56]:
print(input_sent)

too many! it/they felt the force of the gravity of an earth even though the stuff was kinda heavy...


In [57]:
tokens, pred_tags = tag_sentence(model, device, input_sent, TEXT, UD_TAGS)
print("Pred. Tag\tToken\n")
for token, pred_tag in zip(tokens, pred_tags):
    print(f"{pred_tag}\t\t{token}")

Pred. Tag	Token

ADV		too
ADJ		many!
NOUN		it/they
VERB		felt
DET		the
NOUN		force
ADP		of
DET		the
NOUN		gravity
ADP		of
DET		an
NOUN		earth
ADV		even
SCONJ		though
DET		the
NOUN		stuff
AUX		was
ADJ		kinda
VERB		heavy...
