In [1]:
!pip install gymnasium[atari] ale-py shimmy opencv-python tensorflow

Collecting ale-py
  Downloading ale_py-0.11.2-cp312-cp312-win_amd64.whl.metadata (9.2 kB)
Collecting shimmy
  Downloading Shimmy-2.0.0-py3-none-any.whl.metadata (3.5 kB)
Collecting opencv-python
  Downloading opencv_python-4.12.0.88-cp37-abi3-win_amd64.whl.metadata (19 kB)
Collecting tensorflow
  Using cached tensorflow-2.20.0-cp312-cp312-win_amd64.whl.metadata (4.6 kB)
Collecting gymnasium[atari]
  Downloading gymnasium-1.2.2-py3-none-any.whl.metadata (10 kB)
Collecting numpy>=1.21.0 (from gymnasium[atari])
  Downloading numpy-2.3.5-cp312-cp312-win_amd64.whl.metadata (60 kB)
Collecting cloudpickle>=1.2.0 (from gymnasium[atari])
  Downloading cloudpickle-3.1.2-py3-none-any.whl.metadata (7.1 kB)
Collecting typing-extensions>=4.3.0 (from gymnasium[atari])
  Using cached typing_extensions-4.15.0-py3-none-any.whl.metadata (3.3 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium[atari])
  Using cached Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Collecting numpy>=

In [2]:
import gymnasium as gym
import ale_py
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
import numpy as np
import cv2
from collections import deque
import random

In [3]:
# Register the ALE environments with Gymnasium
gym.register_envs(ale_py)

# --- 1. Configuration and hyperparameters ---
CONFIG = {
    'env_name': 'ALE/DonkeyKong-v5',
    'state_shape': (84, 84, 1), # (H, W, Channels) - grayscale
    'seq_len': 4,               # Length of the frame sequence fed to the LSTM
    'gamma': 0.99,              # Discount factor used in the Bellman target
    'learning_rate': 0.0001,
    'epsilon_start': 1.0,
    'epsilon_min': 0.1,
    'epsilon_decay': 0.99995, # Slow multiplicative decay to leave time to learn
    'batch_size': 32,
    'memory_size': 20000,     # Replay buffer capacity (number of transitions)
    'target_update_freq': 1000,
    'train_freq': 4,          # Environment steps between two training calls
    'seed': 42,               # Seed for the global RNGs, for reproducible runs
}

# Seed every global random number generator the notebook draws from, so that a
# "Restart & Run All" yields the same weight initialisation, exploration choices
# and replay sampling.
random.seed(CONFIG['seed'])
np.random.seed(CONFIG['seed'])
tf.random.set_seed(CONFIG['seed'])


In [4]:
# --- 2. Wrappers pour l'environnement ---
# Ces wrappers transforment l'image brute (210,160,3) en (84,84,1) normalisé
class AtariPreprocessing(gym.Wrapper):
    """Environment wrapper that converts each RGB frame to a normalised grayscale image.

    Every observation returned by ``reset`` and ``step`` is converted to grayscale,
    resized to 84x84, scaled to the [0, 1] range and given a trailing channel axis,
    producing a float32 array of shape (84, 84, 1) that matches ``observation_space``.
    """

    def __init__(self, env):
        """Wrap ``env`` and advertise the processed observation space."""
        super().__init__(env)
        self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(84, 84, 1), dtype=np.float32)

    def step(self, action):
        """Step the wrapped environment and return the processed observation.

        Reward, termination flags and info are passed through unchanged.
        """
        obs, reward, terminated, truncated, info = self.env.step(action)
        return self._process(obs), reward, terminated, truncated, info

    def reset(self, **kwargs):
        """Reset the wrapped environment (forwarding ``kwargs``) and process the first frame."""
        obs, info = self.env.reset(**kwargs)
        return self._process(obs), info

    def _process(self, frame):
        """Convert one RGB frame into a float32 array of shape (84, 84, 1) with values in [0, 1]."""
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) # Grayscale
        frame = cv2.resize(frame, (84, 84))             # Resize
        # Dividing a uint8 image by a Python float yields float64; cast to float32 so
        # the result matches the declared observation space and every frame kept in
        # the replay buffer takes half the memory.
        frame = (frame / 255.0).astype(np.float32)      # Normalise to 0-1
        return np.expand_dims(frame, axis=-1)           # Add the channel dimension

def make_env():
    """Build the training environment named in ``CONFIG['env_name']``.

    The raw Gymnasium environment is created with ``render_mode='rgb_array'``
    (per the original author, this avoids graphics errors on a headless server)
    and then wrapped in ``AtariPreprocessing``.
    """
    raw_env = gym.make(CONFIG['env_name'], render_mode='rgb_array')
    return AtariPreprocessing(raw_env)


In [5]:
# --- 3. Modèle DARQN (Dueling Attention Recurrent Q-Network) ---
class DuelingDARQN(models.Model):
    """Dueling recurrent Q-network with self-attention over the time axis.

    Pipeline: a per-frame convolutional encoder, an LSTM over the frame sequence,
    multi-head self-attention with a residual connection and layer normalisation,
    then separate value and advantage heads combined into Q-values.

    Args:
        action_size: number of discrete actions, i.e. the width of the output.
    """

    def __init__(self, action_size):
        super().__init__()

        # Convolutional encoder; TimeDistributed applies each layer to every frame
        # of the sequence independently.
        per_frame = layers.TimeDistributed
        self.conv1 = per_frame(layers.Conv2D(filters=32, kernel_size=8, strides=4, activation='relu'))
        self.conv2 = per_frame(layers.Conv2D(filters=64, kernel_size=4, strides=2, activation='relu'))
        self.conv3 = per_frame(layers.Conv2D(filters=64, kernel_size=3, strides=1, activation='relu'))
        self.flatten = per_frame(layers.Flatten())

        # Recurrent layer; returns the full sequence so attention can look at every step.
        self.lstm = layers.LSTM(units=512, return_sequences=True)

        # Temporal self-attention followed by layer normalisation.
        self.attention = layers.MultiHeadAttention(num_heads=2, key_dim=64)
        self.layer_norm = layers.LayerNormalization()

        # Dueling heads: state value V (scalar) ...
        self.v_dense = layers.Dense(units=512, activation='relu')
        self.v_out = layers.Dense(units=1)

        # ... and per-action advantage A.
        self.a_dense = layers.Dense(units=512, activation='relu')
        self.a_out = layers.Dense(units=action_size)

    def call(self, inputs):
        """Compute Q-values for a batch of frame sequences.

        Args:
            inputs: tensor of shape (Batch, Seq_Len, 84, 84, 1).

        Returns:
            Tensor of shape (Batch, action_size) holding one Q-value per action.
        """
        # Encode every frame, ending with one flat feature vector per time step.
        features = inputs
        for encoder in (self.conv1, self.conv2, self.conv3, self.flatten):
            features = encoder(features)

        hidden_seq = self.lstm(features)  # (Batch, Seq, 512)

        # Self-attention across time steps, added back to the LSTM output (residual)
        # and normalised.
        attended = self.attention(query=hidden_seq, value=hidden_seq, key=hidden_seq)
        context = self.layer_norm(hidden_seq + attended)

        # Only the last time step of the attended sequence feeds the heads.
        summary = context[:, -1, :]

        state_value = self.v_out(self.v_dense(summary))
        advantages = self.a_out(self.a_dense(summary))

        # Q = V + (A - mean(A))
        centered = advantages - tf.reduce_mean(advantages, axis=1, keepdims=True)
        return state_value + centered


In [6]:
# --- 4. Agent Double DQN ---
class Agent:
    """Double-DQN agent with an epsilon-greedy policy and a uniform replay buffer.

    Holds an online network and a target network (both ``DuelingDARQN``), an Adam
    optimizer with gradient-norm clipping, and a bounded deque of transitions.
    All hyperparameters are read from the module-level ``CONFIG`` dictionary.

    Args:
        action_size: number of discrete actions available in the environment.
    """

    def __init__(self, action_size):
        self.action_size = action_size
        self.epsilon = CONFIG['epsilon_start']

        # Online and target networks
        self.model = DuelingDARQN(action_size)
        self.target_model = DuelingDARQN(action_size)

        # Run a dummy forward pass so both networks create their weights; this is
        # required before weights can be copied to the target network.
        dummy_input = tf.zeros((1, CONFIG['seq_len'], *CONFIG['state_shape']))
        self.model(dummy_input)
        self.target_model(dummy_input)
        self.update_target()

        self.optimizer = optimizers.Adam(learning_rate=CONFIG['learning_rate'], clipnorm=1.0)
        # Huber loss is built once here instead of on every training step.
        self.loss_fn = tf.keras.losses.Huber()

        # Replay buffer; the oldest transitions are dropped once maxlen is reached.
        self.memory = deque(maxlen=CONFIG['memory_size'])

    def update_target(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def act(self, state_seq):
        """Choose an action for ``state_seq`` with an epsilon-greedy policy.

        Args:
            state_seq: frame sequence of shape (Seq, 84, 84, 1).

        Returns:
            The chosen action index as a Python ``int``.
        """
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_size)

        # Add the batch dimension -> (1, Seq, 84, 84, 1). Converting a single array is
        # cheaper than converting a Python list that wraps an array.
        state_batch = np.asarray(state_seq, dtype=np.float32)[np.newaxis, ...]
        q_values = self.model(tf.convert_to_tensor(state_batch, dtype=tf.float32))
        return int(np.argmax(q_values.numpy()[0]))

    def remember(self, s, a, r, ns, d):
        """Append the transition (state, action, reward, next_state, done) to the buffer."""
        self.memory.append((s, a, r, ns, d))

    def train(self):
        """Run one Double-DQN update on a random minibatch, then decay epsilon.

        Does nothing while the buffer holds fewer than ``CONFIG['batch_size']``
        transitions.
        """
        batch_size = CONFIG['batch_size']
        if len(self.memory) < batch_size:
            return

        batch = random.sample(self.memory, batch_size)

        # Transpose the list of transitions into per-field arrays.
        states, actions, rewards, next_states, dones = zip(*batch)
        states = np.asarray(states, dtype=np.float32)
        # int32 explicitly: these indices are stacked with an int32 tf.range below, so
        # the dtype must not depend on the platform's default integer width.
        actions = np.asarray(actions, dtype=np.int32)
        rewards = np.asarray(rewards, dtype=np.float32)
        next_states = np.asarray(next_states, dtype=np.float32)
        dones = np.asarray(dones, dtype=np.float32)

        # --- Double DQN ---
        # 1. The online network selects the best next action ...
        next_q_main_model = self.model(next_states)
        best_next_actions = tf.argmax(next_q_main_model, axis=1)

        # 2. ... and the target network evaluates that action.
        next_q_target_model = self.target_model(next_states)
        indices = tf.stack([tf.range(batch_size, dtype=tf.int64), best_next_actions], axis=1)
        target_q_values = tf.gather_nd(next_q_target_model, indices)

        # 3. Bellman target; the bootstrap term is zeroed where done == 1. It is computed
        # outside the gradient tape, so no gradient flows through it.
        targets = rewards + (1 - dones) * CONFIG['gamma'] * target_q_values

        with tf.GradientTape() as tape:
            q_values = self.model(states)

            # Q(s, a) for the actions that were actually taken
            action_indices = tf.stack([tf.range(batch_size, dtype=tf.int32), actions], axis=1)
            q_action = tf.gather_nd(q_values, action_indices)

            loss = self.loss_fn(targets, q_action)

        grads = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))

        # Epsilon decay, clamped so it never drops below the configured floor.
        if self.epsilon > CONFIG['epsilon_min']:
            self.epsilon = max(CONFIG['epsilon_min'], self.epsilon * CONFIG['epsilon_decay'])


In [7]:
# --- 5. Boucle Principale ---
env = make_env()
action_size = env.action_space.n
agent = Agent(action_size)

scores = []                       # Score of every finished episode, in order
score_window = deque(maxlen=100)  # Last 100 scores, used for the running average

# Number of episodes between two target-network syncs (a rough approximation of
# CONFIG['target_update_freq']). Clamped to at least 1 so that a configured value
# below 100 cannot produce a modulo-by-zero.
target_sync_every = max(1, CONFIG['target_update_freq'] // 100)

print(f"Début de l'entraînement sur {CONFIG['env_name']}...")
print(f"Action Space: {action_size}")

# try/finally guarantees the environment is closed even when training is interrupted
# (e.g. KeyboardInterrupt) or fails part-way through.
try:
    for episode in range(5000): # Number of episodes
        # Seed the environment on the first reset only (when a seed is configured);
        # re-seeding on every reset would replay the same start state each episode.
        reset_seed = CONFIG.get('seed') if episode == 0 else None
        obs, info = env.reset(seed=reset_seed)

        # Initialise the sequence by repeating the first frame.
        # Shape: (Seq_Len, 84, 84, 1)
        state_seq = np.stack([obs] * CONFIG['seq_len'], axis=0)

        score = 0
        done = False
        step = 0

        while not done:
            # 1. Action
            action = agent.act(state_seq)

            # 2. Step
            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Shift the sequence by one frame and put the new frame at the end.
            # np.roll returns a copy, so the stored state_seq is not modified.
            next_state_seq = np.roll(state_seq, -1, axis=0)
            next_state_seq[-1] = next_obs

            # 3. Storage. Only true termination is stored as the done flag: a
            # truncated episode did not reach a terminal state, so its last
            # transition must still bootstrap from the next state's value.
            agent.remember(state_seq, action, reward, next_state_seq, terminated)

            state_seq = next_state_seq
            score += reward
            step += 1

            # 4. Training
            if step % CONFIG['train_freq'] == 0:
                agent.train()

        # End of episode
        if episode % target_sync_every == 0: # Approximate update schedule
            agent.update_target()

        scores.append(score)
        score_window.append(score)
        avg_score = np.mean(score_window)

        print(f"Episode: {episode} | Score: {score:.1f} | Avg Score: {avg_score:.1f} | Epsilon: {agent.epsilon:.3f}")

        if avg_score >= 300: # Arbitrary threshold for checkpointing
            # Keras 3 requires weight files to end in ".weights.h5"; a plain ".h5"
            # name raises a ValueError and would abort training here.
            agent.model.save_weights("donkey_kong_darqn.weights.h5")
finally:
    env.close()

Début de l'entraînement sur ALE/DonkeyKong-v5...
Action Space: 18
Episode: 0 | Score: 100.0 | Avg Score: 100.0 | Epsilon: 0.992
Episode: 1 | Score: 200.0 | Avg Score: 150.0 | Epsilon: 0.980
Episode: 2 | Score: 300.0 | Avg Score: 200.0 | Epsilon: 0.970
Episode: 3 | Score: 100.0 | Avg Score: 175.0 | Epsilon: 0.961
Episode: 4 | Score: 100.0 | Avg Score: 160.0 | Epsilon: 0.952
Episode: 5 | Score: 200.0 | Avg Score: 166.7 | Epsilon: 0.945
Episode: 6 | Score: 200.0 | Avg Score: 171.4 | Epsilon: 0.937
Episode: 7 | Score: 200.0 | Avg Score: 175.0 | Epsilon: 0.928
Episode: 8 | Score: 100.0 | Avg Score: 166.7 | Epsilon: 0.920
Episode: 9 | Score: 200.0 | Avg Score: 170.0 | Epsilon: 0.912
Episode: 10 | Score: 200.0 | Avg Score: 172.7 | Epsilon: 0.904
Episode: 11 | Score: 100.0 | Avg Score: 166.7 | Epsilon: 0.896
Episode: 12 | Score: 300.0 | Avg Score: 176.9 | Epsilon: 0.886
Episode: 13 | Score: 100.0 | Avg Score: 171.4 | Epsilon: 0.880
Episode: 14 | Score: 100.0 | Avg Score: 166.7 | Epsilon: 0.872

KeyboardInterrupt: 

In [None]:
# 1. Save the weights
# save_weights is used because saving a whole subclassed (custom) model is often buggy.
# This runs first so the weights are written even when the notebook is not on Colab.
agent.model.save_weights("donkey_kong_final.weights.h5")
print("Modèle sauvegardé localement dans le runtime Colab.")

# 2. Download the file to the local machine.
# google.colab only exists inside a Colab runtime, so the import is guarded and the
# download is skipped elsewhere instead of failing the whole cell.
try:
    from google.colab import files
except ImportError:
    print("Module google.colab introuvable : téléchargement ignoré (exécution hors Colab).")
else:
    files.download("donkey_kong_final.weights.h5")