# OmniSafe Tutorial - Environment Customization From Zero

OmniSafe: https://github.com/PKU-Alignment/omnisafe

Documentation: https://omnisafe.readthedocs.io/en/latest/

Safety-Gymnasium: https://www.safety-gymnasium.com/

[Safety-Gymnasium](https://www.safety-gymnasium.com/) is a highly scalable and customizable Safe Reinforcement Learning (Safe RL) library. It aims to provide a clear basis for benchmarking Safe RL algorithms and a more standardized set of environments.

## Introduction

This section, together with [Tutorial 4: Environment Customization from Community](./4.Environment%20Customization%20from%20Community.ipynb), shows how to use the full set of training, recording, and saving frameworks that OmniSafe provides with customized environments. This section is aimed at newcomers to Safe RL and explains how to create an environment from scratch, while [Tutorial 4: Environment Customization from Community](./4.Gymnasium%20Customization.ipynb) focuses on making minimal adaptations to existing community environments, such as [Gymnasium](https://github.com/Farama-Foundation/Gymnasium), so that they can be embedded in OmniSafe.

Specifically, this section provides a minimal template for customizing environments. Through this template, you will understand:

- How to create and register an environment in OmniSafe.
- How to specify customization parameters when creating an environment.
- How to record environment-specific information.

## Quick Installation

In [None]:
# Install via pip (ignore it if you have already installed).
%pip install omnisafe

In [None]:
# Install from source (ignore it if you have already installed).
## clone the repo
!git clone https://github.com/PKU-Alignment/omnisafe
%cd omnisafe

## install it
%pip install -e .
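
After either installation route, a quick check can confirm that the package is importable before continuing. The sketch below uses only the standard library. `is_installed` is a hypothetical helper name introduced here for illustration; it is not part of OmniSafe.

```python
from importlib import util


def is_installed(package: str) -> bool:
    """Return True if `package` can be found on the current import path."""
    return util.find_spec(package) is not None


# `omnisafe` is the import name used throughout this tutorial.
print('omnisafe importable:', is_installed('omnisafe'))
```

If this prints `False`, re-running one of the installation cells above in the same kernel is the first thing to try.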

## The Simplest Custom Environment Template
A customized environment for OmniSafe can be implemented in a single file. This section introduces the simplest custom environment template, which serves as a quick start.

### Custom Environment Design
Here, we detail the design process of a simple random environment. If you are an expert in RL or an experienced researcher, you can skip ahead to [Custom Environment Embedding](#custom-environment-embedding) or [Tutorial 4: Environment Customization from Community](./4.Gymnasium%20Customization.ipynb).

In [1]:
# import all we need
from __future__ import annotations

import random
from typing import Any, ClassVar

import torch
from gymnasium import spaces

import omnisafe
from omnisafe.envs.core import CMDP, env_register, env_unregister

In [2]:
# Define environment class
class ExampleEnv(CMDP):
    _support_envs: ClassVar[list[str]] = ['Example-v0']  # Supported task names

    need_auto_reset_wrapper = True  # Whether `AutoReset` Wrapper is needed
    need_time_limit_wrapper = True  # Whether `TimeLimit` Wrapper is needed

You need to pay attention to the following details in the above code:

- **Task name definition:** The supported task names for the environment are provided in `_support_envs`.
- **Wrapper configuration:** Automatic reset and time limit are enabled by setting `need_auto_reset_wrapper` and `need_time_limit_wrapper`.
- **Number of parallel environments:** If your environment supports vectorized parallelism, set it through the `_num_envs` attribute.

In [3]:
class ExampleEnv(CMDP):
    _support_envs: ClassVar[list[str]] = ['Example-v0', 'Example-v1']  # Supported task names

    need_auto_reset_wrapper = True  # Whether `AutoReset` Wrapper is needed
    need_time_limit_wrapper = True  # Whether `TimeLimit` Wrapper is needed

    def __init__(self, env_id: str, **kwargs) -> None:
        self._count = 0
        self._num_envs = 1
        self._observation_space = spaces.Box(low=-1.0, high=1.0, shape=(3,))
        self._action_space = spaces.Box(low=-1.0, high=1.0, shape=(2,))

Complete the `__init__` method definition. Here, you need to specify the action space and observation space of the environment. You need to define these according to the specific task you are currently designing. For example:
```python
if env_id == 'Example-v0':
    self._observation_space = spaces.Box(low=-1.0, high=1.0, shape=(3,))
    self._action_space = spaces.Box(low=-1.0, high=1.0, shape=(2,))
elif env_id == 'Example-v1':
    self._observation_space = spaces.Box(low=-1.0, high=1.0, shape=(4,))
    self._action_space = spaces.Box(low=-1.0, high=1.0, shape=(3,))
else:
    raise NotImplementedError
```
**Note:** The higher-level modules rely on a standard interface, so please use exactly these two attribute names, `self._observation_space` and `self._action_space`, when designing the environment.

Complete the definition of methods related to environment initialization. `reset` and `set_seed` are the standard interfaces for OmniSafe environment initialization: `reset` resets the environment state and the step counter, while `set_seed` sets the random seed so that experiments are reproducible. The `max_episode_steps` method, decorated with `@property`, passes the maximum number of steps per episode to the `TimeLimit` wrapper. The code is as follows:

In [4]:
class ExampleEnv(CMDP):
    _support_envs: ClassVar[list[str]] = ['Example-v0', 'Example-v1']  # Supported task names

    need_auto_reset_wrapper = True  # Whether `AutoReset` Wrapper is needed
    need_time_limit_wrapper = True  # Whether `TimeLimit` Wrapper is needed

    def __init__(self, env_id: str, **kwargs) -> None:
        self._count = 0
        self._num_envs = 1
        self._observation_space = spaces.Box(low=-1.0, high=1.0, shape=(3,))
        self._action_space = spaces.Box(low=-1.0, high=1.0, shape=(2,))

    def set_seed(self, seed: int) -> None:
        random.seed(seed)

    def reset(
        self,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[torch.Tensor, dict]:
        if seed is not None:
            self.set_seed(seed)
        obs = torch.as_tensor(self._observation_space.sample())
        self._count = 0
        return obs, {}

    @property
    def max_episode_steps(self) -> int:
        """The max steps per episode."""
        return 10
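
To illustrate what `set_seed` buys you, the following sketch seeds Python's `random` module in the same way and checks that the resulting draws repeat. `sample_rewards` is a hypothetical helper written for this illustration. Note that `random.seed` only controls Python's `random` module. Gymnasium spaces draw from their own generator, so if observations sampled with `self._observation_space.sample()` also need to be reproducible, the space itself would presumably need seeding as well (for example through its `seed` method).

```python
from __future__ import annotations

import random


def sample_rewards(seed: int, n: int = 3) -> list[float]:
    """Draw `n` pseudo-random rewards after seeding Python's `random` module."""
    random.seed(seed)  # the same call that `set_seed` makes in the template
    return [2 * random.random() for _ in range(n)]


first = sample_rewards(seed=0)
second = sample_rewards(seed=0)
other = sample_rewards(seed=1)

print(first == second)  # identical seeds give identical draws
print(first == other)   # a different seed gives a different sequence
```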

Complete the definition of functional methods. The `render` method is used for rendering the environment; the `close` method is used for cleanup after training ends.

In [5]:
class ExampleEnv(CMDP):
    _support_envs: ClassVar[list[str]] = ['Example-v0', 'Example-v1']  # Supported task names

    need_auto_reset_wrapper = True  # Whether `AutoReset` Wrapper is needed
    need_time_limit_wrapper = True  # Whether `TimeLimit` Wrapper is needed

    def __init__(self, env_id: str, **kwargs) -> None:
        self._count = 0
        self._num_envs = 1
        self._observation_space = spaces.Box(low=-1.0, high=1.0, shape=(3,))
        self._action_space = spaces.Box(low=-1.0, high=1.0, shape=(2,))

    def set_seed(self, seed: int) -> None:
        random.seed(seed)

    def reset(
        self,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[torch.Tensor, dict]:
        if seed is not None:
            self.set_seed(seed)
        obs = torch.as_tensor(self._observation_space.sample())
        self._count = 0
        return obs, {}

    @property
    def max_episode_steps(self) -> int:
        """The max steps per episode."""
        return 10

    def render(self) -> Any:
        pass

    def close(self) -> None:
        pass

Complete the definition of the `step` method, which holds the core interaction logic of your customized environment. You only need to follow the input and output formats used in this example, and you can replace the random interaction dynamics shown here with the dynamics of your own environment.

In [6]:
class ExampleEnv(CMDP):
    _support_envs: ClassVar[list[str]] = ['Example-v0', 'Example-v1']  # Supported task names

    need_auto_reset_wrapper = True  # Whether `AutoReset` Wrapper is needed
    need_time_limit_wrapper = True  # Whether `TimeLimit` Wrapper is needed

    def __init__(self, env_id: str, **kwargs) -> None:
        self._count = 0
        self._num_envs = 1
        self._observation_space = spaces.Box(low=-1.0, high=1.0, shape=(3,))
        self._action_space = spaces.Box(low=-1.0, high=1.0, shape=(2,))

    def set_seed(self, seed: int) -> None:
        random.seed(seed)

    def reset(
        self,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[torch.Tensor, dict]:
        if seed is not None:
            self.set_seed(seed)
        obs = torch.as_tensor(self._observation_space.sample())
        self._count = 0
        return obs, {}

    @property
    def max_episode_steps(self) -> int:
        """The max steps per episode."""
        return 10

    def render(self) -> Any:
        pass

    def close(self) -> None:
        pass

    def step(
        self,
        action: torch.Tensor,
    ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, dict]:
        self._count += 1
        obs = torch.as_tensor(self._observation_space.sample())
        reward = 2 * torch.as_tensor(random.random())
        cost = 2 * torch.as_tensor(random.random())
        terminated = torch.as_tensor(random.random() > 0.9)
        truncated = torch.as_tensor(self._count > 10)
        return obs, reward, cost, terminated, truncated, {'final_observation': obs}
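
The `truncated` flag above uses a strict comparison, `self._count > 10`, so on its own it first becomes true on the 11th step. The plain-Python sketch below makes the difference between `>` and `>=` explicit, which may help when aligning a hand-written cutoff with `max_episode_steps`. `first_truncated_step` is a hypothetical helper introduced only for this illustration.

```python
import operator
from typing import Callable


def first_truncated_step(limit: int, compare: Callable[[int, int], bool]) -> int:
    """Return the first step index at which `compare(count, limit)` is true."""
    count = 0
    while True:
        count += 1  # mirrors `self._count += 1` at the top of `step`
        if compare(count, limit):
            return count


print(first_truncated_step(10, operator.gt))  # strict `>`: fires on step 11
print(first_truncated_step(10, operator.ge))  # `>=`: fires on step 10
```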

Next, let's run the environment until the episode ends (on termination or truncation) and observe the interaction information.

In [7]:
env = ExampleEnv(env_id='Example-v0')
env.reset(seed=0)
while True:
    action = env.action_space.sample()
    obs, reward, cost, terminated, truncated, info = env.step(action)
    print('-' * 20)
    print(f'obs: {obs}')
    print(f'reward: {reward}')
    print(f'cost: {cost}')
    print(f'terminated: {terminated}')
    print(f'truncated: {truncated}')
    print('*' * 20)
    if terminated or truncated:
        break
env.close()

--------------------
obs: tensor([0.5903, 0.9825, 0.6966])
reward: 1.6888437271118164
cost: 1.5159088373184204
terminated: False
truncated: False
********************
--------------------
obs: tensor([-0.0615,  0.8932, -0.1051])
reward: 0.5178334712982178
cost: 1.0225493907928467
terminated: False
truncated: False
********************
--------------------
obs: tensor([ 0.7570, -0.0613,  0.9682])
reward: 1.5675971508026123
cost: 0.6066254377365112
terminated: False
truncated: False
********************
--------------------
obs: tensor([ 0.1937,  0.5437, -0.4663])
reward: 1.1667640209197998
cost: 1.8162257671356201
terminated: False
truncated: False
********************
--------------------
obs: tensor([-0.9458, -0.1812,  0.4118])
reward: 0.5636757016181946
cost: 1.511608362197876
terminated: False
truncated: False
********************
--------------------
obs: tensor([-0.9290, -0.0350,  0.3893])
reward: 0.5010126829147339
cost: 1.8194924592971802
terminated: True
truncated: False
********************
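
Beyond printing each step, a rollout loop like the one above can accumulate per-episode totals, which are the kind of quantities that training logs typically summarize as episode return, cost, and length. The sketch below is self-contained and assumes a hypothetical `ToyEnv` stand-in that mimics the return layout of `ExampleEnv.step` with plain Python values instead of tensors. It is not the environment defined above.

```python
from __future__ import annotations

import random


class ToyEnv:
    """Hypothetical stand-in with the same return layout as `ExampleEnv.step`."""

    def __init__(self) -> None:
        self._count = 0

    def reset(self, seed: int | None = None) -> tuple[list[float], dict]:
        if seed is not None:
            random.seed(seed)
        self._count = 0
        return [0.0, 0.0, 0.0], {}

    def step(self, action: object) -> tuple[list[float], float, float, bool, bool, dict]:
        self._count += 1
        obs = [random.uniform(-1.0, 1.0) for _ in range(3)]
        reward = 2 * random.random()
        cost = 2 * random.random()
        terminated = random.random() > 0.9
        truncated = self._count > 10
        return obs, reward, cost, terminated, truncated, {}


def run_episode(env: ToyEnv, seed: int) -> tuple[float, float, int]:
    """Roll out one episode and return (episode return, episode cost, length)."""
    env.reset(seed=seed)
    ep_ret, ep_cost, ep_len = 0.0, 0.0, 0
    done = False
    while not done:
        _, reward, cost, terminated, truncated, _ = env.step(action=None)
        ep_ret += reward
        ep_cost += cost
        ep_len += 1
        done = terminated or truncated
    return ep_ret, ep_cost, ep_len


ep_ret, ep_cost, ep_len = run_episode(ToyEnv(), seed=0)
print(f'return={ep_ret:.3f} cost={ep_cost:.3f} length={ep_len}')
```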

Congratulations! You have successfully completed the basic environment definition. Next, we will introduce how to register this environment into OmniSafe, and implement steps such as environment parameter passing, interaction information recording, algorithm training, and result saving.

### Custom Environment Embedding

### Quick Training

Thanks to OmniSafe's carefully designed registration mechanism, a single decorator is enough to register this environment in OmniSafe's environment list.

In [8]:
@env_register
class ExampleEnv(ExampleEnv):
    pass

Registering an environment with the same name will cause an error, due to **environment name conflict**.

In [9]:
@env_register
class CustomExampleEnv(ExampleEnv):
    example_configs = 1


env = CustomExampleEnv('Example-v0')
env.example_configs

1

So, you need to manually unregister the environment first.

In [10]:
@env_unregister
class CustomExampleEnv(ExampleEnv):
    pass

Afterwards, you can re-register the environment. In this tutorial, we stack the `env_register` and `env_unregister` decorators. Because decorators are applied from the bottom up, the environment is first unregistered and then registered again. This avoids errors caused by repeated registration, so you can modify and re-run the cells as often as you like while reading this tutorial.

In [11]:
@env_register
@env_unregister
class CustomExampleEnv(ExampleEnv):
    example_configs = 2


env = CustomExampleEnv('Example-v0')
env.example_configs

CustomExampleEnv has not been registered yet


2
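
The effect of stacking the two decorators can be reproduced with a toy registry. The sketch below is a simplified stand-in and not OmniSafe's actual implementation: `REGISTRY`, `toy_register`, `toy_unregister`, and keying the registry by class name are all assumptions made for illustration. It shows that the lower decorator runs first, so a stale entry is cleared before the fresh class is registered.

```python
from __future__ import annotations

REGISTRY: dict[str, type] = {}  # toy registry keyed by class name (assumption)


def toy_register(cls: type) -> type:
    """Add `cls` to the toy registry and refuse duplicates."""
    if cls.__name__ in REGISTRY:
        raise ValueError(f'{cls.__name__} is already registered')
    REGISTRY[cls.__name__] = cls
    return cls


def toy_unregister(cls: type) -> type:
    """Remove `cls` from the toy registry if present, otherwise just report it."""
    if REGISTRY.pop(cls.__name__, None) is None:
        print(f'{cls.__name__} has not been registered yet')
    return cls


def define_env() -> type:
    """Simulate re-running a notebook cell that defines and registers a class."""

    @toy_register      # applied second: registers the fresh class
    @toy_unregister    # applied first: clears any stale entry with this name
    class ToyEnv:
        pass

    return ToyEnv


first = define_env()   # nothing to unregister yet, so a message is printed
second = define_env()  # stale entry removed, new class registered without error
print(REGISTRY['ToyEnv'] is second)
```

Using `toy_register` alone on the second call would raise, which is the toy analogue of the name conflict described above.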

Subsequently, you can use the algorithms in OmniSafe to train this custom environment.

In [12]:
custom_cfgs = {
    'train_cfgs': {
        'total_steps': 30,
    },
    'algo_cfgs': {
        'steps_per_epoch': 10,
        'update_iters': 1,
    },
}
agent = omnisafe.Agent('PPOLag', 'Example-v0', custom_cfgs=custom_cfgs)
agent.learn()

Loading PPOLag.yaml from /home/safepo/dev-env/omnisafe_zjy/omnisafe/utils/../configs/on-policy/PPOLag.yaml


(6.297085762023926, 6.2187700271606445, 5.25)

Well done! We have completed the embedding and training of this customized environment. Next, we will further explore how to specify hyperparameters for the environment.

### Parameter Setting

Starting with a new example environment, assume this environment requires a parameter named `num_agents`. We will show how to complete the parameter setting without modifying OmniSafe's code.

In [13]:
@env_register
@env_unregister
class NewExampleEnv(ExampleEnv):  # make a new environment
    _support_envs: ClassVar[list[str]] = ['NewExample-v0', 'NewExample-v1']
    num_agents: ClassVar[int] = 1

    def __init__(self, env_id: str, **kwargs) -> None:
        super().__init__(env_id, **kwargs)
        self.num_agents = kwargs.get('num_agents', 1)

NewExampleEnv has not been registered yet


Now, the `num_agents` parameter is set to a default value: `1`.

In [14]:
new_env = NewExampleEnv('NewExample-v0')
new_env.num_agents

1

Below we will show how to modify this parameter through OmniSafe's interface and train:

In [15]:
custom_cfgs.update({'env_cfgs': {'num_agents': 2}})
agent = omnisafe.Agent('PPOLag', 'NewExample-v0', custom_cfgs=custom_cfgs)
agent.agent._env._env.num_agents

Loading PPOLag.yaml from /home/safepo/dev-env/omnisafe_zjy/omnisafe/utils/../configs/on-policy/PPOLag.yaml


2

Excellent! We have set `num_agents` to 2. This means we have successfully implemented hyperparameter setting without modifying the code.
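
The pattern at work here is ordinary keyword-argument forwarding: a configuration entry is unpacked into `**kwargs`, and the environment reads what it needs with a default. The sketch below shows that pattern in isolation. `ToyEnv` and `make_env` are hypothetical names, and the way `make_env` reads `env_cfgs` is an assumption for illustration rather than a description of OmniSafe's internals.

```python
from __future__ import annotations

from typing import Any


class ToyEnv:
    """Hypothetical environment that reads one optional parameter from kwargs."""

    def __init__(self, env_id: str, **kwargs: Any) -> None:
        self.env_id = env_id
        # Fall back to 1 when the caller does not supply `num_agents`.
        self.num_agents = kwargs.get('num_agents', 1)


def make_env(env_id: str, cfgs: dict[str, Any]) -> ToyEnv:
    """Hypothetical factory: unpack the `env_cfgs` entry into keyword arguments."""
    env_cfgs = cfgs.get('env_cfgs', {})
    return ToyEnv(env_id, **env_cfgs)


print(make_env('NewExample-v0', {}).num_agents)
print(make_env('NewExample-v0', {'env_cfgs': {'num_agents': 2}}).num_agents)
```

Reading parameters with `kwargs.get(name, default)` keeps the environment usable both with and without an `env_cfgs` entry.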

### Training Information Recording

While running the training code, you may have noticed that OmniSafe records training information through `Logger`, for example:

```bash
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Metrics                        ┃ Value                   ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Metrics/EpRet                  │ 2.046875                │
│ Metrics/EpCost                 │ 2.89453125              │
│ Metrics/EpLen                  │ 3.25                    │
│ Train/Epoch                    │ 3.0                     │
...
```
So, can we write information from the environment into the log? The answer is yes, and this process also does not require modifying OmniSafe's code. You only need to implement two standard interfaces:
1. In the `__init__` method, add the information you want to output to `self.env_spec_log`.
2. Implement the `spec_log` method to write the required information to the logger.

**Please note:** Currently, OmniSafe only supports recording this information at the end of each epoch, not after each step.

In [16]:
@env_register
@env_unregister
class NewExampleEnv(ExampleEnv):
    _support_envs: ClassVar[list[str]] = ['NewExample-v0', 'NewExample-v1']

    # define what to log
    def __init__(self, env_id: str, **kwargs) -> None:
        super().__init__(env_id, **kwargs)
        self.env_spec_log = {'Env/Success_counts': 0}

    # interact with the environment and log
    def step(self, action):
        obs, reward, cost, terminated, truncated, info = super().step(action)
        success = int(reward > cost)
        self.env_spec_log['Env/Success_counts'] += success
        return obs, reward, cost, terminated, truncated, info

    # write to logger
    def spec_log(self, logger) -> None:
        logger.store({'Env/Success_counts': self.env_spec_log['Env/Success_counts']})
        self.env_spec_log['Env/Success_counts'] = 0

Next, we will briefly train and observe whether this information has been successfully recorded.

In [17]:
custom_cfgs.update({'train_cfgs': {'total_steps': 10}})
agent = omnisafe.Agent('PPOLag', 'NewExample-v0', custom_cfgs=custom_cfgs)
agent.learn()

Loading PPOLag.yaml from /home/safepo/dev-env/omnisafe_zjy/omnisafe/utils/../configs/on-policy/PPOLag.yaml


(5.625942230224609, 6.960921287536621, 5.0)

Nice! The above code writes the environment-specific information `Env/Success_counts` to the training log printed in the terminal. This process does not require any modifications to OmniSafe's original code.
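
The accumulate-then-flush pattern used by `step` and `spec_log` can be examined without running a full training job. The sketch below is a minimal stand-in under stated assumptions: `ToyLogger` is a hypothetical logger that only offers a `store` method taking a dictionary, and `SuccessCounter` reproduces just the counting logic of the environment above. The reward and cost pairs are made-up inputs.

```python
from __future__ import annotations


class ToyLogger:
    """Hypothetical logger that keeps one list of stored values per key."""

    def __init__(self) -> None:
        self.history: dict[str, list[float]] = {}

    def store(self, data: dict[str, float]) -> None:
        for key, value in data.items():
            self.history.setdefault(key, []).append(value)


class SuccessCounter:
    """Accumulates a per-step counter and flushes it once per epoch."""

    def __init__(self) -> None:
        self.env_spec_log = {'Env/Success_counts': 0}

    def step(self, reward: float, cost: float) -> None:
        self.env_spec_log['Env/Success_counts'] += int(reward > cost)

    def spec_log(self, logger: ToyLogger) -> None:
        logger.store({'Env/Success_counts': self.env_spec_log['Env/Success_counts']})
        self.env_spec_log['Env/Success_counts'] = 0  # start the next epoch from zero


logger = ToyLogger()
counter = SuccessCounter()
epochs = [[(1.0, 0.5), (0.2, 0.9), (1.5, 1.0)], [(0.1, 0.3), (0.8, 0.4)]]
for epoch in epochs:
    for reward, cost in epoch:
        counter.step(reward, cost)
    counter.spec_log(logger)  # called once at the end of each epoch

print(logger.history['Env/Success_counts'])  # [2, 1]
```

Resetting the counter inside `spec_log` is what makes each logged value a per-epoch count rather than a running total.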

## Summary
OmniSafe aims to become the foundational software for safe reinforcement learning. We will continue to refine OmniSafe's environment interface standards so that it can adapt to a wide range of safe reinforcement learning tasks and support diverse safety scenarios.