In [None]:
import gymnasium as gym
import numpy as np
import optuna
from optuna.visualization import plot_optimization_history, plot_param_importances
import matplotlib.pyplot as plt
from stable_baselines3 import TD3, SAC, PPO
from stable_baselines3.common.noise import NormalActionNoise, OrnsteinUhlenbeckActionNoise
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv
import os
import torch
import kaleido
import warnings
warnings.filterwarnings('ignore')

# 1. Define the evaluation function
def evaluate_model(model, env, n_episodes=10):
    """
    Evaluate a RL model
    :param model: (BaseAlgorithm) the RL agent
    :param env: (gym.Env) the gym environment
    :param n_episodes: (int) number of episodes to evaluate
    :return: (float) mean reward
    """
    episode_rewards = []
    for _ in range(n_episodes):
        obs = env.reset()
        episode_reward = 0
        done = False
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated = env.step(action)
            done = terminated or truncated
            episode_reward += reward
        episode_rewards.append(episode_reward)
    return np.mean(episode_rewards)

# 2. Define the hyperparameter optimization objective
def objective(trial):
    # Create environment
    env = gym.make("Pendulum-v1")
    env = Monitor(env)  # For tracking rewards
    env = DummyVecEnv([lambda: env])
    
    # Sample hyperparameters
    hyperparams = {
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-3, log=True),
        'batch_size': trial.suggest_categorical('batch_size', [128, 256]),
        'buffer_size': trial.suggest_categorical('buffer_size', [int(1e4), int(1e3)]),
        'learning_starts': trial.suggest_categorical('learning_starts', [100, 1000]),
        'train_freq': trial.suggest_int('train_freq', 1, 20),
        'gradient_steps': trial.suggest_int('gradient_steps', 1, 10),
        'policy_kwargs': {
            'net_arch': trial.suggest_categorical('net_arch', [[64, 64], [512, 256]])
        }
    }
    
    # Algorithm-specific parameters
    if algo == 'TD3':
        hyperparams['action_noise'] = OrnsteinUhlenbeckActionNoise(
            mean=np.zeros(env.action_space.shape[-1]),
            sigma=0.5 * np.ones(env.action_space.shape[-1])
        )
        hyperparams['tau'] = trial.suggest_float('tau', 0.001, 0.1)
    elif algo == 'SAC':
        hyperparams['ent_coef'] = trial.suggest_categorical('ent_coef', ['auto', 0.01, 0.1])
        hyperparams['tau'] = trial.suggest_float('tau', 0.001, 0.1)
    elif algo == 'PPO':
        hyperparams['n_steps'] = trial.suggest_categorical('n_steps', [64, 128, 256, 512, 1024, 2048])
        hyperparams['gae_lambda'] = trial.suggest_float('gae_lambda', 0.8, 0.99)
        hyperparams['clip_range'] = trial.suggest_float('clip_range', 0.1, 0.4)
    
    # Create model
    if algo == 'TD3':
        model = TD3("MlpPolicy", env, verbose=0, **hyperparams)
    elif algo == 'SAC':
        model = SAC("MlpPolicy", env, verbose=0, **hyperparams)
    elif algo == 'PPO':
        model = PPO("MlpPolicy", env, verbose=0, **hyperparams)
    
    # Train the model
    eval_callback = EvalCallback(
        env,
        # best_model_save_path=f"./logs/{algo}/",
        log_path=f"./logs/{algo}/",
        eval_freq=5000,
        deterministic=True,
        render=False
    )
    
    model.learn(total_timesteps=10000, callback=eval_callback)
    
    # Evaluate the model
    mean_reward = evaluate_model(model, env)
    
    # Clean up
    del model
    env.close()
    
    return mean_reward

# 3. Run the optimization
def optimize_hyperparams(algo, n_trials=20):
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials)
    
    print("Best trial:")
    trial = study.best_trial
    print(f"Value: {trial.value}")
    print("Params:")
    for key, value in trial.params.items():
        print(f"    {key}: {value}")
    
    # Save best parameters
    with open(f"best_params_{algo}.txt", "w") as f:
        f.write(str(trial.params))
    
    return study

# 4. Plotting functions
def plot_learning_curve(log_dir, algo):
    """
    Plot the learning curve from Monitor logs
    """
    import pandas as pd
    
    # Load all Monitor logs
    df = pd.read_csv(f"{log_dir}/monitor.csv", skiprows=1)
    
    # Plot
    plt.figure(figsize=(12, 6))
    plt.plot(df['r'], label='Episode Reward')
    plt.plot(df['r'].rolling(10).mean(), label='Rolling Mean (10)')
    plt.xlabel('Timesteps')
    plt.ylabel('Reward')
    plt.title(f'{algo} Learning Curve')
    plt.legend()
    plt.grid()
    plt.savefig(f'{algo}_learning_curve.png')

def plot_reward_over_steps(study, algo):
    """
    Plot reward over optimization steps
    """
    fig = plot_optimization_history(study)
    fig.update_layout(title=f'{algo} Optimization History')
    print("hello")
    # fig.write_image(f"{algo}_optimization_history.png")


algo = 'TD3'

# Create directories
os.makedirs(f"./logs/{algo}", exist_ok=True)

# Run hyperparameter optimization
study = optimize_hyperparams(algo, n_trials=2)
print("Optimization completed.")


[I 2025-05-03 01:15:31,973] A new study created in memory with name: no-name-20c4db57-d27c-4e86-8652-a8092e59bbbd


Eval num_timesteps=5000, episode_reward=-1769.64 +/- 80.69
Episode length: 200.00 +/- 0.00
New best mean reward!


[I 2025-05-03 01:18:22,136] Trial 0 finished with value: -2.0927460193634033 and parameters: {'learning_rate': 2.5497540087101378e-05, 'batch_size': 256, 'buffer_size': 10000, 'learning_starts': 1000, 'train_freq': 12, 'gradient_steps': 5, 'net_arch': [512, 256], 'tau': 0.017726513288668456}. Best is trial 0 with value: -2.0927460193634033.


Eval num_timesteps=10000, episode_reward=-1484.94 +/- 144.76
Episode length: 200.00 +/- 0.00
New best mean reward!
Best trial:
Value: -2.0927460193634033
Params:
    learning_rate: 2.5497540087101378e-05
    batch_size: 256
    buffer_size: 10000
    learning_starts: 1000
    train_freq: 12
    gradient_steps: 5
    net_arch: [512, 256]
    tau: 0.017726513288668456
Optimization completed.


In [3]:

# Plot results
plot_reward_over_steps(study, algo) 
print("Plotting optimization history.")
# plot_learning_curve(f"./logs/{algo}", algo)
# Visualize parameter importance
fig = plot_param_importances(study)
fig.update_layout(title=f'{algo} Parameter Importance')
# fig.write_image(f"{algo}_param_importance.png")

fig.show()

hello
Plotting optimization history.


ValueError: Cannot evaluate parameter importances with only a single trial.