## Install and import libraries

In [None]:
#!pip install lime
#!pip install eli5
#!pip install shap

In [None]:
# Mount Google Drive at /content/drive; the dataset CSV is read from that mount below.
from google.colab import drive
drive.mount('/content/drive')

import pandas as pd
import numpy as np
from tqdm import tqdm, trange
from collections import Counter
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

import tensorflow as tf
from keras.models import Sequential
from keras.utils import pad_sequences
from keras.layers import LSTM, InputLayer, Dense, Embedding, Dropout,SpatialDropout1D, Bidirectional, TimeDistributed, Activation, Masking, Lambda
from keras.callbacks import ModelCheckpoint
from keras.callbacks import EarlyStopping
from keras.optimizers import Adam
from keras.utils import to_categorical
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix, classification_report
from keras.models import load_model
from keras.regularizers import l1, l2
from keras.models import Model
from keras.layers import Input

Mounted at /content/drive


## Load data

In [None]:
# Load the token-level NER corpus (latin-1 encoded CSV) and forward-fill missing
# cells so each row inherits the last seen value of its column
# (presumably needed for "Sentence #", which the grouping below relies on - TODO confirm).
# `.ffill()` replaces `fillna(method="ffill")`, which is deprecated in current pandas.
data = pd.read_csv("/content/drive/MyDrive/ner_dataset.csv", encoding="latin1").ffill()
data.tail(10)

In [None]:
class SentenceGetter:
    """Group a token-level frame into sentences and hand them out one by one.

    ``data`` must provide the columns "Sentence #", "Word", "POS" and "Tag".
    After construction ``sentences`` holds one entry per "Sentence #" group,
    each entry being a list of ``(word, pos, tag)`` tuples in row order.
    """

    def __init__(self, data):
        self.n_sent = 1  # 1-based position of the sentence get_next() returns next
        self.data = data
        self.empty = False
        self.sentences = self.group_sentences(data)

    def group_sentences(self, data):
        """Return a list with one ``[(word, pos, tag), ...]`` entry per sentence group."""

        def rows_to_triples(group):
            tokens = group["Word"].values.tolist()
            pos_tags = group["POS"].values.tolist()
            ner_labels = group["Tag"].values.tolist()
            return list(zip(tokens, pos_tags, ner_labels))

        per_sentence = data.groupby("Sentence #").apply(rows_to_triples)
        return list(per_sentence)

    def get_next(self):
        """Return the next sentence and advance the cursor; ``None`` once past the end."""
        position = self.n_sent - 1
        try:
            current = self.sentences[position]
        except IndexError:
            return None
        self.n_sent += 1
        return current

In [None]:
# Group the token rows once; `sentences` is a list of [(word, POS, tag), ...] lists.
getter = SentenceGetter(data)
sentences = getter.sentences

In [None]:
# Split the (word, POS, tag) triples into parallel per-sentence structures.
# Everything is derived from `getter.sentences` rather than from `sentences`,
# because this cell rebinds `sentences` to plain strings: reading the already
# rebound name on a re-run would index into characters instead of triples.
ner = [[tag for _, _, tag in sent] for sent in getter.sentences]
pos = [[pos_tag for _, pos_tag, _ in sent] for sent in getter.sentences]
sentences = [" ".join(word for word, _, _ in sent) for sent in getter.sentences]

In [None]:
# Position (in `sentences`) of the example sentence shown below.
index = 2

def print_sentence_with_pos_and_ner(sentences, pos, ner, index):
    """Print the sentence at ``index`` followed by its POS tags and its NER tags.

    ``sentences``, ``pos`` and ``ner`` are parallel sequences; one line is
    printed for each of them, in that order.
    """
    labelled_columns = (("Sentence: ", sentences), ("POS: ", pos), ("NER: ", ner))
    for label, column in labelled_columns:
        print(label, column[index])

# Show the example sentence selected by `index` together with its tags.
print_sentence_with_pos_and_ner(sentences, pos, ner, index)

## Data analysis and preprocessing


In [None]:
# Token count of every sentence (whitespace-split)
sent_lengths = [len(sent.split()) for sent in sentences]

# First position holding the minimum / maximum token count
shortest_idx = min(range(len(sent_lengths)), key=sent_lengths.__getitem__)
longest_idx = max(range(len(sent_lengths)), key=sent_lengths.__getitem__)

# Shortest sentence with its POS and NER tags
print("Najkratšia veta v dátovej množine:")
print_sentence_with_pos_and_ner(sentences, pos, ner, shortest_idx)

# Longest sentence with its POS and NER tags
print("Najdlhšia veta v dátovej množine:")
print_sentence_with_pos_and_ner(sentences, pos, ner, longest_idx)

Najkratšia veta v dátovej množine:
Sentence:  ...
POS:  [':']
NER:  ['O']
Najdlhšia veta v dátovej množine:
Sentence:  Fisheries in 2006 - 7 landed 1,26,976 metric tons , of which 82 % ( 1,04,586 tons ) was krill ( Euphausia superba ) and 9.5 % ( 12,027 tons ) Patagonian toothfish ( Dissostichus eleginoides - also known as Chilean sea bass ) , compared to 1,27,910 tons in 2005 - 6 of which 83 % ( 1,06,591 tons ) was krill and 9.7 % ( 12,396 tons ) Patagonian toothfish ( estimated fishing from the area covered by the Convention of the Conservation of Antarctic Marine Living Resources ( CCAMLR ) , which extends slightly beyond the Southern Ocean area ) .
POS:  ['NNS', 'IN', 'CD', ':', 'CD', 'VBD', 'CD', 'JJ', 'NNS', ',', 'IN', 'WDT', 'CD', 'NN', 'LRB', 'CD', 'NNS', 'RRB', 'VBD', 'NN', 'LRB', 'NNP', 'NNP', 'RRB', 'CC', 'CD', 'NN', 'LRB', 'CD', 'NNS', 'RRB', 'JJ', 'NN', 'LRB', 'NNP', 'NNP', ':', 'RB', 'VBN', 'IN', 'JJ', 'NN', 'NN', 'RRB', ',', 'VBN', 'TO', 'CD', 'NNS', 'IN', 'CD', ':', 'CD

In [None]:
# Tokenise the two extreme sentences found above
shortest_words, longest_words = (
    sentences[position].split() for position in (shortest_idx, longest_idx)
)

# Their token counts
num_shortest_words = len(shortest_words)
num_longest_words = len(longest_words)

# Report both counts
print("Počet slov v najkratšej vete:", num_shortest_words)
print("Počet slov v najdlhšej vete:", num_longest_words)

Počet slov v najkratšej vete: 1
Počet slov v najdlhšej vete: 104


In [None]:
# Frequency of every surface form in the corpus
word_counter = Counter(data["Word"].values)

# Model vocabulary: the 4000 most frequent surface forms
vocabulary = {token for token, _ in word_counter.most_common(4000)}

# All distinct surface forms in the corpus and how many there are
words = list(set(data["Word"].values))
word_count = len(words)

In [None]:
# Fixed sequence length used when padding the encoded sentences.
max_len = 50

# Word -> id mapping. Ids 0 and 1 are reserved for padding and unknown words.
# The vocabulary is enumerated in sorted order so that every kernel session
# assigns the same id to the same word; enumerating the raw set gives an
# arbitrary order, which would not match a model saved in an earlier session.
word2idx = {"PAD": 0, "UNK": 1}
word2idx.update({w: i + 2 for i, w in enumerate(sorted(vocabulary, key=str))})

In [None]:
# Distinct NER tags in a stable (sorted) order, so the tag ids derived from this
# list are identical across kernel sessions and match any saved model.
ner_tags = sorted(set(data["Tag"].values), key=str)

In [None]:
# Distinct POS tags in a stable (sorted) order, so the ids derived from this
# list are identical across kernel sessions.
pos_tags = sorted(set(data["POS"].values), key=str)

In [None]:
# NER tag -> id, numbering the tags from 1 in list order; id 0 is reserved for '<PAD>'
ner_tag2idx = dict(zip(ner_tags, range(1, len(ner_tags) + 1)))
ner_tag2idx['<PAD>'] = 0

In [None]:
# POS tag -> id, numbering the tags from 1 in list order; id 0 is reserved for '<PAD>'
pos_tag2idx = dict(zip(pos_tags, range(1, len(pos_tags) + 1)))
pos_tag2idx['<PAD>'] = 0

In [None]:
# Encode every sentence as a list of word ids; tokens outside word2idx become UNK
X = [
    [word2idx.get(token, word2idx["UNK"]) for token in text.split()]
    for text in sentences
]
# Right-pad with the PAD id to a fixed width of max_len
# (sentences longer than max_len are cut by pad_sequences' `truncating` default - TODO confirm which end)
X = pad_sequences(X, maxlen=max_len, padding="post", value=word2idx["PAD"])

In [None]:
# NOTE(review): X was already padded to max_len with the same settings in the previous
# cell, so this second pad_sequences call looks redundant; X_seq is the name used downstream.
X_seq = pad_sequences(X,maxlen=max_len, padding="post", value=word2idx["PAD"])

In [None]:
# Tag ids per sentence
y = [[ner_tag2idx[tag] for tag in tags] for tags in ner]
# Right-pad to max_len; note the filler is the id of the "O" tag, not of '<PAD>'
y_seq = pad_sequences(y, maxlen=max_len, padding="post", value=ner_tag2idx["O"])

In [None]:
# Hold out 20% of the sentences for testing, stratified on the tag id found at
# token position 1 of each padded label row. The seed is fixed so that the split
# (and every number reported further down) is reproducible across runs.
X_train, X_test, y_train, y_test = train_test_split(
    X_seq, y_seq, test_size=0.2, stratify=y_seq[:, 1], random_state=42
)

In [None]:
# Sanity check: shapes of the train/test inputs and label id matrices.
print("X_train shape:", X_train.shape)
print("X_test shape:", X_test.shape)
print("y_train shape:", y_train.shape)
print("y_test shape:", y_test.shape)

X_train shape: (38367, 50)
X_test shape: (9592, 50)
y_train shape: (38367, 50)
y_test shape: (9592, 50)


In [None]:
# One-hot encode the tag ids of both splits, with one class per entry of ner_tag2idx
y_ner_train, y_ner_test = (
    to_categorical(label_ids, num_classes=len(ner_tag2idx))
    for label_ids in (y_train, y_test)
)

## Build model

In [None]:
# NER tagger: embedding -> Bi-LSTM -> LSTM -> per-token softmax, with light L1/L2 regularization.
model = Sequential()
model.add(InputLayer(input_shape=(max_len, )))
# Inputs only ever contain ids taken from word2idx, so the embedding table is sized
# from the largest id in that mapping instead of from the full corpus vocabulary
# (word_count), most of whose rows could never be looked up.
model.add(Embedding(input_dim=max(word2idx.values()) + 1, output_dim=50, input_length=max_len, dtype='float32'))
model.add(Bidirectional(LSTM(128, return_sequences=True,kernel_regularizer=l2(0.0000001), recurrent_regularizer=l1(0.0000001))))
model.add(LSTM(128, return_sequences=True,kernel_regularizer=l2(0.0000001)))
# One output unit per entry of ner_tag2idx (all tags plus '<PAD>'), matching the one-hot labels.
model.add(TimeDistributed(Dense(len(ner_tag2idx), activation='softmax')))
# The model has a single output, so there is nothing for a two-element `loss_weights`
# list to weight; the argument is dropped.
model.compile(optimizer=Adam(learning_rate=0.01), loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Print the layer-by-layer summary of the model built above.
model.summary()

In [None]:
# Render the model's layer graph to model.png (top-to-bottom layout, with
# tensor shapes, layer names and layer activations shown).
tf.keras.utils.plot_model(
    model,
    to_file='model.png',
    show_shapes=True,
    show_dtype=False,
    show_layer_names=True,
    rankdir='TB',
    expand_nested=False,
    dpi=96,
    layer_range=None,
    show_layer_activations=True,
    show_trainable=False
)

In [None]:
# Training schedule
epochs = 5
batch_size = 128

# Checkpoint: write the model to disk whenever validation accuracy reaches a new maximum
saved_model = 'ner.hdf5'
checkpoint = ModelCheckpoint(
    saved_model,
    monitor='val_accuracy',
    verbose=1,
    save_best_only=True,
    mode='max',
)

# NOTE(review): the test split doubles as validation data here, so checkpoint
# selection is driven by the same data the model is evaluated on afterwards.
history = model.fit(
    X_train,
    y_ner_train,
    validation_data=(X_test, y_ner_test),
    epochs=epochs,
    batch_size=batch_size,
    callbacks=[checkpoint],
)

In [None]:
def trainingProcess(history):
    """Plot the training curves stored in a Keras ``History`` object.

    Draws train vs. validation accuracy first and train vs. validation loss
    second, calling ``plt.show()`` after each pair of curves.
    """
    for metric, axis_label in (('accuracy', 'Accuracy'), ('loss', 'Loss')):
        plt.style.use('ggplot')
        plt.plot(history.history[metric])
        plt.plot(history.history['val_' + metric])
        plt.title('')
        plt.ylabel(axis_label)
        plt.xlabel('Epochs')
        plt.legend(['train', 'validation'], loc='upper left')
        plt.show()

In [None]:
# Plot accuracy and loss curves for the training run above.
trainingProcess(history)

## Model evaluation

In [None]:
# Evaluate the model
y_pred_ner_test = model.predict(X_test)
y_pred_ner_test = np.argmax(y_pred_ner_test, axis=-1)

In [None]:
# Shape of the predicted class-id matrix.
y_pred_ner_test.shape

In [None]:
# Collapse the one-hot test labels (sentences x positions x classes) to class ids.
# The ndim guard keeps the cell idempotent: once collapsed the array is 2-D, and a
# second argmax would silently reduce it along the token axis instead.
if y_ner_test.ndim == 3:
    y_ner_test = np.argmax(y_ner_test, axis=-1)

In [None]:
# Shape of the true class-id matrix after the argmax above.
y_ner_test.shape

In [None]:
# Evaluation results on the test split (token level, all positions flattened)
y_true_flat = y_ner_test.flatten()
y_pred_flat = y_pred_ner_test.flatten()

# Precision, recall and F1 are all macro-averaged over the same label set (the
# classes the model actually predicted) so the three numbers are comparable.
predicted_labels = np.unique(y_pred_flat)

print('Evaluation for test set')
print('Accuracy score:', accuracy_score(y_true_flat, y_pred_flat))
print('Precision score:', precision_score(y_true_flat, y_pred_flat, average='macro', labels=predicted_labels, zero_division=0))
print('Recall score:', recall_score(y_true_flat, y_pred_flat, average='macro', labels=predicted_labels, zero_division=0))
print('F1 score:', f1_score(y_true_flat, y_pred_flat, average='macro', labels=predicted_labels, zero_division=0))

# Name each report row by looking its class id up in ner_tag2idx. Passing the dict
# itself as target_names lists '<PAD>' (id 0) last, which does not line up with the
# ascending label ids and may not even match the number of labels present.
idx2tag = {idx: tag for tag, idx in ner_tag2idx.items()}
report_labels = sorted(set(y_true_flat.tolist()) | set(y_pred_flat.tolist()))
print('Classification report:\n', classification_report(
    y_true_flat,
    y_pred_flat,
    labels=report_labels,
    target_names=[idx2tag[int(label)] for label in report_labels],
    zero_division=0,
))

In [None]:
# Classification report with one row per class id that occurs in the true or the
# predicted labels. Row names are looked up by id, so they stay correct even if a
# tag is missing from the test split or the model predicts the '<PAD>' class
# (a plain list(ner_tags) is only aligned when exactly the ids 1..N are present).
idx2tag = {idx: tag for tag, idx in ner_tag2idx.items()}
report_labels = sorted(
    set(y_ner_test.flatten().tolist()) | set(y_pred_ner_test.flatten().tolist())
)
print('Classification report:\n', classification_report(
    y_ner_test.flatten(),
    y_pred_ner_test.flatten(),
    labels=report_labels,
    target_names=[idx2tag[int(label)] for label in report_labels],
))

In [None]:
import scipy
import scipy.stats  # import the submodule explicitly so the attribute patched below exists
import numpy as np


def monkeypath_itemfreq(sampler_indices):
    """Stand-in for ``scipy.stats.itemfreq``.

    Returns a list of ``(value, count)`` pairs, one per distinct value in
    ``sampler_indices``, ordered by value. A list (rather than a bare ``zip``
    iterator) lets callers iterate it more than once or take its length.
    """
    values, counts = np.unique(sampler_indices, return_counts=True)
    return list(zip(values, counts))


# Install the replacement under the name the dependent library looks up
# (presumably eli5, imported in the next cell - TODO confirm).
scipy.stats.itemfreq = monkeypath_itemfreq

In [None]:
from eli5.lime.samplers import MaskingTextSampler   

In [None]:
from lime.lime_text import LimeTextExplainer

# Column i of the model output belongs to the tag whose id is i, and id 0 is '<PAD>'.
# Class names are therefore listed in id order; passing ner_tags directly would
# label every column with the name of the neighbouring tag.
class_names = [tag for tag, _ in sorted(ner_tag2idx.items(), key=lambda item: item[1])]

# bow=False makes LIME mask words in place instead of deleting them. The predictor
# reads the output at a fixed token position, so perturbed sentences must keep
# every token at its original position.
explainer = LimeTextExplainer(class_names=class_names, bow=False)

# NOTE(review): `predict_func` is only created further down in this notebook
# (get_predict_function cell); on a fresh kernel that cell has to run before this one.
exp = explainer.explain_instance(sentences[index], predict_func)

print(exp.as_list())

In [None]:
# Render the explanation inline, including the highlighted source text.
exp.show_in_notebook(text=True)

In [None]:
# Same explanation as above, but asking LIME for up to 9 features.
# NOTE(review): `predict_func` is defined in a later cell of this notebook.
exp_set_feature = explainer.explain_instance(sentences[index], predict_func, num_features=9)

print(exp_set_feature.as_list())
exp_set_feature.show_in_notebook(text=True)

In [None]:
# Pick the sentence to explain and fetch its text and gold NER tags
index = 46781
sentence_text = sentences[index]
ner_idx = ner[index]

print('Veta bez priradenia pomenovaných entít:')
print(sentence_text)

# Same sentence with every token followed by its tag in parentheses
print('Veta s NER:')
print(" ".join(f"{t} ({l})" for t, l in zip(sentence_text.split(), ner_idx)))

In [None]:
class NERExplainerGenerator(object):
    """Build LIME-compatible prediction functions for single token positions.

    Parameters
    ----------
    model : object exposing ``predict(X)``; its output is indexed as
        ``[sample, token position, class]``.
    word2idx : dict mapping words to ids; must contain "UNK" and "PAD".
    tag2idx : dict mapping tag names to class ids.
    max_len : fixed sequence length texts are padded to.
    """

    def __init__(self, model, word2idx, tag2idx, max_len):
        self.model = model
        self.word2idx = word2idx
        # Store the mapping that was passed in. Reading the notebook-level
        # `ner_tag2idx` here ignored the argument and failed outside the notebook.
        self.ner_tag2idx = tag2idx
        self.idx2tag = {v: k for k, v in tag2idx.items()}
        self.max_len = max_len

    def _preprocess(self, texts):
        """Encode whitespace-tokenised texts as a padded matrix of word ids."""
        X = [[self.word2idx.get(w, self.word2idx["UNK"]) for w in t.split()]
             for t in texts]
        X = pad_sequences(maxlen=self.max_len, sequences=X,
                          padding="post", value=self.word2idx["PAD"])
        return X

    def get_predict_function(self, word_index):
        """Return ``f(texts)`` giving the model output at token position ``word_index``.

        The returned function preprocesses ``texts``, runs the model and slices
        the result to ``[:, word_index, :]``, i.e. one row of class scores per text.
        """
        def predict_func(texts):
            X = self._preprocess(texts)
            p = self.model.predict(X)
            return p[:, word_index, :]
        return predict_func

In [None]:
# List each token of the selected sentence with its position, to help choose word_index below.
for i, w in enumerate(sentence_text.split()):
    print(f"{i}: {w}")

In [None]:
# Wrap the trained model and the id mappings so per-token predict functions can be created.
explainer_generator = NERExplainerGenerator(model, word2idx, ner_tag2idx, max_len)

In [None]:
# Token position to explain; predict_func returns the model's class scores at that position.
word_index = 4
predict_func = explainer_generator.get_predict_function(word_index=word_index)

In [1]:
import numpy as np
from lime.lime_text import LimeTextExplainer
from sklearn.pipeline import make_pipeline
import tensorflow as tf
from keras.models import load_model

# NOTE: this cell depends on objects built earlier in the notebook
# (word2idx, ner_tag2idx, max_len, NERExplainerGenerator), so run it after them.

model = load_model('ner (1).hdf5')

# A Keras model is not an sklearn estimator, so wrapping it in make_pipeline adds
# nothing. `c` is instead the object that turns raw text into per-token class scores.
c = NERExplainerGenerator(model, word2idx, ner_tag2idx, max_len)

text = "This is an example sentence."

words = text.split()

# Class names in id order so that name i describes output column i (column 0 is '<PAD>').
class_names = [tag for tag, _ in sorted(ner_tag2idx.items(), key=lambda item: item[1])]
# bow=False: mask words in place so token positions stay fixed for the predictor.
explainer = LimeTextExplainer(class_names=class_names, bow=False)


def explain_word(word_index, words, explainer, pipeline, label=1):
    """Explain the prediction made for the token at ``word_index``.

    Parameters
    ----------
    word_index : position of the token whose prediction is explained.
    words : tokens of the sentence; they are joined with single spaces.
    explainer : a ``LimeTextExplainer``.
    pipeline : object exposing ``get_predict_function(word_index=...)``.
    label : class id whose explanation is returned (default 1, as before).

    Returns
    -------
    list of ``(word, weight)`` pairs for ``label``.
    """
    sentence = ' '.join(words)
    predict_fn = pipeline.get_predict_function(word_index=word_index)
    exp = explainer.explain_instance(
        sentence,
        predict_fn,
        num_features=len(words),  # at most one feature per token of this sentence
        labels=(label,),
    )
    return exp.as_list(label=label)


# One explanation per token position
explanations = [explain_word(i, words, explainer, c) for i in range(len(words))]

# Sum each word's weight over all per-position explanations
aggregate_explanations = {}
for explanation in explanations:
    for word, importance in explanation:
        aggregate_explanations[word] = aggregate_explanations.get(word, 0) + importance

# Average over the number of explained positions
normalized_importance = {word: importance/len(words) for word, importance in aggregate_explanations.items()}

print(normalized_importance)

ModuleNotFoundError: ignored