<a href="https://colab.research.google.com/github/AhmUmarCMS/AhmuCMS/blob/main/Fine_Tuned_FinBERT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import random
from tqdm import tqdm
import seaborn as sns
import matplotlib.pyplot as plt
from transformers import logging

logging.set_verbosity_error()
from sklearn.metrics import f1_score, accuracy_score
from sklearn.model_selection import train_test_split
from transformers import (
    BertTokenizer,
    AutoModelForSequenceClassification,
    AdamW,
    get_linear_schedule_with_warmup,
)
import torch
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, TensorDataset

# Read the phrase-bank CSV. The column names are supplied explicitly, so
# pandas treats every row of the file (including the first) as data.
financial_data = pd.read_csv(
    "financial_phrase_bank.csv",
    names=["sentiment", "NewsHeadline"],
    encoding="latin-1",
)

# Label encode the sentiment values
def encode_sentiments_values(df):
    """Add an integer ``label`` column that encodes the ``sentiment`` column.

    Each distinct sentiment value is numbered 0, 1, 2, ... in the order in
    which it first appears in ``df``.

    Args:
        df: DataFrame with a ``sentiment`` column. It is modified in place:
            a ``label`` column is added (or overwritten).

    Returns:
        A ``(df, sentiment_dict)`` tuple: the same DataFrame object that was
        passed in, and the dict mapping each sentiment value to its code.
    """
    sentiment_dict = {
        sentiment: index
        for index, sentiment in enumerate(df["sentiment"].unique())
    }

    # ``map`` builds a fresh integer column from the lookup table.
    # ``replace`` only yields integers if pandas silently downcasts the
    # string column afterwards, and that downcasting is deprecated.
    df["label"] = df["sentiment"].map(sentiment_dict)

    return df, sentiment_dict

# Attach integer labels to the data and keep the sentiment -> label mapping
financial_data, sentiment_dict = encode_sentiments_values(financial_data)

# Stratified 80/20 split over the row indices, using the labels as the
# stratification key
X_train, X_val, y_train, y_val = train_test_split(
    financial_data.index.values,
    financial_data.label.values,
    stratify=financial_data.label.values,
    test_size=0.20,
    random_state=2022,
)

# Swap the index arrays for the rows they refer to
X_train = financial_data.loc[X_train]
X_val = financial_data.loc[X_val]

# Missing headlines become empty strings, then every headline is coerced to str
X_train["NewsHeadline"] = X_train["NewsHeadline"].fillna("").astype(str)
X_val["NewsHeadline"] = X_val["NewsHeadline"].fillna("").astype(str)

# Get the BERT Tokenizer
# NOTE(review): the tokenizer comes from "bert-base-uncased" while the model
# loaded further down is "ProsusAI/finbert" -- confirm the vocabularies match.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased", do_lower_case=True)

# Every headline is padded or truncated to exactly this many tokens
max_seq_length = 150


def _encode_headlines(headlines):
    """Tokenize ``headlines`` into fixed-length PyTorch tensors.

    Args:
        headlines: Iterable of headline strings.

    Returns:
        The tokenizer output, holding ``input_ids`` and ``attention_mask``
        tensors with ``max_seq_length`` columns.
    """
    return tokenizer(
        list(headlines),
        add_special_tokens=True,
        return_attention_mask=True,
        # Explicit padding/truncation replaces the deprecated
        # ``pad_to_max_length`` flag. Without truncation a headline longer
        # than ``max_seq_length`` would produce a longer row than the rest
        # and the rows could not be stacked into one tensor.
        padding="max_length",
        truncation=True,
        max_length=max_seq_length,
        return_tensors="pt",
    )


# Encode the Training and Validation Data
encoded_data_train = _encode_headlines(X_train.NewsHeadline.values)
encoded_data_val = _encode_headlines(X_val.NewsHeadline.values)

# Number of examples per batch for both loaders
batch_size = 32

# Training tensors: token ids, attention masks and labels
input_ids_train, attention_masks_train = (
    encoded_data_train[key] for key in ("input_ids", "attention_mask")
)
labels_train = torch.tensor(y_train)

# Validation tensors: token ids, attention masks and labels
input_ids_val, attention_masks_val = (
    encoded_data_val[key] for key in ("input_ids", "attention_mask")
)
sentiments_val = torch.tensor(y_val)

# Bundle each split into a TensorDataset of (ids, mask, label) triples
dataset_train = TensorDataset(input_ids_train, attention_masks_train, labels_train)
dataset_val = TensorDataset(input_ids_val, attention_masks_val, sentiments_val)

# Training batches are drawn with a random sampler; validation batches are
# drawn with a sequential sampler
train_sampler = RandomSampler(dataset_train)
dataloader_train = DataLoader(
    dataset_train, sampler=train_sampler, batch_size=batch_size
)

val_sampler = SequentialSampler(dataset_val)
dataloader_validation = DataLoader(
    dataset_val, sampler=val_sampler, batch_size=batch_size
)

# Load ProsusAI/finbert with one output per entry in sentiment_dict
model = AutoModelForSequenceClassification.from_pretrained(
    "ProsusAI/finbert", num_labels=len(sentiment_dict)
)

# AdamW optimizer plus a linear learning-rate schedule with no warmup that
# spans every optimizer step of the run (batches per epoch * epochs)
epochs = 3
optimizer1 = torch.optim.AdamW(model.parameters(), lr=5e-5, eps=1e-8)

total_training_steps = len(dataloader_train) * epochs
scheduler = get_linear_schedule_with_warmup(
    optimizer1, num_warmup_steps=0, num_training_steps=total_training_steps
)

# Seed the Python, NumPy and PyTorch (CPU and CUDA) random number generators
seed_val = 2022
for seed_fn in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed_all,
):
    seed_fn(seed_val)

# Run on the GPU when one is available, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model.to(device)
# Function to evaluate the model on the validation set
def evaluate(dataloader_val):
    """Run the module-level ``model`` over a dataloader without training it.

    Args:
        dataloader_val: Iterable of batches, each a sequence whose first
            three items are the input ids, attention mask and labels.

    Returns:
        A ``(loss_val_avg, predictions, true_vals)`` tuple: the summed batch
        losses divided by the number of batches, the concatenated logits of
        all batches, and the concatenated labels of all batches (the last
        two as NumPy arrays).
    """
    model.eval()  # Evaluation mode

    running_loss = 0
    logit_chunks = []
    label_chunks = []

    for batch in dataloader_val:
        batch = tuple(tensor.to(device) for tensor in batch)
        labels = batch[2]

        # Gradients are not needed for evaluation
        with torch.no_grad():
            outputs = model(
                input_ids=batch[0],
                attention_mask=batch[1],
                labels=labels,
            )

        # The first output is the loss, the second the logits
        running_loss += outputs[0].item()
        logit_chunks.append(outputs[1].detach().cpu().numpy())
        label_chunks.append(labels.cpu().numpy())

    loss_val_avg = running_loss / len(dataloader_val)

    return (
        loss_val_avg,
        np.concatenate(logit_chunks, axis=0),
        np.concatenate(label_chunks, axis=0),
    )

# Training loop. After every epoch the weights are saved and the model is
# scored on the validation set; the epoch with the highest weighted F1 is
# remembered so its checkpoint can be reloaded afterwards.
best_epoch = None
best_val_f1 = float("-inf")

for epoch in tqdm(range(1, epochs + 1)):
    model.train()  # Set the model to training mode

    loss_train_total = 0

    progress_bar = tqdm(
        dataloader_train, desc="Epoch {:1d}".format(epoch), leave=False, disable=False
    )
    for batch in progress_bar:
        model.zero_grad()  # Clear previously calculated gradients

        batch = tuple(b.to(device) for b in batch)

        inputs = {
            "input_ids": batch[0],
            "attention_mask": batch[1],
            "labels": batch[2],
        }

        outputs = model(**inputs)  # Perform forward pass

        loss = outputs[0]
        batch_loss = loss.item()
        loss_train_total += batch_loss
        loss.backward()  # Perform backward pass to calculate gradients

        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        optimizer1.step()  # Update model parameters
        scheduler.step()  # Update the learning rate

        # Show the batch loss as-is. ``len(batch)`` is the number of tensors
        # in the batch tuple (3), not the number of examples, so dividing by
        # it would misreport the loss.
        progress_bar.set_postfix({"training_loss": "{:.3f}".format(batch_loss)})

    # Save the model after each epoch
    torch.save(model.state_dict(), f"finetuned_BERT_epoch_{epoch}.model")

    tqdm.write(f"\nEpoch {epoch}")

    loss_train_avg = loss_train_total / len(dataloader_train)
    tqdm.write(f"Training loss: {loss_train_avg}")

    val_loss, predictions, true_vals = evaluate(dataloader_validation)
    # ``evaluate`` returns raw logits; the metric needs one class id per
    # example, and scikit-learn expects (y_true, y_pred) in that order.
    predicted_labels = np.argmax(predictions, axis=1)
    val_f1 = f1_score(true_vals, predicted_labels, average="weighted")
    tqdm.write(f"Validation loss: {val_loss}")
    tqdm.write(f"F1 Score (Weighted): {val_f1}")

    if val_f1 > best_val_f1:
        best_val_f1 = val_f1
        best_epoch = epoch

if best_epoch is None:
    raise RuntimeError("No epoch was completed, so there is no checkpoint to load.")

# Load the best model & Make Predictions
model = AutoModelForSequenceClassification.from_pretrained(
    "ProsusAI/finbert", num_labels=len(sentiment_dict)
)

model.to(device)

# Load the checkpoint of the best-scoring epoch straight onto the device the
# model lives on
model.load_state_dict(
    torch.load(f"finetuned_BERT_epoch_{best_epoch}.model", map_location=device)
)

_, predictions, true_vals = evaluate(dataloader_validation)

# Turn logits into class ids before scoring, as above
predicted_labels = np.argmax(predictions, axis=1)
print("Accuracy: ", accuracy_score(true_vals, predicted_labels))


  0%|          | 0/3 [00:00<?, ?it/s]
Epoch 1:   0%|          | 0/122 [00:00<?, ?it/s][A
Epoch 1:   0%|          | 0/122 [01:10<?, ?it/s, training_loss=0.799][A
Epoch 1:   1%|          | 1/122 [01:10<2:22:42, 70.77s/it, training_loss=0.799][A
Epoch 1:   1%|          | 1/122 [02:02<2:22:42, 70.77s/it, training_loss=0.554][A
Epoch 1:   2%|▏         | 2/122 [02:02<1:58:37, 59.32s/it, training_loss=0.554][A
Epoch 1:   2%|▏         | 2/122 [02:50<1:58:37, 59.32s/it, training_loss=0.445][A
Epoch 1:   2%|▏         | 3/122 [02:50<1:47:57, 54.43s/it, training_loss=0.445][A
Epoch 1:   2%|▏         | 3/122 [03:39<1:47:57, 54.43s/it, training_loss=0.416][A
Epoch 1:   3%|▎         | 4/122 [03:39<1:42:57, 52.35s/it, training_loss=0.416][A