# Simple RNN PoS Tagger

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torchtext import data
from torchtext import datasets

import time

import random
import numpy as np

SEED = 1234

## Fixing the seed makes the random initializations identical on every run,
## so you get the same results each time you run the model.

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

## Each example in the dataset has three parts: the tokens, the UD tags and the PTB tags.
## To hold these three parts we define 3 fields: TEXT, UD_TAGS and PTB_TAGS, into which the dataset will be loaded.
TEXT = data.Field(lower = True)
UD_TAGS = data.Field(unk_token = None)
PTB_TAGS = data.Field(unk_token = None)
fields = (("text", TEXT), ("udtags", UD_TAGS), ("ptbtags", PTB_TAGS))

In [3]:
## Loading the UDPOS dataset, and dividing it into training, validation and testing datasets. 
train_data, valid_data, test_data = datasets.UDPOS.splits(fields)

## You can check the size of each split with the following commands.
print(f"Number of training examples: {len(train_data)}")
print(f"Number of validation examples: {len(valid_data)}")
print(f"Number of testing examples: {len(test_data)}")

downloading en-ud-v2.zip


en-ud-v2.zip: 100%|██████████| 688k/688k [00:01<00:00, 421kB/s]  


extracting
Number of training examples: 12543
Number of validation examples: 2002
Number of testing examples: 2077


In [4]:
## You can look at the dataset which we have loaded by using the below commands. 
print('Tokenized_text                     :',(train_data.examples[3].text))
print('Respective UD POS Tags             :',(train_data.examples[3].udtags))
print('Respective Penn Tree Bank POS Tags :',(train_data.examples[3]).ptbtags)

Tokenized_text                     : ['two', 'of', 'them', 'were', 'being', 'run', 'by', '2', 'officials', 'of', 'the', 'ministry', 'of', 'the', 'interior', '!']
Respective UD POS Tags             : ['NUM', 'ADP', 'PRON', 'AUX', 'AUX', 'VERB', 'ADP', 'NUM', 'NOUN', 'ADP', 'DET', 'PROPN', 'ADP', 'DET', 'PROPN', 'PUNCT']
Respective Penn Tree Bank POS Tags : ['CD', 'IN', 'PRP', 'VBD', 'VBG', 'VBN', 'IN', 'CD', 'NNS', 'IN', 'DT', 'NNP', 'IN', 'DT', 'NNP', '.']


In [5]:
'''
########################################################################################################################################################
## This cell will take around 10 minutes to run, as it downloads the glove.6B.100d vocabulary locally before using it. And it's size is around 822MB. ##
########################################################################################################################################################
'''

## The data is currently in string format, and neural networks only take numbers as input.

## So, we need to map the words into a numerical space where their semantics remain intact; the
## corresponding vectors in that space represent the words and are called "WORD VECTORS" or "WORD EMBEDDINGS".

## There are several techniques available to find good-quality word vectors, but here we simply take the word vectors from a standard source.
## GloVe is a standard set of pretrained word vectors.

## MIN_FREQ is the minimum number of times a word must occur in your dataset to be kept in the vocabulary (and have its word embedding loaded).
MIN_FREQ = 2

## The command below builds the vocabulary (with GloVe embeddings) from all the words occurring in the training data at least MIN_FREQ times.
TEXT.build_vocab(train_data, 
                 min_freq = MIN_FREQ,
                 vectors = "glove.6B.100d", 
                 unk_init = torch.Tensor.normal_)

## The commands below build the vocabularies of all the tags available in the training dataset.
UD_TAGS.build_vocab(train_data)
PTB_TAGS.build_vocab(train_data)

.vector_cache/glove.6B.zip: 862MB [12:27, 1.15MB/s]                               
100%|█████████▉| 399907/400000 [00:40<00:00, 6675.30it/s] 

In [None]:
## You can check the number of unique words in your vocabulary.
print(f"Unique tokens in TEXT vocabulary: {len(TEXT.vocab)}")
print(f"Unique tokens in UD_TAG vocabulary: {len(UD_TAGS.vocab)}")
print(f"Unique tokens in PTB_TAG vocabulary: {len(PTB_TAGS.vocab)}")

In [None]:
## You can look at the 20 most frequent words occuring in your dataset, and the set of all POS tags available in the dataset.
print('20 most frequent words in vocabulary :' ,TEXT.vocab.freqs.most_common(20))
print('All the POS tags available           :',UD_TAGS.vocab.itos)

In [None]:
## A utility function to calculate each tag's share of the total tag count.
def tag_percentage(tag_counts):
    """Attach to every (tag, count) pair its fraction of the overall count.

    Args:
        tag_counts: sequence of ``(tag, count)`` pairs.

    Returns:
        A list of ``(tag, count, fraction)`` triples in the input order, where
        ``fraction`` is ``count`` divided by the sum of all counts (a value in
        [0, 1], not multiplied by 100).
    """
    # First pass: accumulate the grand total of all counts.
    grand_total = 0
    for _, occurrences in tag_counts:
        grand_total += occurrences

    # Second pass: emit each pair together with its share of the total.
    shares = []
    for label, occurrences in tag_counts:
        shares.append((label, occurrences, occurrences / grand_total))
    return shares

print("Tag  Occurences Percentage\n")
for tag, count, percent in tag_percentage(UD_TAGS.vocab.freqs.most_common()):
    print(f"{tag}\t{count}\t{percent*100:4.1f}%")

In [None]:
BATCH_SIZE = 64

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = BATCH_SIZE,
    device = device)

In [None]:
class RNNPOSTagger(nn.Module):
    """Sequence tagger: embedding -> (stacked, optionally bidirectional) LSTM -> linear layer.

    Dropout is applied to the embedded input and to the LSTM outputs before
    the final linear projection.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers, bidirectional, dropout, pad_idx):
        """Create the layers.

        Args:
            vocab_size: number of rows in the embedding table.
            embedding_dim: size of each embedding vector.
            hidden_dim: LSTM hidden size (per direction).
            output_dim: number of output scores per token.
            n_layers: number of stacked LSTM layers.
            bidirectional: whether the LSTM runs in both directions.
            dropout: probability used by the dropout layer.
            pad_idx: index handed to ``nn.Embedding`` as ``padding_idx``.
        """
        super().__init__()

        # The LSTM emits hidden_dim features per direction, so the linear
        # layer sees twice as many inputs when the LSTM is bidirectional.
        n_directions = 2 if bidirectional else 1

        self.embedding = nn.Embedding(num_embeddings=vocab_size,
                                      embedding_dim=embedding_dim,
                                      padding_idx=pad_idx)
        self.rnn = nn.LSTM(input_size=embedding_dim,
                           hidden_size=hidden_dim,
                           num_layers=n_layers,
                           bidirectional=bidirectional)
        self.fc = nn.Linear(hidden_dim * n_directions, output_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, text):
        """Return per-token scores for a batch of index sequences.

        Args:
            text: index tensor of shape [sent len, batch size].

        Returns:
            Tensor of shape [sent len, batch size, output dim].
        """
        # [sent len, batch size] -> [sent len, batch size, emb dim]
        token_vectors = self.embedding(text)
        rnn_input = self.dropout(token_vectors)

        # rnn_output = [sent len, batch size, hid dim * n directions];
        # the final hidden/cell states are not needed for tagging.
        rnn_output, _ = self.rnn(rnn_input)

        # [sent len, batch size, output dim]
        return self.fc(self.dropout(rnn_output))


In [None]:
## NETWORK ARCHITECTURE DEFINITION

# LOOK UP THE DOCUMENTATION OF EACH TO FILL IN THE PARAMETERS
class RNNPOSTagger(nn.Module):
    """Generalized POS tagging network: embedding -> LSTM -> fully connected layer."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout, pad_idx):
        """Build the layers of the tagger.

        Args:
            vocab_size    : Size of the input vocabulary you are using.
            embedding_dim : Size of the word embeddings being used.
            hidden_dim    : Hidden layer dimension (per LSTM direction).
            output_dim    : Dimension of the output you require.
            n_layers      : Number of stacked recurrent layers you require.
            bidirectional : Whether you want your network to be bidirectional or not.
            dropout       : Dropout rate you want.
            pad_idx       : Index of the padding token; passed to nn.Embedding
                            as padding_idx.
        """
        super().__init__()

        ## Embedding layer: maps token indices to lower-dimensional word embeddings.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)

        ## LSTM layer (stacked n_layers deep, optionally bidirectional).
        self.rnn = nn.LSTM(embedding_dim,
                           hidden_dim,
                           num_layers = n_layers,
                           bidirectional = bidirectional)

        ## Final fully connected layer to map hidden space to output space.
        ## A bidirectional LSTM outputs hidden_dim features per direction, hence the factor of 2.
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)

        ## This layer defines the dropout rate.
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Run the forward pass.

        Args:
            text: index tensor with dimensions [sent len, batch size].

        Returns:
            Predictions with dimensions [sent len, batch size, output dim].
        """
        ## text_dimensions = [sent len, batch size]
        ## Embed the tokens and apply dropout to the embeddings.
        embedded = self.dropout(self.embedding(text))
        #embedded_dimensions = [sent len, batch size, emb dim]

        ## Forward pass through LSTM Layer.
        outputs, (hidden, cell) = self.rnn(embedded)
        ## output_dimensions = [sent len, batch size, hid dim * n directions]
        ## hidden/cell_dimensions = [n layers * n directions, batch size, hid dim]

        # Final Output Layer
        predictions = self.fc(self.dropout(outputs))
        #predictions = [sent len, batch size, output dim]

        return predictions



In [None]:
## INPUT_DIM is the size of your vocabulary, i.e. the length of a one-hot word vector.
INPUT_DIM = len(TEXT.vocab)

## Embedding dimensions is the dimension of pretrained embeddings we have.
EMBEDDING_DIM = 100

## The Hidden Layer Dimensions. 
HIDDEN_DIM = 128

## The dimension of the output we require (one score per UD tag).
OUTPUT_DIM = len(UD_TAGS.vocab)

## Number of stacked recurrent layers you require. 
N_LAYERS = 2

## Bidirectional LSTM ensures that we capture both the forward and backward flow of semantics. 
BIDIRECTIONAL = True

## The dropout rate determines the proportion of randomly chosen units the model drops in a particular iteration.
DROPOUT = 0.25

## Index of the padding token in the TEXT vocabulary (stoi maps token strings to indices).
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]
#print(type(PAD_IDX))

model = RNNPOSTagger(INPUT_DIM, 
                     EMBEDDING_DIM, 
                     HIDDEN_DIM, 
                     OUTPUT_DIM, 
                     N_LAYERS, 
                     BIDIRECTIONAL, 
                     DROPOUT, 
                     PAD_IDX)

In [None]:
## Weight Initialization Function
def init_weights(m):
    """Re-initialize every parameter of ``m`` in place from N(0, 0.1**2).

    Args:
        m: a module; all tensors yielded by ``m.named_parameters()`` are
           overwritten with normally distributed values (mean 0, std 0.1).
    """
    for _, weights in m.named_parameters():
        # Operate on .data so the in-place fill is not tracked by autograd.
        weights.data.normal_(mean=0, std=0.1)
model.apply(init_weights)


In [None]:
## Parameter Counting Function
def count_parameters(model):
    """Return the total number of scalar elements in the trainable parameters.

    Only parameters with ``requires_grad`` set are counted.
    """
    trainable_total = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable_total += param.numel()
    return trainable_total

print(f'The model has {count_parameters(model):,} trainable parameters')

In [None]:

pretrained_embeddings = TEXT.vocab.vectors
print(pretrained_embeddings.shape)

## Copying the pretrained embeddings into our model. 
model.embedding.weight.data.copy_(pretrained_embeddings)

## Zero out the embedding vectors of the unknown and padding tokens.
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]
model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)

print(model.embedding.weight.data)

In [None]:
## Optimization technique to be used. 
optimizer = optim.Adam(model.parameters())

TAG_PAD_IDX = UD_TAGS.vocab.stoi[UD_TAGS.pad_token]
## Criterion definition
criterion = nn.CrossEntropyLoss(ignore_index = TAG_PAD_IDX)

## Transferring the model and the criterion to the device on which we will train the network
model = model.to(device)
criterion = criterion.to(device)

In [None]:
## Function to calculate the categorical_accuracy. 
def categorical_accuracy(preds, y, tag_pad_idx):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8

    Positions whose gold tag equals ``tag_pad_idx`` are excluded from both
    the numerator and the denominator.

    Args:
        preds: score tensor indexed as [token, tag]; the arg-max over dim 1
               is taken as the predicted tag.
        y: gold tag indices, one per row of ``preds``.
        tag_pad_idx: tag index that marks padding.

    Returns:
        A 1-element float tensor on the same device as the inputs holding
        the fraction of non-pad positions predicted correctly. The value is
        NaN if there are no non-pad positions.
    """

    ## Here we are taking the Tag with highest confidence as the POS tag of that particular word.
    predicted_tags = preds.argmax(dim = 1) # get the index of the max probability

    ## Boolean mask selecting the positions that are real tokens (not padding).
    non_pad_mask = y != tag_pad_idx
    correct = predicted_tags[non_pad_mask].eq(y[non_pad_mask])

    ## Both counts are derived from the input tensors, so the division stays on
    ## their device instead of mixing in a CPU-only tensor.
    accuracy = correct.sum().float() / non_pad_mask.sum().float()
    return accuracy.reshape(1)

In [None]:
def train(model, iterator, optimizer, criterion, tag_pad_idx):
    """Train the model for one pass over ``iterator``.

    Args:
        model: the network to train; called as ``model(batch.text)``.
        iterator: iterable of batches exposing ``text`` and ``udtags``.
        optimizer: optimizer whose ``zero_grad``/``step`` are called per batch.
        criterion: loss function called as ``criterion(predictions, tags)``.
        tag_pad_idx: padding tag index, forwarded to ``categorical_accuracy``.

    Returns:
        ``(mean_loss, mean_accuracy)`` averaged over the batches.
    """
    epoch_loss = 0
    epoch_acc = 0

    # Put the model in training mode (enables dropout).
    model.train()

    for batch in iterator:

        text = batch.text
        tags = batch.udtags

        # Clear gradients left over from the previous batch.
        optimizer.zero_grad()

        # Run the model on the text input.
        predictions = model(text)

        #predictions = [sent len, batch size, output dim]
        #tags = [sent len, batch size]
        predictions = predictions.view(-1, predictions.shape[-1])
        tags = tags.view(-1)
        #predictions = [sent len * batch size, output dim]
        #tags = [sent len * batch size]

        # Loss calculation
        loss = criterion(predictions, tags)

        acc = categorical_accuracy(predictions, tags, tag_pad_idx)

        # Backpropagation: compute the gradients before the optimizer uses them.
        loss.backward()

        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
## Function for evaluating our model while training.
def evaluate(model, iterator, criterion, tag_pad_idx):
    """Compute the mean loss and accuracy of ``model`` over ``iterator``.

    The model is switched to evaluation mode and gradients are disabled;
    no parameters are updated.

    Args:
        model: the network; called as ``model(batch.text)``.
        iterator: iterable of batches exposing ``text`` and ``udtags``.
        criterion: loss function called as ``criterion(predictions, tags)``.
        tag_pad_idx: padding tag index, forwarded to ``categorical_accuracy``.

    Returns:
        ``(mean_loss, mean_accuracy)`` averaged over the batches.
    """
    model.eval()

    total_loss, total_acc = 0, 0

    with torch.no_grad():
        for batch in iterator:
            scores = model(batch.text)

            # Flatten so every token position becomes one row / one target.
            flat_scores = scores.view(-1, scores.shape[-1])
            flat_tags = batch.udtags.view(-1)

            batch_loss = criterion(flat_scores, flat_tags)
            batch_acc = categorical_accuracy(flat_scores, flat_tags, tag_pad_idx)

            total_loss += batch_loss.item()
            total_acc += batch_acc.item()

    n_batches = len(iterator)
    return total_loss / n_batches, total_acc / n_batches

In [None]:
## A utility function to calculate the epoch time. 
def epoch_time(start_time, end_time):
    """Split the interval between two timestamps into whole minutes and seconds.

    Args:
        start_time: start timestamp in seconds.
        end_time: end timestamp in seconds.

    Returns:
        ``(minutes, seconds)`` as integers, both truncated toward zero.
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    # Whatever is left after removing the whole minutes, truncated to an int.
    leftover_seconds = int(duration - 60 * whole_minutes)
    return whole_minutes, leftover_seconds

In [None]:
''' Final training loop '''

## Number of full passes over the training data.
N_EPOCHS = 10

## Lowest validation loss seen so far; starts at infinity so the first epoch always counts as an improvement.
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):

    start_time = time.time()
    
    ## One pass of training, followed by an evaluation on the validation split.
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion, TAG_PAD_IDX)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion, TAG_PAD_IDX)
    
    end_time = time.time()

    ## The measured time covers both the training and the validation pass.
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    ## Checkpoint the weights whenever the validation loss improves.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut1-model.pt')
    
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

In [None]:
''' Evaluating on test set '''
## Restore the checkpoint with the lowest validation loss (saved to 'tut1-model.pt'
## during training) so the test metrics describe the best model, not the last epoch.
model.load_state_dict(torch.load('tut1-model.pt', map_location = device))

test_loss, test_acc = evaluate(model, test_iterator, criterion, TAG_PAD_IDX)

print(f'Test Loss: {test_loss:.3f} |  Test Acc: {test_acc*100:.2f}%')