# Stable Baselines3 基础手册

Stable Baselines3（下文简称 sb3）是一个非常受欢迎的 RL 工具包，**用户只需要定义清楚环境和算法，sb3 就能十分优雅的完成训练和评估。**

这一篇会介绍 Stable Baselines3 的基础：
- 如何进行 RL 训练和测试？
- 如何可视化训练效果？
- 如何创建自定义环境？来适应新的任务？

**首先回顾一下，RL 中最核心的两个组件：智能体 Agent 和环境 Environment：**
- 智能体是 sb3 中提供的模型
- sb3 使用 Gym 作为交互环境，包括 Gym 中提供的、或者用户自定义的环境（需要继承 gym.Env)

使用安装有 sb3 的环境可以通过如下命令安装：
- `pip install stable-baselines3[extra]`

## 1. 如何进行 RL 训练和测试？

在 sb 中，如果我们使用现成 RL 算法和现成的 Gym 环境，我们通过一行就可以进行 RL 的训练：

`model = PPO('MlpPolicy', "CartPole-v1", verbose=1).learn(1000)`

下面我们详细了解一下里面发生了什么。

我们使用一个经典环境作为例子：CartPole

![Cartpole](./cartpole.gif)

智能体是一根杆和一个推车组成的钟摆，该推车沿着无摩擦的轨道移动，通过向推车施加 +1 或 -1 的力来控制系统。钟摆初始是直立的，目的是防止它倒下，杆保持直立的每个时间步都会获得 +1 的奖励。

我们使用 MLP 作为 Policy 网络，使用 PPO（AC算法）作为强化学习更新算法。

In [1]:
# import 环境
import gym
# import RL 算法
from stable_baselines3 import PPO
import numpy as np
from stable_baselines3.common.evaluation import evaluate_policy

In [2]:
# 指定使用的环境
env = gym.make('CartPole-v1')
# 指定使用的模型
# 第一个参数指定网络类型，可选MlpPolicy，CnnPolicy，MultiInputPolicy
# 如果想使用自定义的网络结构，可以在 policy_kwargs 参数中进行定义
model = PPO("MlpPolicy", env, verbose=0)
# 训练之前随机的 policy，可以获得的平均 reward 比较低
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")



mean_reward:147.60 +/- 60.15


In [3]:
# 训练 RL 模型
model.learn(total_timesteps=1000)

<stable_baselines3.ppo.ppo.PPO at 0x7fbb105b96d0>

In [4]:
# 评估训练后的 policy
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

mean_reward:191.00 +/- 58.00


**可以看出训练颇有效果，policy 取得的平均 Reward 有了很大的提升**

在 sb3 中仅仅用 `model.learn()` `evaluate_policy` 就完成了训练和测试。那这两个函数中发生了什么呢？

**`evaluate_policy` 中的细节**
```
def evaluate(model, num_episodes=100):
    # 创建环境
    env = model.get_env()
    all_episode_rewards = []
    for i in range(num_episodes):
        # 对于每一个回合
        episode_rewards = []
        done = False
        obs = env.reset()
        while not done:
            # 一直执行，直到env说这一局done了
            # 使用 policy 根据看到的观测预测行为
            action, _states = model.predict(obs)
            # 使用 env 执行行为，获得reward、新的观测、是否结束、其它信息等
            obs, reward, done, info = env.step(action)
            episode_rewards.append(reward)
        all_episode_rewards.append(sum(episode_rewards))
    # 计算获得的平均 Reward
    mean_episode_reward = np.mean(all_episode_rewards)
    print("Mean reward:", mean_episode_reward, "Num episodes:", num_episodes)

    return mean_episode_reward
```

而 `model.learn()` 在完成 `evaluate_policy` 的交互的基础上，增加了 loss 计算和梯度更新等常见的训练流程。

在sb3中，这些细节都无需用户再关心，用户定义好环境和算法就可以让sb3完成剩下的训练啦。

## 总结：一句话训练 RL 模型

In [5]:
model = PPO('MlpPolicy', "CartPole-v1", verbose=1).learn(1000)

Using cuda device
Creating environment from the given name 'CartPole-v1'
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 20.7     |
|    ep_rew_mean     | 20.7     |
| time/              |          |
|    fps             | 1175     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------


## 2. 如何可视化训练效果？

因为在我们的集群上，不方便弹窗把 env 界面实时渲染出来，所以我们通过使用工具包进行环境的视频录制，从而看到智能体的交互情况。

In [6]:
# Set up fake display; otherwise rendering will fail
import os
os.system("Xvfb :1 -screen 0 1024x768x24 &")
os.environ['DISPLAY'] = ':1'

In [7]:
import base64
from pathlib import Path
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv
from IPython import display as ipythondisplay

# 下面定义一些帮助函数
def show_videos(video_path='', prefix=''):
  """
  Taken from https://github.com/eleurent/highway-env

  :param video_path: (str) Path to the folder containing videos
  :param prefix: (str) Filter the video, showing only the only starting with this prefix
  """
  html = []
  for mp4 in Path(video_path).glob("{}*.mp4".format(prefix)):
      video_b64 = base64.b64encode(mp4.read_bytes())
      html.append('''<video alt="{}" autoplay 
                    loop controls style="height: 400px;">
                    <source src="data:video/mp4;base64,{}" type="video/mp4" />
                </video>'''.format(mp4, video_b64.decode('ascii')))
  ipythondisplay.display(ipythondisplay.HTML(data="<br>".join(html)))


def record_video(env_id, model, video_length=500, prefix='', video_folder='videos/'):
  """
  :param env_id: (str)
  :param model: (RL model)
  :param video_length: (int)
  :param prefix: (str)
  :param video_folder: (str)
  """
  eval_env = DummyVecEnv([lambda: gym.make(env_id)])
  # Start the video at step=0 and record 500 steps
  eval_env = VecVideoRecorder(eval_env, video_folder=video_folder,
                              record_video_trigger=lambda step: step == 0, video_length=video_length,
                              name_prefix=prefix)

  obs = eval_env.reset()
  for _ in range(video_length):
    action, _ = model.predict(obs)
    obs, _, _, _ = eval_env.step(action)

  # Close the video recorder
  eval_env.close()

### 可视化训练好的智能体

In [8]:
# 录制视频
record_video('CartPole-v1', model, video_length=500, prefix='ppo2-cartpole')

(EE) 
Fatal server error:
(EE) Server is already active for display 1
	If this server is no longer running, remove /tmp/.X1-lock
	and start again.
(EE) 


Saving video to /weka-jd/prod/jupyter/bixiao/notebooks/Workspace/Codes/RL/rl-tutorial/videos/ppo2-cartpole-step-0-to-step-500.mp4


In [9]:
# 展示视频
show_videos('videos', prefix='ppo2')

## 3. 如何创建自定义环境？

下述为继承 gym.Env 的自定义环境例子：
```
import gym
from gym import spaces

class CustomEnv(gym.Env):
    """Custom Environment that follows gym interface"""
    def __init__(self, arg1, arg2, ...):
        super(CustomEnv, self).__init__()
        # 定义行为和观测空间，都需要继承于 gym.spaces
        # 下面是使用离散 Action 的例子:
        self.action_space = spaces.Discrete(N_DISCRETE_ACTIONS)
        # 下面是使用图片作为观测的例子
        self.observation_space = spaces.Box(low=0, high=255,
                                            shape=(N_CHANNELS, HEIGHT, WIDTH), dtype=np.uint8)
    def step(self, action):
        # 输入 action，智能体执行 action 与环境交互，返回获得的（新的观测、奖励、是否结束、其他）
        ...
        return observation, reward, done, info
    def reset(self):
        # 重置环境
        ...
        return observation  # reward, done, info can't be included 
    def render(self, mode="human"):
        # 渲染环境
        ...
```

上面包括三个需要实现的函数：
- `reset()` 在每个回合最开始时执行，返回当前的观测（observation）
- `step(action)` 输入 action，智能体执行 action 与环境交互，返回获得的（新的观测、奖励、是否结束、其他）
- (Optional) `render(method='human')` 渲染环境

环境中需要定义两个变量：
- `observation_space` 需要是 gym spaces 类型 (`Discrete`, `Box`, ...) ，描述观测的形状
- `action_space` 需要是 gym spaces 类型，描述 action 的形状

gym spaces 最重要的两个类型：
- `gym.spaces.Box`: 任意 shape 的连续空间，例如 `Box(low=-1.0, high=2.0, shape=(3, 4), dtype=np.float32)` 代表 3x4 的一个 matrix
- `gym.spaces.Discrete`：维度为 1，且有 n 个枚举值的空间，如 n=5 的枚举空间 Discrete(5)，具体的枚举值为 0，1，2，3，4

有了上面的基础概念，下面来创建一个自定义环境吧

In [10]:
import numpy as np
import gym
from gym import spaces


class GoLeftEnv(gym.Env):
  """
  这是一个让智能体学习一直向左走的 1D grid 环境 
  """
  metadata = {'render.modes': ['console']}
  LEFT = 0
  RIGHT = 1

  def __init__(self, grid_size=10):
    super(GoLeftEnv, self).__init__()

    # 1D-grid 的大小
    self.grid_size = grid_size
    # agent 初始化在 grid 的最右边
    self.agent_pos = grid_size - 1

    # 定义 action  observation 
    # 离散行为空间: left、 right
    n_actions = 2
    self.action_space = spaces.Discrete(n_actions)
    # 观测是智能体现在的位置
    self.observation_space = spaces.Box(low=0, high=self.grid_size,
                                        shape=(1,), dtype=np.float32)

  def reset(self):
    """
    Important: 观测必须是一个 np.array
    :return: (np.array) 
    """
    # Initialize the agent at the right of the grid
    self.agent_pos = self.grid_size - 1
    # here we convert to float32 to make it more general (in case we want to use continuous actions)
    return np.array([self.agent_pos]).astype(np.float32)

  def step(self, action):
    if action == self.LEFT:
      self.agent_pos -= 1
    elif action == self.RIGHT:
      self.agent_pos += 1
    else:
      raise ValueError("Received invalid action={} which is not part of the action space".format(action))
    # 如果走到边缘就不能继续走了
    self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size)
    # 如果走到最左边代表结束了
    done = bool(self.agent_pos == 0)
    # 走到最左边就给一个正的 reward
    reward = 1 if self.agent_pos == 0 else 0
    # 目前没有需要额外输出的信息
    info = {}
    return np.array([self.agent_pos]).astype(np.float32), reward, done, info

  def render(self, mode='console'):
    # 在命令行中渲染
    if mode != 'console':
      raise NotImplementedError()
    # agent is represented as a cross, rest as a dot
    print("." * self.agent_pos, end="")
    print("x", end="")
    print("." * (self.grid_size - self.agent_pos))

  def close(self):
    pass
    

In [11]:
from stable_baselines3 import PPO, A2C # DQN coming soon
from stable_baselines3.common.env_util import make_vec_env

# 构建环境
env = GoLeftEnv(grid_size=10)
env = make_vec_env(lambda: env, n_envs=1)


In [12]:
# 训练智能体
model = A2C('MlpPolicy', env, verbose=1).learn(5000)

Using cuda device
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 13.1     |
|    ep_rew_mean        | 1        |
| time/                 |          |
|    fps                | 626      |
|    iterations         | 100      |
|    time_elapsed       | 0        |
|    total_timesteps    | 500      |
| train/                |          |
|    entropy_loss       | -0.25    |
|    explained_variance | -0.127   |
|    learning_rate      | 0.0007   |
|    n_updates          | 99       |
|    policy_loss        | -0.00427 |
|    value_loss         | 0.000244 |
------------------------------------
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 11.5     |
|    ep_rew_mean        | 1        |
| time/                 |          |
|    fps                | 628      |
|    iterations         | 200      |
|    time_elapsed       | 1        |
|    total_timesteps    | 1000     |
| train/            

In [13]:
# Test the trained agent
obs = env.reset()
n_steps = 20
for step in range(n_steps):
  action, _ = model.predict(obs, deterministic=True)
  print("Step {}".format(step + 1))
  print("Action: ", action)
  obs, reward, done, info = env.step(action)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render(mode='console')
  if done:
    # Note that the VecEnv resets automatically
    # when a done signal is encountered
    print("Goal reached!", "reward=", reward)
    break

Step 1
Action:  [0]
obs= [[8.]] reward= [0.] done= [False]
........x..
Step 2
Action:  [0]
obs= [[7.]] reward= [0.] done= [False]
.......x...
Step 3
Action:  [0]
obs= [[6.]] reward= [0.] done= [False]
......x....
Step 4
Action:  [0]
obs= [[5.]] reward= [0.] done= [False]
.....x.....
Step 5
Action:  [0]
obs= [[4.]] reward= [0.] done= [False]
....x......
Step 6
Action:  [0]
obs= [[3.]] reward= [0.] done= [False]
...x.......
Step 7
Action:  [0]
obs= [[2.]] reward= [0.] done= [False]
..x........
Step 8
Action:  [0]
obs= [[1.]] reward= [0.] done= [False]
.x.........
Step 9
Action:  [0]
obs= [[9.]] reward= [1.] done= [ True]
.........x.
Goal reached! reward= [1.]


可以看到智能体已经会一直走，走到左边啦！