# Named Entity Recognition with Bidirectional LSTM

Named Entity Recognition (NER) is similar to POS tagging (it uses the same many-to-many classification code), except that it only tags specific entities such as person names, organisation names and country names. The entities are tagged in IOB format:

    I 'inside' a named chunk of text
    O 'outside' a named chunk of text
    B 'beginning' a named chunk of text


## Set up the data

The data is downloaded from the online course in **pickle format**, i.e. a Python object hierarchy serialized to a byte stream, which is a compact way to store large amounts of data. The data has already been split into training and test sets; each contains sentences that are already split into words, with every word paired with its target named-entity tag.

In [1]:
import pickle

In [2]:
# Read in pickle file to load training data
# NOTE(review): pickle.load can execute arbitrary code while deserializing,
# so only load these files if the download source is trusted.
# Each corpus is later iterated as a sequence of sentences, where every
# sentence is a sequence of (token, tag) pairs.

with open('data/ner_train.pkl', 'rb') as f:
    corpus_train = pickle.load(f)

# Read in pickle file to load test data

with open('data/ner_test.pkl', 'rb') as f:
    corpus_test = pickle.load(f)

In [3]:
# Display the raw test corpus to inspect its structure
corpus_test

In [None]:
# Split every training sentence of (token, tag) pairs into two parallel
# lists: one holding the words, one holding the matching entity tags
train_inputs = [
    [word for word, _label in pairs]
    for pairs in corpus_train
]
train_targets = [
    [label for _word, label in pairs]
    for pairs in corpus_train
]

In [None]:
# Split every test sentence of (token, tag) pairs into two parallel
# lists: one holding the words, one holding the matching entity tags
test_inputs = [
    [word for word, _label in pairs]
    for pairs in corpus_test
]
test_targets = [
    [label for _word, label in pairs]
    for pairs in corpus_test
]

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

%matplotlib inline

In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Dense, Input, Bidirectional
from tensorflow.keras.layers import LSTM, GRU, SimpleRNN, Embedding
from tensorflow.keras.models import Model
from tensorflow.keras.losses import SparseCategoricalCrossentropy

In [None]:
# -------------------------------- Convert sentences of words to sequences of integers

# None = no cap on the vocabulary size
MAX_VOCAB_SIZE = None

# Capitalization might be useful - test it
should_lowercase = False

# Set up tokenizer
# oov_token='UNK' gives unknown words their own index.
word_tokenizer = Tokenizer(num_words=MAX_VOCAB_SIZE, lower=should_lowercase, oov_token='UNK')
# Otherwise unknown tokens will be removed and len(input) != len(target)
# - input words and target tags will not be aligned!

# The tokenizer is fit on the training sentences only, so words that occur
# only in the test set are mapped to 'UNK' rather than getting their own
# (never-trained) embedding.
word_tokenizer.fit_on_texts(train_inputs)

# Sentences are passed as lists of tokens, giving one integer per token
train_inputs_int = word_tokenizer.texts_to_sequences(train_inputs)

test_inputs_int = word_tokenizer.texts_to_sequences(test_inputs)

In [None]:
# Get word -> integer mapping for vocab size (V)
# The mapping also contains the 'UNK' entry (it is deleted from the baseline
# dictionary later on), so V counts it as well.

word2idx = word_tokenizer.word_index

V = len(word2idx)

print('Found %s unique tokens.' % V)

In [None]:
# Helper that concatenates the inner sequences of a nested list

def flatten(list_of_lists):
    """Return one flat list holding every element of every sub-sequence, in order."""
    merged = []
    for inner in list_of_lists:
        merged.extend(inner)
    return merged

In [None]:
# Collect the distinct tags of the training set, to compare with the test set

all_train_targets = {tag for sentence_tags in train_targets for tag in sentence_tags}

all_train_targets

In [None]:
# Collect the distinct tags of the test set
all_test_targets = {tag for sentence_tags in test_targets for tag in sentence_tags}

all_test_targets

In [None]:
# Should display True: the tag tokenizer is fitted on the training tags only,
# so a tag that occurs only in the test set would have no integer id.
all_train_targets == all_test_targets

In [None]:
# --------------------------------------- Convert lists of targets to sequences of integers

# NOTE(review): no oov_token is set here, so a tag unseen during fitting would
# presumably be dropped from the sequence and misalign targets with inputs.
# This relies on the train/test tag sets being equal - confirm with the check
# above.
tag_tokenizer = Tokenizer()

# Tag ids are learned from the training targets only
tag_tokenizer.fit_on_texts(train_targets)

train_targets_int = tag_tokenizer.texts_to_sequences(train_targets)
test_targets_int = tag_tokenizer.texts_to_sequences(test_targets)

In [None]:
# Save for later use in SciKit-Learn performance metrics (without padding)
# These are plain references, not copies. They stay unpadded because the
# padding step later rebinds train_targets_int / test_targets_int to the new
# objects returned by pad_sequences instead of modifying these lists.

train_targets_int_unpadded = train_targets_int
test_targets_int_unpadded = test_targets_int

In [None]:
# Sequence length T = longest sentence across both splits, so that padding
# never truncates an input (which would also truncate its targets)

maxlen_train = max(map(len, train_inputs))
maxlen_test = max(map(len, test_inputs))

T = max(maxlen_train, maxlen_test)

In [None]:
# ------------------------------------- Pad sequences to get N x T matrix

# pad_sequences pads at the front by default, so the real tokens end up at
# the end of each row; the prediction code relies on this when it keeps the
# last `length` rows.
train_inputs_int = pad_sequences(train_inputs_int, maxlen=T)

print('Shape of data train tensor:', train_inputs_int.shape)

In [None]:
# Pad the test inputs to the same length T as the training inputs
test_inputs_int = pad_sequences(test_inputs_int, maxlen=T)

print('Shape of data test tensor:', test_inputs_int.shape)

In [None]:
# Pad the training targets the same way as the inputs so that each tag stays
# in the same position as its word
train_targets_int = pad_sequences(train_targets_int, maxlen=T)

print('Shape of train targets tensor:', train_targets_int.shape)

In [None]:
# Pad the test targets the same way as the inputs so that each tag stays in
# the same position as its word
test_targets_int = pad_sequences(test_targets_int, maxlen=T)

print('Shape of test targets tensor:', test_targets_int.shape)

In [None]:
# ------------------------------- Number of classes

# +1 because tokenizer ids start at 1, leaving id 0 for the padding positions
K = len(tag_tokenizer.word_index) + 1

K

In [None]:
# ------------------------------------ Create the model

# You choose embedding dimensionality
D = 32

# Note: You actually want size of the embedding matrix to be (V + 1) x D, because the first index starts from 1 and not 0.
# Thus, if the final index of the embedding matrix is V, then it actually must have size V + 1.

# One integer word id per time step, T steps per sentence
i = Input(shape=(T,))

# mask_zero=True way slower on GPU than CPU!
# It treats id 0 as padding
x = Embedding(V + 1, D, mask_zero=True)(i)

# return_sequences=True keeps one output per time step, which is needed
# because every word gets its own tag (many-to-many)
x = Bidirectional(LSTM(32, return_sequences=True))(x)
# Alternative recurrent layer:
# x = SimpleRNN(32, return_sequences=True)(x)

# No activation: the layer outputs K raw scores (logits) per time step,
# matching from_logits=True in the loss used at compile time
x = Dense(K)(x)

model = Model(i, x)

In [None]:
# Compile model
# Sparse loss: targets are integer class ids, not one-hot vectors.
# from_logits=True because the final Dense layer has no softmax.

model.compile(loss=SparseCategoricalCrossentropy(from_logits=True), optimizer='adam', metrics=['accuracy'])

In [None]:
# Fit model (60 secs per epoch on CPU)
# The test set is passed as validation data, so the val_* values in the
# history are measured on the test set.

print('Training model...')

r = model.fit(train_inputs_int, train_targets_int, epochs=5, validation_data=(test_inputs_int, test_targets_int))

In [None]:
# Plot loss per epoch (training vs. validation)

plt.plot(r.history['loss'], label='train loss')
plt.plot(r.history['val_loss'], label='val loss')
# Trailing semicolon keeps the notebook from echoing the Legend object
plt.legend();

In [None]:
# Plot accuracy per epoch (training vs. validation)

plt.plot(r.history['accuracy'], label='train acc')
plt.plot(r.history['val_accuracy'], label='val acc')
# Trailing semicolon keeps the notebook from echoing the Legend object
plt.legend();

In [None]:
# -------------------------------- True model accuracy, measured on the real (unpadded) tokens only

# Record how many tokens each training and test sentence has, so that the
# padded positions can be stripped from the predictions afterwards

train_lengths = [len(words) for words in train_inputs]

test_lengths = [len(words) for words in test_inputs]

In [None]:
# Generate predictions from training set
# The model ends in Dense(K) without softmax, so these are raw scores
# (logits); argmax over logits selects the same class as argmax over
# probabilities.
train_probs = model.predict(train_inputs_int) # N x T x K

# Access label with highest score and remove padding
train_predictions = []

for probs, length in zip(train_probs, train_lengths):
    # probs is T x K. Padding sits at the front, so the real tokens are the
    # last `length` rows. Slice from an explicit start index: probs[-length:]
    # would return all T rows for an empty sentence, because -0 == 0.
    start = max(len(probs) - length, 0)
    probs_ = probs[start:]
    preds = np.argmax(probs_, axis=1)
    train_predictions.append(preds)

# Flatten for use in SciKit
flat_train_predictions = flatten(train_predictions)

flat_train_targets = flatten(train_targets_int_unpadded)

In [None]:
# Generate predictions from test set
# The model ends in Dense(K) without softmax, so these are raw scores
# (logits); argmax over logits selects the same class as argmax over
# probabilities.
test_probs = model.predict(test_inputs_int) # N x T x K

# Access labels with highest score and remove padding
test_predictions = []

for probs, length in zip(test_probs, test_lengths):
    # probs is T x K. Padding sits at the front, so the real tokens are the
    # last `length` rows. Slice from an explicit start index: probs[-length:]
    # would return all T rows for an empty sentence, because -0 == 0.
    start = max(len(probs) - length, 0)
    probs_ = probs[start:]
    preds = np.argmax(probs_, axis=1)
    test_predictions.append(preds)

# Flatten for use in SciKit
flat_test_predictions = flatten(test_predictions)

flat_test_targets = flatten(test_targets_int_unpadded)

In [None]:
from sklearn.metrics import accuracy_score, f1_score

# Token-level scores of the LSTM, computed on the unpadded integer tag ids
print("Train acc:", accuracy_score(flat_train_targets, flat_train_predictions))
print("Test acc:", accuracy_score(flat_test_targets, flat_test_predictions))

# Macro average: every tag class has the same weight, however rare it is
print("Train f1:", f1_score(flat_train_targets, flat_train_predictions, average='macro'))
print("Test f1:", f1_score(flat_test_targets, flat_test_predictions, average='macro'))

In [None]:
# --------------------------------- Baseline model: map word to tag


from collections import Counter

# Helper that picks the most frequent element of a sequence

def most_common(lst):
    """Return the element that occurs most often in lst.

    Ties go to the element that appears first. An empty input raises
    IndexError.
    """
    ranked = Counter(lst).most_common(1)
    top_element, _count = ranked[0]
    return top_element

In [None]:
# Start with an empty tag list for every word in the vocabulary
token2tags = {word: [] for word in word2idx}

# The 'UNK' placeholder is not a real word, so it gets no baseline tag
del token2tags['UNK']

# Collect, for each word, every tag it carries in the training set
for sentence_words, sentence_tags in zip(train_inputs, train_targets):
    for word, label in zip(sentence_words, sentence_tags):
        key = word.lower() if should_lowercase else word
        seen_tags = token2tags.get(key)
        if seen_tags is not None:
            seen_tags.append(label)

# Sanity check: list any word that ended up without a tag
for word, labels in token2tags.items():
    if not labels:
        print(word)


# Baseline model: each word always receives its most frequent training tag
token2tag = {word: most_common(labels) for word, labels in token2tags.items()}

In [None]:
# Baseline predictions for the training set: look up each word's most
# frequent tag (every training word has an entry in token2tag)

train_predictions = [
    [token2tag[word.lower() if should_lowercase else word] for word in words]
    for words in train_inputs
]

# Flatten for use in SciKit
flat_train_predictions = flatten(train_predictions)
flat_train_targets = flatten(train_targets)

In [None]:
# Baseline predictions for the test set

test_predictions = []

for sentence in test_inputs:
    predictions = []

    for token in sentence:
        # Normalize case the same way as when token2tag was built and as in
        # the training-set loop; otherwise lookups would miss whenever
        # should_lowercase is True
        if should_lowercase:
            token = token.lower()

        # Words never seen in training get a placeholder tag, which is
        # presumably not a real tag and therefore always counts as an error
        predicted_tag = token2tag.get(token, 'INCORRECT')
        predictions.append(predicted_tag)

    test_predictions.append(predictions)

# Flatten for use in SciKit
flat_test_predictions = flatten(test_predictions)
flat_test_targets = flatten(test_targets)

In [None]:
# Token-level scores of the word -> most-frequent-tag baseline (string tags)
print("Train acc:", accuracy_score(flat_train_targets, flat_train_predictions))
print("Test acc:", accuracy_score(flat_test_targets, flat_test_predictions))

# NOTE(review): the 'INCORRECT' placeholder used for unseen test words
# presumably counts as an extra class in the macro average, which would lower
# the test F1 further - confirm against the f1_score `labels` behaviour.
print("Train f1:", f1_score(flat_train_targets, flat_train_predictions, average='macro'))
print("Test f1:", f1_score(flat_test_targets, flat_test_predictions, average='macro'))