In [None]:
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split

import torch
from torch import nn
from torch.nn.utils.rnn import pad_sequence
from torch.optim import Adam
import torchtext
from torchtext.data import get_tokenizer

from IPython.display import HTML, display
import time

In [None]:
# Path of the NER dataset CSV; kept in one named constant so it is easy to repoint.
DATA_PATH = '/content/drive/MyDrive/Colab Notebooks/NER/ner_dataset.csv'

# One row per token; preview the first rows as the cell output.
df = pd.read_csv(DATA_PATH, encoding='unicode_escape')
df.head()

Unnamed: 0,Sentence #,Word,POS,Tag
0,Sentence: 1,Thousands,NNS,O
1,,of,IN,O
2,,demonstrators,NNS,O
3,,have,VBP,O
4,,marched,VBN,O


In [None]:
# Only the first token row of a sentence carries its 'Sentence #' label (see the
# preview above), so propagate each label down to the rows that follow it.
# `DataFrame.ffill()` replaces the deprecated `fillna(method='ffill')` spelling.
df = df.ffill()

# Lower-case every column (labels, words, POS and tags alike) with the vectorised
# string accessor instead of the deprecated element-wise `DataFrame.applymap`.
df = df.apply(lambda column: column.str.lower())

In [None]:
class Sentence(object):
  """Groups a token-per-row frame into sentences of ``(word, pos, tag)`` tuples.

  The frame must provide the columns 'Sentence #', 'Word', 'POS' and 'Tag'.

  Attributes:
    n_sent: 1-based number of the sentence the next ``get_text`` call returns.
    df: the frame that was passed in.
    empty: flag initialised to False; this class never changes it.
    grouped: Series indexed by 'Sentence #' label, one list of tuples per sentence.
    sentences: the values of ``grouped`` as a plain list, in index order.
  """

  def __init__(self, df):
    self.n_sent = 1
    self.df = df
    self.empty = False

    def agg(rows):
      # One (word, pos, tag) tuple per token row of the group.
      return list(zip(rows['Word'].tolist(), rows['POS'].tolist(), rows['Tag'].tolist()))

    # Select the token columns explicitly so the aggregation never operates on
    # the grouping column itself.
    self.grouped = self.df.groupby('Sentence #')[['Word', 'POS', 'Tag']].apply(agg)
    self.sentences = list(self.grouped)
    # Case-insensitive map from 'sentence: N' to the real index label, so the
    # lookup in get_text works whether or not the labels were lower-cased.
    self._labels = {str(label).lower(): label for label in self.grouped.index}

  def get_text(self):
    """Return the next sentence, or None once sentence ``n_sent`` does not exist.

    Each successful call advances ``n_sent`` by one.
    """
    label = self._labels.get('sentence: {}'.format(self.n_sent))
    if label is None:
      return None
    s = self.grouped[label]
    self.n_sent += 1
    return s

In [None]:
# Group the token rows by sentence; `sentences` holds one entry per sentence,
# each entry being that sentence's list of (word, pos, tag) tuples.
getter = Sentence(df)
sentences = getter.sentences

In [None]:
# Token count of the longest sentence.
max_len = max(map(len, sentences))

In [None]:
# Distinct words and tags, in order of first appearance in the frame.
words = df['Word'].unique().tolist()
tags = df['Tag'].unique().tolist()

In [None]:
# Word indices 0 and 1 are reserved for padding and unknown words, so the real
# vocabulary starts at 2.
word2idx = {word: index for index, word in enumerate(words, start=2)}
word2idx.update({'UNK': 1, 'PAD': 0})

# Tag index 0 is reserved for padding, so the real tags start at 1.
tag2idx = dict(zip(tags, range(1, len(tags) + 1)))
tag2idx['PAD'] = 0

In [None]:
# Inverse lookups: index -> word and index -> tag.
idx2word = dict(zip(word2idx.values(), word2idx.keys()))
idx2tag = dict(zip(tag2idx.values(), tag2idx.keys()))

In [None]:
# Encode every sentence as a 1-D tensor of word indices, then pad them all to
# the longest sentence with the PAD index, giving a (sentences, max_len) tensor.
X = pad_sequence(
    [torch.tensor([word2idx[word] for word, _pos, _tag in s]) for s in sentences],
    batch_first=True,
    padding_value=word2idx["PAD"],
)

In [None]:
# Encode every sentence's tags as a 1-D tensor of tag indices, padded with the
# PAD tag index in the same way as the word tensor.
y = pad_sequence(
    [torch.tensor([tag2idx[tag] for _word, _pos, tag in s]) for s in sentences],
    batch_first=True,
    padding_value=tag2idx["PAD"],
)

In [None]:
# Number of distinct values in the 'Tag' column (the extra 'PAD' entry of tag2idx is not counted).
num_tags = df['Tag'].nunique()

In [None]:
# Train / validation split (90% / 10%). The seed is fixed so that a fresh
# Restart & Run All produces the same partition every time.
SEED = 42
X_train, X_validation, y_train, y_validation = train_test_split(X, y, test_size = 0.10, random_state = SEED)

# Each dataset item is one (word_indices, tag_indices) pair.
trainset = list(zip(X_train, y_train))
validationset = list(zip(X_validation, y_validation))

# Training batches are reshuffled each epoch; validation order stays fixed.
trainloader = torch.utils.data.DataLoader(trainset, batch_size=32, shuffle=True, num_workers=2)
validationloader = torch.utils.data.DataLoader(validationset, batch_size=32, shuffle=False, num_workers=2)

In [None]:
# Use the first CUDA GPU when one is available, otherwise the CPU.
# NOTE(review): nothing in the visible cells moves the model or the batches to
# this device - confirm whether that is intended.
if torch.cuda.is_available():
  device = torch.device('cuda:0')
else:
  device = torch.device('cpu')
device

device(type='cpu')

In [None]:
def progress(value:int, max:int=100):
    """Build an HTML progress bar followed by a ``value / max`` caption.

    Args:
        value: current position, interpolated into the ``value`` attribute.
        max: position that counts as complete, interpolated into ``max``.

    Returns:
        An ``IPython.display.HTML`` object suitable for ``display`` / ``update``.
    """
    # The attributes of the <progress> tag are whitespace-separated; the stray
    # comma that used to follow `max` was not valid HTML.
    return HTML("""
        <progress
            value='{value}'
            max='{max}'
            style='width: 50%'
        >
            {value}
        </progress>
        {value} / {max}
    """.format(value=value, max=max))

In [None]:
class BiLSTM(nn.Module):
  """Embedding -> bidirectional LSTM -> linear layer producing per-token tag scores.

  Args:
    vocab_size: number of rows in the embedding table.
    embedding_dim: size of each word embedding.
    hidden_dim: LSTM hidden size per direction.
    output_dim: number of scores produced for every token.
    lstm_layers: number of stacked LSTM layers.
    emb_dropout: dropout probability applied to the embeddings.
    lstm_dropout: dropout probability between LSTM layers (only used when
      ``lstm_layers > 1``).
    fc_dropout: dropout probability applied before the linear layer.
  """

  def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, lstm_layers,
               emb_dropout, lstm_dropout, fc_dropout):
    super().__init__()
    #embedding layer
    self.embeddings = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
    self.emb_dropout = nn.Dropout(emb_dropout)

    #BiLSTM
    # batch_first=True is required: the notebook feeds (batch, seq) index
    # tensors, and without it nn.LSTM treats the first axis as time, so the
    # recurrence would run across the sentences of a batch instead of along
    # the tokens of each sentence.
    self.lstm = nn.LSTM(
        input_size=embedding_dim,
        hidden_size=hidden_dim,
        num_layers=lstm_layers,
        bidirectional=True,
        batch_first=True,
        dropout=lstm_dropout if lstm_layers > 1 else 0)

    #Fully Connected
    self.fc_dropout = nn.Dropout(fc_dropout)
    # Forward and backward hidden states are concatenated, hence hidden_dim * 2.
    self.fc = nn.Linear(hidden_dim * 2, output_dim)

  def forward(self, inputs):
    """Score every token.

    Args:
      inputs: integer tensor of word indices shaped (batch, seq).

    Returns:
      Tensor shaped (batch, seq, output_dim) of unnormalised tag scores.
    """
    embedding_out = self.emb_dropout(self.embeddings(inputs))
    lstm_out, _ = self.lstm(embedding_out)
    ner_out = self.fc(self.fc_dropout(lstm_out))
    return ner_out

In [None]:
# The vocabulary gets two extra rows (PAD and UNK) and the output layer one
# extra class (the PAD tag), matching the index maps built earlier.
model = BiLSTM(
    vocab_size=2 + len(words),
    embedding_dim=int(len(words) ** 0.56),
    hidden_dim=64,
    output_dim=1 + len(tags),
    lstm_layers=2,
    emb_dropout=0.5,
    lstm_dropout=0.1,
    fc_dropout=0.25,
)

In [None]:
class NER(object):
  """Training, evaluation and prediction wrapper around a token-tagging model.

  Index 0 is treated as padding throughout: it is ignored by the loss and left
  out of the accuracy.
  """

  def __init__(self, model, trainloader, validationloader, optimizer_cls, loss_fn_cls):
    """Store the collaborators and build the optimizer and the loss.

    Args:
      model: module mapping a tensor of word indices to scores whose last
        dimension indexes the tags.
      trainloader: iterable of (word_indices, tag_indices) training batches.
      validationloader: iterable of (word_indices, tag_indices) validation batches.
      optimizer_cls: optimizer class, called with ``model.parameters()`` only.
      loss_fn_cls: loss class, called with ``ignore_index=0``.
    """
    self.model = model
    self.trainloader = trainloader
    self.validationloader = validationloader
    self.optimizer = optimizer_cls(model.parameters())
    # Padded positions (tag index 0) contribute neither to the loss nor to the gradient.
    self.loss_fn = loss_fn_cls(ignore_index=0)

  def _device(self):
    """Return the device of the model's parameters (CPU for a parameter-free model)."""
    first_param = next(self.model.parameters(), None)
    return first_param.device if first_param is not None else torch.device('cpu')

  def _batch_metrics(self, text, tag):
    """Run the model on one batch and return ``(loss, accuracy)`` over its tokens."""
    device = self._device()
    text, tag = text.to(device), tag.to(device)
    pred_tags = self.model(text)
    # Flatten to one row of scores / one target per token.
    pred_tags = pred_tags.reshape(-1, pred_tags.shape[-1])
    true_tags = tag.reshape(-1)
    loss = self.loss_fn(pred_tags, true_tags)
    return loss, self.accuracy(pred_tags, true_tags)

  def evaluate(self):
    """Score the model on the validation loader without updating it.

    Returns:
      ``(mean batch loss, mean batch accuracy)``.
    """
    epoch_loss = 0
    epoch_accuracy = 0
    validation_size = len(self.validationloader)
    display_out = display(progress(0, validation_size), display_id=True)

    self.model.eval()

    with torch.no_grad():
      for i, (text, tag) in enumerate(self.validationloader):
        loss, accuracy = self._batch_metrics(text, tag)

        epoch_loss += loss.item()
        epoch_accuracy += accuracy.item()

        display_out.update(progress(i+1, validation_size))

    # max(..., 1) avoids a ZeroDivisionError on an empty loader.
    n_batches = max(validation_size, 1)
    return epoch_loss / n_batches, epoch_accuracy / n_batches

  def epoch(self):
    """Train for one pass over the training loader.

    Returns:
      ``(mean batch loss, mean batch accuracy)``.
    """
    epoch_loss = 0
    epoch_accuracy = 0
    train_size = len(self.trainloader)
    display_out = display(progress(0, train_size), display_id=True)

    self.model.train()

    for i, (text, tag) in enumerate(self.trainloader):
      #zero the parameter gradients
      self.optimizer.zero_grad()

      loss, accuracy = self._batch_metrics(text, tag)

      loss.backward()
      self.optimizer.step()

      epoch_loss += loss.item()
      epoch_accuracy += accuracy.item()

      display_out.update(progress(i+1, train_size))

    n_batches = max(train_size, 1)
    return epoch_loss / n_batches, epoch_accuracy / n_batches

  def accuracy(self, preds, actual):
    """Fraction of non-padding tokens whose highest-scoring tag is correct.

    Args:
      preds: (tokens, num_tags) tensor of scores.
      actual: (tokens,) tensor of target tag indices; 0 marks padding.

    Returns:
      A one-element float tensor; 0.0 when every position is padding.
    """
    predicted = preds.argmax(dim=1)
    keep = actual != 0
    n_tokens = int(keep.sum().item())
    correct = (predicted[keep] == actual[keep]).sum().float()
    return (correct / max(n_tokens, 1)).reshape(1)

  def train(self, n_epochs:int):
    """Run ``n_epochs`` rounds of training followed by validation, printing metrics."""
    for i in range(n_epochs):
      print(f'Epoch {i+1}')
      print('Training Phase')
      train_loss, train_accuracy = self.epoch()
      print(f"\t\tTrain Loss: {train_loss:.5f} | Train Accuracy: {train_accuracy: .5f}")
      print('Validation Phase')
      eval_loss, eval_accuracy = self.evaluate()
      print(f"\t\tValidation Loss: {eval_loss:.5f} | Validation Accuracy: {eval_accuracy: .5f}")

  def predict(self, sentence:str, actual_tags:list=None, display=True):
    """Tag a raw sentence.

    Relies on the notebook-level ``word2idx`` and ``idx2tag`` mappings.

    Args:
      sentence: text to tokenize and tag.
      actual_tags: optional reference tags printed next to the predictions.
      display: when true, print a table of tokens and predicted tags.

    Returns:
      ``(tokens, predicted)`` where ``predicted`` holds one tag index per token.
    """
    actual_tags = [] if actual_tags is None else actual_tags
    tokenizer = get_tokenizer('basic_english')
    tokens = [token.lower() for token in tokenizer(sentence)]
    if not tokens:
      # Nothing to tag; skip the model call, which cannot take a zero-length sequence.
      return tokens, torch.empty(0, dtype=torch.long)

    unk = word2idx['UNK']
    tok2idx = torch.tensor([[word2idx.get(t, unk) for t in tokens]],
                           dtype=torch.long, device=self._device())

    # Predict with dropout disabled and without building a graph, then put the
    # model back into whichever mode it was in.
    was_training = self.model.training
    self.model.eval()
    try:
      with torch.no_grad():
        pred_tags = self.model(tok2idx)
    finally:
      self.model.train(was_training)
    pred_tags = pred_tags.reshape(-1, pred_tags.shape[-1])
    predicted = pred_tags.argmax(dim=1)

    if display:
      if len(actual_tags) != 0:
        print('\t{:<10} {:>15} {:>10}'.format('words', 'predicted', 'actual'))
        for t, p, at in zip(tokens, predicted, actual_tags):
          print('\t{:<10} {:>15} {:>10}'.format(t, idx2tag[p.item()], at))
      else :
        print('\t{:<10} {:>15}'.format('words', 'predicted'))
        for t, p in zip(tokens, predicted):
          print('\t{:<10} {:>15}'.format(t, idx2tag[p.item()]))

    return tokens, predicted

In [None]:
# Wire the model to its data loaders; the optimizer and the loss are passed as
# classes and instantiated inside NER.
ner = NER(
    model=model,
    trainloader=trainloader,
    validationloader=validationloader,
    optimizer_cls=Adam,
    loss_fn_cls=nn.CrossEntropyLoss,
)

# Five rounds of training, each followed by a validation pass.
ner.train(n_epochs=5)