In [None]:
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neural_network import MLPClassifier
from scipy.sparse import csr_matrix
import seaborn as sns
import numpy as np
import scipy
from sklearn.svm import SVC
import matplotlib.pyplot as plt
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.naive_bayes import MultinomialNB
from xgboost import XGBClassifier

## Import labeled dataset

In [None]:
# Load the labelled dataset. Column names are supplied through `names`,
# and the file is decoded with the 'unicode_escape' codec.
data_path = '/Users/romainberquet/Desktop/epfl/ml-fin/Projet/archive/all-data.csv'
csv_options = {'encoding': 'unicode_escape', 'names': ['Sentiment', 'Text']}
df = pd.read_csv(data_path, **csv_options)

In [None]:
# Switch to lowercase column names, which the rest of the notebook uses.
column_renames = {'Sentiment': 'sentiment', 'Text': 'text'}
df = df.rename(columns=column_renames)

In [None]:
# Normalise the text: lowercase everything, then strip punctuation.
df["text"] = df["text"].str.lower()
# `regex=True` must be explicit: since pandas 2.0 `Series.str.replace` treats the
# pattern as a literal string by default, which would leave the punctuation in place.
# The raw string keeps `\w` and `\s` from being read as (invalid) Python escapes.
df["text"] = df["text"].str.replace(r"[^\w\s]", "", regex=True)

In [None]:
# Data exploration: print the first rows of the cleaned dataset.
preview = df.head()
print(preview)

In [None]:
# Take labels and heights from the same value_counts() result so each bar is
# guaranteed to carry its own label. Pairing `unique()` (order of appearance)
# with `value_counts()` (descending frequency) can attach counts to the wrong class.
sentiment_counts = df['sentiment'].value_counts()
ax = sns.barplot(x=sentiment_counts.index, y=sentiment_counts.values)
ax.set(xlabel='Sentiment', ylabel='Number of articles' , title='Number of articles per type of sentiment')
plt.show()

# Relative frequency of each class, to quantify the imbalance.
sentiment_proportions = df['sentiment'].value_counts(normalize=True)
print(f"The proportion of each sentiment in the dataset is \n {sentiment_proportions}")

The dataset is relatively imbalanced, with a high proportion of neutral comments.

In [None]:
#Plot the distribution of the number of words per article
# The counts are computed once and reused for both the mean and the histogram.
# split() without an argument splits on any run of whitespace, so repeated spaces
# (e.g. where a standalone punctuation mark was removed) are not counted as empty
# "words", which split(" ") would do.
word_counts = df['text'].str.split().str.len()

# The mean was previously computed and silently discarded; report it explicitly.
print(f"Mean number of words per article: {word_counts.mean():.2f}")

ax = word_counts.plot(kind='hist', bins=75)
ax.set(xlabel='Number of words', ylabel='Number of articles',
       title='Distribution of the number of words per article')
plt.show()

We have no empty articles and the distribution of the length is relatively well balanced among articles.

In [None]:
# Encode the sentiment labels as integers: positive -> 1, neutral -> 0, negative -> 2
sentiment_codes = {'positive': 1, 'neutral': 0, 'negative': 2}
df['sentiment'] = df['sentiment'].map(sentiment_codes)

# Benchmark models

## Benchmark model : Bag of words

In [None]:
# Hold out 20% of the articles for testing (fixed seed for a repeatable split)
bow_split = train_test_split(df['text'], df['sentiment'], test_size=0.2, random_state=42)
train_texts_bow, test_texts_bow, y_train_bow, y_test_bow = bow_split

# Bag-of-words features: the vocabulary is learned on the training texts only,
# then the same vocabulary is applied to the test texts
vectorizer = CountVectorizer()
X_train_bow = vectorizer.fit_transform(train_texts_bow)
X_test_bow = vectorizer.transform(test_texts_bow)

In [None]:
def svm_classifier(X_train, y_train, X_test):
    """Fit a linear-kernel SVM on the training data and predict labels for `X_test`.

    Returns the predictions produced by the fitted model for `X_test`.
    """
    # scikit-learn estimators return themselves from fit(), so the calls can be chained
    return SVC(kernel='linear').fit(X_train, y_train).predict(X_test)

In [None]:
def gradient_boosting_classifier(X_train, y_train, X_test):
    """Fit an XGBoost classifier (default settings) and predict labels for `X_test`.

    Returns the predictions produced by the fitted model for `X_test`.
    """
    booster = XGBClassifier()
    booster.fit(X_train, y_train)
    predictions = booster.predict(X_test)
    return predictions

In [None]:
def naive_bayes_classifier(X_train, y_train, X_test):
    """Fit a multinomial Naive Bayes classifier and predict labels for `X_test`.

    Only the predictions are returned; no metric is computed in this function.
    """
    return MultinomialNB().fit(X_train, y_train).predict(X_test)

In [None]:
def _densify(features):
    """Return a dense array for a `csr_matrix`; pass any other input through unchanged."""
    return features.toarray() if isinstance(features, csr_matrix) else features


def mlp_classifier(X_train, y_train, X_test):
    """Fit a two-hidden-layer MLP (512 and 256 units) and predict labels for `X_test`.

    CSR sparse inputs are converted to dense arrays before fitting and predicting.
    Training progress is printed because the network is built with `verbose=True`.
    """
    dense_train = _densify(X_train)
    dense_test = _densify(X_test)

    network = MLPClassifier(hidden_layer_sizes=(512, 256), activation='relu', solver='adam',
                            max_iter=35, batch_size=128, verbose=True)
    network.fit(dense_train, y_train)

    return network.predict(dense_test)
  

In [None]:
def logistic_regression_classifier(X_train, y_train, X_test):
    """Fit a logistic regression (at most 1000 solver iterations) and predict labels for `X_test`.

    Returns the predictions produced by the fitted model for `X_test`.
    """
    return LogisticRegression(max_iter=1000).fit(X_train, y_train).predict(X_test)

In [None]:
#Classification using bag of words: every benchmark classifier is trained and
#evaluated on the same features, in the order SVM, XGBoost, Naive Bayes, MLP
y_pred_svm_bow, y_pred_xgb_bow, y_pred_nb_bow, y_pred_mlp_bow = (
    classify(X_train_bow, y_train_bow, X_test_bow)
    for classify in (svm_classifier, gradient_boosting_classifier,
                     naive_bayes_classifier, mlp_classifier)
)

In [None]:
def update_performance(df, model_name, y_test, y_pred):
    """Return a copy of the performance table with one model's scores appended.

    Parameters
    ----------
    df : pandas.DataFrame
        Existing performance table. It is not modified.
    model_name
        Value stored in the 'Model' column of the new row.
    y_test
        Ground-truth labels, passed to `classification_report` as `y_true`.
    y_pred
        Predicted labels, passed to `classification_report` as `y_pred`.

    Returns
    -------
    pandas.DataFrame
        The rows of `df` followed by one row holding the weighted-average
        precision, recall and F1-score in the columns 'Model', 'Precision',
        'Recall' and 'F1-Score'.
    """
    # Generate the report as a dict and keep only the support-weighted averages
    report = classification_report(y_test, y_pred, output_dict=True)
    weighted_avg = report['weighted avg']

    new_row_df = pd.DataFrame({'Model': [model_name],
                               'Precision': [weighted_avg['precision']],
                               'Recall': [weighted_avg['recall']],
                               'F1-Score': [weighted_avg['f1-score']]})

    if len(df) == 0:
        # Concatenating an empty frame is deprecated in recent pandas (it emits a
        # FutureWarning and will change the resulting dtypes), so the first row is
        # returned directly, laid out as concat would: the table's columns first,
        # then any column only present in the new row.
        all_columns = df.columns.union(new_row_df.columns, sort=False)
        return new_row_df.reindex(columns=all_columns)

    # Concatenate the new row with the existing rows
    return pd.concat([df, new_row_df], ignore_index=True)

In [None]:
# Empty results table for the bag-of-words benchmark (no rows yet)
performance_columns = ['Model', 'Precision', 'Recall', 'F1-Score']
performance_df_bow = pd.DataFrame(columns=performance_columns)

In [None]:
def plot_performance(performance_df):
    """Draw a grouped bar chart of precision, recall and F1-score per model.

    Parameters
    ----------
    performance_df : pandas.DataFrame
        Table with the columns 'Model', 'Precision', 'Recall' and 'F1-Score';
        one group of three bars is drawn per row.

    The figure is shown with `plt.show()`; nothing is returned. The seaborn
    style and palette are set as a side effect.
    """
    # Size the x axis from the table that was passed in. Reading the global
    # `performance_df_bow` here only worked while every table had the same
    # number of rows as the bag-of-words one.
    n_models = len(performance_df)
    ind = np.arange(n_models)  # the x locations for the groups
    width = 0.25  # the width of the bars
    sns.set_style("whitegrid")
    sns.set_palette("Set2")
    fig, ax = plt.subplots(figsize=(12, 6))

    # One bar per metric, side by side around each group position
    rects1 = ax.bar(ind - width, performance_df['Precision'], width, label='Precision')
    rects2 = ax.bar(ind, performance_df['Recall'], width, label='Recall')
    rects3 = ax.bar(ind + width, performance_df['F1-Score'], width, label='F1-Score')

    # Axis label, title, model names as x tick labels, and legend
    ax.set_ylabel('Scores')
    ax.set_title('Performance by Model and Metric')
    ax.set_xticks(ind)
    ax.set_xticklabels(performance_df['Model'])
    ax.legend()

    def autolabel(rects):
        """Write each bar's height (rounded to 2 decimals) just above the bar."""
        for rect in rects:
            height = rect.get_height()
            ax.annotate('{}'.format(round(height, 2)),
                        xy=(rect.get_x() + rect.get_width() / 2, height),
                        xytext=(0, 3),  # 3 points vertical offset
                        textcoords="offset points",
                        ha='center', va='bottom')

    for rects in (rects1, rects2, rects3):
        autolabel(rects)

    # Show the plot
    plt.show()

In [None]:
# Score every bag-of-words model against the same held-out labels, then plot
bow_results = [
    ('BoW & SVM', y_pred_svm_bow),
    ('BoW & XGBoost', y_pred_xgb_bow),
    ('BoW & Naive Bayes', y_pred_nb_bow),
    ('BoW & MLP', y_pred_mlp_bow),
]
for model_label, predictions in bow_results:
    performance_df_bow = update_performance(performance_df_bow, model_label, y_test_bow, predictions)

plot_performance(performance_df_bow)

### Benchmark model : TF-IDF (Term Frequency-Inverse Document Frequency)

In [None]:
# Same 80/20 split and seed as the other benchmarks
tfidf_split = train_test_split(df['text'], df['sentiment'], test_size=0.2, random_state=42)
train_texts_tfidf, test_texts_tfidf, y_train_tfidf, y_test_tfidf = tfidf_split

tfidf_vectorizer = TfidfVectorizer()

# The vectorizer is fitted on the training texts only; the test texts are
# transformed with the already-fitted vectorizer
X_train_tfidf = tfidf_vectorizer.fit_transform(train_texts_tfidf)
X_test_tfidf = tfidf_vectorizer.transform(test_texts_tfidf)

In [None]:
#Classification using the TF-IDF features, in the order SVM, XGBoost, Naive Bayes, MLP
y_pred_svm_tfidf, y_pred_xgb_tfidf, y_pred_nb_tfidf, y_pred_mlp_tfidf = (
    classify(X_train_tfidf, y_train_tfidf, X_test_tfidf)
    for classify in (svm_classifier, gradient_boosting_classifier,
                     naive_bayes_classifier, mlp_classifier)
)

In [None]:
# Start from an empty table, add one row per TF-IDF model, then plot
performance_df_tfidf = pd.DataFrame(columns=['Model', 'Precision', 'Recall', 'F1-Score'])

tfidf_results = [
    ('TF-IDF & SVM', y_pred_svm_tfidf),
    ('TF-IDF & XGBoost', y_pred_xgb_tfidf),
    ('TF-IDF & Naive Bayes', y_pred_nb_tfidf),
    ('TF-IDF & MLP', y_pred_mlp_tfidf),
]
for model_label, predictions in tfidf_results:
    performance_df_tfidf = update_performance(performance_df_tfidf, model_label, y_test_tfidf, predictions)

plot_performance(performance_df_tfidf)

### Benchmark model : GloVe

In [None]:
def load_glove_model(glove_file_path):
    """Read a GloVe text file into a dictionary mapping each word to its vector.

    Every non-blank line is split on whitespace: the first token is the word,
    the remaining tokens are parsed as floats and stored as a NumPy array.

    Parameters
    ----------
    glove_file_path : str
        Path of the UTF-8 encoded embeddings file.

    Returns
    -------
    dict
        Word -> numpy.ndarray of floats. If a word occurs more than once, the
        last occurrence wins.

    Raises
    ------
    ValueError
        If a token after the word cannot be parsed as a float.
    """
    print("Loading Glove Model")
    glove_model = {}
    with open(glove_file_path, 'r', encoding='utf-8') as f:
        for line in f:
            split_line = line.split()
            if not split_line:
                # Blank line (e.g. trailing newline at the end of the file):
                # skip it instead of failing on split_line[0]
                continue
            word = split_line[0]
            glove_model[word] = np.array(list(map(float, split_line[1:])))
    print(f"{len(glove_model)} words loaded!")
    return glove_model

# Load the 300-dimensional GloVe 6B vectors from disk
glove_path = '/Users/romainberquet/Desktop/epfl/ml-fin/Projet/glove.6B/glove.6B.300d.txt'  
glove_model = load_glove_model(glove_path)

# Same 80/20 split and seed as the other benchmarks
glove_split = train_test_split(df['text'], df['sentiment'], test_size=0.2, random_state=42)
X_train_glove, X_test_glove, y_train_glove, y_test_glove = glove_split

In [None]:
def document_vector(glove_model, doc):
    """Represent a document as the mean of the embeddings of its known words.

    Parameters
    ----------
    glove_model : dict
        Mapping word -> embedding vector.
    doc : str
        Text to embed; it is split on whitespace and words missing from
        `glove_model` are ignored.

    Returns
    -------
    numpy.ndarray
        Element-wise mean of the word vectors found. When no word of `doc` is
        in the model, a zero vector with the model's embedding size is returned
        (size 300 if the model itself is empty).
    """
    words = doc.split()
    word_vectors = [glove_model[word] for word in words if word in glove_model]
    if word_vectors:
        return np.mean(word_vectors, axis=0)

    # Size the fallback from the model itself so that documents without any known
    # word get the same shape as the others, whatever embedding file was loaded.
    # 300 (the GloVe size used in this notebook) is kept for an empty model.
    dim = len(next(iter(glove_model.values()))) if glove_model else 300
    return np.zeros(dim)

# Embed every training and test document, stacking the vectors into one array per split
X_train_vectors_glove, X_test_vectors_glove = (
    np.array([document_vector(glove_model, text) for text in corpus])
    for corpus in (X_train_glove, X_test_glove)
)

In [None]:
#Classification using the averaged GloVe document vectors, in the order SVM, XGBoost, MLP
y_pred_svm_Glove, y_pred_xgb_Glove, y_pred_mlp_Glove = (
    classify(X_train_vectors_glove, y_train_glove, X_test_vectors_glove)
    for classify in (svm_classifier, gradient_boosting_classifier, mlp_classifier)
)

#Logistic regression is also evaluated on the GloVe features
y_pred_lr_Glove = logistic_regression_classifier(X_train_vectors_glove, y_train_glove, X_test_vectors_glove)

In [None]:
# Start from an empty table, add one row per GloVe model, then plot
performance_df_glove = pd.DataFrame(columns=['Model', 'Precision', 'Recall', 'F1-Score'])

glove_results = [
    ('Glove & SVM', y_pred_svm_Glove),
    ('Glove & XGBoost', y_pred_xgb_Glove),
    ('Glove & MLP', y_pred_mlp_Glove),
    ('Glove & Logistic Regression', y_pred_lr_Glove),
]
for model_label, predictions in glove_results:
    performance_df_glove = update_performance(performance_df_glove, model_label, y_test_glove, predictions)


plot_performance(performance_df_glove)

### Financial BERT

In [None]:
# Tokenizer and classification model are loaded from the same checkpoint name
finbert_checkpoint = "ProsusAI/finbert"
tokenizer_finbert = AutoTokenizer.from_pretrained(finbert_checkpoint)
model_finbert = AutoModelForSequenceClassification.from_pretrained(finbert_checkpoint)

In [None]:
# The whole labelled dataset is used as the evaluation set: texts and labels as plain lists
X_test_finbert = df['text'].tolist()
y_test_finbert = df['sentiment'].tolist()

In [None]:
def classify_model(X_train , tokenizer , model) :
    """Label each text with the class the model considers most probable.

    Parameters
    ----------
    X_train : iterable of str
        Texts to classify, processed one at a time.
    tokenizer
        Callable turning a text into the keyword inputs of `model`. It is called
        with PyTorch tensors requested, padding and truncation enabled, and a
        maximum length of 512.
    model
        Callable whose output exposes `.logits`, with the class names available
        in `model.config.id2label`.

    Returns
    -------
    tuple of (list, list)
        The predicted class name of every text, and the softmax probability
        assigned to that class.
    """
    best_per_text = []
    for text in X_train:
        # Inference only: gradients are not tracked
        with torch.no_grad():
            encoded = tokenizer(text, return_tensors="pt",
                                padding=True, truncation=True, max_length=512)
            logits = model(**encoded).logits
            class_probs = scipy.special.softmax(logits.numpy().squeeze())
            labelled_probs = list(zip(model.config.id2label.values(), class_probs))
        # Keep the (label, probability) pair with the highest probability
        best_per_text.append(max(labelled_probs, key=lambda pair: pair[1]))

    preds = [label for label, _ in best_per_text]
    preds_proba = [probability for _, probability in best_per_text]
    return preds, preds_proba

In [None]:
#Classify every text, then map the predicted label names to numerical values
y_pred_labels_finbert, y_pred_proba_finbert = classify_model(X_test_finbert, tokenizer_finbert, model_finbert)

# classify_model returns the model's label names, whereas the ground truth in
# df['sentiment'] is encoded as positive: 1, neutral: 0, negative: 2. Without this
# mapping no prediction can ever equal its true label.
# NOTE(review): assumes the checkpoint's labels are named positive / neutral /
# negative (any casing); an unexpected name raises KeyError instead of being
# silently mis-scored.
label_to_code = {'positive': 1, 'neutral': 0, 'negative': 2}
y_pred_finbert = [label_to_code[label.lower()] for label in y_pred_labels_finbert]

In [None]:
#Assess the performance of the model
# classification_report expects (y_true, y_pred). With the two swapped, precision
# and recall are exchanged for every class and the supports count predictions
# instead of true labels.
finbert_performance = pd.DataFrame(classification_report(y_test_finbert, y_pred_finbert, output_dict=True))
finbert_performance

In [None]:
#FinBert is the most accurate model: gather its predictions and the probability attached to each one
finbert_df = pd.DataFrame({
    'text': X_test_finbert,
    'sentiment': y_test_finbert,
    'prediction': y_pred_finbert,
    'probability': y_pred_proba_finbert,
})

#Keep only the articles whose prediction differs from the true sentiment
is_wrong = finbert_df['sentiment'].ne(finbert_df['prediction'])
missclassified = finbert_df[is_wrong]

#Number of articles for every (sentiment, prediction) pair
pair_counts = missclassified.groupby(['sentiment', 'prediction'], as_index=False)['text'].count()

#Heat map of the misclassified articles: true sentiment in rows, prediction in columns
missclassified = pair_counts.pivot(index='sentiment', columns='prediction', values='text')
sns.heatmap(missclassified, annot=True, cmap="viridis", linewidths=.5)
plt.show()

In [None]:
#Distribution of the probability attached to the prediction, for the misclassified articles only
is_wrong = finbert_df['sentiment'].ne(finbert_df['prediction'])
missclassified = finbert_df[is_wrong]

# One filled density curve per predicted class
sns.displot(data=missclassified, x="probability", hue="prediction", kind="kde", fill=True, height=6)

# Title, axis labels and tick sizes are applied through pyplot to the current axes
plt.title("Probability Distribution by Prediction", fontsize=16)
plt.xlabel("Probability", fontsize=14)
plt.ylabel("Density", fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.show()

### RoBERTa

In [None]:
# Tokenizer and classification model are loaded from the same checkpoint name
roberta_checkpoint = "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"
tokenizer_roberta = AutoTokenizer.from_pretrained(roberta_checkpoint)
model_roberta = AutoModelForSequenceClassification.from_pretrained(roberta_checkpoint)

In [None]:
# The whole labelled dataset is used as the evaluation set: texts and labels as plain lists
X_test_roberta = df['text'].tolist()
y_test_roberta = df['sentiment'].tolist()

In [None]:
#Classify every text, then map the predicted label names to numerical values
y_pred_labels_roberta, y_pred_proba_roberta = classify_model(X_test_roberta , tokenizer_roberta , model_roberta)

# classify_model returns the model's label names, whereas the ground truth in
# df['sentiment'] is encoded as positive: 1, neutral: 0, negative: 2. Without this
# mapping no prediction can ever equal its true label.
# NOTE(review): assumes the checkpoint's labels are named positive / neutral /
# negative (any casing); an unexpected name raises KeyError instead of being
# silently mis-scored.
label_to_code = {'positive': 1, 'neutral': 0, 'negative': 2}
y_pred_roberta = [label_to_code[label.lower()] for label in y_pred_labels_roberta]

In [None]:
#Assess the performance of the model
# classification_report expects (y_true, y_pred). With the two swapped, precision
# and recall are exchanged for every class and the supports count predictions
# instead of true labels.
roberta_performance = pd.DataFrame(classification_report(y_test_roberta, y_pred_roberta, output_dict=True))
roberta_performance