In [1]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt
import os
from sklearn.metrics import precision_score, accuracy_score, recall_score, f1_score, confusion_matrix
import pandas as pd
import seaborn as sns

import preprocess_utils as prep

In [2]:
# Maximum vocabulary size handed to the token preprocessing helper; also used
# as the input dimension of the embedding layer.
VOCAB_SIZE = 20_000
# Fixed length every token/tag sequence is padded or cut to by the helpers.
MAX_SEQUENCE_LENGTH = 30

def to_lower(tokens):
    """Return a lower-cased copy of a nested list of tokens.

    Args:
        tokens: Iterable of sentences, each an iterable of strings.

    Returns:
        A new list of lists in which every word has been passed through
        ``str.lower``. The input is not modified.
    """
    return [[word.lower() for word in sentence] for sentence in tokens]

In [3]:
# Read the three CoNLL-2003 splits from datasets/conll2003.
dir_path = os.path.join("datasets", "conll2003")

train_data = prep.read_iob_file(os.path.join(dir_path, "train.txt"))
valid_data = prep.read_iob_file(os.path.join(dir_path, "valid.txt"))
test_data = prep.read_iob_file(os.path.join(dir_path, "test.txt"))

# Lower-case the tokens of every split in place before encoding them.
for split in (train_data, valid_data, test_data):
    split["tokens"] = to_lower(split["tokens"])

# The training split provides the vocabulary and the list of class names.
x_train, vocab = prep.preprocess_tokens(
    train_data["tokens"], VOCAB_SIZE, MAX_SEQUENCE_LENGTH
)
y_train, class_names = prep.preprocess_entity_tags(
    train_data["entity_tags"], MAX_SEQUENCE_LENGTH
)

# NOTE(review): the vocabulary / class list returned for the validation and
# test splits is discarded. Confirm that the helpers encode these splits with
# the training vocabulary and tag ids rather than building fresh ones.
x_valid, _ = prep.preprocess_tokens(
    valid_data["tokens"], VOCAB_SIZE, MAX_SEQUENCE_LENGTH
)
y_valid, _ = prep.preprocess_entity_tags(
    valid_data["entity_tags"], MAX_SEQUENCE_LENGTH
)

x_test, _ = prep.preprocess_tokens(
    test_data["tokens"], VOCAB_SIZE, MAX_SEQUENCE_LENGTH
)
y_test, _ = prep.preprocess_entity_tags(
    test_data["entity_tags"], MAX_SEQUENCE_LENGTH
)

In [4]:
batch_size = 32

# Each split becomes a dataset of (token_ids, tag_ids) pairs, batched in the
# original example order. No shuffling is applied.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(batch_size)
valid_dataset = tf.data.Dataset.from_tensor_slices((x_valid, y_valid)).batch(batch_size)
test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size)

In [5]:
# Summary of the encoded data: number of examples per split, size of the
# vocabulary built from the training split, and the entity classes.
print(f"Train examples:      {x_train.shape[0]}, vocabulary length: {len(vocab)}, classes: {len(class_names)}")
print(f"Validation examples: {x_valid.shape[0]}")
print(f"Test examples:       {x_test.shape[0]}")
print(f"Labels:              {class_names}")

Train examples:      14041, vocabulary length: 20000, classes: 9
Validation examples: 3250
Test examples:       3453
Labels:              ['B-LOC', 'B-MISC', 'B-ORG', 'B-PER', 'I-LOC', 'I-MISC', 'I-ORG', 'I-PER', 'O']


In [6]:
# First training sentence after lower-casing, with its IOB entity tags.
print(train_data["tokens"][0])
print(train_data["entity_tags"][0])

['eu', 'rejects', 'german', 'call', 'to', 'boycott', 'british', 'lamb', '.']
['B-ORG', 'O', 'B-MISC', 'O', 'O', 'O', 'B-MISC', 'O', 'O']


In [7]:
# Same sentence after encoding: token ids and tag ids, both padded with 0.
# In the output below the tag ids are the class_names index plus one
# (e.g. 'B-ORG' -> 3, 'O' -> 9), which leaves id 0 for padding.
print(x_train[0])
print(y_train[0])

[  989 10951   205   629     7  3939   216  5774     3     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0]
[3 9 2 9 9 9 2 9 9 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]


In [8]:
# Vocabulary that maps each token to its integer id (first 10 entries shown).
# [PAD] is the padding token used when a sentence has fewer than MAX_SEQUENCE_LENGTH words
# [UNK] stands for unknown words (words that are not in the vocabulary)
for k, v in list(vocab.items())[:10]:
    print(f"{k} : {v}")

[PAD] : 0
[UNK] : 1
the : 2
. : 3
, : 4
of : 5
in : 6
to : 7
a : 8
and : 9


In [9]:
units = 256
embedding_dim = 256

# Token ids -> embeddings. mask_zero=True makes id 0 (padding) produce a mask
# that the recurrent layers receive.
inputs = layers.Input(shape=(None,))
x = layers.Embedding(VOCAB_SIZE, embedding_dim, mask_zero=True)(inputs)

# Stack of three identical bidirectional LSTM layers, each returning the full
# sequence so that every token gets its own representation.
for _ in range(3):
    recurrent = layers.LSTM(
        units,
        return_sequences=True,
        kernel_regularizer=tf.keras.regularizers.L2(0.001),
        dropout=0.1,
    )
    x = layers.Bidirectional(recurrent)(x)

# One logit per entity class plus one extra output for the padding id 0.
token_classifier = layers.Dense(len(class_names) + 1)
outputs = layers.TimeDistributed(token_classifier)(x)

model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, None)]            0         
                                                                 
 embedding (Embedding)       (None, None, 256)         5120000   
                                                                 
 bidirectional (Bidirectiona  (None, None, 512)        1050624   
 l)                                                              
                                                                 
 bidirectional_1 (Bidirectio  (None, None, 512)        1574912   
 nal)                                                            
                                                                 
 bidirectional_2 (Bidirectio  (None, None, 512)        1574912   
 nal)                                                            
                                                             

In [10]:
class IgnorePaddingSparseCategoricalCrossentropy(tf.keras.losses.Loss):
    """Sparse categorical cross-entropy averaged over non-padding positions.

    Positions whose true label is 0 are treated as padding and contribute
    nothing to the loss. The remaining per-position losses are optionally
    scaled by a per-class weight and then summed and divided by the number
    of non-padding positions.

    Args:
        from_logits: Whether ``y_pred`` holds raw logits (True) or
            probabilities (False).
        weight_class: Optional 1-D tensor or sequence of weights indexed by
            label id. ``None`` disables class weighting.
        **kwargs: Forwarded to ``tf.keras.losses.Loss`` (e.g. ``name``,
            ``reduction``). Accepting them is what lets
            ``from_config(get_config())`` rebuild the loss, because the base
            config contains those keys.
    """

    def __init__(self, from_logits=False, weight_class=None, **kwargs):
        super().__init__(**kwargs)
        self.from_logits = from_logits
        # Reduction is disabled so the padding mask can be applied per position.
        self.loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
            from_logits=self.from_logits,
            reduction=tf.keras.losses.Reduction.NONE
        )
        self.weight_class = weight_class

    def call(self, y_true, y_pred, class_weight=None):
        """Return the scalar masked (and optionally class-weighted) loss.

        ``class_weight`` is unused and kept only so existing call sites that
        pass it keep working; weighting is controlled by ``weight_class``.
        """
        loss = self.loss_fn(y_true, y_pred)
        mask = tf.cast(tf.not_equal(y_true, 0), dtype=loss.dtype)
        result = mask * loss
        if self.weight_class is not None:
            # tf.gather needs integer indices; cast in case labels arrive as floats.
            weights = tf.gather(tf.cast(self.weight_class, loss.dtype),
                                tf.cast(y_true, tf.int32))
            result = result * weights
        # divide_no_nan yields 0 instead of NaN for a batch made only of padding.
        return tf.math.divide_no_nan(tf.reduce_sum(result), tf.reduce_sum(mask))

    def get_config(self):
        """Return a config from which the loss, including class weights, can be rebuilt."""
        config = super().get_config()
        weight_class = self.weight_class
        if weight_class is not None:
            # Store plain Python numbers so the config stays serializable.
            weight_class = np.asarray(weight_class).tolist()
        config.update({
            "from_logits": self.from_logits,
            "weight_class": weight_class
        })
        return config

In [11]:
class IgnorePaddingSparseCategoricalAccuracy(tf.keras.metrics.Metric):
    """Token-level accuracy computed over non-padding positions only.

    Positions whose true label is 0 are treated as padding and are excluded
    from both the numerator and the denominator.

    Args:
        name: Metric name; defaults to ``"accuracy"`` so the history keys stay
            ``accuracy`` / ``val_accuracy``.
        **kwargs: Forwarded to ``tf.keras.metrics.Metric`` (e.g. ``dtype``).
            Accepting them lets ``from_config(get_config())`` rebuild the
            metric, because the base config contains ``name`` and ``dtype``.
    """

    def __init__(self, name="accuracy", **kwargs):
        super(IgnorePaddingSparseCategoricalAccuracy, self).__init__(name=name, **kwargs)
        # `total` counts non-padding labels seen, `count` the correct ones.
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate correct / total counts for one batch.

        ``sample_weight`` is accepted for API compatibility and ignored.
        """
        labels = tf.math.argmax(y_pred, axis=-1)
        # argmax returns int64; align y_true so tf.equal does not fail on a
        # dtype mismatch (e.g. int32 labels).
        y_true = tf.cast(y_true, labels.dtype)
        mask = tf.not_equal(y_true, 0)
        correct_predictions = tf.logical_and(mask, tf.equal(y_true, labels))
        self.count.assign_add(
            tf.reduce_sum(tf.cast(correct_predictions, self.count.dtype)))
        self.total.assign_add(
            tf.reduce_sum(tf.cast(mask, self.total.dtype)))

    def result(self):
        """Return correct / total, or 0 when no non-padding label was seen."""
        return tf.math.divide_no_nan(self.count, self.total)

    def reset_state(self):
        """Reset both counters to zero."""
        self.total.assign(0.0)
        self.count.assign(0.0)

    def get_config(self):
        """Return the base metric config; this metric adds no fields of its own."""
        config = super().get_config()
        return config

In [12]:
# Sanity check: list, for every layer of the model, whether it declares
# support for masking (the Embedding layer was created with mask_zero=True).
[layer.supports_masking for layer in model.layers]

[True, True, True, True, True, True]

In [13]:
# Entity classes in index order, shown for reference when assigning weights.
class_names

['B-LOC', 'B-MISC', 'B-ORG', 'B-PER', 'I-LOC', 'I-MISC', 'I-ORG', 'I-PER', 'O']

In [14]:
# Weight per label id. In the encoded examples id 0 is padding, ids 1-8 are
# the B-/I- entity tags and id 9 is 'O'; entity tags get weight 5.0 and the
# other two ids get 1.0.
class_weight_dict = {
    label_id: (5.0 if 1 <= label_id <= 8 else 1.0)
    for label_id in range(10)
}

# Same weights as a tensor indexed by label id, for use inside the loss.
class_weight = tf.constant(
    [class_weight_dict[label_id] for label_id in sorted(class_weight_dict)]
)

In [15]:
# Compile with the padding-aware, class-weighted loss and the padding-aware
# accuracy metric. The model outputs raw logits, hence from_logits=True.
model.compile(
    metrics=[IgnorePaddingSparseCategoricalAccuracy()],
    loss=IgnorePaddingSparseCategoricalCrossentropy(
        weight_class=class_weight,
        from_logits=True,
    ),
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-6),
)

# Alternative kept for reference (not run): Adam(1e-5) with the built-in
# tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) and the
# "accuracy" metric, i.e. without ignoring padded positions.

In [None]:
epochs = 20

# Do not pass `class_weight=` here. For targets of rank 2 Keras expects either
# one-hot labels or sparse labels with an explicit trailing dimension of 1, so
# (batch, sequence) tag ids would be read as one-hot rows and weighted by the
# position of the largest id. Per-class weighting is already applied inside
# the loss through its `weight_class` argument.
history = model.fit(
    train_dataset,
    epochs=epochs,
    validation_data=valid_dataset
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20

In [None]:
# Evaluate on the test split and print each compiled metric by name.
metrics = model.evaluate(test_dataset)

for metric_name, metric in zip(model.metrics_names, metrics):
    # ".4f" prints four decimals; the previous "4f" only set a minimum width.
    print(f"{metric_name:>7s} {metric:.4f}")

In [None]:
# Training and validation loss per epoch, taken from the fit history.
fig, ax = plt.subplots(figsize=(10, 6), constrained_layout=True)
ax.plot(history.history["loss"], label="Training loss")
ax.plot(history.history["val_loss"], label="Validation loss")
ax.set_xlabel("Epochs")
ax.set_ylabel("Loss")
# One tick per recorded epoch (zero-based).
ax.set_xticks(np.arange(len(history.history["loss"])))
ax.legend()
# plt.savefig("loss.png")

In [None]:
# Training and validation accuracy per epoch, taken from the fit history.
fig, ax = plt.subplots(figsize=(10, 6), constrained_layout=True)
ax.plot(history.history["accuracy"], label="Training accuracy")
ax.plot(history.history["val_accuracy"], label="Validation accuracy")
ax.set_xlabel("Epochs")
ax.set_ylabel("Accuracy")
# One tick per recorded epoch (zero-based).
ax.set_xticks(np.arange(len(history.history["accuracy"])))
ax.legend()
# fig.savefig("accuracy.png")

In [None]:
# Gather the true tag ids in dataset order; test_dataset is not shuffled, so
# this order matches the rows returned by model.predict below.
y_true = []
for tokens, labels in test_dataset:
    y_true.append(labels.numpy())
y_true = np.concatenate(y_true, axis=0)

predictions = model.predict(test_dataset)
y_pred = np.argmax(predictions, axis=2)

# Keep only the positions that hold a real token (true tag id != 0).
y_pred_raveled = y_pred[y_true != 0]
y_true_raveled = y_true[y_true != 0]

# Restrict the macro averages to the real tag ids 1..len(class_names). The
# model has an output for the padding id 0 and may predict it for a real
# token; without `labels` that id would be averaged in as an extra class.
entity_label_ids = np.arange(1, len(class_names) + 1)

accuracy = accuracy_score(y_true_raveled, y_pred_raveled)
precision = precision_score(y_true_raveled, y_pred_raveled,
                            labels=entity_label_ids, average="macro")
recall = recall_score(y_true_raveled, y_pred_raveled,
                      labels=entity_label_ids, average="macro")
f1score = f1_score(y_true_raveled, y_pred_raveled,
                   labels=entity_label_ids, average="macro")
print(f"accuracy : {accuracy:.4f}")
print(f"precision: {precision:.4f}")
print(f"recall   : {recall:.4f}")
print(f"F1       : {f1score:.4f}")

In [None]:
# Confusion matrix on the test split. `labels` pins rows and columns to the
# tag ids 1..len(class_names) so the matrix always matches class_names in
# shape and order. Without it, a padding id 0 predicted for a real token (or
# a class missing from the data) changes the matrix size and the DataFrame
# construction below fails. Pairs involving an id outside `labels` are left
# out of the matrix.
matrix = confusion_matrix(y_true_raveled, y_pred_raveled,
                          labels=np.arange(1, len(class_names) + 1))
df = pd.DataFrame(matrix, index=class_names, columns=class_names)
plt.figure(figsize=(10, 7))
sns.heatmap(df, annot=True, fmt="d")
# plt.savefig("confusion_matrix.png")

In [None]:
# Gather the true tag ids in dataset order; train_dataset is not shuffled, so
# this order matches the rows returned by model.predict below.
y_true = []
for tokens, labels in train_dataset:
    y_true.append(labels.numpy())
y_true = np.concatenate(y_true, axis=0)

predictions = model.predict(train_dataset)
y_pred = np.argmax(predictions, axis=2)

# Keep only the positions that hold a real token (true tag id != 0).
y_pred_raveled = y_pred[y_true != 0]
y_true_raveled = y_true[y_true != 0]

# `labels` pins rows and columns to the tag ids 1..len(class_names) so the
# matrix always matches class_names in shape and order, even if the padding
# id 0 is predicted for a real token. Pairs involving an id outside `labels`
# are left out of the matrix.
matrix = confusion_matrix(y_true_raveled, y_pred_raveled,
                          labels=np.arange(1, len(class_names) + 1))
df = pd.DataFrame(matrix, index=class_names, columns=class_names)
plt.figure(figsize=(10, 7))
sns.heatmap(df, annot=True, fmt="d")

In [None]:
# At prediction time it is necessary to work on a list of strings
class PreprocessTextLayer(layers.Layer):
    """Turn raw strings into fixed-length sequences of token ids.

    The text is lower-cased and vectorized with a ``TextVectorization`` layer
    built from ``vocabulary``; the result is then truncated or zero-padded on
    the right to exactly ``max_sequence`` ids per example.

    Args:
        vocabulary: Vocabulary handed to ``layers.TextVectorization``.
            NOTE(review): confirm the object passed here is in a format
            TextVectorization accepts and that its ids agree with the ones
            used to encode the training data.
        max_sequence: Number of token ids in every output row.
        **kwargs: Forwarded to ``layers.Layer`` (e.g. ``name``) so the layer
            can be rebuilt from ``get_config()``.
    """

    def __init__(self, vocabulary, max_sequence, **kwargs):
        super(PreprocessTextLayer, self).__init__(**kwargs)
        # Kept so get_config() can report it.
        self.vocabulary = vocabulary
        self.text_vectorization_layer = layers.TextVectorization(
            vocabulary=vocabulary,
            standardize="lower"
        )
        self.max_sequence = max_sequence

    def call(self, inputs):
        """Vectorize ``inputs`` and return ids of width ``max_sequence``."""
        transformed_inputs = self.text_vectorization_layer(inputs)
        # Keep at most max_sequence ids per row; a no-op for shorter rows.
        transformed_inputs = transformed_inputs[:, :self.max_sequence]
        # Use the dynamic shape: the static width can be unknown in graph mode.
        missing = self.max_sequence - tf.shape(transformed_inputs)[1]
        return tf.pad(transformed_inputs, [[0, 0], [0, missing]])

    def get_config(self):
        """Return the constructor arguments together with the base layer config."""
        config = super().get_config()
        config.update({
            "vocabulary": self.vocabulary,
            "max_sequence": self.max_sequence
        })
        return config