In [1]:
!pip install minigrid
import gymnasium as gym
from minigrid.wrappers import ImgObsWrapper
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import math
import numpy as np
from collections import deque, namedtuple
import matplotlib.pyplot as plt

# --- Hyperparameter settings ---
# Curriculum learning: grid sizes trained on in order, and the number of
# episodes run on each size before moving to the next.
MAZE_SIZES = [5, 8, 16]
EPISODES_PER_STAGE = 1000

# DQN agent settings
GAMMA = 0.99  # discount factor applied to the bootstrapped next-state value
EPSILON_START = 1.0  # exploration rate at the first action selection
EPSILON_END = 0.05  # value the exploration rate decays toward
EPSILON_DECAY = 30000  # decay constant, measured in select_action calls
LEARNING_RATE = 3e-5  # learning rate, matched to the baseline
REPLAY_BUFFER_SIZE = 50000  # maximum number of stored transitions
BATCH_SIZE = 128  # transitions sampled per gradient step
TARGET_UPDATE_FREQ = 100  # episodes between target-network syncs

# Device selection: use CUDA when it is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Record type used to store one experience in the replay buffer.
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'terminated'))

# Replay buffer
class ReplayBuffer:
    """Bounded FIFO store of Transition records with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with maxlen discards the oldest entry once it is full.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Wrap the positional fields in a Transition and store it."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` stored entries drawn without replacement."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of entries currently stored."""
        return len(self.memory)

# CNN-based Q-network
class QNetwork(nn.Module):
    """Convolutional network mapping image observations to per-action Q-values.

    Args:
        obs_space_shape: Observation shape as ``(height, width, channels)``.
        action_space_n: Number of discrete actions; size of the output layer.
    """

    def __init__(self, obs_space_shape, action_space_n):
        super().__init__()
        h, w, c = obs_space_shape
        # Three 3x3 convolutions with stride 1 and padding 1, so the spatial
        # size is unchanged; the result is flattened for the linear head.
        self.cnn = nn.Sequential(
            nn.Conv2d(c, 16, kernel_size=3, stride=1, padding=1), nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1), nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1), nn.ReLU(),
            nn.Flatten(),
        )
        # Run a dummy input through the conv stack to size the first linear layer.
        with torch.no_grad():
            cnn_out_size = self.cnn(torch.zeros(1, c, h, w)).shape[1]
        self.fc = nn.Sequential(
            nn.Linear(cnn_out_size, 256), nn.ReLU(),
            nn.Linear(256, action_space_n)
        )

    def forward(self, x):
        """Compute Q-values for a batch of observations.

        Args:
            x: Tensor laid out as ``(batch, height, width, channels)``.

        Returns:
            Tensor of shape ``(batch, action_space_n)``.
        """
        # Follow the device the network's own parameters live on instead of a
        # module-level global, so the model works wherever it has been moved.
        param_device = next(self.parameters()).device
        x = x.to(param_device).float() / 255.0
        # Conv2d expects channels first: (batch, channels, height, width).
        x = x.permute(0, 3, 1, 2)
        return self.fc(self.cnn(x))

# DQN agent
class DQNAgent:
    """DQN agent with an epsilon-greedy policy, a replay buffer and a target network.

    Args:
        obs_space_shape: Observation shape forwarded to QNetwork.
        action_space_n: Number of discrete actions.
    """

    def __init__(self, obs_space_shape, action_space_n):
        self.action_space_n = action_space_n
        self.policy_net = QNetwork(obs_space_shape, action_space_n).to(device)
        self.target_net = QNetwork(obs_space_shape, action_space_n).to(device)
        # The target network starts as an exact copy of the policy network.
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=LEARNING_RATE)
        self.replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)
        # Number of select_action calls so far; drives the epsilon schedule.
        self.steps_done = 0

    def select_action(self, state):
        """Choose an action epsilon-greedily and advance the decay counter.

        Args:
            state: Observation batch passed to the policy network.

        Returns:
            A ``(1, 1)`` long tensor holding the chosen action index.
        """
        self.steps_done += 1
        # Exponential decay from EPSILON_START toward EPSILON_END.
        eps_threshold = EPSILON_END + (EPSILON_START - EPSILON_END) * \
                        math.exp(-1. * self.steps_done / EPSILON_DECAY)
        if random.random() > eps_threshold:
            # Exploit: take the action with the highest predicted Q-value.
            with torch.no_grad():
                return self.policy_net(state).max(1)[1].view(1, 1)
        # Explore: pick a uniformly random action.
        return torch.tensor(
            [[random.randrange(self.action_space_n)]],
            device=device,
            dtype=torch.long,
        )

    def update_model(self):
        """Run one gradient step on a sampled minibatch.

        Does nothing until the replay buffer holds at least BATCH_SIZE
        transitions. Transitions whose ``next_state`` is None are treated as
        final and contribute no bootstrapped value to the target.
        """
        if len(self.replay_buffer) < BATCH_SIZE:
            return
        transitions = self.replay_buffer.sample(BATCH_SIZE)
        # Turn a list of Transitions into one Transition of per-field tuples.
        batch = Transition(*zip(*transitions))

        non_final_next = [s for s in batch.next_state if s is not None]
        non_final_mask = torch.tensor(
            [s is not None for s in batch.next_state],
            device=device,
            dtype=torch.bool,
        )
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        current_q_values = self.policy_net(state_batch).gather(1, action_batch)
        with torch.no_grad():
            next_q_values = torch.zeros(BATCH_SIZE, device=device)
            # torch.cat rejects an empty list, so skip bootstrapping when every
            # sampled transition is final; the zeros are already correct then.
            if non_final_next:
                non_final_next_states = torch.cat(non_final_next)
                next_q_values[non_final_mask] = self.target_net(non_final_next_states).max(1)[0]
            target_q_values = reward_batch + (GAMMA * next_q_values)

        loss = F.mse_loss(current_q_values, target_q_values.unsqueeze(1))
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def sync_target_network(self):
        """Copy the policy network's weights into the target network."""
        self.target_net.load_state_dict(self.policy_net.state_dict())

# --- Main training loop ---
# Trains a single agent across the curriculum stages in MAZE_SIZES, carrying
# its networks, optimizer, replay buffer and epsilon schedule from one stage
# to the next, then saves the per-episode rewards of all stages.
if __name__ == "__main__":
    agent = None
    all_stage_rewards = {}

    for stage, size in enumerate(MAZE_SIZES):
        print(f"--- Curriculum Stage {stage + 1}: {size}x{size} Maze ---")
        env = gym.make(f'MiniGrid-Empty-{size}x{size}-v0')
        env = ImgObsWrapper(env)

        # Each stage creates its own environment, so release it when the
        # stage ends (even on error) instead of closing only the last one.
        try:
            # The agent is built once, from the first stage's spaces, and reused.
            if agent is None:
                obs_shape = env.observation_space.shape
                action_n = env.action_space.n
                agent = DQNAgent(obs_shape, action_n)

            stage_rewards = []
            for episode in range(EPISODES_PER_STAGE):
                obs, info = env.reset()
                state = torch.tensor(obs, device=device).unsqueeze(0).float()
                terminated, truncated, episode_reward = False, False, 0
                while not terminated and not truncated:
                    action = agent.select_action(state)
                    obs, reward, terminated, truncated, info = env.step(action.item())
                    episode_reward += reward
                    # Only termination marks a final state (None); a truncated
                    # episode still stores its real next observation.
                    next_state = torch.tensor(obs, device=device).unsqueeze(0).float() if not terminated else None
                    # Explicit float dtype keeps every stored reward tensor the
                    # same dtype whether env.step returned an int or a float.
                    reward_tensor = torch.tensor([reward], device=device, dtype=torch.float32)
                    agent.replay_buffer.push(state, action, next_state, reward_tensor, torch.tensor(terminated, device=device))
                    state = next_state
                    agent.update_model()

                stage_rewards.append(episode_reward)

                # The target network is synced on an episode (not step) schedule.
                if (episode + 1) % TARGET_UPDATE_FREQ == 0:
                    agent.sync_target_network()

                if (episode + 1) % 100 == 0:
                    avg_reward = np.mean(stage_rewards[-100:])
                    print(f"Stage {stage+1}, Episode {episode+1}, Avg Reward (last 100): {avg_reward:.2f}")
        finally:
            env.close()

        all_stage_rewards[f"{size}x{size}"] = stage_rewards
        print(f"--- Stage {stage + 1} Complete ---")

    # Save the results: rewards of all stages concatenated in stage order.
    curriculum_rewards_flat = [reward for stage_rewards in all_stage_rewards.values() for reward in stage_rewards]
    np.save("curriculum_rewards.npy", np.array(curriculum_rewards_flat))
    print("\nCurriculum experiment results saved to curriculum_rewards.npy")

Collecting minigrid
  Downloading minigrid-3.0.0-py3-none-any.whl.metadata (6.7 kB)
Downloading minigrid-3.0.0-py3-none-any.whl (136 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/136.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m136.7/136.7 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: minigrid
Successfully installed minigrid-3.0.0
Using device: cuda
--- Curriculum Stage 1: 5x5 Maze ---
Stage 1, Episode 100, Avg Reward (last 100): 0.31
Stage 1, Episode 200, Avg Reward (last 100): 0.46
Stage 1, Episode 300, Avg Reward (last 100): 0.55
Stage 1, Episode 400, Avg Reward (last 100): 0.80
Stage 1, Episode 500, Avg Reward (last 100): 0.86
Stage 1, Episode 600, Avg Reward (last 100): 0.87
Stage 1, Episode 700, Avg Reward (last 100): 0.84
Stage 1, Episode 800, Avg Reward (last 100): 0.88
Stage 1, Episode 900, Avg Reward (last 100): 0.90
Stage 1, Episode 1000, Avg Reward (last