In [1]:
import gymnasium as gym
from gymnasium import spaces
from gymnasium.envs.registration import register
import numpy as np
import pygame

# ====================== 1. 定义自定义环境类 ======================
class GridWorldEnv(gym.Env):
    """2D grid world in which the agent must walk to a randomly placed target.

    Observations are a dict holding the integer grid coordinates of the
    ``"agent"`` and of the ``"target"``. The four discrete actions are unit
    moves (see ``_action_to_direction``); a move that would leave the grid is
    clipped at the border. The episode terminates with reward 1 when the agent
    lands on the target cell; every other step gives reward 0. The environment
    itself never sets ``truncated``.
    """
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    def __init__(self, size=5, render_mode=None):
        """Create the action/observation spaces and the rendering state.

        Args:
            size: Side length of the square grid, in cells. Must be >= 2.
            render_mode: ``None`` or one of ``metadata["render_modes"]``.

        Raises:
            ValueError: If ``size`` is below 2 or ``render_mode`` is not a
                supported mode.
        """
        super().__init__()

        # On a 1x1 grid reset() can never draw a target that differs from the
        # agent cell, so its rejection loop would never end. Fail fast here.
        if size < 2:
            raise ValueError(f"size must be at least 2, got {size!r}")
        if render_mode is not None and render_mode not in self.metadata["render_modes"]:
            raise ValueError(
                f"render_mode must be None or one of "
                f"{self.metadata['render_modes']}, got {render_mode!r}"
            )

        self.size = size
        self.window_size = 512  # side length of the square pygame canvas, in pixels

        # Action space: 4 directions [right, up, left, down].
        self.action_space = spaces.Discrete(4)

        # Observation space: dict with the agent and target grid positions.
        self.observation_space = spaces.Dict({
            "agent": spaces.Box(0, size-1, shape=(2,), dtype=int),
            "target": spaces.Box(0, size-1, shape=(2,), dtype=int)
        })

        # Maps each action index to the (dx, dy) offset added to the agent position.
        # NOTE(review): pygame's y axis presumably grows downward, so action 1
        # ("up", +y) would be drawn as a downward move on screen — confirm.
        self._action_to_direction = {
            0: np.array([1, 0]),  # right
            1: np.array([0, 1]),  # up
            2: np.array([-1, 0]), # left
            3: np.array([0, -1]), # down
        }

        self.render_mode = render_mode
        # Both are created lazily on the first "human" frame.
        self.window = None
        self.clock = None

    def _get_obs(self):
        """Return the observation dict with the current agent and target positions."""
        return {"agent": self._agent_location, "target": self._target_location}

    def _get_info(self):
        """Return the info dict carrying the Manhattan (L1) distance to the target."""
        return {"distance": np.linalg.norm(self._agent_location - self._target_location, ord=1)}

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed ``self.np_random``.
            options: Accepted for API compatibility; not used.
        """
        super().reset(seed=seed)

        # Draw the agent position uniformly over the grid.
        self._agent_location = self.np_random.integers(0, self.size, size=2, dtype=int)

        # Redraw the target until it no longer coincides with the agent.
        # Termination is guaranteed because __init__ enforces size >= 2.
        self._target_location = self._agent_location
        while np.array_equal(self._target_location, self._agent_location):
            self._target_location = self.np_random.integers(0, self.size, size=2, dtype=int)

        # Show the initial state right away in interactive mode.
        if self.render_mode == "human":
            self._render_frame()

        return self._get_obs(), self._get_info()

    def step(self, action):
        """Apply one action and return ``(obs, reward, terminated, truncated, info)``.

        Args:
            action: Key into ``_action_to_direction`` (0-3).
        """
        direction = self._action_to_direction[action]
        # Clip so the agent stays inside the grid.
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )

        # The episode ends only when the agent reaches the target.
        terminated = np.array_equal(self._agent_location, self._target_location)
        reward = 1 if terminated else 0
        truncated = False

        # Refresh the window in interactive mode.
        if self.render_mode == "human":
            self._render_frame()

        return self._get_obs(), reward, terminated, truncated, self._get_info()

    def render(self):
        """Return the current frame as an RGB array in ``"rgb_array"`` mode.

        In any other mode this returns ``None``; "human" frames are drawn from
        ``reset`` and ``step`` instead.
        """
        if self.render_mode == "rgb_array":
            return self._render_frame()

    def _render_frame(self):
        """Draw the grid, target and agent with pygame.

        In ``"human"`` mode the frame is shown in the window and the call is
        throttled to ``metadata["render_fps"]``; otherwise the frame is
        returned as a numpy array with the first two axes swapped relative to
        pygame's pixel array.
        """
        if self.window is None and self.render_mode == "human":
            pygame.init()
            self.window = pygame.display.set_mode((self.window_size, self.window_size))
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface((self.window_size, self.window_size))
        canvas.fill((255, 255, 255))  # white background

        # Side length of a single grid cell, in pixels.
        pix_square_size = self.window_size / self.size

        # Target: red square.
        pygame.draw.rect(
            canvas,
            (255, 0, 0),
            pygame.Rect(
                pix_square_size * self._target_location,
                (pix_square_size, pix_square_size),
            ),
        )

        # Agent: blue circle centred in its cell.
        pygame.draw.circle(
            canvas,
            (0, 0, 255),
            (self._agent_location + 0.5) * pix_square_size,
            pix_square_size / 3,
        )

        # Grid lines: one horizontal and one vertical line per cell boundary.
        for x in range(self.size + 1):
            pygame.draw.line(
                canvas, 0,
                (0, pix_square_size * x),
                (self.window_size, pix_square_size * x),
                width=3
            )
            pygame.draw.line(
                canvas, 0,
                (pix_square_size * x, 0),
                (pix_square_size * x, self.window_size),
                width=3,
            )

        if self.render_mode == "human":
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()
            self.clock.tick(self.metadata["render_fps"])
        else:
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )

    def close(self):
        """Release the pygame window, if one was opened."""
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()
            # Drop the stale handles so a later frame re-creates the window
            # instead of blitting to a destroyed display.
            self.window = None
            self.clock = None

# ====================== 2. Register the environment with Gymnasium ======================
# The entry point is the class object defined above in this same module;
# ``size=5`` is the default keyword argument passed to its constructor.
register(entry_point=GridWorldEnv, id="GridWorld-v0", kwargs=dict(size=5))

# ====================== 3. Smoke test ======================
if __name__ == "__main__":
    # Build an environment instance with on-screen rendering enabled.
    env = gym.make(
        "GridWorld-v0",
        render_mode="human",  # either "human" or "rgb_array"
        size=5                # overrides the registered default
    )

    # try/finally guarantees env.close() runs (releasing the pygame window)
    # even if a step raises or the kernel is interrupted mid-run.
    try:
        # Run 10 steps with random actions.
        obs, info = env.reset()
        print("初始位置:", obs["agent"])
        print("目标位置:", obs["target"])

        for step in range(10):
            action = env.action_space.sample()  # random policy
            obs, reward, terminated, truncated, info = env.step(action)

            print(f"\nStep {step + 1}")
            print("动作:", ["右", "上", "左", "下"][action])
            print("新位置:", obs["agent"])
            print("奖励:", reward)
            print("距离目标:", info["distance"])

            # Start a fresh episode as soon as the target is reached.
            if terminated:
                print("到达目标！环境重置。")
                obs, info = env.reset()
    finally:
        env.close()

ModuleNotFoundError: No module named 'gymnasium'

In [None]:
import gymnasium as gym
from gymnasium.wrappers import RecordEpisodeStatistics, RecordVideo

# ====================== Configuration ======================
num_eval_episodes = 5                # number of evaluation episodes
video_folder = "./gridworld_videos"   # directory the videos are written to
env_id = "GridWorld-v0"               # id of the already-registered environment

# ====================== Build the recording environment ======================
# Base environment; RecordVideo needs frames, so rgb_array rendering is required.
base_env = gym.make(
    env_id,
    render_mode="rgb_array",  # required for video recording
    size=5                     # custom environment parameter (adjust as needed)
)

# Video wrapper: record every episode.
env = RecordVideo(
    base_env,
    video_folder=video_folder,
    name_prefix="eval",          # video file name prefix (eval-episode-0.mp4)
    episode_trigger=lambda x: True  # trigger on every episode index
)

# Statistics wrapper: keep one entry per evaluation episode.
env = RecordEpisodeStatistics(env, buffer_length=num_eval_episodes)

# ====================== Evaluation loop ======================
# try/finally makes sure the wrappers are closed (so the video recorder can
# finish its current file) even when a step raises or the kernel is
# interrupted part-way through.
try:
    for episode_num in range(num_eval_episodes):
        obs, info = env.reset()
        done = False

        while not done:
            # Replace with your agent's policy (random actions are used here).
            action = env.action_space.sample()  # e.g. agent.get_action(obs)

            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
finally:
    env.close()

# ====================== Report the statistics ======================
print(f"\n评估结果（{num_eval_episodes} 回合）:")
print(f"耗时队列: {env.time_queue}")
print(f"总奖励队列: {env.return_queue}")
print(f"步数队列: {env.length_queue}")


评估结果（5 回合）:
耗时队列: deque([0.277017, 0.2498, 0.083586, 0.449519, 0.125515], maxlen=5)
总奖励队列: deque([1.0, 1.0, 1.0, 1.0, 1.0], maxlen=5)
步数队列: deque([44, 21, 14, 56, 16], maxlen=5)


In [None]:
import gymnasium as gym
from gymnasium.wrappers import RecordEpisodeStatistics, RecordVideo
import logging

# ====================== Configuration ======================
training_period = 250         # record a video every this many episodes
num_training_episodes = 1000  # total number of training episodes
video_folder = "./training_videos"  # directory the videos are written to
env_id = "GridWorld-v0"             # id of the already-registered environment
grid_size = 8                      # side length of the grid

# ====================== Logging setup ======================
# Messages go both to training.log and to the notebook output.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(message)s",
    handlers=[logging.FileHandler("training.log"), logging.StreamHandler()]
)

# ====================== Build the recording environment ======================
base_env = gym.make(
    env_id,
    render_mode="rgb_array",  # rgb_array rendering is required for recording
    size=grid_size            # custom environment parameter
)

# Video wrapper: record one episode out of every ``training_period``.
env = RecordVideo(
    base_env,
    video_folder=video_folder,
    name_prefix="gridworld_training",
    episode_trigger=lambda x: x % training_period == 0  # periodic trigger
)

# Statistics wrapper. The buffer is sized to the whole run so that the final
# summary below covers every episode; the wrapper's default buffer only keeps
# the most recent 100 entries, which would silently drop the first 900.
env = RecordEpisodeStatistics(env, buffer_length=num_training_episodes)

# ====================== Training loop ======================
# try/finally makes sure the wrappers are closed (so the video recorder can
# finish its current file) even when a step raises or the kernel is
# interrupted part-way through.
try:
    for episode in range(num_training_episodes):
        obs, info = env.reset()
        done = False

        while not done:
            # Replace with your agent's policy (random actions are used here).
            action = env.action_space.sample()  # e.g. agent.get_action(obs)

            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

        # Log the statistics of the episode that just finished.
        logging.info(
            f"Episode {episode + 1} | "
            f"Reward: {env.return_queue[-1]:.1f} | "
            f"Steps: {env.length_queue[-1]} | "
            f"Distance: {info['distance']}"
        )
finally:
    env.close()

# ====================== Final summary ======================
# ``np`` is the numpy module imported in the notebook's first cell.
print("\n训练统计摘要:")
print(f"平均奖励: {np.mean(env.return_queue):.2f}")
print(f"最大奖励: {np.max(env.return_queue)}")
print(f"平均步数: {np.mean(env.length_queue):.1f}")

  logger.warn(
2025-05-22 19:20:04,004 - Episode 1 | Reward: 1.0 | Steps: 468 | Distance: 0.0
2025-05-22 19:20:14,872 - Episode 2 | Reward: 1.0 | Steps: 298 | Distance: 0.0
2025-05-22 19:20:14,877 - Episode 3 | Reward: 1.0 | Steps: 22 | Distance: 0.0
2025-05-22 19:20:14,888 - Episode 4 | Reward: 1.0 | Steps: 118 | Distance: 0.0
2025-05-22 19:20:14,896 - Episode 5 | Reward: 1.0 | Steps: 53 | Distance: 0.0
2025-05-22 19:20:14,898 - Episode 6 | Reward: 1.0 | Steps: 2 | Distance: 0.0
2025-05-22 19:20:14,901 - Episode 7 | Reward: 1.0 | Steps: 12 | Distance: 0.0
2025-05-22 19:20:14,923 - Episode 8 | Reward: 1.0 | Steps: 190 | Distance: 0.0
2025-05-22 19:20:14,944 - Episode 9 | Reward: 1.0 | Steps: 153 | Distance: 0.0
2025-05-22 19:20:14,980 - Episode 10 | Reward: 1.0 | Steps: 149 | Distance: 0.0
2025-05-22 19:20:14,985 - Episode 11 | Reward: 1.0 | Steps: 57 | Distance: 0.0
2025-05-22 19:20:14,995 - Episode 12 | Reward: 1.0 | Steps: 102 | Distance: 0.0
2025-05-22 19:20:15,008 - Episode 13 | R


训练统计摘要:
平均奖励: 1.00
最大奖励: 1.0
平均步数: 170.4
