# Optuna Example

In [33]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader , TensorDataset
from torch.optim import Adam
import torch.nn.init as init

import numpy as np

import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
from matplotlib.ticker import MultipleLocator
import matplotlib.cm as cm

import copy
import seaborn as sns

from scipy.stats import norm
from sklearn.neighbors import KernelDensity, LocalOutlierFactor

import tqdm

import optuna
from optuna.trial import TrialState



## Load Data

In [34]:
# model hyperparameters
cuda = torch.cuda.is_available()
DEVICE = torch.device("cuda" if cuda else "cpu")

num_seeds = 150
seed = 0

all_state_dim = 64
state_dim = 64
action_dim = 19
training_seed = 150

# Load fullstate
data_fullstate = np.empty(num_seeds, dtype=object)
data_no_joint_pos = np.empty(num_seeds, dtype=object)
data_no_joint_vel = np.empty(num_seeds, dtype=object)
data_no_action = np.empty(num_seeds, dtype=object)
data_no_imu = np.empty(num_seeds, dtype=object)
data_no_fc = np.empty(num_seeds, dtype=object)
for i in range(num_seeds): # HEBB-FULL_STATE-seed_0-fullstate-rand-0
    data_fullstate[i] = np.load(f"data/HEBB-full/HEBB-FULL_STATE-seed_{seed}-fullstate-rand-{i}.npz")    
    
train_x = torch.empty((0, all_state_dim), dtype=torch.float32 ,device=DEVICE)
train_y = torch.empty((0, action_dim), dtype=torch.float32,device=DEVICE)
test_x = torch.empty((0, all_state_dim), dtype=torch.float32,device=DEVICE)
test_y = torch.empty((0, action_dim), dtype=torch.float32,device=DEVICE)
for i in range(training_seed):
    train_x = torch.cat((train_x, torch.tensor(data_fullstate[i]["state"].reshape(data_fullstate[i]["state"].shape[0], -1), dtype=torch.float32,device=DEVICE)), dim=0)
    train_y = torch.cat((train_y, torch.tensor(data_fullstate[i]["action_lowpass"].reshape(data_fullstate[i]["action_lowpass"].shape[0], -1), dtype=torch.float32,device=DEVICE)), dim=0)
for j in range(training_seed, num_seeds):
    test_x = torch.cat((test_x, torch.tensor(data_fullstate[j]["state"].reshape(data_fullstate[j]["state"].shape[0], -1), dtype=torch.float32,device=DEVICE)), dim=0)
    test_y = torch.cat((test_y, torch.tensor(data_fullstate[j]["action_lowpass"].reshape(data_fullstate[j]["action_lowpass"].shape[0], -1), dtype=torch.float32,device=DEVICE)), dim=0)

## Network

In [35]:
"""
    A simple implementation of Gaussian MLP Encoder
"""
class Predictor(nn.Module):
    
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(Predictor, self).__init__()

        self.FC_input = nn.Linear(input_dim, hidden_dim)
        self.FC_input2 = nn.Linear(hidden_dim, hidden_dim)
        
        self.FC_mean  = nn.Linear(hidden_dim, output_dim)
        self.FC_var   = nn.Linear (hidden_dim, output_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        self.tanh = nn.Tanh()
        # self.LeakyReLU = nn.ReLU()
        self.softplus = nn.Softplus()
        
        self.training = True
        
    def reparameterization(self , mean, var):
        epsilon = torch.randn_like(var).to(DEVICE)        # sampling epsilon        
        z = mean + var*epsilon                          # reparameterization trick
        return z
    
    def forward(self, x):
        h_       = self.tanh(self.FC_input(x))
        h_       = self.tanh(self.FC_input2(h_))
        mean     = self.FC_mean(h_)         # encoder produces mean and log of variance 
        log_var  = self.FC_var(h_)          # (i.e., parateters of simple tractable normal distribution "q"
        # log_var  = self.softplus(log_var)  # clamp log_var to avoid numerical issues
        
        z = self.reparameterization(mean, torch.exp(log_var))  # reparameterization trick
        # z is sampling from the distribution z = mean + var * epsilon
        return z,mean, log_var

## Optuna setting

In [36]:
batch_size = 1000

state_index = torch.arange(0, 19) # pos
state_index = torch.arange(19, 38) # vel
state_dim = len(state_index)

train_dataset = TensorDataset(train_x[:,state_index], train_y)
test_dataset = TensorDataset(test_x[:,state_index], test_y)

print("TRAIN : X , Y shape : ",train_x[:,state_index].shape , train_y.shape)
print("TEST : X , Y shape : ",test_x[:,state_index].shape , test_y.shape)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

TRAIN : X , Y shape :  torch.Size([75000, 19]) torch.Size([75000, 19])
TEST : X , Y shape :  torch.Size([0, 19]) torch.Size([0, 19])


In [37]:
hidden_dim = 256
input_dim = state_dim
output_dim = 19
epochs = 150

def objective(trial):
    
    lr = trial.suggest_float("lr", 1e-6, 1e-3, log=True)
    hidden_dim = trial.suggest_categorical("hidden_dim", [64, 128, 256 ,512, 1024])
    batch_size = trial.suggest_categorical("batch_size", [16 , 32 ,64, 128, 256, 512, 1024, 2048])    

    model = Predictor(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim).to(DEVICE)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    criterion = nn.MSELoss()
    
    optimizer = Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        count = 0
        for x_batch, y_batch in train_loader:
            x_batch = x_batch.to(DEVICE)    # dim = [batch_size, state_dim]
            y_batch = y_batch.to(DEVICE)
            
            optimizer.zero_grad()
            pred, mean , log_var = model(x_batch)
            loss = criterion(pred, y_batch)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * x_batch.size(0)
            count += 1

        avg_loss = total_loss / len(train_dataset)
        trial.report(avg_loss, epoch)
        # train_losses.append(avg_loss)
    return avg_loss

In [38]:
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=100, timeout=900)

pruned_trials = study.get_trials(deepcopy=False, states=[TrialState.PRUNED])
complete_trials = study.get_trials(deepcopy=False, states=[TrialState.COMPLETE])

print("Study statistics: ")
print("  Number of finished trials: ", len(study.trials))
print("  Number of pruned trials: ", len(pruned_trials))
print("  Number of complete trials: ", len(complete_trials))

print("Best trial:")
trial = study.best_trial

print("  Value: ", trial.value)

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

[I 2025-08-15 16:57:37,873] A new study created in memory with name: no-name-18f170e5-c31e-4f01-90b3-8fa501ba9603
[I 2025-08-15 17:06:16,109] Trial 0 finished with value: 0.009424594403107962 and parameters: {'lr': 3.7128753111470305e-06, 'hidden_dim': 512, 'batch_size': 64}. Best is trial 0 with value: 0.009424594403107962.
[I 2025-08-15 17:12:05,998] Trial 1 finished with value: 0.008335044305523237 and parameters: {'lr': 0.0006423610753393246, 'hidden_dim': 1024, 'batch_size': 128}. Best is trial 1 with value: 0.008335044305523237.
[W 2025-08-15 17:32:40,348] Trial 2 failed with parameters: {'lr': 1.3913014358822212e-06, 'hidden_dim': 128, 'batch_size': 16} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "e:\FIBO\internship\thesis_vistec\locomotion_learning_information-theory\.venv\Lib\site-packages\optuna\study\_optimize.py", line 201, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "C:\Users\em

KeyboardInterrupt: 

# Use Model

## Evaluation

In [None]:
eva_input = train_x[:500 , :19]
eva_label = train_y[:500 , :]

eva_input = test_x[:500 , :19]
eva_label = test_y[:500 , :]

eva_pred , eva_mean , eva_log_var = model(eva_input)

In [None]:
dim  = 19
cols = 3
rows = math.ceil(dim / cols)

# ตั้งค่าสถานะว่า eva_sigma เป็น std หรือ var
ASSUME_SIGMA_IS_STD = True  # ถ้า eva_sigma คือ std ให้ True, ถ้าเป็น variance อยู่แล้ว ให้ False

# เตรียมแกน x (จะพล็อตตามลำดับ index ของ sample)
T = eva_mean.shape[0]
x = np.arange(T)

# แปลงเป็น numpy
y_np = eva_label.detach().cpu().numpy()
pred_np = eva_pred.detach().cpu().numpy()
mu_np  = eva_mean.detach().cpu().numpy()                   # [T, 19]
sig_np = torch.exp(eva_log_var).detach().cpu().numpy()                  # [T, 19]
var_np = (sig_np**2) if ASSUME_SIGMA_IS_STD else sig_np    # [T, 19]
# var_np = np.clip(var_np, 1e-12, None)                      # กันติดลบ/ศูนย์

In [None]:
# เริ่มวาด 19 subplot, 3 columns
fig, axes = plt.subplots(rows, cols, figsize=(cols*5, rows*3), sharex=True)
axes = axes.flatten()

for d in range(dim):
    ax = axes[d]
    mu   = mu_np[:, d]
    var  = var_np[:, d]
    pred = pred_np[:, d]
    std  = np.sqrt(var)
    lo   = mu - 1.96 * std
    up   = mu + 1.96 * std
    y    = y_np[:, d]

    # เส้น mean
    ax.plot(x, pred, label=f"predicted (dim {d})")
    # ax.plot(x, y, label=f"label (dim {d})")
    # ax.plot(x, mu, label=f"mean (dim {d})")
    # แถบ 95% CI
    ax.fill_between(x, lo, up, alpha=0.4, label="95% CI")

    ax.set_title(f"Dim {d}")
    ax.grid(True)

fig.tight_layout()
# ดึง legend จากซับพล็อตแรกเพื่อลดความรก
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="upper center", ncol=3, bbox_to_anchor=(0.5, 1.04))
plt.show()