In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Model
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, TensorBoard, ModelCheckpoint
from tensorflow.keras.layers import Dense, Conv2D, MaxPooling2D, Flatten, Dropout, GlobalAveragePooling2D, GlobalMaxPooling2D, Layer
import gymnasium as gym
import ale_py
from collections import deque
import random
import os
from datetime import datetime
import matplotlib.pyplot as plt

# GPU setup: when at least one GPU is visible, ask TensorFlow to grow its
# memory allocation on demand. A RuntimeError raised while configuring the
# devices is printed and otherwise ignored.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print(f"GPU(s) disponible(s): {len(gpus)}")
    except RuntimeError as e:
        print(e)
else:
    print("CPU utilisé")

# --- Hyperparameters -------------------------------------------------------
ENV_NAME = "ALE/Alien-v5"
LEARNING_RATE = 0.00025      # Adam learning rate
GAMMA = 0.99                 # discount factor used in the Q-learning target
EPSILON_START = 1.0          # initial exploration rate
EPSILON_MIN = 0.1            # exploration floor
EPSILON_DECAY = 0.995        # multiplicative epsilon decay
BATCH_SIZE = 32              # transitions sampled per training step
MEMORY_SIZE = 10000          # replay buffer capacity (transitions)
EPISODES = 500               # number of training episodes
UPDATE_TARGET_FREQ = 10      # target-network sync period
FRAME_STACK = 4              # number of frames stacked into one state
LSTM_UNITS = 256             # width of the recurrent layer

# Seed the Python, NumPy and TensorFlow RNGs so that exploration, replay
# sampling and weight initialisation are repeatable after a kernel restart.
# The environment itself is not seeded here.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

gym.register_envs(ale_py)

In [None]:
class CBAM(Layer):
    """Attention layer that gates a 4D feature map per channel, then per position.

    The channel gate is a sigmoid over the sum of a two-layer dense network
    applied to the global-average-pooled and global-max-pooled inputs (the
    same dense layers are used for both). The spatial gate is a sigmoid
    convolution over the channel-wise mean and max of the gated features.

    Args:
        ratio: Divisor applied to the channel count to size the hidden dense
            layer (never fewer than one unit).
        kernel_size: Kernel size of the spatial-attention convolution.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, ratio=8, kernel_size=7, **kwargs):
        super().__init__(**kwargs)
        self.ratio = ratio
        self.kernel_size = kernel_size

    def build(self, input_shape):
        """Create the sublayers; their sizes depend on the input channel count."""
        n_channels = int(input_shape[-1])
        bottleneck = max(n_channels // self.ratio, 1)

        # Pooling layers feeding the channel-attention network.
        self.gap = GlobalAveragePooling2D()
        self.gmp = GlobalMaxPooling2D()

        # Dense layers shared by the average-pooled and max-pooled branches.
        self.fc1 = Dense(
            bottleneck,
            activation='relu',
            kernel_initializer='he_normal',
            use_bias=True,
        )
        self.fc2 = Dense(
            n_channels,
            activation=None,
            kernel_initializer='he_normal',
            use_bias=True,
        )

        # Single-filter convolution producing the spatial attention map.
        self.spatial_conv = Conv2D(
            filters=1,
            kernel_size=self.kernel_size,
            padding='same',
            activation='sigmoid',
            kernel_initializer='he_normal',
        )
        super().build(input_shape)

    def _shared_mlp(self, pooled):
        """Apply the two shared dense layers to a pooled descriptor."""
        return self.fc2(self.fc1(pooled))

    def call(self, inputs):
        """Return ``inputs`` scaled by the channel gate and then the spatial gate."""
        # Channel attention: sum both pooled branches, then squash to (0, 1).
        gate_logits = self._shared_mlp(self.gap(inputs)) + self._shared_mlp(self.gmp(inputs))
        channel_gate = tf.nn.sigmoid(gate_logits)
        # Reshape to (-1, 1, 1, channels) so the gate broadcasts over the two middle axes.
        n_channels = tf.shape(inputs)[-1]
        channel_gate = tf.reshape(channel_gate, (-1, 1, 1, n_channels))
        gated = inputs * channel_gate

        # Spatial attention: describe each position by its channel mean and max.
        descriptors = tf.concat(
            [
                tf.reduce_mean(gated, axis=-1, keepdims=True),
                tf.reduce_max(gated, axis=-1, keepdims=True),
            ],
            axis=-1,
        )
        spatial_gate = self.spatial_conv(descriptors)
        return gated * spatial_gate

    def get_config(self):
        """Return the base config extended with ``ratio`` and ``kernel_size``."""
        return {
            **super().get_config(),
            "ratio": self.ratio,
            "kernel_size": self.kernel_size,
        }


def preprocess_frame(frame):
    """Turn an RGB frame into an 84x84 grayscale float32 tensor divided by 255.

    Args:
        frame: RGB image accepted by ``tf.image.rgb_to_grayscale``.

    Returns:
        The resized grayscale frame as a float32 tensor; the single grayscale
        channel produced by ``rgb_to_grayscale`` is kept as the last axis.
    """
    grayscale = tf.image.rgb_to_grayscale(frame)
    resized = tf.image.resize(grayscale, [84, 84])
    return tf.cast(resized, tf.float32) / 255.0


def build_darqn_model(input_shape, n_actions):
    """Build and compile the Q-network: conv + CBAM stages, LSTM, dense head.

    Args:
        input_shape: Shape of one state, without the batch axis.
        n_actions: Number of Q-values produced by the output layer.

    Returns:
        A compiled Keras ``Model`` named ``DARQN_CBAM`` (Adam optimizer,
        Huber loss, MAE metric).
    """
    state_input = layers.Input(shape=input_shape, name='state_input')

    # (filters, kernel, stride) for each convolutional stage; every stage is
    # followed by a CBAM attention layer.
    conv_stages = (
        (32, (8, 8), 4),
        (64, (4, 4), 2),
        (64, (3, 3), 1),
    )
    features = state_input
    for n_filters, kernel, stride in conv_stages:
        features = Conv2D(n_filters, kernel, strides=stride, activation='relu',
                          kernel_initializer='he_normal')(features)
        features = CBAM(ratio=8, kernel_size=7)(features)

    # Flatten the feature map, then split it into FRAME_STACK equal chunks
    # that the LSTM consumes as a sequence.
    sequence = layers.Reshape((FRAME_STACK, -1))(Flatten()(features))

    hidden = layers.LSTM(LSTM_UNITS, activation='tanh', return_sequences=False)(sequence)
    hidden = Dropout(0.3)(hidden)

    hidden = Dense(512, activation='relu', kernel_initializer='he_normal')(hidden)
    hidden = Dropout(0.2)(hidden)

    q_values = Dense(n_actions, activation='linear', kernel_initializer='he_normal',
                     name='q_values')(hidden)

    network = Model(inputs=state_input, outputs=q_values, name='DARQN_CBAM')
    optimizer = keras.optimizers.Adam(learning_rate=LEARNING_RATE)
    network.compile(optimizer=optimizer, loss='huber', metrics=['mae'])
    return network


class ReplayBuffer:
    """Bounded FIFO store of ``(state, action, reward, next_state, done)`` tuples."""

    def __init__(self, max_size):
        # A deque with maxlen drops the oldest transition once it is full.
        self.buffer = deque(maxlen=max_size)

    def add(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A 5-tuple of NumPy arrays ``(states, actions, rewards,
            next_states, dones)``, each with the batch as its first axis.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one column per field.
        return tuple(np.array(column) for column in zip(*picked))

    def size(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)


class DARQNAgent:
    """Epsilon-greedy Q-learning agent with an online and a target network.

    The agent owns the two networks built by ``build_darqn_model``, a replay
    buffer of transitions, and a rolling stack of the most recent
    preprocessed frames that forms the state fed to the network.

    Args:
        state_shape: Shape of one stacked state (without the batch axis).
        n_actions: Number of discrete actions.
    """

    def __init__(self, state_shape, n_actions):
        self.state_shape = state_shape
        self.n_actions = n_actions
        self.epsilon = EPSILON_START

        self.model = build_darqn_model(state_shape, n_actions)
        self.target_model = build_darqn_model(state_shape, n_actions)
        # Start with both networks holding identical weights.
        self.update_target_model()

        self.replay_buffer = ReplayBuffer(MEMORY_SIZE)

        # Rolling window of the last FRAME_STACK preprocessed frames.
        self.frame_stack = deque(maxlen=FRAME_STACK)

    @staticmethod
    def _q_values(network, batch):
        """Run ``network`` in inference mode and return a writable float32 array.

        Calling the model directly avoids the per-call overhead of
        ``Model.predict`` for batches this small.
        """
        return np.array(network(batch, training=False), dtype=np.float32)

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def get_action(self, state, training=True):
        """Choose an action for ``state``.

        Args:
            state: One stacked state (no batch axis).
            training: When True, a uniformly random action is taken with
                probability ``self.epsilon``; when False the choice is
                always greedy.

        Returns:
            The chosen action index as a Python ``int``.
        """
        if training and np.random.rand() < self.epsilon:
            return int(np.random.randint(self.n_actions))

        batch = np.expand_dims(np.asarray(state, dtype=np.float32), axis=0)
        q_values = self._q_values(self.model, batch)
        return int(np.argmax(q_values[0]))

    def train(self):
        """Run one Q-learning update on a batch sampled from the replay buffer.

        Returns:
            The scalar training loss as a ``float``, or ``0.0`` when the
            buffer holds fewer than ``BATCH_SIZE`` transitions.
        """
        if self.replay_buffer.size() < BATCH_SIZE:
            return 0.0

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(BATCH_SIZE)

        # 1.0 for transitions that continue, 0.0 for those flagged done, so
        # that no future value is bootstrapped past the end of an episode.
        not_done = 1.0 - np.asarray(dones, dtype=np.float32)

        next_q_values = self._q_values(self.target_model, next_states)
        targets = rewards + GAMMA * next_q_values.max(axis=1) * not_done

        # Start from the current predictions so that only the Q-value of the
        # action actually taken contributes to the loss.
        target_q_values = self._q_values(self.model, states)
        target_q_values[np.arange(len(actions)), actions] = targets

        result = self.model.train_on_batch(states, target_q_values)
        # With metrics compiled in, train_on_batch returns [loss, metric, ...];
        # keep only the loss so callers always receive a scalar.
        return float(np.ravel(result)[0])

    def decay_epsilon(self):
        """Multiply epsilon by ``EPSILON_DECAY``, never going below ``EPSILON_MIN``."""
        self.epsilon = max(EPSILON_MIN, self.epsilon * EPSILON_DECAY)

    def stack_frames(self, frame, is_new_episode=False):
        """Preprocess ``frame``, push it on the frame stack and return the state.

        Args:
            frame: Raw frame passed to ``preprocess_frame``.
            is_new_episode: When True the stack is reset and filled with
                copies of this frame.

        Returns:
            NumPy array with the stacked frames on the last axis, i.e.
            ``(height, width, FRAME_STACK)`` for single-channel frames.
        """
        processed = np.asarray(preprocess_frame(frame), dtype=np.float32)
        # Drop a trailing singleton channel axis; otherwise stacking would
        # yield (H, W, 1, FRAME_STACK) instead of the (H, W, FRAME_STACK)
        # layout the network is built for.
        if processed.ndim == 3 and processed.shape[-1] == 1:
            processed = processed[..., 0]

        if is_new_episode or not self.frame_stack:
            # Fill the whole window so the state always has FRAME_STACK frames.
            self.frame_stack.clear()
            self.frame_stack.extend([processed] * FRAME_STACK)
        else:
            self.frame_stack.append(processed)

        return np.stack(self.frame_stack, axis=-1)


In [None]:
# Create the environment without rendering and read its action-space size.
env = gym.make(ENV_NAME, render_mode=None)
n_actions = env.action_space.n

# One state is an 84x84 image with FRAME_STACK stacked frames as channels.
state_shape = (84, 84, FRAME_STACK)

agent = DARQNAgent(state_shape, n_actions)
agent.model.summary()

# Echo the action count and the state shape.
print(n_actions)
print(state_shape)

## Entraînement

In [None]:
def train_darqn():
    """Train the global ``agent`` on the global ``env`` for ``EPISODES`` episodes.

    Each step stores the transition in the replay buffer and runs one
    training update. After each episode epsilon is decayed, and the target
    network is synchronised every ``UPDATE_TARGET_FREQ`` episodes. The model
    is saved whenever an episode beats the best total reward so far, and a
    checkpoint is written every 50 episodes. The environment is closed once
    training finishes.

    Returns:
        Tuple ``(episode_rewards, episode_losses)``: per-episode total reward
        and per-episode mean training loss.
    """
    log_dir = f"logs/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    os.makedirs(log_dir, exist_ok=True)
    os.makedirs('models', exist_ok=True)

    episode_rewards = []
    episode_losses = []
    best_reward = -float('inf')

    print(f"\n{'='*50}")
    print(f"Début de l'entraînement - {EPISODES} épisodes")
    print(f"{'='*50}\n")

    for episode in range(EPISODES):
        state, _ = env.reset()
        state = agent.stack_frames(state, is_new_episode=True)

        total_reward = 0
        total_loss = 0.0
        steps = 0
        done = False

        while not done:
            action = agent.get_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            next_state = agent.stack_frames(next_state)

            agent.replay_buffer.add(state, action, reward, next_state, done)

            loss = agent.train()

            state = next_state
            total_reward += reward
            # The training step may report [loss, metric, ...] rather than a
            # bare scalar; only the first entry (the loss) is accumulated.
            total_loss += float(np.ravel(loss)[0])
            steps += 1

        if episode % UPDATE_TARGET_FREQ == 0:
            agent.update_target_model()

        agent.decay_epsilon()

        avg_loss = total_loss / steps if steps > 0 else 0
        episode_rewards.append(total_reward)
        episode_losses.append(avg_loss)

        # Keep the weights of the best-scoring episode seen so far.
        if total_reward > best_reward:
            best_reward = total_reward
            agent.model.save('models/best_darqn_cbam_alien.keras')

        if episode % 10 == 0:
            # Slicing already copes with fewer than ten recorded episodes.
            avg_reward_10 = np.mean(episode_rewards[-10:])
            print(f"Épisode {episode:3d} | "
                  f"Récompense: {total_reward:6.1f} | "
                  f"Moyenne (10): {avg_reward_10:6.1f} | "
                  f"Epsilon: {agent.epsilon:.3f} | "
                  f"Steps: {steps:4d} | "
                  f"Loss: {avg_loss:.4f}")

        # Periodic checkpoint, independent of the best-model save above.
        if episode % 50 == 0 and episode > 0:
            agent.model.save(f'models/darqn_cbam_alien_ep{episode}.keras')

    print(f"\n{'='*50}")
    print(f"Entraînement terminé!")
    print(f"Meilleure récompense: {best_reward:.1f}")
    print(f"{'='*50}\n")

    env.close()

    return episode_rewards, episode_losses

# Run the full training loop and keep the per-episode reward and loss histories.
rewards, losses = train_darqn()

## Graphes

In [None]:
def plot_training_results(rewards, losses):
    """Plot per-episode rewards and losses with a 10-episode moving average.

    The figure is saved to ``training_results_darqn_cbam.png`` and shown,
    then summary statistics are printed.

    Args:
        rewards: Sequence of per-episode total rewards.
        losses: Sequence of per-episode mean losses.
    """
    fig, axes = plt.subplots(2, 1, figsize=(12, 10))

    # One entry per panel: (axis, data, extra line style, label, y-label, title).
    panels = (
        (axes[0], rewards, {}, 'Récompense par épisode', 'Récompense totale',
         'Évolution des récompenses au cours de l\'entraînement'),
        (axes[1], losses, {'color': 'orange'}, 'Loss par épisode', 'Loss moyenne',
         'Évolution de la loss au cours de l\'entraînement'),
    )

    for ax, series, line_style, label, y_label, title in panels:
        ax.plot(series, alpha=0.6, label=label, **line_style)
        # The moving average is only drawn once a full window of 10 exists.
        if len(series) >= 10:
            smoothed = np.convolve(series, np.ones(10)/10, mode='valid')
            ax.plot(range(9, len(series)), smoothed, 'r-', linewidth=2,
                    label='Moyenne mobile (10)')
        ax.set_xlabel('Épisode')
        ax.set_ylabel(y_label)
        ax.set_title(title)
        ax.legend()
        ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('training_results_darqn_cbam.png', dpi=150, bbox_inches='tight')
    plt.show()

    reward_mean = np.mean(rewards)
    reward_max = np.max(rewards)
    reward_min = np.min(rewards)
    loss_mean = np.mean(losses)
    print(f"\nrécompenses:")
    print(f"moyenne {reward_mean:.2f}")
    print(f"max {reward_max:.2f}")
    print(f"min {reward_min:.2f}")
    print(f"Loss moyenne: {loss_mean:.4f}")

# Plot the histories returned by the training run above.
plot_training_results(rewards, losses)

## Test de l'agent entraîné

In [None]:

def test_agent(model_path='models/best_darqn_cbam_alien.keras', n_episodes=5, render=True):
    """Run a saved model greedily for a few episodes and report the rewards.

    Args:
        model_path: Path of the saved Keras model to load.
        n_episodes: Number of evaluation episodes to play.
        render: When True the environment is created with
            ``render_mode='human'``; otherwise no rendering is requested.

    Returns:
        List with the total reward of each episode (empty when
        ``n_episodes`` is 0).
    """
    # Named eval_agent so the local does not shadow this function's own name.
    eval_agent = DARQNAgent(state_shape, n_actions)
    eval_agent.model = keras.models.load_model(model_path, custom_objects={'CBAM': CBAM})
    eval_agent.epsilon = 0.0

    render_mode = 'human' if render else None
    test_env = gym.make(ENV_NAME, render_mode=render_mode)

    test_rewards = []

    print(f"\n{'='*50}")
    print(f"Test de l'agent sur {n_episodes} épisodes")
    print(f"{'='*50}\n")

    try:
        for episode in range(n_episodes):
            state, _ = test_env.reset()
            state = eval_agent.stack_frames(state, is_new_episode=True)

            total_reward = 0
            steps = 0
            done = False

            while not done:
                # training=False disables the epsilon-random branch.
                action = eval_agent.get_action(state, training=False)
                next_state, reward, terminated, truncated, _ = test_env.step(action)
                done = terminated or truncated

                state = eval_agent.stack_frames(next_state)
                total_reward += reward
                steps += 1

            test_rewards.append(total_reward)
            print(f"Épisode {episode + 1}: Récompense = {total_reward:.1f}, Steps = {steps}")
    finally:
        # Release the environment (and any render window) even if an episode fails.
        test_env.close()

    print(f"\n{'='*50}")
    # np.max / np.min raise on an empty list, so skip the stats when no episode ran.
    if test_rewards:
        print(f"  - Récompense moyenne: {np.mean(test_rewards):.2f}")
        print(f"  - Récompense max: {np.max(test_rewards):.2f}")
        print(f"  - Récompense min: {np.min(test_rewards):.2f}")
    print(f"{'='*50}\n")

    return test_rewards

# Evaluate the trained agent after training (uncomment the next line to run it)
# test_rewards = test_agent(render=False)