In [2]:
import os
import tensorflow as tf
from tensorflow import keras
import numpy as np
import math
import pandas as pd

In [4]:
# Function to transform input text files into name and it's tag

def preprocess_data(filename):
    f = open(filename,encoding="utf8",errors='ignore')
    split_text = []
    sent = []

    for row in f:
        if len(row)==0 or row.startswith('-DOCSTART') or row[0]=="\n":
            if len(sent) > 0:
                split_text.append(sent)
                sent = []
            continue

        splits = row.split(' ')

        #Appending the token name and class label
        sent.append([splits[0],splits[-1].rstrip("\n")])
        if len(sent) > 0:
            split_text.append(sent)
            sent = []
    return split_text
   


trainSet = preprocess_data("train.txt")
validationSet = preprocess_data("valid.txt")
testSet = preprocess_data("np.txt")

In [6]:
#Storing unique class labels

classes = set()
words = set()
# words and labels

for data in [trainSet, validationSet, testSet]:
  for labeled_text in data:
    for word, label in labeled_text:
      classes.add(label)
      words.add(word.lower())

In [7]:
# Sort the set to ensure '0' is assigned to 0
ordered_classes = sorted(list(classes), key=len)

# Create mapping for labels
classToIndex = {}
for label in ordered_classes:
  classToIndex[label] = len(classToIndex)

# Storing unique labels index wise in dictionary
count_labels = len(classToIndex)
labelMapping = {v: k for k, v in classToIndex.items()}

# Create mapping for words
wordToIndex = {}
if len(wordToIndex) == 0:
  wordToIndex["PADDING_TOKEN"] = len(wordToIndex)
  wordToIndex["UNKNOWN_TOKEN"] = len(wordToIndex)

# Storing unique words index wise in  
for word in words:
  wordToIndex[word] = len(wordToIndex)

In [8]:
#Storing unique word indexes and labels for all rows in dataset

def createMatrices(data, wordToIndex, classToIndex):
  words = []
  labels = []
  for splittedTokens in data:
     wordIndices = []
     labelIndices = []
     for word, label in splittedTokens:
       if word in wordToIndex:
          wordIdx = wordToIndex[word]
       elif word.lower() in wordToIndex:
          wordIdx = wordToIndex[word.lower()]
       else:
          wordIdx = wordToIndex['UNKNOWN_TOKEN']

       wordIndices.append(wordIdx)
       labelIndices.append(classToIndex[label])
     words.append(wordIndices)
     labels.append(labelIndices)
  return words, labels


training_tokens, train_labels = createMatrices(trainSet, wordToIndex, classToIndex)
valid_tokens, valid_labels = createMatrices(validationSet, wordToIndex, classToIndex)
test_tokens, test_labels = createMatrices(testSet, wordToIndex, classToIndex)

In [9]:

from tensorflow.keras.preprocessing.sequence import pad_sequences
max_seq_len = 128
EMBEDDING_DIM = 100
epochs = 10

def padding(words, labels, max_len, padding='post'):
  padded_words = pad_sequences(words, max_len,padding='post')
  padded_labels = pad_sequences(labels, max_len, padding='post')
  return padded_words, padded_labels


#converting into 2D array using pad-sequence
train_arr, train_labels = padding(training_tokens, train_labels, max_seq_len, padding='post' )
valid_arr, valid_labels = padding(valid_tokens, valid_labels, max_seq_len, padding='post' )
test_arr, test_labels = padding(test_tokens, test_labels, max_seq_len, padding='post' )


In [12]:
# Loading glove embeddings
embeddings_index = {}
f = open('glove.6B.100d.txt', encoding="utf-8")

for line in f:
  values = line.strip().split(' ')
  word = values[0] # the first entry is the word
  coefs = np.asarray(values[1:], dtype='float32') #100d vectors representing the word
  embeddings_index[word] = coefs
    
f.close()
embedding_matrix = np.zeros((len(wordToIndex), EMBEDDING_DIM))

# Word embeddings for the tokens
for word,i in wordToIndex.items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector

In [14]:
train_batch_size = 32
valid_batch_size = 64
test_batch_size = 64

#Creating datasets for tf input pipeline
train_dataset = tf.data.Dataset.from_tensor_slices((train_arr, train_labels))
valid_dataset = tf.data.Dataset.from_tensor_slices((valid_arr, valid_labels))
test_dataset = tf.data.Dataset.from_tensor_slices((test_arr, test_labels))

#Converting datasets into batches
shuffled_train_dataset = train_dataset.shuffle(buffer_size=train_arr.shape[0], reshuffle_each_iteration=True)
trainBatch = shuffled_train_dataset.batch(train_batch_size, drop_remainder=True)
validationBatch = valid_dataset.batch(valid_batch_size, drop_remainder=True)
testBatch = test_dataset.batch(test_batch_size, drop_remainder=True)


train_pb_max_len = math.ceil(float(len(train_arr))/float(train_batch_size))
valid_pb_max_len = math.ceil(float(len(valid_arr))/float(valid_batch_size))
test_pb_max_len = math.ceil(float(len(test_arr))/float(test_batch_size))

In [15]:
import tensorflow as tf
from tensorflow.keras import layers

class NERModel(tf.keras.Model):
    def __init__(self, max_seq_len, embed_input_dim, embed_output_dim, count_labels, weights):
        super(NERModel, self).__init__() 
        self.embedding = layers.Embedding(input_dim=embed_input_dim,output_dim=embed_output_dim, weights=weights,input_length=max_seq_len, trainable=False, mask_zero=True)        

        self.bilstm = layers.Bidirectional(layers.LSTM(128,return_sequences=True))
        self.dense = layers.Dense(count_labels)
        
    def call(self, inputs):
        x = self.embedding(inputs) # batchsize, max_seq_len, embedding_output_dim
        x = self.bilstm(x) #batchsize, max_seq_len, hidden_dim_bilstm
        probs = self.dense(x) #batchsize, max_seq_len, count_labels
        return probs

    

In [16]:
#Setting all model parameters for training

model = NERModel(max_seq_len=max_seq_len,embed_input_dim=len(wordToIndex), embed_output_dim=100, weights=[embedding_matrix], count_labels=count_labels)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
lossFunt = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

In [17]:
from fastprogress.fastprogress import master_bar, progress_bar


train_loss_metric = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
valid_loss_metric = tf.keras.metrics.Mean('valid_loss', dtype=tf.float32)
epochs = 5

# Training over calculating the loss function
def training_trainset(words_batch, labels_batch):
    with tf.GradientTape() as tape:
        probs = model(words_batch)
        loss = lossFunt(labels_batch, probs)
        
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads,model.trainable_variables)))
    
    return loss, probs

def training_validationSet(words_batch, labels_batch):
    probs = model(words_batch)
    loss = lossFunt(labels_batch, probs)
    return loss, probs

for epoch in range(epochs):
    for words_batch, labels_batch in progress_bar(trainBatch, total=train_pb_max_len):
        loss, probs = training_trainset(words_batch, labels_batch)
        train_loss_metric(loss)
    train_loss_metric.reset_states()
    
    for words_batch, labels_batch in progress_bar(validationBatch, total=valid_pb_max_len):
        loss, probs = training_validationSet(words_batch, labels_batch)
        valid_loss_metric.update_state(loss)
    valid_loss_metric.reset_states()


model.save_weights("model_weights",save_format='tf')




In [None]:
type(train_loss_metric)

tensorflow.python.keras.metrics.Mean

In [19]:
from fastprogress.fastprogress import master_bar, progress_bar

#Loading the model and assigning the trained weights
test_model =  NERModel(max_seq_len=max_seq_len, embed_input_dim=len(wordToIndex), embed_output_dim=EMBEDDING_DIM, weights=[embedding_matrix], count_labels=count_labels)
test_model.load_weights("model_weights")

# Convert numerical to categorical labels
def transformLabels(predictions, correct, labelMapping):
  predicted = []
  for sentence in predictions:
    for i in sentence:
      predicted.append([labelMapping[item] for item in i ])


  actual = []
  if correct != None:
    for sentence in correct:
      for i in sentence:
        actual.append([labelMapping[item] for item in i ])
  return actual, predicted



actualLabels = []
predictedLabels = []
i = 0


# Predict labels over test data
for words_batch, labels_batch in progress_bar(testBatch, total=test_pb_max_len):
  probs = test_model(words_batch)
  temp1 = tf.nn.softmax(probs)
  preds = tf.argmax(temp1, axis=2)
  actualLabels.append(np.asarray(labels_batch))
  predictedLabels.append(np.asarray(preds))
  i = i+1


In [20]:
import sklearn
from sklearn.metrics import classification_report
from sklearn.metrics import multilabel_confusion_matrix



label_correct, label_pred = transformLabels(predictedLabels, actualLabels, labelMapping)


In [21]:
actualLabels = np.vstack(actualLabels)
predictedLabels = np.vstack(predictedLabels)
performanceReport = classification_report(actualLabels.flatten(), predictedLabels.flatten())
performanceReport

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


'              precision    recall  f1-score   support\n\n           0       1.00      1.00      1.00    155362\n           1       0.00      0.00      0.00        81\n           2       0.00      0.00      0.00       205\n           3       0.00      0.00      0.00         0\n           4       0.00      0.00      0.00         0\n           5       0.00      0.00      0.00         0\n           6       0.00      0.00      0.00         0\n           7       0.00      0.00      0.00         0\n           8       0.00      0.00      0.00         0\n           9       0.00      0.00      0.00         0\n\n    accuracy                           1.00    155648\n   macro avg       0.10      0.10      0.10    155648\nweighted avg       1.00      1.00      1.00    155648\n'