# Create Custom Environment

In [15]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces


class GoLeftEnv(gym.Env):
    # Because of ipynb, we cannot implement the GUI ('human' render mode)
    metadata = {"render_modes": ["console"]}

    LEFT = 0
    RIGHT = 1

    def __init__(self, grid_size=10, render_mode="console"):
        super(GoLeftEnv, self).__init__()
        self.render_mode = render_mode
        self.grid_size = grid_size # Size of the 1D-grid
        self.agent_pos = grid_size - 1 # Initialize the agent at the right of the grid

        # Define action and observation space
        # They must be gym.spaces objects
        n_actions = 2
        self.action_space = spaces.Discrete(n_actions)
        # The observation will be the coordinate of the agent
        # this can be described both by Discrete and Box space
        self.observation_space = spaces.Box(
            low=0, high=self.grid_size, shape=(1,), dtype=np.float32
        )

    def reset(self, seed=None, options=None):
        """
        Important: the observation must be a numpy array
        :return: (np.array)
        """
        super().reset(seed=seed, options=options)
        self.agent_pos = self.grid_size - 1
        # here we convert to float32 to make it more general (in case we want to use continuous actions)
        return np.array([self.agent_pos]).astype(np.float32), {}  # empty info dict

    def step(self, action):
        if action == self.LEFT:
            self.agent_pos -= 1
        elif action == self.RIGHT:
            self.agent_pos += 1
        else:
            raise ValueError(
                f"Received invalid action={action} which is not part of the action space"
            )

        # Account for the boundaries of the grid
        self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size)

        # Are we at the left of the grid?
        terminated = bool(self.agent_pos == 0)
        truncated = False  # we do not limit the number of steps here

        # Null reward everywhere except when reaching the goal (left of the grid)
        reward = 1 if self.agent_pos == 0 else 0
        
        info = {}

        return (
            np.array([self.agent_pos]).astype(np.float32),
            reward,
            terminated,
            truncated,
            info,
        )

    def render(self):
        # agent is represented as a cross, rest as a dot
        if self.render_mode == "console":
            print("." * self.agent_pos, end="")
            print("x", end="")
            print("." * (self.grid_size - self.agent_pos))

    def close(self):
        pass

Validate the env

In [16]:
from stable_baselines3.common.env_checker import check_env

env = GoLeftEnv()
# If the environment don't follow the interface, an error will be thrown
check_env(env, warn=True)

Test the env

In [17]:
env = GoLeftEnv(grid_size=10)

obs, _ = env.reset()
env.render()

print(env.observation_space)
print(env.action_space)
print(env.action_space.sample())

GO_LEFT = 0
# Hardcoded best agent: always go left!
n_steps = 20
for step in range(n_steps):
    print(f"Step {step + 1}")
    obs, reward, terminated, truncated, info = env.step(GO_LEFT)
    done = terminated or truncated
    print("obs=", obs, "reward=", reward, "done=", done)
    env.render()
    if done:
        print("Goal reached!", "reward=", reward)
        break

.........x.
Box(0.0, 10.0, (1,), float32)
Discrete(2)
0
Step 1
obs= [8.] reward= 0 done= False
........x..
Step 2
obs= [7.] reward= 0 done= False
.......x...
Step 3
obs= [6.] reward= 0 done= False
......x....
Step 4
obs= [5.] reward= 0 done= False
.....x.....
Step 5
obs= [4.] reward= 0 done= False
....x......
Step 6
obs= [3.] reward= 0 done= False
...x.......
Step 7
obs= [2.] reward= 0 done= False
..x........
Step 8
obs= [1.] reward= 0 done= False
.x.........
Step 9
obs= [0.] reward= 1 done= True
x..........
Goal reached! reward= 1


# Train on RL

In [19]:
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.env_util import make_vec_env

# Instantiate the env
vec_env = make_vec_env(GoLeftEnv, n_envs=1, env_kwargs=dict(grid_size=10))

# Train the agent
model = A2C("MlpPolicy", env, verbose=1).learn(5000)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 15.2     |
|    ep_rew_mean        | 1        |
| time/                 |          |
|    fps                | 1336     |
|    iterations         | 100      |
|    time_elapsed       | 0        |
|    total_timesteps    | 500      |
| train/                |          |
|    entropy_loss       | -0.179   |
|    explained_variance | -0.716   |
|    learning_rate      | 0.0007   |
|    n_updates          | 99       |
|    policy_loss        | -0.0149  |
|    value_loss         | 0.000451 |
------------------------------------
-------------------------------------
| rollout/              |           |
|    ep_len_mean        | 11.7      |
|    ep_rew_mean        | 1         |
| time/                 |           |
|    fps                | 1400      |
|    iterations         | 200       |
|    time_

In [21]:
# Test the trained agent
# using the vecenv
obs = vec_env.reset()
n_steps = 20
for step in range(n_steps):
    action, _ = model.predict(obs, deterministic=True)
    print(f"Step {step + 1} and action={action}")
    obs, reward, done, info = vec_env.step(action)
    print("obs=", obs, "reward=", reward, "done=", done)
    vec_env.render() # ........x..: x is current position
    if done:
        # Note that the VecEnv resets automatically
        # when a done signal is encountered
        print("Goal reached!", "reward=", reward)
        break

Step 1 and action=[0]
obs= [[8.]] reward= [0.] done= [False]
........x..
Step 2 and action=[0]
obs= [[7.]] reward= [0.] done= [False]
.......x...
Step 3 and action=[0]
obs= [[6.]] reward= [0.] done= [False]
......x....
Step 4 and action=[0]
obs= [[5.]] reward= [0.] done= [False]
.....x.....
Step 5 and action=[0]
obs= [[4.]] reward= [0.] done= [False]
....x......
Step 6 and action=[0]
obs= [[3.]] reward= [0.] done= [False]
...x.......
Step 7 and action=[0]
obs= [[2.]] reward= [0.] done= [False]
..x........
Step 8 and action=[0]
obs= [[1.]] reward= [0.] done= [False]
.x.........
Step 9 and action=[0]
obs= [[9.]] reward= [1.] done= [ True]
.........x.
Goal reached! reward= [1.]


Train on Multiple Env and another RL algorithm

In [29]:
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.env_util import make_vec_env

# Instantiate the env
vec_env = make_vec_env(GoLeftEnv, n_envs=4, env_kwargs=dict(grid_size=10))

# Train the agent
model = PPO("MlpPolicy", env, verbose=1).learn(5000)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 95.8     |
|    ep_rew_mean     | 1        |
| time/              |          |
|    fps             | 2723     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 56.1        |
|    ep_rew_mean          | 1           |
| time/                   |             |
|    fps                  | 2167        |
|    iterations           | 2           |
|    time_elapsed         | 1           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.020065323 |
|    clip_fraction        | 0.401       |
|    clip_range           | 0.2         |
|    entropy_loss  

In [30]:
# Test the trained agent
# using the vecenv
obs = vec_env.reset()
n_steps = 20
for step in range(n_steps):
    action, _ = model.predict(obs, deterministic=True)
    print(f"Step {step + 1} and action={action}")
    obs, reward, done, info = vec_env.step(action)
    print("obs=", obs, "reward=", reward, "done=", done)
    vec_env.render() # ........x..: x is current position
    
    if done.all():
        # Note that the VecEnv resets automatically
        # when a done signal is encountered
        print("Goal reached!", "reward=", reward)
        break

Step 1 and action=[0 0 0 0]
obs= [[8.]
 [8.]
 [8.]
 [8.]] reward= [0. 0. 0. 0.] done= [False False False False]
........x..
........x..
........x..
........x..
Step 2 and action=[0 0 0 0]
obs= [[7.]
 [7.]
 [7.]
 [7.]] reward= [0. 0. 0. 0.] done= [False False False False]
.......x...
.......x...
.......x...
.......x...
Step 3 and action=[0 0 0 0]
obs= [[6.]
 [6.]
 [6.]
 [6.]] reward= [0. 0. 0. 0.] done= [False False False False]
......x....
......x....
......x....
......x....
Step 4 and action=[0 0 0 0]
obs= [[5.]
 [5.]
 [5.]
 [5.]] reward= [0. 0. 0. 0.] done= [False False False False]
.....x.....
.....x.....
.....x.....
.....x.....
Step 5 and action=[0 0 0 0]
obs= [[4.]
 [4.]
 [4.]
 [4.]] reward= [0. 0. 0. 0.] done= [False False False False]
....x......
....x......
....x......
....x......
Step 6 and action=[0 0 0 0]
obs= [[3.]
 [3.]
 [3.]
 [3.]] reward= [0. 0. 0. 0.] done= [False False False False]
...x.......
...x.......
...x.......
...x.......
Step 7 and action=[0 0 0 0]
obs= [[2.]
 