In [None]:
import json

import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import root_mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

%config InlineBackend.figure_format = 'svg'

In [None]:
torch.manual_seed(0)
device = torch.device(
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)

In [None]:
DATASET = "eimi"

In [None]:
df = pd.read_csv(f"../data/raw/{DATASET}.csv", parse_dates=True)

In [None]:
df["Close"].plot(title=f"{DATASET}", figsize=(12, 8))

In [None]:
x_scaler = MinMaxScaler()
y_scaler = MinMaxScaler()

In [None]:
Xy_train_val, Xy_test = train_test_split(
    df["Close"].values, test_size=0.2, shuffle=False
)
Xy_train, Xy_val = train_test_split(Xy_train_val, test_size=0.2, shuffle=False)

In [None]:
class GRU(nn.Module):
    def __init__(self, input_size, hidden_layer_size, num_layers):
        super(GRU, self).__init__()
        self.num_layers = num_layers
        self.hidden_layer_size = hidden_layer_size
        self.gru = nn.GRU(
            input_size, hidden_layer_size, num_layers=num_layers, batch_first=True
        )
        self.fc = nn.Linear(hidden_layer_size, 1)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_layer_size).to(device)
        out, _ = self.gru(x, h0)
        out = self.fc(out[:, -1, :])
        return out

In [None]:
def to_tensor(data):
    return torch.tensor(data, dtype=torch.float32).unsqueeze(-1).to(device)

In [None]:
def create_sequences(data, window_size=7):
    xs, ys = [], []
    for i in range(len(data) - window_size):
        x = data[i : (i + window_size)]
        y = data[i + window_size]
        xs.append(x)
        ys.append(y)
    return np.array(xs), np.array(ys)

In [None]:
# Hyperparameter tuning
def objective(trial):
    hidden_size = trial.suggest_int("hidden_size", 16, 128)
    # num_layers = trial.suggest_int("num_layers", 1, 4)
    learning_rate = trial.suggest_float("lr", 1e-5, 1e-2, log=True)
    epochs = trial.suggest_int("epochs", 100, 1000)
    num_inputs = trial.suggest_int("num_inputs", 5, 30)

    model = GRU(1, hidden_layer_size=hidden_size, num_layers=2).to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()

    X_train, y_train = create_sequences(Xy_train, window_size=num_inputs)
    X_val, y_val = create_sequences(Xy_val, window_size=num_inputs)

    X_train = to_tensor(x_scaler.fit_transform(X_train))
    X_val = to_tensor(x_scaler.transform(X_val))

    y_train = to_tensor(y_scaler.fit_transform(y_train.reshape(-1, 1)).reshape(-1))
    y_val = to_tensor(y_scaler.transform(y_val.reshape(-1, 1)).reshape(-1))

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        train_output = model(X_train)
        train_loss = criterion(train_output, y_train)

        train_loss.backward()
        optimizer.step()

        if (epoch + 1) % 100 == 0:
            model.eval()
            with torch.no_grad():
                val_output = model(X_val)
                val_loss = criterion(val_output, y_val)
            trial.report(val_loss.item(), epoch)
            if trial.should_prune():
                raise optuna.exceptions.TrialPruned()

    # Compute final validation loss after training completes
    model.eval()
    with torch.no_grad():
        final_val_output = model(X_val)
        final_val_loss = criterion(final_val_output, y_val)

    return final_val_loss.item()

In [None]:
def train_model(hyperparams, X_train, y_train, X_test, y_test):
    hidden_size = hyperparams["hidden_size"]
    # num_layers = hyperparams["num_layers"]
    learning_rate = hyperparams["lr"]
    epochs = hyperparams["epochs"]

    model = GRU(1, hidden_layer_size=hidden_size, num_layers=2).to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    loss = nn.MSELoss()

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        y_pred = model(X_train)
        train_loss = loss(y_pred, y_train)
        train_loss.backward()
        optimizer.step()

        if epoch % 100 == 0:
            model.eval()
            with torch.no_grad():
                y_test_pred = model(X_test)
                test_loss = loss(y_test_pred, y_test)
            print(
                f"Epoch {epoch}, Train Loss: {train_loss.item():.4f}, Test Loss: {test_loss.item():.4f}"
            )
    return model

In [None]:
sampler = optuna.samplers.TPESampler(seed=0)
study = optuna.create_study(direction="minimize", sampler=sampler)
study.optimize(objective, n_trials=50)

In [None]:
params = study.best_params
num_inputs = params["num_inputs"]

In [None]:
with open(f"../models/{DATASET}_hyperparams.json", "w") as f:
    json.dump(params, f, indent=4)

In [None]:
X_train_val, y_train_val = create_sequences(Xy_train_val, window_size=num_inputs)
X_train_val = x_scaler.fit_transform(X_train_val)
y_train_val = y_scaler.fit_transform(y_train_val.reshape(-1, 1)).reshape(-1)

X_test, y_test = create_sequences(Xy_test, window_size=num_inputs)
X_test = x_scaler.transform(X_test)
y_test = y_scaler.transform(y_test.reshape(-1, 1)).reshape(-1)

In [None]:
gru = train_model(
    params,
    to_tensor(X_train_val),
    to_tensor(y_train_val),
    to_tensor(X_test),
    to_tensor(y_test),
)

In [None]:
points = []
for i in range(len(X_test)):
    input_seq = (
        torch.tensor(X_test[i, :], dtype=torch.float32)
        .to(device)
        .unsqueeze(-1)
        .unsqueeze(0)
    )
    value = gru(input_seq)
    points.append(value.cpu().item())
points = y_scaler.inverse_transform(np.array(points).reshape(-1, 1))
rmse = root_mean_squared_error(
    points, y_scaler.inverse_transform(y_test.reshape(-1, 1))
)
print(f"RMSE: {rmse:.4f}")
plt.plot(y_scaler.inverse_transform(y_test.reshape(-1, 1)), label="Observed")
plt.plot(points, "--", label="Predicted")
plt.legend()

In [None]:
torch.save(gru, f"../models/gru_{DATASET}.pt")