In [None]:
import warnings

import torch

from omnisafe.common.experiment_grid import ExperimentGrid
import omnisafe

In [None]:
# Grid object that accumulates every parameter combination to be run.
eg = ExperimentGrid(exp_name='BM_Safety_Lagrangian')

# Algorithms to benchmark (the other families are kept here for reference).
# base_policy = ['TRPO', 'PPO']
naive_lagrange_policy = ['PPOLag', 'TRPOLag']
# second_order_policy = ['CPO']

# Environments to benchmark on.
safety_envs = ['SafetyPointGoal1-v0']
eg.add('env_id', safety_envs)

# Device selection: if any requested GPU index is not present on this
# machine, warn and fall back to CPU by clearing `gpu_id`.
gpu_id = [0]
avaliable_gpus = list(range(torch.cuda.device_count()))

if gpu_id and any(idx not in avaliable_gpus for idx in gpu_id):
    warnings.warn('The GPU ID is not available, use CPU instead.', stacklevel=1)
    gpu_id = None

# Remaining grid axes, registered in this exact order.
for grid_key, grid_values in (
    ('algo', naive_lagrange_policy),
    ('logger_cfgs:use_wandb', [True]),
    ('logger_cfgs:wandb_project', ['Benchmark_Safety']),
    ('train_cfgs:vector_env_nums', [4]),
    ('train_cfgs:torch_threads', [8]),
    ('algo_cfgs:steps_per_epoch', [20000]),
    ('train_cfgs:total_steps', [20000 * 30]),
    ('seed', [0]),
):
    eg.add(grid_key, grid_values)

In [None]:
"""Example of training a policy from exp-x config with OmniSafe."""
import os, sys
from omnisafe.typing import NamedTuple, Tuple


def train(
    exp_id: str, algo: str, env_id: str, custom_cfgs: NamedTuple
) -> Tuple[float, float, float]:
    """Train a policy from exp-x config with OmniSafe.

    While the agent is built and trained, ``sys.stdout`` and ``sys.stderr`` are
    redirected to log files inside ``custom_cfgs['logger_cfgs']['log_dir']``
    (the directory is created if needed). Both files are closed and the streams
    that were active when the function was called are restored before the
    function returns or raises, so no file handle or redirection is leaked.

    Args:
        exp_id (str): Experiment ID.
        algo (str): Algorithm to train.
        env_id (str): The name of test environment.
        custom_cfgs (NamedTuple): Custom configurations. Accessed like a mapping:
            ``custom_cfgs['logger_cfgs']['log_dir']`` is required, and an optional
            ``'seed'`` entry is used as a prefix for the log file names.

    Returns:
        Tuple[float, float, float]: The ``(reward, cost, ep_len)`` values
        returned by ``agent.learn()``.
    """
    terminal_log_name = 'terminal.log'
    error_log_name = 'error.log'
    if 'seed' in custom_cfgs:
        seed_prefix = f'seed{custom_cfgs["seed"]}_'
        terminal_log_name = f'{seed_prefix}{terminal_log_name}'
        error_log_name = f'{seed_prefix}{error_log_name}'

    # Remember the caller's streams so they can be put back afterwards.
    caller_stdout, caller_stderr = sys.stdout, sys.stderr

    # The progress line goes to the interpreter's original stdout, not to a log.
    print(f'exp-x: {exp_id} is training...', file=sys.__stdout__)

    log_dir = f'{custom_cfgs["logger_cfgs"]["log_dir"]}'
    # `exist_ok=True` already covers the "directory exists" case.
    os.makedirs(log_dir, exist_ok=True)

    with open(os.path.join(log_dir, terminal_log_name), 'w', encoding='utf-8') as terminal_log:
        with open(os.path.join(log_dir, error_log_name), 'w', encoding='utf-8') as error_log:
            sys.stdout = terminal_log
            sys.stderr = error_log
            try:
                agent = omnisafe.Agent(algo, env_id, custom_cfgs=custom_cfgs)
                reward, cost, ep_len = agent.learn()
            finally:
                # Always undo the redirection, even when training fails.
                sys.stdout = caller_stdout
                sys.stderr = caller_stderr
    return reward, cost, ep_len

In [None]:
# The total number of experiments must be divisible by `num_pool`.
# Users should choose that value according to their machine's resources.
eg.run(train, gpu_id=gpu_id)

In [None]:
# Usage notes for the (currently commented-out) `eg.analyze` call below:
# pass the name of the parameter whose values you want to compare.
# You can then either specify the values to compare (`values`),
# or specify the maximum number of values to compare in a single graph (`compare_num`),
# in which case all possible combinations of graphs are generated automatically.
# The two modes cannot be used at the same time.
# eg.analyze(parameter='env_id', values=None, compare_num=6, cost_limit=25)
eg.render(num_episodes=1, render_mode='rgb_array', width=256, height=256)
eg.evaluate(num_episodes=1)