**1. Install Environment, Agent, and Libraries**:

---



In [None]:
# Install the highway-env environment package straight from its GitHub repository
!pip install git+https://github.com/eleurent/highway-env

# TODO: we use the bleeding edge version because the current stable version does not support the latest gym>=0.21 versions. Revert back to stable at the next SB3 release.
!pip install git+https://github.com/DLR-RM/stable-baselines3

# Install the rl-agents package from GitHub
!pip install git+https://github.com/eleurent/rl-agents #egg=rl-agents

# Environment: gymnasium provides gym.make, highway_env provides the driving environments
import gymnasium as gym
import highway_env

# Register the highway_env environments with gymnasium so that later
# gym.make("highway-fast-v0") calls can resolve the environment id
gym.register_envs(highway_env)

# DQN agent (and the `common` helpers package) from Stable-Baselines3
from stable_baselines3 import DQN
from stable_baselines3 import common

# Visualization utils: TensorBoard notebook extension and a notebook progress bar
%load_ext tensorboard
import sys
from tqdm.notebook import trange
# Extra Python packages and system tools (virtual display, ffmpeg) installed for logging / video
!pip install tensorboardx gym pyvirtualdisplay
!apt-get install -y xvfb ffmpeg
# Clone the HighwayEnv repository (stderr is discarded) and put its scripts/ directory
# first on sys.path so that `utils` below is imported from that checkout.
# NOTE(review): the /content/ prefix is hard-coded — presumably a Colab working directory; confirm.
!git clone https://github.com/Farama-Foundation/HighwayEnv.git 2> /dev/null
sys.path.insert(0, '/content/HighwayEnv/scripts/')
from utils import record_videos, show_videos

In [None]:
# Launch TensorBoard inline, reading logs from the "highway_dqn" directory
# (the directory passed as tensorboard_log when the DQN model is created).
%tensorboard --logdir "highway_dqn"
# You may need to hit the refresh button in the top right of the panel to see results.

**2. Train the DQN Model**:

---


In [None]:
# Build the training environment
env = gym.make("highway-fast-v0")

# DQN hyperparameters for the highway environment, gathered in one place
dqn_kwargs = {
    "policy_kwargs": {"net_arch": [256, 256]},
    "learning_rate": 5e-4,
    "buffer_size": 15000,
    "learning_starts": 200,
    "batch_size": 32,
    "gamma": 0.8,
    "train_freq": 1,
    "gradient_steps": 1,
    "target_update_interval": 50,
    "exploration_fraction": 0.7,
    "verbose": 1,
    "tensorboard_log": "highway_dqn",
}
model = DQN("MlpPolicy", env, **dqn_kwargs)

# Call learn() n_runs times on the same model, 20,000 timesteps per call
# (each call is expected to show up as its own run in TensorBoard)
n_runs = 1
timesteps_per_run = int(2e4)
for i in range(n_runs):
    model.learn(timesteps_per_run)

# Persist the trained agent to disk
model.save("DQN_highway")

**3. Evaluate the Trained Model**:

---


In [None]:
from stable_baselines3.common.evaluation import evaluate_policy
import numpy as np
from tabulate import tabulate

# Fresh environment used only for evaluation
env = gym.make("highway-fast-v0")

# Restore the saved agent and attach it to the evaluation environment
model = DQN.load("DQN_highway", env=env)

# Roll out the deterministic policy; return_episode_rewards=True is requested so
# that two per-episode sequences come back instead of aggregate statistics
n_eval_episodes = 10
reward, duration = evaluate_policy(
    model,
    env,
    n_eval_episodes=n_eval_episodes,
    deterministic=True,
    return_episode_rewards=True,
)

# One row per evaluation episode, numbered from 1
Test_Number = np.arange(1, n_eval_episodes + 1)
results_table = {
    "Test Number": Test_Number,
    "Reward": reward,
    "Duration (s)": duration,
}
print("\n", tabulate(results_table, headers="keys"))

# Mean reward across the evaluation episodes
mean_reward = np.mean(reward)
print("\nAverage Reward: ", mean_reward, "\n")

# Success rate = mean episode duration as a percentage of the maximum duration
env_max_duration = 30  # highway-fast-v0 duration is set to 30s
success_percent = np.mean(duration) / env_max_duration * 100
print("Success Rate: ", success_percent, "% \n")



**4. Run the Trained Model for N Episodes and Record Performance Metrics**:

---


In [None]:
import numpy as np
import pandas as pd

# Define the environment w/render mode
env = gym.make("highway-fast-v0", render_mode='human')

# Load the trained agent
model = DQN.load("DQN_highway", env=env)

# One record per environment transition, across all episodes
performance_data = []

# Define number of episodes to be ran
n_episodes = 20

try:
    for episode in range(n_episodes):
        done = truncated = False
        obs, info = env.reset()
        episode_reward = 0
        time_steps = 0

        # An episode ends when it terminates (done) or hits the time limit (truncated)
        while not (done or truncated):
            action, _states = model.predict(obs, deterministic=True)
            # Convert to a plain int so the chosen action can be stored and analysed
            action = int(action)
            obs, reward, done, truncated, info = env.step(action)
            episode_reward += reward
            time_steps += 1

            # Per-term reward breakdown, when the environment reports one in `info`
            reward_terms = info.get('rewards', {})

            performance_data.append({
                'Episode': episode,
                'Step': time_steps,
                'Action': action,
                # Reward of this single transition
                'Step Reward': reward,
                # Running (cumulative) return of the current episode
                'Reward': episode_reward,
                'Speed': info.get('speed', np.nan),
                'Crashed': info.get('crashed', False),
                'Collision Reward': reward_terms.get('collision_reward', 0),
                'Right Lane Reward': reward_terms.get('right_lane_reward', 0),
                'High Speed Reward': reward_terms.get('high_speed_reward', 0),
                'On Road Reward': reward_terms.get('on_road_reward', 0),
            })
finally:
    # Always release the rendering window, even if a rollout fails
    env.close()

# Create a DataFrame with the performance data
performance_df = pd.DataFrame(performance_data)
per_episode = performance_df.groupby('Episode')

# The final cumulative value of each episode is that episode's total return
episode_returns = per_episode['Reward'].last()

# Calculate the metrics
average_reward = episode_returns.mean()
reward_stddev = episode_returns.std()
# Speed is the only observation-related quantity recorded, so it stands in here
observation_variance = performance_df['Speed'].var()
# Variance of the actions actually taken (previously computed from the reward column)
action_variance = performance_df['Action'].var()
total_transitions = len(performance_df)
# Fraction of episodes in which a crash was reported at any step
crash_rate = per_episode['Crashed'].any().mean()
average_collision_reward = performance_df['Collision Reward'].mean()
average_right_lane_reward = performance_df['Right Lane Reward'].mean()
average_high_speed_reward = performance_df['High Speed Reward'].mean()
average_on_road_reward = performance_df['On Road Reward'].mean()
# Per-transition statistics use the single-step reward, not the running sum
reward_sparsity = (performance_df['Step Reward'] != 0).mean()  # share of non-zero rewards
transition_rewards_variance = performance_df['Step Reward'].var()

# Print Performance Metrics
print("\nModel Performance Data:")
print("---------------------------")
print(f"Average Reward: {average_reward:.2f}")
print(f"Reward Standard Deviation: {reward_stddev:.2f}")
print(f"Observation Variance: {observation_variance:.2f}")
print(f"Action Variance: {action_variance:.2f}")
print(f"Total Transitions: {total_transitions}")
print(f"Crash Rate: {crash_rate:.2%}")
print(f"Average Collision Reward: {average_collision_reward:.2f}")
print(f"Average Right Lane Reward: {average_right_lane_reward:.2f}")
print(f"Average High Speed Reward: {average_high_speed_reward:.2f}")
print(f"Average On Road Reward: {average_on_road_reward:.2f}")
print(f"Reward Sparsity: {reward_sparsity:.2%}")
print(f"Transition Rewards Variance: {transition_rewards_variance:.2f}")

**5. Modification of the Environment Parameters:**
This section takes approximately 7 hours to run.




---



In [None]:
# Launch TensorBoard inline, reading logs from the "highway_dqn_mod1" directory
# (the directory passed as tensorboard_log by the environment-modification runs).
%tensorboard --logdir "highway_dqn_mod1"

In [None]:
# Environment Modification #
# Trains and evaluates a fresh DQN agent for each environment configuration below
# and collects the average reward and success rate per configuration.
from stable_baselines3.common.evaluation import evaluate_policy
import numpy as np

# One summary dict per configuration: config, avg_reward, success_rate
Results = []

# List of different environment configurations, with multiple parameters
env_configs = [
    {"lanes_count": 2, "vehicles_count": 50, "reward_speed_range": [20, 30]},
    {"lanes_count": 4, "vehicles_count": 50, "reward_speed_range": [20, 30]},
    {"lanes_count": 6, "vehicles_count": 50, "reward_speed_range": [20, 30]},
    {"lanes_count": 2, "vehicles_count": 100, "reward_speed_range": [20, 30]},
    {"lanes_count": 4, "vehicles_count": 100, "reward_speed_range": [20, 30]},
    {"lanes_count": 6, "vehicles_count": 100, "reward_speed_range": [20, 30]},
    {"lanes_count": 2, "vehicles_count": 25, "reward_speed_range": [40, 50]},
    {"lanes_count": 4, "vehicles_count": 25, "reward_speed_range": [40, 50]},
    {"lanes_count": 6, "vehicles_count": 25, "reward_speed_range": [40, 50]},
]

# Loop through the environment configurations.
# `config` and `model` stay bound to the last iteration after the loop; the
# video-recording cell further down reads both.
for config in env_configs:
    # Create the environment with the current config
    # NOTE(review): render_mode='human' draws every step during training, which
    # likely dominates the runtime of this section — confirm rendering is wanted here.
    env = gym.make("highway-fast-v0", render_mode='human', config=config)

    try:
        # Define parameters of the DQN algorithm for highway environment
        model = DQN('MlpPolicy', env,
                        policy_kwargs=dict(net_arch=[256, 256]),
                        learning_rate=5e-4,
                        buffer_size=15000,
                        learning_starts=200,
                        batch_size=32,
                        gamma=0.8,
                        train_freq=1,
                        gradient_steps=1,
                        target_update_interval=50,
                        exploration_fraction=0.7,
                        verbose=1,
                        tensorboard_log='highway_dqn_mod1')

        # Train the model for this configuration
        timesteps = 1e4
        model.learn(int(timesteps))

        # Evaluate the model n number of times
        n_eval_episodes = 10
        reward, duration = evaluate_policy(model, env,
                                                  n_eval_episodes=n_eval_episodes,
                                                  return_episode_rewards=True,
                                                  deterministic=True)
    finally:
        # Release this configuration's environment (and its render window) before
        # the next one is created; previously each env was only reset and left open
        env.close()

    # Average reward over the evaluation episodes
    avg_reward = np.mean(reward)

    # Success rate (duration agent lasted / max duration length), in percent
    env_max_duration = 30 # highway-fast-v0 duration is set to 30s
    success_rate = np.mean(duration)/env_max_duration*100

    Results.append({
        "config": config,
        "avg_reward": avg_reward,
        "success_rate": success_rate
    })

# Print the results
print(Results)

**6. Record Episode**:

---


In [None]:
# Record 3 episodes of the trained agent.
# Uses `config` and `model` as left by the environment-modification loop
# (i.e. the last configuration and the agent trained on it).
env = gym.make("highway-fast-v0", render_mode='rgb_array', config=config)
env = record_videos(env)

try:
    for episode in trange(3, desc='Test episodes'):
        (obs, info), done, truncated = env.reset(), False, False
        # Stop on termination OR truncation: an agent that survives until the
        # time limit is only ever truncated, so checking `done` alone would
        # keep stepping past the end of the episode
        while not (done or truncated):
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, done, truncated, info = env.step(int(action))
finally:
    # Close the recorder even if a rollout fails, so it is not left open
    env.close()
show_videos()