## Import needed packages

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import pandas as pd
from typing import List

torch.manual_seed(1)

<torch._C.Generator at 0x7fb7fc18aad0>

## Prepare data and create mapping dictionaries

In [2]:
def prepare_sequence(seq, to_ix):
    """Map every item of ``seq`` to its integer id and pack the ids into a tensor.

    Args:
        seq: iterable of hashable items (words or tags).
        to_ix: mapping from item to integer id; it is indexed with ``[]``,
            so an item missing from a plain dict raises ``KeyError``.

    Returns:
        A 1-D ``torch.long`` tensor holding the ids in the order of ``seq``.
    """
    indices = list(map(lambda item: to_ix[item], seq))
    return torch.tensor(indices, dtype=torch.long)


# Toy NER corpus: each entry pairs a whitespace-tokenised sentence with one tag per token.
training_data = [
    (
        'I work at The University of British Columbia'.split(),
        ['O', 'O', 'O', 'B-org', 'I-org', 'I-org', 'I-org', 'I-org'],
    ),
    (
        'Vancouver is a beautiful city'.split(),
        ['B-geo', 'O', 'O', 'O', 'O'],
    ),
    (
        'Apple is about to unveil the newest device'.split(),
        ['B-org', 'O', 'O', 'O', 'O', 'O', 'O', 'O'],
    ),
]

# Word vocabulary: each distinct word gets the next free id, in order of first appearance.
word_to_ix = {}
for sent, tags in training_data:
    for word in sent:
        # setdefault leaves an already-registered word untouched
        word_to_ix.setdefault(word, len(word_to_ix))

# Fixed tag inventory; the position in the list is the tag id.
tag_to_ix = dict(zip(['O', 'B-org', 'I-org', 'B-geo'], range(4)))

# Reverse lookups: id -> word and id -> tag.
ix_to_word = dict(zip(word_to_ix.values(), word_to_ix.keys()))
ix_to_tag = dict(zip(tag_to_ix.values(), tag_to_ix.keys()))

## Define BiLSTM model architecture

In [3]:
class LSTMTagger(nn.Module):
    """Bidirectional LSTM token tagger.

    Pipeline: token ids -> embeddings -> single-layer BiLSTM -> linear
    projection -> per-token log-probabilities over the tag set.
    """

    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        """Build the embedding, BiLSTM and output layers.

        Args:
            embedding_dim: size of each word-embedding vector.
            hidden_dim: size of the concatenated (forward + backward) LSTM
                output per token. Each direction gets ``hidden_dim // 2``
                units, so the value must be even.
            vocab_size: number of rows in the embedding table.
            tagset_size: number of output tag classes.

        Raises:
            ValueError: if ``hidden_dim`` is odd. Without this check the LSTM
                would emit ``hidden_dim - 1`` features while the linear layer
                expects ``hidden_dim``, and the mismatch would only surface as
                a shape error on the first forward pass.
        """
        super().__init__()
        if hidden_dim % 2 != 0:
            raise ValueError(
                f"hidden_dim must be even for a bidirectional LSTM, got {hidden_dim}"
            )
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        # Each direction receives half of the hidden units; their outputs are
        # concatenated, giving hidden_dim features per token.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2,
                            num_layers=1, bidirectional=True)
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)

    def forward(self, sentence):
        """Score every token of one sentence.

        Args:
            sentence: 1-D tensor of token ids for a single sentence.

        Returns:
            Tensor of shape ``(len(sentence), tagset_size)`` holding
            log-probabilities (log-softmax over the tag dimension).
        """
        seq_len = len(sentence)
        embeds = self.word_embeddings(sentence)
        # nn.LSTM expects (seq_len, batch, features); the batch size is 1 here.
        lstm_out, _ = self.lstm(embeds.view(seq_len, 1, -1))
        # Drop the singleton batch dimension before projecting to tag space.
        tag_space = self.hidden2tag(lstm_out.view(seq_len, -1))
        return F.log_softmax(tag_space, dim=1)

## Define evaluation function for precision, recall, accuracy, F score

In [4]:
def evaluate(predictions: List[List[int]], ground_truths: List[List[int]], outside_tag: int = 0) -> float:
    """Compute token-level accuracy of predicted tag ids against gold tag ids.

    Token outcomes are counted as follows:
      * TP: predicted as a named entity and the predicted tag equals the gold tag
      * FP: predicted as a named entity but the gold tag differs
      * TN: predicted as ``outside_tag`` and the gold tag is ``outside_tag``
      * FN: predicted as ``outside_tag`` but the gold tag is a named entity

    Only accuracy is returned. NER precision and recall have many competing
    definitions (see https://github.com/MantisAI/nervaluate); they can be
    derived from the four counters if a specific definition is needed.

    Args:
        predictions: one list of predicted tag ids per sentence.
        ground_truths: one list of gold tag ids per sentence, aligned
            token-for-token with ``predictions``.
        outside_tag: id of the non-entity tag; every other id counts as a
            named-entity tag. Defaults to 0.

    Returns:
        ``(TP + TN) / (TP + FP + TN + FN)`` as a float, or ``0.0`` when there
        are no tokens to score.

    Raises:
        ValueError: if the number of sentences, or the number of tokens in any
            sentence, differs between ``predictions`` and ``ground_truths``.
    """
    if len(predictions) != len(ground_truths):
        raise ValueError(
            f"got {len(predictions)} predicted sentences but {len(ground_truths)} ground-truth sentences"
        )

    TP, FP, TN, FN = 0, 0, 0, 0
    for i, (prediction, ground_truth) in enumerate(zip(predictions, ground_truths)):
        if len(prediction) != len(ground_truth):
            raise ValueError(
                f"sentence {i}: got {len(prediction)} predicted tags but {len(ground_truth)} ground-truth tags"
            )
        for token_tag_pred, token_tag_ground_truth in zip(prediction, ground_truth):
            is_match = token_tag_pred == token_tag_ground_truth
            if token_tag_pred != outside_tag:  # predicted as a named entity
                if is_match:
                    TP += 1
                else:
                    FP += 1
            else:  # not predicted as a named entity
                if is_match:
                    TN += 1
                else:
                    FN += 1

    total = TP + FP + TN + FN
    if total == 0:
        # No tokens at all: avoid dividing by zero.
        return 0.0
    return (TP + TN) / total

## Start training, compare metrics before and after training

In [5]:
# Hyperparameters, gathered in one place so they are easy to tune.
EMBEDDING_DIM = 32
HIDDEN_DIM = 100
LEARNING_RATE = 0.1
NUM_EPOCHS = 300


def predict_tags(model, data, to_ix):
    """Predict one tag id per token for every sentence in ``data``.

    Args:
        model: callable that maps a tensor of token ids to a
            ``(num_tokens, num_tags)`` matrix of (log-)scores.
        data: iterable of ``(sentence, tags)`` pairs; only the sentence is used.
        to_ix: word -> id mapping passed on to ``prepare_sequence``.

    Returns:
        A list with one list of predicted tag ids per sentence.
    """
    predicted = []
    # Inference only: gradients are not needed.
    with torch.no_grad():
        for sentence, _ in data:
            inputs = prepare_sequence(sentence, to_ix)
            # One row of scores per token; argmax over dim=1 selects the
            # highest-scoring tag id for each token.
            tag_scores = model(inputs)
            # Convert to a plain list so the ids can be compared with Y.
            predicted.append(torch.argmax(tag_scores, dim=1).tolist())
    return predicted


model = LSTMTagger(EMBEDDING_DIM, HIDDEN_DIM, len(word_to_ix), len(tag_to_ix))
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE)

# Gold tag ids for every training sentence.
Y = [[tag_to_ix[tag] for tag in tags] for _, tags in training_data]

# Baseline: accuracy of the randomly initialised model.
Y_hat = predict_tags(model, training_data, word_to_ix)
before_training_acc = evaluate(Y_hat, Y)
print('Before training acc:', before_training_acc)

for epoch in range(NUM_EPOCHS):
    for sentence, tags in training_data:
        # Gradients accumulate by default; clear them before each step.
        model.zero_grad()

        # convert words/tags to ids and wrap them in tensors
        sentence_in = prepare_sequence(sentence, word_to_ix)
        targets = prepare_sequence(tags, tag_to_ix)

        # forward pass
        tag_scores = model(sentence_in)

        # The model outputs log-probabilities, which is what NLLLoss expects.
        loss = loss_function(tag_scores, targets)
        loss.backward()
        optimizer.step()

# Accuracy after training. Note this is measured on the training sentences
# themselves, so it shows that the model fit the data, not that it generalises.
Y_hat = predict_tags(model, training_data, word_to_ix)
after_training_acc = evaluate(Y_hat, Y)
print('After training acc:', after_training_acc)

Before training acc: 0.14285714285714285
After training acc: 1.0


## References: https://pytorch.org/tutorials/beginner/nlp/sequence_models_tutorial.html