## 19.3 PETS代码实践(Pendulum-v1)

导入相关库：

In [24]:
# 基本库
import numpy as np
from scipy.stats import truncnorm  # 用于生成和操作截断正态分布（Truncated Normal Distribution）
import random
from tqdm import tqdm

from utils.smoothing import moving_average
from utils.advantage import compute_advantage
from utils.training import train_on_policy_agent
# 神经网络
import torch
import torch.nn.functional as F
import torch.nn as nn
# Gymnasium 是一个用于开发和测试强化学习算法的工具库，为 OpenAI Gym 的更新版本（2021迁移开发）
import gymnasium as gym

***候选动作序列的生成：交叉熵方法（CEM）：***

In [25]:
class CEM:  # 优化随机采样动作的分布的均值和方差
    def __init__(self, n_sequence, elite_ratio, fake_env, upper_bound, lower_bound):
        self.n_sequence = n_sequence
        self.elite_ratio = elite_ratio  # 精英策略的比例（选择最好的前elite_ratio部分）
        self.upper_bound = upper_bound
        self.lower_bound = lower_bound
        self.fake_env = fake_env  # 用来模拟环境的虚拟环境对象

    def optimize(self, state, init_mean, init_var):
        mean, var = init_mean, init_var  # 初始的动作分布的均值和方差，mean 控制动作的中心位置，var 控制动作的分散程度
        X = truncnorm(-2, 2, loc=np.zeros_like(mean), scale=np.ones_like(var))  # 创建n维（动作维度）截断正态分布对象
        state = np.tile(state, (self.n_sequence, 1))  # 扩展 n_sequence 个相同的状态，以便在同一时刻生成多个动作序列

        for _ in range(5):  # 重复进行多次优化
            lb_dist, ub_dist = mean - self.lower_bound, self.upper_bound - mean
            constrained_var = np.minimum(np.minimum(np.square(lb_dist / 2), np.square(ub_dist / 2)), var)  # 方差的约束
            # 生成动作序列
            action_sequences = [X.rvs() for _ in range(self.n_sequence)] * np.sqrt(constrained_var) + mean
            # 计算每条动作序列的累积奖励
            returns = self.fake_env.propagate(state, action_sequences)[:, 0]
            # 选取累积奖励高的若干条动作序列（精英序列）
            elites = action_sequences[np.argsort(returns)][-int(self.elite_ratio * self.n_sequence):]
            new_mean = np.mean(elites, axis=0)
            new_var = np.var(elites, axis=0)
            # 指数加权平均平滑更新更新动作序列分布
            mean = 0.1 * mean + 0.9 * new_mean
            var = 0.1 * var + 0.9 * new_var

        return mean

### 1.定义模型中每一层的构造

***激活函数定义：***

In [26]:
class Swish(nn.Module):
    """ Swish激活函数 """
    def __init__(self):
        super(Swish, self).__init__()

    def forward(self, x):
        return x * torch.sigmoid(x)


![Swish_ReLU_Sigmoid三个激活函数的曲线对比图](Illustrations/Swish_ReLU_Sigmoid三个激活函数的曲线对比图.png)

In [27]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

***初始化网络层参数：***

In [28]:
def init_weights(m):  # m 为某个网络层
    """ 初始化模型权重 """
    def truncated_normal_init(t, mean=0.0, std=0.01):
        """ 截断正态分布 """
        torch.nn.init.normal_(t, mean=mean, std=std)  # 用均值为 mean、标准差为 std 的正态分布随机数填充张量t 
        while True:
            cond = (t < mean - 2 * std) | (t > mean + 2 * std)
            if not torch.sum(cond):  # 如果所有值都在范围内 (torch.sum(cond) == 0)，就退出循环
                break
            t = torch.where(
                cond,
                torch.nn.init.normal_(torch.ones(t.shape, device=device),mean=mean,std=std), 
                t)  # 如果有越界的值，就重新采样，并用 torch.where 把这些位置替换成新的采样值
        return t

    if type(m) == nn.Linear or isinstance(m, FCLayer):  # 若 m 是全连接层（nn.Linear）或 自定义层 FCLayer
        truncated_normal_init(m.weight, std=1 / (2 * np.sqrt(m._input_dim)))  # 权重 m.weight 用截断正态分布初始化
        m.bias.data.fill_(0.0)  # 偏置 m.bias 全部设为 0


> 标准差缩放技巧，保证方差不会太大：
$$\mathrm{std}=\frac{1}{2\sqrt{m-inputdim}}$$

***自定义的全连接层：***

In [29]:
class FCLayer(nn.Module):
    """ 自定义的全连接层 (FCLayer)，支持 ensemble（集成） """
    # ensemble_size: 集成的个数（即同时训练多少组独立的参数）
    # activation: 激活函数（如 ReLU、Swish）
    def __init__(self, input_dim, output_dim, ensemble_size, activation):
        super(FCLayer, self).__init__()
        self._input_dim, self._output_dim = input_dim, output_dim
        self.weight = nn.Parameter(torch.Tensor(ensemble_size, input_dim, output_dim).to(device))
        self._activation = activation
        self.bias = nn.Parameter(torch.Tensor(ensemble_size, output_dim).to(device))

    def forward(self, x):
        return self._activation(
            torch.add(torch.bmm(x, self.weight), self.bias[:, None, :]))  # 矩阵计算
            # X:(ensemble_size, batch_size, input_dim)
            # W:(ensemble_size, input_dim, output_dim)
            # 偏置bias扩展到 (ensemble_size, batch_size, output_dim)

### 2.环境模型集成

***softplus 函数：***
$$\mathrm{softplus}(x)=\log(1+e^x)$$
- **softplus** 常用于 **平滑 和 数值约束**
- 相比直接 clamp()，**softplus** **可导且更平滑**，不会破坏训练梯度

In [None]:
class EnsembleModel(nn.Module):
    """ 环境模型集成 """
    def __init__(self,
                 state_dim,
                 action_dim,
                 ensemble_size=5,  # 模型成员总数为ensemble_size，对应训练ensemble_size组权重
                 learning_rate=1e-3):
        super(EnsembleModel, self).__init__()
        self._output_dim = (state_dim + 1) * 2  # 预测状态与奖励以及对应方差
        # 方差上界和下界
        self._max_logvar = nn.Parameter((torch.ones(
            (1, self._output_dim // 2)).float() / 2).to(device),
                                        requires_grad=False)
        self._min_logvar = nn.Parameter((-torch.ones(
            (1, self._output_dim // 2)).float() * 10).to(device),
                                        requires_grad=False)
        # 集成模型中每个成员为5层神经网络
        self.layer1 = FCLayer(state_dim + action_dim, 200, ensemble_size, Swish())
        self.layer2 = FCLayer(200, 200, ensemble_size, Swish())
        self.layer3 = FCLayer(200, 200, ensemble_size, Swish())
        self.layer4 = FCLayer(200, 200, ensemble_size, Swish())
        self.layer5 = FCLayer(200, self._output_dim, ensemble_size, nn.Identity())  # nn.Identity()，原样返回输入
        self.apply(init_weights)  # 对所有 nn.Linear 和 FCLayer 层做权重初始化
        self.optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x, return_log_var=False):  # 选择是否返回对数方差
        # 前向传播
        ret = self.layer5(self.layer4(self.layer3(self.layer2(self.layer1(x)))))
        # 输出一分为二
        mean   = ret[:, :, :self._output_dim // 2]   # 前一半 -> 均值
        raw_lv = ret[:, :, self._output_dim // 2:]   # 后一半 -> 原始对数方差 logvar（未约束）
        # 使用softplus函数,将方差控制在最小值和最大值之间
        logvar = self._max_logvar - F.softplus(self._max_logvar - raw_lv)
        logvar = self._min_logvar + F.softplus(logvar - self._min_logvar)
        return mean, logvar if return_log_var else torch.exp(logvar)

    def loss(self, mean, logvar, labels, use_var_loss=True):  # 是否选择带方差的 loss
        inverse_var = torch.exp(-logvar)
        if use_var_loss:
            # 对应简化损失函数
            mse_loss = torch.mean(torch.mean(torch.pow(mean - labels, 2) * inverse_var,
                                             dim=-1),dim=-1)
            var_loss = torch.mean(torch.mean(logvar, dim=-1), dim=-1)
            total_loss = torch.sum(mse_loss) + torch.sum(var_loss)
        else:
            # 
            mse_loss = torch.mean(torch.pow(mean - labels, 2), dim=(1, 2))
            total_loss = torch.sum(mse_loss)
        return total_loss, mse_loss

    def train(self, loss):
        self.optimizer.zero_grad()
        loss += 0.01 * torch.sum(self._max_logvar) - 0.01 * torch.sum(self._min_logvar)
        loss.backward()
        self.optimizer.step()