# Named Entity Recognition using Transformers

Bases on https://keras.io/examples/nlp/ner_transformers/

https://keras.io/examples/nlp/ner_transformers/

In [1]:
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from datasets import load_dataset
from collections import Counter
from conlleval import evaluate
import nltk
nltk.download('punkt')

We will be using the transformer implementation from this fantastic
[example](https://keras.io/examples/nlp/text_classification_with_transformer/).

Let's start by defining a `TransformerBlock` layer:

In [2]:

class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.ffn = keras.Sequential(
            [
                keras.layers.Dense(ff_dim, activation="relu"),
                keras.layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = keras.layers.Dropout(rate)
        self.dropout2 = keras.layers.Dropout(rate)

    def call(self, inputs, training=False):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)


Next, let's define a `TokenAndPositionEmbedding` layer:

In [3]:

class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = keras.layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.pos_emb = keras.layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, inputs):
        maxlen = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        position_embeddings = self.pos_emb(positions)
        token_embeddings = self.token_emb(inputs)
        return token_embeddings + position_embeddings


## Build the NER model class as a `keras.Model` subclass

In [4]:

class NERModel(keras.Model):
    def __init__(
        self, num_tags, vocab_size, maxlen=128, embed_dim=32, num_heads=2, ff_dim=32
    ):
        super(NERModel, self).__init__()
        self.embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
        self.transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
        self.dropout1 = layers.Dropout(0.1)
        self.ff = layers.Dense(ff_dim, activation="relu")
        self.dropout2 = layers.Dropout(0.1)
        self.ff_final = layers.Dense(num_tags, activation="softmax")

    def call(self, inputs, training=False):
        x = self.embedding_layer(inputs)
        x = self.transformer_block(x)
        x = self.dropout1(x, training=training)
        x = self.ff(x)
        x = self.dropout2(x, training=training)
        x = self.ff_final(x)
        return x


## Make the NER label lookup table



In [5]:

# def make_tag_lookup_table():
#     iob_labels = ["B", "I"]
#     ner_labels = ["PER", "ORG", "LOC", "MISC"]
#     all_labels = [(label1, label2) for label2 in ner_labels for label1 in iob_labels]
#     all_labels = ["-".join([a, b]) for a, b in all_labels]
#     all_labels = ["[PAD]", "O"] + all_labels
#     return dict(zip(range(0, len(all_labels) + 1), all_labels))

mapping={0: '[PAD]', 1: 'None', 2:'Service',  3:'Resource', 4:'Device'}
# mapping = make_tag_lookup_table()
print(mapping)

{0: '[PAD]', 1: 'None', 2: 'Service', 3: 'Resource', 4: 'Device'}


In [6]:
train_data = tf.data.TextLineDataset("train_dt_nf.txt")
val_data = tf.data.TextLineDataset("val_dt_nf.txt")

2021-12-18 19:17:01.304062: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [7]:
raw_train = np.load('TRAIN_DATA_np.npy', allow_pickle=True)
raw_test = np.load('TEST_DATA_np.npy', allow_pickle=True)

all_tokens = set()
for instance in raw_train:
    tokens=nltk.word_tokenize(instance[0])
    for token in tokens:
      all_tokens.add(token)


for instance in raw_test:
    tokens=nltk.word_tokenize(instance[0])
    for token in tokens:
      all_tokens.add(token)

print(len(all_tokens))

[nltk_data] Downloading package punkt to /Users/cristovao/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


13123


Get a list of all tokens in the training dataset. This will be used to create the
vocabulary.

In [9]:
all_tokens_array = np.array(list(map(str.lower, all_tokens)))

counter = Counter(all_tokens_array)
print(len(counter))

num_tags = len(mapping)
vocab_size = 12000

# We only take (vocab_size - 2) most commons words from the training data since
# the `StringLookup` class uses 2 additional tokens - one denoting an unknown
# token and another one denoting a masking token
vocabulary = [token for token, count in counter.most_common(vocab_size - 2)]
print(len(vocabulary))

# The StringLook class will convert tokens to token IDs
lookup_layer = keras.layers.StringLookup(
    vocabulary=vocabulary
)

12072
11998


Create 2 new `Dataset` objects from the training and validation data

Print out one line to make sure it looks good. The first record in the line is the number of tokens.
After that we will have all the tokens followed by all the ner tags.

In [10]:
print(list(train_data.take(2).as_numpy_iterator()))

[b'23\tIt\tlays\ta\tcertain\ttheoretical\tfoundation\tfor\tgiant\tmagnetostrictive\trelay\tactuator\tbeing\tused\tin\tthe\tfields\tof\tcutting\twith\tinvariableness\tcutting\tforce\t.\t0\t0\t0\t0\t0\t0\t0\t0\t0\t3\t3\t0\t0\t0\t0\t0\t0\t0\t0\t0\t0\t0\t0', b"20\tHe\t's\tthe\trepository\tof\tour\tcommon\thistory\t,\tand\tby\tthat\tright\t,\tgrand\tpatron\tof\tthe\tBicentennial\t.\t0\t0\t0\t2\t0\t0\t0\t0\t0\t0\t0\t0\t0\t0\t0\t0\t0\t0\t0\t0"]


We will be using the following map function to transform the data in the dataset:

In [11]:

def map_record_to_training_data(record):
    record = tf.strings.split(record, sep="\t")
    # print(record)
    length = tf.strings.to_number(record[0], out_type=tf.int32)
    tokens = record[1 : length + 1]
    tags = record[length + 1 :]
    tags = tf.strings.to_number(tags, out_type=tf.int64)
    tags += 1
    return tokens, tags


def lowercase_and_convert_to_ids(tokens):
    tokens = tf.strings.lower(tokens)
    return lookup_layer(tokens)


# We use `padded_batch` here because each record in the dataset has a
# different length.
batch_size = 32
train_dataset = (
    train_data.map(map_record_to_training_data)
    .map(lambda x, y: (lowercase_and_convert_to_ids(x), y))
    .padded_batch(32)
)
val_dataset = (
    val_data.map(map_record_to_training_data)
    .map(lambda x, y: (lowercase_and_convert_to_ids(x), y))
    .padded_batch(batch_size)
)

ner_model = NERModel(num_tags, vocab_size, embed_dim=32, num_heads=4, ff_dim=64)

In [12]:
print(list(val_dataset.take(1).as_numpy_iterator()))

[(array([[ 1010,  7107,  3021, ...,     0,     0,     0],
       [  519,  1027,  9466, ...,     0,     0,     0],
       [  375,   519,  2540, ...,     0,     0,     0],
       ...,
       [ 1032,  1017,   417, ...,     0,     0,     0],
       [  519,   799,   955, ...,     0,     0,     0],
       [  519, 11936,   937, ...,     0,     0,     0]]), array([[1, 1, 1, ..., 0, 0, 0],
       [1, 1, 3, ..., 0, 0, 0],
       [1, 1, 1, ..., 0, 0, 0],
       ...,
       [1, 1, 3, ..., 0, 0, 0],
       [1, 1, 1, ..., 0, 0, 0],
       [1, 1, 1, ..., 0, 0, 0]]))]


We will be using a custom loss function that will ignore the loss from padded tokens.

In [13]:

class CustomNonPaddingTokenLoss(keras.losses.Loss):
    def __init__(self, name="custom_ner_loss"):
        super().__init__(name=name)

    def call(self, y_true, y_pred):
        loss_fn = keras.losses.SparseCategoricalCrossentropy(
            from_logits=True, reduction=keras.losses.Reduction.NONE
        )
        loss = loss_fn(y_true, y_pred)
        mask = tf.cast((y_true > 0), dtype=tf.float32)
        loss = loss * mask
        return tf.reduce_sum(loss) / tf.reduce_sum(mask)


loss = CustomNonPaddingTokenLoss()

## Compile and fit the model

In [14]:
ner_model.compile(optimizer="adam", loss=loss)
ner_model.fit(train_dataset, epochs=10)


def tokenize_and_convert_to_ids(text):
    tokens = text.split()
    return lowercase_and_convert_to_ids(tokens)

Epoch 1/10


  return dispatch_target(*args, **kwargs)


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [15]:



# Sample inference using the trained model
sample_input = tokenize_and_convert_to_ids(
    "I need of an actuator"
)
sample_input = tf.reshape(sample_input, shape=[1, -1])
print(sample_input)

output = ner_model.predict(sample_input)
prediction = np.argmax(output, axis=-1)[0]
prediction = [mapping[i] for i in prediction]

# eu -> B-ORG, german -> B-MISC, british -> B-MISC
print(prediction)

tf.Tensor([[681 816 241 917 311]], shape=(1, 5), dtype=int64)
['None', 'None', 'None', 'None', 'Device']


## Metrics calculation

Here is a function to calculate the metrics. The function calculates F1 score for the
overall NER dataset as well as individual scores for each NER tag.

In [17]:
from sklearn.metrics import classification_report


def calculate_metrics(dataset):
    all_true_tag_ids, all_predicted_tag_ids = [], []

    for x, y in dataset:
        output = ner_model.predict(x)
        # print(output)
        predictions = np.argmax(output, axis=-1)
        # print(predictions)
        predictions = np.reshape(predictions, [-1])
        # print(predictions)

        true_tag_ids = np.reshape(y, [-1])
        # print(true_tag_ids)

        mask = (true_tag_ids > 0) & (predictions > 0)
        true_tag_ids = true_tag_ids[mask]
        predicted_tag_ids = predictions[mask]
        # print(true_tag_ids.shape)
        # print(predicted_tag_ids.shape)

        all_true_tag_ids.append(true_tag_ids)
        all_predicted_tag_ids.append(predicted_tag_ids)

    all_true_tag_ids = np.concatenate(all_true_tag_ids)
    all_predicted_tag_ids = np.concatenate(all_predicted_tag_ids)
    print(len(all_true_tag_ids))
    print(len(all_predicted_tag_ids))

    predicted_tags = [mapping[tag] for tag in all_predicted_tag_ids]
    real_tags = [mapping[tag] for tag in all_true_tag_ids]
#     print(predicted_tags)
#     print(real_tags)

    # evaluate(real_tags, predicted_tags)
    print(classification_report(real_tags, predicted_tags))


calculate_metrics(val_dataset)

52594
52594
              precision    recall  f1-score   support

      Device       0.92      0.91      0.92      1162
        None       0.99      1.00      0.99     49241
    Resource       0.96      0.87      0.91      1604
     Service       0.89      0.67      0.76       587

    accuracy                           0.99     52594
   macro avg       0.94      0.86      0.90     52594
weighted avg       0.99      0.99      0.99     52594



In [20]:
sentence="""
As a Doctor, I want to the system to notify me about abnormality in sensor readings.
As a Doctor, I want to store in a database any abnormality in breathing of the patient.
As an Admin, I want to remove doctors and patients details of the MySQL.
As an Admin, I want to my iphone to alert me about some DDOS attacks.
As a patient, I want to upload a specific image and video of the patient.
"""
# Sample inference using the trained model
sample_input = tokenize_and_convert_to_ids(sentence)
sample_input = tf.reshape(sample_input, shape=[1, -1])
output = ner_model.predict(sample_input)
prediction = np.argmax(output, axis=-1)[0]
prediction = [mapping[i] for i in prediction]
print(prediction)

['None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None']


In [24]:
# Sample inference using the trained model
sentences="""
As a Doctor, I want to the system to notify me about abnormality in sensor readings.
As a Doctor, I want to store in a database any abnormality in breathing of the patient.
As an Admin, I want to remove doctors and patients details of the MySQL.
As an Admin, I want to my iphone to alert me about some DDOS attacks.
As a patient, I want to upload a specific image and video of the patient.
"""
for sentence in sentences.split('.'):
    sample_input = tokenize_and_convert_to_ids(sentence)
    sample_input = tf.reshape(sample_input, shape=[1, -1])
    output = ner_model.predict(sample_input)
    prediction = np.argmax(output, axis=-1)[0]
    prediction = [mapping[i] for i in prediction]
    print(prediction)

['None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'Device', 'None']
['None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'Resource', 'None', 'None', 'None', 'None', 'None', 'None', 'None']
['None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'Resource']
['None', 'None', 'None', 'None', 'None', 'None', 'None', 'Device', 'None', 'None', 'None', 'None', 'None', 'None', 'None']
['None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None']
[]


In [22]:
# Sample inference using the trained model
sample_input = tokenize_and_convert_to_ids(
    "I need of an sensor"
)
sample_input = tf.reshape(sample_input, shape=[1, -1])
print(sample_input)

output = ner_model.predict(sample_input)
prediction = np.argmax(output, axis=-1)[0]
prediction = [mapping[i] for i in prediction]

# eu -> B-ORG, german -> B-MISC, british -> B-MISC
print(prediction)

tf.Tensor([[681 816 241 917 186]], shape=(1, 5), dtype=int64)
['None', 'None', 'None', 'None', 'Device']
