<a href="https://colab.research.google.com/github/gustikresna/LLMs-StockMovement-Forecasting/blob/main/Learning_Hyperparameter_Optimisation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **IMPORT LIBRARIES**

In [None]:
# install packages
# NOTE(review): none of these installs pins a version, so every fresh runtime
# may resolve to different releases of transformers / accelerate / datasets /
# optuna — TODO confirm the versions this notebook was developed against and
# pin them so the later cells keep working.
!pip install transformers[torch]
!pip install --upgrade accelerate
!pip install datasets
!pip install optuna

In [None]:
# import libraries
import optuna
import pandas as pd
import numpy as np
import torch
import torch.nn.functional as F
from datasets import load_metric
from torch.utils.data import Dataset, DataLoader
from torch.nn.functional import softmax
from google.colab import files, runtime
from transformers import BertTokenizer, EvalPrediction, AutoTokenizer, AutoModelForSequenceClassification
from sklearn.metrics import make_scorer, f1_score, accuracy_score
from transformers import BertTokenizer, BertModel, RobertaTokenizer, RobertaModel, DistilBertTokenizer, DistilBertModel, BertForSequenceClassification, Trainer, TrainingArguments, RobertaForSequenceClassification, DistilBertForSequenceClassification

# **LOAD DATA**

In [None]:
# mount google drive
# The CSV paths used by the data-loading cell are rooted under this
# '/content/drive' mount point, so this cell has to run before it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# locations of the training and test CSV files on the mounted drive
file_path_train = '/content/drive/MyDrive/PostGrad/5. Extended Research Projects/Dataset/train_data.csv'
file_path_test = '/content/drive/MyDrive/PostGrad/5. Extended Research Projects/Dataset/test_data.csv'

# read each split into its own DataFrame
train_df = pd.read_csv(file_path_train)
test_df = pd.read_csv(file_path_test)

In [None]:
# parse the 'start_date' strings (YYYY-MM-DD) of both splits into datetimes
for split_df in (train_df, test_df):
    split_df['start_date'] = pd.to_datetime(split_df['start_date'], format='%Y-%m-%d')

In [None]:
# filter out the 'no change' label due to its very small occurrence.
# .copy() detaches the filtered frames from the frames they were sliced from,
# so the column assignments made in later cells ('label', 'combined_text')
# write to an independent frame instead of raising SettingWithCopyWarning.
train_df = train_df[train_df['price_direction'] != 'no change'].copy()
test_df = test_df[test_df['price_direction'] != 'no change'].copy()

In [None]:
# encode price_direction as an integer class id: 'positive' -> 1, 'negative' -> 0
label_mapping = dict(positive=1, negative=0)

# add the encoded target as a 'label' column on both splits
for split_df in (train_df, test_df):
    split_df['label'] = split_df['price_direction'].map(label_mapping)

In [None]:
# define function to concatenate with special tokens to separate columns
def concatenate_columns(row):
    """Build the single model-input string for one record.

    Args:
        row: mapping-like record indexable by 'headline', 'situation'
            and 'eventtype'.

    Returns:
        The three fields, each prefixed with its bracketed marker and
        separated by ' [SEP] '.
    """
    headline = row['headline']
    situation = row['situation']
    event_type = row['eventtype']
    return f"[HEADLINE] {headline} [SEP] [SITUATION] {situation} [SEP] [EVENTTYPE] {event_type}"

# build the 'combined_text' model input for every row of both splits
# from the headline, situation, and eventtype columns
for split_df in (train_df, test_df):
    split_df['combined_text'] = split_df.apply(concatenate_columns, axis=1)

In [None]:
# keep only the rows of one company (permco 21394) to speed up computation
train_df = train_df.loc[train_df['permco'].eq(21394)]
test_df = test_df.loc[test_df['permco'].eq(21394)]

# **DEFINE FUNCTION**

In [None]:
# choose the compute device: CUDA when torch can see a GPU, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# custom Dataset class for text data
class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Args:
        texts: iterable of raw texts; every element is coerced with ``str``.
        labels: iterable of integer class labels, one per text. It is
            materialised into a list so that lookups are positional even
            when a pandas Series with a non-contiguous index is passed.
        tokenizer: callable invoked as ``tokenizer(text, truncation=True,
            padding='max_length', max_length=..., return_tensors='pt')`` and
            returning a mapping of tensors with a leading batch dimension.
        max_length: length every sequence is padded / truncated to.

    Raises:
        ValueError: if ``texts`` and ``labels`` differ in length.
    """

    def __init__(self, texts, labels, tokenizer, max_length=512):
        # convert all texts to strings and store texts, labels, and tokenizer
        self.texts = [str(text) for text in texts]
        # list() makes labels[idx] a positional lookup; indexing a pandas
        # Series directly would look idx up in its (possibly filtered) index
        self.labels = list(labels)
        if len(self.texts) != len(self.labels):
            raise ValueError(
                f"texts and labels must have the same length, "
                f"got {len(self.texts)} texts and {len(self.labels)} labels"
            )
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        # return the number of samples in the dataset
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenized sample at position ``idx`` with its label."""
        text = self.texts[idx]
        label = self.labels[idx]
        # tokenize the text, padding / truncating to exactly max_length
        encodings = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )
        # remove the batch dimension (squeeze) from encodings
        item = {key: val.squeeze(0) for key, val in encodings.items()}
        # add the label to the item dictionary under the key 'labels'
        item['labels'] = torch.tensor(label, dtype=torch.long)
        return item

# define compute_metrics function to calculate accuracy
def compute_metrics(p):
    """Score the arg-max class predictions against the reference labels.

    Args:
        p: object exposing ``predictions`` (an array reduced with
            ``argmax(axis=1)``, i.e. one row of class scores per sample)
            and ``label_ids`` (the reference labels).

    Returns:
        A dict with the single key 'accuracy'.
    """
    predicted_classes = p.predictions.argmax(axis=1)
    return dict(accuracy=accuracy_score(p.label_ids, predicted_classes))

# define the objective function for Optuna hyperparameter optimisation
def objective(trial):
    """Fine-tune ``model_name`` once with hyperparameters sampled from ``trial``.

    Reads the module-level ``model_name``, ``device``, ``train_dataset`` and
    ``test_dataset`` prepared by the calling cell.

    Args:
        trial: Optuna trial used to sample the learning rate, batch size,
            weight decay and number of warm-up steps.

    Returns:
        The negated evaluation accuracy, so that a study created with
        ``direction='minimize'`` ends up maximising accuracy.
    """
    # The labels are binary (0 / 1), so force a 2-way classification head.
    # ignore_mismatched_sizes lets a checkpoint whose saved head has a
    # different number of classes still load (only the head is re-initialised)
    # instead of training and predicting over classes that do not exist here.
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=2,
        ignore_mismatched_sizes=True,
    ).to(device)

    # define hyperparameters to be optimised
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 5e-5, log=True)
    batch_size = trial.suggest_categorical("batch_size", [8, 16, 32])
    weight_decay = trial.suggest_float("weight_decay", 0.01, 0.1)
    warmup_steps = trial.suggest_int("warmup_steps", 500, 1000)

    # define training arguments
    # NOTE(review): newer transformers releases rename `evaluation_strategy`
    # to `eval_strategy` — confirm against the installed version.
    training_args = TrainingArguments(
        output_dir='./results',
        num_train_epochs=3,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        logging_dir='./logs',
        logging_steps=10,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        # every trial writes per-epoch checkpoints into the same directory;
        # cap how many are kept so a multi-trial study does not fill the disk
        # (the best checkpoint is still retained for load_best_model_at_end)
        save_total_limit=1,
        learning_rate=learning_rate,
        warmup_steps=warmup_steps,
        weight_decay=weight_decay,
        load_best_model_at_end=True,
        metric_for_best_model="eval_accuracy",
        # mixed precision is only supported on CUDA; fall back to full
        # precision so the objective also runs on a CPU-only runtime
        fp16=torch.cuda.is_available(),
    )

    # initialise trainer
    # NOTE(review): the test split is used for evaluation, so hyperparameters
    # are selected on the same data later used to report performance;
    # consider a separate validation split.
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        compute_metrics=compute_metrics
    )

    # train and evaluate the model
    trainer.train()
    eval_result = trainer.evaluate()
    # negate: the studies minimise the objective, which maximises accuracy
    return -eval_result['eval_accuracy']

# **HYPERPARAMETER OPTIMISATION**

## **BERT**

In [None]:
# features and labels of both splits as plain Python lists
train_texts, train_labels = train_df['combined_text'].tolist(), train_df['label'].tolist()
test_texts, test_labels = test_df['combined_text'].tolist(), test_df['label'].tolist()

# checkpoint tuned in this section and its matching tokenizer
model_name = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# wrap each split in a TextDataset
train_dataset = TextDataset(train_texts, train_labels, tokenizer)
test_dataset = TextDataset(test_texts, test_labels, tokenizer)

# run the Optuna search; objective returns negative accuracy, hence 'minimize'
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=20)

# report the hyperparameters of the best trial
print("Best trial:")
trial = study.best_trial
best_params = trial.params
print(f"  Learning rate: {best_params['learning_rate']}")
print(f"  Batch size: {best_params['batch_size']}")
print(f"  Weight decay: {best_params['weight_decay']}")
print(f"  Warmup steps: {best_params['warmup_steps']}")

In [None]:
# disconnect run time
# NOTE(review): because this disconnects the runtime, the sections below
# presumably have to be run in a new session after re-running the setup
# cells — confirm this one-model-per-session workflow is intended.
runtime.unassign()

## **RoBERTa**

In [None]:
# features and labels of both splits as plain Python lists
train_texts, train_labels = train_df['combined_text'].tolist(), train_df['label'].tolist()
test_texts, test_labels = test_df['combined_text'].tolist(), test_df['label'].tolist()

# checkpoint tuned in this section and its matching tokenizer
model_name = "roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# wrap each split in a TextDataset
train_dataset = TextDataset(train_texts, train_labels, tokenizer)
test_dataset = TextDataset(test_texts, test_labels, tokenizer)

# run the Optuna search; objective returns negative accuracy, hence 'minimize'
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=15)

# report the hyperparameters of the best trial
print("Best trial:")
trial = study.best_trial
best_params = trial.params
print(f"  Learning rate: {best_params['learning_rate']}")
print(f"  Batch size: {best_params['batch_size']}")
print(f"  Weight decay: {best_params['weight_decay']}")
print(f"  Warmup steps: {best_params['warmup_steps']}")

In [None]:
# disconnect run time
# NOTE(review): because this disconnects the runtime, the sections below
# presumably have to be run in a new session after re-running the setup
# cells — confirm this one-model-per-session workflow is intended.
runtime.unassign()

## **DistilBERT**

In [None]:
# features and labels of both splits as plain Python lists
train_texts, train_labels = train_df['combined_text'].tolist(), train_df['label'].tolist()
test_texts, test_labels = test_df['combined_text'].tolist(), test_df['label'].tolist()

# checkpoint tuned in this section and its matching tokenizer
model_name = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# wrap each split in a TextDataset
train_dataset = TextDataset(train_texts, train_labels, tokenizer)
test_dataset = TextDataset(test_texts, test_labels, tokenizer)

# run the Optuna search; objective returns negative accuracy, hence 'minimize'
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=20)

# report the hyperparameters of the best trial
print("Best trial:")
trial = study.best_trial
best_params = trial.params
print(f"  Learning rate: {best_params['learning_rate']}")
print(f"  Batch size: {best_params['batch_size']}")
print(f"  Weight decay: {best_params['weight_decay']}")
print(f"  Warmup steps: {best_params['warmup_steps']}")

In [None]:
# disconnect run time
# NOTE(review): because this disconnects the runtime, the sections below
# presumably have to be run in a new session after re-running the setup
# cells — confirm this one-model-per-session workflow is intended.
runtime.unassign()

## **DistilRoBERTa**

In [None]:
# features and labels of both splits as plain Python lists
train_texts, train_labels = train_df['combined_text'].tolist(), train_df['label'].tolist()
test_texts, test_labels = test_df['combined_text'].tolist(), test_df['label'].tolist()

# checkpoint tuned in this section and its matching tokenizer
model_name = "distilroberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# wrap each split in a TextDataset
train_dataset = TextDataset(train_texts, train_labels, tokenizer)
test_dataset = TextDataset(test_texts, test_labels, tokenizer)

# run the Optuna search; objective returns negative accuracy, hence 'minimize'
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=20)

# report the hyperparameters of the best trial
print("Best trial:")
trial = study.best_trial
best_params = trial.params
print(f"  Learning rate: {best_params['learning_rate']}")
print(f"  Batch size: {best_params['batch_size']}")
print(f"  Weight decay: {best_params['weight_decay']}")
print(f"  Warmup steps: {best_params['warmup_steps']}")

In [None]:
# disconnect run time
# NOTE(review): because this disconnects the runtime, the section below
# presumably has to be run in a new session after re-running the setup
# cells — confirm this one-model-per-session workflow is intended.
runtime.unassign()

## **FinBERT**

In [None]:
# features and labels of both splits as plain Python lists
train_texts, train_labels = train_df['combined_text'].tolist(), train_df['label'].tolist()
test_texts, test_labels = test_df['combined_text'].tolist(), test_df['label'].tolist()

# checkpoint tuned in this section and its matching tokenizer
model_name = "yiyanghkust/finbert-tone"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# wrap each split in a TextDataset
train_dataset = TextDataset(train_texts, train_labels, tokenizer)
test_dataset = TextDataset(test_texts, test_labels, tokenizer)

# run the Optuna search; objective returns negative accuracy, hence 'minimize'
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=16)

# report the hyperparameters of the best trial
print("Best trial:")
trial = study.best_trial
best_params = trial.params
print(f"  Learning rate: {best_params['learning_rate']}")
print(f"  Batch size: {best_params['batch_size']}")
print(f"  Weight decay: {best_params['weight_decay']}")
print(f"  Warmup steps: {best_params['warmup_steps']}")

In [None]:
# disconnect run time
# NOTE(review): this disconnects the runtime once the study above has
# finished; presumably any in-memory results are lost afterwards — confirm
# the printed best-trial values are all that needs to be kept.
runtime.unassign()