In [None]:
!pip install seqeval

In [None]:
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split

from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import Sequence, to_categorical

from seqeval.metrics import accuracy_score, classification_report, f1_score

import tensorflow as tf
import tensorflow.keras.backend as K
import tensorflow.keras.layers as L
from tensorflow_addons.text import crf_log_likelihood, crf_decode

In [None]:
data = pd.read_csv('../input/entity-annotated-corpus/ner_dataset.csv', sep=",", encoding="latin1").fillna(method='ffill')

In [None]:
class ContextNER:

    def __init__(self, df):

        self.__df = df

        self.all_words = set(df.Word.values)
        self.all_tags = set(df.Tag.values)

        self.num_words = len(self.all_words) + 2
        self.num_tags = len(self.all_tags) + 1

        self.sentences = self.__build_sentences()
        self.max_len = self.__get_maxlen()

        self.__build_Xy()
        self.__build_parsers()
        self.__parser_arrays()

    def __get_maxlen(self):
        return max([len(x) for x in self.sentences]) + 1

    def __build_sentences(self):

        return [x for x in self.__df.groupby('Sentence #').apply(
            lambda xdef: [x for x in zip(
                xdef.Word.values,
                xdef.Tag.values
            )]
        )]

    def __build_Xy(self):

        self.__X = [[word for word, __ in value] for value in self.sentences]
        self.__y = [[tag for __, tag in value] for value in self.sentences]

    def __build_parsers(self):

        self.word2idx = {value: idx + 2 for idx,
                         value in enumerate(self.all_words)}
        
        self.word2idx["PAD"] = 0  # Padding - Preenchimento
        self.word2idx["UNK"] = 1  # Palavras Desconhecidas
        

        # Converte um index em Word
        self.idx2word = {idx: value for value, idx in self.word2idx.items()}

        # Converte Tag em ìndice
        self.tag2idx = {value: idx + 1 for idx,
                        value in enumerate(self.all_tags)}
        self.tag2idx["PAD"] = 0  # Padding - Preenchimento

        # Converte index em Tag
        self.idx2tag = {idx: value for value, idx in self.tag2idx.items()}

    def parser2categorical(self, y_pred, y_true):

        pred_tag = [[self.idx2tag[idx] for idx in row] 
                    for row in y_pred]
        
        y_true_tag = [[self.idx2tag[idx] for idx in row] 
                      for row in y_true]

        return pred_tag, y_true_tag

    def __parser_arrays(self):

        tmp_X = [[self.word2idx[index] for index in value]
                 for value in self.__X]
        
        tmp_y = [[self.tag2idx[index] for index in value]
                 for value in self.__y]

        self.X_array = pad_sequences(maxlen=self.max_len,
                                     sequences=tmp_X,
                                     padding="post",
                                     value=0)

        y_pad = pad_sequences(maxlen=self.max_len,
                              sequences=tmp_y,
                              padding="post",
                              value=0)

        self.y_array_normal = y_pad
        self.y_array = np.array(
            [to_categorical(index, num_classes=self.num_tags) for index in y_pad])

In [None]:
# https://github.com/IntelLabs/nlp-architect/blob/5fd26cfb16739aee2f53fe1e913a12a09c8d0404/nlp_architect/nn/tensorflow/python/keras/layers/crf.py#L19

class CRF(L.Layer):
    def __init__(self,
                 output_dim,
                 sparse_target=True,
                 **kwargs):
        """    
        Args:
            output_dim (int): the number of labels to tag each temporal input.
            sparse_target (bool): whether the the ground-truth label represented in one-hot.
        Input shape:
            (batch_size, sentence length, output_dim)
        Output shape:
            (batch_size, sentence length, output_dim)
        """
        super(CRF, self).__init__(**kwargs)
        self.output_dim = int(output_dim) 
        self.sparse_target = sparse_target
        self.input_spec = L.InputSpec(min_ndim=3)
        self.supports_masking = False
        self.sequence_lengths = None
        self.transitions = None

    def build(self, input_shape):
        assert len(input_shape) == 3
        f_shape = tf.TensorShape(input_shape)
        input_spec = L.InputSpec(min_ndim=3, axes={-1: f_shape[-1]})

        if f_shape[-1] is None:
            raise ValueError('The last dimension of the inputs to `CRF` '
                             'should be defined. Found `None`.')
        if f_shape[-1] != self.output_dim:
            raise ValueError('The last dimension of the input shape must be equal to output'
                             ' shape. Use a linear layer if needed.')
        self.input_spec = input_spec
        self.transitions = self.add_weight(name='transitions',
                                           shape=[self.output_dim, self.output_dim],
                                           initializer='glorot_uniform',
                                           trainable=True)
        self.built = True

    def compute_mask(self, inputs, mask=None):
        # Just pass the received mask from previous layer, to the next layer or
        # manipulate it if this layer changes the shape of the input
        return mask

    def call(self, inputs, sequence_lengths=None, training=None, **kwargs):
        sequences = tf.convert_to_tensor(inputs, dtype=self.dtype)
        if sequence_lengths is not None:
            assert len(sequence_lengths.shape) == 2
            assert tf.convert_to_tensor(sequence_lengths).dtype == 'int32'
            seq_len_shape = tf.convert_to_tensor(sequence_lengths).get_shape().as_list()
            assert seq_len_shape[1] == 1
            self.sequence_lengths = K.flatten(sequence_lengths)
        else:
            self.sequence_lengths = tf.ones(tf.shape(inputs)[0], dtype=tf.int32) * (
                tf.shape(inputs)[1]
            )

        viterbi_sequence, _ = crf_decode(sequences,
                                         self.transitions,
                                         self.sequence_lengths)
        output = K.one_hot(viterbi_sequence, self.output_dim)
        return K.in_train_phase(sequences, output)

    @property
    def loss(self):
        def crf_loss(y_true, y_pred):
            y_pred = tf.convert_to_tensor(y_pred, dtype=self.dtype)
            log_likelihood, self.transitions = crf_log_likelihood(
                y_pred,
                tf.cast(K.argmax(y_true), dtype=tf.int32) if self.sparse_target else y_true,
                self.sequence_lengths,
                transition_params=self.transitions,
            )
            return tf.reduce_mean(-log_likelihood)
        return crf_loss

    @property
    def accuracy(self):
        def viterbi_accuracy(y_true, y_pred):
            # -1e10 to avoid zero at sum(mask)
            mask = K.cast(
                K.all(K.greater(y_pred, -1e10), axis=2), K.floatx())
            shape = tf.shape(y_pred)
            sequence_lengths = tf.ones(shape[0], dtype=tf.int32) * (shape[1])
            y_pred, _ = crf_decode(y_pred, self.transitions, sequence_lengths)
            if self.sparse_target:
                y_true = K.argmax(y_true, 2)
            y_pred = K.cast(y_pred, 'int32')
            y_true = K.cast(y_true, 'int32')
            corrects = K.cast(K.equal(y_true, y_pred), K.floatx())
            return K.sum(corrects * mask) / K.sum(mask)
        return viterbi_accuracy

    def compute_output_shape(self, input_shape):
        tf.TensorShape(input_shape).assert_has_rank(3)
        return input_shape[:2] + (self.output_dim,)

    def get_config(self):
        config = {
            'output_dim': self.output_dim,
            'sparse_target': self.sparse_target,
            'supports_masking': self.supports_masking,
            'transitions': K.eval(self.transitions)
        }
        base_config = super(CRF, self).get_config()
        return dict(base_config, **config)

In [None]:
data_ner = ContextNER(data)

In [None]:
X_TRAIN, X_TEST, Y_TRAIN, Y_TEST  = train_test_split(data_ner.X_array,
                                                     data_ner.y_array, 
                                                     test_size=0.3)

In [None]:
sequence_input = tf.keras.layers.Input(shape=(data_ner.max_len,),
                                       dtype=tf.int32, 
                                       name='sequence_input')

model_embedding = tf.keras.layers.Embedding(input_dim=data_ner.num_words,
                                            output_dim=data_ner.max_len,
                                            input_length=data_ner.max_len)(sequence_input)

model_bilstm = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=64,
                                                                  return_sequences=True,
                                                                  recurrent_dropout=0.1))(model_embedding)

model_dropout = tf.keras.layers.TimeDistributed(tf.keras.layers.Dropout(0.3))(model_bilstm)

model_dense = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(data_ner.num_tags, 
                                                                    activation='relu'))(model_dropout)
model_dense = tf.keras.layers.Dense(data_ner.num_tags, activation='relu')(model_dropout)

crf_out = CRF(data_ner.num_tags, sparse_target=True)

model = tf.keras.Model(inputs=sequence_input, outputs=model_dense)

model.compile(optimizer=tf.keras.optimizers.Adam(0.0001),
              loss='categorical_crossentropy', metrics=['accuracy']
             )

model.summary()

In [None]:
%%time

History = model.fit(X_TRAIN,
                    Y_TRAIN,
                    validation_split=0.1,
                    batch_size=64, 
                    epochs=15, 
                    shuffle=True)

In [None]:
y_pred, y_true = \
np.argmax(model.predict(X_TEST, verbose=1, batch_size=64), axis=-1), \
np.argmax(Y_TEST, -1)

pred_tag, true_tag = \
data_ner.parser2categorical(y_pred, y_true) 

In [None]:
f1_score(pred_tag, true_tag)

In [None]:
print(classification_report(pred_tag, true_tag))