In [None]:
# Reproducibility setup: seed NumPy's global generator and TensorFlow with the
# same value, and keep a dedicated RandomState for the cross-validation splitter.
import os
import numpy as np
from numpy.random import seed
seed(42)  # seeds NumPy's legacy global generator
rng = np.random.RandomState(42)  # separate generator, later passed to StratifiedKFold
import tensorflow
tensorflow.random.set_seed(42)  # seeds TensorFlow's global generator
# NOTE(review): this variable is set after TensorFlow has been imported --
# confirm TensorFlow still honours it at this point.
os.environ['TF_DETERMINISTIC_OPS'] = '1'

In [None]:
# Download the GloVe 6B word-vector archive into the working directory.
!wget http://nlp.stanford.edu/data/glove.6B.zip

In [None]:
# Extract the archive into ./glove; the 300-dimensional vectors are read from there later.
!unzip glove.6B.zip -d glove

In [None]:
# Upgrade the standalone keras package to the latest release (-U).
# NOTE(review): the version is not pinned and TensorFlow was already imported
# in the first cell -- confirm the upgraded package matches the installed TensorFlow.
!pip install keras -U

In [None]:
# Install TensorFlow Addons (source of the AdamW optimizer imported below) and
# the tensorflow-determinism package. Versions are not pinned.
!pip install tensorflow-addons tensorflow-determinism

In [None]:
import pandas as pd
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import LabelEncoder
from tensorflow_addons.optimizers import AdamW
from tensorflow.keras.layers import Layer, Embedding, Input, Dropout, Bidirectional, LSTM, Flatten, Dense
#from tensorflow.compat.v1.keras.layers import CuDNNLSTM
from tensorflow.keras.models import Model
from tensorflow.keras.utils import plot_model
from sklearn.model_selection import StratifiedKFold
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, matthews_corrcoef, f1_score, precision_score, recall_score, balanced_accuracy_score
from imblearn.metrics import specificity_score

In [None]:
# Load the title/sentiment dataset; the CSV is decoded as Latin-1.
data = pd.read_csv(
    '../input/financialphrasesemevalfiqa/title-sentiment.csv',
    encoding='latin-1',
)

In [None]:
# Display the loaded DataFrame using the notebook's rich repr.
data

In [None]:
# Number of rows per sentiment label.
data.sentiment.value_counts()

In [None]:
# Cast both columns to str so later string operations see uniform values.
for column in ('title', 'sentiment'):
    data[column] = data[column].astype(str)

In [None]:
# Strip punctuation, then collapse every run of whitespace to a single space.
# regex=True is passed explicitly on both calls: pandas >= 2.0 defaults
# Series.str.replace to literal matching, which would leave the punctuation
# pattern unapplied. Both patterns are raw strings to avoid invalid-escape warnings.
data['title'] = data['title'].str.replace(r'[^\w\s]+', '', regex=True)
data['title'] = data['title'].str.replace(r'\s+', ' ', regex=True)

In [None]:
# Length of each title in characters (len of the string, not a token count).
data['seq_length'] = data['title'].map(len)

In [None]:
# Histogram of the title lengths computed above.
data['seq_length'].hist()

In [None]:
# Lower-case the titles and write them back to the same column.
lowercased_titles = data.title.str.lower()
data['title'] = lowercased_titles

In [None]:
# Raw title strings as a NumPy array; this is the model input before tokenisation.
X = data.title.to_numpy()

In [None]:
# Vocabulary cap handed to the Tokenizer as num_words.
MAX_NB_WORDS = 12697
# Fixed length each encoded title is padded/truncated to, and the model's input length.
MAX_SEQUENCE_LENGTH = 225

In [None]:
# Fit the word index on the title texts, splitting on single spaces.
tokenizer = Tokenizer(
    num_words=MAX_NB_WORDS,
    split=' ',
)
tokenizer.fit_on_texts(X)

In [None]:
# Encode each title as a sequence of word indices, then pad/truncate to a fixed length.
# X is rebound from raw text to the padded integer matrix, so re-running this
# cell alone would feed that matrix back into the tokenizer.
X = pad_sequences(tokenizer.texts_to_sequences(X), maxlen=MAX_SEQUENCE_LENGTH)

In [None]:
# Word -> integer index mapping learned by the tokenizer; report its size.
word_index = tokenizer.word_index
print('%s unique tokens.' % len(word_index))

In [None]:
# Map the sentiment strings to integer class ids.
encoder = LabelEncoder()
encoder.fit(data['sentiment'])
Y = encoder.transform(data['sentiment'])

In [None]:
# Show the label strings in the order of their encoded integer ids.
encoder.classes_

In [None]:
# Parse the GloVe text file into {word: float32 vector}.
# Each line is "<word> <v1> <v2> ...": the first token is the word and the
# remaining tokens are its vector components.
# The file is opened with an explicit UTF-8 encoding (the platform default
# codec may differ) and inside a `with` block so the handle is closed even if
# a line fails to parse.
embeddings_index = {}
with open('./glove/glove.6B.300d.txt', encoding='utf-8') as f:
    for line in f:
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs

In [None]:
# Report how many word vectors were loaded from the GloVe file.
print('Total: %s word vectors.' % len(embeddings_index))

In [None]:
# One more than the number of indexed words, so the largest word index is a valid row.
vocab_size = 1 + len(tokenizer.word_index)
print(vocab_size)

In [None]:
# Build the embedding weight matrix: row i holds the GloVe vector of the word
# whose tokenizer index is i; words without a GloVe entry keep an all-zero row.
embedding_matrix = np.zeros((vocab_size, 300))
for token, row in tokenizer.word_index.items():
    glove_vector = embeddings_index.get(token)
    if glove_vector is None:
        continue
    embedding_matrix[row] = glove_vector

In [None]:
# Display the assembled embedding weight matrix.
embedding_matrix

In [None]:
# Expected to be (vocab_size, 300), as allocated above.
embedding_matrix.shape

In [None]:
# Frozen (trainable=False) embedding layer initialised from the GloVe weight matrix.
embedding_layer = Embedding(
    vocab_size,
    300,
    weights=[embedding_matrix],
    input_length=(MAX_SEQUENCE_LENGTH,),
    trainable=False,
)

In [None]:
# AdamW optimizer configuration used for training; weight decay is set to 0.0.
custom_adam = AdamW(
    weight_decay=0.0,
    learning_rate=1e-5,
    epsilon=1e-8,
)

In [None]:
import tensorflow.keras.backend as K
from tensorflow.keras import regularizers, constraints, initializers

def dot_product(x, kernel):
    """
    Backend-agnostic dot product between an input tensor and a weight tensor.

    On the TensorFlow backend the kernel is given a trailing axis before the
    product and that axis is squeezed out of the result; on any other backend
    `K.dot` is applied directly.

    Args:
        x (): input
        kernel (): weights
    Returns:
        The result of the dot product of `x` and `kernel`.
    """
    if K.backend() != 'tensorflow':
        return K.dot(x, kernel)

    kernel_with_axis = K.expand_dims(kernel)
    product = K.dot(x, kernel_with_axis)
    return K.squeeze(product, axis=-1)


class AttentionWithContext(Layer):
    """
    Attention operation, with a context/query vector, for temporal data.
    Supports Masking.
    Follows the work of Yang et al. [https://www.cc.gatech.edu/~dyang888/docs/naacl16.pdf]
    "Hierarchical Attention Networks for Document Classification"
    by using a context vector to assist the attention
    # Input shape
        3D tensor with shape: `(samples, steps, features)`.
    # Output shape
        2D tensor with shape: `(samples, features)`.
    # Arguments
        W_regularizer, u_regularizer, b_regularizer: regularizer identifiers
            (resolved with `regularizers.get`) for the weights `W`, `u`, `b`.
        W_constraint, u_constraint, b_constraint: constraint identifiers
            (resolved with `constraints.get`) for the weights `W`, `u`, `b`.
        bias: whether the bias vector `b` is created and added.
    How to use:
    Just put it on top of an RNN Layer (GRU/LSTM/SimpleRNN) with return_sequences=True.
    The dimensions are inferred based on the output shape of the RNN.
    Note: The layer has been tested with Keras 2.0.6
    Example:
        model.add(LSTM(64, return_sequences=True))
        model.add(AttentionWithContext())
        # next add a Dense layer (for classification/regression) or whatever...
    """

    def __init__(self,
                 W_regularizer=None, u_regularizer=None, b_regularizer=None,
                 W_constraint=None, u_constraint=None, b_constraint=None,
                 bias=True, **kwargs):
        # Initialise the base Layer first: its bookkeeping must exist before
        # attributes are assigned, and a base initializer that resets
        # `supports_masking` can then no longer undo the assignment below.
        super(AttentionWithContext, self).__init__(**kwargs)

        self.supports_masking = True
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.u_regularizer = regularizers.get(u_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.u_constraint = constraints.get(u_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias

    def build(self, input_shape):
        """Create the weights: `W` (features x features), optional `b` (features,)
        and the context vector `u` (features,)."""
        assert len(input_shape) == 3

        self.W = self.add_weight(shape=(input_shape[-1], input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        if self.bias:
            self.b = self.add_weight(shape=(input_shape[-1],),
                                     initializer='zeros',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)

        self.u = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_u'.format(self.name),
                                 regularizer=self.u_regularizer,
                                 constraint=self.u_constraint)

        super(AttentionWithContext, self).build(input_shape)

    def compute_mask(self, input, input_mask=None):
        """Return None so that no mask is propagated to the following layers."""
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        """Return the attention-weighted sum of `x` over axis 1 (the steps axis).

        Scores are `exp(tanh(x.W + b) . u)`, multiplied by the mask when one is
        given, then normalised by their sum along axis 1.
        """
        uit = dot_product(x, self.W)

        if self.bias:
            uit += self.b

        uit = K.tanh(uit)
        ait = dot_product(uit, self.u)

        a = K.exp(ait)

        # apply mask after the exp. will be re-normalized next
        if mask is not None:
            # Cast the mask to floatX to avoid float64 upcasting in theano
            a *= K.cast(mask, K.floatx())

        # in some cases especially in the early stages of training the sum may be almost zero
        # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        # Add a trailing axis so the per-step weights broadcast over the features.
        a = K.expand_dims(a)
        weighted_input = x * a
        return K.sum(weighted_input, axis=1)

    def compute_output_shape(self, input_shape):
        """Output keeps the first and last input dimensions: `(samples, features)`."""
        return input_shape[0], input_shape[-1]

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized and re-created."""
        config = super(AttentionWithContext, self).get_config()
        config.update({
            'W_regularizer': regularizers.serialize(self.W_regularizer),
            'u_regularizer': regularizers.serialize(self.u_regularizer),
            'b_regularizer': regularizers.serialize(self.b_regularizer),
            'W_constraint': constraints.serialize(self.W_constraint),
            'u_constraint': constraints.serialize(self.u_constraint),
            'b_constraint': constraints.serialize(self.b_constraint),
            'bias': self.bias,
        })
        return config

In [None]:
def bilstm_model(input_shape):
    """Build the BiLSTM + attention classifier on top of the shared `embedding_layer`.

    Args:
        input_shape: shape of one input sample, passed straight to `Input`.

    Returns:
        The uncompiled Keras `Model` with a 3-unit softmax output.

    Side effects:
        Prints the model summary and writes the architecture diagram to
        'model_plot.png'.
    """
    token_ids = Input(input_shape)
    embedded = embedding_layer(token_ids)

    # return_sequences=True keeps the per-step outputs that the attention layer pools.
    # (Variant without attention: return_sequences=False and no AttentionWithContext.)
    recurrent = Bidirectional(LSTM(100, return_sequences=True))(embedded)
    attended = AttentionWithContext()(recurrent)
    class_probs = Dense(3, activation='softmax')(attended)

    model = Model(inputs=token_ids, outputs=class_probs)

    model.summary()
    plot_model(model, to_file='model_plot.png', show_shapes=True, show_layer_names=True)

    return model

In [None]:
# Stratified 10-fold cross-validation of the BiLSTM + attention model.
# One score per fold is appended to each list; later cells print the means.
bilstm_mcc = []
bilstm_f1 = []
bilstm_precision = []
bilstm_recall = []
bilstm_bacc = []
bilstm_spec = []

fold = 1

skf = StratifiedKFold(n_splits=10, random_state=rng, shuffle=True)
for train_index, test_index in skf.split(X, Y):
    # A new model per fold so that no weights carry over between folds.
    model_bilstm = bilstm_model((MAX_SEQUENCE_LENGTH,))
    # Likewise a new optimizer per fold, cloned from `custom_adam`'s configuration:
    # sharing one instance would carry its step counter and moment estimates
    # from one fold's model into the next.
    fold_optimizer = type(custom_adam).from_config(custom_adam.get_config())
    model_bilstm.compile(optimizer=fold_optimizer, loss='sparse_categorical_crossentropy', metrics=['acc'])

    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = Y[train_index], Y[test_index]

    # Keyword arguments are required: recent scikit-learn releases reject the
    # positional form of compute_class_weight.
    classes = np.unique(y_train)
    class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train)
    # Key the weights by the labels actually present instead of a hard-coded range(3).
    weight = {int(label): float(w) for label, w in zip(classes, class_weights)}

    model_bilstm.fit(X_train, y_train, epochs=10, verbose=1, batch_size=32, class_weight=weight)

    # Predicted class = index of the highest softmax probability.
    y_pred = model_bilstm.predict(X_test, batch_size=32)
    preds = np.argmax(y_pred, axis=1)

    cnf_mtx = confusion_matrix(y_test, preds)
    print("Fold #%i Confusion Matrix:" % fold)
    print(cnf_mtx)

    bilstm_mcc.append(matthews_corrcoef(y_test, preds))
    bilstm_f1.append(f1_score(y_test, preds, average='weighted'))
    bilstm_precision.append(precision_score(y_test, preds, average='weighted'))
    bilstm_recall.append(recall_score(y_test, preds, average='weighted'))
    bilstm_bacc.append(balanced_accuracy_score(y_test, preds))
    bilstm_spec.append(specificity_score(y_test, preds, average='weighted'))
    fold += 1

In [None]:
# Mean Matthews correlation coefficient over the folds.
mean_mcc = sum(bilstm_mcc) / len(bilstm_mcc)
print(f"Mean-MCC: {mean_mcc:.4f}")

In [None]:
# Mean weighted F1 score over the folds.
mean_f1 = sum(bilstm_f1) / len(bilstm_f1)
print(f"Mean-F1: {mean_f1:.4f}")

In [None]:
# Mean weighted precision over the folds.
mean_precision = sum(bilstm_precision) / len(bilstm_precision)
print(f"Mean-Precision: {mean_precision:.4f}")

In [None]:
# Mean weighted recall over the folds.
mean_recall = sum(bilstm_recall) / len(bilstm_recall)
print(f"Mean-Recall: {mean_recall:.4f}")

In [None]:
# Mean balanced accuracy over the folds.
mean_bacc = sum(bilstm_bacc) / len(bilstm_bacc)
print(f"Mean-BACC: {mean_bacc:.4f}")

In [None]:
# Mean weighted specificity over the folds.
mean_spec = sum(bilstm_spec) / len(bilstm_spec)
print(f"Mean-Specificity: {mean_spec:.4f}")