## Step1 初始化 Jupyter 环境 & 导入包

In [None]:
# 用于在 Jupyter 中强制刷新参数
%reset -f

# 导入相关的包
import copy
import os
import sys
from collections import deque
from pathlib import Path

import torch
import pygame
import imageio
import gymnasium as gym
import numpy as np

from tqdm.notebook import tqdm
from torch.distributions import Categorical
from loguru import logger

## Step2 设置相关参数

In [None]:
# Feature switches
is_training = 1                     # Whether to run the training cell
is_evaluate = 0                     # Whether to run the evaluation cell; the game window is rendered in this mode
need_record = 0                     # Whether to record a video; only honored when is_evaluate=1, and the game window is not shown

# Log level: drop loguru's default sink and log to stderr at the chosen level
log_level = "INFO"
logger.remove()
logger.add(sys.stderr, level=log_level)

# Environment settings
env_id = "CartPole-v1"              # Gymnasium environment id
max_steps = 500                     # Maximum number of steps per episode
render_mode = "rgb_array"           # Render mode, e.g. "human" or "rgb_array"

# TRPO hyper-parameters
memory_buffer_size = 10000          # Capacity of the agent's memory buffer
frame_stack = 2                     # Number of observations stacked into one network input
gamma = 0.95                        # Discount factor weighting future rewards
lmbda = 0.9                         # GAE decay parameter
kl_constraint = 0.001               # Upper bound on the KL divergence between old and new policy
linesearch_alpha = 0.9              # Shrink factor applied at every line-search iteration
epslion = 1e-8                      # Small constant used for numerical stability

# Training settings
num_train_episodes = 2000           # Total number of training episodes
lr = 2e-3                           # Learning rate

# Evaluation settings
num_eval_episodes = 10              # Number of evaluation episodes
reward_threshold = 500              # Episode-reward threshold: at or above it the result is logged as SUCCESS, otherwise as WARNING
eval_sample_action = True           # True: sample the evaluation action from the policy distribution; False: take the most probable action


# Saving
save_dir = "./Gym_Classic_CartPole_TRPO_FrameStack"              # Directory where data is saved
save_freq = 100                                                  # Save the models every this many episodes
max_checkpoints = 5                                              # Maximum number of checkpoint files to keep
checkpoint_perfix_A = "CheckPoint_Gym_Classic_CartPole_A_"       # File-name prefix of actor checkpoints
checkpoint_perfix_C = "CheckPoint_Gym_Classic_CartPole_C_"       # File-name prefix of critic checkpoints
evaluate_record_perfix = "Video_Gym_Classic_CartPole_"           # File-name prefix of evaluation recordings
evaluate_record_fps = 30                                         # Frame rate of evaluation recordings
evaluate_record_quality = 10                                     # Quality of evaluation recordings, 0 ~ 10

# Remaining initialization
device = "cuda" if torch.cuda.is_available() else "cpu"
# torch.autograd.set_detect_anomaly(True)                        # Enable PyTorch autograd anomaly detection, useful when debugging NaN gradients

## Step3 预处理函数 & 工具

In [None]:
def get_max_checkpoint_id(checkpoint_perfix, save_dir=save_dir):
    """
    Locate the newest checkpoint file for a given file-name prefix.

    Scans ``save_dir`` for files named ``<checkpoint_perfix><integer>.pth`` and
    selects the one with the largest integer id. Files that match the prefix
    but do not carry a purely numeric id (for example ``<prefix>best.pth``) are
    ignored instead of aborting the scan.

    Args:
        checkpoint_perfix: File-name prefix identifying the checkpoint family.
        save_dir: Directory to scan. It is created when it does not exist.

    Returns:
        ``None`` when the directory had to be created or holds no matching
        checkpoint; otherwise a dict with the keys ``"max_checkpoint_path"``
        (absolute path) and ``"max_checkpoint_id"`` (int).
    """
    directory = Path(save_dir)

    # A missing directory cannot contain checkpoints: create it and report "nothing found"
    if not directory.exists():
        # exist_ok guards against the directory appearing between the check and the creation
        directory.mkdir(parents=True, exist_ok=True)
        logger.debug("The specified directory does not exist, will create this folder")
        return None

    # Collect the numeric ids of every matching checkpoint file
    checkpoint_ids = []
    for entry in directory.iterdir():
        if not (entry.is_file() and entry.suffix == ".pth" and entry.name.startswith(checkpoint_perfix)):
            continue
        id_text = entry.stem[len(checkpoint_perfix):]
        if not id_text.isdecimal():
            logger.debug(f"Skip file without a numeric checkpoint id: {entry.name}")
            continue
        checkpoint_ids.append(int(id_text))

    if not checkpoint_ids:
        logger.info(f"Not found any {checkpoint_perfix} files, will random initialization of network parameters")
        return None

    max_checkpoint_id = max(checkpoint_ids)
    max_checkpoint_path = os.path.abspath(f"{save_dir}/{checkpoint_perfix}{max_checkpoint_id}.pth")
    logger.info(f"Found max checkpoints, max_checkpoint_id is {max_checkpoint_id}")
    return {"max_checkpoint_path": max_checkpoint_path, "max_checkpoint_id": max_checkpoint_id}

In [None]:
def del_old_checkpoint(checkpoint_perfix, save_dir=save_dir, max_checkpoints=max_checkpoints):
    """
    Delete old checkpoint files so that only the newest ``max_checkpoints`` remain.

    Checkpoints are files in ``save_dir`` named ``<checkpoint_perfix><integer>.pth``;
    "newest" means "largest integer id". Files that match the prefix but have no
    purely numeric id are left untouched.

    Args:
        checkpoint_perfix: File-name prefix identifying the checkpoint family.
        save_dir: Directory holding the checkpoints. Nothing happens when it is missing.
        max_checkpoints: Number of checkpoint files to keep.
    """
    directory = Path(save_dir)
    # Without the directory there is nothing to prune (previously this path hit an unbound local)
    if not directory.exists():
        return

    # Map checkpoint id -> file so that the files found on disk are the ones removed
    found = {}
    for entry in directory.iterdir():
        if not (entry.is_file() and entry.suffix == ".pth" and entry.name.startswith(checkpoint_perfix)):
            continue
        id_text = entry.stem[len(checkpoint_perfix):]
        if id_text.isdecimal():
            found[int(id_text)] = entry

    # Remove every surplus file, oldest (smallest id) first
    surplus = max(len(found) - max_checkpoints, 0)
    for checkpoint_id in sorted(found)[:surplus]:
        old_checkpoint_path = os.path.abspath(found[checkpoint_id])
        os.remove(old_checkpoint_path)
        logger.warning(f"Delete old checkpoint file {old_checkpoint_path}")

## Step4 定义智能体

In [None]:
class RLAgent:
    """
    TRPO agent for environments with a discrete action space.

    Bundles an actor (policy) MLP, a critic (state-value) MLP, a memory buffer
    and every step of the TRPO update: GAE advantages, the natural-gradient
    direction obtained with conjugate gradient, and a backtracking line search
    under a KL-divergence constraint.

    Hyper-parameters (``gamma``, ``lmbda``, ``kl_constraint``, ``linesearch_alpha``,
    ``epslion``, ``lr``), the target ``device`` and the checkpoint settings are
    read from module-level globals.
    """
    def __init__(self, action_size):
        """
        Build both networks and restore the newest checkpoints when present.

        :param action_size: number of discrete actions, i.e. the actor's output width
        """
        # Global Args
        self.max_checkpoint_a = get_max_checkpoint_id(checkpoint_perfix=checkpoint_perfix_A)
        self.max_checkpoint_c = get_max_checkpoint_id(checkpoint_perfix=checkpoint_perfix_C)
        self.memory_buffer = deque(maxlen=memory_buffer_size)

        # Init Actor Network (lazy layers infer the input width on first use)
        self.actor_network = torch.nn.Sequential(
            torch.nn.LazyLinear(out_features=128),
            torch.nn.LeakyReLU(),
            torch.nn.LazyLinear(out_features=128),
            torch.nn.LeakyReLU(),
            torch.nn.LazyLinear(out_features=action_size),
        )
        if self.max_checkpoint_a is not None:
            # map_location lets a checkpoint saved on GPU be restored on a CPU-only host
            self.actor_network.load_state_dict(
                torch.load(self.max_checkpoint_a["max_checkpoint_path"], map_location=device)
            )

        # Init Critic Network
        self.critic_network = torch.nn.Sequential(
            torch.nn.LazyLinear(out_features=128),
            torch.nn.LeakyReLU(),
            torch.nn.LazyLinear(out_features=128),
            torch.nn.LeakyReLU(),
            torch.nn.LazyLinear(out_features=1),
        )
        if self.max_checkpoint_c is not None:
            self.critic_network.load_state_dict(
                torch.load(self.max_checkpoint_c["max_checkpoint_path"], map_location=device)
            )

        # Move to designated device
        self.actor_network.to(device)
        self.critic_network.to(device)

        # Only the critic is trained with an optimizer; the actor is updated by the TRPO step
        self.c_optimizer = torch.optim.AdamW(self.critic_network.parameters(), lr=lr)

    def processing_states(self, frame_buffer):
        """
        Convert a frame buffer into a network input.

        :param frame_buffer: sequence of stacked observations
        :return: tensor of shape [1, total number of values in the buffer]
        """
        # Flatten all stacked frames into one row: [1, size * frame_buffer_size]
        states = torch.tensor(np.array(frame_buffer))
        states = states.reshape(1, -1)
        logger.debug(f"Processing states shape: {states.shape}")
        return states

    def select_action(self, state, sample=True):
        """
        Choose an action for the given state.

        :param state:  network input of shape [batch_size, features], as produced by processing_states
        :param sample: True samples from the policy distribution, False picks the most probable action
        :return: ``{"action": tensor, "log_prob": tensor}`` when sampling,
                 ``{"action": int}`` otherwise (the greedy branch requires a batch of one)
        """
        state = state.to(device)
        # Acting never needs gradients: the returned log-probability is detached anyway
        with torch.no_grad():
            logits = self.actor_network(state)

        if sample:
            # https://pytorch.ac.cn/docs/stable/distributions.html#categorical
            # Building the distribution from logits keeps sampling and log-probabilities numerically stable
            action_dist = Categorical(logits=logits)
            action = action_dist.sample()
            log_prob = action_dist.log_prob(action).detach()
            return {"action": action, "log_prob": log_prob}

        action = logits.argmax(dim=1).item()
        return {"action": action}

    def hessian_matrix_vector_product(self, state, old_action_dists, vector):
        """
        Compute the Hessian-vector product (HVP) of the mean KL divergence.

        Used by the conjugate gradient solver, which needs products H @ v without
        ever materializing the Hessian H of the KL divergence w.r.t. the actor parameters.

        :param state:            batch of states
        :param old_action_dists: action distributions of the old policy (constant, no gradient)
        :param vector:           flat vector with one entry per actor parameter
        :return: flat tensor H @ vector
        """
        params = list(self.actor_network.parameters())
        # Action distributions of the current policy
        new_action_dists = torch.distributions.Categorical(logits=self.actor_network(state))
        # Mean KL divergence between the old and the current policy
        kl = torch.mean(torch.distributions.kl.kl_divergence(old_action_dists, new_action_dists))
        # First derivative of the KL w.r.t. the parameters; create_graph=True keeps it differentiable
        kl_grad = torch.autograd.grad(kl, params, create_graph=True, retain_graph=True)
        # Flatten all gradients into a single vector
        kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])
        # Dot product with the given vector (the key autodiff trick: d/dθ (∇KL · v) = H v)
        kl_grad_vector_product = torch.dot(kl_grad_vector, vector)
        # Differentiating once more yields the Hessian-vector product
        grad2 = torch.autograd.grad(kl_grad_vector_product, params, retain_graph=True)
        # Flatten and return
        return torch.cat([grad.view(-1) for grad in grad2])

    def conjugate_gradient(self, grad, state, old_action_dists, max_iters=40):
        """
        Solve H x = grad with the conjugate gradient method.

        :param grad:             flat gradient of the surrogate objective
        :param state:            batch of states (forwarded to the HVP)
        :param old_action_dists: old policy distributions (forwarded to the HVP)
        :param max_iters:        maximum number of iterations
        :return: flat tensor x approximating H^-1 grad
        """
        x = torch.zeros_like(grad)
        r = grad.clone()
        p = grad.clone()
        rdotr = torch.dot(r, r)

        # Conjugate gradient main loop
        for i in range(max_iters):
            Hp = self.hessian_matrix_vector_product(state, old_action_dists, p)
            # Numerical stability for alpha = rdotr / dot(p, Hp): a denominator that is too
            # small is replaced by +/- epslion, keeping its sign. An exact zero has no sign
            # (torch.sign would give 0 and the division would still blow up), so it is
            # treated as positive.
            denom = torch.dot(p, Hp)
            if torch.abs(denom) <= epslion:
                denom = torch.full_like(denom, -epslion if denom < 0 else epslion)
            alpha = rdotr / denom

            x += alpha * p
            r -= alpha * Hp
            new_rdotr = torch.dot(r, r)
            if new_rdotr < epslion:
                logger.debug(f"conjugate gradient loop number: {i+1}")
                break
            beta = new_rdotr / rdotr
            p = r + beta * p
            rdotr = new_rdotr
        return x

    def compute_surrogate_obj(self, state, actions, advantage, old_log_probs, actor_network):
        """
        Evaluate the surrogate objective mean(ratio * advantage) for a given policy network.

        :param state:         batch of states
        :param actions:       actions that were taken, one per state
        :param advantage:     advantage estimates, one per state; any shape holding
                              exactly one value per sample (e.g. [N] or [N, 1])
        :param old_log_probs: log-probabilities of ``actions`` under the old policy
        :param actor_network: the policy network to evaluate (passed explicitly so that
                              candidate networks can be scored during the line search)
        :return: scalar tensor
        """
        log_probs = torch.distributions.Categorical(logits=actor_network(state)).log_prob(actions)
        # ratio is 1 wherever the new and the old policy agree
        ratio = torch.exp(log_probs - old_log_probs)
        # Align advantage with ratio element by element. Multiplying a [N] ratio by a
        # [N, 1] advantage would broadcast to [N, N] and silently average over every
        # (ratio_j, advantage_i) pair instead of the per-sample products.
        advantage = advantage.reshape(ratio.shape)
        return torch.mean(ratio * advantage)

    def line_search(self, state, actions, advantage, old_log_probs, old_action_dists, max_vec, max_backtracks=15):
        """
        Backtracking line search along the proposed update.

        Tries ``old_para + linesearch_alpha**i * max_vec`` for i = 0, 1, ... and accepts
        the first candidate that improves the surrogate objective while keeping the
        mean KL divergence below ``kl_constraint``.

        :param max_vec:        largest step to try (flat vector)
        :param max_backtracks: maximum number of candidates to evaluate
        :return: flat parameter vector; the old parameters when no candidate is accepted
        """
        new_obj = kl_div = None
        # Candidates are only scored, never differentiated
        with torch.no_grad():
            old_para = torch.nn.utils.convert_parameters.parameters_to_vector(self.actor_network.parameters())
            # Surrogate objective under the current parameters
            old_obj = self.compute_surrogate_obj(state, actions, advantage, old_log_probs, self.actor_network)
        # Scratch copy that receives the candidate parameters
        new_actor = copy.deepcopy(self.actor_network).to(device=device)

        # Line search main loop
        for i in range(max_backtracks):
            # Shrink factor of this attempt
            coef = linesearch_alpha**i

            # Load the candidate parameters into the scratch network
            new_para = old_para + coef * max_vec
            torch.nn.utils.convert_parameters.vector_to_parameters(new_para, new_actor.parameters())

            try:
                with torch.no_grad():
                    # Score the candidate: policy distributions, KL divergence, surrogate objective
                    new_action_dists = torch.distributions.Categorical(logits=new_actor(state))
                    kl_div = torch.mean(torch.distributions.kl.kl_divergence(old_action_dists, new_action_dists))
                    new_obj = self.compute_surrogate_obj(state, actions, advantage, old_log_probs, new_actor)
            except Exception as e:
                # Typically NaN/inf logits rejected by Categorical: give up on this update
                logger.error("line search result: numerical anomaly, skip this update")
                logger.error(f"line search exception: {e!r}")
                return old_para

            # Accept the first candidate that is better and stays inside the trust region
            if new_obj > old_obj and kl_div < kl_constraint:
                logger.info("line search result: updated actor_network parameters")
                logger.debug(f"new_obj: {new_obj:.6f} | old_obj: {old_obj:.6f} | kl_div: {kl_div:.6f}")
                return new_para

        logger.warning("line search result: use old actor_network parameters")
        if new_obj is not None:
            logger.warning(f"new_obj: {new_obj:.6f} | old_obj: {old_obj:.6f} | kl_div: {kl_div:.6f}")
        return old_para

    def policy_learn(self, state, actions, old_action_dists, old_log_probs, advantage):
        """
        Perform one TRPO update of the actor network.

        Steps: gradient g of the surrogate objective, direction x = H^-1 g by conjugate
        gradient, maximal step length from the KL constraint, then a line search.
        """
        # Surrogate objective under the current parameters
        surrogate_obj = self.compute_surrogate_obj(state, actions, advantage, old_log_probs, self.actor_network)
        # Gradient of the surrogate objective w.r.t. the actor parameters
        grads = torch.autograd.grad(surrogate_obj, list(self.actor_network.parameters()), retain_graph=True)
        # Flatten for the computations below
        obj_grad = torch.cat([grad.view(-1) for grad in grads]).detach()

        # Sanity checks (grads is a tuple, so every entry is inspected)
        if all((grad is not None) and torch.all(grad == 0) for grad in grads):
            logger.error(f"grad's value is NaN or all 0!")
        if torch.all(obj_grad == 0):
            logger.error(f"obj_grad's value is be all 0!")

        # Conjugate gradient: x = H^(-1) g
        descent_direction = self.conjugate_gradient(obj_grad, state, old_action_dists)
        # Hessian-vector product H x
        Hd = self.hessian_matrix_vector_product(state, old_action_dists, descent_direction)
        # Largest step length allowed by the KL constraint
        max_coef = torch.sqrt(2 * kl_constraint / (torch.dot(descent_direction, Hd) + epslion))

        # Sanity checks
        if torch.isnan(descent_direction).any():
            logger.error(f"descent_direction contains NaN!")
        if torch.isnan(Hd).any():
            logger.error(f"Hd contains NaN!")
        if torch.isnan(max_coef):
            logger.error(f"max_coef is NaN!")

        # Line search
        new_para = self.line_search(state, actions, advantage, old_log_probs, old_action_dists, descent_direction * max_coef)
        # Install the parameters chosen by the line search
        torch.nn.utils.convert_parameters.vector_to_parameters(new_para, self.actor_network.parameters())

    def compute_advantage(self, gamma, lmbda, td_err):
        """
        Generalized Advantage Estimation (GAE).

        The whole input is treated as one trajectory in time order; the recursion
        is not reset anywhere inside it.

        Args:
        - gamma:  discount factor (0 ~ 1)
        - lmbda:  GAE decay parameter (0 ~ 1)
        - td_err: tensor of TD errors, first dimension is time

        Returns:
            float32 CPU tensor with the same shape as ``td_err``.
        """
        # Advantages are constants for the policy update: detach, and move to the CPU for numpy
        deltas = td_err.detach().cpu().numpy()
        advantage_list = []
        advantage = 0.0
        # Walk backwards through time
        for delta in deltas[::-1]:
            # Core recursion: A_t = γλ A_{t+1} + δ_t
            advantage = gamma * lmbda * advantage + delta
            advantage_list.append(advantage)
        # Restore chronological order
        advantage_list.reverse()
        # Go through a numpy array, which converts to a tensor faster than a list
        advantages = np.array(advantage_list, dtype=np.float32)
        return torch.from_numpy(advantages)

    def update(self):
        """
        Run one TRPO update (critic regression + actor trust-region step) on the
        content of the memory buffer, then clear the buffer.
        """
        num_mems = len(self.memory_buffer)
        logger.debug(f"memory buffer size: {num_mems}")
        if num_mems == 0:
            # torch.cat cannot handle an empty list; there is nothing to learn from
            logger.warning("memory buffer is empty, skip this update")
            return

        # Extract the batch. Shapes are fixed here so that later arithmetic cannot broadcast by accident.
        state = torch.cat([data["St"] for data in self.memory_buffer], dim=0).to(device)
        action = torch.tensor([data["At"] for data in self.memory_buffer]).to(device)
        reward = torch.tensor([data["Rt"] for data in self.memory_buffer], dtype=torch.float32).unsqueeze(1).to(device)
        next_state = torch.cat([data["St+1"] for data in self.memory_buffer], dim=0).to(device)
        done = torch.tensor([data["Done"] for data in self.memory_buffer]).float().unsqueeze(1).to(device)
        logger.debug(f"state shape: {state.shape}, action shape: {action.shape}, reward shape: {reward.shape}, next_state shape: {next_state.shape}, done shape: {done.shape}")

        # Critic: TD target & TD error (both are used as constants below)
        with torch.no_grad():
            td_tgt = reward + gamma * self.critic_network(next_state) * (1 - done)
            td_err = td_tgt - self.critic_network(state)

        advantage = self.compute_advantage(gamma, lmbda, td_err.cpu()).to(device)

        # The old policy must be a constant: detach its logits from the graph
        # https://github.com/boyu-ai/Hands-on-RL/issues/96
        old_action_dists = torch.distributions.Categorical(logits=self.actor_network(state).detach())
        # Log-probability of the action actually taken (not the whole distribution), also a constant
        old_log_probs = old_action_dists.log_prob(action).detach()
        # Report NaN values in old_log_probs
        if torch.isnan(old_log_probs).any():
            logger.error(f"old_log_probs size: {old_log_probs.shape}")
            logger.error(f"old_log_probs contains nan: {torch.isnan(old_log_probs).any()}")

        # Update the value function (mse_loss already averages over the batch)
        critic_loss = torch.nn.functional.mse_loss(self.critic_network(state), td_tgt.detach())
        self.c_optimizer.zero_grad()
        critic_loss.backward()
        self.c_optimizer.step()

        # Update the policy
        self.policy_learn(state, action, old_action_dists, old_log_probs, advantage)

        # Drop the consumed experience
        self.memory_buffer.clear()

    def save_model(self, episodes):
        """
        Save both networks and prune old checkpoint files.

        The checkpoint id is ``episodes`` on a fresh run, or ``episodes`` plus the id
        of the checkpoint that was restored at construction time.

        :param episodes: number of episodes trained in the current run
        """
        def checkpoint_path(prefix, restored):
            # Built outside the f-string: reusing the same quote character inside an
            # f-string expression is a SyntaxError before Python 3.12
            checkpoint_id = episodes
            if restored is not None:
                checkpoint_id += int(restored["max_checkpoint_id"])
            return os.path.abspath(f"{save_dir}/{prefix}{checkpoint_id}.pth")

        max_checkpoint_path_a = checkpoint_path(checkpoint_perfix_A, self.max_checkpoint_a)
        max_checkpoint_path_c = checkpoint_path(checkpoint_perfix_C, self.max_checkpoint_c)

        # Make sure the target directory (still) exists
        Path(save_dir).mkdir(parents=True, exist_ok=True)

        # Save the parameters
        torch.save(self.actor_network.state_dict(), max_checkpoint_path_a)
        torch.save(self.critic_network.state_dict(), max_checkpoint_path_c)
        logger.info(f"Actor Model saved to {max_checkpoint_path_a}")
        logger.info(f"Critic Model saved to {max_checkpoint_path_c}")

        # Prune old checkpoints
        del_old_checkpoint(checkpoint_perfix=checkpoint_perfix_A)
        del_old_checkpoint(checkpoint_perfix=checkpoint_perfix_C)

## Step5 调整环境

In [None]:
# 定制环境
class CustomEnv(gym.Wrapper):
    """
    Environment wrapper and single extension point for changing the behavior
    or the reward scheme of the wrapped environment.

    Currently both ``reset`` and ``step`` pass through unchanged.
    """
    def __init__(self, env):
        super().__init__(env)

    def reset(self, *, seed=None, options=None):
        """
        Reset the wrapped environment.

        ``seed`` and ``options`` are forwarded so that callers can use the regular
        Gymnasium reset API (e.g. ``env.reset(seed=42)``); calling ``reset()`` with
        no arguments behaves as before.

        :return: whatever the wrapped environment's ``reset`` returns
        """
        return self.env.reset(seed=seed, options=options)

    def step(self, action):
        """
        Run one step in the wrapped environment.

        The result is unpacked so that individual fields (e.g. the reward) can be
        adjusted here before being handed back.

        :return: observation, reward, terminated, truncated, info
        """
        # Delegate to the wrapped environment
        observation, reward, terminated, truncated, info = self.env.step(action)

        return observation, reward, terminated, truncated, info


## Step6 训练智能体

In [None]:
if is_training:
    # Main training environment
    env = gym.make(env_id, render_mode=render_mode)
    env = CustomEnv(env)

    # try/finally guarantees that the environment is released even when training fails or is interrupted
    try:
        # Create the agent (the action space has to be discrete)
        if isinstance(env.action_space, gym.spaces.Discrete):
            action_size = env.action_space.n
            Agent = RLAgent(action_size=action_size)
        else:
            logger.error("Action space is not Discrete!")
            raise ValueError("Action space is not Discrete!")

        # One iteration per episode
        for episode in tqdm(range(num_train_episodes)):
            # Reset the environment and the per-episode bookkeeping
            state, info = env.reset()
            total_reward = 0
            current_action = None

            # Fill the frame buffer with copies of the initial observation
            frame_buffer = deque(maxlen=frame_stack)
            for _ in range(frame_stack):
                frame_buffer.append(state)

            # One iteration per environment step
            for step in range(max_steps):
                # A new action is chosen only every frame_stack steps ("decision frame");
                # in between the previous action is repeated
                is_decision_frame = step % frame_stack == 0

                # Network input for the current frame buffer
                current_states = Agent.processing_states(frame_buffer)

                if is_decision_frame:
                    current_action = Agent.select_action(current_states)["action"].item()
                    logger.debug(f"Selected action: {current_action}")

                # Execute the current action and push the new observation into the frame buffer
                observation, reward, terminated, truncated, info = env.step(current_action)
                total_reward += reward
                frame_buffer.append(observation)
                next_states = Agent.processing_states(frame_buffer)
                logger.debug(f"Step {step + 1} | Reward: {reward} | Total Reward: {total_reward} | Terminated: {terminated} | Truncated: {truncated} | Info: {info}")

                if is_decision_frame:
                    # Decision frame: open a new transition record
                    Agent.memory_buffer.append({"St": current_states, "At": current_action, "Rt": reward, "St+1": next_states, "Done": terminated})
                else:
                    # Repeated action: fold this step into the open record
                    Agent.memory_buffer[-1]["Rt"] += reward
                    Agent.memory_buffer[-1]["St+1"] = next_states

                # End of episode
                if terminated or truncated:
                    if total_reward >= reward_threshold:
                        logger.success(f"Episode finish, total step {step + 1} | Total Reward: {total_reward}")
                    else:
                        logger.warning(f"Episode finish, total step {step + 1} | Total Reward: {total_reward}")
                    # The open record may have been created before the terminal step
                    Agent.memory_buffer[-1]["Done"] = terminated
                    break

            # Learn from the episode that just finished
            Agent.update()

            # Periodically save the models
            if (episode + 1) % save_freq == 0 and episode != 0:
                Agent.save_model(episode + 1)
    finally:
        env.close()

## Step7 评估智能体

In [None]:
# Evaluation: render to a window ("human"), or render off-screen ("rgb_array") when a video is recorded
if is_evaluate == 1:
    eval_env = gym.make(env_id, render_mode="rgb_array" if need_record else "human")
    eval_env = CustomEnv(eval_env)

    # try/finally guarantees that the environment and pygame are released even on errors
    try:
        # Best episode seen so far (only tracked when recording)
        np_frame_record = None
        max_reward = 0

        # Agent used for evaluation (restores the newest checkpoints, if any)
        Agent = RLAgent(action_size=eval_env.action_space.n)

        # One iteration per episode
        for episode in tqdm(range(num_eval_episodes)):
            # Reset the environment and the per-episode bookkeeping
            state, info = eval_env.reset()
            total_reward = 0
            steps_taken = 0
            current_action = None
            # Frames are collected per episode, so a saved video never contains frames of other episodes
            frame_record = []

            # Fill the frame buffer with copies of the initial observation
            frame_buffer = deque(maxlen=frame_stack)
            for _ in range(frame_stack):
                frame_buffer.append(state)

            # One iteration per environment step
            for step in range(max_steps):
                # Network input for the current frame buffer
                current_states = Agent.processing_states(frame_buffer)
                # Choose a new action on decision frames only
                if step % frame_stack == 0:
                    output = Agent.select_action(current_states, sample=eval_sample_action)
                    # select_action returns a one-element tensor when sampling and a plain int otherwise
                    current_action = int(output["action"])
                # Execute the action
                observation, reward, terminated, truncated, info = eval_env.step(current_action)
                total_reward += reward
                steps_taken = step + 1
                # Push the new observation into the frame buffer
                frame_buffer.append(observation)
                # When recording, keep the rendered frame
                if need_record:
                    frame_record.append(eval_env.render())
                if terminated or truncated:
                    break

            # When recording, keep the frames of the best episode only
            if need_record and total_reward > max_reward:
                np_frame_record = np.array(frame_record)
                max_reward = total_reward

            # Report the episode result
            if total_reward >= reward_threshold:
                logger.success(f"Step {steps_taken} | Total Reward: {total_reward}")
            else:
                logger.warning(f"Step {steps_taken} | Total Reward: {total_reward}")

        # Save the recording of the best episode
        if need_record:
            if np_frame_record is None:
                logger.warning("No episode produced a recording, nothing to save")
            else:
                record_file = f"{os.path.abspath(os.path.join(save_dir, evaluate_record_perfix))}{int(max_reward)}.mp4"
                imageio.mimsave(record_file, np_frame_record, fps=evaluate_record_fps, quality=evaluate_record_quality)
                logger.info(f"The best evaluation record is: {record_file}")
    finally:
        # Release the environment and the pygame window
        eval_env.close()
        pygame.quit()