In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import numpy as np
import pandas as pd
import tensorflow as tf

tf.keras.utils.set_random_seed(33)

In [2]:
data = pd.read_csv("data/ner_dataset.csv", encoding = "ISO-8859-1") 
train_sents = open('data/small/train/sentences.txt', 'r').readline()
train_labels = open('data/small/train/labels.txt', 'r').readline()
print('SENTENCE:', train_sents)
print('SENTENCE LABEL:', train_labels)
print('ORIGINAL DATA:\n', data.head())
del(data, train_sents, train_labels)

SENTENCE: Thousands of demonstrators have marched through London to protest the war in Iraq and demand the withdrawal of British troops from that country .

SENTENCE LABEL: O O O O O O B-geo O O O O O B-geo O O O O O B-gpe O O O O O

ORIGINAL DATA:
     Sentence #           Word  POS Tag
0  Sentence: 1      Thousands  NNS   O
1          NaN             of   IN   O
2          NaN  demonstrators  NNS   O
3          NaN           have  VBP   O
4          NaN        marched  VBN   O


In [3]:
def load_data(file_path):
    with open(file_path,'r') as file:
        data = np.array([line.strip() for line in file.readlines()])
    return data

In [4]:
train_sentences = load_data('data/large/train/sentences.txt')
train_labels = load_data('data/large/train/labels.txt')

val_sentences = load_data('data/large/val/sentences.txt')
val_labels = load_data('data/large/val/labels.txt')

test_sentences = load_data('data/large/test/sentences.txt')
test_labels = load_data('data/large/test/labels.txt')

In [5]:
def get_sentence_vectorizer(sentences):
    tf.keras.utils.set_random_seed(33)
    """
    Create a TextVectorization layer for sentence tokenization and adapt it to the provided sentences.

    Parameters:
    sentences (list of str): Sentences for vocabulary adaptation.

    Returns:
    sentence_vectorizer (tf.keras.layers.TextVectorization): TextVectorization layer for sentence tokenization.
    vocab (list of str): Extracted vocabulary.
    """
    sentence_vectorizer = tf.keras.layers.TextVectorization(standardize=None)
    sentence_vectorizer.adapt(sentences)
    vocab = sentence_vectorizer.get_vocabulary()
    
    return sentence_vectorizer, vocab

In [6]:
test_vectorizer, test_vocab = get_sentence_vectorizer(train_sentences[:1000])
print(f"Test vocab size: {len(test_vocab)}")

sentence = "I like learning new NLP models !"
sentence_vectorized = test_vectorizer(sentence)
print(f"Sentence: {sentence}\nSentence vectorized: {sentence_vectorized}")

Test vocab size: 4650
Sentence: I like learning new NLP models !
Sentence vectorized: [ 296  314    1   59    1    1 4649]


In [7]:
sentence_vectorizer, vocab = get_sentence_vectorizer(train_sentences)

In [8]:
print(f"Sentence: {train_sentences[0]}")
print(f"Labels: {train_labels[0]}")

Sentence: Thousands of demonstrators have marched through London to protest the war in Iraq and demand the withdrawal of British troops from that country .
Labels: O O O O O O B-geo O O O O O B-geo O O O O O B-gpe O O O O O


In [9]:
def get_tags(labels):
    tag_set = set()
    for el in labels:
        for tag in el.split(" "):
            tag_set.add(tag)
    tag_list = list(tag_set) 
    tag_list.sort()
    return tag_list

tags = get_tags(train_labels)
print(tags)

['B-art', 'B-eve', 'B-geo', 'B-gpe', 'B-nat', 'B-org', 'B-per', 'B-tim', 'I-art', 'I-eve', 'I-geo', 'I-gpe', 'I-nat', 'I-org', 'I-per', 'I-tim', 'O']


In [10]:
def make_tag_map(tags):
    tag_map = {}
    for i,tag in enumerate(tags):
        tag_map[tag] = i 
    return tag_map

In [11]:
tag_map = make_tag_map(tags)
print(tag_map)

{'B-art': 0, 'B-eve': 1, 'B-geo': 2, 'B-gpe': 3, 'B-nat': 4, 'B-org': 5, 'B-per': 6, 'B-tim': 7, 'I-art': 8, 'I-eve': 9, 'I-geo': 10, 'I-gpe': 11, 'I-nat': 12, 'I-org': 13, 'I-per': 14, 'I-tim': 15, 'O': 16}


In [12]:
def label_vectorizer(labels, tag_map):
    """
    Convert list of label strings to padded label IDs using a tag mapping.

    Parameters:
    labels (list of str): List of label strings.
    tag_map (dict): Dictionary mapping tags to IDs.
    Returns:
    label_ids (numpy.ndarray): Padded array of label IDs.
    """
    label_ids = []
    for element in labels:
        tokens = element.split()
        element_ids = []

        for token in tokens:
            element_ids.append(tag_map[token])

        label_ids.append(element_ids)
        
    label_ids = tf.keras.preprocessing.sequence.pad_sequences(label_ids, padding='post', value=-1)

    return label_ids

In [13]:
print(f"Sentence: {train_sentences[5]}")
print(f"Labels: {train_labels[5]}")
print(f"Vectorized labels: {label_vectorizer([train_labels[5]], tag_map)}")

Sentence: The party is divided over Britain 's participation in the Iraq conflict and the continued deployment of 8,500 British troops in that country .
Labels: O O O O O B-gpe O O O O B-geo O O O O O O O B-gpe O O O O O
Vectorized labels: [[16 16 16 16 16  3 16 16 16 16  2 16 16 16 16 16 16 16  3 16 16 16 16 16]]


In [14]:
def generate_dataset(sentences, labels, sentence_vectorizer, tag_map):
    sentences_ids = sentence_vectorizer(sentences)
    labels_ids = label_vectorizer(labels, tag_map = tag_map)
    dataset = tf.data.Dataset.from_tensor_slices((sentences_ids, labels_ids))
    return dataset

In [15]:
train_dataset = generate_dataset(train_sentences,train_labels, sentence_vectorizer, tag_map)
val_dataset = generate_dataset(val_sentences,val_labels,  sentence_vectorizer, tag_map)
test_dataset = generate_dataset(test_sentences, test_labels,  sentence_vectorizer, tag_map)

In [16]:
print(f'The number of outputs is {len(tags)}')
g_vocab_size = len(vocab)
print(f"Num of vocabulary words in the training set: {g_vocab_size}")
print('The training size is', len(train_dataset))
print('The validation size is', len(val_dataset))
print('An example of the first sentence is\n\t', next(iter(train_dataset))[0].numpy())
print('An example of its corresponding label is\n\t', next(iter(train_dataset))[1].numpy())

The number of outputs is 17
Num of vocabulary words in the training set: 29847
The training size is 33570
The validation size is 7194
An example of the first sentence is
	 [1046    6 1121   18 1832  232  543    7  528    2  158    5   60    9
  648    2  922    6  192   87   22   16   54    3    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0]
An example of its corresponding label is
	 [16 16 16 16 16 16  2 16 16 16 16 16  2 16 16 16 16 16  3 16 16 16 16 16
 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1
 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1
 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 

In [17]:
def NER(len_tags, vocab_size, embedding_dim = 50):
    """
    Create a Named Entity Recognition (NER) model.

    Parameters:
    len_tags (int): Number of NER tags (output classes).
    vocab_size (int): Vocabulary size.
    embedding_dim (int, optional): Dimension of embedding and LSTM layers (default is 50).

    Returns:
    model (Sequential): NER model.
    """
    model = tf.keras.Sequential(name = 'sequential')
    model.add(tf.keras.layers.Embedding(input_dim=vocab_size + 1, output_dim=embedding_dim, mask_zero=True))
    model.add(tf.keras.layers.LSTM(units=embedding_dim, return_sequences=True))
    model.add(tf.keras.layers.Dense(units=len_tags, activation=tf.nn.log_softmax))

    return model

In [20]:
def masked_loss(y_true, y_pred):
    """
    Calculate the masked sparse categorical cross-entropy loss.

    Parameters:
    y_true (tensor): True labels.
    y_pred (tensor): Predicted logits.
    
    Returns:
    loss (tensor): Calculated loss.
    """
    y_true = tf.convert_to_tensor(y_true)
    y_pred = tf.convert_to_tensor(y_pred)
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, ignore_class=-1)
    loss = loss_fn(y_true, y_pred)

    return  loss 

In [21]:
true_labels = [0,1,2,0]
predicted_logits = [[0.1,0.6,0.3] , [0.2,0.7,0.1], [0.1, 0.5,0.4], [0.4,0.4,0.2]]
print(masked_loss(true_labels, predicted_logits))

tf.Tensor(1.0508584, shape=(), dtype=float32)


In [22]:
def masked_accuracy(y_true, y_pred):
    """
    Calculate masked accuracy for predicted labels.

    Parameters:
    y_true (tensor): True labels.
    y_pred (tensor): Predicted logits.

    Returns:
    accuracy (tensor): Masked accuracy.

    """
    
    y_true = tf.cast(y_true, tf.float32)
    mask = tf.not_equal(y_true, -1)
    mask = tf.cast(mask, tf.float32)
    y_pred_class = tf.math.argmax(y_pred, axis=-1)
    y_pred_class = tf.cast(y_pred_class, tf.float32)
    matches_true_pred = tf.equal(y_true, y_pred_class)
    matches_true_pred = tf.cast(matches_true_pred , tf.float32)
    matches_true_pred *= mask
    masked_acc = tf.reduce_sum(matches_true_pred) / tf.reduce_sum(mask)

    return masked_acc

In [23]:
true_labels = [0,1,2,0]
predicted_logits = [[0.1,0.6,0.3] , [0.2,0.7,0.1], [0.1, 0.5,0.4], [0.4,0.4,0.2]]
print(masked_accuracy(true_labels, predicted_logits))

tf.Tensor(0.5, shape=(), dtype=float32)


In [24]:
model = NER(len(tag_map), len(vocab))
model.summary()

In [25]:
x = tf.expand_dims(np.array([545, 467, 896]), axis = 0)
x_padded = tf.expand_dims(np.array([545, 467, 896, 0, 0, 0]), axis = 0)

In [26]:
pred_x = model(x)
pred_x_padded = model(x_padded)
print(f'x shape: {pred_x.shape}\nx_padded shape: {pred_x_padded.shape}')

x shape: (1, 3, 17)
x_padded shape: (1, 6, 17)


In [27]:
np.allclose(pred_x, pred_x[:3])

True

In [28]:
y_true = tf.expand_dims([16, 6, 12], axis = 0)
y_true_padded = tf.expand_dims([16,6,12,-1,-1,-1], axis = 0)
print(f"masked_loss is the same: {np.allclose(masked_loss(y_true,pred_x), masked_loss(y_true_padded,pred_x_padded))}")
print(f"masked_accuracy is the same: {np.allclose(masked_accuracy(y_true,pred_x), masked_accuracy(y_true_padded,pred_x_padded))}")

masked_loss is the same: True
masked_accuracy is the same: True


In [29]:
model.compile(optimizer=tf.keras.optimizers.Adam(0.01), 
              loss = masked_loss,
              metrics = [masked_accuracy])

In [30]:
tf.keras.utils.set_random_seed(33)

BATCH_SIZE = 64

model.fit(train_dataset.batch(BATCH_SIZE),
          validation_data = val_dataset.batch(BATCH_SIZE),
          shuffle=True,
          epochs = 2)

Epoch 1/2
[1m525/525[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 29ms/step - loss: 0.4594 - masked_accuracy: 0.8952 - val_loss: 0.1393 - val_masked_accuracy: 0.9573
Epoch 2/2
[1m525/525[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 28ms/step - loss: 0.1299 - masked_accuracy: 0.9612 - val_loss: 0.1359 - val_masked_accuracy: 0.9584


<keras.src.callbacks.history.History at 0x24407493bd0>

In [31]:
test_sentences_id = sentence_vectorizer(test_sentences)
test_labels_id = label_vectorizer(test_labels,tag_map)
y_true = test_labels_id 
y_pred = model.predict(test_sentences_id)

[1m225/225[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step


In [32]:
print(f"The model's accuracy in test set is: {masked_accuracy(y_true,y_pred).numpy():.4f}")

The model's accuracy in test set is: 0.9576


In [33]:
def predict(sentence, model, sentence_vectorizer, tag_map):
    """
    Predict NER labels for a given sentence using a trained model.

    Parameters:
    sentence (str): Input sentence.
    model (tf.keras.Model): Trained NER model.
    sentence_vectorizer (tf.keras.layers.TextVectorization): Sentence vectorization layer.
    tag_map (dict): Dictionary mapping tag IDs to labels.

    Returns:
    predictions (list): Predicted NER labels for the sentence.

    """
    sentence_vectorized = sentence_vectorizer([sentence])
    output = model(sentence_vectorized)
    outputs = np.argmax(output, axis=-1)
    outputs = outputs[0]
    labels = list(tag_map.keys()) 
    pred = []
    for tag_idx in outputs:
        pred_label = labels[tag_idx]
        pred.append(pred_label)
    
    return pred

In [34]:
sentence = "Peter Parker , the White House director of trade and manufacturing policy of U.S , said in an interview on Sunday morning that the White House was working to prepare for the possibility of a second wave of the coronavirus in the fall , though he said it wouldn ’t necessarily come"
predictions = predict(sentence, model, sentence_vectorizer, tag_map)
for x,y in zip(sentence.split(' '), predictions):
    if y != 'O':
        print(x,y)

Peter B-per
Parker I-per
White B-org
House I-org
U.S B-org
Sunday B-tim
morning I-tim
White B-org
House I-org


In [36]:
sentence = "Ryan Madhuwala is an AI enthusiast. He studies at IIIT Gwalior BCS Branch. He cleared JEE to get admission there."
predictions = predict(sentence, model, sentence_vectorizer, tag_map)
for x,y in zip(sentence.split(' '), predictions):
    if y != 'O':
        print(x,y)

Ryan B-per
Madhuwala I-per
IIIT B-geo
Gwalior I-geo
BCS I-org
Branch. I-per
