In [None]:
# Import libraries
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import itertools

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support
from sklearn.utils import resample, shuffle

from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils.np_utils import to_categorical

from keras.models import Sequential
from keras.layers import Embedding, Dropout, Bidirectional, LSTM, GlobalMaxPool1D, Dense

import gensim

In [None]:
# Set GPU
# Both CUDA-related environment variables are written in one call;
# CUDA_VISIBLE_DEVICES is deliberately set to the empty string.
# NOTE(review): an empty value presumably hides every GPU (CPU-only run) — confirm this is intended.
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    "CUDA_VISIBLE_DEVICES": "",
})

In [None]:
## Load dataset
# Every line of the file is parsed below as "<sentence>@<sentiment>".
# header=None keeps the first line as data: with the default header inference
# pandas would consume the first sentence as the column name and drop it.
# A multi-character separator is treated as a regular expression and needs the
# python engine, so request it explicitly instead of relying on the fallback.
# NOTE(review): the file is assumed to have no header row and to be readable
# with the default encoding — confirm against the dataset copy in use.
df = pd.read_table('FinancialPhraseBank-v1.0/Sentences_50Agree.txt', delimiter='\r\n',
                   header=None, engine='python')

# Split on the LAST '@' only, so a sentence that itself contains '@' still
# yields exactly two fields (sentence, sentiment).
values = np.array([line.rsplit('@', 1) for line in df.iloc[:, 0]])
data = pd.DataFrame({'sentence': values[:, 0], 'sentiment': values[:, 1]})

# Show first rows
data.head()

In [None]:
# Visualize the target distribution
ax = data['sentiment'].value_counts().plot(kind='bar')
# Bar heights are raw counts (value_counts), so the axis is labelled as a
# count; percentages only appear in the text annotations above each bar.
ax.set_ylabel('Number of sentences', fontsize=12)
ax.set_yticks(np.arange(0, 3501, 500))

# Annotate each bar with its share of the whole dataset, in percent
total = data['sentiment'].size
for bar in ax.patches:
    share = round((bar.get_height() / total) * 100, 2)
    ax.text(bar.get_x() + bar.get_width() / 2., bar.get_height() + 40,
            str(share) + '%', ha='center', fontsize=15)
plt.show()

In [None]:
## Split dataset into training and testing sets
# Features are the raw sentences, targets are the sentiment labels
X, y = data.sentence, data.sentiment

# 80/20 split, stratified on the label, with a fixed seed
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=7,
    stratify=y,
)

y_train.shape

In [None]:
## Sampling the dataset
# Index labels of the training rows that belong to each sentiment class
neu_index, pos_index, neg_index = (
    y_train[y_train == label].index
    for label in ('neutral', 'positive', 'negative')
)

In [None]:
# Both classes below are resampled to the size of the positive class.

# Down-sample neutral class (drawn without replacement)
X_train_neu, y_train_neu = resample(
    X_train.loc[neu_index],
    y_train.loc[neu_index],
    replace=False,
    n_samples=len(pos_index),
    random_state=7,
)

# Up-sample negative class (drawn with replacement)
X_train_neg, y_train_neg = resample(
    X_train.loc[neg_index],
    y_train.loc[neg_index],
    replace=True,
    n_samples=len(pos_index),
    random_state=7,
)

In [None]:
# Combine resampled classes
# pd.concat is used instead of Series.append, which was deprecated in
# pandas 1.4 and removed in pandas 2.0. Order is unchanged: positive,
# neutral, negative.
X_train_resample = pd.concat([X_train.loc[pos_index], X_train_neu, X_train_neg])
y_train_resample = pd.concat([y_train.loc[pos_index], y_train_neu, y_train_neg])

# Shuffle samples (features and labels are permuted together)
X_train_resample, y_train_resample = shuffle(X_train_resample, y_train_resample, random_state=7)

# Display new class counts
print(y_train_resample.value_counts())

In [None]:
## Tokenize training and testing sets
# The vocabulary is fitted on the resampled training sentences only
tokenizer = Tokenizer()
tokenizer.fit_on_texts(X_train_resample)
word_index = tokenizer.word_index

# From here on both names hold tokenizer sequences instead of raw text
X_train_resample = tokenizer.texts_to_sequences(X_train_resample)
X_test = tokenizer.texts_to_sequences(X_test)

print("Number of words: {}".format(len(word_index)))

In [None]:
# Pad the sequences
# The longest training sequence defines the common length for both sets
max_len = np.max([len(seq) for seq in X_train_resample])

X_train_resample = pad_sequences(X_train_resample, maxlen=max_len)
X_test = pad_sequences(X_test, maxlen=max_len)

In [None]:
# Encode target values as integers
# The encoder is fitted on the original (pre-resampling) training labels
le = LabelEncoder().fit(y_train)

y_train_resample, y_test = le.transform(y_train_resample), le.transform(y_test)

In [None]:
# Convert integers to one hot encoded
# The width is taken from the fitted encoder rather than inferred from each
# array, so both matrices always have one column per known class even if the
# highest-numbered class were absent from one of the splits.
n_classes = len(le.classes_)

y_train_resample = to_categorical(y_train_resample, num_classes=n_classes)
y_test = to_categorical(y_test, num_classes=n_classes)

In [None]:
## Create embedding matrix
def embedding_matrix(fname, word_index, embedding_dim=None):
    """
    Build an embedding matrix from a text file of pre-trained word vectors.

    Each usable line of `fname` has the form "<word> <v1> ... <vd>" with
    single-space separators. Row ``i`` of the result holds the vector of the
    word that `word_index` maps to ``i``; any row that receives no vector
    stays all-zero.

    Parameters
    ----------
    fname : str
        Path to the word-vector text file.
    word_index : dict
        Mapping word -> integer row index (must be < ``len(word_index) + 1``).
    embedding_dim : int, optional
        Length of each vector. When omitted, the module-level
        ``EMBEDDING_DIM`` is used, as before.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(len(word_index) + 1, embedding_dim)``.
    """
    dim = EMBEDDING_DIM if embedding_dim is None else embedding_dim

    matrix = np.zeros((len(word_index) + 1, dim))

    # `with` guarantees the file is closed even if a line fails to parse.
    with open(fname, 'r', encoding='utf-8', newline='\n', errors='ignore') as f:
        for line in f:
            tokens = line.rstrip().split(' ')
            word, coefs = tokens[0], tokens[1:]

            # Only vocabulary words are needed; skipping the rest avoids
            # holding every vector of the file in memory.
            row = word_index.get(word)
            if row is None:
                continue

            # Ignore lines that do not carry exactly `dim` values, such as a
            # leading "<count> <dim>" header line, whose single value would
            # otherwise be broadcast across a whole row.
            if len(coefs) != dim:
                continue

            # A word repeated in the file keeps its last vector.
            matrix[row] = np.asarray(coefs, dtype='float32')

    return matrix

In [None]:
# Embedding size
EMBEDDING_DIM = 300

# Load and save FastText word vectors
ft_matrix = embedding_matrix('wiki-news-300d-1M.vec', word_index)

# np.save does not create missing parent directories, so make sure the
# output folder exists before writing (needed on a fresh checkout).
os.makedirs('embeddings', exist_ok=True)
np.save('embeddings/emb_matrix_ft', ft_matrix)

In [None]:
## Create the model
model = Sequential()
# Frozen embedding layer initialised with the pre-built FastText matrix
model.add(Embedding(len(word_index)+1, EMBEDDING_DIM, weights=[ft_matrix],
                    trainable=False, input_length=max_len))
# This Dropout call was fused onto the end of the previous statement, which
# is a SyntaxError; it now sits on its own line.
model.add(Dropout(0.25))
model.add(Bidirectional(LSTM(200, return_sequences=True), merge_mode='concat'))
model.add(Dropout(0.25))
model.add(GlobalMaxPool1D())
model.add(Dense(50, activation='sigmoid'))
model.add(Dropout(0.25))
# One softmax output unit per sentiment class
model.add(Dense(n_classes, activation='softmax'))

model.summary()

In [None]:
## Compile the model
# Categorical cross-entropy matches the one-hot targets; accuracy is tracked as 'acc'
model.compile(
    optimizer='rmsprop',
    loss='categorical_crossentropy',
    metrics=['acc'],
)

In [None]:
## Train the model
# Training hyper-parameters; batch_size is reused by the evaluation cells
batch_size, epochs = 64, 10

model.fit(
    X_train_resample,
    y_train_resample,
    batch_size=batch_size,
    epochs=epochs,
    verbose=0,
)

In [None]:
## Compute the testing accuracy
# evaluate is unpacked into the loss and the single compiled metric ('acc')
test_loss, test_acc = model.evaluate(X_test, y_test, batch_size=batch_size)

# Reported as a percentage with one decimal
print("Testing accuracy: {:.1f}".format(100 * test_acc))

In [None]:
## Make predictions
# Collapse both matrices to one class id per row (position of the row maximum).
# NOTE: y_test is overwritten here, so this cell is meant to be run once per kernel session.
y_pred = model.predict(X_test, batch_size=batch_size).argmax(axis=1)
y_test = y_test.argmax(axis=1)

In [None]:
# Set options to print 2 decimals
np.set_printoptions(precision=2)

# Confusion matrix (true labels first, predicted labels second)
cm = confusion_matrix(y_test, y_pred)

In [None]:
def plot_confusion_matrix(cm, classes,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    Draw a confusion matrix as an annotated heat map on the current figure.

    Parameters
    ----------
    cm : 2-D array of integers
        Confusion matrix; each cell is written with the 'd' integer format.
    classes : sequence
        Tick labels, used on both axes.
    title : str
        Plot title.
    cmap : matplotlib colormap
        Colormap used for the heat map.
    """
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    # The same class names label the columns (rotated) and the rows
    positions = np.arange(len(classes))
    plt.xticks(positions, classes, rotation=45)
    plt.yticks(positions, classes)

    # Cells above half of the maximum get white text, the others black
    cutoff = cm.max() / 2.
    n_rows, n_cols = cm.shape

    for row in range(n_rows):
        for col in range(n_cols):
            cell = cm[row, col]
            text_color = "white" if cell > cutoff else "black"
            plt.text(col, row, format(cell, 'd'),
                     horizontalalignment="center", color=text_color)

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

In [None]:
# Class names known to the label encoder, used as tick labels
labels = le.classes_.tolist()

# Plot confusion matrix on a fresh figure
plt.figure()
plot_confusion_matrix(cm, labels)

In [None]:
# Precision, recall, f1 score and support
# average=None requests one value per class instead of an aggregate
p, r, f1, s = precision_recall_fscore_support(y_test, y_pred, average=None)

# One row per class, one column per metric
results = pd.DataFrame(
    {
        '1-Precision': p,
        '2-Recall': r,
        '3-F1 score': f1,
        '4-Support': s,
    },
    index=labels,
)

# Print precision, recall, f1 score and support
print(results.round(3))