In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
print('Tensorflow version: ', tf.__version__)

In [None]:
# Mount Google Drive under /content/drive (Colab-only helper).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Load the NER corpus from the mounted Drive folder.
df = pd.read_csv(r"/content/drive/MyDrive/Mini ProjectRisk Msc DS/Main Project/NER/Dataset/NER dataset.csv", encoding='utf-8')
# Forward-fill empty cells from the row above. `DataFrame.ffill()` is the
# supported spelling; the `method=` argument of `fillna` is deprecated in
# pandas 2.x.
# NOTE(review): presumably only the first row of each sentence carries its
# 'Sentence Id' -- confirm against the CSV.
df = df.ffill()
df.head(20)

In [None]:
# Number of rows per tag, shown as a two-column frame ('Tag', 'counts').
df.groupby('Tag').size().reset_index(name='counts')

In [None]:
# Count of distinct values in the 'Word' and 'Tag' columns.
print('Unique words in corpus: ', df['Word'].nunique())
print('Unique tags in corpus: ', df['Tag'].nunique())

In [None]:
from sklearn import preprocessing

# Turn every tag string into an integer id, stored in a new 'Enc_tag' column.
# The fitted encoder is kept in `le` so ids can be mapped back to tag strings.
le = preprocessing.LabelEncoder()
df['Enc_tag'] = le.fit_transform(df['Tag'])

## Retrieve sentences and corresponding tags


In [None]:
class SentenceGetter(object):
    """Collect the 'Word' column of a token-level frame into one list per sentence.

    Attributes:
        n_sent: Counter initialised to 1; kept for compatibility, not used by
            this class.
        df: The DataFrame passed to the constructor.
        grouped: Series indexed by 'Sentence Id' whose values are word lists.
        sentences: The word lists as a plain Python list, in `grouped` order.
    """

    def __init__(self, df):
        self.n_sent = 1
        self.df = df
        # Select the column before aggregating. This avoids calling
        # `DataFrameGroupBy.apply` on whole sub-frames (which also touches the
        # grouping column and is deprecated in recent pandas) and skips a
        # Python-level lambda per group.
        self.grouped = self.df.groupby('Sentence Id')['Word'].apply(list)
        self.sentences = self.grouped.tolist()

In [None]:
# One list of words per sentence, in the order produced by the groupby.
getter = SentenceGetter(df)
sentences = getter.sentences

In [None]:
class POSGetter(object):
    """Collect the 'POS' column of a token-level frame into one list per sentence.

    Attributes:
        n_sent: Counter initialised to 1; kept for compatibility, not used by
            this class.
        df: The DataFrame passed to the constructor.
        grouped: Series indexed by 'Sentence Id' whose values are POS lists.
        sentences: The POS lists as a plain Python list, in `grouped` order.
    """

    def __init__(self, df):
        self.n_sent = 1
        self.df = df
        # Select the column before aggregating. This avoids calling
        # `DataFrameGroupBy.apply` on whole sub-frames (which also touches the
        # grouping column and is deprecated in recent pandas) and skips a
        # Python-level lambda per group.
        self.grouped = self.df.groupby('Sentence Id')['POS'].apply(list)
        self.sentences = self.grouped.tolist()

In [None]:
# One list of POS values per sentence; note that `getter` is rebound here.
getter = POSGetter(df)
POS_ = getter.sentences

In [None]:
class TagGetter(object):
    """Collect the 'Enc_tag' column of a token-level frame into one list per sentence.

    Attributes:
        n_sent: Counter initialised to 1; kept for compatibility, not used by
            this class.
        df: The DataFrame passed to the constructor.
        grouped: Series indexed by 'Sentence Id' whose values are lists of
            encoded tag ids.
        sentences: The tag-id lists as a plain Python list, in `grouped` order.
    """

    def __init__(self, df):
        self.n_sent = 1
        self.df = df
        # Select the column before aggregating. This avoids calling
        # `DataFrameGroupBy.apply` on whole sub-frames (which also touches the
        # grouping column and is deprecated in recent pandas) and skips a
        # Python-level lambda per group.
        self.grouped = self.df.groupby('Sentence Id')['Enc_tag'].apply(list)
        self.sentences = self.grouped.tolist()

In [None]:
# One list of encoded tag ids per sentence; note that `getter` is rebound here.
getter = TagGetter(df)
Tags= getter.sentences

In [None]:
# Assemble one row per sentence. The ids come from the grouped index of the
# most recent getter instead of `df['Sentence Id'].unique()`: groupby yields
# groups in sorted key order while `unique()` keeps order of first appearance,
# so the two can disagree (e.g. string ids where "...10" sorts before "...2")
# and sentences would then be labelled with the wrong id. All three getters
# group by 'Sentence Id', so this index matches `sentences`, `POS_` and `Tags`.
data = {
    'Sentence Id': getter.grouped.index.to_numpy(),
    'Word': sentences,
    'POS': POS_,
    'Tag': Tags,
}
df1 = pd.DataFrame(data=data)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the sentence-level rows for testing; the fixed random_state
# keeps the split the same across runs.
training_dataset, testing_dataset = train_test_split(df1, test_size=0.2, random_state=2018)

In [None]:
!pip install -q datasets

In [None]:
import pandas as pd
from datasets import Dataset

def dataframe_to_conll(df):
    """Render a frame with 'Word' and 'Tag' columns as CoNLL-style text.

    Every token is written on its own line as ``<word> <tag>``. A row whose
    'Word' cell holds a sequence of tokens (one row per sentence) is expanded
    token by token, and consecutive sentences are separated by a blank line.
    A row holding a single token produces a single line.

    Args:
        df: DataFrame with a 'Word' and a 'Tag' column. Cells are either
            scalars (one token per row) or equally long sequences (one
            sentence per row).

    Returns:
        The CoNLL-style text as one string, without a trailing newline; an
        empty string for an empty frame.

    Raises:
        ValueError: If a sentence-level row has a different number of words
            and tags.
    """
    conll_lines = []
    for _, row in df.iterrows():
        words, tags = row['Word'], row['Tag']

        # Token-level row: a plain string (or other non-iterable scalar).
        if isinstance(words, str) or not hasattr(words, '__iter__'):
            conll_lines.append(f"{words} {tags}")
            continue

        words, tags = list(words), list(tags)
        if len(words) != len(tags):
            raise ValueError(
                f"Row has {len(words)} words but {len(tags)} tags"
            )
        if not words:
            # Nothing to write for an empty sentence.
            continue

        conll_lines.extend(f"{word} {tag}" for word, tag in zip(words, tags))
        # Blank line marks the end of the sentence.
        conll_lines.append("")

    # Drop the separator after the final sentence.
    if conll_lines and conll_lines[-1] == "":
        conll_lines.pop()

    return '\n'.join(conll_lines)

# Text rendering of the training split (not referenced again in the visible cells).
conll_dataset = dataframe_to_conll(training_dataset)
# Wrap the training split in a `datasets.Dataset`.
dataset = Dataset.from_pandas(training_dataset)

In [None]:
# Distinct tag strings found in the token-level frame.
raw_tags = df.Tag.unique().tolist()
print(raw_tags)

## Tag list, token lookup and padded training batches

In [None]:
# Put the padding label in front of the tag strings.
tags = ['<PAD>'] + raw_tags
print(tags)

In [None]:
from sklearn import preprocessing
# Second encoder, fitted on the tag strings plus '<PAD>'; it is used later to
# turn predicted class ids back into labels.
# NOTE(review): LabelEncoder assigns ids by sorted class value, not by list
# position, so '<PAD>' getting id 0 presumably relies on it sorting before
# every tag string -- confirm.
le2 = preprocessing.LabelEncoder()
le2.fit(tags)

In [None]:
# Number of output classes: every tag string plus '<PAD>'.
TAG_SIZE = len(tags)
# Vocabulary cap, shared by the token lookup layer and the embedding table.
VOCAB_SIZE = 20000

In [None]:
import matplotlib.pyplot as plt
import copy

import numpy as np
import tensorflow as tf

In [None]:
# Training sentences as a ragged string tensor, lower-cased row by row.
train_tokens = tf.ragged.constant(dataset['Word'])
train_tokens = tf.map_fn(tf.strings.lower, train_tokens)

# Token -> integer id lookup whose vocabulary is learned from the training
# tokens only, with "[MASK]" and "[UNK]" as the mask and out-of-vocabulary tokens.
lookup_layer = tf.keras.layers.StringLookup(max_tokens=VOCAB_SIZE, mask_token="[MASK]", oov_token="[UNK]")
lookup_layer.adapt(train_tokens)

# Sanity check: vocabulary size and its first ten entries.
print(len(lookup_layer.get_vocabulary()))
print(lookup_layer.get_vocabulary()[:10])

In [None]:
def create_data_generator(dataset):
    """Build a zero-argument generator function over `dataset`.

    Args:
        dataset: Iterable of mappings that each have a 'Word' and a 'Tag' entry.

    Returns:
        A callable that, each time it is called, yields one
        ``(item['Word'], item['Tag'])`` pair per element of `dataset`.
    """
    def data_generator():
        yield from ((example['Word'], example['Tag']) for example in dataset)

    return data_generator

# Each generated element is a pair of variable-length 1-D tensors:
# string tokens and int32 tag ids.
data_signature= (
        tf.TensorSpec(shape=(None,), dtype=tf.string),
        tf.TensorSpec(shape=(None, ), dtype=tf.int32)
)

# Stream the training examples into a tf.data pipeline.
train_data = tf.data.Dataset.from_generator(
    create_data_generator(dataset),
    output_signature=data_signature
)

In [None]:
def dataset_preprocess(tokens, tag_ids):
    """Prepare one (tokens, tag_ids) example for the model.

    Args:
        tokens: String tokens of one sentence.
        tag_ids: Encoded tag ids of the same sentence.

    Returns:
        A pair ``(token_ids, shifted_tag_ids)``: the tokens run through
        `preprecess_tokens`, and the tag ids increased by 1 because `<PAD>`
        is added as the first element of the tags list.
    """
    return preprecess_tokens(tokens), tag_ids + 1

def preprecess_tokens(tokens):
    """Lower-case `tokens` and map them to integer ids with `lookup_layer`."""
    return lookup_layer(tf.strings.lower(tokens))

BATCH_SIZE = 128

# Encode every example, group the results into padded batches of BATCH_SIZE
# sentences, and cache the batched dataset.
train_dataset = (
    train_data.map(dataset_preprocess)
    .padded_batch(batch_size=BATCH_SIZE).cache()
)

## Build and compile a Bidirectional LSTM model


In [None]:
def build_embedding_bilstm_model(
    vocab_size: int, embed_dims: int, lstm_units: int, tag_size: int
) -> tf.keras.Model:
    """Assemble an embedding -> bidirectional LSTM -> softmax tagging model.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dims: Size of each token embedding.
        lstm_units: Units of the LSTM wrapped by the bidirectional layer.
        tag_size: Number of output classes per token.

    Returns:
        An uncompiled Keras model that takes variable-length int64 id
        sequences (input named "x") and returns a softmax distribution over
        `tag_size` classes for every position.
    """
    token_ids = tf.keras.layers.Input(shape=(None,), dtype=tf.int64, name="x")

    # Id 0 is treated as padding and masked for the layers downstream.
    embedded = tf.keras.layers.Embedding(vocab_size, embed_dims, mask_zero=True)(token_ids)

    # return_sequences=True keeps one output vector per token.
    bilstm = tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(lstm_units, return_sequences=True)
    )
    contextual = bilstm(embedded)

    tag_probabilities = tf.keras.layers.Dense(tag_size, activation='softmax')(contextual)

    return tf.keras.Model(inputs=token_ids, outputs=tag_probabilities)


# 64-dimensional embeddings feeding a 128-unit LSTM in each direction.
model = build_embedding_bilstm_model(
    vocab_size=VOCAB_SIZE,
    embed_dims=64,
    lstm_units=128,
    tag_size=TAG_SIZE,
)

In [None]:
# Integer class ids are used as targets, hence the sparse cross-entropy loss.
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

## Train the model


In [None]:
# Display the batched training dataset object before fitting.
train_dataset

In [None]:
EPOCHS = 40

# `train_dataset` is already batched by `padded_batch`, so `batch_size` is not
# passed to `fit`: Keras documents that it must not be specified for dataset
# inputs because they generate their own batches. BATCH_SIZE is deliberately
# not reassigned here either; a different value would have no effect on
# training and would silently change the batch size of later cells.
history = model.fit(train_dataset, epochs=EPOCHS)

## Evaluate Named Entity Recognition model


In [None]:
# Build the evaluation pipeline the same way as the training one. It reuses
# `create_data_generator`, `data_signature` and `dataset_preprocess` from the
# training cells rather than redefining identical copies here.
test_conll_dataset = dataframe_to_conll(testing_dataset)

# Separate name for the `datasets.Dataset` wrapper so that `test_dataset`
# only ever refers to the batched tf.data pipeline.
test_hf_dataset = Dataset.from_pandas(testing_dataset)

test_data = tf.data.Dataset.from_generator(
    create_data_generator(test_hf_dataset),
    output_signature=data_signature
)

test_dataset = (
    test_data.map(dataset_preprocess)
    .padded_batch(batch_size=BATCH_SIZE).cache()
)

In [None]:
# Loss and accuracy (the metrics set in `compile`) on the test batches.
model.evaluate(test_dataset)

In [None]:
import numpy as np

# Collect the true and predicted tag strings for every real (non-padding)
# token of the test set. The padded `test_dataset` batches are pushed through
# the model one batch at a time instead of calling `model.predict` once per
# sentence, which is far slower and prints one progress bar per sentence.
predicted_tags_list = []
true_tags_list = []
for token_ids, shifted_tag_ids in test_dataset:
    # Class probabilities for every position of every sentence in the batch.
    probabilities = model(token_ids, training=False).numpy()
    predicted_ids = np.argmax(probabilities, axis=-1)

    gold_ids = shifted_tag_ids.numpy()
    # `dataset_preprocess` shifted every real tag id up by one, so 0 only
    # occurs at the positions filled in by `padded_batch`.
    is_real_token = gold_ids != 0

    # Boolean indexing flattens row by row, so tokens stay in sentence order.
    # Undo the +1 shift before decoding the true ids with `le`; predictions
    # are decoded with `le2`, whose classes include '<PAD>'.
    true_tags_list.extend(le.inverse_transform(gold_ids[is_real_token] - 1))
    predicted_tags_list.extend(le2.inverse_transform(predicted_ids[is_real_token]))

In [None]:
from sklearn.metrics import classification_report
# Per-label precision / recall / F1 over the raw tag strings of all test tokens.
print(classification_report(true_tags_list, predicted_tags_list))

In [None]:
def _strip_bio_prefix(tag):
    """Return `tag` without a leading 'B-' or 'I-'; other labels are returned unchanged.

    Labels that carry no such prefix (e.g. 'O' or '<PAD>') are kept as they
    are instead of losing their first two characters.
    """
    return tag[2:] if tag[:2] in ('B-', 'I-') else tag


# Predicted (`p`) and true (`t`) labels without their B-/I- prefixes.
p = [_strip_bio_prefix(tag) for tag in predicted_tags_list]
t = [_strip_bio_prefix(tag) for tag in true_tags_list]

In [None]:
from sklearn.metrics import classification_report
# Same report, computed on the prefix-stripped labels `t` (true) and `p` (predicted).
print(classification_report(t, p))

In [None]:
report = classification_report(t, p, digits=4, output_dict=True)

# Pull the 'weighted avg' entries out of the report dictionary.
f1_weighted, recall_weighted, precision_weighted = (
    report['weighted avg'][metric]
    for metric in ('f1-score', 'recall', 'precision')
)

# Print the results
print('Weighted F1 Score: ', f1_weighted)
print('Weighted Recall: ', recall_weighted)
print('Weighted Precision: ', precision_weighted)

In [None]:
report = classification_report(t, p, digits=4, output_dict=True)
# Access the macro-averaged F1 score, recall and precision. They get their own
# `*_macro` names so the weighted values are not overwritten by macro numbers
# stored under a "weighted" name.
f1_macro = report['macro avg']['f1-score']

recall_macro = report['macro avg']['recall']
precision_macro = report['macro avg']['precision']

# Print the results

print('Macro F1 Score: ', f1_macro)
print('Macro Recall: ', recall_macro)
print('Macro Precision: ', precision_macro)

# Case Study

In [None]:
import numpy as np

# Free-text example to run through the trained tagger.
test_sentence = "Google has agreed to pay $93 million to settle a lawsuit filed by the U.S. state of California over allegations that the company's location-privacy practices misled consumers and violated consumer protection laws."

# Whitespace-split the sentence and encode it the same way as the training data.
preprocessed_test_sentence = preprecess_tokens(test_sentence.split())

# Add a leading batch axis so the input is a batch containing one sentence.
input_sequence = np.array(preprocessed_test_sentence)[np.newaxis, ...]

predictions = model.predict(input_sequence)

# Most probable class id per token, decoded back to label strings with `le2`.
predicted_tags = list(le2.inverse_transform(np.argmax(predictions, axis=-1)[0]))

# Print every token next to its predicted label.
for token, label in zip(test_sentence.split(), predicted_tags):
    print("{:20}\t{}".format(token, label))
