<a href="https://colab.research.google.com/github/ezzeddinegasmi/DRL_comparative_study/blob/main/DQN_jeudi_22.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [80]:
pip install gym[atari] gym[accept-rom-license] torch torchvision numpy matplotlib opencv-python



In [81]:
import gym
import numpy as np
import cv2

def preprocess_frame(frame):
    """Turn one environment frame into a scaled 84x84 grayscale image.

    The frame is converted with ``cv2.COLOR_RGB2GRAY``, resized to 84x84 using
    area interpolation, and divided by 255.0.

    Args:
        frame: Image array accepted by ``cv2.cvtColor`` (assumed to be an RGB
            uint8 frame from the environment; confirm against the caller).

    Returns:
        The resized grayscale image divided by 255.0.
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    shrunk = cv2.resize(grayscale, (84, 84), interpolation=cv2.INTER_AREA)
    scaled = shrunk / 255.0
    return scaled

def stack_frames(frames, new_frame, is_new_episode):
    """Update the four-frame history and return it as one stacked array.

    Args:
        frames: Current list of frames. Ignored when ``is_new_episode`` is
            true; otherwise it is modified in place.
        new_frame: Frame to add to the history.
        is_new_episode: When true, the history is rebuilt as four references
            to ``new_frame``; otherwise ``new_frame`` is appended and the
            oldest frame is dropped.

    Returns:
        A ``(stacked, frames)`` tuple: the frames stacked along a new leading
        axis, and the frame list itself.
    """
    if not is_new_episode:
        # Slide the window: newest frame in at the end, oldest out at the front.
        frames.append(new_frame)
        del frames[0]
    else:
        frames = [new_frame for _ in range(4)]
    stacked = np.stack(frames, axis=0)
    return stacked, frames


In [82]:
import torch.nn as nn
import torch.nn.functional as F

class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    The first convolution takes 4 input channels, and ``fc1`` expects
    7*7*64 features, which is what the three convolutions produce for an
    84x84 input.

    Args:
        action_space: Number of outputs of the final linear layer (one
            Q-value per action).
    """

    def __init__(self, action_space):
        super().__init__()
        # Layers are created in a fixed order and under fixed attribute names
        # so parameter initialisation and state_dict keys stay stable.
        self.conv1 = nn.Conv2d(4, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc1 = nn.Linear(7 * 7 * 64, 512)
        self.out = nn.Linear(512, action_space)

    def forward(self, x):
        """Return the output of the final linear layer for input batch ``x``."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = F.relu(conv(x))
        # Flatten everything except the batch dimension.
        flat = x.view(x.size(0), -1)
        hidden = F.relu(self.fc1(flat))
        return self.out(hidden)


In [87]:
import gym
import numpy as np
import cv2
import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque

# --- Image preprocessing ---
def preprocess_frame(frame):
    """Turn one environment frame into a scaled 84x84 grayscale image.

    The frame is converted with ``cv2.COLOR_RGB2GRAY``, resized to 84x84 using
    area interpolation, and divided by 255.0.

    Args:
        frame: Image array accepted by ``cv2.cvtColor`` (assumed to be an RGB
            uint8 frame from the environment; confirm against the caller).

    Returns:
        The resized grayscale image divided by 255.0.
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    shrunk = cv2.resize(grayscale, (84, 84), interpolation=cv2.INTER_AREA)
    scaled = shrunk / 255.0
    return scaled

def stack_frames(frames, new_frame, is_new_episode):
    """Update the four-frame history and return it as one stacked array.

    Args:
        frames: Current list of frames. Ignored when ``is_new_episode`` is
            true; otherwise it is modified in place.
        new_frame: Frame to add to the history.
        is_new_episode: When true, the history is rebuilt as four references
            to ``new_frame``; otherwise ``new_frame`` is appended and the
            oldest frame is dropped.

    Returns:
        A ``(stacked, frames)`` tuple: the frames stacked along a new leading
        axis, and the frame list itself.
    """
    if not is_new_episode:
        # Slide the window: newest frame in at the end, oldest out at the front.
        frames.append(new_frame)
        del frames[0]
    else:
        frames = [new_frame for _ in range(4)]
    stacked = np.stack(frames, axis=0)
    return stacked, frames

# --- DQN model ---
class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    The first convolution takes 4 input channels, and ``fc1`` expects
    7*7*64 features, which is what the three convolutions produce for an
    84x84 input.

    Args:
        action_space: Number of outputs of the final linear layer (one
            Q-value per action).
    """

    def __init__(self, action_space):
        super().__init__()
        # Layers are created in a fixed order and under fixed attribute names
        # so parameter initialisation and state_dict keys stay stable.
        self.conv1 = nn.Conv2d(4, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc1 = nn.Linear(7 * 7 * 64, 512)
        self.out = nn.Linear(512, action_space)

    def forward(self, x):
        """Return the output of the final linear layer for input batch ``x``."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = torch.relu(conv(x))
        # Flatten everything except the batch dimension.
        flat = x.view(x.size(0), -1)
        hidden = torch.relu(self.fc1(flat))
        return self.out(hidden)

# --- Replay memory ---
class ReplayBuffer:
    """Bounded FIFO store of (state, action, reward, next_state, done) tuples.

    Backed by a ``deque`` with ``maxlen=capacity``, so once the buffer is full
    each new transition evicts the oldest one.
    """

    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            ``(states, actions, rewards, next_states, dones)`` where the states
            and next states are stacked into arrays and the remaining three
            fields are tuples.
        """
        transitions = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one column per field.
        state_col, action_col, reward_col, next_state_col, done_col = zip(*transitions)
        state_batch = np.stack(state_col)
        next_state_batch = np.stack(next_state_col)
        return state_batch, action_col, reward_col, next_state_batch, done_col

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

# --- Hyperparameters ---
EPISODES = 500               # number of training episodes
BATCH_SIZE = 32              # transitions sampled per learning step
GAMMA = 0.99                 # discount factor applied to the next-state value
EPS_START, EPS_END = 1.0, 0.1  # initial value and floor of the exploration rate
EPS_DECAY = 1_000_000        # step scale of the exponential epsilon decay
TARGET_UPDATE = 1_000        # steps between copies of main_dqn into target_dqn
REPLAY_SIZE = 100_000        # replay buffer capacity, in transitions
LEARNING_RATE = 2.5e-4       # learning rate passed to the Adam optimizer

# --- Initialization ---
# Create the Breakout environment without rendering and pick the GPU when
# one is available.
env = gym.make("ALE/Breakout-v5", render_mode=None)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Two networks with one output per environment action: main_dqn is the one
# being optimized; target_dqn starts as an exact copy of its weights.
main_dqn = DQN(env.action_space.n).to(device)
target_dqn = DQN(env.action_space.n).to(device)
target_dqn.load_state_dict(main_dqn.state_dict())

# Only main_dqn's parameters are handed to the optimizer.
optimizer = optim.Adam(main_dqn.parameters(), lr=LEARNING_RATE)
memory = ReplayBuffer(REPLAY_SIZE)
# Global count of environment steps taken across all episodes.
steps_done = 0

# --- Epsilon-greedy action selection ---
def select_action(state, epsilon):
    """Pick an action for ``state`` with an epsilon-greedy policy.

    Args:
        state: Array-like state without a batch dimension; a batch axis is
            added before it is passed to ``main_dqn``.
        epsilon: Probability of returning a random action sampled from
            ``env.action_space`` instead of the greedy one.

    Returns:
        A sampled action, or the index of the largest Q-value predicted by
        ``main_dqn`` as a Python int.
    """
    explore = random.random() < epsilon
    if explore:
        return env.action_space.sample()
    # Greedy branch: no gradients are needed for a single forward pass.
    with torch.no_grad():
        batch = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
        best_action = main_dqn(batch).argmax()
        return best_action.item()

# --- Main training loop ---
# Each episode runs until the environment reports termination or truncation.
# Every environment step stores one transition and, once the replay memory
# holds more than BATCH_SIZE transitions, performs one gradient step on main_dqn.
for episode in range(EPISODES):
    # Seed the frame stack by repeating the first preprocessed frame four times.
    obs, _ = env.reset()
    frame = preprocess_frame(obs)
    state_stack = [frame] * 4
    state = np.stack(state_stack, axis=0)
    total_reward = 0
    done = False

    while not done:
        # Epsilon decays exponentially from EPS_START towards EPS_END.
        epsilon = EPS_END + (EPS_START - EPS_END) * np.exp(-1. * steps_done / EPS_DECAY)
        action = select_action(state, epsilon)
        next_obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        next_frame = preprocess_frame(next_obs)
        next_state, state_stack = stack_frames(state_stack, next_frame, is_new_episode=False)
        # Store `terminated`, not `done`: a truncated episode did not reach a
        # terminal state, so its target should still bootstrap from next_state.
        memory.push(state, action, reward, next_state, terminated)

        state = next_state
        total_reward += reward
        steps_done += 1

        # Learning step
        if len(memory) > BATCH_SIZE:
            states, actions, rewards, next_states, dones = memory.sample(BATCH_SIZE)
            # Explicit dtypes: a bool `dones` tensor would make `1 - dones`
            # raise, gather() needs int64 indices, and float32 rewards keep
            # the target in the same dtype as the network output for the loss.
            states = torch.tensor(states, dtype=torch.float32).to(device)
            actions = torch.tensor(actions, dtype=torch.int64).to(device)
            rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
            next_states = torch.tensor(next_states, dtype=torch.float32).to(device)
            dones = torch.tensor(dones, dtype=torch.float32).to(device)

            # Q-value of each taken action, and the one-step target
            # reward + GAMMA * max target-network Q-value of the next state,
            # with the bootstrap term zeroed for terminal transitions.
            q_values = main_dqn(states).gather(1, actions.unsqueeze(1)).squeeze(1)
            next_q_values = target_dqn(next_states).max(1)[0]
            expected_q = rewards + GAMMA * next_q_values * (1 - dones)

            # detach() keeps gradients from flowing into the target network.
            loss = nn.MSELoss()(q_values, expected_q.detach())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Periodically copy the online weights into the target network.
        if steps_done % TARGET_UPDATE == 0:
            target_dqn.load_state_dict(main_dqn.state_dict())

    print(f"Épisode {episode + 1} - Récompense: {total_reward:.0f} - Epsilon: {epsilon:.3f}")

env.close()


AttributeError: module 'numpy' has no attribute 'bool8'

In [88]:
# Persist the weights of main_dqn so they can be reloaded with load_state_dict().
torch.save(main_dqn.state_dict(), "breakout_dqn.pt")

In [89]:
import gym
import torch
import torch.nn as nn
import numpy as np
import cv2
import time

# --- Same network as used for training ---
class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    The first convolution takes 4 input channels, and ``fc1`` expects
    7*7*64 features, which is what the three convolutions produce for an
    84x84 input.

    Args:
        action_space: Number of outputs of the final linear layer (one
            Q-value per action).
    """

    def __init__(self, action_space):
        super().__init__()
        # Attribute names must stay as they are: they form the state_dict
        # keys used when loading saved weights.
        self.conv1 = nn.Conv2d(4, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc1 = nn.Linear(7 * 7 * 64, 512)
        self.out = nn.Linear(512, action_space)

    def forward(self, x):
        """Return the output of the final linear layer for input batch ``x``."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = torch.relu(conv(x))
        # Flatten everything except the batch dimension.
        flat = x.view(x.size(0), -1)
        hidden = torch.relu(self.fc1(flat))
        return self.out(hidden)

# --- Same preprocessing as before ---
def preprocess_frame(frame):
    """Turn one environment frame into a scaled 84x84 grayscale image.

    The frame is converted with ``cv2.COLOR_RGB2GRAY``, resized to 84x84 using
    area interpolation, and divided by 255.0.

    Args:
        frame: Image array accepted by ``cv2.cvtColor`` (assumed to be an RGB
            uint8 frame from the environment; confirm against the caller).

    Returns:
        The resized grayscale image divided by 255.0.
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    shrunk = cv2.resize(grayscale, (84, 84), interpolation=cv2.INTER_AREA)
    scaled = shrunk / 255.0
    return scaled

def stack_frames(frames, new_frame, is_new_episode):
    """Update the four-frame history and return it as one stacked array.

    Args:
        frames: Current list of frames. Ignored when ``is_new_episode`` is
            true; otherwise it is modified in place.
        new_frame: Frame to add to the history.
        is_new_episode: When true, the history is rebuilt as four references
            to ``new_frame``; otherwise ``new_frame`` is appended and the
            oldest frame is dropped.

    Returns:
        A ``(stacked, frames)`` tuple: the frames stacked along a new leading
        axis, and the frame list itself.
    """
    if not is_new_episode:
        # Slide the window: newest frame in at the end, oldest out at the front.
        frames.append(new_frame)
        del frames[0]
    else:
        frames = [new_frame for _ in range(4)]
    stacked = np.stack(frames, axis=0)
    return stacked, frames

# --- Setup ---
# Build the environment with on-screen rendering and load the saved weights
# into a fresh network on the selected device.
# NOTE(review): the recorded output of this cell is "RuntimeError: Failed to
# initialize SDL"; render_mode="human" presumably needs a display, so this
# may not work on a headless runtime. Confirm before relying on it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = gym.make("ALE/Breakout-v5", render_mode="human")

model = DQN(env.action_space.n).to(device)
# map_location lets weights saved on one device be loaded onto `device`.
model.load_state_dict(torch.load("breakout_dqn.pt", map_location=device))
model.eval()

# --- Run the model ---
# Play one episode, always taking the action with the largest predicted Q-value.
# The initial state repeats the first preprocessed frame four times.
obs, _ = env.reset()
frame = preprocess_frame(obs)
state_stack = [frame] * 4
state = np.stack(state_stack, axis=0)

done = False
total_reward = 0

# NOTE(review): no exploration and no explicit FIRE action here; if the game
# needs FIRE to launch the ball, a policy that never selects it would stall
# until the episode is truncated. Confirm.
while not done:
    with torch.no_grad():
        # Add a batch dimension before the forward pass.
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
        q_values = model(state_tensor)
        action = q_values.argmax().item()

    obs, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    total_reward += reward

    # Slide the newest frame into the four-frame history.
    next_frame = preprocess_frame(obs)
    state, state_stack = stack_frames(state_stack, next_frame, is_new_episode=False)

    time.sleep(0.02)  # Slow the game down a little

print(f"Score total : {total_reward}")
env.close()


RuntimeError: Failed to initialize SDL

In [71]:
# Reset the environment and rebuild the four-frame state from the first frame.
obs, _ = env.reset()
frame = preprocess_frame(obs)
state_stack = [frame] * 4
state = np.stack(state_stack, axis=0)

# --- Press "FIRE" to start the game ---
obs, reward, terminated, truncated, _ = env.step(1)  # FIRE action