This notebook is inspired by the [StableBaselines3 RL Colab Notebooks](https://github.com/Stable-Baselines-Team/rl-colab-notebooks).

# Stable Baselines 3

[Stable Baselines3 (SB3)](https://stable-baselines3.readthedocs.io/en/master/) is a set of reliable implementations of reinforcement learning algorithms in PyTorch.

In this notebook, you will learn the basics of using the Stable Baselines3 library.

In [None]:
!pip install -q swig
!pip install -q gym[box2d]
!pip install -q gym[atari]
!pip install stable-baselines3[extra]

  Preparing metadata (setup.py) ... done
  Building wheel for box2d-py (setup.py) ... done
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting stable-baselines3[extra]
  Downloading stable_baselines3-2.0.0-py3-none-any.whl (178 kB)
Collecting gymnasium==0.28.1 (from stable-baselines3[extra])


## Imports

Stable Baselines3 works with environments that follow the [gym interface](https://stable-baselines.readthedocs.io/en/master/guide/custom_env.html).


In [None]:
import gymnasium as gym
import numpy as np



The first thing you need to import is the RL model. Check the documentation to see which algorithm can be used for which kind of problem.

In [None]:
from stable_baselines3 import PPO, DQN

The next thing you need to import is the policy class that will be used to create the networks (for the policy/value functions).
This step is optional, as you can pass a string to the constructor directly:

```PPO('MlpPolicy', env)``` instead of ```PPO(MlpPolicy, env)```

Note that some algorithms, such as `SAC`, have their own `MlpPolicy`, which is why passing the policy as a string is the recommended option.

In [None]:
from stable_baselines3.dqn import MlpPolicy, CnnPolicy
from stable_baselines3.common.env_util import make_vec_env

### Instantiate Environment

In [None]:
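# Parallel environments: a vectorized environment steps several copies of the environment at once.
# make_vec_env uses DummyVecEnv (single process) by default; pass vec_env_cls=SubprocVecEnv to use multiprocessing.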
vec_env = make_vec_env("LunarLander-v2", n_envs=4, wrapper_class=gym.wrappers.TimeLimit, wrapper_kwargs={"max_episode_steps":500})
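
To make the idea of a vectorized environment more concrete, here is a dependency-free sketch of what such a wrapper does conceptually: it steps several environment copies with a batch of actions and resets each copy automatically when its episode ends. `CountdownEnv` and `ToyVecEnv` are hypothetical names invented for this illustration, and the simplified `(obs, reward, done)` step signature is an assumption of the sketch; this is not how SB3 implements `DummyVecEnv`.

```python
class CountdownEnv:
    """Hypothetical toy environment: the episode ends after `length` steps."""

    def __init__(self, length):
        self.length = length
        self.t = 0

    def reset(self):
        self.t = 0
        return self.t  # the observation is simply the step counter

    def step(self, action):
        self.t += 1
        done = self.t >= self.length
        reward = float(action)  # toy reward: the action itself
        return self.t, reward, done


class ToyVecEnv:
    """Steps several environments in sequence and batches the results."""

    def __init__(self, env_fns):
        self.envs = [fn() for fn in env_fns]

    def reset(self):
        return [env.reset() for env in self.envs]

    def step(self, actions):
        obs, rewards, dones = [], [], []
        for env, action in zip(self.envs, actions):
            o, r, d = env.step(action)
            if d:
                # Auto-reset: return the first observation of the next episode
                o = env.reset()
            obs.append(o)
            rewards.append(r)
            dones.append(d)
        return obs, rewards, dones


vec = ToyVecEnv([lambda: CountdownEnv(2), lambda: CountdownEnv(3)])
first_obs = vec.reset()
step1 = vec.step([1, 0])
step2 = vec.step([1, 0])
print(first_obs, step1, step2)
```

Because of the automatic reset, the training loop never has to call `reset()` itself after the first call, which is why the later examples in this notebook call `reset()` only once on a vectorized environment.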

### Instantiate Agent

In [None]:
model = PPO("MlpPolicy", vec_env, verbose=1)

Using cuda device


### Training the Agent

In [None]:
model.learn(total_timesteps=100000)

<stable_baselines3.ppo.ppo.PPO at 0x7f6bef614220>

### Evaluation

In [None]:
from stable_baselines3.common.evaluation import evaluate_policy
# Use a separate environment for evaluation
eval_env = make_vec_env("LunarLander-v2", n_envs=1, wrapper_class=gym.wrappers.TimeLimit, wrapper_kwargs={"max_episode_steps":500})

mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=5)

print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

mean_reward:-164.83 +/- 34.34
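
The two numbers printed above summarise the per-episode returns collected during evaluation. As a rough sketch of that aggregation, the snippet below computes a mean and a population standard deviation with the standard library. The episode returns are made-up values, not results from this notebook, and using the population (rather than sample) standard deviation is an assumption of this sketch.

```python
import statistics

# Hypothetical returns of 5 evaluation episodes (illustrative values only)
episode_returns = [-150.0, -200.0, -120.0, -180.0, -175.0]

mean_reward = statistics.fmean(episode_returns)
# Population standard deviation (divides by N)
std_reward = statistics.pstdev(episode_returns)

print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")
```

With only 5 episodes, both numbers are noisy estimates; increasing `n_eval_episodes` gives a more stable picture of the agent's performance.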


### Saving and Loading

In [None]:
model.save("ppo_lunarlander")

del model # remove to demonstrate saving and loading

model = PPO.load("ppo_lunarlander")



### Visualization

In [None]:
# For visualization
from IPython.display import HTML
from IPython import display
import glob
import base64, io, os, shutil
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv

os.environ['SDL_VIDEODRIVER']='dummy'

In [None]:
shutil.rmtree('video', ignore_errors=True)
os.makedirs("video", exist_ok=True)

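def show_video():
    """Display the first MP4 found in the video folder inline in the notebook."""
    mp4list = glob.glob('video/*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read the file in binary mode and close it afterwards
        with open(mp4, 'rb') as f:
            video = f.read()
        encoded = base64.b64encode(video)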
        display.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

def show_video_of_model():
    """
    Record a 500-step video of the global `model` acting in LunarLander-v2
    and save it in the `video/` folder.
    """
    video_length = 500
    eval_env = make_vec_env("LunarLander-v2", n_envs=1)
    # Start the video at step=0 and record 500 steps
    eval_env = VecVideoRecorder(
        eval_env,
        video_folder="video/",
        record_video_trigger=lambda step: step == 0,
        video_length=video_length,
        name_prefix="",
    )

    obs = eval_env.reset()
    for _ in range(video_length):
        action, _ = model.predict(obs)
        obs, _, _, _ = eval_env.step(action)

    # Close the video recorder
    eval_env.close()

In [None]:
show_video_of_model()

Saving video to /content/video/-step-0-to-step-500.mp4
Moviepy - Building video /content/video/-step-0-to-step-500.mp4.
Moviepy - Writing video /content/video/-step-0-to-step-500.mp4





Moviepy - Done !
Moviepy - video ready /content/video/-step-0-to-step-500.mp4


In [None]:
show_video()

# Gym and VecEnv wrappers

## Anatomy of a gym wrapper

A gym wrapper follows the [gym](https://stable-baselines.readthedocs.io/en/master/guide/custom_env.html) interface: it has `reset()` and `step()` methods.

Because a wrapper is *around* an environment, we can access the wrapped environment with `self.env`. This makes it easy to interact with it without modifying the original env.
Many wrappers are predefined; for a complete list, refer to the [gym documentation](https://gymnasium.farama.org/api/wrappers/).

In [None]:
class CustomWrapper(gym.Wrapper):
    """
    :param env: (gym.Env) Gym environment that will be wrapped
    """

    def __init__(self, env):
        # Call the parent constructor, so we can access self.env later
        super().__init__(env)

    def reset(self, **kwargs):
        """
        Reset the environment
        """
        obs, info = self.env.reset(**kwargs)

        return obs, info

    def step(self, action):
        """
        :param action: ([float] or int) Action taken by the agent
        :return: (np.ndarray, float, bool, bool, dict) observation, reward, whether a final state was reached (episode terminated),
        whether the max number of steps was reached (episode truncated), additional information
        """
        obs, reward, terminated, truncated, info = self.env.step(action)
        return obs, reward, terminated, truncated, info

## First example: limit the episode length

One practical use case of a wrapper is limiting the number of steps per episode. To do that, you need to overwrite the `truncated` signal when the limit is reached. With the older gym API, which has a single `done` flag, it is also good practice to pass that information in the `info` dictionary.

In [None]:
class TimeLimitWrapper(gym.Wrapper):
    """
    :param env: (gym.Env) Gym environment that will be wrapped
    :param max_steps: (int) Max number of steps per episode
    """

    def __init__(self, env, max_steps=100):
        # Call the parent constructor, so we can access self.env later
        super(TimeLimitWrapper, self).__init__(env)
        self.max_steps = max_steps
        # Counter of steps per episode
        self.current_step = 0

    def reset(self, **kwargs):
        """
        Reset the environment
        """
        # Reset the counter
        self.current_step = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        """
        :param action: ([float] or int) Action taken by the agent
        :return: (np.ndarray, float, bool, bool, dict) observation, reward, terminated?, truncated?, additional information
        """
        self.current_step += 1
        obs, reward, terminated, truncated, info = self.env.step(action)
        # Overwrite the truncation signal when the number of steps reaches the maximum
        if self.current_step >= self.max_steps:
            truncated = True
        return obs, reward, terminated, truncated, info

#### Test the wrapper

In [None]:
from gymnasium.envs.classic_control.pendulum import PendulumEnv

# Here we create the environment directly because gym.make() would otherwise already wrap it in a TimeLimit wrapper
env = PendulumEnv()
# Wrap the environment
env = TimeLimitWrapper(env, max_steps=100)

In [None]:
obs, _ = env.reset()
done = False
n_steps = 0
while not done:
    # Take random actions
    random_action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(random_action)
    done = terminated or truncated
    n_steps += 1

print(n_steps, info)

100 {}


In practice, `gym` already has a wrapper for that, named `TimeLimit` (`gym.wrappers.TimeLimit`), which is used by most environments.

## Second example: normalize actions

It is usually a good idea to normalize observations and actions before giving them to the agent; this prevents this [hard to debug issue](https://github.com/hill-a/stable-baselines/issues/473).

In this example, we are going to normalize the action space of *Pendulum-v1* so it lies in [-1, 1] instead of [-2, 2].

Note: here we are dealing with continuous actions, hence the `gym.spaces.Box` space.
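
As a quick worked example of the rescaling involved, the affine map below sends a normalized action `a` in [-1, 1] to `low + 0.5 * (a + 1) * (high - low)`, and the inverse map goes back. The helper names `rescale` and `normalize` are illustrative only; the bounds are the Pendulum-v1 limits stated above.

```python
def rescale(scaled_action, low, high):
    """Map an action from [-1, 1] to [low, high]."""
    return low + 0.5 * (scaled_action + 1.0) * (high - low)


def normalize(action, low, high):
    """Inverse map: from [low, high] back to [-1, 1]."""
    return 2.0 * (action - low) / (high - low) - 1.0


low, high = -2.0, 2.0  # Pendulum-v1 action bounds

for a in (-1.0, 0.0, 0.5, 1.0):
    print(a, "->", rescale(a, low, high))
```

The map does not require a symmetric interval: with bounds [0, 10], for instance, a normalized action of 0 lands on the midpoint 5.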

In [None]:
import numpy as np


class NormalizeActionWrapper(gym.Wrapper):
    """
    :param env: (gym.Env) Gym environment that will be wrapped
    """

    def __init__(self, env):
        # Call the parent constructor first, so we can access self.env later
        super().__init__(env)
        # Retrieve the action space
        action_space = env.action_space
        assert isinstance(
            action_space, gym.spaces.Box
        ), "This wrapper only works with continuous action space (spaces.Box)"
        # Retrieve the max/min values
        self.low, self.high = action_space.low, action_space.high

        # Override the action space on the wrapper only (the wrapped env keeps
        # its original bounds), so all actions seen by the agent lie in [-1, 1]
        self.action_space = gym.spaces.Box(
            low=-1, high=1, shape=action_space.shape, dtype=np.float32
        )

    def rescale_action(self, scaled_action):
        """
        Rescale the action from [-1, 1] to [low, high]
        (no need for symmetric action space)
        :param scaled_action: (np.ndarray)
        :return: (np.ndarray)
        """
        return self.low + (0.5 * (scaled_action + 1.0) * (self.high - self.low))

    def reset(self, **kwargs):
        """
        Reset the environment
        """
        return self.env.reset(**kwargs)

    def step(self, action):
        """
        :param action: ([float] or int) Action taken by the agent
        :return: (np.ndarray, float, bool, bool, dict) observation, reward, terminated?, truncated?, additional information
        """
        # Rescale action from [-1, 1] to original [low, high] interval
        rescaled_action = self.rescale_action(action)
        obs, reward, terminated, truncated, info = self.env.step(rescaled_action)
        return obs, reward, terminated, truncated, info

#### Test before rescaling actions

In [None]:
original_env = gym.make("Pendulum-v1")

print(original_env.action_space.low)
for _ in range(10):
    print(original_env.action_space.sample())

[-2.]
[-0.9822863]
[0.57923865]
[1.1006286]
[0.6744999]
[-0.18531157]
[0.21119185]
[-1.6020843]
[-0.55515796]
[0.5308823]
[1.2406603]


#### Test the NormalizeAction wrapper

In [None]:
env = NormalizeActionWrapper(gym.make("Pendulum-v1"))

print(env.action_space.low)

for _ in range(10):
    print(env.action_space.sample())

[-1.]
[0.01409781]
[-0.47386414]
[-0.04637694]
[0.37356108]
[-0.24053073]
[0.61350214]
[0.17146954]
[-0.63801956]
[0.5656749]
[0.8749726]


#### Test with a RL algorithm

We are going to use the `Monitor` wrapper of Stable Baselines3, which allows us to monitor training stats (mean episode reward, mean episode length).
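
To give an idea of what this kind of monitoring involves, the sketch below accumulates the return and length of each episode from a stream of `(reward, done)` pairs and averages them over a window of recent episodes. `EpisodeStats` and its `window` parameter are hypothetical names for this illustration; this is not the `Monitor` API, and the window size of 100 is an assumption.

```python
from collections import deque


class EpisodeStats:
    """Hypothetical helper: tracks per-episode return and length."""

    def __init__(self, window=100):
        # Only the most recent `window` episodes are kept
        self.returns = deque(maxlen=window)
        self.lengths = deque(maxlen=window)
        self._ret = 0.0
        self._len = 0

    def record(self, reward, done):
        self._ret += reward
        self._len += 1
        if done:
            # Episode finished: store its totals and start a new one
            self.returns.append(self._ret)
            self.lengths.append(self._len)
            self._ret, self._len = 0.0, 0

    @property
    def ep_rew_mean(self):
        return sum(self.returns) / len(self.returns)

    @property
    def ep_len_mean(self):
        return sum(self.lengths) / len(self.lengths)


stats = EpisodeStats()
# Two toy episodes: three steps of reward -1, then a single step of reward -5
for reward, done in [(-1.0, False), (-1.0, False), (-1.0, True), (-5.0, True)]:
    stats.record(reward, done)

print(stats.ep_rew_mean, stats.ep_len_mean)
```

The two properties correspond in spirit to the `ep_rew_mean` and `ep_len_mean` rows of the training log; they are only defined once at least one episode has finished.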

In [None]:
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv

In [None]:
env = Monitor(gym.make("Pendulum-v1"))
env = DummyVecEnv([lambda: env])

In [None]:
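# PPO collects full rollouts (2048 steps by default), so training stops after
# the first rollout rather than at exactly 1000 steps
model = PPO("MlpPolicy", env, verbose=1).learn(total_timesteps=1000)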

Using cuda device
----------------------------------
| rollout/           |           |
|    ep_len_mean     | 200       |
|    ep_rew_mean     | -1.28e+03 |
| time/              |           |
|    fps             | 625       |
|    iterations      | 1         |
|    time_elapsed    | 3         |
|    total_timesteps | 2048      |
----------------------------------


With the action wrapper

In [None]:
normalized_env = Monitor(gym.make("Pendulum-v1"))
# Note that we can use multiple wrappers
normalized_env = NormalizeActionWrapper(normalized_env)
normalized_env = DummyVecEnv([lambda: normalized_env])

In [None]:
model_2 = PPO("MlpPolicy", normalized_env, verbose=1).learn(total_timesteps=1000)

Using cuda device
----------------------------------
| rollout/           |           |
|    ep_len_mean     | 200       |
|    ep_rew_mean     | -1.24e+03 |
| time/              |           |
|    fps             | 787       |
|    iterations      | 1         |
|    time_elapsed    | 2         |
|    total_timesteps | 2048      |
----------------------------------


## Additional wrappers: VecEnvWrappers

In the same vein as gym wrappers, Stable Baselines3 provides wrappers for `VecEnv`. Among the wrappers that exist (you can also create your own), you should know:

- VecNormalize: it computes a running mean and standard deviation to normalize observations and returns
- VecFrameStack: it stacks several consecutive observations (useful to integrate time in the observation, e.g. successive frames of an Atari game)

More info in the [documentation](https://stable-baselines3.readthedocs.io/en/master/guide/vec_envs.html#wrappers)
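
Frame stacking can be sketched with a fixed-length queue: each new observation pushes out the oldest one, and the agent sees the last `n_stack` observations together. This is a simplified, dependency-free illustration with a hypothetical `FrameStacker` class, not the `VecFrameStack` implementation; filling the initial stack with copies of the first observation is a choice made for this sketch.

```python
from collections import deque


class FrameStacker:
    """Hypothetical sketch: keeps the last `n_stack` observations."""

    def __init__(self, n_stack):
        self.frames = deque(maxlen=n_stack)

    def reset(self, first_obs):
        # Fill the whole stack with the first observation
        self.frames.clear()
        for _ in range(self.frames.maxlen):
            self.frames.append(first_obs)
        return list(self.frames)

    def step(self, obs):
        # Appending to a full deque drops the oldest frame
        self.frames.append(obs)
        return list(self.frames)


stacker = FrameStacker(n_stack=3)
print(stacker.reset("f0"))
print(stacker.step("f1"))
print(stacker.step("f2"))
print(stacker.step("f3"))
```

With several consecutive frames in one observation, a feed-forward policy can infer quantities such as velocity that a single frame does not contain.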

Note: when using the `VecNormalize` wrapper, you must save the running mean and std along with the model; otherwise you will not get proper results when loading the agent again. If you use the [rl zoo](https://github.com/DLR-RM/rl-baselines3-zoo), this is done automatically.
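
The reason the statistics must be saved can be seen in a small sketch of running normalization. The class below keeps a running mean and variance (updated with Welford's online algorithm, chosen here for simplicity) and uses them to standardize scalar observations. `RunningNormalizer` is a hypothetical name and this is not SB3's code; the point is only that the normalized value depends on accumulated state, so a freshly created normalizer transforms the same observation differently.

```python
import math


class RunningNormalizer:
    """Hypothetical sketch: standardizes scalars with running statistics."""

    def __init__(self, epsilon=1e-8):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the mean
        self.epsilon = epsilon  # avoids division by zero

    def update(self, x):
        # Welford's online update of the mean and squared deviations
        self.count += 1
        delta = x - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (x - self.mean)

    @property
    def var(self):
        # Default to unit variance before any data has been seen
        return self.m2 / self.count if self.count > 0 else 1.0

    def normalize(self, x):
        return (x - self.mean) / math.sqrt(self.var + self.epsilon)


trained = RunningNormalizer()
for obs in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]:
    trained.update(obs)

fresh = RunningNormalizer()  # what you get if the statistics are not saved

print(trained.normalize(7.0))  # uses the accumulated mean and variance
print(fresh.normalize(7.0))    # uses the default mean 0.0 and variance 1.0
```

A policy trained on the first kind of input would receive very different numbers from the second, which is why the normalization statistics need to be stored and restored together with the model.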

In [None]:
from stable_baselines3.common.vec_env import VecNormalize, VecFrameStack

env = DummyVecEnv([lambda: gym.make("Pendulum-v1")])
normalized_vec_env = VecNormalize(env)

In [None]:
obs = normalized_vec_env.reset()
for _ in range(10):
    action = [normalized_vec_env.action_space.sample()]
    obs, reward, _, _ = normalized_vec_env.step(action)
    print(obs, reward)

[[-0.93661344  0.9195261  -0.9990789 ]] [-10.]
[[-1.2592673  1.2820711 -1.2628134]] [-2.0185006]
[[-1.4355156  1.505124  -1.4633185]] [-1.3002148]
[[-1.4893022  1.6301751 -1.3982632]] [-1.0245482]
[[-1.4788879  1.7318426 -1.4727683]] [-0.86552024]
[[-1.3687711  1.789341  -1.4059162]] [-0.77612895]
[[-1.1442996  1.7758305 -1.0180179]] [-0.70622176]
[[-0.7836936  1.7690885 -1.0344601]] [-0.56917614]
[[-0.24591964  1.7347095  -0.80566925]] [-0.44439477]
[[ 0.43274188  1.6754254  -0.45007497]] [-0.34939522]


## PPO on Atari Game: MsPacMan

In [None]:
from stable_baselines3.common.env_util import make_atari_env
# from stable_baselines3.common.atari_wrappers import AtariWrapper
vec_env = make_atari_env("ALE/MsPacman-v5", n_envs=4)

model = PPO("CnnPolicy", vec_env, verbose=1)
model.learn(total_timesteps=100000)
model.save("ppo_Pacman")


Using cuda device
Wrapping the env in a VecTransposeImage.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 542      |
|    ep_rew_mean     | 456      |
| time/              |          |
|    fps             | 228      |
|    iterations      | 1        |
|    time_elapsed    | 35       |
|    total_timesteps | 8192     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 542         |
|    ep_rew_mean          | 451         |
| time/                   |             |
|    fps                  | 216         |
|    iterations           | 2           |
|    time_elapsed         | 75          |
|    total_timesteps      | 16384       |
| train/                  |             |
|    approx_kl            | 0.008085396 |
|    clip_fraction        | 0.074       |
|    clip_range           | 0.2         |
|    entropy_loss         | -2.19       |
|    explaine

### Visualization

In [None]:
# For visualization
from IPython.display import HTML
from IPython import display
import glob
import base64, io, os, shutil
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv

os.environ['SDL_VIDEODRIVER']='dummy'

In [None]:
shutil.rmtree('pacman_video', ignore_errors=True)
os.makedirs("pacman_video", exist_ok=True)

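def show_video():
    """Display the first MP4 found in the pacman_video folder inline in the notebook."""
    mp4list = glob.glob('pacman_video/*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read the file in binary mode and close it afterwards
        with open(mp4, 'rb') as f:
            video = f.read()
        encoded = base64.b64encode(video)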
        display.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

def show_video_of_model():
    """
    Record a 500-step video of the global `model` acting in ALE/MsPacman-v5
    and save it in the `pacman_video/` folder.
    """
    video_length = 500
    eval_env = make_atari_env("ALE/MsPacman-v5", n_envs=1)
    # Start the video at step=0 and record 500 steps
    eval_env = VecVideoRecorder(
        eval_env,
        video_folder="pacman_video/",
        record_video_trigger=lambda step: step == 0,
        video_length=video_length,
        name_prefix="",
    )

    obs = eval_env.reset()
    for _ in range(video_length):
        action, _ = model.predict(obs)
        obs, _, _, _ = eval_env.step(action)

    # Close the video recorder
    eval_env.close()

In [None]:
show_video_of_model()



Saving video to /content/pacman_video/-step-0-to-step-500.mp4
Moviepy - Building video /content/pacman_video/-step-0-to-step-500.mp4.
Moviepy - Writing video /content/pacman_video/-step-0-to-step-500.mp4





Moviepy - Done !
Moviepy - video ready /content/pacman_video/-step-0-to-step-500.mp4


In [None]:
show_video()