In [None]:
# Install the `datasets` package (provides `load_dataset`) and download the
# standalone `conlleval.py` script that is imported below for evaluation.
!pip3 install datasets
!wget https://raw.githubusercontent.com/sighsmile/conlleval/master/conlleval.py

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from datasets import load_dataset
from collections import Counter
from conlleval import evaluate

In [None]:
# Load the "conll2003" dataset; the "train" and "validation" splits are used below.
conll_data = load_dataset("conll2003")

In [None]:
def export_to_file(export_file_path, data):
    """Write records to a tab-separated text file, one record per line.

    Each line has the layout ``<n>\\t<token_1>...<token_n>\\t<tag_1>...<tag_n>``
    where ``n`` is the number of tokens. Records without tokens are skipped.

    Args:
        export_file_path: Path of the file to create (overwritten if present).
        data: Iterable of mappings with a "tokens" list of strings and a
            "ner_tags" list of values convertible with ``str``.
    """
    with open(export_file_path, "w") as out_file:
        for example in data:
            tag_ids = example["ner_tags"]
            words = example["tokens"]
            if len(words) == 0:
                continue
            # Three tab-separated groups: token count, tokens, tags.
            print(
                str(len(words)),
                "\t".join(words),
                "\t".join(str(tag) for tag in tag_ids),
                sep="\t",
                file=out_file,
            )


# `exist_ok=True` keeps this cell re-runnable: a plain `os.mkdir` raises
# FileExistsError as soon as the directory exists from a previous run.
os.makedirs("data", exist_ok=True)
# Export the train and validation splits to tab-separated text files.
export_to_file("./data/conll_train.txt", conll_data["train"])
export_to_file("./data/conll_val.txt", conll_data["validation"])

In [None]:
def make_tag_lookup_table():
    """Build the mapping from integer tag id to tag name.

    Id 0 is "[PAD]", id 1 is "O", followed by the "B-"/"I-" pair for each
    entity type in the order PER, ORG, LOC, MISC.

    Returns:
        dict mapping consecutive ints starting at 0 to tag-name strings.
    """
    entity_types = ("PER", "ORG", "LOC", "MISC")
    prefixes = ("B", "I")

    tag_names = ["[PAD]", "O"]
    for entity in entity_types:
        for prefix in prefixes:
            tag_names.append(f"{prefix}-{entity}")

    return dict(enumerate(tag_names))


# Tag id -> tag name lookup used for decoding predictions and for evaluation.
mapping = make_tag_lookup_table()
print(mapping)

In [None]:
# Flatten the per-sentence token lists into a single list. A comprehension is
# linear in the number of tokens, whereas `sum(list_of_lists, [])` copies the
# growing result on every addition (quadratic).
all_tokens = [
    token for sentence in conll_data["train"]["tokens"] for token in sentence
]
all_tokens_array = np.array(list(map(str.lower, all_tokens)))

# Frequency of every lower-cased training token.
counter = Counter(all_tokens_array)
print(len(counter))

num_tags = len(mapping)
vocab_size = 20000

# Keep only the (vocab_size - 2) most common words from the training data so
# that the lookup table stays below `vocab_size` entries once `StringLookup`
# adds its reserved indices. With the default arguments used here the layer
# adds a single out-of-vocabulary index and no mask token.
vocabulary = [token for token, count in counter.most_common(vocab_size - 2)]

# The StringLookup layer converts tokens to integer token ids.
lookup_layer = keras.layers.StringLookup(
    vocabulary=vocabulary
)

In [None]:
# Read the exported files back line by line; each element is one
# tab-separated record as written by `export_to_file`.
train_data = tf.data.TextLineDataset("./data/conll_train.txt")
val_data = tf.data.TextLineDataset("./data/conll_val.txt")

In [None]:
def map_record_to_training_data(record):
    """Split one exported text line into its tokens and shifted tag ids.

    The line layout is ``<n>\\t<n tokens>\\t<tags>``. Tag ids are increased
    by one so that 0 never occurs for a real token.

    Args:
        record: Scalar string tensor holding one tab-separated line.

    Returns:
        Tuple ``(tokens, tags)``: a string tensor of tokens and an int64
        tensor of tag ids offset by +1.
    """
    fields = tf.strings.split(record, sep="\t")
    num_tokens = tf.strings.to_number(fields[0], out_type=tf.int32)
    # Tokens occupy fields 1..num_tokens; everything after that is a tag.
    tags_start = num_tokens + 1
    words = fields[1:tags_start]
    tag_ids = tf.strings.to_number(fields[tags_start:], out_type=tf.int64)
    return words, tag_ids + 1


def lowercase_and_convert_to_ids(tokens):
    """Lower-case the tokens and map them to ids with `lookup_layer`.

    Args:
        tokens: String tensor (or list of strings) of tokens.

    Returns:
        The output of `lookup_layer` for the lower-cased tokens.
    """
    return lookup_layer(tf.strings.lower(tokens))


# NOTE: `padded_batch` is left disabled below, so each pipeline yields one
# un-batched (token_ids, tag_ids) pair per record; records differ in length.
batch_size = 32
train_dataset = (
    train_data.map(map_record_to_training_data)
    .map(lambda x, y: (lowercase_and_convert_to_ids(x), y))
    # .padded_batch(batch_size)
)
val_dataset = (
    val_data.map(map_record_to_training_data)
    .map(lambda x, y: (lowercase_and_convert_to_ids(x), y))
    # .padded_batch(batch_size)
)


In [None]:
def _to_padded_arrays(dataset, maxlen=64):
    """Materialise a dataset of (token_ids, tag_ids) pairs as two padded arrays.

    The pairs are unzipped into two Python lists instead of being wrapped in
    `np.array(...)`: the sequences have different lengths, and building an
    array from such ragged input raises ValueError on NumPy >= 1.24.

    Args:
        dataset: Un-batched `tf.data.Dataset` yielding (token_ids, tag_ids).
        maxlen: Length every sequence is padded or truncated to.

    Returns:
        Tuple ``(padded_token_ids, padded_tag_ids)`` of 2-D arrays with
        `maxlen` columns, zero-padded at the end of each row.
    """
    token_ids, tag_ids = zip(*dataset.as_numpy_iterator())
    pad = tf.keras.preprocessing.sequence.pad_sequences
    return (
        pad(list(token_ids), padding="post", maxlen=maxlen),
        pad(list(tag_ids), padding="post", maxlen=maxlen),
    )


# After this cell both names hold a (token_ids, tag_ids) tuple of padded arrays.
train_dataset = _to_padded_arrays(train_dataset)
val_dataset = _to_padded_arrays(val_dataset)

In [None]:
# Wrap the padded (token_ids, tag_ids) arrays back into tf.data datasets that
# yield one fixed-length example at a time; batching happens at fit/eval time.
train_dataset=tf.data.Dataset.from_tensor_slices(
    train_dataset
)
val_dataset=tf.data.Dataset.from_tensor_slices(
    val_dataset
)

## Model: loss, metric, architecture and training

In [None]:
class CustomNonPaddingTokenLoss(keras.losses.Loss):
    """Sparse categorical cross-entropy averaged over non-padding tokens only.

    Positions whose true tag id is 0 (padding) contribute nothing to the
    loss; the sum over the remaining positions is divided by their count.
    """

    def __init__(self, name="custom_ner_loss"):
        super().__init__(name=name)

    def call(self, y_true, y_pred):
        """Return the mean per-token loss over positions with ``y_true > 0``.

        Args:
            y_true: Integer tag ids, 0 marking padding.
            y_pred: Per-token class probabilities (softmax output).
        """
        # The model's final Dense layer already applies softmax, so `y_pred`
        # holds probabilities and must not be treated as logits.
        loss_fn = keras.losses.SparseCategoricalCrossentropy(
            from_logits=False, reduction=keras.losses.Reduction.NONE
        )
        loss = loss_fn(y_true, y_pred)
        # 1.0 for real tokens, 0.0 for padding positions.
        mask = tf.cast((y_true > 0), dtype=tf.float32)
        loss = loss * mask
        return tf.reduce_sum(loss) / tf.reduce_sum(mask)


loss = CustomNonPaddingTokenLoss()




def custom_metric(y_true, y_pred):
    """Token-level accuracy computed over non-padding positions only.

    Args:
        y_true: Tag ids, with 0 marking padding positions.
        y_pred: Per-token class scores; the arg-max over the last axis is
            taken as the predicted tag id.

    Returns:
        Number of correctly predicted non-padding tokens divided by the
        number of non-padding tokens.
    """
    is_real_token = tf.cast((y_true > 0), dtype=tf.float32)
    predicted_ids = tf.math.argmax(y_pred, axis=-1)
    is_correct = tf.cast(
        predicted_ids == tf.cast(y_true, dtype=tf.int64), dtype=tf.float32
    )
    correct_real_tokens = is_correct * is_real_token
    return tf.reduce_sum(correct_real_tokens) / tf.reduce_sum(is_real_token)


In [None]:
# Baseline tagger: each of the 64 token ids is embedded into a 64-dimensional
# vector and passed through a Dense softmax layer over `num_tags` classes.
inp=tf.keras.layers.Input((64,))
embedding=tf.keras.layers.Embedding(vocab_size - 1, 64)(inp)
# Disabled experiment: two stacked self-attention layers between the embedding
# and the classifier.
# att1=tf.keras.layers.MultiHeadAttention(num_heads=5,key_dim=64,output_shape=128)(embedding,embedding)
# att2=tf.keras.layers.MultiHeadAttention(num_heads=5,key_dim=64,output_shape=128)(att1,att1)
out=tf.keras.layers.Dense(num_tags, activation="softmax")(embedding)
ner_model=tf.keras.Model(inputs=inp,outputs=out)
ner_model.summary()

In [None]:
# class TransformerBlock(layers.Layer):
#     def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
#         super(TransformerBlock, self).__init__()
#         self.att = keras.layers.MultiHeadAttention(
#             num_heads=num_heads, key_dim=embed_dim
#         )
#         self.ffn = keras.Sequential(
#             [
#                 keras.layers.Dense(ff_dim, activation="relu"),
#                 keras.layers.Dense(embed_dim),
#             ]
#         )
#         self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
#         self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
#         self.dropout1 = keras.layers.Dropout(rate)
#         self.dropout2 = keras.layers.Dropout(rate)

#     def call(self, inputs, training=False):
#         attn_output = self.att(inputs, inputs)
#         attn_output = self.dropout1(attn_output, training=training)
#         out1 = self.layernorm1(inputs + attn_output)
#         ffn_output = self.ffn(out1)
#         ffn_output = self.dropout2(ffn_output, training=training)
#         return self.layernorm2(out1 + ffn_output)


# class TokenAndPositionEmbedding(layers.Layer):
#     def __init__(self, maxlen, vocab_size, embed_dim):
#         super(TokenAndPositionEmbedding, self).__init__()
#         self.token_emb = keras.layers.Embedding(
#             input_dim=vocab_size, output_dim=embed_dim
#         )
#         self.pos_emb = keras.layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

#     def call(self, inputs):
#         maxlen = tf.shape(inputs)[-1]
#         positions = tf.range(start=0, limit=maxlen, delta=1)
#         position_embeddings = self.pos_emb(positions)
#         token_embeddings = self.token_emb(inputs)
#         return token_embeddings + position_embeddings



# class NERModel(keras.Model):
#     def __init__(
#         self, num_tags, vocab_size, maxlen=128, embed_dim=32, num_heads=2, ff_dim=32
#     ):
#         super(NERModel, self).__init__()
#         self.embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
#         self.transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
#         self.dropout1 = layers.Dropout(0.1)
#         self.ff = layers.Dense(ff_dim, activation="relu")
#         self.dropout2 = layers.Dropout(0.1)
#         self.ff_final = layers.Dense(num_tags, activation="softmax")

#     def call(self, inputs, training=False):
#         x = self.embedding_layer(inputs)
#         x = self.transformer_block(x)
#         x = self.dropout1(x, training=training)
#         x = self.ff(x)
#         x = self.dropout2(x, training=training)
#         x = self.ff_final(x)
#         return x


# ner_model = NERModel(num_tags, vocab_size, embed_dim=32, num_heads=4, ff_dim=64)

In [None]:
# Train with RMSprop, the padding-aware loss and the padding-aware accuracy
# metric, for 100 epochs in batches of 64, validating on the validation split.
ner_model.compile(optimizer=tf.keras.optimizers.RMSprop(), loss=loss,metrics=[custom_metric])
ner_model.fit(train_dataset.batch(64), epochs=100,validation_data=val_dataset.batch(64))

In [None]:
def tokenize_and_convert_to_ids(text):
    """Split `text` on whitespace and convert the pieces to token ids.

    Args:
        text: Raw input string.

    Returns:
        The result of `lowercase_and_convert_to_ids` for the split tokens.
    """
    return lowercase_and_convert_to_ids(text.split())


# Sample inference using the trained model
sample_input = tokenize_and_convert_to_ids(
    "Hi Hussain , do you know the Nottinghamshire"
)
# Add a leading batch dimension of size 1.
sample_input = tf.reshape(sample_input, shape=[1, -1])
print(sample_input)

# Pad to the fixed length of 64 declared by the model's input layer.
sample_input=tf.keras.preprocessing.sequence.pad_sequences(sample_input, padding="post",maxlen=64)



output = ner_model.predict(sample_input)
prediction = np.argmax(output, axis=-1)[0]
prediction = [mapping[i] for i in prediction]

# One tag per input position (64 in total): the first 8 belong to the
# whitespace-separated tokens of the sample sentence, the rest to padding.
print(prediction)

In [None]:
# Preview the first 10 raw validation records, with tabs shown as spaces.
cnt=0
for element in val_data:
  print(tf.strings.regex_replace(element, "\t", " "))
  cnt+=1
  if cnt==10:
    break

In [None]:
def calculate_metrics(dataset):
    """Run the model over `dataset` and report CoNLL-style chunk metrics.

    Only positions whose true tag is not padding (id > 0) are scored. Every
    such position is kept regardless of what the model predicted, so a real
    token predicted as padding counts as an error instead of being dropped
    from the evaluation.

    Args:
        dataset: Batched dataset yielding ``(token_ids, tag_ids)`` pairs.
    """
    all_true_tag_ids, all_predicted_tag_ids = [], []

    for x, y in dataset:
        output = ner_model.predict(x)
        predictions = np.argmax(output, axis=-1)
        predictions = np.reshape(predictions, [-1])

        true_tag_ids = np.reshape(y, [-1])

        # Select on the ground truth only; filtering on the prediction as
        # well would hide mistakes made on real tokens.
        mask = true_tag_ids > 0
        all_true_tag_ids.append(true_tag_ids[mask])
        all_predicted_tag_ids.append(predictions[mask])

    all_true_tag_ids = np.concatenate(all_true_tag_ids)
    all_predicted_tag_ids = np.concatenate(all_predicted_tag_ids)

    # A padding prediction (id 0) on a real token is scored as "O", i.e. as
    # a non-entity prediction.
    predicted_tags = [
        mapping[tag] if tag > 0 else "O" for tag in all_predicted_tag_ids
    ]
    real_tags = [mapping[tag] for tag in all_true_tag_ids]

    evaluate(real_tags, predicted_tags)


calculate_metrics(val_dataset.batch(64))