了解如何使用 Gym Wrappers，它可以进行监控、标准化、限制步数、功能增强等。

查看加载和保存功能，以及如何读取输出的文件以进行可能的导出。

# gymnasium.wrappers包装器

Gymnasium的包装器可以通过继承gymnasium.Wrapper类来创建。可以重写以下方法来自定义环境的行为：

- reset(): 当环境重置时调用。
- step(action): 在每个步骤中调用，用于执行动作。
- render(mode='human'): 用于环境的渲染。
- close(): 当环境关闭时调用。

In [None]:
# 创建自定义包装器
# 假设我们想要创建一个简单的包装器，它会修改环境的奖励信号。以下是如何实现它的例子：

import gymnasium as gym
from gymnasium import spaces

class RewardModifierWrapper(gym.Wrapper):
    def __init__(self, env):
        super(RewardModifierWrapper, self).__init__(env)
    
    def step(self, action):
        observation, reward, terminated, truncated, info = self.env.step(action)
        # 修改奖励：将所有的奖励乘以一个常数因子
        reward = reward * 3
        return observation, reward, terminated, truncated, info

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)


In [None]:
#使用自定义包装器
# 一旦定义了自定义包装器，我们就可以将其应用到任何Gymnasium环境中：

env = gym.make('CartPole-v1')
env = RewardModifierWrapper(env)

observation = env.reset()
for _ in range(1000):
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)
    if terminated:
        observation = env.reset()


In [3]:
import gymnasium as gym
from stable_baselines3 import A2C, SAC, PPO, TD3

保存和加载稳定sb模型非常简单：我们可以直接在模型上调用 .save() 和 .load() 。

In [None]:
import os

# 这里定义了一个保存模型的目录/tmp/gym/。os.makedirs用于创建这个目录。参数exist_ok=True表示如果目录已存在，不会抛出错误。
save_dir = "/tmp/gym/"
os.makedirs(save_dir, exist_ok=True)

model = PPO("MlpPolicy", "Pendulum-v1", verbose=0).learn(8_000)
'''一行创建训练代码，创建并训练了一个使用PPO算法的模型。
"MlpPolicy"表示使用的是多层感知器（MLP，一种简单的前馈神经网络）策略。
"Pendulum-v1"是Gym提供的一个环境，挑战是控制一个倒立摆让它保持直立。
verbose=0意味着在训练过程中不输出额外的日志信息。
.learn(8_000)指定了训练步数为8000步。
'''
# The model will be saved under PPO_tutorial.zip
model.save(f"{save_dir}/PPO_tutorial")

# sample an observation from the environment
obs = model.env.observation_space.sample()
# Check prediction before saving
print("pre saved", model.predict(obs, deterministic=True))
'''
首先从模型的环境观测空间中随机采样一个观测值。然后，使用model.predict方法对这个观测值进行预测，
deterministic=True参数确保预测是确定性的，即在给定相同的观测值下总是产生相同的动作。
'''

del model  # delete trained model to demonstrate loading
loaded_model = PPO.load(f"{save_dir}/PPO_tutorial")
'''
为了演示加载功能，首先删除当前的model对象，然后使用PPO.load方法从保存的文件中加载模型。
'''
# Check that the prediction is the same after loading (for the same observation)
print("loaded", loaded_model.predict(obs, deterministic=True))
'''
使用同一个观测值obs，对加载后的模型进行预测，并打印结果。这里的目的是验证保存和加载过程是否正确无误，
即确保加载后的模型能够产生与之前相同的预测结果。
'''

sb的save方法非常强大，因为使用当前权重保存训练超参数。这意味着在实践中，可以简单地加载自定义模型，而无需重新定义参数，然后继续学习。

加载函数还可以在加载时更新模型的类变量。

In [None]:
import os
from stable_baselines3.common.vec_env import DummyVecEnv
# DummyVecEnv是Stable Baselines 3提供的一个工具，用于创建虚拟环境的容器，可以提高某些类型算法的训练效率。
# Create save dir
save_dir = "/tmp/gym/"
os.makedirs(save_dir, exist_ok=True)

model = A2C("MlpPolicy", "Pendulum-v1", verbose=0, gamma=0.9, n_steps=20).learn(8000)
'''
使用A2C算法训练模型，选择MlpPolicy，目标环境是Pendulum-v1。gamma=0.9设置了折扣因子，用于计算未来奖励的当前价值；
n_steps=20设置了每次更新模型之前的步数。训练过程中不显示额外的日志信息(verbose=0)，并训练8000个时间步。
'''
# The model will be saved under A2C_tutorial.zip
model.save(f"{save_dir}/A2C_tutorial")

del model  # delete trained model to demonstrate loading

# load the model, and when loading set verbose to 1
loaded_model = A2C.load(f"{save_dir}/A2C_tutorial", verbose=1)
#加载时设置verbose=1以显示日志信息。

# show the save hyperparameters
print(f"loaded: gamma={loaded_model.gamma}, n_steps={loaded_model.n_steps}")

# as the environment is not serializable, we need to set a new instance of the environment
loaded_model.set_env(DummyVecEnv([lambda: gym.make("Pendulum-v1")]))
'''
由于环境不是可序列化的，因此在加载模型后需要设置一个新的环境实例。这里使用DummyVecEnv来创建环境，
它允许模型以向量化的方式处理多个实例，即使在这个例子中只用到了一个环境实例。
'''
# and continue training
loaded_model.learn(8_000)

## 详细说明DummyVecEnv

`DummyVecEnv`是Stable Baselines3库中的一个工具，用于创建一个简单的向量化环境。向量化环境允许你同时运行多个实例的环境，这样可以加快训练过程，因为它可以并行地收集多个环境的经验。然而，与更高级的并行环境如`SubprocVecEnv`不同，`DummyVecEnv`并不在真正的并行进程中运行这些环境实例，而是在单一进程中顺序执行它们。尽管如此，它仍然是用于测试和开发时简化环境管理的有用工具。

### 为什么使用`DummyVecEnv`

- **简化API**：它提供了一个统一的API来处理单个环境或多个环境的情况，让算法的实现可以无缝地在多个环境上运行。
- **开发和测试**：在开发和测试阶段，`DummyVecEnv`可以帮助快速迭代和测试，而不需要设置复杂的多进程环境。
- **兼容性**：它确保了与Stable Baselines3中的算法兼容，因为这些算法期望环境是向量化的。

### 如何使用`DummyVecEnv`

创建`DummyVecEnv`通常涉及将一个或多个环境的构造函数传递给它。下面是一个如何使用`DummyVecEnv`来包装单个Gym环境的例子：



In [1]:
from stable_baselines3.common.vec_env import DummyVecEnv
import gymnasium as gym

# 创建环境的函数
def make_env():
    return gym.make('CartPole-v1')

# 使用DummyVecEnv包装环境
env = DummyVecEnv([make_env])  # 注意这里传递的是一个函数列表



在这个例子中，`make_env`函数返回一个新的`CartPole-v1`环境实例。`DummyVecEnv`接受一个函数列表，每个函数在调用时应该返回一个新的环境实例。即使你只有一个环境，你也需要将构造函数放在列表中，因为`DummyVecEnv`期望能够处理多个环境。

### `DummyVecEnv`和真实并行环境的对比

虽然`DummyVecEnv`提供了一个向量化环境的简单实现，但它并不提供真正的并行执行能力。对于需要大规模并行收集数据以加速训练的情况，你可能需要考虑使用`SubprocVecEnv`或其他并行环境实现。这些实现使用Python的多进程功能来在真正的并行进程中运行每个环境实例，可以显著减少数据收集的时间。

总之，`DummyVecEnv`是一个在单个进程中管理和执行多个环境实例的有用工具，非常适合于快速开发和测试，但在处理需要高效并行数据收集的复杂场景时，可能需要更高级的向量化环境实现。

## Gym and VecEnv wrappers包装器

gym 包装器遵循gym 接口：它有一个reset() 和step() 方法。

因为包装器是围绕环境的，所以我们可以使用 self.env 访问它，这允许轻松地与其交互，而无需修改原始环境。有许多已预定义的包装器，有关完整列表，请参阅gym 文档。

In [None]:
class CustomWrapper(gym.Wrapper):
    """
    :param env: (gym.Env) Gym environment that will be wrapped
    """

    def __init__(self, env):
        # Call the parent constructor, so we can access self.env later
        super().__init__(env)

    def reset(self, **kwargs):
        """
        Reset the environment
        """
        obs, info = self.env.reset(**kwargs)

        return obs, info

    def step(self, action):
        """
        :param action: ([float] or int) Action taken by the agent
        :return: (np.ndarray, float, bool, bool, dict) observation, reward, is this a final state (episode finished),
        is the max number of steps reached (episode finished artificially), additional informations
        """
        obs, reward, terminated, truncated, info = self.env.step(action)
        return obs, reward, terminated, truncated, info

## 第一个例子：限制episode长度

包装器的一个实际用例是当想要按episode限制步骤数时，使用包装器。因为达到限制时需要覆盖done信号。在信息字典中传递该信息也是一个很好的做法。

In [None]:
class TimeLimitWrapper(gym.Wrapper):
    """
    :param env: (gym.Env) Gym environment that will be wrapped
    :param max_steps: (int) Max number of steps per episode
    """

    def __init__(self, env, max_steps=100):
        # Call the parent constructor, so we can access self.env later
        super(TimeLimitWrapper, self).__init__(env)
        self.max_steps = max_steps
        # Counter of steps per episode
        self.current_step = 0

    def reset(self, **kwargs):
        """
        Reset the environment
        """
        # Reset the counter
        self.current_step = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        """
        :param action: ([float] or int) Action taken by the agent
        :return: (np.ndarray, float, bool, bool, dict) observation, reward, is the episode over?, additional informations
        """
        self.current_step += 1
        obs, reward, terminated, truncated, info = self.env.step(action)
        # Overwrite the truncation signal when when the number of steps reaches the maximum
        if self.current_step >= self.max_steps:
            truncated = True
        return obs, reward, terminated, truncated, info

In [None]:
# Test the wrapper

from gymnasium.envs.classic_control.pendulum import PendulumEnv

# Here we create the environment directly because gym.make() already wrap the environment in a TimeLimit wrapper otherwise
env = PendulumEnv()
# Wrap the environment
env = TimeLimitWrapper(env, max_steps=100)

In [None]:
obs, _ = env.reset()
done = False
n_steps = 0
while not done:
    # Take random actions
    random_action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(random_action)
    done = terminated or truncated
    n_steps += 1

print(n_steps, info)

实际上，gym 已经有一个名为 TimeLimit (gym.wrappers.TimeLimit) 的包装器，大多数环境都使用该包装器。

## 第二个示例：规范动作

在将观察和动作提供给agent之前将其标准化通常是一个好主意，这可以防止出现难以调试的问题。

在此示例中，我们将标准化 Pendulum-v1 的动作空间，使其位于 [-1, 1] 而不是 [-2, 2]。

注意：这里我们处理的是连续动作，因此是gym.Box空间

In [2]:
import numpy as np


class NormalizeActionWrapper(gym.Wrapper):
    """
    :param env: (gym.Env) Gym environment that will be wrapped
    """

    def __init__(self, env):
        # Retrieve the action space
        action_space = env.action_space
        assert isinstance(
            action_space, gym.spaces.Box
        ), "This wrapper only works with continuous action space (spaces.Box)"
        # Retrieve the max/min values
        self.low, self.high = action_space.low, action_space.high

        # We modify the action space, so all actions will lie in [-1, 1]
        env.action_space = gym.spaces.Box(
            low=-1, high=1, shape=action_space.shape, dtype=np.float32
        )

        # Call the parent constructor, so we can access self.env later
        super(NormalizeActionWrapper, self).__init__(env)

    def rescale_action(self, scaled_action):
        """
        Rescale the action from [-1, 1] to [low, high]
        (no need for symmetric action space)
        :param scaled_action: (np.ndarray)
        :return: (np.ndarray)
        """
        return self.low + (0.5 * (scaled_action + 1.0) * (self.high - self.low))

    def reset(self, **kwargs):
        """
        Reset the environment
        """
        return self.env.reset(**kwargs)

    def step(self, action):
        """
        :param action: ([float] or int) Action taken by the agent
        :return: (np.ndarray, float,bool, bool, dict) observation, reward, final state? truncated?, additional informations
        """
        # Rescale action from [-1, 1] to original [low, high] interval
        rescaled_action = self.rescale_action(action)
        obs, reward, terminated, truncated, info = self.env.step(rescaled_action)
        return obs, reward, terminated, truncated, info

In [4]:
# 重新缩放动作之前进行测试

original_env = gym.make("Pendulum-v1")

print(original_env.action_space.low)
for _ in range(10):
    print(original_env.action_space.sample())

[-2.]
[1.8424448]
[-0.9342433]
[1.5242685]
[0.24446234]
[1.6761957]
[-0.29506642]
[0.5200963]
[-1.7365894]
[1.2014906]
[1.3566116]


In [5]:
# 测试 NormalizeAction 包装器

env = NormalizeActionWrapper(gym.make("Pendulum-v1"))

print(env.action_space.low)

for _ in range(10):
    print(env.action_space.sample())

[-1.]
[-0.07592568]
[0.56407446]
[0.19095245]
[0.37957114]
[-0.6485151]
[0.64774466]
[-0.5325325]
[-0.31494254]
[-0.8501827]
[0.38907593]


### 使用 RL 算法进行测试

In [6]:
# 我们将使用sb的监控包装器，它允许监控训练统计数据（平均episode奖励、平均episode长度）
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv

In [7]:
env = Monitor(gym.make("Pendulum-v1"))
env = DummyVecEnv([lambda: env])

In [8]:
model = A2C("MlpPolicy", env, verbose=1).learn(int(1000))

Using cpu device
-------------------------------------
| rollout/              |           |
|    ep_len_mean        | 200       |
|    ep_rew_mean        | -1.38e+03 |
| time/                 |           |
|    fps                | 2916      |
|    iterations         | 100       |
|    time_elapsed       | 0         |
|    total_timesteps    | 500       |
| train/                |           |
|    entropy_loss       | -1.42     |
|    explained_variance | 0.0182    |
|    learning_rate      | 0.0007    |
|    n_updates          | 99        |
|    policy_loss        | -29.3     |
|    std                | 0.999     |
|    value_loss         | 939       |
-------------------------------------
-------------------------------------
| rollout/              |           |
|    ep_len_mean        | 200       |
|    ep_rew_mean        | -1.46e+03 |
| time/                 |           |
|    fps                | 3059      |
|    iterations         | 200       |
|    time_elapsed       | 0      

In [9]:
# 使用动作包装器

normalized_env = Monitor(gym.make("Pendulum-v1"))
# Note that we can use multiple wrappers
normalized_env = NormalizeActionWrapper(normalized_env)
normalized_env = DummyVecEnv([lambda: normalized_env])

In [10]:
model_2 = A2C("MlpPolicy", normalized_env, verbose=1).learn(int(1000))

Using cpu device
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 200      |
|    ep_rew_mean        | -1.3e+03 |
| time/                 |          |
|    fps                | 2684     |
|    iterations         | 100      |
|    time_elapsed       | 0        |
|    total_timesteps    | 500      |
| train/                |          |
|    entropy_loss       | -1.43    |
|    explained_variance | 0.0177   |
|    learning_rate      | 0.0007   |
|    n_updates          | 99       |
|    policy_loss        | -27.1    |
|    std                | 1.01     |
|    value_loss         | 576      |
------------------------------------
-------------------------------------
| rollout/              |           |
|    ep_len_mean        | 200       |
|    ep_rew_mean        | -1.31e+03 |
| time/                 |           |
|    fps                | 2894      |
|    iterations         | 200       |
|    time_elapsed       | 0         |
|    total_ti

## Additional wrappers: VecEnvWrappers

与gym包装器一样，Stable Baselines为 VecEnv 提供了包装器。在现有的不同包装器中（您可以创建自己的包装器）：

VecNormalize：它计算运行平均值和标准差以标准化观察并返回
VecFrameStack：它堆叠多个连续的观察结果（有助于整合观察中的时间，例如 atari 游戏的连续帧）

注意：使用 VecNormalize 包装器时，必须将运行平均值和标准差与模型一起保存，否则再次加载代理时将无法获得正确的结果。

In [11]:
from stable_baselines3.common.vec_env import VecNormalize, VecFrameStack

env = DummyVecEnv([lambda: gym.make("Pendulum-v1")])
normalized_vec_env = VecNormalize(env)

In [12]:
obs = normalized_vec_env.reset()
for _ in range(10):
    action = [normalized_vec_env.action_space.sample()]
    obs, reward, _, _ = normalized_vec_env.step(action)
    print(obs, reward)

[[0.233566   0.9155393  0.98969907]] [-10.]
[[0.5814982 1.182631  0.9385529]] [-2.0202646]
[[0.7960477 1.2037266 1.5585014]] [-1.2303505]
[[0.8721311 1.1445833 1.5693926]] [-0.8966906]
[[0.66916335 0.86611915 1.7716519 ]] [-0.71118915]
[[0.47672254 0.6329985  1.4147632 ]] [-0.59599686]
[[0.01398723 0.09806831 1.60136   ]] [-0.5148258]
[[-0.33152267 -0.3354909   1.2133656 ]] [-0.4580722]
[[-0.7310699 -0.8750509  1.2344685]] [-0.4118374]
[[-0.9948084 -1.2466166  1.0051402]] [-0.37627134]


### 练习：编写自己的监视器包装器的代码

现在我们已经知道包装器如何工作以及可以用它做什么，是时候进行实验了。

这里的目标是创建一个包装器来监视训练进度，存储episode奖励（一个episode的奖励总和）和episode长度（最后一个episode的步数）。

我们将在每集结束后使用信息字典返回这些值。

In [13]:
class MyMonitorWrapper(gym.Wrapper):
    """
    :param env: (gym.Env) Gym environment that will be wrapped
    """

    def __init__(self, env):
        # Call the parent constructor, so we can access self.env later
        super().__init__(env)
        # === YOUR CODE HERE ===#
        # Initialize the variables that will be used
        # to store the episode length and episode reward

        # ====================== #

    def reset(self, **kwargs):
        """
        Reset the environment
        """
        obs = self.env.reset(**kwargs)
        # === YOUR CODE HERE ===#
        # Reset the variables

        # ====================== #
        return obs

    def step(self, action):
        """
        :param action: ([float] or int) Action taken by the agent
        :return: (np.ndarray, float, bool, bool, dict)
            observation, reward, is the episode over?, is the episode truncated?, additional information
        """
        obs, reward, terminated, truncated, info = self.env.step(action)
        # === YOUR CODE HERE ===#
        # Update the current episode reward and episode length

        # ====================== #

        if terminated or truncated:
            # === YOUR CODE HERE ===#
            # Store the episode length and episode reward in the info dict
            pass

            # ====================== #
        return obs, reward, terminated, truncated, info
    
env = gym.make("LunarLander-v2")
# === YOUR CODE HERE ===#
# Wrap the environment

# Reset the environment

# Take random actions in the environment and check
# that it returns the correct values after the end of each episode

# ====================== #

### 练习结果

In [14]:
import gymnasium as gym

class MyMonitorWrapper(gym.Wrapper):
    """
    Gym wrapper that monitors and stores episode reward and length.
    """
    def __init__(self, env):
        super().__init__(env)
        self.episode_reward = 0
        self.episode_length = 0

    def reset(self, **kwargs):
        self.episode_reward = 0
        self.episode_length = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.episode_reward += reward
        self.episode_length += 1

        if terminated or truncated:
            info['episode'] = {'reward': self.episode_reward, 'length': self.episode_length}

        return obs, reward, terminated, truncated, info

# 创建环境并包装
env = MyMonitorWrapper(gym.make("LunarLander-v2"))

# 重置环境以开始新的episode
obs = env.reset()

# 进行随机行动直到episode结束
done = False
while not done:
    action = env.action_space.sample()  # 选择一个随机动作
    obs, reward, done, truncated, info = env.step(action)  # 执行动作
    if done or truncated:
        print(f"Episode finished after {info['episode']['length']} steps with reward {info['episode']['reward']}")
        obs = env.reset()  # 重置环境


Episode finished after 96 steps with reward -125.72311089361091


这段代码首先定义了MyMonitorWrapper类，它继承自gym.Wrapper。这个包装器在每个episode结束时（无论是正常结束还是被截断），都会在info字典中返回累积的奖励(episode_reward)和步数(episode_length)。然后，我们创建了一个LunarLander-v2环境实例，并将其包装在MyMonitorWrapper中。通过随机选择动作并执行，我们演示了包装器如何在每个episode结束时通过打印来报告总奖励和步数。

## 结论

如何轻松保存和加载模型

什么是包装器以及我们可以用它做什么

如何创建自己的包装器