In [None]:
import pandas as pd
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
import re
from sklearn.metrics import accuracy_score
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset, Dataset
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# Download the NLTK resources used by the preprocessing step.
# "punkt_tab" is the tokenizer data that word_tokenize looks up on recent NLTK
# releases; older releases use "punkt". Requesting both keeps the notebook
# runnable on either (nltk.download reports an unavailable package instead of
# raising).
for resource in ("wordnet", "punkt", "punkt_tab", "stopwords"):
    nltk.download(resource)

# Data study

In [None]:
# Load the Sentiment Analysis for Financial News dataset: FinancialPhraseBank
# (available on Hugging Face), using the "sentences_allagree" configuration.
#
# https://huggingface.co/datasets/financial_phrasebank
# Label encoding (also printed below from the dataset's own metadata):
# 0 is negative
# 1 is neutral
# 2 is positive

dataset = load_dataset("financial_phrasebank", "sentences_allagree")

# Show the id -> class-name mapping so the encoding is verified, not assumed
print(dataset["train"].features["label"].names)

# Convert to pandas DataFrame
df = pd.DataFrame(dataset["train"])

# Print the first few rows to understand the structure
print(df.head())

In [None]:
# Count the sentences per label. The `df` built in the loading cell is reused:
# rebuilding it here would silently discard any column added to it later
# whenever this cell is re-run.
df_distrib = df["label"].value_counts()
label_names = dataset["train"].features["label"].names

plt.bar(df_distrib.index, df_distrib.values)
# Put one tick per class, named after the class, instead of a continuous axis
plt.xticks(df_distrib.index, [label_names[i] for i in df_distrib.index])
plt.xlabel("label")
plt.ylabel("count")
plt.title("Distribution of the data among the labels")
plt.show()

In [None]:
from wordcloud import WordCloud

# Concatenate every raw sentence into one space-separated corpus string
text = " ".join(df["sentence"].tolist())

# Build the word cloud with the library's default settings
wordcloud = WordCloud().generate(text)

# Render the cloud as an image and hide the axes
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")

In [None]:
import seaborn as sns

# Characters per sentence (vectorised string op instead of a row-wise apply
# over a freshly rebuilt DataFrame)
phrase_length = df["sentence"].str.len()
sns.histplot(phrase_length, bins=30, kde=True)
plt.title('Distribution of Phrase Lengths')
plt.xlabel('Phrase Length')
plt.show()

# Space-separated words per sentence
number_words = df["sentence"].str.split(" ").str.len()
sns.histplot(number_words, bins=30, kde=True)
plt.title('Distribution Number of words in Phrases')
# This histogram shows word counts, so label the axis accordingly
plt.xlabel('Number of Words')
plt.show()

# Preprocess the data

In [None]:
# Define stopwords
en_stopwords = set(stopwords.words("english"))

# One shared lemmatizer: creating it inside the function would build a new
# instance for every sentence.
lemmatizer = WordNetLemmatizer()


# Define preprocessing function
def preprocessing(sentence):
    """Normalise one raw sentence into a space-joined string of lemmas.

    Steps: lower-case, delete every character that is not a-z or whitespace,
    tokenize, drop English stopwords, lemmatize the remaining tokens.

    Args:
        sentence (str): raw sentence text.

    Returns:
        str: the cleaned tokens joined by single spaces ("" if none remain).
    """
    sentence = sentence.lower()  # Remove caps
    # Digits and punctuation are deleted outright (not replaced by a space)
    sentence = re.sub(r"[^a-z\s]", "", sentence)
    tokens = word_tokenize(sentence)
    return " ".join(
        lemmatizer.lemmatize(token) for token in tokens if token not in en_stopwords
    )


# Preprocess every sentence of the DataFrame
df["sentence_preprocessed"] = df["sentence"].apply(preprocessing)

print(df.head())

In [None]:
# Define tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Define tokenization function
def tokenize(batch):
    """Tokenize the `sentence_preprocessed` entries of a batch.

    Every sequence is padded or truncated to exactly 512 tokens.
    """
    return tokenizer(batch['sentence_preprocessed'], padding='max_length', truncation=True, max_length=512)

# Tokenize the dataset
preprocessed_dataset = Dataset.from_pandas(df[['sentence_preprocessed', 'label']])
preprocessed_dataset = preprocessed_dataset.map(tokenize, batched=True)

# Expose only the model inputs and the label, as PyTorch tensors
preprocessed_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])

# Split the dataset into training, validation, and testing datasets
length = len(preprocessed_dataset)
indices = list(range(length))
train_val_indices, test_indices = train_test_split(indices, test_size=0.2, random_state=42)

train_val_dataset = preprocessed_dataset.select(train_val_indices)
# The split indices are row positions, so pick the matching DataFrame rows by
# position (.iloc); label-based .loc only agrees while df keeps its default
# RangeIndex.
test_dataset = df.iloc[test_indices]  # Non-preprocessed test set in DataFrame format

# Further split the train_val_dataset into training and validation datasets
train_indices, val_indices = train_test_split(
    list(range(len(train_val_dataset))), test_size=0.2, random_state=42
)

train_dataset = train_val_dataset.select(train_indices)
val_dataset = train_val_dataset.select(val_indices)

# The three splits must partition the full dataset
assert len(train_dataset) + len(val_dataset) + len(test_dataset) == length

# Print data splits
print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of validation samples: {len(val_dataset)}")
print(f"Number of test samples: {len(test_dataset)}")

# BERT fine-tuning

In [None]:
# Define the model: pretrained BERT with a 3-way classification head
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=3)

# Define the training arguments
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=64,
    # Warm up over the first 10% of the optimizer steps. A fixed 500-step
    # warmup is longer than the whole run for a training split of roughly
    # 1.4k sentences (about 90 steps per epoch at batch size 16, 3 epochs;
    # see the split sizes printed earlier), so the learning rate would never
    # reach its configured value.
    warmup_ratio=0.1,
    weight_decay=0.01,
    logging_dir='./logs',
    evaluation_strategy="epoch",  # Evaluate at the end of each epoch
    save_strategy="epoch",  # Save at the end of each epoch
    load_best_model_at_end=True,
    metric_for_best_model="eval_accuracy",
)

def compute_metrics(pred):
    """Return the accuracy of the arg-max class predictions.

    Args:
        pred: evaluation output exposing `label_ids` (true labels) and
            `predictions` (per-class scores; arg-max is taken over the last axis).

    Returns:
        dict: {'accuracy': <fraction of correct predictions>}.
    """
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    acc = accuracy_score(labels, preds)
    return {
        'accuracy': acc,
    }

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
)

# Train the model
trainer.train()

# Evaluate the fine-tuned model

In [None]:
# Accuracy on the split the model was fitted on
training_accuracy = trainer.evaluate(train_dataset)
print(f"Training accuracy: {training_accuracy['eval_accuracy']}")

# Accuracy on the validation split
validation_accuracy = trainer.evaluate(val_dataset)
print(f"Validation accuracy: {validation_accuracy['eval_accuracy']}")


def tokenize_no_preprocessing(batch):
    """Tokenize the raw `sentence` entries of a batch, padded/truncated to 512 tokens."""
    raw_sentences = batch['sentence']
    return tokenizer(raw_sentences, padding='max_length', truncation=True, max_length=512)


# Build the Hugging Face test set from the raw (non-preprocessed) sentences
# and tokenize it.
# NOTE(review): the model was trained on `sentence_preprocessed`, while this
# test set feeds it the raw `sentence` text - confirm the mismatch is intended.
test_dataset_hf = Dataset.from_pandas(test_dataset[['sentence', 'label']]).map(
    tokenize_no_preprocessing, batched=True
)
test_dataset_hf.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])

# Accuracy on the held-out, non-preprocessed test set
evaluation_results = trainer.evaluate(test_dataset_hf)
print(f"Accuracy on the test set: {evaluation_results['eval_accuracy']}")

# Full dictionary of evaluation metrics
print(evaluation_results)