# 50% increase in the original number of vehicles

##### Imports

In [1]:
import gymnasium as gym
from matplotlib import pyplot as plt
import pprint
import gymnasium as gym
import highway_env
from stable_baselines3 import PPO
from highway_env.envs import MergeEnv, RoundaboutEnv, IntersectionEnv
from highway_env import utils
import numpy as np
%matplotlib inline

#### HighWay (Fast) Environment
###### **Defaul Settings**
###### **- algorithm:** PPO
###### **- timesteps:** 50 000
###### **- original vehicle's count:** 20
###### **- vehicle's count:** 30

In [2]:
env = gym.make("highway-fast-v0", render_mode='rgb_array', config={"vehicles_count": 20})
pprint.pprint(env.config)

{'action': {'type': 'DiscreteMetaAction'},
 'centering_position': [0.3, 0.5],
 'collision_reward': -1,
 'controlled_vehicles': 1,
 'duration': 30,
 'ego_spacing': 1.5,
 'high_speed_reward': 0.4,
 'initial_lane_id': None,
 'lane_change_reward': 0,
 'lanes_count': 3,
 'manual_control': False,
 'normalize_reward': True,
 'observation': {'type': 'Kinematics'},
 'offroad_terminal': False,
 'offscreen_rendering': False,
 'other_vehicles_type': 'highway_env.vehicle.behavior.IDMVehicle',
 'policy_frequency': 1,
 'real_time_rendering': False,
 'render_agent': True,
 'reward_speed_range': [20, 30],
 'right_lane_reward': 0.1,
 'scaling': 5.5,
 'screen_height': 150,
 'screen_width': 600,
 'show_trajectories': False,
 'simulation_frequency': 5,
 'vehicles_count': 20,
 'vehicles_density': 1}


  logger.warn(


##### Training the agent

In [3]:
model = PPO('MlpPolicy', env,
            policy_kwargs=dict(net_arch=[256, 256]),
            learning_rate=5e-4,
            n_steps=2048, 
            batch_size=64, 
            n_epochs=10,  
            gamma=0.8,
            gae_lambda=0.95, 
            clip_range=0.2, 
            verbose=1,
            tensorboard_log="highway_50_ppo/")
timesteps = 50000
model.learn(total_timesteps=timesteps)
model.save("highway_50_ppo/model")

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to highway_50_ppo/PPO_1
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 11.9     |
|    ep_rew_mean     | 8.85     |
| time/              |          |
|    fps             | 28       |
|    iterations      | 1        |
|    time_elapsed    | 70       |
|    total_timesteps | 2048     |
---------------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 11.9       |
|    ep_rew_mean          | 8.91       |
| time/                   |            |
|    fps                  | 27         |
|    iterations           | 2          |
|    time_elapsed         | 146        |
|    total_timesteps      | 4096       |
| train/                  |            |
|    approx_kl            | 0.01654928 |
|    clip_fraction        | 0.236      |
|    clip_range           | 0.2        |


#### Merge Environment
###### **Defaul Settings**
###### **- algorithm:** PPO
###### **- timesteps:** 50 000
###### **- original vehicle's count:** 5
###### **- vehicle's count:** 7

In [4]:
# Personalized environment with a 50% increase in the number of vehicles
class CustomMergeEnv(MergeEnv):
    def _make_vehicles(self) -> None:
        """
        Create some new vehicles and add them to the road.
        """
        road = self.road
        ego_vehicle = self.action_type.vehicle_class(
            road, road.network.get_lane(("a", "b", 1)).position(30, 0), speed=30
        )
        road.vehicles.append(ego_vehicle)

        other_vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])

        # Originally there were 3 vehicles; with a 50% increase, now there are 5
        for position, speed in [(90, 29), (70, 31), (5, 31.5), (50, 30.5), (30, 32)]:
            lane = road.network.get_lane(("a", "b", self.np_random.integers(2)))
            position = lane.position(position + self.np_random.uniform(-5, 5), 0)
            speed += self.np_random.uniform(-1, 1)
            road.vehicles.append(other_vehicles_type(road, position, speed=speed))

        # Fusion vehicle
        merging_v = other_vehicles_type(
            road, road.network.get_lane(("j", "k", 0)).position(110, 0), speed=20
        )
        merging_v.target_speed = 30
        road.vehicles.append(merging_v)
        self.vehicle = ego_vehicle

In [5]:
env = CustomMergeEnv(render_mode='rgb_array')
pprint.pprint(env.config)

{'action': {'type': 'DiscreteMetaAction'},
 'centering_position': [0.3, 0.5],
 'collision_reward': -1,
 'high_speed_reward': 0.2,
 'lane_change_reward': -0.05,
 'manual_control': False,
 'merging_speed_reward': -0.5,
 'observation': {'type': 'Kinematics'},
 'offscreen_rendering': False,
 'other_vehicles_type': 'highway_env.vehicle.behavior.IDMVehicle',
 'policy_frequency': 1,
 'real_time_rendering': False,
 'render_agent': True,
 'reward_speed_range': [20, 30],
 'right_lane_reward': 0.1,
 'scaling': 5.5,
 'screen_height': 150,
 'screen_width': 600,
 'show_trajectories': False,
 'simulation_frequency': 15}


##### Training the agent

In [6]:
model = PPO('MlpPolicy', env,
            policy_kwargs=dict(net_arch=[256, 256]),
            learning_rate=5e-4,
            n_steps=2048, 
            batch_size=64, 
            n_epochs=10,  
            gamma=0.8,
            gae_lambda=0.95, 
            clip_range=0.2, 
            verbose=1,
            tensorboard_log="merge_50_ppo/")
timesteps = 50000
model.learn(total_timesteps=timesteps)
model.save("merge_50_ppo/model")

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to merge_50_ppo/PPO_1
crashTrue
overFalse
crashTrue
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashTrue
overFalse
crashTrue
overFalse
crashTrue
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overTrue
crashTrue
overFalse
crashTrue
overFalse
crashTrue
overFalse
crashFalse
overFalse
crashTrue
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashTrue
overFalse
crashTrue
overFalse
crashTrue
overFalse
crashTrue
overFalse
crashTrue
overFalse
crashTrue
overFalse
crashTrue
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse
overFalse
crashFalse

#### Roundabout Environment
###### **Defaul Settings**
###### **- algorithm:** PPO
###### **- timesteps:** 50 000
###### **- original vehicle's count:** 5
###### **- vehicle's count:** 8

In [7]:
# Personalized environment with a 50% increase in the number of vehicles
class CustomRoundaboutEnv(RoundaboutEnv):
    def _make_vehicles(self) -> None:
        """
        Create some new vehicles and add them to the road.
        """
        position_deviation = 2
        speed_deviation = 2

        # Ego-vehicle
        ego_lane = self.road.network.get_lane(("ser", "ses", 0))
        ego_vehicle = self.action_type.vehicle_class(
            self.road,
            ego_lane.position(125, 0),
            speed=8,
            heading=ego_lane.heading_at(140),
        )
        try:
            ego_vehicle.plan_route_to("nxs")
        except AttributeError:
            pass
        self.road.vehicles.append(ego_vehicle)
        self.vehicle = ego_vehicle

        # Incoming vehicle
        destinations = ["exr", "sxr", "nxr"]
        other_vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])
        vehicle = other_vehicles_type.make_on_lane(
            self.road,
            ("we", "sx", 1),
            longitudinal=5 + self.np_random.normal() * position_deviation,
            speed=16 + self.np_random.normal() * speed_deviation,
        )

        if self.config["incoming_vehicle_destination"] is not None:
            destination = destinations[self.config["incoming_vehicle_destination"]]
        else:
            destination = self.np_random.choice(destinations)
        vehicle.plan_route_to(destination)
        vehicle.randomize_behavior()
        self.road.vehicles.append(vehicle)

        # Other vehicles (From 2 to 5 vehicles, 50% increase from original 2)
        for i in list(range(1, 2)) + list(range(-1, 0)) + [2, -2, 3]:
            vehicle = other_vehicles_type.make_on_lane(
                self.road,
                ("we", "sx", 0),
                longitudinal=20 * i + self.np_random.normal() * position_deviation,
                speed=16 + self.np_random.normal() * speed_deviation,
            )
            vehicle.plan_route_to(self.np_random.choice(destinations))
            vehicle.randomize_behavior()
            self.road.vehicles.append(vehicle)

        # Entering vehicle
        vehicle = other_vehicles_type.make_on_lane(
            self.road,
            ("eer", "ees", 0),
            longitudinal=50 + self.np_random.normal() * position_deviation,
            speed=16 + self.np_random.normal() * speed_deviation,
        )
        vehicle.plan_route_to(self.np_random.choice(destinations))
        vehicle.randomize_behavior()
        self.road.vehicles.append(vehicle)

In [8]:
env = CustomRoundaboutEnv(render_mode='rgb_array')
pprint.pprint(env.config)

{'action': {'target_speeds': [0, 8, 16], 'type': 'DiscreteMetaAction'},
 'centering_position': [0.5, 0.6],
 'collision_reward': -1,
 'duration': 11,
 'high_speed_reward': 0.2,
 'incoming_vehicle_destination': None,
 'lane_change_reward': -0.05,
 'manual_control': False,
 'normalize_reward': True,
 'observation': {'absolute': True,
                 'features_range': {'vx': [-15, 15],
                                    'vy': [-15, 15],
                                    'x': [-100, 100],
                                    'y': [-100, 100]},
                 'type': 'Kinematics'},
 'offscreen_rendering': False,
 'other_vehicles_type': 'highway_env.vehicle.behavior.IDMVehicle',
 'policy_frequency': 1,
 'real_time_rendering': False,
 'render_agent': True,
 'right_lane_reward': 0,
 'scaling': 5.5,
 'screen_height': 600,
 'screen_width': 600,
 'show_trajectories': False,
 'simulation_frequency': 15}


##### Training the agent

In [9]:
model = PPO('MlpPolicy', env,
            policy_kwargs=dict(net_arch=[256, 256]),
            learning_rate=5e-4,
            n_steps=2048, 
            batch_size=64, 
            n_epochs=10,  
            gamma=0.8,
            gae_lambda=0.95, 
            clip_range=0.2, 
            verbose=1,
            tensorboard_log="roundabout_50_ppo/")
timesteps = 50000
model.learn(total_timesteps=timesteps)
model.save("roundabout_50_ppo/model")

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to roundabout_50_ppo/PPO_1
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 8.33     |
|    ep_rew_mean     | 7.11     |
| time/              |          |
|    fps             | 17       |
|    iterations      | 1        |
|    time_elapsed    | 120      |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 8.95        |
|    ep_rew_mean          | 7.74        |
| time/                   |             |
|    fps                  | 17          |
|    iterations           | 2           |
|    time_elapsed         | 240         |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.013692176 |
|    clip_fraction        | 0.158       |
|    clip_range           

#### Parking Environment
###### **Defaul Settings**
###### **- algorithm:** PPO
###### **- timesteps:** 50 000
###### **- original vehicle's count:** 0
###### **- vehicle's count:** 2

In [10]:
env = gym.make("parking-v0", render_mode='rgb_array', config={"vehicles_count": 2})
pprint.pprint(env.config)

{'action': {'type': 'ContinuousAction'},
 'add_walls': True,
 'centering_position': [0.5, 0.5],
 'collision_reward': -5,
 'controlled_vehicles': 1,
 'duration': 100,
 'manual_control': False,
 'observation': {'features': ['x', 'y', 'vx', 'vy', 'cos_h', 'sin_h'],
                 'normalize': False,
                 'scales': [100, 100, 5, 5, 1, 1],
                 'type': 'KinematicsGoal'},
 'offscreen_rendering': False,
 'other_vehicles_type': 'highway_env.vehicle.behavior.IDMVehicle',
 'policy_frequency': 5,
 'real_time_rendering': False,
 'render_agent': True,
 'reward_weights': [1, 0.3, 0, 0, 0.02, 0.02],
 'scaling': 7,
 'screen_height': 300,
 'screen_width': 600,
 'show_trajectories': False,
 'simulation_frequency': 15,
 'steering_range': 0.7853981633974483,
 'success_goal_reward': 0.12,
 'vehicles_count': 2}


  logger.warn(


##### Training the agent

In [11]:
model = PPO('MultiInputPolicy', env,
            policy_kwargs=dict(net_arch=[256, 256]),
            learning_rate=5e-4,
            n_steps=2048, 
            batch_size=64, 
            n_epochs=10,  
            gamma=0.8,
            gae_lambda=0.95, 
            clip_range=0.2, 
            verbose=1,
            tensorboard_log="parking_50_ppo/")
timesteps = 50000
model.learn(total_timesteps=timesteps)
model.save("parking_50_ppo/model")

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to parking_50_ppo/PPO_1
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 71.4     |
|    ep_rew_mean     | -38.9    |
|    success_rate    | 0        |
| time/              |          |
|    fps             | 87       |
|    iterations      | 1        |
|    time_elapsed    | 23       |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 79.3        |
|    ep_rew_mean          | -43         |
|    success_rate         | 0           |
| time/                   |             |
|    fps                  | 82          |
|    iterations           | 2           |
|    time_elapsed         | 49          |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.0061841

#### Intersection Environment
###### **Defaul Settings**
###### **- algorithm:** PPO
###### **- timesteps:** 50 000
###### **- original vehicle's count:** 10
###### **- vehicle's count:** 15

In [12]:
# Personalized environment with a 50% increase in the number of vehicles
class CustomIntersectionEnv(IntersectionEnv):
    def _make_vehicles(self, n_vehicles: int = 10) -> None:
        """
        Create some new vehicles and add them to the road.
        """
        # Increase the number of vehicles by 50%
        n_vehicles = int(n_vehicles * 1.5)

        vehicle_type = utils.class_from_path(self.config["other_vehicles_type"])
        vehicle_type.DISTANCE_WANTED = 7  # Low jam distance
        vehicle_type.COMFORT_ACC_MAX = 6
        vehicle_type.COMFORT_ACC_MIN = -3

        # Random vehicles
        simulation_steps = 3
        for t in range(n_vehicles - 1):
            self._spawn_vehicle(np.linspace(0, 80, n_vehicles)[t])
        for _ in range(simulation_steps):
            [
                (
                    self.road.act(),
                    self.road.step(1 / self.config["simulation_frequency"]),
                )
                for _ in range(self.config["simulation_frequency"])
            ]

        # Ego-vehicle
        self._spawn_vehicle(
            60,
            spawn_probability=1,
            go_straight=True,
            position_deviation=0.1,
            speed_deviation=0,
        )

        # Incoming vehicle
        self.controlled_vehicles = []
        for ego_id in range(0, self.config["controlled_vehicles"]):
            ego_lane = self.road.network.get_lane(
                (f"o{ego_id % 4}", f"ir{ego_id % 4}", 0)
            )
            destination = self.config["destination"] or "o" + str(
                self.np_random.integers(1, 4)
            )
            ego_vehicle = self.action_type.vehicle_class(
                self.road,
                ego_lane.position(60 + 5 * self.np_random.normal(1), 0),
                speed=ego_lane.speed_limit,
                heading=ego_lane.heading_at(60),
            )
            try:
                ego_vehicle.plan_route_to(destination)
                ego_vehicle.speed_index = ego_vehicle.speed_to_index(
                    ego_lane.speed_limit
                )
                ego_vehicle.target_speed = ego_vehicle.index_to_speed(
                    ego_vehicle.speed_index
                )
            except AttributeError:
                pass

            self.road.vehicles.append(ego_vehicle)
            self.controlled_vehicles.append(ego_vehicle)
            for v in self.road.vehicles:  
                if (
                    v is not ego_vehicle
                    and np.linalg.norm(v.position - ego_vehicle.position) < 20
                ):
                    self.road.vehicles.remove(v)

In [13]:
env = CustomIntersectionEnv(render_mode='rgb_array')
pprint.pprint(env.config)

{'action': {'lateral': False,
            'longitudinal': True,
            'target_speeds': [0, 4.5, 9],
            'type': 'DiscreteMetaAction'},
 'arrived_reward': 1,
 'centering_position': [0.5, 0.6],
 'collision_reward': -5,
 'controlled_vehicles': 1,
 'destination': 'o1',
 'duration': 13,
 'high_speed_reward': 1,
 'initial_vehicle_count': 10,
 'manual_control': False,
 'normalize_reward': False,
 'observation': {'absolute': True,
                 'features': ['presence',
                              'x',
                              'y',
                              'vx',
                              'vy',
                              'cos_h',
                              'sin_h'],
                 'features_range': {'vx': [-20, 20],
                                    'vy': [-20, 20],
                                    'x': [-100, 100],
                                    'y': [-100, 100]},
                 'flatten': False,
                 'observe_intentions': False,


##### Training the agent

In [14]:
model = PPO('MlpPolicy', env,
            policy_kwargs=dict(net_arch=[256, 256]),
            learning_rate=5e-4,
            n_steps=2048, 
            batch_size=64, 
            n_epochs=10,  
            gamma=0.8,
            gae_lambda=0.95, 
            clip_range=0.2, 
            verbose=1,
            tensorboard_log="intersection_50_ppo/")
timesteps = 50000
model.learn(total_timesteps=timesteps)
model.save("intersection_50_ppo/model")

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to intersection_50_ppo/PPO_1
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 9.63     |
|    ep_rew_mean     | 1.15     |
| time/              |          |
|    fps             | 13       |
|    iterations      | 1        |
|    time_elapsed    | 150      |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 9.06        |
|    ep_rew_mean          | 2.21        |
| time/                   |             |
|    fps                  | 13          |
|    iterations           | 2           |
|    time_elapsed         | 305         |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.025327599 |
|    clip_fraction        | 0.314       |
|    clip_range         