In [None]:
import numpy as np
import torch
from .runner.jsbsim_runner import _t2n


@torch.no_grad()
def collect(self, step):
    self.policy.prep_rollout()
    values, actions, action_log_probs, rnn_states_actor, rnn_states_critic = (
        self.policy.get_actions(
            np.concatenate(self.buffer.obs[step]),
            np.concatenate(self.buffer.rnn_states_actor[step]),
            np.concatenate(self.buffer.rnn_states_critic[step]),
            np.concatenate(self.buffer.masks[step]),
        )
    )
    # split parallel data [N*M, shape] => [N, M, shape]
    values = np.array(np.split(_t2n(values), self.n_rollout_threads))
    actions = np.array(np.split(_t2n(actions), self.n_rollout_threads))
    action_log_probs = np.array(
        np.split(_t2n(action_log_probs), self.n_rollout_threads)
    )
    rnn_states_actor = np.array(
        np.split(_t2n(rnn_states_actor), self.n_rollout_threads)
    )
    rnn_states_critic = np.array(
        np.split(_t2n(rnn_states_critic), self.n_rollout_threads)
    )
    return values, actions, action_log_probs, rnn_states_actor, rnn_states_critic

<!--
 * @Author       : zzp@buaa.edu.cn
 * @Date         : 2024-11-14 14:56:49
 * @LastEditTime : 2024-11-14 15:45:26
 * @FilePath     : /LAG/notes.ipynb
 * @Description  : 
-->
`JSBSIMRunner(): def collect(self, step)`: 这段代码定义了一个方法 collect，用于在强化学习的采样过程中收集数据。以下是对代码的逐步解析

1. `@torch.no_grad()` 装饰器：

   * 标记 `collect` 方法在执行时不计算梯度，这样可以节省内存和计算资源，因为在采样时不需要反向传播

2. `self.policy.prep_rollout()`：

   * 准备策略模型进入 `rollout`（采样）模式。通常涉及将模型设置为推理模式，比如 PyTorch 中的 `model.eval()`

3. 调用 `self.policy.get_actions` 获取行动数据:

   * `np.concatenate(...)`：通过拼接操作，将多个并行线程的数据在第0维（batch维度）合并，输入给策略网络

   * `rnn_states_actor` 和 `rnn_states_critic`：分别是 `actor` 和 `critic` 网络的 RNN 隐状态，用于在下一个时间步保持信息的连续性

4. 并行数据的分割与重组:

   * `np.array(np.split(..., self.n_rollout_threads))`：这一步通过 `np.split` 将每个变量从 [N*M, shape] 形状（N 表示采样步数，M 表示并行 rollout 线程数）拆分为 [N, M, shape] 形状。这样做的目的是方便并行处理不同的 rollout 线程

   * 由于之前数据是拼接的，将其分割成 [N, M, shape] 格式，其中 N 表示 rollout 线程数，M 表示批次大小（每个线程的数据量）。通过 _t2n 方法将 Tensor 转换为 numpy 数组，再用 np.split 按线程分割

   * `_t2n`：将 PyTorch tensor 转换为 NumPy 数组，以便与 NumPy 操作兼容

5. 返回：

   * 最终返回 values, actions, action_log_probs, rnn_states_actor, rnn_states_critic，这些数据会被存储到经验回放缓冲区，用于后续的策略优化和训练

总体而言，这段代码用于在并行环境中执行 rollout，采样多个线程的数据并进行分割和重组，为强化学习的下一步训练准备所需的数据。

In [None]:
import torch
import torch.nn as nn

from .algorithms.utils.mlp import MLPBase
from .algorithms.utils.gru import GRULayer
from .algorithms.utils.act import ACTLayer
from .algorithms.utils.utils import check


class PPOActor(nn.Module):
    def __init__(self, args, obs_space, act_space, device=torch.device("cpu")):
        super(PPOActor, self).__init__()
        # network config
        self.gain = args.gain
        self.hidden_size = args.hidden_size
        self.act_hidden_size = args.act_hidden_size
        self.activation_id = args.activation_id
        self.use_feature_normalization = args.use_feature_normalization
        self.use_recurrent_policy = args.use_recurrent_policy
        self.recurrent_hidden_size = args.recurrent_hidden_size
        self.recurrent_hidden_layers = args.recurrent_hidden_layers
        self.tpdv = dict(dtype=torch.float32, device=device)
        self.use_prior = args.use_prior
        # (1) feature extraction module
        self.base = MLPBase(
            obs_space,
            self.hidden_size,
            self.activation_id,
            self.use_feature_normalization,
        )
        # (2) rnn module
        input_size = self.base.output_size
        if self.use_recurrent_policy:
            self.rnn = GRULayer(
                input_size, self.recurrent_hidden_size, self.recurrent_hidden_layers
            )
            input_size = self.rnn.output_size
        # (3) act module
        self.act = ACTLayer(
            act_space, input_size, self.act_hidden_size, self.activation_id, self.gain
        )

        self.to(device)

    def forward(self, obs, rnn_states, masks, deterministic=False):
        obs = check(obs).to(**self.tpdv)
        rnn_states = check(rnn_states).to(**self.tpdv)
        masks = check(masks).to(**self.tpdv)
        if self.use_prior:
            # prior knowledge for controlling shoot missile
            attack_angle = torch.rad2deg(obs[:, 11])  # unit degree
            distance = obs[:, 13] * 10000  # unit m
            alpha0 = torch.full(size=(obs.shape[0], 1), fill_value=3).to(**self.tpdv)
            beta0 = torch.full(size=(obs.shape[0], 1), fill_value=10).to(**self.tpdv)
            alpha0[distance <= 12000] = 6
            alpha0[distance <= 8000] = 10
            beta0[attack_angle <= 45] = 6
            beta0[attack_angle <= 22.5] = 3

        actor_features = self.base(obs)

        if self.use_recurrent_policy:
            actor_features, rnn_states = self.rnn(actor_features, rnn_states, masks)

        if self.use_prior:
            actions, action_log_probs = self.act(
                actor_features, deterministic, alpha0=alpha0, beta0=beta0
            )
        else:
            actions, action_log_probs = self.act(actor_features, deterministic)

        return actions, action_log_probs, rnn_states

    def evaluate_actions(self, obs, rnn_states, action, masks, active_masks=None):
        obs = check(obs).to(**self.tpdv)
        rnn_states = check(rnn_states).to(**self.tpdv)
        action = check(action).to(**self.tpdv)
        masks = check(masks).to(**self.tpdv)
        if self.use_prior:
            # prior knowledge for controlling shoot missile
            attack_angle = torch.rad2deg(obs[:, 11])  # unit degree
            distance = obs[:, 13] * 10000  # unit m
            alpha0 = torch.full(size=(obs.shape[0], 1), fill_value=3).to(**self.tpdv)
            beta0 = torch.full(size=(obs.shape[0], 1), fill_value=10).to(**self.tpdv)
            alpha0[distance <= 12000] = 6
            alpha0[distance <= 8000] = 10
            beta0[attack_angle <= 45] = 6
            beta0[attack_angle <= 22.5] = 3

        if active_masks is not None:
            active_masks = check(active_masks).to(**self.tpdv)

        actor_features = self.base(obs)

        if self.use_recurrent_policy:
            actor_features, rnn_states = self.rnn(actor_features, rnn_states, masks)

        if self.use_prior:
            action_log_probs, dist_entropy = self.act.evaluate_actions(
                actor_features, action, active_masks, alpha0=alpha0, beta0=beta0
            )
        else:
            action_log_probs, dist_entropy = self.act.evaluate_actions(
                actor_features, action, active_masks
            )

        return action_log_probs, dist_entropy

这段代码定义了一个 PPOActor 类，它是一个强化学习模型的 actor 模块，基于 Proximal Policy Optimization (PPO) 算法。该模块主要实现策略网络的结构和前向传播过程，并支持特征提取、循环神经网络（RNN）、动作输出和先验知识。下面是对各部分的详细解析：

# 初始化方法 __init__

* 初始化参数：该方法接受模型的参数 args，观察空间 obs_space，动作空间 act_space，以及计算设备 device（默认为 CPU）。

* 网络配置：从 args 中获取模型的超参数和配置，包括增益 gain、隐藏层大小 hidden_size、激活函数选择 activation_id、是否使用特征标准化 use_feature_normalization、是否使用循环策略 use_recurrent_policy、以及先验知识 use_prior

* 特征提取模块 (self.base)：调用 MLPBase 初始化特征提取网络，处理观察输入 obs_space 后输出隐藏层特征

* RNN 模块 (self.rnn)：如果启用了循环策略 use_recurrent_policy，则创建 GRU 层（即 GRULayer），用于处理时间序列的特征提取

* 动作输出模块 (self.act)：ACTLayer 生成模型的动作输出，输入为特征提取后的数据，输出相应的动作空间 act_space 中的动作。

# 前向传播方法 forward

* 输入处理：检查并将 obs、rnn_states、masks 转换为指定数据类型和设备，以保证输入数据的一致性

* 先验知识应用：

  * 如果 use_prior 为真，则根据先验知识调整策略网络的输出。这里的先验知识基于攻击角度 attack_angle 和距离 distance，通过调整参数 alpha0 和 beta0 控制模型对发射导弹的策略倾向：
    * 当距离较近时增大 alpha0，使得动作更趋向发射导弹；
    * 当攻击角度较小（即更接近正前方）时，减小 beta0，增加导弹发射的可能性

* 特征提取：将观察输入 obs 通过 MLPBase 提取特征 actor_features

* RNN 特征提取：
  * 如果使用了循环策略，将特征和 RNN 状态 rnn_states 输入到 GRU 层进行时间序列处理，并更新 RNN 隐状态

* 动作输出：
  * 根据 use_prior 的设定，调用 ACTLayer 获取最终的动作 actions 和动作的对数概率 action_log_probs。如果启用了先验知识，则会将 alpha0 和 beta0 作为参数传递给 ACTLayer，否则直接使用提取的特征进行输出

* 输出： 返回动作 actions，动作的对数概率 action_log_probs，以及更新后的 RNN 隐状态 rnn_states

# 总结

PPOActor 是一个基于多层感知器 (MLP) 和 GRU 的 PPO 策略网络。它的结构主要由特征提取模块、RNN 模块和动作输出模块组成。PPOActor 在 forward 中整合了先验知识，可以根据特定条件（如攻击角度和距离）动态调整输出策略。这种设计适用于需要长序列特征和先验知识辅助的复杂任务，例如导弹发射决策的强化学习

In [None]:
@torch.no_grad()
def collect(self, step):
    self.policy.prep_rollout()
    values, actions, action_log_probs, rnn_states_actor, rnn_states_critic = (
        self.policy.get_actions(
            np.concatenate(self.buffer.obs[step]),
            np.concatenate(self.buffer.rnn_states_actor[step]),
            np.concatenate(self.buffer.rnn_states_critic[step]),
            np.concatenate(self.buffer.masks[step]),
        )
    )
    # split parallel data [N*M, shape] => [N, M, shape]
    values = np.array(np.split(_t2n(values), self.n_rollout_threads))
    actions = np.array(np.split(_t2n(actions), self.n_rollout_threads))
    action_log_probs = np.array(
        np.split(_t2n(action_log_probs), self.n_rollout_threads)
    )
    rnn_states_actor = np.array(
        np.split(_t2n(rnn_states_actor), self.n_rollout_threads)
    )
    rnn_states_critic = np.array(
        np.split(_t2n(rnn_states_critic), self.n_rollout_threads)
    )

    # [Selfplay] get actions of opponent policy
    opponent_actions = np.zeros_like(actions)
    for policy_idx, policy in enumerate(self.opponent_policy):
        env_idx = self.opponent_env_split[policy_idx]
        opponent_action, opponent_rnn_states = policy.act(
            np.concatenate(self.opponent_obs[env_idx]),
            np.concatenate(self.opponent_rnn_states[env_idx]),
            np.concatenate(self.opponent_masks[env_idx]),
        )
        opponent_actions[env_idx] = np.array(
            np.split(_t2n(opponent_action), len(env_idx))
        )
        self.opponent_rnn_states[env_idx] = np.array(
            np.split(_t2n(opponent_rnn_states), len(env_idx))
        )
    actions = np.concatenate((actions, opponent_actions), axis=1)

    return values, actions, action_log_probs, rnn_states_actor, rnn_states_critic

`SelfplayJSBSimRunner`

# [自对弈] 获取对手策略的动作数据

* 如果使用自对弈，该部分会依次遍历每个对手策略 self.opponent_policy，并获取对手的动作 opponent_action 和 RNN 状态 opponent_rnn_states
  * 单次选取的对手只有一个，即 num_opponents == 1
  * opponent_policy (__type__: list) 实际上列表中只有一个，所有的 opponent 使用的都是同一种 policy，即 PPOPolicy
  * opponent_env_split (__type__: list): 将 n_rollout_threads 分成若干份，每一份包含 len(opponent_policy) 个thread，实际上 32 个 threads 也就分成了 32 份
  * 这样就更新了所有 32 个对手的每一个的 action and states，然后更新到 opponent_actions and opponent_rnn_states当中
  * 最后将 actions and opponent_actions 拼接在一起

* 返回：
  * 函数返回值包括：状态值估计 values、合并后的动作 actions、动作对数概率 action_log_probs、以及代理的 RNN 状态 rnn_states_actor 和 rnn_states_critic

该 collect 函数的核心是获取当前策略（包含代理和对手）的行为数据，并将这些数据按 rollout 线程数进行整理。这些数据可用于后续的策略更新或训练阶段

`init_states = {'ic_long_gc_deg': 120.0, 'ic_lat_geod_deg': 60.0, 'ic_h_sl_ft': 20000, 'ic_psi_true_deg': 0.0, 'ic_u_fps': 800.0}`