# Overview

This notebook contains the process of finding the best architecture and learning rate for the Transformer model that is used as one of the base learners in this study. The input data consists of the features selected from the financial statements (based on the feature selection result) together with the best sentiment representation from the sentiment experiment. All experiments in this notebook use the BBRI dataset.

# Base Model Search for Stock Prediction using fundamental and embedding sentiment feature (Transformer)

In [None]:
import re
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator

# DEFINE CONSTANT
# Fraction of the data assigned to the training split; the rest is the test split.
TRAIN_SIZE = 0.85
# TEST_SIZE = 0.15

# Passed to PCA(n_components=...) when reducing the sentiment embeddings.
# NOTE(review): as a float in (0, 1) scikit-learn treats this as the share of
# variance to retain rather than a component count — confirm that is intended.
N_COMPONENT_PCA = 0.8

# When True the PCA-reduced embeddings are used as model input, otherwise the raw ones.
USE_PCA = True
# Column the model learns to predict.
TARGET_COLUMN_NAME = "Closing Price"
# Column holding the serialised sentiment embedding for each row.
EMBEDDING_COLUMN_NAME = "text_embedding_multilingual_mpnet"

# Number of independent train/evaluate repetitions averaged per configuration.
N_ITERATION = 20
# Number of consecutive rows fed to the model to predict the next row's target.
CONTEXT_WINDOW = 5
# Training epochs per repetition.
NUM_EPOCHS = 100

In [None]:
# Financial-statement columns kept after feature selection.
# The order is significant: it fixes the column order of the model input.
fundamental_features = [
    "Financing Cash Flow",
    "P/B Ratio",
    "P/S Ratio",
    "Capital Adequacy Ratio",
    "Debt to Assets Ratio",
    "Debt to Equity Ratio",
    "Investing Cash Flow",
    "Operating Cash Flow",
    "Return on Assets",
    "Operating Profit",
    "Loan to Deposit Ratio",
]

# Daily market columns used alongside the fundamental features.
historic_price_feature = [
    "Opening Price",
    "Highest Price",
    "Lowest Price",
    "Volume",
    "Change",
]

# Only the fundamental features are counted here, not the price columns.
print(f"Number of selected features: {len(fundamental_features)}")

In [None]:
def parse_embedding_from_df(df, embedding_column_name, n_comp, target_column_name="Closing Price"):
  """Select the modelling columns and attach parsed and PCA-reduced embeddings.

  Args:
    df: Source DataFrame. It must contain the target column, the embedding
      column and every column listed in the module-level
      ``fundamental_features`` and ``historic_price_feature`` lists.
    embedding_column_name: Column whose cells hold an embedding serialised
      as text, e.g. ``"[ 0.12 -1.  3.4e-05]"``.
    n_comp: Forwarded unchanged to ``PCA(n_components=...)``.
    target_column_name: Name of the prediction target column.

  Returns:
    A copy of the selected columns with two extra columns:
    ``embedding`` (one float ``np.ndarray`` per row; rows that could not be
    parsed become all-zero vectors) and ``embedding_pca`` (one list of
    floats per row). Feature columns are coerced to numeric; values that
    cannot be converted become NaN and are left as NaN.

  Raises:
    ValueError: If the parsed embeddings do not all share one length.
  """
  feature_columns = fundamental_features + historic_price_feature
  selected_columns = [target_column_name, embedding_column_name] + feature_columns
  df_processed = df[selected_columns].copy()

  # One pattern for every numeric token: optional sign, an integer or decimal
  # mantissa (including numpy-style "1." and ".5"), and an optional exponent.
  # The sign is part of every alternative so "-1" / "-1." keep their sign.
  number_pattern = re.compile(r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?")

  def parse_embedding_string(embedding_str):
    """Return the floats found in ``embedding_str`` or None if there are none."""
    if not isinstance(embedding_str, str):
      return None  # e.g. NaN for a missing cell
    values = [float(token) for token in number_pattern.findall(embedding_str)]
    # A string without any number is treated like a missing embedding instead
    # of producing a zero-length vector.
    return values or None

  parsed = df_processed[embedding_column_name].apply(parse_embedding_string)

  # The first successfully parsed row defines the expected embedding width.
  embedding_dim = next((len(values) for values in parsed if isinstance(values, list)), None)
  if embedding_dim is None:
    # Nothing could be parsed: fall back to a fixed width so the zero-filled
    # placeholders below still have a well-defined shape.
    embedding_dim = 768 # Default BERT base dimension
    print(f"Warning: Could not determine embedding dimension from data. Using a default embedding dimension of {embedding_dim}.")

  # Unparseable rows are replaced by zero vectors of the expected width.
  df_processed['embedding'] = parsed.apply(
    lambda values: np.array(values, dtype=float) if isinstance(values, list) else np.zeros(embedding_dim)
  )

  # PCA needs a rectangular matrix; report ragged rows explicitly instead of
  # failing inside numpy / scikit-learn with an unrelated-looking message.
  mismatched = int((df_processed['embedding'].map(len) != embedding_dim).sum())
  if mismatched:
    raise ValueError(
      f"{mismatched} row(s) in '{embedding_column_name}' do not have the expected embedding length {embedding_dim}."
    )

  # NOTE(review): PCA is fit on every row of ``df``. If the caller splits the
  # rows into train/test afterwards, the projection has already seen the test
  # rows; fitting on the training rows only would need an interface change.
  embedding_matrix = np.array(df_processed['embedding'].tolist())
  df_processed['embedding_pca'] = PCA(n_components=n_comp).fit_transform(embedding_matrix).tolist()

  # Coerce feature columns to numeric. Unconvertible values become NaN and are
  # intentionally not filled here.
  for col in feature_columns:
    df_processed[col] = pd.to_numeric(df_processed[col], errors='coerce')

  return df_processed

In [None]:
def create_sequences(data, target_index, context_window):
  """Build sliding-window samples for next-step prediction.

  Sample ``i`` uses rows ``i .. i + context_window - 1`` of ``data`` as input
  and the value in column ``target_index`` of row ``i + context_window`` as
  the label.

  Args:
    data: 2-D array-like of shape ``(n_rows, n_columns)``.
    target_index: Column of ``data`` that holds the value to predict.
    context_window: Number of consecutive rows in each input window.

  Returns:
    ``(X, y)`` where ``X`` has shape
    ``(n_rows - context_window, context_window, n_columns)`` and ``y`` has
    shape ``(n_rows - context_window,)``. When there are not enough rows for
    a single window, correctly shaped empty arrays are returned.
  """
  data = np.asarray(data)
  n_sequences = len(data) - context_window

  if n_sequences <= 0:
    # Keep the trailing dimensions so downstream shape-dependent code still
    # sees (0, window, columns) rather than a bare (0,) array.
    empty_X = np.empty((0, context_window) + data.shape[1:], dtype=data.dtype)
    empty_y = np.empty((0,), dtype=data.dtype)
    return empty_X, empty_y

  X = np.stack([data[start:start + context_window] for start in range(n_sequences)])
  # The label for window ``start`` is the row right after it, so the labels are
  # simply the target column from row ``context_window`` onwards.
  y = data[context_window:, target_index].copy()
  return X, y

In [None]:
# Load the merged BBRI dataset from the working directory.
bbri_merged_df = pd.read_csv("final_bbri_dataset.csv")

# Keep only the modelling columns and attach the parsed / PCA-reduced embeddings.
df_processed = parse_embedding_from_df(bbri_merged_df, EMBEDDING_COLUMN_NAME, n_comp=N_COMPONENT_PCA)

In [None]:
target_data = df_processed[TARGET_COLUMN_NAME].values

# Every column except the target, the raw / parsed / reduced embeddings and
# the date is treated as a numeric input feature.
feature_columns = [col for col in df_processed.columns if col not in [TARGET_COLUMN_NAME, EMBEDDING_COLUMN_NAME, 'Date', 'embedding', 'embedding_pca']]
feature_data = df_processed[feature_columns].values

embedding_source_column = 'embedding_pca' if USE_PCA else 'embedding'
embedding_data = np.array(df_processed[embedding_source_column].tolist())

# The split index is derived from the row count and later applied to the
# sequence tensors, exactly as before, so train/test membership is unchanged.
n_total = len(df_processed)
train_split_index = int(n_total * TRAIN_SIZE)

# Training sequence i reads rows i .. i + CONTEXT_WINDOW (inputs plus label),
# so the training split touches rows [0, train_split_index + CONTEXT_WINDOW).
# The scalers are fit on those rows only; fitting them on the full series
# would leak the test period's min/max into training.
n_fit_rows = min(n_total, train_split_index + CONTEXT_WINDOW)

scaler = MinMaxScaler()
scaler.fit(target_data[:n_fit_rows].reshape(-1, 1))
scaled_target_data = scaler.transform(target_data.reshape(-1, 1))

feature_scaler = MinMaxScaler()
feature_scaler.fit(feature_data[:n_fit_rows])
scaled_feature_data = feature_scaler.transform(feature_data)

# Column 0 is the scaled target, followed by the scaled features and the
# (unscaled) embedding columns.
combined_data = np.concatenate((scaled_target_data, scaled_feature_data, embedding_data), axis=1)

X, y = create_sequences(combined_data, target_index=0, context_window=CONTEXT_WINDOW)

X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.float32)

# Chronological split: the earliest sequences train, the latest ones test.
X_train, X_test = X_tensor[:train_split_index], X_tensor[train_split_index:]
y_train, y_test = y_tensor[:train_split_index], y_tensor[train_split_index:]

train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

# No shuffling: batches are served in chronological order.
train_loader = DataLoader(train_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)

In [None]:
def find_average_test_score_transformer(train_loader, test_loader, scaler, NUM_HEADS, NUM_LAYERS, NUM_HIDDEN_DIM, LEARNING_RATE, N_ITERATION, NUM_EPOCHS):
  """Train a Transformer regressor repeatedly and report its mean test error.

  A fresh model is trained from scratch ``N_ITERATION`` times, each for
  ``NUM_EPOCHS`` epochs with Adam and MSE loss. After every run the test
  predictions and targets are mapped back to the original scale with
  ``scaler`` and scored. The MSE, MAE and MAPE averaged over all runs are
  printed and returned.

  Args:
    train_loader: Iterable of ``(inputs, targets)`` batches used for training.
      Inputs are indexed as ``(batch, sequence, features)``; targets are
      expected to be ``(batch,)``.
    test_loader: Iterable of ``(inputs, targets)`` batches used for scoring.
    scaler: Fitted scaler whose ``inverse_transform`` maps a ``(n, 1)`` array
      of scaled targets back to the original scale.
    NUM_HEADS: Attention heads per encoder layer.
    NUM_LAYERS: Number of encoder layers.
    NUM_HIDDEN_DIM: Model width (``d_model``); must be divisible by ``NUM_HEADS``.
    LEARNING_RATE: Adam learning rate.
    N_ITERATION: Number of independent train/evaluate repetitions (>= 1).
    NUM_EPOCHS: Training epochs per repetition.

  Returns:
    dict with the averaged ``"mse"``, ``"mae"`` and ``"mape"`` (MAPE as a
    fraction, not a percentage).

  Raises:
    ValueError: If ``N_ITERATION`` is smaller than 1 or ``NUM_HIDDEN_DIM`` is
      not divisible by ``NUM_HEADS``.
  """
  if N_ITERATION < 1:
    raise ValueError(f"N_ITERATION must be at least 1, got {N_ITERATION}.")
  if NUM_HIDDEN_DIM % NUM_HEADS != 0:
    # Multi-head attention splits d_model evenly across heads; fail early with
    # a readable message instead of deep inside the layer constructor.
    raise ValueError(
      f"NUM_HIDDEN_DIM ({NUM_HIDDEN_DIM}) must be divisible by NUM_HEADS ({NUM_HEADS})."
    )

  class TimeSeriesTransformer(nn.Module):
    """Linear projection -> Transformer encoder -> linear head on the last step."""

    def __init__(self, feature_size, num_heads=NUM_HEADS, num_layers=NUM_LAYERS, hidden_dim=NUM_HIDDEN_DIM):
      super().__init__()
      self.input_proj = nn.Linear(feature_size, hidden_dim)
      encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads, dim_feedforward=2*hidden_dim, batch_first=True)
      self.transformer = nn.TransformerEncoder(encoder_layer=encoder_layer, num_layers=num_layers)
      self.regressor = nn.Linear(hidden_dim, 1)

    def forward(self, x):
      x = self.input_proj(x)
      x = self.transformer(x)
      # Squeeze only the trailing size-1 dimension so a batch containing a
      # single sample still yields shape (1,) rather than a 0-d tensor.
      return self.regressor(x[:, -1, :]).squeeze(-1)

  # Check for GPU availability and set the device
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  print(f"Using device: {device}")

  # Take the input width from the data actually passed in rather than from a
  # module-level array, so the function works with any compatible loaders.
  sample_inputs, _ = next(iter(train_loader))
  feature_size = sample_inputs.shape[-1]

  mse_scores, mae_scores, mape_scores = [], [], []

  for _ in tqdm(range(N_ITERATION), desc="Training and Evaluating Transformer Based Model..."):
    model = TimeSeriesTransformer(feature_size=feature_size).to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

    for _epoch in range(NUM_EPOCHS):
      model.train()
      for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

    model.eval()
    batch_preds, batch_trues = [], []
    with torch.no_grad():
      for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        outputs = model(X_batch)
        # Move results back to the CPU for the NumPy / scikit-learn metrics.
        batch_preds.append(outputs.cpu().numpy())
        batch_trues.append(y_batch.cpu().numpy())

    preds = np.concatenate(batch_preds).reshape(-1, 1)
    trues = np.concatenate(batch_trues).reshape(-1, 1)

    # Score on the original scale, not on the scaled values the model sees.
    predicted_actual = scaler.inverse_transform(preds)
    true_actual = scaler.inverse_transform(trues)

    mse_scores.append(mean_squared_error(true_actual, predicted_actual))
    mae_scores.append(mean_absolute_error(true_actual, predicted_actual))
    mape_scores.append(mean_absolute_percentage_error(true_actual, predicted_actual))

  final_mse = float(np.mean(mse_scores))
  final_mae = float(np.mean(mae_scores))
  final_mape = float(np.mean(mape_scores))

  print("\n")
  print("FINAL SCORE")
  print(f"Test MSE: {final_mse:.3f}")
  print(f"Test MAE: {final_mae:.3f}")
  print(f"Test MAPE: {(final_mape*100):.2f}%")

  return {"mse": final_mse, "mae": final_mae, "mape": final_mape}

## Finding the Learning Rate

In [None]:
# Learning-rate sweep: the learning rate is the variable under test, so the
# architecture below is held fixed for every run in this section.

NUM_HEADS = 2
NUM_LAYERS = 2
NUM_HIDDEN_DIM = 32

In [None]:
LEARNING_RATE = 0.0001

# Use the shared N_ITERATION / NUM_EPOCHS constants like every other run in
# this sweep, so the averaged scores are computed over the same number of
# repetitions and are directly comparable.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

In [None]:
LEARNING_RATE = 0.0005

# Evaluate this learning rate; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

In [None]:
LEARNING_RATE = 0.001

# Evaluate this learning rate; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

In [None]:
LEARNING_RATE = 0.005

# Evaluate this learning rate; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

## Finding the Number of Heads

In [None]:
# Attention-head sweep: the number of heads is the variable under test, so the
# remaining hyperparameters are held fixed for every run in this section.
# The hidden dimension must be divisible by every head count tried below
# (4, 8 and 16), otherwise multi-head attention cannot be constructed.

NUM_LAYERS = 2
NUM_HIDDEN_DIM = 32
LEARNING_RATE = 0.0001

In [None]:
NUM_HEADS = 4

# Evaluate this head count; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

In [None]:
NUM_HEADS = 8

# Evaluate this head count; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

In [None]:
NUM_HEADS = 16

# Evaluate this head count; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

## Finding the Number of Layers

In [None]:
# Encoder-depth sweep: the number of layers is the variable under test, so the
# remaining hyperparameters are held fixed for every run in this section.

NUM_HEADS = 4
NUM_HIDDEN_DIM = 32
LEARNING_RATE = 0.0001

In [None]:
NUM_LAYERS = 4

# Evaluate this encoder depth; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

In [None]:
NUM_LAYERS = 6

# Evaluate this encoder depth; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

In [None]:
NUM_LAYERS = 8

# Evaluate this encoder depth; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

## Finding the Hidden Dimension Size

In [None]:
# Hidden-dimension sweep: the model width is the variable under test, so the
# remaining hyperparameters are held fixed for every run in this section.

NUM_HEADS = 4
NUM_LAYERS = 2
LEARNING_RATE = 0.0001

In [None]:
NUM_HIDDEN_DIM = 16

# Evaluate this hidden dimension; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

In [None]:
NUM_HIDDEN_DIM = 64

# Evaluate this hidden dimension; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)

In [None]:
NUM_HIDDEN_DIM = 128

# Evaluate this hidden dimension; all other hyperparameters come from the
# module-level variables set in earlier cells.
find_average_test_score_transformer(
    train_loader=train_loader,
    test_loader=test_loader,
    scaler=scaler,
    NUM_HEADS=NUM_HEADS,
    NUM_LAYERS=NUM_LAYERS,
    NUM_HIDDEN_DIM=NUM_HIDDEN_DIM,
    LEARNING_RATE=LEARNING_RATE,
    N_ITERATION=N_ITERATION,
    NUM_EPOCHS=NUM_EPOCHS
)