In [None]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
import sys
import os
import re
import string

In [None]:
!pip install seqeval


**Importing the dataset for named entity recognition model**

In [None]:
import seqeval

## Data preprocessing

In [None]:
def read_file(filename):
    """Return every line of ``filename`` as a list of strings.

    The file is opened in text mode for reading and is closed when the
    ``with`` block exits. Lines are returned exactly as ``readlines`` yields
    them, i.e. still carrying their trailing newline.
    """
    with open(filename, "r") as handle:
        return handle.readlines()

def process_text(text):
    """Group space-separated ``token tag`` lines into sentences.

    Args:
        text: iterable of raw lines. A line that splits on a single space into
            more than one field contributes its first field as the token and
            its second field (newline stripped) as the tag. Any other line
            (e.g. a blank line) closes the current sentence.

    Returns:
        ``(X, Y)``: two parallel lists with one entry per sentence; ``X[k]``
        holds the tokens and ``Y[k]`` the tags of sentence ``k``.
    """
    X = []
    Y = []
    sentenceX = []
    sentenceY = []
    for line in text:
        split = line.split(" ")
        if len(split) > 1:
            sentenceX.append(split[0])
            sentenceY.append(split[1].replace("\n", ""))
        else:
            X.append(sentenceX)
            Y.append(sentenceY)
            sentenceX = []
            sentenceY = []
    # A file that does not end with a separator line would otherwise lose its
    # last sentence, because sentences are only flushed on separator lines.
    if sentenceX:
        X.append(sentenceX)
        Y.append(sentenceY)
    return X, Y

# Load the raw training lines and split them into per-sentence token lists (X)
# and tag lists (Y).
text = read_file("/kaggle/input/ner-a2/train.txt")
X, Y = process_text(text)

In [None]:
# Same parsing for the test split. NOTE: X_test / Y_test are later rebound to
# index sequences, so they only hold raw tokens/tags until those cells run.
text_test = read_file("/kaggle/input/ner-a2/test.txt")
X_test, Y_test = process_text(text_test)

In [None]:
# Second parse of the test lines, kept under separate names so the raw
# tokens/tags stay available after X_test / Y_test are rebound further down.
testX,testY = process_text(text_test)

**Unique Tags**

In [None]:
# Walk every training tag once: `tags` collects all occurrences (duplicates
# included) and `unique` maps each distinct tag to the order it was first seen.
unique = {}
tags = []
for sentence_tags in Y:
    for tag in sentence_tags:
        tags.append(tag)
        # setdefault only inserts when the tag is new; len(unique) is evaluated
        # before the insertion, so ids run 0, 1, 2, ...
        unique.setdefault(tag, len(unique))

In [None]:
# Distinct tags of the test split, numbered in first-seen order.
unique_test = {}
for sentence_tags in Y_test:
    for tag in sentence_tags:
        unique_test.setdefault(tag, len(unique_test))

**Vocabulary**

In [None]:
# Build the training vocabulary (ids start at 2) and track the longest
# training sentence along the way.
vocab = {}
max_len_sentence = 0
for sentence in X:
    max_len_sentence = max(len(sentence), max_len_sentence)
    for token in sentence:
        if token not in vocab:
            vocab[token] = len(vocab) + 2
n_words = len(vocab)

**Tags not in train set but present in test set**

In [None]:
# Dataset summary: tag inventories, vocabulary size, longest sentence.
print(unique)
print(unique_test.keys())
print(n_words)
print(max_len_sentence)
print(len(unique))
# Tags that occur in the test split but were never seen in training.
nott = [tag for tag in unique_test if tag not in unique]
print(nott)

**Separating sentences by building a list of lists of (word, tag) tuples**

In [None]:
class SentenceGetter(object):
    """Pair tokens with their tags, sentence by sentence.

    Attributes:
        sentences: one list per sentence, each a list of ``(word, tag)`` tuples.
        n_sent: 1-based number of the sentence ``get_next`` will return next.
        empty: initialised to ``False``; not read or updated inside this class.
    """

    def __init__(self, X,Y):
        """Zip ``X[i]`` with ``Y[i]`` for every sentence index ``i`` of ``X``."""
        self.n_sent = 1
        self.empty = False
        self.sentences = [list(zip(X[i], Y[i])) for i in range(len(X))]

    def get_next(self):
        """Return the next unread sentence, or ``None`` once all were returned.

        The previous implementation looked sentences up in ``self.grouped``,
        an attribute that is never created, and a bare ``except`` hid the
        resulting error, so it always returned ``None``.
        """
        index = self.n_sent - 1
        if index >= len(self.sentences):
            return None
        self.n_sent += 1
        return self.sentences[index]

In [None]:
# Build (word, tag) sentence lists for both splits.
getter = SentenceGetter(X,Y)
getter_test = SentenceGetter(X_test,Y_test)

In [None]:
# Each entry is one sentence as a list of (word, tag) tuples.
sentences = getter.sentences
sentences_test = getter_test.sentences

In [None]:
# Spot-check one training sentence.
print(sentences[5])

**Maximum Sequence Length**

In [None]:
# Longest training sentence; used below as the fixed padded length.
sentence_lengths = (len(s) for s in sentences)
maxlen = max(sentence_lengths)
print ('Maximum sequence length:', maxlen)

In [None]:
# Plot setup. matplotlib.pyplot is already imported in the first cell; the
# re-import here is redundant but harmless.
import matplotlib.pyplot as plt
%matplotlib inline
# Use the "ggplot" style sheet for every figure that follows.
plt.style.use("ggplot")

In [None]:
# Distribution of sentence lengths in the training split.
train_lengths = [len(s) for s in sentences]
plt.hist(train_lengths, bins=50)
plt.title("Number of Samples v/s Sentence length (Train)")
plt.show()

In [None]:
# Distribution of sentence lengths in the test split.
test_lengths = [len(s) for s in sentences_test]
plt.hist(test_lengths, bins=50)
plt.title("Number of Samples v/s Sentence length (Test)")
plt.show()

In [None]:
# NOTE: this bare expression is not the last statement of the cell, so its
# value is not displayed.
len(X[0])
# Flatten all training sentences into one token list (duplicates included).
words = list(np.concatenate(X))
print(len(words))

In [None]:
# De-duplicate the training tokens. Sorting makes the order (and therefore
# every word id derived from it) identical across kernel restarts; a bare
# set's iteration order for strings is not stable between Python processes.
words = sorted(set(words))
print(len(words))
# "PAD" and "UNK" are not appended to `words`; they only get reserved ids, so
# the embedding needs two more rows than there are distinct tokens.
n_words = len(words)+2
n_words # 2 for PAD,UNK 

In [None]:
# Distinct training tags in sorted order, so tag ids are reproducible across
# kernel restarts (set iteration order for strings is not).
tags = sorted(unique)
# 'PAD' is the label given to padded positions; it is counted in n_tags.
tags.append('PAD')
print(tags)
n_tags = len(tags)
n_tags

**Converting words, tags to numbers and numbers to words, tags**

In [None]:
# Token <-> id lookup tables. Ids 0 and 1 are reserved for padding and for
# unknown tokens, so real tokens start at 2.
word2idx = {token: token_id for token_id, token in enumerate(words, start=2)}
word2idx['PAD'] = 0
word2idx['UNK'] = 1
# (A dedicated "NUM" id was tried at this point and is currently disabled.)
idx2word = {token_id: token for token, token_id in word2idx.items()}

# Tag <-> id lookup tables. 'PAD' is re-pointed to 0 after the enumeration, so
# real tags occupy 1 .. len(tags) - 1.
tag2idx = {tag: tag_id for tag_id, tag in enumerate(tags, start=1)}
tag2idx['PAD'] = 0
idx2tag = {tag_id: tag for tag, tag_id in tag2idx.items()}

In [None]:
print(idx2tag)

In [None]:
print(tag2idx)

**Padding sequences so that all of them have the same length**

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences
# Encode every training sentence as a list of word ids. Each sentence item is
# a (word, tag) tuple; only the word is used here.
# (A variant that mapped numeric-looking tokens to a "NUM" id was tried and is
# disabled.)
X_train = [[word2idx[word] for word, _ in sentence] for sentence in sentences]

In [None]:
# Encode the test sentences as word ids; tokens absent from the training
# vocabulary fall back to the "UNK" id.
# Membership is checked against a set built once: scanning the `words` list
# for every token made this step quadratic in practice.
known_words = set(words)
unk_idx = word2idx["UNK"]
X_test = [
    [word2idx[word] if word in known_words else unk_idx for word, _ in sentence]
    for sentence in sentences_test
]

In [None]:
# Right-pad both splits with the PAD word id up to the training maximum length.
word_pad_kwargs = dict(maxlen=maxlen, padding="post", value=word2idx['PAD'])
X_train = pad_sequences(sequences=X_train, **word_pad_kwargs)
X_test = pad_sequences(sequences=X_test, **word_pad_kwargs)

In [None]:
# Sanity check: the id of the first word of sentence 1 should be the first
# entry of its padded row.
print(word2idx[sentences[1][0][0]])
print(X_train[1])
print(sentences[1])

In [None]:
# Same spot-check for the test split: padded id row next to the raw sentence.
print(X_test[1])
print(sentences_test[1])

In [None]:
# Encode the training tags as ids (second element of each (word, tag) tuple).
Y_train = [[tag2idx[tag] for _, tag in sentence] for sentence in sentences]
# The rows still have different lengths here; np.array() on such ragged lists
# raises on recent NumPy releases, so report the sentence count directly.
print((len(Y_train),))

In [None]:
# Encode the test tags as ids. Tags that never occurred in training have no
# id of their own and are mapped to the PAD id.
pad_tag_idx = tag2idx['PAD']
Y_test = [
    [tag2idx.get(tag, pad_tag_idx) for _, tag in sentence]
    for sentence in sentences_test
]

# The rows still have different lengths here; np.array() on such ragged lists
# raises on recent NumPy releases, so report the sentence count directly.
print((len(Y_test),))

In [None]:
# Right-pad the tag id rows with the PAD tag id so they line up with the
# padded word id rows.
tag_pad_kwargs = dict(maxlen=maxlen, padding="post", value=tag2idx["PAD"])
Y_train = pad_sequences(sequences=Y_train, **tag_pad_kwargs)
print(np.array(Y_train).shape)
Y_test = pad_sequences(sequences=Y_test, **tag_pad_kwargs)
print(np.array(Y_test).shape)

**Converting labels to one-hot vectors**

In [None]:
from keras.utils import to_categorical
# One-hot encode each padded tag row into n_tags classes.
y_train = [to_categorical(tag_row, num_classes=n_tags) for tag_row in Y_train]
y_test = [to_categorical(tag_row, num_classes=n_tags) for tag_row in Y_test]
for encoded_split in (y_train, y_test):
    print(np.array(encoded_split).shape)

## Model Building

In [None]:
from keras.models import Model, Sequential
from keras.layers import LSTM, Embedding, Dense, TimeDistributed, Dropout, Bidirectional,Input
from keras.layers import Conv1D, concatenate, SpatialDropout1D, GlobalMaxPooling1D

from tensorflow.keras.utils import plot_model

In [None]:
# Hyperparameters for model training.
BATCH_SIZE = 32   # samples per gradient update (passed to model.fit)
EPOCHS = 10       # passes over the training data (passed to model.fit)
MAX_LEN = maxlen  # padded sequence length = longest training sentence
EMBEDDING = 20    # output dimension of the Embedding layer

In [None]:
!pip -q install git+https://www.github.com/keras-team/keras-contrib.git sklearn-crfsuite

In [None]:
# input = Input(shape=(MAX_LEN,))
def build_model(n_words, EMBEDDING, MAX_LEN, n_tags):
    """Assemble the (uncompiled) sequence-tagging network.

    Layers, in order: embedding -> dropout -> bidirectional LSTM -> LSTM ->
    per-timestep softmax over the tag classes.

    Args:
        n_words: size of the embedding's input vocabulary.
        EMBEDDING: dimension of each word embedding.
        MAX_LEN: length of the (padded) input sequences.
        n_tags: number of output classes per timestep.

    Returns:
        A Keras ``Sequential`` model.
    """
    layers = [
        Embedding(input_dim=n_words, output_dim=EMBEDDING, input_length=MAX_LEN),
        Dropout(0.1),
        Bidirectional(LSTM(units=100, return_sequences=True, recurrent_dropout=0.1)),
        LSTM(units=100, return_sequences=True, dropout=0.5, recurrent_dropout=0.5),
        # softmax output layer, applied independently at every timestep
        TimeDistributed(Dense(n_tags, activation="softmax")),
    ]
    return Sequential(layers)

In [None]:
# model = Model(input, out)
# Build the tagger and compile it; categorical cross-entropy matches the
# one-hot labels produced with to_categorical.
# NOTE(review): the Embedding layer is created without masking, so padded
# positions appear to count towards loss and accuracy — confirm if intended.
model = build_model(n_words, EMBEDDING, MAX_LEN, n_tags)
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
model.summary()

In [None]:
# Train on the padded word ids against the one-hot tag labels;
# validation_split=0.1 holds out 10% of the training data for validation.
history = model.fit(X_train,np.array(y_train),
                    batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.1, verbose=1)

In [None]:
# Training vs. validation loss per epoch.
for curve_key in ('loss', 'val_loss'):
    plt.plot(history.history[curve_key])
plt.title('Model loss w/o NUM token')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Val'], loc='upper right')
plt.show()

## Evaluation

In [None]:
# Loss and accuracy (the compiled metric) on the padded test split.
model.evaluate(X_test, np.array(y_test))

In [None]:
# Per-timestep class probabilities for the test split, reduced to class ids.
pred_cat = model.predict(X_test)
pred = pred_cat.argmax(axis=-1)
# Recover the gold class ids from the one-hot labels the same way.
y_test_true = np.argmax(y_test, axis=-1)

In [None]:
# Convert the index to tag
from seqeval.metrics import classification_report, accuracy_score
# Only the first len(sentence) positions of each row are real tokens (rows are
# right-padded up to maxlen). Scoring the padded tail as well would let the
# 'PAD' positions distort the report, so every row is cut to its true length.
seq_lengths = [min(len(s), maxlen) for s in sentences_test]
pred_tag = [[idx2tag[i] for i in row[:n]] for row, n in zip(pred, seq_lengths)]
y_true_tag = [[idx2tag[i] for i in row[:n]] for row, n in zip(y_test_true, seq_lengths)]

report = classification_report(y_true_tag,pred_tag)
print(report)

In [None]:
from seqeval.metrics import classification_report, accuracy_score
# Only the first len(sentence) positions of each row are real tokens (rows are
# right-padded up to maxlen). Scoring the padded tail as well would let the
# 'PAD' positions distort the report, so every row is cut to its true length.
seq_lengths = [min(len(s), maxlen) for s in sentences_test]
pred_tag = [[idx2tag[i] for i in row[:n]] for row, n in zip(pred, seq_lengths)]
y_true_tag = [[idx2tag[i] for i in row[:n]] for row, n in zip(y_test_true, seq_lengths)]

report = classification_report(y_true_tag,pred_tag)
print(report)

def get_scores(predY, trueY):
    """Print seqeval micro F1, macro F1 and their mean.

    Args:
        predY: predicted tag sequences (one list of tag strings per sentence).
        trueY: gold tag sequences, aligned with ``predY``.

    The earlier version built an "O" filter from the notebook-level
    ``y_true_tag`` instead of ``trueY`` and compared whole sentences (lists)
    to the string "O", so the filter never removed anything. It is dropped
    here; the function now depends only on its arguments and computes each
    score once.
    """
    from seqeval.metrics import f1_score

    micro_f1 = f1_score(trueY, predY, average="micro")
    macro_f1 = f1_score(trueY, predY, average="macro")

    print("Micro F1 score: ", micro_f1)
    print("Macro F1 score: ", macro_f1)
    print("Average F1 score: ", (micro_f1 + macro_f1) / 2)

# Print the F1 summary for the test predictions (predictions first, gold second).
get_scores(pred_tag, y_true_tag)

In [None]:
print(pred_tag[0])
print(y_true_tag[0])
# Add the special tokens to `words` at most once: unconditional appends grew
# the list (and duplicated the tokens) every time this cell was re-run.
for special_token in ("PAD", "UNK", "NUM"):
    if special_token not in words:
        words.append(special_token)

In [None]:
# Pick one random test sentence and compare gold vs. predicted tags.
i = np.random.randint(0, X_test.shape[0])
p = model.predict(np.array([X_test[i]]))
p = np.argmax(p, axis=-1)
# Kept under its own name so the notebook-level `pred_tag` (all sentences)
# is not replaced by this single sentence.
sample_pred_tags = [idx2tag[j] for j in p[0]]
print("Test Sentence for Prediction:",testX[i])
print("Gold Labels:",testY[i])
print("Predicted Labels:", sample_pred_tags[:len(testX[i])])
print("Accuracy:",accuracy_score(testY[i], sample_pred_tags))
print("True accuracy without pad:",accuracy_score(testY[i], sample_pred_tags[: len(testY[i])]))
print("{:14} {:8} {}".format("Word", "True", "Pred"))
# Decode through the id -> token / id -> tag maps. Indexing `words` with
# `id - 2` mislabelled the reserved ids 0 (PAD) and 1 (UNK), and using `pred`
# as the loop variable overwrote the notebook-level prediction array.
for word_idx, true_idx, pred_idx in zip(X_test[i], Y_test[i], p[0]):
    print("{:14} {:8} {}".format(idx2word[word_idx], idx2tag[true_idx], idx2tag[pred_idx]))

In [None]:
# Predicted tag ids for the sampled sentence (padded positions included).
p[0]

In [None]:
# NOTE(review): word2idx assigns each token its position in `words` plus 2, so
# indexing `words` directly with an id taken from X_test does not return the
# encoded token; idx2word is the matching decoder.
print(words[X_test[0][0]])
words[-2]

In [None]:
# Id 1 is the reserved unknown-token id, so this should display 'UNK'.
idx2word[1]

In [None]:
# Hand-written sentence for a quick qualitative check, pre-tokenised on spaces.
test_sentence = "I am going to school .".split(" ")
# Longer alternative example:
# test_sentence = ["Hawking", "was", "a", "Fellow", "of", "the", "Royal", "Society", ",", "a", "lifetime", "member",
#                  "of", "the", "Pontifical", "Academy", "of", "Sciences", ",", "and", "a", "recipient", "of",
#                  "the", "Presidential", "Medal", "of", "Freedom", ",", "the", "highest", "civilian", "award",
#                  "in", "the", "United", "States", "."]

In [None]:
# Encode and pad the hand-written sentence. Words missing from the training
# vocabulary fall back to the "UNK" id instead of raising KeyError, matching
# how the test split is encoded.
unk_idx = word2idx['UNK']
x_test_sent = pad_sequences(sequences=[[word2idx.get(w, unk_idx) for w in test_sentence]],
                            padding="post", value=word2idx['PAD'], maxlen=maxlen)

In [None]:
# One padded row is expected here: a single sentence, padded to maxlen.
x_test_sent.shape

In [None]:
# Tag the hand-written sentence and print one "word  tag" row per token.
p = model.predict(np.array([x_test_sent[0]]))
p = np.argmax(p, axis=-1)
print("{:15} {}".format("Word", "Prediction"))
print(30 * "=")
# zip() stops at the end of the sentence, so padded positions are not printed.
# Ids are decoded with idx2tag: tag ids start at 1 (0 is PAD), so indexing
# `tags` with the raw id returned the neighbouring tag. The loop variable is
# not named `pred`, which would overwrite the notebook-level prediction array.
for w, tag_idx in zip(test_sentence, p[0]):
    print("{:15} {:10}".format(w, idx2tag[tag_idx]))