IMPORTS

In [2]:
# darqn_atari.py
# 🎮 DARQN minimal avec CNN + CBAM + LSTM sur Pong
# Objectif : Apprendre à un agent à jouer à Pong depuis les images brutes
# — Entraînement sans affichage

import random
from collections import deque
import numpy as np
import tensorflow as tf
from keras import layers, models, optimizers, losses, ops
import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing, FrameStackObservation
import ale_py

ENVIRONNEMENT : PONG

In [None]:
# On enregistre les environnements Atari dans Gymnasium
gym.register_envs(ale_py)

# Paramètres principaux

ENV_ID = "ALE/Pong-v5"      # Environnement Atari : Pong
SEQ_LEN = 4                 # Nombre de frames empilées (mémoire temporelle)
GAMMA = 0.99                # Facteur de discount (importance des récompenses futures)
LEARNING_RATE = 1e-4
EPSILON = 1.0               # Probabilité initiale d’explorer
EPSILON_MIN = 0.01
EPSILON_DECAY = 0.995
BATCH_SIZE = 32
MEMORY_SIZE = 100_000
EPISODES = 500
TARGET_UPDATE_EVERY = 10    # Tous les 10 épisodes, on synchronise le réseau cible

# Préparation de l’environnement Atari

def make_env():
    """
    Crée un environnement Pong en niveaux de gris (84x84),
    sans frameskip supplémentaire, et avec empilement de 4 images consécutives.
    """
    env = gym.make(ENV_ID, render_mode="rgb_array")

    # AtariPreprocessing :
    # - Convertit les images en 84x84
    # - Passe en niveaux de gris (plus simple que 3 canaux RGB)
    # - Gère les no-op aléatoires (pour diversifier les débuts de partie)
    env = AtariPreprocessing(
        env,
        screen_size=84,
        grayscale_obs=True,
        frame_skip=1,          # On désactive le saut de frames du wrapper
        noop_max=30,
        terminal_on_life_loss=False,
        scale_obs=False,
    )

    # FrameStackObservation :
    # - Empile les 4 dernières frames (T=4)
    #   → Cela donne à l’agent une perception du mouvement
    env = FrameStackObservation(env, stack_size=SEQ_LEN, padding_type="reset")
    return env

env = make_env()
n_actions = env.action_space.n

def obs_to_seq(obs):
    """Convertit l’observation (stack de 4 frames) en tenseur normalisé [0,1]."""
    arr = np.array(obs, dtype=np.float32) / 255.0  # Normalisation
    if arr.ndim == 3:
        arr = arr[..., np.newaxis]  # (T,84,84,1)
    return arr

CBAM (Convolutional Block Attention Module)

In [None]:

# CBAM (Convolutional Block Attention Module)
# ============================================================

def cbam_block(x, reduction=16, spatial_kernel=7):
    """
    CBAM = Channel and Spatial Attention Module.
    Permet au réseau de se concentrer sur les zones et canaux importants.
    - "Channel attention" : apprend quels filtres sont utiles.
    - "Spatial attention" : apprend où regarder dans l’image.
    """
    ch = x.shape[-1]

    # --- Attention par canal ---
    # Moyenne et max globales → donnent une idée de l’importance de chaque canal
    gap = layers.GlobalAveragePooling2D(keepdims=True)(x)
    gmp = layers.GlobalMaxPooling2D(keepdims=True)(x)
    mid = int(max(1, ch // reduction))

    # Petit MLP partagé (2 convs 1x1)
    mlp = models.Sequential([
        layers.Conv2D(mid, 1, activation="relu", padding="same"),
        layers.Conv2D(ch,  1, activation="sigmoid", padding="same"),
    ])

    # Combine GAP + GMP et applique le masque de canal
    ca = layers.Add()([mlp(gap), mlp(gmp)])
    x = layers.Multiply()([x, ca])

    # --- Attention spatiale ---
    # On calcule la moyenne et le max des canaux → on fusionne (concat)
    avg_sp = layers.Lambda(lambda t: ops.mean(t, axis=-1, keepdims=True))(x)
    max_sp = layers.Lambda(lambda t: ops.max(t, axis=-1,  keepdims=True))(x)
    concat = layers.Concatenate(axis=-1)([avg_sp, max_sp])
    # Convolution 7x7 → détecte les zones spatialement pertinentes
    sa = layers.Conv2D(1, spatial_kernel, padding="same", activation="sigmoid")(concat)
    x = layers.Multiply()([x, sa])
    return x


MODELE D'ENTRAINEMENT

In [5]:
# 🧩 Réseau Q : CNN + CBAM + LSTM

def create_q_model(input_shape=(SEQ_LEN, 84, 84, 1), n_actions=n_actions):
    """
    Réseau Q avec traitement spatio-temporel :
    - CNN : extrait les caractéristiques visuelles.
    - CBAM : module d’attention, renforce les régions/canaux importants.
    - LSTM : intègre la dimension temporelle (mouvements, dynamiques).
    - Dense : estime les valeurs Q pour chaque action possible.
    """

    seq_in = layers.Input(shape=input_shape)  # (T,84,84,1)

    # --- Bloc CNN (analyse spatiale) ---
    # Convolutions pour extraire les motifs visuels (balle, raquettes…)
    inp = layers.Input(shape=(84, 84, 1))
    y = layers.Conv2D(32, 8, strides=4, activation="relu", padding="same")(inp)
    y = layers.Conv2D(64, 4, strides=2, activation="relu", padding="same")(y)
    y = layers.Conv2D(64, 3, strides=1, activation="relu", padding="same")(y)

    # --- CBAM (focalisation sur les bonnes zones) ---
    y = cbam_block(y)

    # --- Flatten + Dense ---
    # On obtient un vecteur compact de 256 dimensions par frame
    y = layers.Flatten()(y)
    y = layers.Dense(256, activation="relu")(y)
    frame_encoder = models.Model(inp, y, name="frame_encoder_cbam")

    # --- TimeDistributed ---
    # On applique le CNN+CBAM sur chaque frame de la séquence temporelle (T=4)
    x = layers.TimeDistributed(frame_encoder)(seq_in)  # (B,T,256)

    # --- LSTM ---
    # LSTM traite la séquence de 4 vecteurs 256D pour capturer le mouvement.
    # → permet d’anticiper la trajectoire de la balle.
    x = layers.LSTM(128, return_sequences=False)(x)  # (B,128)

    # --- Dense finale ---
    # Calcule les valeurs Q(s,a) pour chaque action.
    x = layers.Dense(128, activation="relu")(x)
    q_out = layers.Dense(n_actions, activation="linear")(x)

    # --- Compilation ---
    # Huber loss : plus stable que MSE (moins sensible aux outliers)
    model = models.Model(seq_in, q_out, name="Q_CBAM_LSTM")
    model.compile(
        optimizer=optimizers.Adam(learning_rate=LEARNING_RATE),
        loss=losses.Huber()
    )
    return model


# On crée le réseau principal (Q) et le réseau cible (target)
q_model = create_q_model()
target_model = create_q_model()
target_model.set_weights(q_model.get_weights())




MEMOIRE D'EXPERIENCE : STORE_TRANSITION Function

In [6]:
# 🧱 Mémoire d’expérience (Experience Replay)

memory = deque(maxlen=MEMORY_SIZE)

def store_transition(state_seq, action, reward, next_state_seq, done):
    """
    Stocke une expérience (transition) dans la mémoire :
    (s, a, r, s', done)
    """
    memory.append((
        state_seq.astype(np.float32),
        int(action),
        float(np.sign(reward)),  # on clip les récompenses (-1, 0, +1)
        next_state_seq.astype(np.float32),
        bool(done),
    ))

def sample_batch():
    """Renvoie un batch aléatoire pour l'entraînement."""
    batch = random.sample(memory, BATCH_SIZE)
    s, a, r, s2, d = map(np.asarray, zip(*batch))
    return np.array(s), np.array(a), np.array(r), np.array(s2), np.array(d, dtype=np.float32)


FONCTION D'EXPLORATION EPSILON

In [7]:
# Politique epsilon-greedy

def epsilon_greedy_policy(state_seq, epsilon):
    """
    Choisit une action :
    - aléatoire (exploration) avec probabilité epsilon
    - ou la meilleure selon Q (exploitation)
    """
    if np.random.rand() < epsilon:
        return np.random.randint(n_actions)
    q_values = q_model.predict(state_seq[np.newaxis, ...], verbose=0)[0]
    return int(np.argmax(q_values))

FONCTION D'ENTRAINEMENT

In [8]:
# Fonction d'entraînement

def train_step():
    """Un pas d'apprentissage du DQN."""
    if len(memory) < BATCH_SIZE:
        return
    s, a, r, s2, d = sample_batch()

    # Q(s',a') du modèle cible
    next_q = target_model.predict(s2, verbose=0)
    max_next_q = np.max(next_q, axis=1)
    targets = r + (1.0 - d) * GAMMA * max_next_q

    # Q(s,a) courant du modèle principal
    q_curr = q_model.predict(s, verbose=0)
    for i in range(BATCH_SIZE):
        q_curr[i, a[i]] = targets[i]

    q_model.fit(s, q_curr, verbose=0, batch_size=BATCH_SIZE, epochs=1)


ON LANCE LE MODELE D'ENTRAINEMENT : AVEC UNE PETITE ANALYSE A CHAQUE EPOQUE

In [None]:
# Boucle d'entraînement

reward_history = []
epsilon = EPSILON

for episode in range(1, EPISODES + 1):
    obs, info = env.reset()
    state_seq = obs_to_seq(obs)
    done = False
    total_reward = 0.0

    while not done:
        # Choisir une action selon epsilon-greedy
        action = epsilon_greedy_policy(state_seq, epsilon)

        # Exécuter l'action
        next_obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        next_state_seq = obs_to_seq(next_obs)

        # Sauvegarder la transition
        store_transition(state_seq, action, reward, next_state_seq, done)
        total_reward += reward
        state_seq = next_state_seq

        # Mise à jour du réseau
        train_step()

    # Mise à jour du réseau cible et de l’exploration
    epsilon = max(EPSILON_MIN, epsilon * EPSILON_DECAY)
    if episode % TARGET_UPDATE_EVERY == 0:
        target_model.set_weights(q_model.get_weights())

    reward_history.append(total_reward)
    avg100 = np.mean(reward_history[-100:])
    print(f"Ep {episode:5d} | eps={epsilon:.3f} | EpR={total_reward:+.1f} | Avg100={avg100:+.1f} | Memory={len(memory)}")

env.close()
print("✅ Training finished.")


--- FINI ---