In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from gensim.models import KeyedVectors
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, SpatialDropout1D, LSTM, Bidirectional, TimeDistributed, Dense
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.preprocessing.sequence import pad_sequences
import nltk
from collections import Counter
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
from gensim.models import KeyedVectors
from huggingface_hub import hf_hub_download
# Seed NumPy's global random generator.
# NOTE(review): only NumPy is seeded here; TensorFlow/Keras randomness (weight
# initialisation, dropout) is presumably not fixed by this call — confirm and seed
# it separately if fully repeatable training runs are required.
np.random.seed(42)

In [None]:
# Read the token-level corpus, keeping only the columns this notebook works with
file_path = 'Classified_data.csv'
data = pd.read_csv(
    file_path,
    usecols=['Text', 'Tag', 'sentence_number', 'Word'],  # comma is read_csv's default separator
)
print(data.head())
print("Data shape:", data.shape)


The `ContextNER` class encapsulates the preprocessing needed for NER tasks, making it easier to prepare the data for training neural network models.

In [None]:
class ContextNER:
    """Turn a token-level NER dataframe into padded index arrays.

    The dataframe must provide the columns ``sentence_number``, ``Word`` and
    ``Tag`` (one row per token). On construction the class

    * groups the rows into sentences (lists of ``(word, tag)`` tuples),
    * builds word/tag <-> index lookup tables (``0`` is reserved for ``"PAD"``
      in both tables, ``1`` for ``"UNK"`` in the word table),
    * converts every sentence to index sequences and post-pads them with ``0``
      up to ``max_len``.

    Results are exposed as ``X_array`` (word indices), ``y_array_normal``
    (tag indices) and ``y_array`` (one-hot encoded tags).
    """

    # Class-level defaults; each of these is overwritten per instance by __init__.
    __X, __y = None, None  # Raw word / tag sequences, one list per sentence
    X_array, y_array = None, None  # Padded word indices / one-hot encoded tags
    word2idx, idx2word = None, None  # Word <-> index lookup tables
    tag2idx, idx2tag = None, None  # Tag <-> index lookup tables
    y_array_normal = None  # Padded tag indices (not one-hot encoded)

    def __init__(self, df, all_Words, max_len=None):
        """Build all mappings and arrays from ``df``.

        Args:
            df: Token-level dataframe with ``sentence_number``, ``Word`` and
                ``Tag`` columns.
            all_Words: Iterable of the words that make up the vocabulary.
                Words occurring in ``df`` but not in here are encoded as ``"UNK"``.
            max_len: Length every sequence is padded to. When falsy, the
                longest sentence length plus one is used.
        """
        self.__df = df
        # Use the argument itself; the previous code silently read a global
        # named `all_words` and ignored this parameter.
        self.all_words = set(all_Words)
        self.all_tags = set(df.Tag.values)
        self.sentences = self.__build_sentences()
        self.num_words = len(self.all_words) + 2  # + "PAD" and "UNK"
        self.num_tags = len(self.all_tags) + 1  # + "PAD"
        self.max_len = max_len if max_len else self._get_maxlen()
        self.__build_Xy()
        self.__build_parsers()
        self.__parser_arrays()

    def _get_maxlen(self):
        """Return the longest sentence length plus one.

        The extra position means even the longest sentence is followed by one
        padding slot. An empty sentence list yields ``1``.
        """
        return max((len(sentence) for sentence in self.sentences), default=0) + 1

    def __build_sentences(self):
        """Return one list of ``(word, tag)`` tuples per sentence.

        Rows are grouped by ``sentence_number`` (not by ``Word``, which would
        bundle all occurrences of the same token into one pseudo-sentence).
        ``sort=False`` keeps the sentences in their order of first appearance.
        """
        return [
            list(zip(group.Word.values, group.Tag.values))
            for _, group in self.__df.groupby('sentence_number', sort=False)
        ]

    def __build_Xy(self):
        """Split the sentences into parallel word and tag sequences."""
        self.__X = [[word for word, __ in sentence] for sentence in self.sentences]
        self.__y = [[tag for __, tag in sentence] for sentence in self.sentences]

    def __build_parsers(self):
        """Create the word/tag <-> index lookup tables.

        Words and tags are sorted (by their string form, so mixed types do not
        break the sort) before indices are assigned; plain set iteration order
        is not stable across interpreter runs and would make the mappings
        irreproducible.
        """
        ordered_words = sorted(self.all_words, key=str)
        ordered_tags = sorted(self.all_tags, key=str)

        self.word2idx = {word: idx for idx, word in enumerate(ordered_words, start=2)}
        self.word2idx["UNK"] = 1  # Unknown words
        self.word2idx["PAD"] = 0  # Padding
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}

        self.tag2idx = {tag: idx for idx, tag in enumerate(ordered_tags, start=1)}
        self.tag2idx["PAD"] = 0  # Padding for tags
        self.idx2tag = {idx: tag for tag, idx in self.tag2idx.items()}

    def parser2categorical(self, y_pred, y_true):
        """Map two batches of tag-index sequences back to tag strings.

        Args:
            y_pred: Iterable of predicted tag-index sequences.
            y_true: Iterable of reference tag-index sequences.

        Returns:
            Tuple ``(pred_tag, y_true_tag)`` of nested lists of tag strings.

        Raises:
            KeyError: If an index is not present in ``idx2tag``.
        """
        pred_tag = [[self.idx2tag[idx] for idx in row] for row in y_pred]
        y_true_tag = [[self.idx2tag[idx] for idx in row] for row in y_true]
        return pred_tag, y_true_tag

    def __parser_arrays(self):
        """Encode the sentences as indices and pad them to ``max_len``."""
        unk_idx = self.word2idx["UNK"]
        # Words missing from the vocabulary fall back to "UNK" instead of raising KeyError.
        tmp_X = [[self.word2idx.get(word, unk_idx) for word in words] for words in self.__X]
        tmp_y = [[self.tag2idx[tag] for tag in tags] for tags in self.__y]
        self.X_array = pad_sequences(maxlen=self.max_len, sequences=tmp_X, padding="post", value=0)
        y_pad = pad_sequences(maxlen=self.max_len, sequences=tmp_y, padding="post", value=0)
        self.y_array_normal = y_pad
        self.y_array = np.array([to_categorical(index, num_classes=self.num_tags, dtype='int8') for index in y_pad])

# Collect every token once and hand the same list to the preprocessing helper
all_words = data['Word'].tolist()
ner_aux = ContextNER(data, all_words)



In [None]:
# Inspect the tag -> index mapping built by ContextNER (index 0 is assigned to "PAD")
ner_aux.tag2idx

In [None]:
# Inspect the reverse mapping (index -> tag), used later to decode predictions
ner_aux.idx2tag

In [None]:
# Preview the first 20 entries only; displaying the whole vocabulary floods the cell output
dict(list(ner_aux.word2idx.items())[:20])

In [None]:
# Count how many tokens carry each tag
data['Tag'].value_counts()

## Train-Test split

In [None]:
# Hold out 30% of the sentences for testing; labels are the integer tag indices
X_TRAIN, X_TEST, Y_TRAIN, Y_TEST = train_test_split(
    ner_aux.X_array,
    ner_aux.y_array_normal,
    test_size=0.3,
    random_state=42,
)
print(*(split.shape for split in (X_TRAIN, Y_TRAIN, X_TEST, Y_TEST)))

In [None]:
# Fetch the pre-trained German word2vec file from the Hugging Face Hub, then load it with gensim
german_model_path = hf_hub_download(repo_id="Word2vec/german_model", filename="german.model")
word_vectors = KeyedVectors.load_word2vec_format(
    german_model_path,
    binary=True,  # the file is read in the binary word2vec format
    unicode_errors="ignore",
)

In [None]:
# Hyperparameters
EMBEDDING_DIM = word_vectors.vector_size  # Width of the pre-trained vectors
HIDDEN_DIM = 64  # This can be adjusted
OUTPUT_DIM = len(ner_aux.tag2idx)  # Number of tags (the "PAD" entry included)

# Initial embedding weights: row i holds the pre-trained vector of the word whose
# index is i. Words without a pre-trained vector keep an all-zero row.
embedding_matrix = np.zeros((len(ner_aux.word2idx), EMBEDDING_DIM))
for word, i in ner_aux.word2idx.items():
    # Explicit membership test instead of a bare `except: pass`, so that genuine
    # errors (e.g. a shape mismatch on assignment) are no longer silently swallowed.
    if word in word_vectors:
        embedding_matrix[i] = word_vectors[word]


In [None]:
# Model: pre-trained (fine-tunable) embeddings -> spatial dropout -> BiLSTM -> per-token softmax
model = Sequential([
    Embedding(
        input_dim=len(ner_aux.word2idx),
        output_dim=EMBEDDING_DIM,
        weights=[embedding_matrix],
        input_length=ner_aux.max_len,
        trainable=True,
    ),
    SpatialDropout1D(0.1),
    Bidirectional(LSTM(units=50, return_sequences=True)),
    TimeDistributed(Dense(len(ner_aux.tag2idx), activation="softmax")),
])

# Integer tag indices are used as targets, hence the sparse cross-entropy loss
model.compile(
    optimizer='rmsprop',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)



In [None]:
# Post-pad the integer label arrays to max_len. No one-hot ("categorical") conversion
# happens here: the labels stay integer indices, which is what the
# sparse_categorical_crossentropy loss used at compile time expects.
# NOTE(review): Y_TRAIN / Y_TEST were split from ner_aux.y_array_normal, which
# ContextNER already pads to max_len, so this step presumably leaves them unchanged —
# confirm and remove if redundant.
Y_TRAIN = pad_sequences(Y_TRAIN, maxlen=ner_aux.max_len, padding='post')
Y_TEST = pad_sequences(Y_TEST, maxlen=ner_aux.max_len, padding='post')


In [None]:
# Stop when val_loss has not improved for 5 epochs and roll the model back to the
# weights of its best epoch; without restore_best_weights the in-memory model would
# keep the weights of the last (already degraded) epoch for the later evaluation.
es = EarlyStopping(monitor='val_loss', mode='min', patience=5, restore_best_weights=True, verbose=1)
# Additionally persist the best model (lowest val_loss) to disk
mc = ModelCheckpoint('best_Word2Vec_model.h5', monitor='val_loss', mode='min', save_best_only=True, verbose=1)


In [None]:
# Train the model.
# Validation data is carved out of the training set (last 10% of X_TRAIN / Y_TRAIN)
# rather than taken from X_TEST / Y_TEST: early stopping and checkpointing select the
# model on the validation loss, so validating on the test set would leak it into
# model selection and make the final test report optimistic.
history = model.fit(
    X_TRAIN, Y_TRAIN,
    validation_split=0.1,
    batch_size=64,
    epochs=50,
    verbose=1,
    callbacks=[es, mc]
)

In [None]:
# Pull the per-epoch curves out of the Keras History object
train_losses, val_losses = history.history['loss'], history.history['val_loss']
train_accuracies, val_accuracies = history.history['accuracy'], history.history['val_accuracy']

# Training vs. validation loss per epoch
plt.plot(train_losses, label='Train')
plt.plot(val_losses, label='Validation')
plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(loc='upper left')
plt.show()

In [None]:
# Training vs. validation accuracy per epoch
for _history_key, _curve_label in (
    ('accuracy', 'Training Accuracy'),
    ('val_accuracy', 'Validation Accuracy'),
):
    plt.plot(history.history[_history_key], label=_curve_label)
plt.title('Training and Validation Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()
plt.show()

In [None]:
# Most probable tag index for every token position, flattened to one long vector
predictions = model.predict(X_TEST)
y_pred_flat = predictions.argmax(axis=-1).ravel()
y_test_flat = Y_TEST.ravel()

# Drop every position whose true tag is 'PAD' so padding does not enter the report
non_pad_elements = y_test_flat != ner_aux.tag2idx['PAD']
y_pred_flat, y_test_flat = y_pred_flat[non_pad_elements], y_test_flat[non_pad_elements]

# Decode indices back to tag strings
y_pred_tags = [ner_aux.idx2tag[tag_idx] for tag_idx in y_pred_flat]
y_test_tags = [ner_aux.idx2tag[tag_idx] for tag_idx in y_test_flat]

# Per-tag precision / recall / F1
print(classification_report(y_test_tags, y_pred_tags))

In [None]:
# `confusion_matrix` is already imported in the notebook's import cell, so it is not
# re-imported here.

# Fix the label order once so matrix rows/columns and heatmap ticks are guaranteed to agree
tag_labels = list(ner_aux.tag2idx.keys())

# Rows are actual tags, columns are predicted tags
cm = confusion_matrix(y_test_tags, y_pred_tags, labels=tag_labels)

# Wrap the matrix in a DataFrame so the heatmap axes carry the tag names
cm_df = pd.DataFrame(cm, index=tag_labels, columns=tag_labels)

# Plot the heatmap
plt.figure(figsize=(12, 12))
sns.heatmap(cm_df, annot=True, fmt='g')
plt.title('Confusion Matrix for NER Entities')
plt.ylabel('Actual Labels')
plt.xlabel('Predicted Labels')
plt.show()