# Google colab commands

In [None]:
!git clone https://github.com/Francesco9932/financial-sentiment-analysis

In [None]:
%cd financial-sentiment-analysis/

In [None]:
!wget http://nlp.stanford.edu/data/glove.6B.zip

In [None]:
!unzip glove*.zip

In [None]:
#pip install transformers

# Import

In [None]:
from transformers import TFDistilBertModel
from transformers import DistilBertTokenizer
import pandas as pd
import numpy as np

# pre-processing
from sklearn.preprocessing import LabelEncoder
from nltk.stem.porter import *
from nltk.corpus import stopwords
import re
import nltk 
nltk.download("stopwords")

from transformers import BertTokenizer
from transformers import TFBertModel
import tensorflow as tf
from keras.utils import pad_sequences
import keras
from keras.callbacks import EarlyStopping
from keras.callbacks import ModelCheckpoint
from keras.models import load_model

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# Data preprocessing

In [None]:
# Load the dataset. Column names are supplied explicitly: the sentiment
# label comes first, followed by the headline text.
df = pd.read_csv('all-data.csv', encoding="latin-1",
                 names=['sentiment', 'headline'])
df.head()  # preview the first rows

In [None]:
# Remove duplicate rows and rows with missing values, printing the shape of
# the frame after each step.
print("original shape: ", df.shape)
df = df.drop_duplicates()
print("after drop duplicates shape: ", df.shape)
# dropna() returns a new frame, so the result has to be assigned back to
# `df`; storing it under another name left `df` untouched and made the
# shape printed below misleading.
df = df.dropna()
print("after drop null shape: ", df.shape)

In [None]:
# Count the missing values in each column.
df.isnull().sum() 

In [None]:
# Class balance: bar chart of the label counts, then each label's share of
# all rows as a percentage rounded to one decimal.
label_counts = df['sentiment'].value_counts()
label_counts.plot(kind='bar')
class_percentage = label_counts / len(df) * 100
print(round(class_percentage, 1))

In [None]:
import string
def preprocess(text):
    """Normalise a headline before tokenisation.

    The text is lower-cased and every character that is not an ASCII letter
    or digit (punctuation, whitespace, symbols, accented letters) is
    replaced by one space.

    Args:
        text: The raw headline string.

    Returns:
        The lower-cased string in which all non-alphanumeric characters have
        been replaced by spaces.
    """
    # This substitution already turns all punctuation into spaces, so the
    # former second pass that stripped `string.punctuation` could never
    # match anything and has been removed.
    return re.sub(r"[^a-zA-Z0-9]", " ", text.lower())

# Clean every headline with preprocess() and store the result back in the
# same column.
df['headline'] = df['headline'].apply(preprocess)
#df.to_csv('preprocessed.csv', index=False)

In [None]:
# Show the second preprocessed headline of each sentiment class.
for label, caption in (
    ('positive', "Positive news headline example :"),
    ('negative', "Negative news headline example :"),
    ('neutral', "Neutral news headline example  :"),
):
    matching_headlines = df.loc[df['sentiment'] == label, 'headline']
    print(caption, matching_headlines.values[1])

In [None]:
# Preprocessed headlines as a plain Python list, ready for the tokenizers.
sentences = df['headline'].tolist()

# Replace the string labels with integer class ids. `le` is kept so the ids
# can be mapped back to label names when predictions are printed.
le = LabelEncoder()
df['sentiment'] = le.fit_transform(df['sentiment'])
le.classes_

# Tokenizer for other models

In [None]:
# Word-level vocabulary fitted on the headlines.
tokenizer = keras.preprocessing.text.Tokenizer(lower=True)
tokenizer.fit_on_texts(sentences)
word_index = tokenizer.word_index
print("Vocab length:", len(word_index) + 1)

# Encode every headline as a list of word indices, then pad at the end up
# to the length of the longest headline.
sequences = tokenizer.texts_to_sequences(sentences)
max_seq_length = np.max([len(seq) for seq in sequences])
print("Maximum sequence length:", max_seq_length)
sequences = pad_sequences(sequences, maxlen=max_seq_length, padding='post')

In [None]:
# Use 70% of the samples for training; the split is shuffled with a fixed
# random_state (42).
train_sequences, test_sequences, y_train, y_test = train_test_split(sequences, df['sentiment'], train_size=0.7, shuffle=True, random_state=42)

In [None]:
# Sanity-check the shapes of the feature and label splits.
print('Train Set ->', train_sequences.shape, y_train.shape)
print('Test Set ->', test_sequences.shape, y_test.shape)

# Global Vectors for Word Representation (GloVe)

In [None]:
# Build the GloVe embedding matrix for the Keras tokenizer's vocabulary.

# Derive the vocabulary size from the fitted tokenizer (same formula as the
# "Vocab length" printed when the tokenizer is built) instead of a
# hard-coded number: a stale constant makes the row assignment below raise
# an IndexError as soon as the vocabulary is larger than that constant.
vocab_size = len(word_index) + 1
embedding_size = 200

embeddings_index = {}

# Each line of the GloVe file is a word followed by its vector components.
# The encoding is given explicitly so parsing does not depend on the locale.
with open('glove.6B.200d.txt', encoding='utf-8') as f:
    for line in f:
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs


# Row i holds the vector of the word whose tokenizer index is i; words that
# are missing from GloVe keep an all-zero row. The matrix has vocab_size + 1
# rows because the CNN embedding layer is declared with that input_dim.
embeddings_matrix = np.zeros((vocab_size+1, embedding_size))
for word, i in word_index.items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embeddings_matrix[i] = embedding_vector

In [None]:
# Shape of the embedding matrix: (rows, embedding_size).
print(embeddings_matrix.shape)

# CNN1d Model

In [None]:
# Multi-kernel 1D CNN classifier over GloVe-initialised embeddings.

# One padded sequence of word indices per sample.
inputs = tf.keras.Input(shape=(train_sequences.shape[1],))
# Embedding layer whose initial weights are the rows of embeddings_matrix.
x = tf.keras.layers.Embedding(input_dim=vocab_size+1,
                              output_dim=embedding_size,
                              input_length=train_sequences.shape[1],
                              weights=[embeddings_matrix])(inputs)

# One convolution + global-max-pool branch per kernel size; every branch
# reads the same embedded sequence.
convs = []
filter_sizes = [2,3,4,5,6]

for filter_size in filter_sizes:
    l_conv = tf.keras.layers.Conv1D(filters=200, 
                        kernel_size=filter_size, 
                        activation='relu')(x)
    l_pool = tf.keras.layers.GlobalMaxPooling1D()(l_conv)
    convs.append(l_pool)

# Concatenate the pooled branches and classify into 3 classes.
l_merge = tf.keras.layers.concatenate(convs, axis=1)
x = tf.keras.layers.Dropout(0.1)(l_merge)  
x = tf.keras.layers.Dense(128, activation='relu')(x)
x = tf.keras.layers.Dropout(0.2)(x)
outputs = tf.keras.layers.Dense(3, activation='softmax')(x)


model = tf.keras.Model(inputs=inputs, outputs=outputs)

# The sparse loss takes the integer class ids directly (no one-hot encoding).
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.summary()

In [None]:
# Stop once val_loss has not improved for 5 epochs; independently, save the
# model only when val_accuracy improves (save_best_only=True).
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=5)
mc = ModelCheckpoint('./best_model/best_model_cnn1d.h5', 
                     monitor='val_accuracy', mode='max', verbose=1, 
                     save_best_only=True)

# 10% of the training data is used for validation (validation_split=0.1).
history = model.fit(train_sequences,
                    y_train,
                    batch_size=32,
                    epochs=100, 
                    validation_split=0.1,
                    callbacks=[es, mc])

In [None]:
# Reload the checkpointed CNN and score it on the held-out test set.
saved_model = load_model('./best_model/best_model_cnn1d.h5')

# The model was compiled with a loss and one metric (accuracy), so
# results[0] is the loss and results[1] the accuracy.
results = saved_model.evaluate(test_sequences, y_test, verbose=0)

# Fixed the "Losss" typo in the printed label.
print("Test Loss: {:.5f}".format(results[0]))
print("Test accuracy: {:.5f}%".format(results[1] * 100))

# LSTM Model

In [None]:
# LSTM classifier; no pretrained weights are passed to this embedding layer.
inputs = tf.keras.Input(shape=(train_sequences.shape[1],))
# NOTE(review): input_dim is vocab_size here but vocab_size+1 in the CNN
# model - confirm vocab_size is larger than the highest word index.
x = tf.keras.layers.Embedding(input_dim=vocab_size,
                              output_dim=embedding_size,
                              input_length=train_sequences.shape[1])(inputs)
# Keep the LSTM output of every time step, then flatten all steps into a
# single vector for the 3-class softmax layer.
x = tf.keras.layers.LSTM(256, return_sequences=True, activation='tanh')(x)
x = tf.keras.layers.Flatten()(x)
outputs = tf.keras.layers.Dense(3, activation='softmax')(x)

model = tf.keras.Model(inputs=inputs, outputs=outputs)

# The sparse loss takes the integer class ids directly (no one-hot encoding).
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])


In [None]:
# Stop once val_loss has not improved for 5 epochs; independently, save the
# model only when val_accuracy improves (save_best_only=True).
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=5)
mc = ModelCheckpoint('./best_model/best_model_lstm.h5', 
                     monitor='val_accuracy', mode='max', verbose=1, 
                     save_best_only=True)

# 10% of the training data is used for validation (validation_split=0.1).
history = model.fit(train_sequences,
                    y_train,
                    batch_size=32,
                    epochs=100, 
                    validation_split=0.1,
                    callbacks=[es, mc])

In [None]:
# Reload the checkpointed LSTM and score it on the held-out test set.
saved_model = load_model('./best_model/best_model_lstm.h5')

# The model was compiled with a loss and one metric (accuracy), so
# results[0] is the loss and results[1] the accuracy.
results = saved_model.evaluate(test_sequences, y_test, verbose=0)

# Fixed the "Losss" typo in the printed label.
print("Test Loss: {:.5f}".format(results[0]))
print("Test accuracy: {:.5f}%".format(results[1] * 100))

# BERT Finetune

In [None]:
# Pretrained tokenizer/encoder pair for finBERT ('ProsusAI/finbert').
finBertTokenizer = BertTokenizer.from_pretrained('ProsusAI/finbert')
finBert = TFBertModel.from_pretrained('ProsusAI/finbert')

# Pretrained tokenizer/encoder pair for DistilBERT ('distilbert-base-uncased').
distilBertTokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
distilBert = TFDistilBertModel.from_pretrained('distilbert-base-uncased')

In [None]:
def create_BERT_wrapper(bert_embeddings, max_len):
    """Build a 3-class classifier on top of a pretrained transformer encoder.

    Args:
        bert_embeddings: Pretrained encoder, called as
            ``bert_embeddings(input_ids, attention_mask=...)``. Element 0 of
            its output is used as the per-token hidden states.
        max_len: Fixed length of the ``input_ids`` and ``attention_mask``
            inputs.

    Returns:
        An uncompiled ``tf.keras.Model`` that takes the inputs named
        ``input_ids`` and ``attention_mask`` and outputs softmax
        probabilities over 3 classes.
    """
    input_ids = tf.keras.layers.Input(
        shape=(max_len,), dtype=tf.int32, name="input_ids")
    input_mask = tf.keras.layers.Input(
        shape=(max_len,), dtype=tf.int32, name="attention_mask")

    # Use the encoder passed by the caller. The previous code referenced the
    # global `finBert` here, so the argument was ignored and every wrapper
    # (including the one meant for DistilBERT) was built on finBERT.
    embeddings = bert_embeddings(input_ids, attention_mask=input_mask)[
        0]  # 0 = last hidden state (for BERT, 1 = pooler_output)

    # Max-pool over the token axis, then a small dense classification head.
    out = tf.keras.layers.GlobalMaxPool1D()(embeddings)
    out = tf.keras.layers.Dense(128, activation='relu')(out)
    out = tf.keras.layers.Dropout(0.1)(out)
    out = tf.keras.layers.Dense(32, activation='relu')(out)

    y = tf.keras.layers.Dense(3, activation='softmax')(out)

    model = tf.keras.Model(inputs=[input_ids, input_mask], outputs=y)

    # layers[0] and layers[1] are the two Input layers, so layers[2] is
    # expected to be the encoder; mark it trainable so it is fine-tuned.
    model.layers[2].trainable = True
    # check https://stackoverflow.com/questions/60463829/training-tfbertforsequenceclassification-with-custom-x-and-y-data

    return model


In [None]:
def calculate_max_sentence_len(sentences, tokenizer):
    """Return the largest token count among `sentences`.

    Every sentence is encoded with
    ``tokenizer.encode(sentence, add_special_tokens=True)`` and the length of
    the returned id sequence is measured, so the special tokens are counted.

    Args:
        sentences: Iterable of sentence strings.
        tokenizer: Object exposing ``encode(text, add_special_tokens=...)``.

    Returns:
        The maximum encoded length, or 0 when `sentences` is empty.
    """
    token_counts = (
        len(tokenizer.encode(sentence, add_special_tokens=True))
        for sentence in sentences
    )
    return max(token_counts, default=0)

In [None]:
# Disabled earlier variant of train_test_set(), kept as an inert string
# literal: it tokenises each split with one batched tokenizer call instead
# of encoding sentence by sentence. Nothing in it is executed.
"""def train_test_set(tokenizer):
    train_sentences, test_sentences, labels_train, labels_test = train_test_split(
        sentences, df['sentiment'], train_size=0.7, shuffle=True, random_state=42)

    max_len = min(calculate_max_sentence_len(train_sentences, tokenizer),
                  calculate_max_sentence_len(test_sentences, tokenizer))

    print('Max sentence length: ', max_len)

    X_train = tokenizer(
        text=train_sentences,
        add_special_tokens=True,
        max_length=max_len,
        truncation=True,
        padding=True,
        return_tensors='tf',
        return_token_type_ids=False,
        return_attention_mask=True,
        verbose=True)

    X_test = tokenizer(
        text=test_sentences,
        add_special_tokens=True,
        max_length=max_len,
        truncation=True,
        padding=True,
        return_tensors='tf',
        return_token_type_ids=False,
        return_attention_mask=True,
        verbose=True
    )
    # the same for attention mask
    print("\nTrain split shape: ", X_train['input_ids'].shape)
    print("\nTest split shape: ", X_test['input_ids'].shape)
    print(X_train['input_ids'])
    print(X_train['attention_mask'])

    return X_train, X_test, labels_train, labels_test, max_len """


In [None]:
# Disabled call, kept as an inert string literal; it is not executed.
"""X_train, X_test, labels_train, labels_test, max_len_finBert = train_test_set(
    finBertTokenizer)"""


In [None]:
def _encode_sentences(tokenizer, batch, max_len):
    """Tokenize the list `batch` into fixed-length id and mask arrays."""
    encoded = tokenizer(
        text=batch,
        add_special_tokens=True,       # add `[CLS]` and `[SEP]`
        max_length=max_len,
        padding='max_length',          # pad every sentence up to max_len
        truncation=True,               # cut anything longer than max_len
        return_attention_mask=True,
        return_token_type_ids=False,
        return_tensors='np',
        verbose=True,
    )
    # int32 matches the dtype declared for the model's Input layers.
    return {key: encoded[key].astype(np.int32)
            for key in ("input_ids", "attention_mask")}


def train_test_set(tokenizer):
    """Split the headlines 70/30 and tokenize both splits for a BERT model.

    Reads the module-level `sentences` list and `df['sentiment']` labels.
    The padding length is the longest tokenized sentence of the whole
    corpus, so no sentence is truncated.

    Both splits used to be encoded by two copy-pasted per-sentence loops;
    they now share one batched helper that produces the same
    ``(n_sentences, max_len)`` int32 arrays.

    Args:
        tokenizer: Hugging Face tokenizer used to encode the sentences.

    Returns:
        A tuple ``(X_train, X_test, labels_train, labels_test, max_len)``
        where `X_train` and `X_test` are dicts with the keys ``input_ids``
        and ``attention_mask``.
    """
    max_len = calculate_max_sentence_len(sentences, tokenizer)
    print('Max sentence length: ', max_len)

    train_sentences, test_sentences, labels_train, labels_test = train_test_split(
        sentences, df['sentiment'], train_size=0.7, shuffle=True, random_state=42)

    X_train = _encode_sentences(tokenizer, train_sentences, max_len)
    X_test = _encode_sentences(tokenizer, test_sentences, max_len)

    print("\nTrain split shape: ", X_train['input_ids'].shape)
    print("\nTest split shape: ", X_test['input_ids'].shape)

    return X_train, X_test, labels_train, labels_test, max_len

In [None]:
# Tokenize the train/test splits with the finBERT tokenizer; max_len_finBert
# is the padded sequence length returned by train_test_set().
X_train, X_test, labels_train, labels_test, max_len_finBert = train_test_set(finBertTokenizer)

# finBERT

In [None]:
# Build and compile the finBERT-based classifier.
model = create_BERT_wrapper(finBert, max_len_finBert)

optimizer = tf.keras.optimizers.Adam(
    learning_rate=5e-05,  # HF recommendation
    epsilon=1e-08,
    clipnorm=1.0
)

# The wrapper's final layer already applies softmax, so the loss receives
# probabilities, not logits. With from_logits=True the loss would apply a
# second softmax to them and distort the training signal.
loss = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
# This is plain categorical accuracy; the 'balanced_accuracy' name is kept
# because the checkpoint callback monitors 'val_balanced_accuracy'.
metric = tf.keras.metrics.CategoricalAccuracy('balanced_accuracy')

model.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=[metric]
)

model.summary()

In [None]:
# Stop once val_loss has not improved for 3 epochs; independently, save the
# model only when the validation value of the metric named
# 'balanced_accuracy' improves.
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=3)
mc = ModelCheckpoint('./best_model/finetuned_finbert.h5',
                     monitor='val_balanced_accuracy', mode='max', verbose=1,
                     save_best_only=True)

# Labels are one-hot encoded with to_categorical for the categorical loss;
# 10% of the training data is used for validation.
history = model.fit(x= {'input_ids': X_train['input_ids'], 'attention_mask': X_train['attention_mask']},
                    y= tf.keras.utils.to_categorical(labels_train),
                    batch_size=32,
                    epochs=10,
                    validation_split=0.1,
                    callbacks=[es, mc])

In [None]:
# Reload the checkpointed model; TFBertModel is supplied through
# custom_objects so the saved encoder class can be resolved on load.
saved_model = load_model('./best_model/finetuned_finbert.h5',
                         custom_objects={"TFBertModel": TFBertModel})


In [None]:
# Classification report:
# Predict class probabilities for the test split, take the most probable
# class per sample and compare against the integer test labels.
predicted = saved_model.predict(
    {'input_ids': X_test['input_ids'], 'attention_mask': X_test['attention_mask']})
y_predicted = np.argmax(predicted, axis=1)
print(classification_report(labels_test, y_predicted))

In [None]:
# Example of prediction
def tokenize_sentence(sentence, tokenizer=finBertTokenizer,
                      max_length=max_len_finBert):
    """Encode one sentence into the inputs expected by the finBERT model.

    Args:
        sentence: Text to encode.
        tokenizer: Tokenizer to use; defaults to the finBERT tokenizer.
        max_length: Length to pad/truncate to. Defaults to the length the
            finBERT training inputs were built with (previously a
            hard-coded 93), so the result always fits the model input.

    Returns:
        The tokenizer output holding ``input_ids`` and ``attention_mask``
        as TensorFlow tensors.
    """
    return tokenizer.encode_plus(
        text=sentence,
        add_special_tokens=True,        # add `[CLS]` and `[SEP]`
        max_length=max_length,
        padding='max_length',           # replaces deprecated pad_to_max_length=True
        truncation=True,                # never exceed the model input length
        return_attention_mask=True,
        return_token_type_ids=False,
        return_tensors='tf',
        verbose=True,
    )


def _example_headline(label, position=1):
    """Return the headline at `position` among the rows labelled `label`."""
    # df['sentiment'] holds the integer ids produced by `le`, so the label
    # name must be encoded before filtering. Comparing the column against
    # the raw string selected no rows and made `.values[...]` raise
    # IndexError.
    class_id = le.transform([label])[0]
    return df[df['sentiment'] == class_id]['headline'].values[position]


tokenized_positive_sentence = tokenize_sentence(_example_headline('positive'))
tokenized_negative_sentence = tokenize_sentence(_example_headline('negative'))
tokenized_neutral_sentence = tokenize_sentence(_example_headline('neutral'))

# Each example was selected by its label, so that label is the true label.
predicted1 = saved_model.predict(
    {'input_ids': tokenized_positive_sentence['input_ids'],
     'attention_mask': tokenized_positive_sentence['attention_mask']})
y_predicted1 = np.argmax(predicted1, axis=1)

print("Positive sentence prediction: ", le.inverse_transform(y_predicted1)[0])
print("Positive sentence true label: ", 'positive')
print("-"*50)

predicted2 = saved_model.predict(
    {'input_ids': tokenized_negative_sentence['input_ids'],
     'attention_mask': tokenized_negative_sentence['attention_mask']})
y_predicted2 = np.argmax(predicted2, axis=1)

print("Negative sentence prediction: ", le.inverse_transform(y_predicted2)[0])
print("Negative sentence true label: ", 'negative')
print("-"*50)

predicted3 = saved_model.predict(
    {'input_ids': tokenized_neutral_sentence['input_ids'],
     'attention_mask': tokenized_neutral_sentence['attention_mask']})
y_predicted3 = np.argmax(predicted3, axis=1)

print("Neutral sentence prediction: ", le.inverse_transform(y_predicted3)[0])
print("Neutral sentence true label: ", 'neutral')
print("-"*50)


# distilBERT

In [None]:
# Tokenize the train/test splits with the DistilBERT tokenizer;
# max_len_distilBert is the padded sequence length returned by
# train_test_set().
X_train, X_test, labels_train, labels_test, max_len_distilBert = train_test_set(distilBertTokenizer)

In [None]:
# Build and compile the DistilBERT-based classifier.
# The input length must equal the padded length of the tokenized data, so
# use the value returned by train_test_set() instead of a hard-coded 76.
model = create_BERT_wrapper(distilBert, max_len=max_len_distilBert)

optimizer = tf.keras.optimizers.Adam(
    learning_rate=5e-05,  # HF recommendation
    epsilon=1e-08,
    clipnorm=1.0
)

# The wrapper's final layer already applies softmax, so the loss receives
# probabilities, not logits. With from_logits=True the loss would apply a
# second softmax to them and distort the training signal.
loss = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
# This is plain categorical accuracy; the 'balanced_accuracy' name is kept
# because the checkpoint callback monitors 'val_balanced_accuracy'.
metric = tf.keras.metrics.CategoricalAccuracy('balanced_accuracy')

model.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=[metric]
)

model.summary()

In [None]:
# Stop once val_loss has not improved for 3 epochs; independently, save the
# model only when the validation value of the metric named
# 'balanced_accuracy' improves.
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=3)
mc = ModelCheckpoint('./best_model/finetuned_distilbert.h5',
                     monitor='val_balanced_accuracy', mode='max', verbose=1,
                     save_best_only=True)

# Labels are one-hot encoded with to_categorical for the categorical loss;
# 10% of the training data is used for validation.
history = model.fit(x= {'input_ids': X_train['input_ids'], 'attention_mask': X_train['attention_mask']},
                    y= tf.keras.utils.to_categorical(labels_train),
                    batch_size=32,
                    epochs=10,
                    validation_split=0.1,
                    callbacks=[es, mc])

In [None]:
# Reload the checkpointed model; TFDistilBertModel is supplied through
# custom_objects so the saved encoder class can be resolved on load.
saved_model = load_model('./best_model/finetuned_distilbert.h5',
                         custom_objects={"TFDistilBertModel": TFDistilBertModel})


In [None]:
# Classification report:
# Predict class probabilities for the test split, take the most probable
# class per sample and compare against the integer test labels.
predicted = saved_model.predict(
    {'input_ids': X_test['input_ids'], 'attention_mask': X_test['attention_mask']})
y_predicted = np.argmax(predicted, axis=1)
print(classification_report(labels_test, y_predicted))

In [None]:
# Example of prediction
def tokenize_sentence(sentence, tokenizer=distilBertTokenizer,
                      max_length=max_len_distilBert):
    """Encode one sentence into the inputs expected by the DistilBERT model.

    Args:
        sentence: Text to encode.
        tokenizer: Tokenizer to use; defaults to the DistilBERT tokenizer.
        max_length: Length to pad/truncate to. Defaults to the length the
            DistilBERT training inputs were built with (previously a
            hard-coded 120 that did not match the model input), so the
            result always fits the model input.

    Returns:
        The tokenizer output holding ``input_ids`` and ``attention_mask``
        as TensorFlow tensors.
    """
    return tokenizer.encode_plus(
        text=sentence,
        add_special_tokens=True,        # add `[CLS]` and `[SEP]`
        max_length=max_length,
        padding='max_length',           # replaces deprecated pad_to_max_length=True
        truncation=True,                # never exceed the model input length
        return_attention_mask=True,
        return_token_type_ids=False,
        return_tensors='tf',
        verbose=True,
    )

def _example_headline(label, position=0):
    """Return the headline at `position` among the rows labelled `label`."""
    # df['sentiment'] holds the integer ids produced by `le`, so the label
    # name must be encoded before filtering. Comparing the column against
    # the raw string selected no rows and made `.values[...]` raise
    # IndexError.
    class_id = le.transform([label])[0]
    return df[df['sentiment'] == class_id]['headline'].values[position]


tokenized_positive_sentence = tokenize_sentence(_example_headline('positive'))
tokenized_negative_sentence = tokenize_sentence(_example_headline('negative'))
tokenized_neutral_sentence = tokenize_sentence(_example_headline('neutral'))

# Each example was selected by its label, so that label is the true label.
predicted1 = saved_model.predict(
    {'input_ids': tokenized_positive_sentence['input_ids'],
     'attention_mask': tokenized_positive_sentence['attention_mask']})
y_predicted1 = np.argmax(predicted1, axis=1)

print("Positive sentence prediction: ", le.inverse_transform(y_predicted1)[0])
print("Positive sentence true label: ", 'positive')
print("-"*50)

predicted2 = saved_model.predict(
    {'input_ids': tokenized_negative_sentence['input_ids'],
     'attention_mask': tokenized_negative_sentence['attention_mask']})
y_predicted2 = np.argmax(predicted2, axis=1)

print("Negative sentence prediction: ", le.inverse_transform(y_predicted2)[0])
print("Negative sentence true label: ", 'negative')
print("-"*50)

predicted3 = saved_model.predict(
    {'input_ids': tokenized_neutral_sentence['input_ids'],
     'attention_mask': tokenized_neutral_sentence['attention_mask']})
y_predicted3 = np.argmax(predicted3, axis=1)

print("Neutral sentence prediction: ", le.inverse_transform(y_predicted3)[0])
print("Neutral sentence true label: ", 'neutral')
print("-"*50)
