In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
import matplotlib.pyplot as plt
from IPython.display import clear_output, display
import time
import random
import gymnasium as gym
import highway_env
import collections
import os
from configuration2 import config_dict

# Vérifier si GPU est disponible et le configurer
physical_devices = tf.config.list_physical_devices('GPU')
if len(physical_devices) > 0:
    print(f"GPU disponible: {physical_devices}")
    # Configurer le GPU pour l'entraînement
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
    print("GPU configuré pour l'utilisation")
else:
    print("Aucun GPU détecté, utilisation du CPU")

In [None]:
class ReplayBuffer:
    def __init__(self, capacity=100000):
        self.buffer = collections.deque(maxlen=capacity)
    
    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        batch_size = min(batch_size, len(self.buffer))
        samples = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*samples))
        return states, actions, rewards.reshape(-1, 1), next_states, dones.reshape(-1, 1)
    
    def __len__(self):
        return len(self.buffer)

In [None]:
class Actor:
    def __init__(self, state_dim, action_dim, action_bound):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.action_bound = action_bound
        self.model = self._build_network()
        self.optimizer = optimizers.Adam(learning_rate=0.001)  # Augmenté pour apprendre plus vite
    
    def _build_network(self):
        inputs = layers.Input(shape=self.state_dim)
        
        # Aplatir l'entrée si elle est multidimensionnelle
        x = layers.Flatten()(inputs)
        
        # Réseau plus petit et plus efficace
        x = layers.Dense(128, activation='relu')(x)
        x = layers.Dense(64, activation='relu')(x)
        
        outputs = layers.Dense(self.action_dim, activation='tanh')(x)
        outputs = layers.Lambda(lambda x: x * self.action_bound)(outputs)
        
        model = models.Model(inputs, outputs)
        return model
    
    def predict(self, state):
        if len(state.shape) == len(self.state_dim):
            state = np.expand_dims(state, axis=0)
        return self.model(state)
    
    def train(self, states, critic_grads):
        with tf.GradientTape() as tape:
            actions = self.model(states)
        actor_grads = tape.gradient(actions, self.model.trainable_variables, -critic_grads)
        self.optimizer.apply_gradients(zip(actor_grads, self.model.trainable_variables))

In [None]:
class Critic:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.model = self._build_network()
        self.optimizer = optimizers.Adam(learning_rate=0.002)
    
    def _build_network(self):
        # Entrée d'état
        state_input = layers.Input(shape=self.state_dim)
        state_out = layers.Flatten()(state_input)
        state_out = layers.Dense(128, activation='relu')(state_out)
        
        # Entrée d'action
        action_input = layers.Input(shape=(self.action_dim,))
        action_out = layers.Dense(64, activation='relu')(action_input)
        
        # Combiner les deux flux
        concat = layers.Concatenate()([state_out, action_out])
        x = layers.Dense(64, activation='relu')(concat)
        outputs = layers.Dense(1)(x)
        
        model = models.Model([state_input, action_input], outputs)
        return model
    
    def predict(self, state, action):
        if len(state.shape) == len(self.state_dim):
            state = np.expand_dims(state, axis=0)
        if len(action.shape) == 1:
            action = np.expand_dims(action, axis=0)
        return self.model([state, action])
    
    def train(self, states, actions, target_q):
        with tf.GradientTape() as tape:
            q_values = self.model([states, actions], training=True)
            loss = tf.reduce_mean(tf.square(target_q - q_values))
        
        gradients = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
        return loss
    
    def get_action_gradients(self, states, actions):
        actions = tf.convert_to_tensor(actions)
        with tf.GradientTape() as tape:
            tape.watch(actions)
            q_values = self.model([states, actions])
        return tape.gradient(q_values, actions)

In [None]:
class DDPGAgent:
    def __init__(self, state_dim, action_dim, action_bound, buffer_capacity=50000, batch_size=128):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.action_bound = action_bound
        self.buffer = ReplayBuffer(buffer_capacity)
        self.batch_size = batch_size
        self.gamma = 0.99
        self.tau = 0.005  # Augmenté pour une mise à jour plus rapide
        self.train_interval = 4  # Entraîner plus fréquemment pour apprendre plus vite
        self.steps_count = 0
        
        # Réseaux principaux
        self.actor = Actor(state_dim, action_dim, action_bound)
        self.critic = Critic(state_dim, action_dim)
        
        # Réseaux cibles
        self.target_actor = Actor(state_dim, action_dim, action_bound)
        self.target_critic = Critic(state_dim, action_dim)
        
        # Initialiser les poids des réseaux cibles
        self.update_target_networks(tau=1.0)
        
        self.noise_std = 0.4  # Plus de bruit pour l'exploration initiale
        self.noise_decay = 0.999  # Décroissance plus rapide du bruit
        self.min_noise = 0.05
    
    def update_target_networks(self, tau=None):
        tau = self.tau if tau is None else tau
        
        for source, target in zip(self.actor.model.variables, self.target_actor.model.variables):
            target.assign((1 - tau) * target + tau * source)
        
        for source, target in zip(self.critic.model.variables, self.target_critic.model.variables):
            target.assign((1 - tau) * target + tau * source)
    
    def get_action(self, state, add_noise=True):
        state = np.array(state, dtype=np.float32)
        if len(state.shape) == len(self.state_dim):
            state = np.expand_dims(state, axis=0)
        
        action = self.actor.predict(state)[0]
        
        if add_noise:
            noise = np.random.normal(0, self.noise_std, size=self.action_dim)
            action = np.clip(action + noise, -self.action_bound, self.action_bound)
            # Réduire le bruit au fil du temps
            self.noise_std = max(self.min_noise, self.noise_std * self.noise_decay)
        
        return action
    
    def remember(self, state, action, reward, next_state, done):
        self.buffer.add(state, action, reward, next_state, done)
    
    def train(self):
        self.steps_count += 1
        if self.steps_count % self.train_interval != 0 or len(self.buffer) < self.batch_size:
            return None
        
        # Échantillonner un batch d'expériences
        states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)
        
        # Convertir en tenseurs TensorFlow
        states = tf.convert_to_tensor(states, dtype=tf.float32)
        actions = tf.convert_to_tensor(actions, dtype=tf.float32)
        rewards = tf.convert_to_tensor(rewards, dtype=tf.float32)
        next_states = tf.convert_to_tensor(next_states, dtype=tf.float32)
        dones = tf.convert_to_tensor(dones, dtype=tf.float32)
        
        # Entraîner le critique
        target_actions = self.target_actor.predict(next_states)
        target_q = self.target_critic.predict(next_states, target_actions)
        target_q = rewards + self.gamma * target_q * (1 - dones)
        
        critic_loss = self.critic.train(states, actions, target_q)
        
        # Entraîner l'acteur
        actions_pred = self.actor.predict(states)
        critic_grads = self.critic.get_action_gradients(states, actions_pred)
        self.actor.train(states, critic_grads)
        
        # Mettre à jour les réseaux cibles
        self.update_target_networks()
        
        return critic_loss

In [None]:
def train_ddpg(env, agent, episodes=1000, max_steps_per_episode=200, eval_interval=50):
    """Entraîne l'agent DDPG sans rendu pendant l'entraînement pour éviter les crashs"""
    episode_rewards = []
    episode_steps = []
    critic_losses = []  # Pour suivre la loss du critique
    best_reward = -float('inf')
    
    for episode in range(episodes):
        state, _ = env.reset()
        episode_reward = 0
        done = False
        truncated = False
        step = 0
        episode_losses = []  # Liste pour stocker les losses de cet épisode
        
        while not (done or truncated) and step < max_steps_per_episode:
            # Sélectionner une action
            action = agent.get_action(state)
            
            # Exécuter l'action
            next_state, reward, done, truncated, _ = env.step(action)
            
            # Stocker l'expérience
            agent.remember(state, action, reward, next_state, done or truncated)
            
            # Entraîner l'agent et récupérer la loss
            loss = agent.train()
            if loss is not None:
                episode_losses.append(float(loss))
            
            # Mise à jour de l'état et du compteur
            state = next_state
            episode_reward += reward
            step += 1
        
        episode_rewards.append(episode_reward)
        episode_steps.append(step)
        
        # Calculer la loss moyenne de l'épisode
        avg_loss = np.mean(episode_losses) if episode_losses else 0
        critic_losses.append(avg_loss)
        
        # Afficher la progression tous les 10 épisodes
        if episode % 10 == 0:
            avg_reward = np.mean(episode_rewards[-10:])
            print(f"Épisode {episode}/{episodes}, Récompense: {episode_reward:.2f}, Moyenne des 10 dernières: {avg_reward:.2f}, Loss: {avg_loss:.4f}, Bruit: {agent.noise_std:.4f}, Étapes: {step}")
        
        # Évaluation périodique sans rendu
        if episode % eval_interval == 0:
            eval_reward = evaluate_agent(env, agent, episodes=1, render=False)
            print(f"Évaluation à l'épisode {episode}: {eval_reward:.2f}")
            
            # Sauvegarder le meilleur modèle
            if eval_reward > best_reward:
                best_reward = eval_reward
                print(f"Nouveau meilleur modèle trouvé! Récompense: {best_reward:.2f}")
    
    return episode_rewards, episode_steps, critic_losses

In [None]:
def evaluate_agent(env, agent, episodes=5, render=True):
    """Évalue l'agent sur plusieurs épisodes"""
    total_rewards = []
    
    # Configuration du rendu (uniquement si render=True)
    fig = None
    ax = None
    img_plot = None
    
    if render:
        plt.figure(figsize=(8, 8))
        fig, ax = plt.subplots(figsize=(8, 8))
        plt.title("Évaluation de l'agent")
    
    for episode in range(episodes):
        state, _ = env.reset()
        episode_reward = 0
        done = False
        truncated = False
        step = 0
        frames = []  # Pour collecter les images à afficher après
        
        while not (done or truncated) and step < 300:
            # Choisir l'action sans bruit
            action = agent.get_action(state, add_noise=False)
            
            # Exécuter l'action
            next_state, reward, done, truncated, _ = env.step(action)
            
            # Collecter les images pour affichage ultérieur si rendu demandé
            if render:
                frames.append(env.render())
            
            # Mettre à jour l'état et la récompense
            state = next_state
            episode_reward += reward
            step += 1
        
        # Afficher quelques images de l'épisode à la fin plutôt qu'en temps réel
        if render and len(frames) > 0:
            for i, frame in enumerate(frames[::10]):  # Afficher 1 image sur 10
                plt.clf()
                plt.imshow(frame)
                plt.title(f"Épisode {episode+1}, Step {i*10}, Reward: {episode_reward:.2f}")
                plt.pause(0.1)
        
        total_rewards.append(episode_reward)
        print(f"Épisode d'évaluation {episode+1}/{episodes}, Récompense: {episode_reward:.2f}, Étapes: {step}")
    
    if fig is not None:
        plt.close(fig)
    
    avg_reward = np.mean(total_rewards)
    return avg_reward

In [None]:
# Modifier la configuration pour accélérer l'apprentissage et terminer quand l'agent sort de la route
config_dict["simulation_frequency"] = 30  # Simulation plus rapide
config_dict["policy_frequency"] = 10
config_dict["duration"] = 150  # Réduire pour des épisodes plus courts
config_dict["offscreen_rendering"] = True  # Désactiver le rendu pendant l'entraînement
config_dict["render_mode"] = "rgb_array"
config_dict["offroad_terminal"] = True  # S'assurer que l'épisode se termine quand l'agent sort de la route
config_dict["terminal_on_out_of_road"] = True  # Autre paramètre pour terminer l'épisode hors route
config_dict["out_of_road_cost"] = -15  # Pénalité plus forte si l'agent sort de la route

# Créer et configurer l'environnement
gym.register_envs(highway_env)
env = gym.make("racetrack-v0", render_mode="rgb_array")
env.unwrapped.configure(config_dict)

# Observer les dimensions de l'état et de l'action
state, _ = env.reset()
print(f"Forme de l'observation: {state.shape}")
print(f"Espace d'action: {env.action_space}")

# Définir les dimensions d'état et d'action
state_dim = state.shape
action_dim = env.action_space.shape[0]
action_bound = 1.0

# Créer l'agent DDPG avec des paramètres optimisés
agent = DDPGAgent(
    state_dim=state_dim,
    action_dim=action_dim,
    action_bound=action_bound,
    buffer_capacity=100000,
    batch_size=256  # Plus grande batch size pour utiliser le GPU efficacement
)

print(f"Agent créé avec state_dim={state_dim}, action_dim={action_dim}, action_bound={action_bound}")

In [None]:
# Paramètres d'entraînement
episodes = 200  # Nombre total d'épisodes
max_steps = 200  # Maximum d'étapes par épisode

# Entraîner l'agent sans rendu pour éviter les crashs
rewards, steps, critic_losses = train_ddpg(
    env=env,
    agent=agent,
    episodes=episodes,
    max_steps_per_episode=max_steps,
    eval_interval=50  # Évaluer tous les 50 épisodes
)

In [None]:
plt.figure(figsize=(15, 5))

# Courbe de récompense
plt.subplot(1, 3, 1)
plt.plot(rewards)
plt.title('Récompense par épisode')
plt.xlabel('Épisode')
plt.ylabel('Récompense')
plt.grid(True)

# Courbe du nombre d'étapes
plt.subplot(1, 3, 2)
plt.plot(steps)
plt.title("Nombre d'étapes par épisode")
plt.xlabel('Épisode')
plt.ylabel('Étapes')
plt.grid(True)

# Courbe des losses
plt.subplot(1, 3, 3)
plt.plot(critic_losses)
plt.title("Loss du critique par épisode")
plt.xlabel('Épisode')
plt.ylabel('Loss')
plt.grid(True)

plt.tight_layout()
plt.savefig('training_curves.png')  # Sauvegarder l'image des courbes
plt.show()

# Calculer et afficher les moyennes glissantes
window_size = 50
rewards_smooth = [np.mean(rewards[max(0, i-window_size):i+1]) for i in range(len(rewards))]

plt.figure(figsize=(10, 5))
plt.plot(rewards, alpha=0.3, label='Récompenses brutes')
plt.plot(rewards_smooth, label='Moyenne glissante')
plt.title('Récompense par épisode (avec moyenne glissante)')
plt.xlabel('Épisode')
plt.ylabel('Récompense')
plt.legend()
plt.grid(True)
plt.savefig('rewards_smoothed.png')  # Sauvegarder l'image des récompenses lissées
plt.show()

In [None]:
def demonstrate_agent(agent, num_episodes=3, max_steps=10000):
    """Démontre l'agent entraîné dans une fenêtre"""
    # Créer un nouvel environnement pour la démonstration
    demo_config = config_dict.copy()
    demo_config["offscreen_rendering"] = False  # Activer le rendu dans une fenêtre
    demo_config["render_mode"] = "human"  # Mode de rendu en temps réel
    demo_config["screen_width"] = 800  # Taille de la fenêtre plus grande
    demo_config["screen_height"] = 600
    demo_config["simulation_frequency"] = 15  # Ralentir un peu pour mieux voir
    
    # Créer l'environnement de démonstration
    demo_env = gym.make("racetrack-v0", render_mode="human")
    demo_env.unwrapped.configure(demo_config)
    
    print("\nDémonstration de l'agent entraîné:")
    total_rewards = []
    
    try:
        for episode in range(num_episodes):
            state, _ = demo_env.reset()
            episode_reward = 0
            done = False
            truncated = False
            step = 0
            
            while not (done or truncated) and step < max_steps:
                # Choisir l'action sans exploration (sans bruit)
                action = agent.get_action(state, add_noise=False)
                
                # Exécuter l'action
                next_state, reward, done, truncated, _ = demo_env.step(action)
                
                # Mettre à jour l'état et la récompense
                state = next_state
                episode_reward += reward
                step += 1
            
            total_rewards.append(episode_reward)
            print(f"Épisode de démonstration {episode+1}/{num_episodes}, Récompense: {episode_reward:.2f}, Étapes: {step}")
    
    except KeyboardInterrupt:
        print("Démonstration interrompue par l'utilisateur")
    
    finally:
        # Fermer l'environnement proprement
        demo_env.close()
        
    avg_reward = np.mean(total_rewards)
    print(f"Récompense moyenne de démonstration: {avg_reward:.2f}")
    
    return avg_reward

# Pour lancer la démonstration de l'agent après l'entraînement
print("Appuyez sur Ctrl+C dans la fenêtre de sortie pour arrêter la démonstration.")
demonstrate_agent(agent)

In [None]:
# Configuration pour l'évaluation finale avec sauvegarde d'images
def evaluate_and_save_images(env, agent, episodes=3, max_steps=300):
    """Évalue l'agent et sauvegarde des images"""
    import os
    img_dir = "evaluation_images"
    os.makedirs(img_dir, exist_ok=True)
    
    total_rewards = []
    
    for episode in range(episodes):
        state, _ = env.reset()
        episode_reward = 0
        done = False
        truncated = False
        step = 0
        frames = []
        
        while not (done or truncated) and step < max_steps:
            # Choisir l'action sans bruit
            action = agent.get_action(state, add_noise=False)
            
            # Exécuter l'action
            next_state, reward, done, truncated, _ = env.step(action)
            
            # Collecter l'image
            frames.append(env.render())
            
            # Mettre à jour l'état et la récompense
            state = next_state
            episode_reward += reward
            step += 1
        
        # Sauvegarder des images clés de l'épisode
        for i in range(0, len(frames), max(1, len(frames) // 10)):  # ~10 images par épisode
            plt.figure(figsize=(10, 8))
            plt.imshow(frames[i])
            plt.title(f"Épisode {episode+1}, Step {i}, Reward: {episode_reward:.2f}")
            plt.savefig(f"{img_dir}/ep{episode+1}_step{i}.png")
            plt.close()
        
        # Sauvegarder l'image finale
        if frames:
            plt.figure(figsize=(10, 8))
            plt.imshow(frames[-1])
            plt.title(f"Épisode {episode+1} Final, Reward: {episode_reward:.2f}")
            plt.savefig(f"{img_dir}/ep{episode+1}_final.png")
            plt.close()
            
        total_rewards.append(episode_reward)
        print(f"Épisode d'évaluation {episode+1}/{episodes}, Récompense: {episode_reward:.2f}, Étapes: {step}")
    
    avg_reward = np.mean(total_rewards)
    print(f"Récompense moyenne: {avg_reward:.2f}")
    print(f"Images sauvegardées dans le dossier: {img_dir}")
    return avg_reward

# Évaluer et sauvegarder des images
print("\nÉvaluation finale avec sauvegarde d'images:")
final_reward = evaluate_and_save_images(env, agent, episodes=3)

In [None]:
def demonstrate_agent(agent, num_episodes=3, max_steps=300):
    """Démontre l'agent entraîné dans une fenêtre"""
    # Créer un nouvel environnement pour la démonstration
    demo_config = dict(config_dict)  # Copie du dictionnaire de configuration
    demo_config["offscreen_rendering"] = False
    demo_config["render_mode"] = "human"
    demo_config["screen_width"] = 800
    demo_config["screen_height"] = 600
    demo_config["simulation_frequency"] = 15  # Ralentir pour mieux voir
    
    # Créer l'environnement de démonstration
    demo_env = gym.make("racetrack-v0", render_mode="human")
    demo_env.unwrapped.configure(demo_config)
    
    print("\nDémonstration de l'agent entraîné:")
    total_rewards = []
    
    try:
        for episode in range(num_episodes):
            state, _ = demo_env.reset()
            episode_reward = 0
            done = False
            truncated = False
            step = 0
            
            while not (done or truncated) and step < max_steps:
                # Choisir l'action sans exploration (sans bruit)
                action = agent.get_action(state, add_noise=False)
                
                # Exécuter l'action
                next_state, reward, done, truncated, _ = demo_env.step(action)
                
                # Mettre à jour l'état et la récompense
                state = next_state
                episode_reward += reward
                step += 1
            
            total_rewards.append(episode_reward)
            print(f"Épisode de démonstration {episode+1}/{num_episodes}, Récompense: {episode_reward:.2f}, Étapes: {step}")
    
    except KeyboardInterrupt:
        print("Démonstration interrompue par l'utilisateur")
    
    finally:
        # Fermer l'environnement proprement
        demo_env.close()
        
    avg_reward = np.mean(total_rewards)
    print(f"Récompense moyenne de démonstration: {avg_reward:.2f}")
    
    return avg_reward

# Pour lancer la démonstration de l'agent après l'entraînement
print("\nDémonstration de l'agent dans une fenêtre:")
print("Appuyez sur Ctrl+C dans la fenêtre de sortie pour arrêter la démonstration.")
demonstrate_agent(agent)