# 简单执行

构建一个执行各类强化学习算法的流水线。

目前执行三种算法：DQN（value-based近似q函数），DDPG（actor-critic），PPO（用的比较多）。

## 环境的描述

考虑一个简单的例子（Pendulum：单摆）

动作空间：尾部受力，一维连续空间
观测空间：尾部位置二维坐标，角速度，三维连续空间
奖励：r = -(theta^2 + 0.1 * theta_dt^2 + 0.001 * torque^2)

一个episode执行到200步发生truncation

In [25]:
from typing import Any, SupportsFloat
import gymnasium as gym
import numpy as np

class PendulumEnv(gym.Wrapper):
    def __init__(self, gym_id=None):
        gym.logger.set_level(40)
        if gym_id is None:
            gym_id = "Pendulum-v1"
        super().__init__(env=gym.make(gym_id))
        self.id = gym_id
        self.state_dim = self.observation_space.shape[0]
        self.action_dim = self.action_space.shape[0]
        self.if_discrete = False # 是否是离散动作空间
    def reset(self) -> "tuple[Any, dict[str, Any]]":
        return self.env.reset()
    def step(self, action: Any) -> "tuple[Any, SupportsFloat, bool, bool, dict[str, Any]]":
        # OpenAI Pendulum env set its action space as (-2, +2). It is bad.
        # We suggest that adjust action space to (-1, +1) when designing a custom env.
        state, reward, terminated, truncated, info = self.env.step(action * 2)
        state = state.reshape(self.state_dim)
        return state, float(reward), terminated, truncated, info

### 测试Env类

In [26]:
def check_pendulum_env():
    env = PendulumEnv()
    assert isinstance(env.id, str)
    assert isinstance(env.state_dim, int)
    assert isinstance(env.action_dim, int)
    assert isinstance(env.if_discrete, bool)

    state, _ = env.reset()
    assert state.shape == (env.state_dim,)

    action = np.random.uniform(-1, +1, size=env.action_dim)
    state, reward, terminated, truncated, info = env.step(action)
    assert isinstance(state, np.ndarray)
    assert state.shape == (env.state_dim,)
    assert isinstance(state, np.ndarray)
    assert isinstance(reward, float)
    assert isinstance(terminated, bool)
    assert isinstance(info, dict) or (info is None)


if __name__ == '__main__':
    check_pendulum_env()
    print('| Finish checking.')

| Finish checking.


## 配置解析

环境、智能体、智能体交互过程中探索环境、训练的超参数，和设备有关的存储目录、计算设备等超参

In [27]:
import os
import gymnasium as gym
import torch
import numpy as np

class Config:
    def __init__(self, agent_class = None, env_class = None, env_args = None) -> None:
        self.agent_class = agent_class # agent = agent_class(...)
        self.if_off_policy = self.get_if_off_policy() # 是否是off-policy

        self.env_class = env_class 
        self.env_args = env_args
        if self.env_args is None:
            env_args = {'id': None, 'state_dim': None, 'action_dim': None, 'if_discrete': None}
        self.id = env_args['id'] # 环境名称
        self.state_dim = env_args['state_dim'] # 观测空间维度
        self.action_dim = env_args['action_dim'] # 动作空间维度
        self.if_discrete = env_args['if_discrete'] # 动作空间是否是离散空间

        self.gamma = 0.99 # 折扣因子
        self.reward_scale = 1.0 # 控制奖励大小

        self.net_dims = (64,32) # 采用神经网络的维度：输入x输出
        self.learning_rate = 6e-5 
        self.soft_update_tau = 5e-3 
        if self.if_off_policy: 
            self.batch_size = int(64) # mini-batch的大小
            self.horizen_len = int(512) # 在每一个Episode中，智能体与环境互动的步数
            self.buffer_size = int(1e6) # FIFO，replaybuffer的大小
            self.repeat_times = 1.0 # 重复更新replaybuffer的次数
        else:
            self.batch_size = int(128)
            self.horizen_len = int(2000) 
            self.buffer_size = None
            self.repeat_times = 8.0

        self.gpu_id = 0
        self.thread_num = int(8) # 使用cpu数量
        self.random_seed = int(0) # 初始化随机种子

        self.cwd = None # 保存模型的当前工作目录
        self.is_remove = True # 是否移除cwd目录
        self.break_step = +np.inf # 如果神经网络的总训练次数超过该值则停止训练

        self.eval_times = int(32)
        self.eval_per_step = int(2e4)

    def init_before_training(self):
        np.random.seed(self.random_seed)
        torch.manual_seed(self.random_seed)
        torch.set_num_threads(self.thread_num)
        torch.set_default_dtype(torch.float32)

        if self.cwd is None:
            self.cwd = f'./{self.id}_{self.agent_class.__name__[5:]}_{self.random_seed}'

        if self.is_remove is None:
            self.if_remove = bool(input(f"| Arguments PRESS 'y' to REMOVE: {self.cwd}? ") == 'y')
        if self.if_remove:
            import shutil
            shutil.rmtree(self.cwd, ignore_errors=True)
            print(f"| Arguments Remove cwd: {self.cwd}")
        else:
            print(f"| Arguments Keep cwd: {self.cwd}")
        
        os.makedirs(self.cwd, exist_ok=True)

    def get_if_off_policy(self) -> bool:
        agent_name = self.agent_class.__name__ if self.agent_class else ''
        on_policy_names = ('SARSA', 'VPG', 'A2C', 'A3C', 'TRPO', 'PPO', 'MPO')
        return all([agent_name.find(s) == -1 for s in on_policy_names])
    
def get_gym_env_args(env, if_print: bool) -> dict:
    """Get a dict ``env_args`` about a standard OpenAI gym env information.

    param env: a standard OpenAI gym env
    param if_print: [bool] print the dict about env information.
    return: env_args [dict]

    env_args = {
        'id': id,       # [str] the environment name, such as XxxXxx-v0
        'state_dim': state_dim,     # [int] the dimension of state
        'action_dim': action_dim,   # [int] the dimension of action or the number of discrete action
        'if_discrete': if_discrete, # [bool] action space is discrete or continuous
    }
    """
    if {'unwrapped', 'observation_space', 'action_space', 'spec'}.issubset(dir(env)):  # isinstance(env, gym.Env):
        id = env.unwrapped.spec.id

        state_shape = env.observation_space.shape
        state_dim = state_shape[0] if len(state_shape) == 1 else state_shape  # sometimes state_dim is a list

        if_discrete = isinstance(env.action_space, gym.spaces.Discrete)
        if if_discrete:  # make sure it is discrete action space
            action_dim = env.action_space.n
        elif isinstance(env.action_space, gym.spaces.Box):  # make sure it is continuous action space
            action_dim = env.action_space.shape[0]
            if any(env.action_space.high - 1):
                print('WARNING: env.action_space.high', env.action_space.high)
            if any(env.action_space.low + 1):
                print('WARNING: env.action_space.low', env.action_space.low)
        else:
            raise RuntimeError('\n| Error in get_gym_env_info(). Please set these value manually:'
                               '\n  `state_dim=int; action_dim=int; if_discrete=bool;`'
                               '\n  And keep action_space in range (-1, 1).')
    else:
        id = env.id
        state_dim = env.state_dim
        action_dim = env.action_dim
        if_discrete = env.if_discrete

    env_args = {'id': id,
                'state_dim': state_dim,
                'action_dim': action_dim,
                'if_discrete': if_discrete, }
    if if_print:
        env_args_str = repr(env_args).replace(',', f",\n{'':11}")
        print(f"env_args = {env_args_str}")
    return env_args

def kwargs_filter(function, kwargs: dict) -> dict:
    import inspect
    sign = inspect.signature(function).parameters.values()
    sign = {val.name for val in sign}
    common_args = sign.intersection(kwargs.keys())
    return {key: kwargs[key] for key in common_args}  # filtered kwargs


def build_env(env_class=None, env_args=None):
    if env_class.__module__ == 'gym.make':
        env = env_class(**kwargs_filter(env_class, env_args.copy()))
    if env_class.__module__ == 'gym.envs.registration':  # special rule
        gym.logger.set_level(40)  # Block warning
        env = env_class(id=env_args['id'])
    else:
        env = env_class(**kwargs_filter(env_class, env_args.copy()))
    for attr_str in ('id', 'state_dim', 'action_dim', 'if_discrete'):
        setattr(env, attr_str, env_args[attr_str])
    return env

if __name__ == '__main__':
    pass


## Net

Agent依赖Net作为状态价值估计或者策略估计

DQN：1，DDPG：actor和critic两个，PPO：actor和critic两个

In [17]:
import torch
import torch.nn as nn
from torch import Tensor
from torch.distributions.normal import Normal

from typing import List

def build_mlp(dims: List[int]) -> nn.Sequential:
    net_list = []
    for i in range(len(dims) - 1):
        net_list.extend([nn.Linear(dims[i], dims[i+1]), nn.ReLU()])
    del net_list[-1] # 移去最后一层的激活层
    return nn.Sequential(*net_list)

class QNet(nn.Module):
    def __init__(self, dims: List[int], state_dim: int, action_dim: int):
        super(QNet, self).__init__()
        self.net = build_mlp(dims=[state_dim, *dims, action_dim])
        self.explore_rate = None
        self.action_dim = action_dim

    def forward(self, state: Tensor) -> Tensor:
        return self.net(state)
    
    def get_action(self, state: Tensor) -> Tensor: 
        if self.explore_rate < torch.rand(1):
            action = self.net(state).argmax(dim=1, keepdim=True)
        else:
            action = torch.randint(self.action_dim, size=(state.shape[0],1))
        return action
    
class Actor(nn.Module):
    def __init__(self, dims: List[int], state_dim: int, action_dim: int):
        super(Actor, self).__init__()
        self.net = build_mlp(dims=[state_dim, *dims, action_dim])
        self.explore_noise_std = None

    def forward(self, state: Tensor) -> Tensor:
        action = self.net(state)
        return action.tanh()

    def get_action(self, state: Tensor) -> Tensor:
        action_avg = self.net(state).tanh() # 双曲正切，奇函数，值域(-1,1)
        dist = Normal(action_avg, self.explore_noise_std) # 对torch中每个值生成正态分布
        action = dist.sample()
        return action.clamp(-1.0, 1.0) # 截断函数
    
class Critic(nn.Module):
    def __init__(self, dims: List[int], state_dim: int, action_dim: int):
        super(Critic, self).__init__()
        self.net = build_mlp(dims=[state_dim + action_dim, *dims, 1])

    def forward(self, state: Tensor, action: Tensor):
        return self.net(torch.cat((state, action), dim=1)) # Q值
    
class ActorPPO(nn.Module):
    def __init__(self, dims: List[int], state_dim: int, action_dim: int):
        super(ActorPPO, self).__init__()
        self.net = build_mlp(dims=[state_dim, *dims, action_dim])
        self.action_std_log = nn.Parameter(torch.zeros((1, action_dim)), requires_grad=True)  # trainable parameter

    def forward(self, state: Tensor) -> Tensor:
        return self.net(state).tanh()
    
    def get_action(self, state: Tensor) -> (Tensor, Tensor):
        action_avg = self.net(state)
        action_std = self.action_std_log.exp()

        dist = Normal(action_avg, action_std)
        action = dist.sample()
        logprob = dist.log_prob(action).sum(1)
        return action, logprob
    
    def get_logprob_entropy(self, state: Tensor, action: Tensor) -> (Tensor, Tensor):
        action_avg = self.net(state)
        action_std = self.action_std_log.exp()

        dist = Normal(action_avg, action_std)
        logprob = dist.log_prob(action).sum(1)
        entropy = dist.entropy().sum(1)
        return logprob, entropy
    
    @staticmethod
    def convert_action_for_env(action: Tensor) -> Tensor:
        return action.tanh()
    
class CriticPPO(nn.Module):
    def __init__(self, dims: List[int], state_dim: int, _action_dim: int):
        super(CriticPPO, self).__init__()
        self.net = build_mlp(dims=[state_dim, *dims, 1])

    def forward(self, state: Tensor) -> Tensor:
        return self.net(state)  # advantage value

### 测试Net类

In [29]:
import torch.nn

def check_q_net(state_dim=4, action_dim=2, batch_size=3, net_dims=(64, 32), gpu_id=0):
    device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")
    state = torch.rand(size=(batch_size, state_dim), dtype=torch.float32, device=device)

    '''check for agent.AgentDQN'''
    act = QNet(dims=net_dims, state_dim=state_dim, action_dim=action_dim).to(device)
    act.explore_rate = 0.1

    '''check for run.get_rewards_and_steps'''
    action = act(state=state)
    assert isinstance(action, Tensor)
    assert action.dtype in {torch.float}
    assert action.shape == (batch_size, action_dim)

    '''check for agent.AgentDQN.explore_env'''
    action = act.get_action(state=state)
    assert isinstance(action, Tensor)
    assert action.dtype in {torch.int, torch.long}
    assert action.shape == (batch_size, 1)


def check_actor(state_dim=4, action_dim=2, batch_size=3, net_dims=(64, 32), gpu_id=0):
    device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")
    state = torch.rand(size=(batch_size, state_dim), dtype=torch.float32, device=device)

    '''check'''
    act = Actor(dims=net_dims, state_dim=state_dim, action_dim=action_dim).to(device)
    act.explore_noise_std = 0.1  # standard deviation of exploration action noise

    action = act(state=state)
    assert isinstance(action, Tensor)
    assert action.dtype in {torch.float}
    assert action.shape == (batch_size, action_dim)
    assert torch.any((-1.0 <= action) & (action <= +1.0))

    action = act.get_action(state=state)
    assert isinstance(action, Tensor)
    assert action.dtype in {torch.float}
    assert action.shape == (batch_size, action_dim)
    assert torch.any((-1.0 <= action) & (action <= +1.0))


def check_critic(state_dim=4, action_dim=2, batch_size=3, net_dims=(64, 32), gpu_id=0):
    device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")
    state = torch.rand(size=(batch_size, state_dim), dtype=torch.float32, device=device)
    action = torch.rand(size=(batch_size, action_dim), dtype=torch.float32, device=device)

    '''check'''
    cri = Critic(dims=net_dims, state_dim=state_dim, action_dim=action_dim).to(device)

    q = cri(state=state, action=action)
    assert isinstance(q, Tensor)
    assert q.dtype in {torch.float}
    assert q.shape == (batch_size, 1)


def check_actor_ppo(state_dim=4, action_dim=2, batch_size=3, net_dims=(64, 32), gpu_id=0):
    device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")
    state = torch.rand(size=(batch_size, state_dim), dtype=torch.float32, device=device)

    '''check'''
    act = ActorPPO(dims=net_dims, state_dim=state_dim, action_dim=action_dim).to(device)
    assert isinstance(act.action_std_log, nn.Parameter)
    assert act.action_std_log.requires_grad

    action = act(state=state)
    assert isinstance(action, Tensor)
    assert action.dtype in {torch.float}
    assert action.shape == (batch_size, action_dim)
    action = act.convert_action_for_env(action)
    assert torch.any((-1.0 <= action) & (action <= +1.0))

    action, logprob = act.get_action(state=state)
    assert isinstance(action, Tensor)
    assert action.dtype in {torch.float}
    assert action.shape == (batch_size, action_dim)
    assert torch.any((-1.0 <= action) & (action <= +1.0))
    assert isinstance(logprob, Tensor)
    assert logprob.shape == (batch_size,)

    action = torch.rand(size=(batch_size, action_dim), dtype=torch.float32, device=device)
    logprob, entropy = act.get_logprob_entropy(state=state, action=action)
    assert isinstance(logprob, Tensor)
    assert logprob.shape == (batch_size,)
    assert isinstance(entropy, Tensor)
    assert entropy.shape == (batch_size,)


def check_critic_ppo(state_dim=4, action_dim=2, batch_size=3, net_dims=(64, 32), gpu_id=0):
    device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")
    state = torch.rand(size=(batch_size, state_dim), dtype=torch.float32, device=device)

    '''check'''
    cri = CriticPPO(dims=net_dims, state_dim=state_dim, _action_dim=action_dim).to(device)

    q = cri(state=state)
    assert isinstance(q, Tensor)
    assert q.dtype in {torch.float}
    assert q.shape == (batch_size, 1)


def check_build_mlp():
    net_dims = (64, 32)
    net = build_mlp(dims=net_dims)
    assert isinstance(net, nn.Sequential)
    assert len(net) == 1 == len(net_dims) * 2 - 3

    net_dims = (64, 32, 16)
    net = build_mlp(dims=net_dims)
    assert isinstance(net, nn.Sequential)
    assert len(net) == 3 == len(net_dims) * 2 - 3

    net_dims = (64, 32, 16, 8)
    net = build_mlp(dims=net_dims)
    assert isinstance(net, nn.Sequential)
    assert len(net) == 5 == len(net_dims) * 2 - 3


if __name__ == '__main__':
    check_q_net()
    check_actor()
    check_critic()
    check_actor_ppo()
    check_critic_ppo()
    check_build_mlp()
    print('| Finish checking.')

| Finish checking.


## Agent

三个Agent，DQN，DDPG，PPO

In [30]:
from copy import deepcopy
import torch
from torch import Tensor

class AgentBase:
    def __init__(self, net_dims: List[int], state_dim: int, action_dim: int, gpu_id: int = 0, args: Config = Config()):
        self.state_dim = state_dim
        self.action_dim = action_dim

        self.gamma = args.gamma
        self.batch_size = args.batch_size
        self.repeat_times = args.repeat_times
        self.reward_scale = args.reward_scale
        self.learning_rate = args.learning_rate
        self.if_off_policy = args.if_off_policy
        self.soft_update_tau = args.soft_update_tau

        self.last_state = None
        self.device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")
        
        act_class = getattr(self, "act_class", None)
        cri_class = getattr(self, "cri_class", None)

        self.act = self.act_target = act_class(net_dims, state_dim, action_dim).to(self.device)
        self.cri = self.cri_target = cri_class(net_dims, state_dim, action_dim).to(self.device) if cri_class else self.act

        self.act_optimizer = torch.optim.Adam(self.act.parameters(), self.learning_rate)
        self.cri_optimizer = torch.optim.Adam(self.cri.parameters(), self.learning_rate) if cri_class else self.act_optimizer

        self.criterion = torch.nn.SmoothL1Loss()

    @staticmethod
    def optimizer_update(optimizer, objective: Tensor):
        optimizer.zero_grad()
        objective.backward()
        optimizer.step()

    @staticmethod
    def soft_update(target_net: torch.nn.Module, current_net: torch.nn.Module, tau: float):
        # 假定目标网络不是当前训练的网络
        for tar, cur in zip(target_net.parameters(), current_net.parameters()):
            tar.data.copy_(cur.data * tau + tar.data * (1.0 - tau))

class AgentDQN(AgentBase):
    def __init__(self, net_dims: List[int], state_dim: int, action_dim: int, gpu_id: int = 0, args: Config = Config()):
        self.act_class = getattr(self, "act_class", QNet)
        self.cri_class = getattr(self, "cri_class", None) # DQN只有一个网络
        super().__init__(net_dims, state_dim, action_dim, gpu_id, args)
        self.act_target = deepcopy(self.act)
        self.cri_target = deepcopy(self.cri)

        self.act.explore_rate = getattr(args, "explore_rate", 0.25) 
        # epsilon贪婪算法的探索概率

    def explore_env(self, env, horizon_len: int, if_random: bool = False) -> [Tensor]:
        states = torch.zeros((horizon_len, self.state_dim), dtype=torch.float32).to(self.device)
        actions = torch.zeros((horizon_len, self.action_dim), dtype=torch.float32).to(self.device)
        rewards = torch.zeros(horizon_len, dtype=torch.float32).to(self.device)
        terminateds = torch.zeros(horizon_len, dtype=torch.float32).to(self.device)
        truncateds = torch.zeros(horizon_len, dtype=torch.float32).to(self.device)
        
        ary_state = self.last_state

        get_action = self.act.get_action
        for i in range(horizon_len):
            state = torch.as_tensor(ary_state, dtype=torch.float32, device=self.device)
            if if_random:
                action = torch.randint(self.action_dim, size=(1,))[0]
            else:
                action = get_action(state.unsqueeze(0))[0,0]

            ary_action = action.detach().cpu().numpy()
            ary_state, reward, terminated, truncated, _ = env.step(ary_action)
            if terminated or truncated:
                ary_state, _ = env.reset()
            
            states[i] = state
            actions[i] = action
            rewards[i] = reward
            terminateds[i] = terminated
            truncateds[i] = truncated

        self.last_state = ary_state
        rewards = (rewards * self.reward_scale).unsqueeze(1)
        undones = (1.0 - terminateds.type(torch.float32)) * (1.0 - truncateds.type(torch.float32)) 
        undones = undones.unsqueeze(1)
        return states, actions, rewards, undones
    
    def update_net(self, buffer) -> [float]:
        obj_critics = 0.0
        q_values = 0.0

        update_times = int(buffer.cur_size * self.repeat_times / self.batch_size)
        assert update_times >= 1
        for i in range(update_times):
            obj_critic, q_value = self.get_obj_critic(buffer, self.batch_size)
            self.optimizer_update(self.cri_optimizer, obj_critic)
            self.soft_update(self.cri_target, self.cri, self.soft_update_tau)

            obj_critics += obj_critic.item()
            q_values += q_value.item()

        return obj_critics / update_times, q_values / update_times
    
    def get_obj_critic(self, buffer, batch_size: int) -> (Tensor, Tensor):
        with torch.no_grad():
            state, action, reward, undone, next_state = buffer.sample(self.batch_size)
            next_q = self.cri_target(next_state).max(dim=1, keepdim=True)[0]
            q_label = reward + undone * self.gamma * next_q

        q_value = self.cri(state).gather(1, action.long())
        obj_critic = self.criterion(q_value, q_label)
        return obj_critic, q_value.mean()
    
class AgentDDPG(AgentBase):
    def __init__(self, net_dims: List[int], state_dim: int, action_dim: int, gpu_id: int = 0, args: Config = Config()):
        self.act_class = getattr(self, 'act_class', Actor)
        self.cri_class = getattr(self, 'cri_class', Critic)
        super().__init__(net_dims, state_dim, action_dim, gpu_id, args)
        self.act_target = deepcopy(self.act)
        self.cri_target = deepcopy(self.cri)

        self.act.explore_noise_std = getattr(args, 'explore_noise', 0.1)

    def explore_env(self, env, horizon_len: int, if_random: bool = False) -> [Tensor]:
        states = torch.zeros((horizon_len, self.state_dim), dtype=torch.float32).to(self.device)
        actions = torch.zeros((horizon_len, self.action_dim), dtype=torch.float32).to(self.device)
        rewards = torch.zeros(horizon_len, dtype=torch.float32).to(self.device)
        terminateds = torch.zeros(horizon_len, dtype=torch.float32).to(self.device)
        truncateds = torch.zeros(horizon_len, dtype=torch.float32).to(self.device)

        ary_state = self.last_state
        get_action = self.act.get_action
        for i in range(horizon_len):
            state = torch.as_tensor(ary_state, dtype=torch.float32, device=self.device)
            action = torch.rand(self.action_dim) * 2 - 1.0 if if_random else get_action(state.unsqueeze(0)).squeeze(0)

            ary_action = action.detach().cpu().numpy()
            ary_state, reward, terminated, truncated, _ = env.step(ary_action)

            if terminated or truncated:
                ary_state, _ = env.reset()

            states[i] = state
            actions[i] = action
            rewards[i] = reward
            terminateds[i] = terminated
            truncateds[i] = truncated

        self.last_state = ary_state
        rewards = rewards.unsqueeze(1)
        undones = (1.0 - terminateds.type(torch.float32)) * (1.0 - truncateds.type(torch.float32)) 
        undones = undones.unsqueeze(1)
        return states, actions, rewards, undones
    
    def update_net(self, buffer) -> [float]:
        obj_critics = obj_actors = 0.0
        update_times = int(buffer.cur_size * self.repeat_times / self.batch_size)
        assert update_times > 0
        for i in range(update_times):
            obj_critic, state = self.get_obj_critic(buffer, self.batch_size)
            self.optimizer_update(self.cri_optimizer, obj_critic)
            self.soft_update(self.cri_target, self.cri, self.soft_update_tau)
            obj_critics += obj_critic.item()

            action = self.act(state)
            obj_actor = self.cri_target(state, action).mean()
            self.optimizer_update(self.act_optimizer, -obj_actor)
            self.soft_update(self.act_target, self.act, self.soft_update_tau)
            obj_actors += obj_actor.item()

        return obj_critics / update_times, obj_actors / update_times
    
    def get_obj_critic(self, buffer, batch_size: int) -> (Tensor, Tensor):
        with torch.no_grad():
            states, actions, rewards, undones, next_states = buffer.sample(batch_size)
            next_actions = self.act_target(next_states)
            next_q_values = self.cri_target(next_states, next_actions)
            q_labels = rewards + undones * self.gamma * next_q_values

        q_values = self.cri(states, actions)
        obj_critic = self.criterion(q_values, q_labels)
        return obj_critic, states
    
class AgentPPO(AgentBase):
    def __init__(self, net_dims: List[int], state_dim: int, action_dim: int, gpu_id: int = 0, args: Config = Config()):
        self.if_off_policy = False
        self.act_class = getattr(self, "act_class", ActorPPO)
        self.cri_class = getattr(self, "cri_class", CriticPPO)
        super().__init__(net_dims, state_dim, action_dim, gpu_id, args)

        self.ratio_clip = getattr(args, "ratio_clip", 0.25) # ratio.clamp(1 - clip, 1 + clip)
        self.lambda_gae_adv = getattr(args, "lambda_gae_adv", 0.95) # 0.80~0.99
        self.lambda_entropy = getattr(args, "lambda_entropy", 0.01) # 0.00~0.10
        self.lambda_entropy = torch.tensor(self.lambda_entropy, dtype=torch.float32, device=self.device)

    def explore_env(self, env, horizon_len: int) -> [Tensor]:
        states = torch.zeros((horizon_len, self.state_dim), dtype=torch.float32).to(self.device)
        actions = torch.zeros((horizon_len, self.action_dim), dtype=torch.float32).to(self.device)
        logprobs = torch.zeros(horizon_len, dtype=torch.float32).to(self.device)
        rewards = torch.zeros(horizon_len, dtype=torch.float32).to(self.device)
        terminateds = torch.zeros(horizon_len, dtype=torch.float32).to(self.device)
        truncateds = torch.zeros(horizon_len, dtype=torch.float32).to(self.device)

        ary_state = self.last_state

        get_action = self.act.get_action
        convert = self.act.convert_action_for_env
        for i in range(horizon_len):
            state = torch.as_tensor(ary_state, dtype=torch.float32, device=self.device)
            action, logprob = [t.squeeze(0) for t in get_action(state.unsqueeze(0))[:2]]

            ary_action = convert(action).detach().cpu().numpy()
            ary_state, reward, terminated, truncated, _ = env.step(ary_action)
            if terminated or truncated:
                ary_state, _ = env.reset()

            states[i] = state
            actions[i] = action
            logprobs[i] = logprob
            rewards[i] = reward
            terminateds[i] = terminated
            truncateds[i] = truncated

        self.last_state = ary_state
        rewards = (rewards * self.reward_scale).unsqueeze(1)
        undones = (1.0 - terminateds.type(torch.float32)) * (1.0 - truncateds.type(torch.float32)) 
        undones = undones.unsqueeze(1)
        return states, actions, logprobs, rewards, undones
    
    def update_net(self, buffer) -> [float]:
        with torch.no_grad():
            states, actions, logprobs, rewards, undones = buffer
            buffer_size = states.shape[0]

            # 计算reward_sum
            bs = 2 ** 10 # 当显存较小的时候用小的batchsize
            values = [self.cri(states[i:i + bs]) for i in range(0, buffer_size, bs)]
            values = torch.cat(values, dim=0).squeeze(1) # values.shape == (buffer_size, ..)
            advantages = self.get_advantages(rewards, undones, values)
            reward_sums = advantages + values 
            del rewards, values, undones

            advantages = (advantages - advantages.mean()) / (advantages.std(dim=0) + 1e-5)
        assert logprobs.shape == advantages.shape == reward_sums.shape == (buffer_size,)

        obj_critics = 0.0
        obj_actors = 0.0

        update_times = int(buffer_size * self.repeat_times / self.batch_size)
        assert update_times >= 1
        for _ in range(update_times):
            indices = torch.randint(buffer_size, size=(self.batch_size,), requires_grad=False)
            state = states[indices]
            action = actions[indices]
            logprob = logprobs[indices]
            advantage = advantages[indices]
            reward_sum = reward_sums[indices]

            value = self.cri(state).squeeze(1)
            obj_critic = self.criterion(value, reward_sum)
            self.optimizer_update(self.cri_optimizer, obj_critic)

            new_logprob, obj_entropy = self.act.get_logprob_entropy(state, action)
            ratio = (new_logprob - logprob.detach()).exp()
            surrogate1 = advantage * ratio
            surrogate2 = advantage * ratio.clamp(1 - self.ratio_clip, 1 + self.ratio_clip)
            obj_surrogate = torch.min(surrogate1, surrogate2).mean()

            obj_actor = obj_surrogate + obj_entropy.mean() * self.lambda_entropy
            self.optimizer_update(self.act_optimizer, -obj_actor)

            obj_actors += obj_actor.item()
            obj_critics += obj_critic.item()

        a_std_log = getattr(self.act, 'a_std_log', torch.zeros(1)).mean()
        return obj_critics / update_times, obj_actors / update_times, a_std_log.item()
    
    def get_advantages(self, rewards: Tensor, undones: Tensor, values: Tensor) -> Tensor:
        advantages = torch.empty_like(values)
        
        masks = undones * self.gamma
        horizon_len = rewards.shape[0]

        next_state = torch.tensor(self.last_state, dtype=torch.float32).to(self.device)
        next_value = self.cri(next_state.unsqueeze(0)).detach().squeeze(1).squeeze(0)

        advantage = 0
        for t in range(horizon_len - 1, -1, -1):
            delta = rewards[t] + masks[t] * next_value - values[t]
            advantages[t] = advantage = delta + masks[t] * self.lambda_gae_adv * advantage
            next_value = values[t] 

        return advantages
    
class ReplayBuffer:
    def __init__(self, max_size: int, state_dim: int, action_dim: int, gpu_id: int = 0) -> None:
        self.p = 0
        self.if_full = False
        self.cur_size = 0
        self.max_size = max_size
        self.device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")

        self.states = torch.empty((max_size, state_dim), dtype=torch.float32, device=self.device)
        self.actions = torch.empty((max_size, action_dim), dtype=torch.float32, device=self.device)
        self.rewards = torch.empty((max_size, 1), dtype=torch.float32, device=self.device)
        self.undones = torch.empty((max_size, 1), dtype=torch.float32, device=self.device)

    def update(self, items: [Tensor]):
        states, actions, rewards, undones = items
        p = self.p + rewards.shape[0] # 指针移动
        if p > self.max_size:
            self.if_full = True
            p0 = self.p
            p1 = self.max_size
            p2 = self.max_size - self.p
            p = p - self.max_size

            self.states[p0:p1], self.states[0:p] = states[:p2], states[-p:]
            self.actions[p0:p1], self.actions[0:p] = actions[:p2], actions[-p:]
            self.rewards[p0:p1], self.rewards[0:p] = rewards[:p2], rewards[-p:]
            self.undones[p0:p1], self.undones[0:p] = undones[:p2], undones[-p:]
        else:
            self.states[self.p:p] = states
            self.actions[self.p:p] = actions
            self.rewards[self.p:p] = rewards
            self.undones[self.p:p] = undones
        self.p = p
        self.cur_size = self.max_size if self.if_full else self.p

    def sample(self, batch_size: int) -> [Tensor]:
        ids = torch.randint(self.cur_size - 1, size=(batch_size,), requires_grad=False)
        return self.states[ids], self.actions[ids], self.rewards[ids], self.undones[ids], self.states[ids + 1]  

### 测试Agent

In [31]:

def check_agent_base(state_dim=4, action_dim=2, batch_size=3, net_dims=(64, 32), gpu_id=0):
    device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")
    state = torch.rand(size=(batch_size, state_dim), dtype=torch.float32, device=device).detach()
    action = torch.rand(size=(batch_size, action_dim), dtype=torch.float32, device=device).detach()

    '''check AgentBase'''
    agent = AgentDDPG(net_dims, state_dim, action_dim, gpu_id=gpu_id, args=Config())
    AgentBase.__init__(agent, net_dims, state_dim, action_dim, gpu_id=gpu_id, args=Config())

    '''check for run.render_agent'''
    action_grad = agent.act(state)
    q_value = agent.cri(state, action_grad)
    obj_act = -q_value.mean()
    assert agent.optimizer_update(agent.act_optimizer, obj_act) is None
    q_value = agent.cri(state, action)
    obj_cri = agent.criterion(q_value, torch.zeros_like(q_value).detach()).mean()
    assert agent.optimizer_update(agent.cri_optimizer, obj_cri) is None

    current_net = agent.cri
    target_net = deepcopy(agent.cri)
    assert agent.soft_update(target_net=target_net, current_net=current_net, tau=3e-5) is None


def check_agent_dqn(batch_size=3, horizon_len=16, net_dims=[64, 32], gpu_id=0):
    # env_args = {'id': 'CartPole-v1', 'state_dim': 4, 'action_dim': 2, 'if_discrete': True}
    env_args = {'id': 'CartPole-v1', 'state_dim': 4, 'action_dim': 2, 'if_discrete': True}
    env = build_env(env_class=gym.make, env_args=env_args)
    state_dim = env_args['state_dim']
    action_dim = env_args['action_dim']

    '''init agent'''
    buffer = ReplayBuffer(gpu_id=gpu_id, max_size=int(1e4), state_dim=state_dim, action_dim=action_dim)
    args = Config()
    args.batch_size = batch_size
    agent = AgentDQN(net_dims=net_dims, state_dim=state_dim, action_dim=action_dim, gpu_id=gpu_id, args=args)
    agent.last_state, _ = env.reset()

    '''check for agent.explore_env'''
    buffer_items = agent.explore_env(env=env, horizon_len=horizon_len, if_random=True)
    buffer.update(buffer_items)
    states, actions, rewards, undones = buffer_items
    assert states.shape == (horizon_len, state_dim)
    assert states.dtype in {torch.float, torch.int}
    assert actions.shape == (horizon_len, 2)
    assert actions.dtype in {torch.int, torch.long, torch.float32}
    assert rewards.shape == (horizon_len, 1)
    assert rewards.dtype == torch.float
    assert undones.shape == (horizon_len, 1)
    assert undones.dtype == torch.float  # undones is float, instead of int
    assert set(undones.squeeze(1).cpu().data.tolist()).issubset({0.0, 1.0})  # undones in {0.0, 1.0}

    buffer_items = agent.explore_env(env=env, horizon_len=horizon_len, if_random=False)
    buffer.update(buffer_items)
    states, actions, rewards, undones = buffer_items
    assert states.shape == (horizon_len, state_dim)
    assert states.dtype in {torch.float, torch.int}
    assert actions.shape == (horizon_len, 2)
    assert actions.dtype in {torch.int, torch.long, torch.float32}
    assert rewards.shape == (horizon_len, 1)
    assert rewards.dtype == torch.float
    assert undones.shape == (horizon_len, 1)
    assert undones.dtype == torch.float  # undones is float, instead of int
    assert set(undones.squeeze(1).cpu().data.tolist()).issubset({0.0, 1.0})  # undones in {0.0, 1.0}

    '''check for agent.update_net'''
    buffer.update(buffer_items)
    obj_critic, state = agent.get_obj_critic(buffer=buffer, batch_size=batch_size)
    assert obj_critic.shape == ()
    assert states.shape == (horizon_len, state_dim)
    assert states.dtype in {torch.float, torch.int}

    logging_tuple = agent.update_net(buffer=buffer)
    assert isinstance(logging_tuple, tuple)
    assert any([isinstance(item, float) for item in logging_tuple])
    assert len(logging_tuple) >= 2


def check_agent_ddpg(batch_size=3, horizon_len=16, net_dims=(64, 32), gpu_id=0):
    env_args = {'id': 'Pendulum-v1', 'state_dim': 3, 'action_dim': 1, 'if_discrete': False}
    env = build_env(env_class=gym.make, env_args=env_args)
    state_dim = env_args['state_dim']
    action_dim = env_args['action_dim']

    '''init agent'''
    buffer = ReplayBuffer(gpu_id=gpu_id, max_size=int(1e4), state_dim=state_dim, action_dim=action_dim, )
    args = Config()
    args.batch_size = batch_size
    agent = AgentDDPG(net_dims=net_dims, state_dim=state_dim, action_dim=action_dim, gpu_id=gpu_id, args=args)
    agent.last_state, _ = env.reset()

    '''check for agent.explore_env'''
    buffer_items = agent.explore_env(env=env, horizon_len=horizon_len, if_random=True)
    states, actions, rewards, undones = buffer_items
    assert states.shape == (horizon_len, state_dim)
    assert states.dtype in {torch.float, torch.int}
    assert actions.shape == (horizon_len, action_dim)
    assert actions.dtype == torch.float
    assert rewards.shape == (horizon_len, 1)
    assert rewards.dtype == torch.float
    assert undones.shape == (horizon_len, 1)
    assert undones.dtype == torch.float  # undones is float, instead of int
    assert set(undones.squeeze(1).cpu().data.tolist()).issubset({0.0, 1.0})  # undones in {0.0, 1.0}

    buffer_items = agent.explore_env(env=env, horizon_len=horizon_len, if_random=False)
    states, actions, rewards, undones = buffer_items
    assert states.shape == (horizon_len, state_dim)
    assert states.dtype in {torch.float, torch.int}
    assert actions.shape == (horizon_len, action_dim)
    assert actions.dtype == torch.float
    assert rewards.shape == (horizon_len, 1)
    assert rewards.dtype == torch.float
    assert undones.shape == (horizon_len, 1)
    assert undones.dtype == torch.float  # undones is float, instead of int
    assert set(undones.squeeze(1).cpu().data.tolist()).issubset({0.0, 1.0})  # undones in {0.0, 1.0}

    '''check for agent.update_net'''
    buffer.update(buffer_items)
    obj_critic, state = agent.get_obj_critic(buffer=buffer, batch_size=batch_size)
    assert obj_critic.shape == ()
    assert states.shape == (horizon_len, state_dim)
    assert states.dtype in {torch.float, torch.int}

    logging_tuple = agent.update_net(buffer=buffer)
    assert isinstance(logging_tuple, tuple)
    assert any([isinstance(item, float) for item in logging_tuple])
    assert len(logging_tuple) >= 2


def check_agent_ppo(batch_size=3, horizon_len=16, net_dims=(64, 32), gpu_id=0):
    env_args = {'id': 'Pendulum-v1', 'state_dim': 3, 'action_dim': 3, 'if_discrete': False}
    env = build_env(env_class=gym.make, env_args=env_args)
    state_dim = env_args['state_dim']
    action_dim = env_args['action_dim']

    '''init agent'''
    args = Config()
    args.batch_size = batch_size
    agent = AgentPPO(net_dims=net_dims, state_dim=state_dim, action_dim=action_dim, gpu_id=gpu_id, args=args)
    agent.last_state, _ = env.reset()

    convert = agent.act.convert_action_for_env
    action = torch.rand(size=(batch_size, action_dim), dtype=torch.float32).detach() * 6 - 3
    assert torch.any((action < -1.0) | (+1.0 < action))
    action = convert(action)
    assert torch.any((-1.0 <= action) & (action <= +1.0))

    '''check for agent.explore_env'''
    buffer_items = agent.explore_env(env=env, horizon_len=horizon_len)
    states, actions, logprobs, rewards, undones = buffer_items
    assert states.shape == (horizon_len, state_dim)
    assert states.dtype in {torch.float, torch.int}
    assert actions.shape == (horizon_len, action_dim)
    assert actions.dtype == torch.float
    assert logprobs.shape == (horizon_len,)
    assert logprobs.dtype == torch.float
    assert rewards.shape == (horizon_len, 1)
    assert rewards.dtype == torch.float
    assert undones.shape == (horizon_len, 1)
    assert undones.dtype == torch.float  # undones is float, instead of int
    assert set(undones.squeeze(1).cpu().data.tolist()).issubset({0.0, 1.0})  # undones in {0.0, 1.0}

    '''check for agent.update_net'''
    values = agent.cri(states).squeeze(1)
    assert values.shape == (horizon_len,)
    advantages = agent.get_advantages(rewards=rewards, undones=undones, values=values)
    assert advantages.shape == (horizon_len,)
    assert advantages.dtype in {torch.float, torch.int}

    logging_tuple = agent.update_net(buffer=buffer_items)
    assert isinstance(logging_tuple, tuple)
    assert any([isinstance(item, float) for item in logging_tuple])
    assert len(logging_tuple) >= 2


if __name__ == '__main__':
    check_agent_base()
    check_agent_dqn()
    check_agent_ddpg()
    check_agent_ppo()
    print('| Finish checking.')

| Finish checking.


### 测试Config

In [37]:
from unittest.mock import patch


def check_config():
    args = Config()  # check dummy Config
    assert args.get_if_off_policy() is True

    env_args = {'id': 'CartPole-v1', 'state_dim': 4, 'action_dim': 2, 'if_discrete': True}
    env_class = gym.make
    args = Config(agent_class=AgentDQN, env_class=env_class, env_args=env_args)
    assert args.get_if_off_policy() is True

    env_args = {'id': 'Pendulum', 'state_dim': 3, 'action_dim': 1, 'if_discrete': False}
    env_class = PendulumEnv
    args = Config(agent_class=AgentDDPG, env_class=env_class, env_args=env_args)
    assert args.get_if_off_policy() is True

    env_args = {'id': 'Pendulum', 'state_dim': 3, 'action_dim': 1, 'if_discrete': False}
    env_class = PendulumEnv
    args = Config(agent_class=AgentPPO, env_class=env_class, env_args=env_args)
    assert args.get_if_off_policy() is False

    args.if_remove = False
    args.init_before_training()  # os.path.exists(args.cwd) == False
    args.init_before_training()  # os.path.exists(args.cwd) == True
    assert os.path.exists(args.cwd)
    os.rmdir(args.cwd)

    args.if_remove = True
    args.init_before_training()  # os.path.exists(args.cwd) == False
    args.init_before_training()  # os.path.exists(args.cwd) == True
    assert os.path.exists(args.cwd)
    os.rmdir(args.cwd)


@patch('builtins.input', lambda *args: 'y')
def check_config_init_before_training_yes():
    env_args = {'id': 'Pendulum-v1', 'state_dim': 3, 'action_dim': 1, 'if_discrete': False}
    env_class = gym.make
    args = Config(agent_class=AgentDDPG, env_class=env_class, env_args=env_args)
    args.if_remove = None
    args.init_before_training()
    assert os.path.exists(args.cwd)
    os.rmdir(args.cwd)


@patch('builtins.input', lambda *args: 'n')
def check_config_init_before_training_no():
    env_args = {'id': 'Pendulum', 'state_dim': 3, 'action_dim': 1, 'if_discrete': False}
    env_class = PendulumEnv
    args = Config(agent_class=AgentDDPG, env_class=env_class, env_args=env_args)
    args.if_remove = None
    args.init_before_training()
    assert os.path.exists(args.cwd)
    os.rmdir(args.cwd)


@patch('builtins.input', lambda *args: 'input_str')
def tutorial_unittest_mock_patch():
    print('Print_input():', input())


def check_kwargs_filter():
    env_args = {'id': 'Pendulum-v1', 'state_dim': 3, 'action_dim': 1, 'if_discrete': False}
    env_class = PendulumEnv
    env = env_class(**kwargs_filter(env_class.__init__, env_args.copy()))
    assert hasattr(env, 'reset')
    assert hasattr(env, 'step')


def check_build_env():
    env_args = {'id': 'CartPole-v1', 'state_dim': 4, 'action_dim': 2, 'if_discrete': True}
    env_class = gym.make
    env = build_env(env_class=env_class, env_args=env_args)
    assert isinstance(env.id, str)
    assert isinstance(env.state_dim, int)
    assert isinstance(env.action_dim, int)
    assert isinstance(env.if_discrete, bool)

    env_args = {'id': 'Pendulum-v1', 'state_dim': 3, 'action_dim': 1, 'if_discrete': False}
    env_class = PendulumEnv
    env = build_env(env_class=env_class, env_args=env_args)
    assert isinstance(env.id, str)
    assert isinstance(env.state_dim, int)
    assert isinstance(env.action_dim, int)
    assert isinstance(env.if_discrete, bool)


def check_get_gym_env_args():
    env_args = {'id': 'CartPole-v1', 'state_dim': 4, 'action_dim': 2, 'if_discrete': True}
    env_class = gym.make
    env = build_env(env_class=env_class, env_args=env_args)
    env_args = get_gym_env_args(env, if_print=True)
    assert isinstance(env_args['id'], str)
    assert isinstance(env_args['state_dim'], int)
    print(type(env_args['action_dim']))
    assert isinstance(env_args['action_dim'], np.int64)
    assert isinstance(env_args['if_discrete'], bool)

    env_args = {'id': 'Pendulum-v1', 'state_dim': 3, 'action_dim': 1, 'if_discrete': False}
    env_class = PendulumEnv
    env = build_env(env_class=env_class, env_args=env_args)
    env_args = get_gym_env_args(env, if_print=True)
    assert isinstance(env_args['id'], str)
    assert isinstance(env_args['state_dim'], int)
    assert isinstance(env_args['action_dim'], int)
    assert isinstance(env_args['if_discrete'], bool)


if __name__ == '__main__':
    check_config()
    check_config_init_before_training_no()
    check_config_init_before_training_yes()
    tutorial_unittest_mock_patch()

    check_kwargs_filter()
    check_build_env()
    check_get_gym_env_args()
    print('| Finish checking.')

| Arguments Keep cwd: ./Pendulum_PPO_0
| Arguments Keep cwd: ./Pendulum_PPO_0
| Arguments Remove cwd: ./Pendulum_PPO_0
| Arguments Remove cwd: ./Pendulum_PPO_0
| Arguments Keep cwd: ./Pendulum_DDPG_0
| Arguments Keep cwd: ./Pendulum-v1_DDPG_0
Print_input(): input_str
env_args = {'id': 'CartPole-v1',
            'state_dim': 4,
            'action_dim': 2,
            'if_discrete': True}
<class 'numpy.int64'>
env_args = {'id': 'Pendulum-v1',
            'state_dim': 3,
            'action_dim': 1,
            'if_discrete': False}
| Finish checking.


## Run

智能体与环境交互过程的控制

In [45]:
import os
import time

import numpy as np

class Evaluator:
    def __init__(self, eval_env, eval_per_step: int = 1e4, eval_times: int = 8, cwd: str = '.') -> None:
        self.cwd = cwd
        self.eval_env = eval_env
        self.eval_step = 0
        self.total_step = 0
        self.start_time = time.time()
        self.eval_times = eval_times # 获得episodic return的次数
        self.eval_per_step = eval_per_step # 每步训练的评估次数

        self.recorder = []
        print("| Evaluator:"
              "\n| `step`: Number of samples, or total training steps, or running times of `env.step()`."
              "\n| `time`: Time spent from the start of training to this moment."
              "\n| `avgR`: Average value of cumulative rewards, which is the sum of rewards in an episode."
              "\n| `stdR`: Standard dev of cumulative rewards, which is the sum of rewards in an episode."
              "\n| `avgS`: Average of steps in an episode."
              "\n| `objC`: Objective of Critic network. Or call it loss function of critic network."
              "\n| `objA`: Objective of Actor network. It is the average Q value of the critic network."
              f"\n| {'step':>8}  {'time':>8}  | {'avgR':>8}  {'stdR':>6}  {'avgS':>6}  | {'objC':>8}  {'objA':>8}")
        
    def evaluate_and_save(self, actor, horizon_len: int, logging_tuple: tuple):
        self.total_step += horizon_len
        if self.eval_step + self.eval_per_step > self.total_step:
            return
        self.eval_step = self.total_step

        rewards_steps_ary = [get_rewards_and_steps(self.eval_env, actor) for _ in range(self.eval_times)]
        rewards_steps_ary = np.array(rewards_steps_ary, dtype=np.float32)
        avg_r = rewards_steps_ary[:, 0].mean()  # 累积奖励的平均值
        std_r = rewards_steps_ary[:, 0].std()  # 累积奖励的标准差
        avg_s = rewards_steps_ary[:, 1].mean()  # 每个episode的平均补偿

        used_time = time.time() - self.start_time
        self.recorder.append((self.total_step, used_time, avg_r))

        save_path = f"{self.cwd}/actor_{self.total_step:012.0f}_{used_time:08.0f}_{avg_r:08.2f}.pth"
        torch.save(actor.state_dict(), save_path)
        print(f"| {self.total_step:8.2e}  {used_time:8.0f}  "
              f"| {avg_r:8.2f}  {std_r:6.2f}  {avg_s:6.0f}  "
              f"| {logging_tuple[0]:8.2f}  {logging_tuple[1]:8.2f}")
        
    def close(self):
        np.save(f"{self.cwd}/recorder.npy", np.array(self.recorder))
        draw_learning_curve_using_recorder(self.cwd)

def get_rewards_and_steps(env, actor, if_render: bool = False) -> (float, int):  # cumulative_rewards and episode_steps
    if_discrete = env.if_discrete
    device = next(actor.parameters()).device  # net.parameters() is a Python generator.

    state, _ = env.reset()
    episode_steps = 0
    cumulative_returns = 0.0  # sum of rewards in an episode
    for episode_steps in range(12345):
        tensor_state = torch.as_tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        tensor_action = actor(tensor_state).argmax(dim=1) if if_discrete else actor(tensor_state)
        action = tensor_action.detach().cpu().numpy()[0]  # not need detach(), because using torch.no_grad() outside
        state, reward, terminated, truncated, _ = env.step(action)
        cumulative_returns += reward

        if if_render:
            env.render()
            time.sleep(0.02)
        if terminated or truncated:
            break
    cumulative_returns = getattr(env, 'cumulative_returns', cumulative_returns)
    return cumulative_returns, episode_steps + 1

def draw_learning_curve_using_recorder(cwd: str):
    recorder = np.load(f"{cwd}/recorder.npy")

    import matplotlib as mpl
    mpl.use('Agg')  # write  before `import matplotlib.pyplot as plt`. `plt.savefig()` without a running X server
    import matplotlib.pyplot as plt
    x_axis = recorder[:, 0]
    y_axis = recorder[:, 2]
    plt.plot(x_axis, y_axis)
    plt.xlabel('#samples (Steps)')
    plt.ylabel('#Rewards (Score)')
    plt.grid()

    file_path = f"{cwd}/LearningCurve.jpg"
    # plt.show()  # if use `mpl.use('Agg')` to draw figures without GUI, then plt can't plt.show()
    plt.savefig(file_path)
    print(f"| Save learning curve in {file_path}")

def train_agent(args: Config):
    args.init_before_training()

    env = build_env(args.env_class, args.env_args)
    agent = args.agent_class(args.net_dims, args.state_dim, args.action_dim, gpu_id=args.gpu_id, args=args)
    agent.last_state = env.reset()

    evaluator = Evaluator(eval_env=build_env(args.env_class, args.env_args),
                          eval_per_step=args.eval_per_step,
                          eval_times=args.eval_times,
                          cwd=args.cwd)
    
    if args.if_off_policy:
        buffer = ReplayBuffer(gpu_id=args.gpu_id,
                              max_size=args.buffer_size,
                              state_dim=args.state_dim,
                              action_dim=1 if args.if_discrete else args.action_dim, )
        buffer_items = agent.explore_env(env, args.horizon_len * args.eval_times, if_random=True)
        buffer.update(buffer_items)  # warm up for ReplayBuffer
    else:
        buffer = []

    # 开始训练
    cwd = args.cwd
    break_step = args.break_step
    horizon_len = args.horizon_len
    if_off_policy = args.if_off_policy
    del args

    torch.set_grad_enabled(False)
    while True:
        buffer_items = agent.explore_env(env, horizon_len)
        if if_off_policy:
            buffer.update(buffer_items)
        else:
            buffer[:] = buffer_items

        torch.set_grad_enabled(True)
        logging_tuple = agent.update_net(buffer)
        torch.set_grad_enabled(False)

        evaluator.evaluate_and_save(agent.act, horizon_len, logging_tuple)
        if (evaluator.total_step > break_step) or os.path.exists(f"{cwd}/stop"):
            break  # stop training when reach `break_step` or `mkdir cwd/stop`
    evaluator.close()

### 测试Run

In [47]:
import shutil

import numpy as np



def check_get_rewards_and_steps(net_dims=(64, 32)):
    pass

    """discrete env"""
    env_args = {'id': 'CartPole-v1', 'state_dim': 4, 'action_dim': 2, 'if_discrete': True}
    env_class = gym.make
    env = build_env(env_class=env_class, env_args=env_args)

    '''discrete env, on-policy'''
    actor = QNet(dims=net_dims, state_dim=env.state_dim, action_dim=env.action_dim)
    cumulative_returns, episode_steps = get_rewards_and_steps(env=env, actor=actor)
    assert isinstance(cumulative_returns, float)
    assert isinstance(episode_steps, int)
    assert episode_steps >= 1

    """continuous env"""
    env_args = {'id': 'Pendulum-v1', 'state_dim': 3, 'action_dim': 1, 'if_discrete': False}
    env_class = PendulumEnv
    env = build_env(env_class=env_class, env_args=env_args)

    '''continuous env, off-policy'''
    actor = Actor(dims=net_dims, state_dim=env.state_dim, action_dim=env.action_dim)
    cumulative_returns, episode_steps = get_rewards_and_steps(env=env, actor=actor)
    assert isinstance(cumulative_returns, float)
    assert isinstance(episode_steps, int)
    assert episode_steps >= 1

    '''continuous env, on-policy'''
    actor = ActorPPO(dims=net_dims, state_dim=env.state_dim, action_dim=env.action_dim)
    cumulative_returns, episode_steps = get_rewards_and_steps(env=env, actor=actor)
    assert isinstance(cumulative_returns, float)
    assert isinstance(episode_steps, int)
    assert episode_steps >= 1


def check_draw_learning_curve_using_recorder(cwd='./temp'):
    os.makedirs(cwd, exist_ok=True)
    recorder_path = f"{cwd}/recorder.npy"
    recorder_len = 8

    recorder = np.zeros((recorder_len, 3), dtype=np.float32)
    recorder[:, 0] = np.linspace(1, 100, num=recorder_len)  # total_step
    recorder[:, 1] = np.linspace(1, 200, num=recorder_len)  # used_time
    recorder[:, 2] = np.linspace(1, 300, num=recorder_len)  # average of cumulative rewards
    np.save(recorder_path, recorder)
    draw_learning_curve_using_recorder(cwd)
    assert os.path.exists(f"{cwd}/LearningCurve.jpg")
    shutil.rmtree(cwd)


def check_evaluator(net_dims=(64, 32), horizon_len=1024, eval_per_step=16, eval_times=2, cwd='./temp'):
    env_args = {'id': 'Pendulum-v1', 'state_dim': 3, 'action_dim': 1, 'if_discrete': False}
    env_class = PendulumEnv
    env = build_env(env_class, env_args)
    actor = Actor(dims=net_dims, state_dim=env.state_dim, action_dim=env.action_dim)

    os.makedirs(cwd, exist_ok=True)
    evaluator = Evaluator(eval_env=env, eval_per_step=eval_per_step, eval_times=eval_times, cwd=cwd)
    evaluator.evaluate_and_save(actor=actor, horizon_len=horizon_len, logging_tuple=(0.1, 0.2))
    evaluator.evaluate_and_save(actor=actor, horizon_len=horizon_len, logging_tuple=(0.3, 0.4))
    evaluator.close()
    assert os.path.exists(f"{evaluator.cwd}/recorder.npy")
    assert os.path.exists(f"{evaluator.cwd}/LearningCurve.jpg")
    shutil.rmtree(cwd)


if __name__ == '__main__':
    check_draw_learning_curve_using_recorder()
    check_get_rewards_and_steps()
    check_evaluator()
    print('| Finish checking.')

| Save learning curve in ./temp/LearningCurve.jpg
| Evaluator:
| `step`: Number of samples, or total training steps, or running times of `env.step()`.
| `time`: Time spent from the start of training to this moment.
| `avgR`: Average value of cumulative rewards, which is the sum of rewards in an episode.
| `stdR`: Standard dev of cumulative rewards, which is the sum of rewards in an episode.
| `avgS`: Average of steps in an episode.
| `objC`: Objective of Critic network. Or call it loss function of critic network.
| `objA`: Objective of Actor network. It is the average Q value of the critic network.
|     step      time  |     avgR    stdR    avgS  |     objC      objA
| 1.02e+03         0  | -1451.29   17.97     200  |     0.10      0.20
| 2.05e+03         0  | -1025.02  106.17     200  |     0.30      0.40
| Save learning curve in ./temp/LearningCurve.jpg
| Finish checking.
