In [1]:
!pip install gymnasium minigrid imageio

Collecting minigrid
  Downloading minigrid-3.0.0-py3-none-any.whl.metadata (6.7 kB)
Downloading minigrid-3.0.0-py3-none-any.whl (136 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m136.7/136.7 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: minigrid
Successfully installed minigrid-3.0.0


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F  # ✅ Pour F.smooth_l1_loss
import torch.optim as optim
import numpy as np
from collections import deque, namedtuple
import gymnasium as gym
from minigrid.envs.empty import EmptyEnv
from minigrid.wrappers import RGBImgObsWrapper
import imageio
import os
import time
import random

  File "/usr/local/lib/python3.11/dist-packages/gymnasium/envs/registration.py", line 594, in load_plugin_envs
    fn()
  File "/usr/local/lib/python3.11/dist-packages/shimmy/registration.py", line 304, in register_gymnasium_envs
    _register_atari_envs()
  File "/usr/local/lib/python3.11/dist-packages/shimmy/registration.py", line 205, in _register_atari_envs
    import ale_py
  File "/usr/local/lib/python3.11/dist-packages/ale_py/__init__.py", line 68, in <module>
    register_v0_v4_envs()
  File "/usr/local/lib/python3.11/dist-packages/ale_py/registration.py", line 178, in register_v0_v4_envs
    _register_rom_configs(legacy_games, obs_types, versions)
  File "/usr/local/lib/python3.11/dist-packages/ale_py/registration.py", line 63, in _register_rom_configs
    gymnasium.register(
    ^^^^^^^^^^^^^^^^^^
AttributeError: partially initialized module 'gymnasium' has no attribute 'register' (most likely due to a circular import)
[0m
  logger.warn(f"plugin: {plugin.value} raised {trace

In [3]:
# Allow-list the NumPy types below so that a saved model containing them can be loaded.
# NOTE(review): presumably needed because recent PyTorch versions restrict
# unpickling in torch.load to allow-listed globals — confirm for the installed version.
# NOTE(review): `np.core` looks like the legacy NumPy namespace; confirm it still
# resolves (without warnings) on the NumPy version in use.
torch.serialization.add_safe_globals([np.core.multiarray.scalar, np.dtype])

In [4]:
# Network architecture
class EnhancedDQN(nn.Module):
    """Convolutional Q-network: image observation in, one Q-value per action out.

    The feature extractor is two 3x3 convolutions followed by adaptive average
    pooling to a 4x4 grid; the head is a two-layer MLP.

    Args:
        input_shape: Observation shape as ``(channels, height, width)``.
        n_actions: Number of discrete actions (size of the output vector).
    """

    def __init__(self, input_shape, n_actions):
        super().__init__()
        in_channels = input_shape[0]
        feature_layers = [
            nn.Conv2d(in_channels, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((4, 4)),
            nn.Flatten(),
        ]
        self.conv = nn.Sequential(*feature_layers)

        # Push one all-zero sample through the extractor to read off the
        # flattened feature width instead of hard-coding it.
        with torch.no_grad():
            probe = torch.zeros((1,) + tuple(input_shape))
            flat_features = self.conv(probe).size(1)

        head_layers = [
            nn.Linear(flat_features, 256),
            nn.ReLU(),
            nn.Linear(256, n_actions),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return Q-values of shape ``(batch, n_actions)`` for a batch of observations."""
        features = self.conv(x)
        return self.fc(features)

In [5]:
# Custom environment with random agent and goal positions
class RandomGoalEmptyEnv(EmptyEnv):
    """Empty room whose agent and goal are re-placed on random free cells at every reset.

    Args:
        size: Side length of the square grid, forwarded to ``EmptyEnv``.
        **kwargs: Extra keyword arguments forwarded unchanged to ``EmptyEnv``
            (for example ``render_mode``).
    """

    def __init__(self, size=8, **kwargs):
        super().__init__(size=size, **kwargs)

    def reset(self, *, seed=None, options=None):
        """Reset the episode, then move the agent and the goal to random free cells.

        Returns:
            ``(observation, info)`` where the observation is regenerated after
            the relocation and ``info`` is an empty dict.
        """
        # NOTE(review): relies on the parent reset seeding `np_random`, building
        # the grid (including the goal object) and resetting the step counter —
        # confirm against the installed minigrid version.
        super().reset(seed=seed, options=options)

        # Lift the goal off the grid first so its old cell is available again.
        goal = self._pop_goal()
        self.agent_pos = self._empty_cell(exclude_agent=False)
        if goal is not None:
            # The goal has to be written into the grid itself; storing its
            # coordinates on an attribute would leave the real goal untouched.
            self.put_obj(goal, *self._empty_cell())
        return self.gen_obs(), {}

    def _pop_goal(self):
        """Remove and return the first grid object whose type is "goal", or None."""
        for x in range(self.width):
            for y in range(self.height):
                cell = self.grid.get(x, y)
                if cell is not None and cell.type == "goal":
                    self.grid.set(x, y, None)
                    return cell
        return None

    def _empty_cell(self, exclude_agent=True):
        """Pick a uniformly random cell that holds no grid object.

        Args:
            exclude_agent: When true (default), the agent's current cell is
                never returned.

        Returns:
            ``(x, y)`` as a tuple of plain ints.

        Raises:
            RuntimeError: If no cell qualifies.
        """
        blocked = tuple(int(c) for c in self.agent_pos) if exclude_agent else None
        # Enumerating the candidates (rather than rejection sampling) guarantees
        # termination and handles non-square grids.
        candidates = [
            (x, y)
            for x in range(self.width)
            for y in range(self.height)
            if self.grid.get(x, y) is None and (x, y) != blocked
        ]
        if not candidates:
            raise RuntimeError("RandomGoalEmptyEnv: no empty cell available")
        choice = int(self.np_random.integers(0, len(candidates)))
        return candidates[choice]

In [6]:
# Observation preprocessing
def preprocess_observation(obs):
    """Turn an observation dict into a channel-first float32 array.

    Args:
        obs: Mapping whose ``'image'`` entry is an array laid out as
            (height, width, channels). Pixel values are assumed to lie in
            0..255 — TODO confirm against the wrapper in use.

    Returns:
        float32 array laid out as (channels, height, width), where an input
        value of 0 maps to -1.0 and 255 maps to 1.0.
    """
    channel_first = obs['image'].transpose((2, 0, 1))
    unit_scaled = channel_first.astype(np.float32) / 255.0
    # Re-centre from [0, 1] to [-1, 1].
    return (unit_scaled - 0.5) / 0.5

# Data structures for the replay buffer
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'next_state', 'reward', 'done'],
)


class ReplayBuffer(deque):
    """Bounded FIFO of ``Transition`` records.

    Built on ``deque(maxlen=capacity)``: once ``capacity`` items are stored,
    appending a new one discards the oldest.
    """

    def __init__(self, capacity):
        super().__init__(maxlen=capacity)

    def push(self, *args):
        """Store one transition; ``args`` are the ``Transition`` fields in order."""
        record = Transition(*args)
        self.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        snapshot = list(self)
        return random.sample(snapshot, batch_size)

In [7]:
# Epsilon-greedy action selection
def select_action(state, policy_net, device, steps_done, n_actions, eval_mode=False):
    """Choose an action with an exponentially decaying epsilon-greedy rule.

    Epsilon is ``EPS_END + (EPS_START - EPS_END) * exp(-steps_done / EPS_DECAY)``.

    Args:
        state: Single preprocessed observation (no batch dimension).
        policy_net: Callable returning Q-values for a batch of states.
        device: Torch device the state tensor is moved to.
        steps_done: Step counter driving the epsilon decay.
        n_actions: Number of discrete actions for the random branch.
        eval_mode: When true, always act greedily (no random draw is made).

    Returns:
        ``(action, eps)`` — the chosen action index and the epsilon value
        computed for ``steps_done`` (computed even in eval mode).
    """
    decay = np.exp(-steps_done / EPS_DECAY)
    eps = EPS_END + (EPS_START - EPS_END) * decay

    # Short-circuit keeps the RNG untouched in eval mode.
    explore = not eval_mode and random.random() <= eps
    if explore:
        return random.randint(0, n_actions - 1), eps

    with torch.no_grad():
        batch = torch.FloatTensor(state).unsqueeze(0).to(device)
        q_values = policy_net(batch)
    return q_values.argmax().item(), eps

In [8]:
def optimize(policy_net, target_net, buffer, optimizer, device):
    """Perform one gradient step on a minibatch drawn from ``buffer``.

    The bootstrap target uses the policy network to pick the next action and
    the target network to evaluate it; transitions flagged ``done`` get no
    bootstrap term. The loss is smooth-L1 and gradients are clipped to a
    global norm of 1.0.

    Args:
        policy_net: Network being trained.
        target_net: Network used to evaluate the selected next actions.
        buffer: Object supporting ``len()`` and ``sample(batch_size)`` that
            yields ``Transition`` records.
        optimizer: Optimizer over ``policy_net``'s parameters.
        device: Torch device for all tensors.

    Returns:
        The scalar loss as a Python float, or ``0.0`` when the buffer holds
        fewer than ``BATCH_SIZE`` transitions (no update is made).
    """
    # Not enough experience collected yet.
    if len(buffer) < BATCH_SIZE:
        return 0.0

    # Transpose a list of transitions into one Transition of field tuples.
    batch = Transition(*zip(*buffer.sample(BATCH_SIZE)))

    state_batch = torch.FloatTensor(np.array(batch.state)).to(device)
    action_batch = torch.LongTensor(batch.action).unsqueeze(1).to(device)
    reward_batch = torch.FloatTensor(batch.reward).to(device)
    done_batch = torch.BoolTensor(batch.done).to(device)
    alive_mask = ~done_batch

    # Only transitions that are not done carry a usable next state.
    surviving = [nxt for nxt, finished in zip(batch.next_state, batch.done) if not finished]
    next_state_batch = torch.FloatTensor(np.array(surviving)).to(device)

    q_taken = policy_net(state_batch).gather(1, action_batch)

    bootstrap = torch.zeros(BATCH_SIZE, device=device)
    if len(next_state_batch) > 0:
        with torch.no_grad():
            greedy = policy_net(next_state_batch).argmax(1, keepdim=True)
            bootstrap[alive_mask] = target_net(next_state_batch).gather(1, greedy).squeeze()
    td_target = reward_batch + GAMMA * bootstrap * alive_mask.float()

    loss = F.smooth_l1_loss(q_taken, td_target.unsqueeze(1))
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 1.0)
    optimizer.step()
    return loss.item()

In [9]:
# Agent evaluation
def evaluate_agent(policy_net, env, n_episodes=3):
    """Run greedy rollouts and report the average return and episode length.

    Each episode stops when the environment reports termination or truncation,
    or after ``MAX_STEPS_PER_EPISODE`` steps, whichever comes first.

    Args:
        policy_net: Network handed to ``select_action`` in eval mode.
        env: Environment exposing ``reset``, ``step`` and ``action_space.n``.
        n_episodes: Number of episodes to average over.

    Returns:
        ``(mean_return, mean_steps)`` over the evaluated episodes.
    """
    episode_returns, episode_lengths = [], []
    for _ in range(n_episodes):
        obs, _ = env.reset()
        state = preprocess_observation(obs)
        episode_return, step_count = 0, 0
        while step_count < MAX_STEPS_PER_EPISODE:
            action, _ = select_action(
                state, policy_net, DEVICE, 0, env.action_space.n, eval_mode=True
            )
            obs, reward, terminated, truncated, _ = env.step(action)
            state = preprocess_observation(obs)
            episode_return += reward
            step_count += 1
            if terminated or truncated:
                break
        episode_returns.append(episode_return)
        episode_lengths.append(step_count)
    return np.mean(episode_returns), np.mean(episode_lengths)

In [10]:
# Video recording
def record_video(model, env, video_path="videos/minigrid_video.mp4", max_episodes=3):
    """Roll out the greedy policy and save the rendered frames as a video.

    Args:
        model: Network handed to ``select_action`` in eval mode.
        env: Environment to roll out in. It is closed before returning, even
            if the rollout or the save raises.
        video_path: Destination file; its parent directory is created if
            needed.
        max_episodes: Number of episodes to record back to back.

    Side effects:
        Writes ``video_path`` when at least one frame was captured and prints
        a status message either way.
    """
    frames = []

    def capture():
        # render() may return None (e.g. when the env was not created in
        # 'rgb_array' mode); only keep genuine 3-D image arrays so the
        # "no frame" message below is reachable instead of crashing.
        frame = env.render()
        if frame is not None and getattr(frame, "ndim", 0) == 3:
            frames.append(frame)

    try:
        for _ in range(max_episodes):
            obs, _ = env.reset()
            state = preprocess_observation(obs)
            done = False
            while not done:
                capture()
                action = select_action(state, model, DEVICE, 0, env.action_space.n, eval_mode=True)[0]
                obs, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                state = preprocess_observation(obs)
            # Also record the state reached by the last step, otherwise the
            # video stops one frame before the end of the episode.
            capture()

        if frames:
            out_dir = os.path.dirname(video_path)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)
            imageio.mimsave(video_path, frames, fps=10)
            print(f"Vidéo sauvegardée : {video_path}")
        else:
            print("Aucune frame capturée pour la vidéo.")
    finally:
        env.close()

In [11]:
# Main training loop
def train_agent(num_episodes=500):
    """Train a DQN agent on 'MiniGrid-Empty-8x8-v0' and return the policy network.

    Per environment step: pick an epsilon-greedy action, store the transition,
    run one ``optimize`` step and softly blend the policy weights into the
    target network with coefficient ``TAU``. Every 10 episodes the agent is
    evaluated greedily on a separate environment and the checkpoint is saved
    whenever the evaluation return improves.

    Args:
        num_episodes: Number of training episodes.

    Returns:
        The trained policy network (final weights, not necessarily the best
        checkpoint).

    Side effects:
        Creates the ``videos`` directory, writes ``dqn_best_model.pth``,
        records a video through ``record_video`` and prints progress lines.
    """
    # Both environments render to RGB arrays so frames can be captured.
    env = gym.make('MiniGrid-Empty-8x8-v0', render_mode='rgb_array')
    env = RGBImgObsWrapper(env)
    eval_env = gym.make('MiniGrid-Empty-8x8-v0', render_mode='rgb_array')
    eval_env = RGBImgObsWrapper(eval_env)

    try:
        obs_shape = env.observation_space['image'].shape
        obs_shape = (obs_shape[2], obs_shape[0], obs_shape[1])  # HWC -> CHW
        n_actions = env.action_space.n

        policy_net = EnhancedDQN(obs_shape, n_actions).to(DEVICE)
        target_net = EnhancedDQN(obs_shape, n_actions).to(DEVICE)
        target_net.load_state_dict(policy_net.state_dict())
        target_net.eval()

        optimizer = optim.Adam(policy_net.parameters(), lr=LEARNING_RATE)
        buffer = ReplayBuffer(MEMORY_SIZE)
        best_reward = -float('inf')
        steps_total = 0
        os.makedirs("videos", exist_ok=True)

        for episode in range(1, num_episodes + 1):
            obs, _ = env.reset()
            state = preprocess_observation(obs)
            total_reward = 0
            done = False
            episode_steps = 0
            episode_loss = 0
            while not done and episode_steps < MAX_STEPS_PER_EPISODE:
                action, eps = select_action(state, policy_net, DEVICE, steps_total, n_actions)
                steps_total += 1
                episode_steps += 1
                obs, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                next_state = preprocess_observation(obs)

                # Scale the environment reward and charge a small cost on
                # every step that does not end the episode.
                shaped_reward = reward * 10.0
                if not done:
                    shaped_reward -= 0.01

                # Only a genuine terminal state stops bootstrapping. A
                # time-limit truncation still has a valid successor state, so
                # it is stored as a non-terminal transition; marking it
                # terminal would teach the network that running out of time
                # has zero future value.
                buffer.push(
                    state,
                    action,
                    None if terminated else next_state,
                    shaped_reward,
                    terminated,
                )
                state = next_state
                total_reward += reward

                # optimize() returns 0.0 while the buffer is still warming up.
                episode_loss += optimize(policy_net, target_net, buffer, optimizer, DEVICE)

                # Soft (Polyak) update of the target network after every step.
                for target_param, source_param in zip(target_net.parameters(), policy_net.parameters()):
                    target_param.data.copy_(TAU * source_param.data + (1 - TAU) * target_param.data)

            avg_loss = episode_loss / episode_steps if episode_steps > 0 else 0
            if episode % 10 == 0:
                eval_reward, eval_steps = evaluate_agent(policy_net, eval_env)
                print(f"Évaluation Ep {episode}: Reward: {eval_reward:.2f}, Steps: {eval_steps:.1f}")
                if eval_reward > best_reward:
                    best_reward = eval_reward
                    torch.save({
                        'episode': episode,
                        'model_state_dict': policy_net.state_dict(),
                        'reward': best_reward,
                    }, "dqn_best_model.pth")
                    print(f"Meilleur modèle sauvegardé avec récompense {best_reward:.2f}")
            print(f"Ep {episode:4d} | Reward: {total_reward:6.2f} | Steps: {episode_steps:3d} | Eps: {eps:.3f} | Loss: {avg_loss:.4f}")

        record_video(policy_net, env)
    finally:
        # Release both environments even if training or recording fails.
        env.close()
        eval_env.close()
    return policy_net

In [12]:
# Hyperparameters
BATCH_SIZE = 64                 # transitions sampled per optimisation step
GAMMA = 0.99                    # discount applied to the bootstrap term
EPS_START, EPS_END = 1.0, 0.01  # initial and asymptotic exploration rates
EPS_DECAY = 10_000              # step scale of the exponential epsilon decay
TARGET_UPDATE = 50              # not referenced by the visible code (training uses a soft update with TAU)
LEARNING_RATE = 0.0005          # Adam learning rate
MEMORY_SIZE = 50_000            # replay buffer capacity
MAX_STEPS_PER_EPISODE = 500     # per-episode step cap in training and evaluation loops
TAU = 0.005                     # blend factor of the soft target-network update

# Run on the GPU when one is available, otherwise on the CPU.
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda") if USE_CUDA else torch.device("cpu")

In [13]:
# Entry point: announce the run, train for 500 episodes and bind the returned
# policy network to the module-level name `trained_model`.
if __name__ == "__main__":
    print("Démarrage de l'entraînement DQN optimisé...")
    trained_model = train_agent(num_episodes=500)

Démarrage de l'entraînement DQN optimisé...
Ep    1 | Reward:   0.00 | Steps: 256 | Eps: 0.975 | Loss: 0.0002
Ep    2 | Reward:   0.00 | Steps: 256 | Eps: 0.951 | Loss: 0.0001
Ep    3 | Reward:   0.00 | Steps: 256 | Eps: 0.927 | Loss: 0.0000
Ep    4 | Reward:   0.00 | Steps: 256 | Eps: 0.904 | Loss: 0.0000
Ep    5 | Reward:   0.52 | Steps: 137 | Eps: 0.892 | Loss: 0.0000
Ep    6 | Reward:   0.72 | Steps:  81 | Eps: 0.884 | Loss: 0.0046
Ep    7 | Reward:   0.00 | Steps: 256 | Eps: 0.862 | Loss: 0.0050
Ep    8 | Reward:   0.45 | Steps: 157 | Eps: 0.849 | Loss: 0.0083
Ep    9 | Reward:   0.63 | Steps: 106 | Eps: 0.840 | Loss: 0.0082
Évaluation Ep 10: Reward: 0.00, Steps: 256.0
Meilleur modèle sauvegardé avec récompense 0.00
Ep   10 | Reward:   0.66 | Steps:  96 | Eps: 0.832 | Loss: 0.0066
Ep   11 | Reward:   0.64 | Steps: 103 | Eps: 0.824 | Loss: 0.0158
Ep   12 | Reward:   0.30 | Steps: 198 | Eps: 0.808 | Loss: 0.0128
Ep   13 | Reward:   0.81 | Steps:  55 | Eps: 0.804 | Loss: 0.0106
Ep   