## Import des librairies

In [2]:
import gymnasium
import highway_env
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import os
from stable_baselines3 import DQN
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.monitor import Monitor

## Configuration de l'environnement

In [3]:
# Configuration de style pour les graphiques
sns.set_style("whitegrid")
plt.rcParams["figure.figsize"] = (12, 8)
plt.rcParams["font.size"] = 12

# Création du répertoire pour les logs et les modèles sauvegardés
log_dir = "highway_dqn_logs/"
model_dir = "highway_dqn_models/"
os.makedirs(log_dir, exist_ok=True)
os.makedirs(model_dir, exist_ok=True)

# Importation de la configuration spécifique à l'environnement
from config.config_sb import config_dict

## Fonctions Utiles

In [4]:
# Callback personnalisé pour suivre l'apprentissage
class TrainingMonitorCallback(BaseCallback):
    def __init__(self, check_freq=1000, save_path=None, verbose=1):
        super(TrainingMonitorCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path
        self.rewards_history = []
        self.episode_lengths = []
        self.current_episode_reward = 0
        self.current_episode_length = 0
        self.collision_count = 0
        self.lane_changes = 0
        self.speeds = []

    def _on_step(self):
        # Récupérer les informations de l'étape actuelle
        info = (
            self.locals.get("infos")[0] if "infos" in self.locals else {}
        )
        reward = self.locals.get("rewards")[0] if "rewards" in self.locals else 0
        done = self.locals.get("dones")[0] if "dones" in self.locals else False

        # Incrémenter les compteurs d'épisode
        self.current_episode_reward += reward
        self.current_episode_length += 1

        # Collecter des statistiques de conduite si disponibles
        if isinstance(info, dict):
            # Adaptation pour la nouvelle structure d'info
            if "speed" in info:
                self.speeds.append(info["speed"])
            if "crashed" in info and info["crashed"]:
                self.collision_count += 1
            if "action" in info:
                action = info["action"]
                if isinstance(action, np.ndarray) and action.size == 1:
                    action = action.item()
                if action in [0, 2]:
                    self.lane_changes += 1

        # À la fin d'un épisode, enregistrer les statistiques et réinitialiser
        if done:
            self.rewards_history.append(self.current_episode_reward)
            self.episode_lengths.append(self.current_episode_length)
            self.current_episode_reward = 0
            self.current_episode_length = 0

        # Sauvegarder le modèle périodiquement
        if self.n_calls % self.check_freq == 0 and self.save_path is not None:
            path = os.path.join(self.save_path, f"model_{self.n_calls}_steps")
            self.model.save(path)
            if self.verbose > 0:
                print(f"Model saved to {path}")

            # Générer et sauvegarder des graphiques d'apprentissage
            self._plot_training_progress()

        return True

    def _plot_training_progress(self):
        """Génère des graphiques montrant la progression de l'apprentissage"""
        # Vérifier s'il y a des données à tracer
        if not self.rewards_history:
            return
        # Récompenses par épisode
        plt.figure(figsize=(15, 10))

        # Subplot pour les récompenses
        plt.subplot(2, 2, 1)
        plt.plot(self.rewards_history, label="Récompense par épisode")
        if len(self.rewards_history) >= 10:  # Seulement si assez de données
            plt.plot(
                pd.Series(self.rewards_history).rolling(10).mean(),
                label="Moyenne mobile (10 épisodes)",
            )
        plt.xlabel("Épisode")
        plt.ylabel("Récompense cumulée")
        plt.title("Évolution des récompenses pendant l'apprentissage")
        plt.legend()
        plt.grid(True)

        # Subplot pour la durée des épisodes
        plt.subplot(2, 2, 2)
        plt.plot(self.episode_lengths)
        plt.xlabel("Épisode")
        plt.ylabel("Nombre de pas")
        plt.title("Durée des épisodes pendant l'apprentissage")
        plt.grid(True)

        # Subplot pour les collisions
        if len(self.rewards_history) > 0:
            collision_rate = self.collision_count / len(self.rewards_history)
            plt.subplot(2, 2, 3)
            plt.bar(["Taux de collision"], [collision_rate])
            plt.ylabel("Collisions par épisode")
            plt.title(f"Taux de collision: {collision_rate:.4f}")

        # Subplot pour la distribution des vitesses
        if len(self.speeds) > 0:
            plt.subplot(2, 2, 4)
            sns.histplot(self.speeds, kde=True)
            plt.xlabel("Vitesse")
            plt.title("Distribution des vitesses")

        plt.tight_layout()
        plt.savefig(
            os.path.join(self.save_path, f"training_progress_{self.n_calls}.png")
        )
        plt.close()


# Fonction pour évaluer le modèle entraîné
def evaluate_and_visualize(model, env, num_episodes=10, display = False):
    """Évalue le modèle et génère des visualisations sur ses performances"""
    # Variables pour collecter les statistiques
    rewards = []
    episode_lengths = []
    speeds = []
    actions = []  # Stockage des actions au lieu des positions de voie
    collisions = 0
    lane_changes = 0

    # Collecter des statistiques détaillées
    for episode in range(num_episodes):
        done = truncated = False
        obs, info = env.reset()
        episode_reward = 0
        episode_length = 0
        episode_speeds = []
        episode_actions = []

        while not (done or truncated):
            action, _states = model.predict(obs, deterministic=True)
            obs, reward, done, truncated, info = env.step(action)

            # Collecte de statistiques
            episode_reward += reward
            episode_length += 1

            if isinstance(info, dict):
                if "speed" in info:
                    episode_speeds.append(info["speed"])
                if "action" in info:
                    a = info["action"]
                    if isinstance(a, np.ndarray) and a.size == 1:
                        a = a.item()
                    episode_actions.append(a)
                    if a in [0, 2]:  # Actions de changement de voie
                        lane_changes += 1
                if "crashed" in info and info["crashed"]:
                    collisions += 1
                    
                if display == True :
                    env.render()  # Rendu de l'environnement pour visualisation

        # Enregistrer les statistiques de l'épisode
        rewards.append(episode_reward)
        episode_lengths.append(episode_length)
        speeds.extend(episode_speeds)
        actions.extend(episode_actions)

        print(
            f"Épisode {episode+1}: Récompense = {episode_reward:.2f}, Durée = {episode_length}"
        )

    # Créer des visualisations pour le rapport
    create_evaluation_visualizations(
        rewards,
        episode_lengths,
        speeds,
        actions,  # Utilisation des actions au lieu des voies
        collisions / num_episodes,
        lane_changes / num_episodes,
    )

    return np.mean(rewards), np.std(rewards)


def create_evaluation_visualizations(
    rewards, lengths, speeds, actions, collision_rate, lane_change_rate
):
    """Crée des visualisations détaillées sur les performances du modèle"""
    # Vérifier si des données sont disponibles
    if not rewards:
        print("Pas de données d'évaluation disponibles pour créer des visualisations")
        return

    # Configurer une figure avec plusieurs sous-graphiques
    plt.figure(figsize=(15, 12))

    # 1. Distribution des récompenses par épisode
    plt.subplot(2, 3, 1)
    sns.barplot(x=list(range(1, len(rewards) + 1)), y=rewards)
    plt.xlabel("Épisode")
    plt.ylabel("Récompense totale")
    plt.title("Récompenses par épisode")

    # 2. Distribution des durées d'épisode
    plt.subplot(2, 3, 2)
    sns.barplot(x=list(range(1, len(lengths) + 1)), y=lengths)
    plt.xlabel("Épisode")
    plt.ylabel("Nombre de pas")
    plt.title("Durée des épisodes")

    # 3. Distribution des vitesses
    plt.subplot(2, 3, 3)
    if speeds:
        sns.histplot(speeds, kde=True)
        plt.xlabel("Vitesse")
        plt.ylabel("Fréquence")
        plt.title("Distribution des vitesses")
    else:
        plt.text(
            0.5, 0.5, "Pas de données de vitesse disponibles", ha="center", va="center"
        )
        plt.axis("off")

    # 4. Distribution des actions
    plt.subplot(2, 3, 4)
    if actions:
        # Comptage des actions
        action_counts = pd.Series(actions).value_counts().sort_index()
        action_names = {
            0: "LANE_LEFT",
            1: "IDLE",
            2: "LANE_RIGHT",
            3: "FASTER",
            4: "SLOWER"
        }
        
        # Pour les indices existants uniquement
        labels = [action_names.get(i, f"Action {i}") for i in action_counts.index]
        
        sns.barplot(x=labels, y=action_counts.values)
        plt.xlabel("Action")
        plt.ylabel("Fréquence")
        plt.title("Distribution des actions")
        plt.xticks(rotation=45)
    else:
        plt.text(
            0.5, 0.5, "Pas de données d'action disponibles", ha="center", va="center"
        )
        plt.axis("off")

    # 5. Affichage des taux de collision et de changement de voie
    plt.subplot(2, 3, 5)
    rates = [collision_rate, lane_change_rate]
    print(f"Taux de collision: {collision_rate:.4f}, Taux de demande de changement de voie: {lane_change_rate:.4f}")
    labels = ["Taux de collision", "Taux de changement de voie"]
    sns.barplot(x=labels, y=rates)
    plt.ylabel("Taux par épisode")
    plt.title("Statistiques de conduite")

    plt.tight_layout()
    plt.savefig(os.path.join(log_dir, "evaluation_results.png"), dpi=300)
    plt.close()

    # Créer un graphique séparé pour l'analyse de la politique si suffisamment de données
    if speeds and len(rewards) > 5:
        create_policy_analysis(rewards, speeds, actions, collision_rate)


def create_policy_analysis(rewards, speeds, actions, collision_rate):
    """Crée une visualisation analysant la politique apprise"""
    plt.figure(figsize=(12, 6))

    # Relation entre vitesse moyenne et récompense par épisode
    if len(rewards) > 5:
        avg_speeds = []
        for i in range(len(rewards)):
            start = i * (len(speeds) // len(rewards))
            end = (
                (i + 1) * (len(speeds) // len(rewards))
                if i < len(rewards) - 1
                else len(speeds)
            )
            if start < len(speeds) and end <= len(speeds) and start < end:
                avg_speeds.append(np.mean(speeds[start:end]))
            else:
                avg_speeds.append(0)  # Valeur par défaut si l'indice est hors limite

        plt.scatter(avg_speeds, rewards, s=100, alpha=0.7)

        # Ajouter une ligne de tendance
        z = np.polyfit(avg_speeds, rewards, 1)
        p = np.poly1d(z)
        plt.plot(sorted(avg_speeds), p(sorted(avg_speeds)), "r--", alpha=0.8)

        plt.xlabel("Vitesse moyenne par épisode")
        plt.ylabel("Récompense totale par épisode")
        plt.title("Relation entre vitesse et récompense (politique apprise)")
        plt.grid(True)

    plt.tight_layout()
    plt.savefig(os.path.join(log_dir, "policy_analysis.png"), dpi=300)
    plt.close()

    # Ajout d'un graphique pour analyser les composantes de récompense
    plt.figure(figsize=(12, 6))
    
    plt.text(0.5, 0.5, 
             f"Analyse des performances:\n\n"
             f"Taux de collision: {collision_rate:.4f} par épisode\n"
             f"Vitesse moyenne: {np.mean(speeds):.2f}\n"
             f"Récompense moyenne: {np.mean(rewards):.2f}",
             ha="center", va="center", fontsize=12)
    plt.axis("off")
    plt.tight_layout()
    plt.savefig(os.path.join(log_dir, "reward_components_analysis.png"), dpi=300)
    plt.close()


def plot_training_rewards(rewards_history):
    """Trace la courbe des récompenses d'apprentissage"""
    if not rewards_history:
        print("Pas de données d'entraînement disponibles pour créer des visualisations")
        return

    plt.figure(figsize=(12, 6))
    # plt.plot(rewards_history, label="Récompense par épisode")

    # Ajouter une moyenne mobile si suffisamment de données
    if len(rewards_history) >= 10:
        window_size = max(5, len(rewards_history) // 20)
        smoothed_rewards = pd.Series(rewards_history).rolling(window=window_size).mean()
        plt.plot(
            smoothed_rewards,
            "r-",
            linewidth=2,
            label=f"Moyenne mobile ({window_size} épisodes)",
        )

    plt.xlabel("Épisode")
    plt.ylabel("Récompense cumulée")
    plt.title("Courbe d'apprentissage")
    plt.grid(True)
    plt.legend()

    plt.tight_layout()
    plt.savefig(os.path.join(log_dir, "learning_curve.png"), dpi=300)
    plt.close()



## Initialisation de l'environnement

In [5]:
# Créer et configurer l'environnement
env = gymnasium.make("highway-fast-v0", render_mode="rgb_array")
env.unwrapped.configure(config_dict)

# Wrapper l'environnement avec Monitor pour enregistrer les statistiques
env = Monitor(env, log_dir)

# Afficher les informations sur l'espace d'observation
obs_space = env.observation_space
print(f"Espace d'observation: {obs_space}")

# Tester l'environnement
obs, info = env.reset()
print(f"Forme de l'observation: {obs.shape}")
print(f"Exemple d'info: {info}")

Espace d'observation: Box(-inf, inf, (5, 5), float32)
Forme de l'observation: (3, 3, 20)
Exemple d'info: {'speed': 25, 'crashed': False, 'action': 1, 'rewards': {'collision_reward': 0.0, 'right_lane_reward': 1.0, 'high_speed_reward': 0.7, 'on_road_reward': 1.0}}


## Entraînement

In [6]:
# Initialiser le callback pour le suivi de l'apprentissage
callback = TrainingMonitorCallback(check_freq=1000, save_path=model_dir, verbose=1)

# Créer et entraîner le modèle DQN
model = DQN(
    "MlpPolicy",
    env,
    policy_kwargs=dict(net_arch=[256, 256]),
    learning_rate=5e-4,
    buffer_size=15000,
    learning_starts=200,
    batch_size=32,
    gamma=0.8,
    train_freq=1,
    gradient_steps=1,
    target_update_interval=50,
    verbose=1,
    tensorboard_log=None,
)

# Entraîner le modèle avec le callback personnalisé
model.learn(total_timesteps=20000, callback=callback)

# Sauvegarder le modèle final
final_model_path = os.path.join(model_dir, "model_final")
model.save(final_model_path)
print(f"Modèle final sauvegardé à {final_model_path}")

# Tracer la courbe d'apprentissage à partir des données collectées
plot_training_rewards(callback.rewards_history)

Using cpu device
Wrapping the env in a DummyVecEnv.
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 35.8     |
|    ep_rew_mean      | 30.5     |
|    exploration_rate | 0.932    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 31       |
|    time_elapsed     | 4        |
|    total_timesteps  | 143      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 41.1     |
|    ep_rew_mean      | 35.8     |
|    exploration_rate | 0.844    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 25       |
|    time_elapsed     | 13       |
|    total_timesteps  | 329      |
| train/              |          |
|    learning_rate    | 0.0005   |
|    loss             | 0.0511   |
|    n_updates        | 128      |
----------------------------------
----------------------------------
| r

## Évaluation

In [None]:
# Évaluer et visualiser les performances du modèle final
print("\nÉvaluation du modèle final:")
# Il faut ajuster la valeur de Display pour afficher ou non l'environnement
display = True
mean_reward, std_reward = evaluate_and_visualize(model, env, num_episodes=100, display = display)
print(f"Récompense moyenne: {mean_reward:.2f} ± {std_reward:.2f}")

# Fermer l'environnement
env.close()

## Exemple sur 3 épisodes

In [15]:
from IPython.display import HTML

# Remplace le chemin par celui de ta vidéo
video_path = "evaluation_video.mp4"

HTML(f'<video width="600" controls><source src="{video_path}" type="video/mp4"></video>')