# Deep Q-Network (DQN) pour MountainCar-v0

## Objectif
Implémenter un DQN pour résoudre le problème MountainCar-v0, où l'agent doit faire monter une voiture au sommet d'une colline en utilisant son momentum.

## Critère de réussite
L'agent doit atteindre le sommet (récompense > -200) dans au moins **90%** des épisodes sur 100 épisodes consécutifs.

## Architecture
- **Réseau de neurones**: 2 couches cachées de 24 neurones chacune avec activation ReLU
- **Experience Replay**: Mémoire tampon de 100,000 transitions
- **Target Network**: Mise à jour tous les 10 épisodes
- **Exploration**: Politique epsilon-greedy avec décroissance exponentielle
- **ModelCheckpoint**: Sauvegarde automatique du meilleur modèle

In [None]:
# Installation des bibliothèques nécessaires
%pip install gym gym[classic_control] tensorflow matplotlib imageio imageio-ffmpeg

In [None]:
# Import des bibliothèques nécessaires
import numpy as np
from tensorflow.keras import layers
from tensorflow.keras.callbacks import ModelCheckpoint
import gym
import tensorflow as tf
import random
from collections import deque
import matplotlib.pyplot as plt
from IPython.display import HTML
import imageio
import os

In [None]:
# Configuration de l'environnement
env_name = "MountainCar-v0"
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995
gamma = 0.99
batch_size = 32
memory_size = 100000
episodes = 500
learning_rate = 0.001
target_update_frequency = 10  # Mise à jour du target network tous les 10 épisodes
success_threshold = -200  # Pour MountainCar, un épisode réussi a une récompense > -200
success_rate_threshold = 0.90  # 90% de taux de réussite requis

# Créer le dossier pour sauvegarder les modèles
os.makedirs('models', exist_ok=True)

# Initialisation de l'environnement
env = gym.make(env_name)
state_shape = env.observation_space.shape[0]
action_shape = env.action_space.n

print(f"Environnement: {env_name}")
print(f"State shape: {state_shape}")
print(f"Action shape: {action_shape}")
print(f"Action space: {env.action_space}")
print(f"Observation space: {env.observation_space}")

In [None]:
# Création du modèle DQN
def create_q_model():
    model = tf.keras.Sequential([
        layers.Dense(24, activation='relu', input_shape=(state_shape,)),
        layers.Dense(24, activation='relu'),
        layers.Dense(action_shape, activation='linear')
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), loss='mse')
    return model

# Création des réseaux principal et target
q_model = create_q_model()
target_model = create_q_model()
target_model.set_weights(q_model.get_weights())

# Configuration du ModelCheckpoint pour sauvegarder le meilleur modèle
checkpoint_path = 'models/mountaincar_dqn_best.h5'
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='loss',  # On surveillera la loss pendant l'entraînement
    save_best_only=True,
    save_weights_only=False,
    mode='min',
    verbose=1
)

print("Modèles créés avec succès!")
print(f"ModelCheckpoint configuré: {checkpoint_path}")
q_model.summary()

In [None]:
# Experience Replay Buffer
memory = deque(maxlen=memory_size)

def store_transition(state, action, reward, next_state, done):
    """Stocke une transition dans le replay buffer"""
    memory.append((state, action, reward, next_state, done))

def sample_batch():
    """Échantillonne un batch de transitions pour l'entraînement"""
    batch = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
    return states, actions, rewards, next_states, dones

In [None]:
# Politique epsilon-greedy
def epsilon_greedy_policy(state, epsilon):
    """Sélectionne une action selon la politique epsilon-greedy"""
    if np.random.random() < epsilon:
        return env.action_space.sample()  # Exploration
    else:
        q_values = q_model.predict(state[np.newaxis], verbose=0)
        return np.argmax(q_values[0])  # Exploitation

In [None]:
# Fonction d'entraînement
def train_step():
    """Effectue une étape d'entraînement du DQN"""
    if len(memory) < batch_size:
        return
    
    # Échantillonnage d'un batch
    states, actions, rewards, next_states, dones = sample_batch()
    
    # Calcul des Q-values cibles avec le target network
    next_q_values = target_model.predict(next_states, verbose=0)
    max_next_q_values = np.max(next_q_values, axis=1)
    
    # Calcul des Q-values actuelles
    target_q_values = q_model.predict(states, verbose=0)
    
    # Mise à jour des Q-values cibles selon l'équation de Bellman
    for i in range(batch_size):
        if dones[i]:
            target_q_values[i][actions[i]] = rewards[i]
        else:
            target_q_values[i][actions[i]] = rewards[i] + gamma * max_next_q_values[i]
    
    # Entraînement du réseau principal
    q_model.fit(states, target_q_values, verbose=0, epochs=1)

In [None]:
# Boucle d'entraînement principale
reward_history = []
episode_lengths = []
average_rewards = []
success_history = []  # Historique des succès (récompense > -200)
success_rates = []  # Taux de réussite sur 100 épisodes

# Variables pour ModelCheckpoint - sauvegarde du meilleur modèle
best_success_rate = 0.0
best_avg_reward = float('-inf')
best_model_path = 'models/mountaincar_dqn_best.h5'

print("Début de l'entraînement...")
print("=" * 50)

for episode in range(episodes):
    state = env.reset()
    # Gérer le nouveau format de gym (retourne un tuple avec info)
    if isinstance(state, tuple):
        state = state[0]
    
    total_reward = 0
    done = False
    steps = 0
    
    while not done:
        # Sélection de l'action
        action = epsilon_greedy_policy(state, epsilon)
        
        # Exécution de l'action
        result = env.step(action)
        if len(result) == 4:  # Ancien format gym
            next_state, reward, done, _ = result
        else:  # Nouveau format gymnasium
            next_state, reward, terminated, truncated, info = result
            done = terminated or truncated
        
        # Stockage de la transition
        store_transition(state, action, reward, next_state, done)
        
        # Entraînement
        train_step()
        
        state = next_state
        total_reward += reward
        steps += 1
    
    # Décroissance d'epsilon
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    
    # Mise à jour du target network
    if episode % target_update_frequency == 0:
        target_model.set_weights(q_model.get_weights())
    
    # Enregistrement des résultats
    reward_history.append(total_reward)
    episode_lengths.append(steps)
    is_success = total_reward > success_threshold
    success_history.append(is_success)
    
    # Calcul de la moyenne sur les 100 derniers épisodes
    if len(reward_history) >= 100:
        avg_reward = np.mean(reward_history[-100:])
        average_rewards.append(avg_reward)
        success_rate = np.mean(success_history[-100:])
        success_rates.append(success_rate)
        
        # ModelCheckpoint: Sauvegarder le meilleur modèle basé sur le taux de réussite
        if success_rate > best_success_rate or (success_rate == best_success_rate and avg_reward > best_avg_reward):
            best_success_rate = success_rate
            best_avg_reward = avg_reward
            q_model.save(best_model_path)
            if (episode + 1) % 10 == 0:  # Afficher seulement tous les 10 épisodes
                print(f"  → Meilleur modèle sauvegardé! (Taux: {success_rate:.2%}, Reward: {avg_reward:.2f})")
    else:
        average_rewards.append(np.mean(reward_history))
        success_rates.append(np.mean(success_history))
    
    # Sauvegarde périodique du modèle
    if (episode + 1) % 50 == 0:
        q_model.save(f'models/mountaincar_dqn_episode_{episode + 1}.h5')
    
    # Affichage des résultats
    if (episode + 1) % 10 == 0:
        avg = np.mean(reward_history[-10:])
        recent_success_rate = np.mean(success_history[-10:]) if len(success_history) >= 10 else 0
        print(f"Episode: {episode + 1}/{episodes}, "
              f"Reward moyen (10 derniers): {avg:.2f}, "
              f"Reward moyen (100 derniers): {average_rewards[-1]:.2f}, "
              f"Taux de réussite (100 derniers): {success_rates[-1]:.2%}, "
              f"Epsilon: {epsilon:.3f}")
    
    # Vérification du critère de réussite
    if len(reward_history) >= 100:
        if success_rates[-1] >= success_rate_threshold:
            print(f"\n✓ Succès! Taux de réussite de {success_rates[-1]:.2%} atteint à l'épisode {episode + 1}")
            print(f"  Score moyen: {average_rewards[-1]:.2f}")
            # Sauvegarder le modèle final
            q_model.save('models/mountaincar_dqn_final.h5')
            print("  Modèle final sauvegardé!")
            print("=" * 50)
            break

print("\nEntraînement terminé!")
# Sauvegarder le modèle final même si le critère n'est pas atteint
q_model.save('models/mountaincar_dqn_final.h5')
print(f"Modèle final sauvegardé dans 'models/mountaincar_dqn_final.h5'")
print(f"Meilleur modèle sauvegardé dans '{best_model_path}' (Taux: {best_success_rate:.2%}, Reward: {best_avg_reward:.2f})")

In [None]:
# Visualisation des résultats d'entraînement
plt.figure(figsize=(18, 6))

# Graphique 1: Récompenses par épisode
plt.subplot(2, 3, 1)
plt.plot(reward_history, alpha=0.6, label='Reward par épisode', color='blue')
if len(average_rewards) > 0:
    plt.plot(average_rewards, label='Moyenne sur 100 épisodes', color='red', linewidth=2)
plt.axhline(y=success_threshold, color='g', linestyle='--', label=f'Seuil de réussite ({success_threshold})')
plt.xlabel('Épisode')
plt.ylabel('Récompense')
plt.title('Évolution des Récompenses')
plt.legend()
plt.grid(True, alpha=0.3)

# Graphique 2: Taux de réussite
plt.subplot(2, 3, 2)
if len(success_rates) > 0:
    plt.plot(success_rates, label='Taux de réussite (100 épisodes)', color='green', linewidth=2)
    plt.axhline(y=success_rate_threshold, color='r', linestyle='--', 
                label=f'Seuil requis ({success_rate_threshold:.0%})')
plt.xlabel('Épisode')
plt.ylabel('Taux de réussite')
plt.title('Taux de Réussite (récompense > -200)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.ylim([0, 1.1])

# Graphique 3: Longueur des épisodes
plt.subplot(2, 3, 3)
plt.plot(episode_lengths, alpha=0.6, color='purple')
plt.xlabel('Épisode')
plt.ylabel('Longueur de l\'épisode')
plt.title('Longueur des Épisodes')
plt.grid(True, alpha=0.3)

# Graphique 4: Distribution des récompenses
plt.subplot(2, 3, 4)
plt.hist(reward_history, bins=50, alpha=0.7, edgecolor='black', color='skyblue')
plt.axvline(x=success_threshold, color='r', linestyle='--', label='Seuil de réussite')
plt.xlabel('Récompense')
plt.ylabel('Fréquence')
plt.title('Distribution des Récompenses')
plt.legend()
plt.grid(True, alpha=0.3)

# Graphique 5: Évolution du taux de réussite (10 derniers)
plt.subplot(2, 3, 5)
if len(success_history) >= 10:
    recent_success = [np.mean(success_history[max(0, i-9):i+1]) 
                      for i in range(len(success_history))]
    plt.plot(recent_success, label='Taux de réussite (10 derniers)', color='orange')
plt.xlabel('Épisode')
plt.ylabel('Taux de réussite')
plt.title('Taux de Réussite (fenêtre glissante 10)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.ylim([0, 1.1])

# Graphique 6: Comparaison succès/échecs
plt.subplot(2, 3, 6)
success_count = sum(success_history)
failure_count = len(success_history) - success_count
plt.bar(['Succès', 'Échecs'], [success_count, failure_count], 
        color=['green', 'red'], alpha=0.7, edgecolor='black')
plt.ylabel('Nombre d\'épisodes')
plt.title(f'Répartition Succès/Échecs\n({success_count}/{len(success_history)} = {success_count/len(success_history):.1%})')
plt.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

# Statistiques finales
print("\n" + "=" * 50)
print("STATISTIQUES FINALES")
print("=" * 50)
print(f"Nombre d'épisodes: {len(reward_history)}")
print(f"Récompense moyenne (tous épisodes): {np.mean(reward_history):.2f}")
if len(reward_history) >= 100:
    print(f"Récompense moyenne (100 derniers): {np.mean(reward_history[-100:]):.2f}")
print(f"Récompense maximale: {np.max(reward_history)}")
print(f"Récompense minimale: {np.min(reward_history)}")
print(f"Écart-type: {np.std(reward_history):.2f}")
print(f"\nTaux de réussite global: {np.mean(success_history):.2%}")
if len(success_history) >= 100:
    print(f"Taux de réussite (100 derniers): {np.mean(success_history[-100:]):.2%}")
print(f"Nombre total de succès: {sum(success_history)}/{len(success_history)}")

In [None]:
# Charger le meilleur modèle sauvegardé (optionnel)
# Décommentez les lignes suivantes pour charger un modèle précédemment sauvegardé

# from tensorflow.keras.models import load_model
# q_model = load_model('models/mountaincar_dqn_best.h5')
# target_model = create_q_model()
# target_model.set_weights(q_model.get_weights())
# print("Meilleur modèle chargé avec succès!")

In [None]:
# Fonction d'évaluation
def evaluate_agent(env, model, num_episodes=100, render=False):
    """Évalue l'agent sur un nombre d'épisodes"""
    total_rewards = []
    
    for episode in range(num_episodes):
        state = env.reset()
        if isinstance(state, tuple):
            state = state[0]
        
        total_reward = 0
        done = False
        
        while not done:
            if render:
                env.render()
            
            # Action déterministe (pas d'exploration)
            q_values = model.predict(state[np.newaxis], verbose=0)
            action = np.argmax(q_values[0])
            
            result = env.step(action)
            if len(result) == 4:
                next_state, reward, done, _ = result
            else:
                next_state, reward, terminated, truncated, info = result
                done = terminated or truncated
            
            state = next_state
            total_reward += reward
        
        total_rewards.append(total_reward)
        if (episode + 1) % 10 == 0:
            print(f"Évaluation - Épisode {episode + 1}/{num_episodes}, "
                  f"Reward moyen: {np.mean(total_rewards):.2f}")
    
    return total_rewards

# Évaluation finale
print("Évaluation de l'agent sur 100 épisodes...")
eval_rewards = evaluate_agent(env, q_model, num_episodes=100, render=False)
print(f"\nScore moyen sur 100 épisodes: {np.mean(eval_rewards):.2f}")
print(f"Score maximum: {np.max(eval_rewards)}")
print(f"Score minimum: {np.min(eval_rewards)}")

In [None]:
# Enregistrement d'une vidéo de l'agent entraîné
print("Enregistrement d'une vidéo de l'agent...")

# Méthode 1: Utiliser gym.wrappers.RecordVideo (si disponible)
try:
    # Essayer avec le nouveau format gymnasium
    try:
        video_env = gym.make(env_name, render_mode='rgb_array')
        video_env = gym.wrappers.RecordVideo(
            video_env, 
            './video',
            episode_trigger=lambda x: x == 0
        )
    except:
        # Essayer avec l'ancien format gym
        video_env = gym.make(env_name)
        video_env = gym.wrappers.RecordVideo(
            video_env, 
            './video',
            episode_trigger=lambda x: x == 0
        )
    
    state = video_env.reset()
    if isinstance(state, tuple):
        state = state[0]
    
    done = False
    step_count = 0
    max_steps = 200  # MountainCar a une limite de 200 pas par défaut
    while not done and step_count < max_steps:
        q_values = q_model.predict(state[np.newaxis], verbose=0)
        action = np.argmax(q_values[0])
        
        result = video_env.step(action)
        if len(result) == 4:
            next_state, reward, done, _ = result
        else:
            next_state, reward, terminated, truncated, info = result
            done = terminated or truncated
        
        state = next_state
        step_count += 1
    
    video_env.close()
    print("✓ Vidéo enregistrée dans le dossier './video'")
    
except Exception as e:
    print(f"Méthode 1 échouée: {e}")
    print("Tentative avec une méthode alternative (capture manuelle)...")
    
    # Méthode 2: Capture manuelle des frames
    try:
        # Essayer de créer l'environnement avec render_mode
        try:
            env_for_video = gym.make(env_name, render_mode='rgb_array')
        except:
            env_for_video = gym.make(env_name)
        
        frames = []
        state = env_for_video.reset()
        if isinstance(state, tuple):
            state = state[0]
        
        done = False
        step_count = 0
        max_steps = 200  # MountainCar a une limite de 200 pas
        
        while not done and step_count < max_steps:
            # Essayer de capturer le frame
            try:
                # Pour gymnasium/gym nouveau format
                if hasattr(env_for_video, 'render'):
                    frame = env_for_video.render()
                    if frame is not None and isinstance(frame, np.ndarray):
                        frames.append(frame)
            except Exception as render_error:
                # Si render() ne fonctionne pas, continuer sans frames
                pass
            
            # Action de l'agent
            q_values = q_model.predict(state[np.newaxis], verbose=0)
            action = np.argmax(q_values[0])
            
            result = env_for_video.step(action)
            if len(result) == 4:
                next_state, reward, done, _ = result
            else:
                next_state, reward, terminated, truncated, info = result
                done = terminated or truncated
            
            state = next_state
            step_count += 1
        
        env_for_video.close()
        
        if frames:
            os.makedirs('video', exist_ok=True)
            video_path = 'video/mountaincar_agent.mp4'
            imageio.mimsave(video_path, frames, fps=30)
            print(f"✓ Vidéo sauvegardée: {video_path} ({len(frames)} frames)")
        else:
            print("⚠ Impossible de capturer les frames vidéo.")
            print("  L'environnement peut nécessiter une configuration graphique spécifique.")
            print("  La vidéo sera générée automatiquement par gym.wrappers.RecordVideo si disponible.")
            
    except Exception as e2:
        print(f"⚠ Erreur avec la méthode alternative: {e2}")
        print("  Note: L'enregistrement vidéo peut nécessiter des dépendances supplémentaires")
        print("  ou une configuration graphique. L'agent est néanmoins entraîné et fonctionnel.")