In [1]:
# For tips on running notebooks in Google Colab, see
# https://pytorch.org/tutorials/beginner/colab
%matplotlib inline

训练一个玛丽奥玩家的强化学习代理
作者: Yuansong Feng, Suraj Subramanian, Howard Wang, Steven Guo。

本教程将引导您了解深度强化学习的基础知识。最终，您将实现一个可以自主玩游戏的AI驱动的玛丽奥（使用Double Deep Q-Networks_）。

尽管此教程无需强化学习的先前知识，但您可以通过这些强化学习的概念进行熟悉，并随时使用这个速查表 作为您的参考。完整的代码可在这里_获取。

.. 图片:: /_static/img/mario.gif
:alt: 玛丽奥

In [2]:
import torch
from torch import nn
from torchvision import transforms as T
from PIL import Image
import numpy as np
from pathlib import Path
from collections import deque
import random, datetime, os

# Gym is an OpenAI toolkit for RL
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack

# NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace

# Super Mario environment for OpenAI Gym
import gym_super_mario_bros

from tensordict import TensorDict
from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage

强化学习定义
环境（Environment）：代理与之交互并从中学习的世界。

动作（Action） 
�
a：代理如何响应环境。所有可能的动作集合被称为动作空间。

状态（State） 
�
s：环境的当前特征。环境可能处于的所有可能状态的集合被称为状态空间。

奖励（Reward） 
�
r：奖励是环境给代理的关键反馈。这是驱使代理学习并改变其未来动作的因素。在多个时间步骤上的奖励汇总被称为回报（Return）。

最优动作-价值函数（Optimal Action-Value function） 
�
∗
(
�
,
�
)
Q 
∗
 (s,a)：给出了如果您从状态 
�
s 开始，采取任意动作 
�
a ，然后对于每个未来时间步骤采取最大化回报的动作的预期回报。
�
Q 可以被认为是状态中动作的“质量”。我们尝试逼近这个函数。

环境
初始化环境
在玛丽奥游戏中，环境由管道、蘑菇和其他组件构成。

当玛丽奥执行一个动作时，环境会响应并返回改变后的（下一个）状态、奖励以及其他信息。

In [3]:
# Initialize Super Mario environment (in v0.26 change render mode to 'human' to see results on the screen)
if gym.__version__ < '0.26':
    env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", new_step_api=True,render_mode='human')
else:
    env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", render_mode='human', apply_api_compatibility=True)

# Limit the action-space to
#   0. walk right
#   1. jump right
env = JoypadSpace(env, [["right"], ["right", "A"]])

env.reset()
next_state, reward, done, trunc, info = env.step(action=0)
print(f"{next_state.shape},\n {reward},\n {done},\n {info}")

  logger.warn(
  logger.warn(


(240, 256, 3),
 0.0,
 False,
 {'coins': 0, 'flag_get': False, 'life': 2, 'score': 0, 'stage': 1, 'status': 'small', 'time': 400, 'world': 1, 'x_pos': 40, 'y_pos': 79}


  if not isinstance(terminated, (bool, np.bool8)):


环境预处理
环境数据以 next_state 的形式返回给代理。如上所示，每个状态由一个大小为 [3, 240, 256] 的数组表示。通常，这比我们的代理需要的信息要多；例如，玛丽奥的行动并不依赖于管道或天空的颜色！

我们使用 包装器（Wrappers） 在将数据发送给代理之前对环境数据进行预处理。

GrayScaleObservation 是一个常见的包装器，用于将RGB图像转换为灰度图像；这样做可以减小状态表示的大小，而不会丢失有用的信息。现在每个状态的大小为： [1, 240, 256]

ResizeObservation 将每个观察结果缩小到一个正方形图像中。新的大小为： [1, 84, 84]

SkipFrame 是一个自定义的包装器，继承自 gym.Wrapper 并实现了 step() 函数。由于连续的帧差异不大，我们可以跳过n个中间帧而不会丢失太多信息。第n帧会累计每个跳过帧上累积的奖励。

FrameStack 是一个包装器，允许我们将环境的连续帧压缩成单个观察点，以供我们的学习模型使用。这样，我们可以根据玛丽奥在前几帧中的移动方向来确定他是降落还是跳跃。








In [4]:
class SkipFrame(gym.Wrapper):
    def __init__(self, env, skip):
        """每隔 `skip` 帧返回一次"""
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """重复动作并累计奖励"""
        total_reward = 0.0
        for i in range(self._skip):
            # 累计奖励并重复相同的动作
            obs, reward, done, trunk, info = self.env.step(action)
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, trunk, info


class GrayScaleObservation(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        obs_shape = self.observation_space.shape[:2]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    def permute_orientation(self, observation):
        # 重排 [H, W, C] 数组为 [C, H, W] 张量
        observation = np.transpose(observation, (2, 0, 1))
        observation = torch.tensor(observation.copy(), dtype=torch.float)
        return observation

    def observation(self, observation):
        observation = self.permute_orientation(observation)
        transform = T.Grayscale()
        observation = transform(observation)
        return observation


class ResizeObservation(gym.ObservationWrapper):
    def __init__(self, env, shape):
        super().__init__(env)
        if isinstance(shape, int):
            self.shape = (shape, shape)
        else:
            self.shape = tuple(shape)

        obs_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    def observation(self, observation):
        transforms = T.Compose(
            [T.Resize(self.shape, antialias=True), T.Normalize(0, 255)]
        )
        observation = transforms(observation).squeeze(0)
        return observation


# 将包装器应用于环境
env = SkipFrame(env, skip=4)
env = GrayScaleObservation(env)
env = ResizeObservation(env, shape=84)
if gym.__version__ < '0.26':
    env = FrameStack(env, num_stack=4, new_step_api=True)
else:
    env = FrameStack(env, num_stack=4)


经过上述包装器对环境的应用后，最终的包装状态由4个连续的灰度帧堆叠在一起组成，如上图左侧所示。每当玛丽奥执行一个动作时，环境都会以这种结构的状态进行响应。这种结构由一个大小为[4, 84, 84]的3D数组表示。

.. 图片:: /_static/img/mario_env.png
:alt: 图片

这段描述解释了经过处理后的环境状态是如何由四个灰度帧堆叠而成，并给出了相应的图像以进一步说明这一概念。

代理
我们创建一个名为Mario的类来代表游戏中的代理。玛丽奥应该能够：

根据当前状态（环境的） 执行基于最优动作策略的动作。

记住 经验。经验 = （当前状态，当前动作，奖励，下一个状态）。玛丽奥会缓存并稍后回忆他的经验以更新他的动作策略。

随着时间的推移学习 更好的动作策略。

这段描述概述了代理的三个主要功能：根据当前状态执行动作、记住和回忆经验，以及随时间学习并改进其策略。

In [5]:
class Mario:
    def __init__(self):
        pass

    def act(self, state):
        """给定一个状态，选择一个ε-贪婪（epsilon-greedy）动作"""
        pass

    def cache(self, experience):
        """将经验添加到内存中"""
        pass

    def recall(self):
        """从内存中抽样经验"""
        pass

    def learn(self):
        """使用一批经验更新在线动作值（Q值）函数"""
        pass



在接下来的部分中，我们将填充玛丽奥的参数并定义他的函数。


执行
对于任何给定的状态，代理可以选择执行最优的动作（利用）或随机的动作（探索）。

玛丽奥有一个概率 self.exploration_rate 来进行随机探索；当他选择利用时，他依赖于 MarioNet（在学习部分中实现）来提供最优的动作。

In [6]:
class Mario:
    def __init__(self, state_dim, action_dim, save_dir):
        # 初始化玛丽奥的状态维度、动作维度和保存目录
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.save_dir = save_dir

        # 检测设备是否支持CUDA，如果支持，则使用CUDA，否则使用CPU
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # 初始化Mario的DNN以预测最优动作 - 我们在后面的“学习”部分实现这一部分
        self.net = MarioNet(self.state_dim, self.action_dim).float()
        self.net = self.net.to(device=self.device)

        # 设置探索率相关的参数
        self.exploration_rate = 1
        self.exploration_rate_decay = 0.99999975
        self.exploration_rate_min = 0.1
        self.curr_step = 0

        # 每隔多少经验保存一次Mario Net
        self.save_every = 5e5  

    def act(self, state):
        """
        给定一个状态，选择一个ε-贪婪动作并更新步骤的值。
        
        输入:
        state(``LazyFrame``): 当前状态的单个观察值，维度为 (state_dim)
        
        输出:
        ``action_idx`` (``int``): 表示玛丽奥将执行的动作的整数索引
        """
        # 探索
        if np.random.rand() < self.exploration_rate:
            action_idx = np.random.randint(self.action_dim)
        # 利用
        else:
            state = state[0].__array__() if isinstance(state, tuple) else state.__array__()
            state = torch.tensor(state, device=self.device).unsqueeze(0)
            action_values = self.net(state, model="online")
            action_idx = torch.argmax(action_values, axis=1).item()

        # 减少探索率
        self.exploration_rate *= self.exploration_rate_decay
        self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)

        # 增加步骤
        self.curr_step += 1
        return action_idx


缓存和回忆
这两个函数充当玛丽奥的“记忆”过程。

cache()：每次玛丽奥执行一个动作时，他都会将experience（经验）存储到他的记忆中。他的经验包括当前的状态、执行的动作、从动作中得到的奖励、下一个状态以及游戏是否完成。

recall()：玛丽奥从他的记忆中随机抽样一批经验，并使用这些经验来学习游戏。

In [7]:
class Mario(Mario):  # 继承父类以保持连续性
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        # 初始化记忆回放缓冲区
        self.memory = TensorDictReplayBuffer(storage=LazyMemmapStorage(100000, device=torch.device("cpu")))
        self.batch_size = 32  # 批处理大小

    def cache(self, state, next_state, action, reward, done):
        """
        将经验存储到self.memory（回放缓冲区）中

        输入:
        state (``LazyFrame``),
        next_state (``LazyFrame``),
        action (``int``),
        reward (``float``),
        done(``bool``))
        """
        # 辅助函数，如果输入是元组，则取第一个元素
        def first_if_tuple(x):
            return x[0] if isinstance(x, tuple) else x
        
        # 将LazyFrame转换为数组
        state = first_if_tuple(state).__array__()
        next_state = first_if_tuple(next_state).__array__()

        # 转换为PyTorch张量
        state = torch.tensor(state)
        next_state = torch.tensor(next_state)
        action = torch.tensor([action])
        reward = torch.tensor([reward])
        done = torch.tensor([done])

        # 将经验添加到记忆中
        self.memory.add(TensorDict({"state": state, "next_state": next_state, "action": action, "reward": reward, "done": done}, batch_size=[]))

    def recall(self):
        """
        从记忆中检索一批经验
        """
        # 从记忆中随机抽样一批经验，并将其移到适当的设备上
        batch = self.memory.sample(self.batch_size).to(self.device)
        
        # 获取批量中的各个经验项
        state, next_state, action, reward, done = (batch.get(key) for key in ("state", "next_state", "action", "reward", "done"))
        
        return state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()


学习
玛丽奥在其内部使用了DDQN算法。DDQN使用两个卷积神经网络 - 
�
�
�
�
�
�
�
Q 
online
​
  和 
�
�
�
�
�
�
�
Q 
target
​
 ，它们独立地逼近最优动作值函数。

在我们的实现中，我们跨 
�
�
�
�
�
�
�
Q 
online
​
  和 
�
�
�
�
�
�
�
Q 
target
​
  共享特征生成器 features，但为每个网络维护独立的全连接分类器。为了防止通过反向传播更新，
�
�
�
�
�
�
�
θ 
target
​
 （
�
�
�
�
�
�
�
Q 
target
​
  的参数）被冻结。相反，它会定期与 
�
�
�
�
�
�
�
θ 
online
​
  同步（稍后会详细说明）。

神经网络
接下来将讨论神经网络的具体实现和细节。

In [8]:
class MarioNet(nn.Module):
    """迷你CNN结构
  输入 -> (conv2d + relu) x 3 -> 展平 -> (dense + relu) x 2 -> 输出
  """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        c, h, w = input_dim

        if h != 84:
            raise ValueError(f"期望输入的高度为: 84, 实际为: {h}")
        if w != 84:
            raise ValueError(f"期望输入的宽度为: 84, 实际为: {w}")

        # 构建在线模型和目标模型
        self.online = self.__build_cnn(c, output_dim)
        self.target = self.__build_cnn(c, output_dim)
        
        # 将在线模型的权重加载到目标模型中
        self.target.load_state_dict(self.online.state_dict())

        # 冻结目标模型的参数，不进行反向传播更新
        for p in self.target.parameters():
            p.requires_grad = False

    def forward(self, input, model):
        """前向传播函数，根据模型参数决定是在线模型还是目标模型"""
        if model == "online":
            return self.online(input)
        elif model == "target":
            return self.target(input)

    def __build_cnn(self, c, output_dim):
        """构建CNN结构"""
        return nn.Sequential(
            nn.Conv2d(in_channels=c, out_channels=32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(3136, 512),
            nn.ReLU(),
            nn.Linear(512, output_dim),
        )



TD 估计与 TD 目标
在学习过程中涉及到两个值：

TD 估计 - 对于给定状态 
�
s，预测的最优 
�
∗
Q 
∗
  值为：

�
�
�
=
�
�
�
�
�
�
�
∗
(
�
,
�
)
TD 
e
​
 =Q 
online
∗
​
 (s,a)
TD 目标 - 当前奖励和下一个状态 
�
′
s 
′
  中估计的 
�
∗
Q 
∗
  值的聚合，计算方法如下：

选择在下一个状态 
�
′
s 
′
  中使得 
�
�
�
�
�
�
�
(
�
′
,
�
)
Q 
online
​
 (s 
′
 ,a) 最大化的动作 
�
′
a 
′
 ：
�
′
=
argmax
�
�
�
�
�
�
�
�
(
�
′
,
�
)
a 
′
 =argmax 
a
​
 Q 
online
​
 (s 
′
 ,a)
计算 TD 目标：
�
�
�
=
�
+
�
�
�
�
�
�
�
�
∗
(
�
′
,
�
′
)
TD 
t
​
 =r+γQ 
target
∗
​
 (s 
′
 ,a 
′
 )
由于我们不知道下一个动作 
�
′
a 
′
  会是什么，因此在下一个状态 
�
′
s 
′
  中我们使用最大化 
�
�
�
�
�
�
�
Q 
online
​
  的动作 
�
′
a 
′
 。

值得注意的是，我们在 td_target() 方法上使用了 @torch.no_grad() 装饰器来禁用此处的梯度计算（因为我们不需要在 
�
�
�
�
�
�
�
θ 
target
​
  上进行反向传播）。

In [9]:
class Mario(Mario):
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        self.gamma = 0.9  # 定义折扣因子为 0.9

    def td_estimate(self, state, action):
        """
        计算TD估计值
        """
        # 获取当前动作对应的在线模型下的Q值
        current_Q = self.net(state, model="online")[
            np.arange(0, self.batch_size), action
        ]  # Q_online(s,a)
        return current_Q

    @torch.no_grad()
    def td_target(self, reward, next_state, done):
        """
        计算TD目标值
        """
        # 获取下一个状态在在线模型下的Q值
        next_state_Q = self.net(next_state, model="online")
        
        # 选择在下一个状态中最优的动作
        best_action = torch.argmax(next_state_Q, axis=1)
        
        # 获取目标模型下的Q值
        next_Q = self.net(next_state, model="target")[
            np.arange(0, self.batch_size), best_action
        ]
        
        # 计算TD目标值
        return (reward + (1 - done.float()) * self.gamma * next_Q).float()


更新模型
当玛丽奥从其回放缓冲区中采样输入时，我们计算 
�
�
�
TD 
t
​
  和 
�
�
�
TD 
e
​
 ，然后反向传播这个损失到 
�
online
Q 
online
​
 ，以更新其参数 
�
online
θ 
online
​
 （
�
α 是传递给 optimizer 的学习率 lr）：

�
online
←
�
online
+
�
∇
(
�
�
�
−
�
�
�
)
θ 
online
​
 ←θ 
online
​
 +α∇(TD 
e
​
 −TD 
t
​
 )
�
target
θ 
target
​
  不通过反向传播进行更新。相反，我们定期将 
�
online
θ 
online
​
  复制到 
�
target
θ 
target
​
 ：

�
target
←
�
online
θ 
target
​
 ←θ 
online
​
 







In [10]:
class Mario(Mario):
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025)
        self.loss_fn = torch.nn.SmoothL1Loss()

    def update_Q_online(self, td_estimate, td_target):
        loss = self.loss_fn(td_estimate, td_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def sync_Q_target(self):
        self.net.target.load_state_dict(self.net.online.state_dict())

#### Save checkpoint




In [11]:
class Mario(Mario):
    def save(self):
        """
        保存 MarioNet 模型和探索率
        """
        save_path = (
            self.save_dir / f"mario_net_{int(self.curr_step // self.save_every)}.chkpt"
        )
        torch.save(
            dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate),
            save_path,
        )
        print(f"MarioNet saved to {save_path} at step {self.curr_step}")


#### Load checkpoint

In [12]:
def load_model_weights(agent, path):
    """加载模型权重到代理网络"""
    if torch.cuda.is_available():
        state_dict = torch.load(path)
    else:
        state_dict = torch.load(path, map_location=torch.device('cpu'))
    
    agent.net.load_state_dict(state_dict['model'])
    agent.exploration_rate = state_dict['exploration_rate']


综合所有内容
在这里，我们将所有的内容整合在一起，包括环境的初始化、预处理、代理的定义、网络结构、学习算法以及模型的保存和加载等步骤。通过上述步骤，我们可以构建一个完整的玛丽奥强化学习代理，使其能够在游戏中学习和执行复杂的任务。

In [13]:
# 定义一个新的 Mario 类，该类从基础 Mario 类继承
class Mario(Mario):
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        
        # 设置开始训练前需要的最小经验数量
        self.burnin = 1e4  
        
        # 每3个经验更新一次 Q_online
        self.learn_every = 3  
        
        # 每1e4个经验，进行 Q_target 和 Q_online 的同步
        self.sync_every = 1e4  

    # 定义学习方法
    def learn(self):
        # 如果当前步数可以被 sync_every 整除，同步 Q_target 和 Q_online
        if self.curr_step % self.sync_every == 0:
            self.sync_Q_target()

        # 如果当前步数可以被 save_every 整除，保存模型
        if self.curr_step % self.save_every == 0:
            self.save()

        # 如果当前步数小于 burnin，暂时不进行学习
        if self.curr_step < self.burnin:
            return None, None

        # 如果当前步数不能被 learn_every 整除，暂时不进行学习
        if self.curr_step % self.learn_every != 0:
            return None, None

        # 从记忆中随机抽样获取经验数据
        state, next_state, action, reward, done = self.recall()

        # 计算 TD 估计值
        td_est = self.td_estimate(state, action)

        # 计算 TD 目标值
        td_tgt = self.td_target(reward, next_state, done)

        # 通过 Q_online 反向传播并更新网络权重
        loss = self.update_Q_online(td_est, td_tgt)

        return (td_est.mean().item(), loss)


### Logging




In [14]:
# 导入所需库
import numpy as np
import time, datetime
import matplotlib.pyplot as plt

# 定义 MetricLogger 类用于日志记录和性能度量
class MetricLogger:
    def __init__(self, save_dir):
        # 设置保存日志文件路径
        self.save_log = save_dir / "log"
        # 在日志文件中写入列标题
        with open(self.save_log, "w") as f:
            f.write(
                f"{'Episode':>8}{'Step':>8}{'Epsilon':>10}{'MeanReward':>15}"
                f"{'MeanLength':>15}{'MeanLoss':>15}{'MeanQValue':>15}"
                f"{'TimeDelta':>15}{'Time':>20}\n"
            )
        
        # 设置保存图像的路径
        self.ep_rewards_plot = save_dir / "reward_plot.jpg"
        self.ep_lengths_plot = save_dir / "length_plot.jpg"
        self.ep_avg_losses_plot = save_dir / "loss_plot.jpg"
        self.ep_avg_qs_plot = save_dir / "q_plot.jpg"

        # 初始化历史性能指标
        self.ep_rewards = []
        self.ep_lengths = []
        self.ep_avg_losses = []
        self.ep_avg_qs = []

        # 初始化移动平均指标
        self.moving_avg_ep_rewards = []
        self.moving_avg_ep_lengths = []
        self.moving_avg_ep_avg_losses = []
        self.moving_avg_ep_avg_qs = []

        # 初始化当前的回合指标
        self.init_episode()

        # 记录时间
        self.record_time = time.time()

    # 记录每一步的信息
    def log_step(self, reward, loss, q):
        self.curr_ep_reward += reward
        self.curr_ep_length += 1
        if loss:
            self.curr_ep_loss += loss
            self.curr_ep_q += q
            self.curr_ep_loss_length += 1
 
    # 标记回合结束
    def log_episode(self):
        self.ep_rewards.append(self.curr_ep_reward)
        self.ep_lengths.append(self.curr_ep_length)
        if self.curr_ep_loss_length == 0:
            ep_avg_loss = 0
            ep_avg_q = 0
        else:
            ep_avg_loss = np.round(self.curr_ep_loss / self.curr_ep_loss_length, 5)
            ep_avg_q = np.round(self.curr_ep_q / self.curr_ep_loss_length, 5)
        self.ep_avg_losses.append(ep_avg_loss)
        self.ep_avg_qs.append(ep_avg_q)
        self.init_episode()

    # 初始化当前回合的指标
    def init_episode(self):
        self.curr_ep_reward = 0.0
        self.curr_ep_length = 0
        self.curr_ep_loss = 0.0
        self.curr_ep_q = 0.0
        self.curr_ep_loss_length = 0

    # 记录每一回合的性能指标
    def record(self, episode, epsilon, step):
        # 计算最近100回合的平均性能
        mean_ep_reward = np.round(np.mean(self.ep_rewards[-100:]), 3)
        mean_ep_length = np.round(np.mean(self.ep_lengths[-100:]), 3)
        mean_ep_loss = np.round(np.mean(self.ep_avg_losses[-100:]), 3)
        mean_ep_q = np.round(np.mean(self.ep_avg_qs[-100:]), 3)
        self.moving_avg_ep_rewards.append(mean_ep_reward)
        self.moving_avg_ep_lengths.append(mean_ep_length)
        self.moving_avg_ep_avg_losses.append(mean_ep_loss)
        self.moving_avg_ep_avg_qs.append(mean_ep_q)

        # 计算时间差
        last_record_time = self.record_time
        self.record_time = time.time()
        time_since_last_record = np.round(self.record_time - last_record_time, 3)

        # 打印当前回合的性能指标
        print(
            f"Episode {episode} - "
            f"Step {step} - "
            f"Epsilon {epsilon} - "
            f"Mean Reward {mean_ep_reward} - "
            f"Mean Length {mean_ep_length} - "
            f"Mean Loss {mean_ep_loss} - "
            f"Mean Q Value {mean_ep_q} - "
            f"Time Delta {time_since_last_record} - "
            f"Time {datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
        )

        # 在日志文件中追加当前回合的性能指标
        with open(self.save_log, "a") as f:
            f.write(
                f"{episode:8d}{step:8d}{epsilon:10.3f}"
                f"{mean_ep_reward:15.3f}{mean_ep_length:15.3f}{mean_ep_loss:15.3f}{mean_ep_q:15.3f}"
                f"{time_since_last_record:15.3f}"
                f"{datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'):>20}\n"
            )

        # 绘制并保存性能图像
        for metric in ["ep_lengths", "ep_avg_losses", "ep_avg_qs", "ep_rewards"]:
            plt.clf()
            plt.plot(getattr(self, f"moving_avg_{metric}"), label=f"moving_avg_{metric}")
            plt.legend()
            plt.savefig(getattr(self, f"{metric}_plot"))


让我们开始吧！
在这个示例中，我们运行了40个回合的训练循环，但为了Mario真正学会他的世界，我们建议至少运行40,000个回合！

In [15]:
use_cuda = torch.cuda.is_available()
print(f"Using CUDA: {use_cuda}")
print()

save_dir = Path("checkpoints") / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
save_dir.mkdir(parents=True)

mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)

logger = MetricLogger(save_dir)

# 是否加载预训练模型的标志
load_pretrained_model = True # 设置为True或False，取决于你是否想要加载预训练模型
# # 查找.pth文件
# pretrained_model_path = None
# for file_path in save_dir.glob("*.pth"):
#     pretrained_model_path = file_path
#     break  # 找到第一个.pth文件后立即停止
pretrained_model_path = "./saved_models/mario_net_29.chkpt"

if load_pretrained_model:
    # 检查是否找到了.pth文件
    if pretrained_model_path:
        print(f"找到预训练模型：{pretrained_model_path}")
        load_model_weights(mario, pretrained_model_path)
        print("已加载预训练模型")
    else:
        print("未找到任何.pth文件")
    

# 进行40个回合的训练
episodes = 40
for e in range(episodes):

    # 重置环境状态
    state = env.reset()

    # 开始游戏循环
    while True:

        # 使代理基于当前状态做出行动
        action = mario.act(state)

        # 代理执行动作
        next_state, reward, done, trunc, info = env.step(action)

        # 记录经验
        mario.cache(state, next_state, action, reward, done)

        # 学习和更新代理
        q, loss = mario.learn()

        # 记录日志
        logger.log_step(reward, loss, q)

        # 更新状态并渲染环境
        state = next_state
        #env.render()

        # 检查游戏是否结束
        if done or info["flag_get"]:
            break

    # 记录每个回合的总结
    logger.log_episode()

    # 每20个回合或最后一个回合时记录和打印日志
    if (e % 20 == 0) or (e == episodes - 1):
        logger.record(episode=e, epsilon=mario.exploration_rate, step=mario.curr_step)


Using CUDA: False

找到预训练模型：./saved_models/mario_net_22.chkpt
已加载预训练模型
