In [None]:
# imports - ignore
# !pip install gymnasium stable_baselines3[extra] opencv-python highway_env

In [18]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
from stable_baselines3 import DQN

import highway_env  # noqa: F401

In [None]:
# for new roundabout
from highway_env.envs.roundabout_env import RoundaboutEnv as OriginalRoundabout
from highway_env import utils
from highway_env.envs.common.abstract import AbstractEnv
from highway_env.road.lane import CircularLane, LineType, SineLane, StraightLane
from highway_env.road.road import Road, RoadNetwork
from highway_env.vehicle.controller import MDPVehicle
import numpy as np 
class MyRoundaboutEnv(OriginalRoundabout):
    @classmethod
    def default_config(cls):
        config = super().default_config()
        config.update({
            "roundabout_radius": 20  # new default radius
        })
        return config

    def _make_road(self):
        center = [0, 0]
        radius = self.config["roundabout_radius"]
        radii = [radius, radius + 4, radius + 2 * 4]
        alpha = 24  # [deg]

        net = RoadNetwork()
        # radii = [radius, radius + 4]
        n, c, s = LineType.NONE, LineType.CONTINUOUS, LineType.STRIPED
        # line = [[c, s], [n, c]]
        line = [
            [c, s],  # inner lane
            [s, s],  # middle lane
            [n, c],  # outer lane
        ]
        for lane in [0,1,2]:#[0, 1]:
            net.add_lane(
                "se",
                "ex",
                CircularLane(
                    center,
                    radii[lane],
                    np.deg2rad(90 - alpha),
                    np.deg2rad(alpha),
                    clockwise=False,
                    line_types=line[lane],
                ),
            )
            net.add_lane(
                "ex",
                "ee",
                CircularLane(
                    center,
                    radii[lane],
                    np.deg2rad(alpha),
                    np.deg2rad(-alpha),
                    clockwise=False,
                    line_types=line[lane],
                ),
            )
            net.add_lane(
                "ee",
                "nx",
                CircularLane(
                    center,
                    radii[lane],
                    np.deg2rad(-alpha),
                    np.deg2rad(-90 + alpha),
                    clockwise=False,
                    line_types=line[lane],
                ),
            )
            net.add_lane(
                "nx",
                "ne",
                CircularLane(
                    center,
                    radii[lane],
                    np.deg2rad(-90 + alpha),
                    np.deg2rad(-90 - alpha),
                    clockwise=False,
                    line_types=line[lane],
                ),
            )
            net.add_lane(
                "ne",
                "wx",
                CircularLane(
                    center,
                    radii[lane],
                    np.deg2rad(-90 - alpha),
                    np.deg2rad(-180 + alpha),
                    clockwise=False,
                    line_types=line[lane],
                ),
            )
            net.add_lane(
                "wx",
                "we",
                CircularLane(
                    center,
                    radii[lane],
                    np.deg2rad(-180 + alpha),
                    np.deg2rad(-180 - alpha),
                    clockwise=False,
                    line_types=line[lane],
                ),
            )
            net.add_lane(
                "we",
                "sx",
                CircularLane(
                    center,
                    radii[lane],
                    np.deg2rad(180 - alpha),
                    np.deg2rad(90 + alpha),
                    clockwise=False,
                    line_types=line[lane],
                ),
            )
            net.add_lane(
                "sx",
                "se",
                CircularLane(
                    center,
                    radii[lane],
                    np.deg2rad(90 + alpha),
                    np.deg2rad(90 - alpha),
                    clockwise=False,
                    line_types=line[lane],
                ),
            )

        # Access lanes: (r)oad/(s)ine
        access = 170  # [m]
        dev = 85  # [m]
        a = 5  # [m]
        delta_st = 0.2 * dev  # [m]

        delta_en = dev - delta_st
        w = 2 * np.pi / dev
        net.add_lane(
            "ser", "ses", StraightLane([2, access], [2, dev / 2], line_types=(s, c))
        )
        net.add_lane(
            "ses",
            "se",
            SineLane(
                [2 + a, dev / 2],
                [2 + a, dev / 2 - delta_st],
                a,
                w,
                -np.pi / 2,
                line_types=(c, c),
            ),
        )
        net.add_lane(
            "sx",
            "sxs",
            SineLane(
                [-2 - a, -dev / 2 + delta_en],
                [-2 - a, dev / 2],
                a,
                w,
                -np.pi / 2 + w * delta_en,
                line_types=(c, c),
            ),
        )
        net.add_lane(
            "sxs", "sxr", StraightLane([-2, dev / 2], [-2, access], line_types=(n, c))
        )

        net.add_lane(
            "eer", "ees", StraightLane([access, -2], [dev / 2, -2], line_types=(s, c))
        )
        net.add_lane(
            "ees",
            "ee",
            SineLane(
                [dev / 2, -2 - a],
                [dev / 2 - delta_st, -2 - a],
                a,
                w,
                -np.pi / 2,
                line_types=(c, c),
            ),
        )
        net.add_lane(
            "ex",
            "exs",
            SineLane(
                [-dev / 2 + delta_en, 2 + a],
                [dev / 2, 2 + a],
                a,
                w,
                -np.pi / 2 + w * delta_en,
                line_types=(c, c),
            ),
        )
        net.add_lane(
            "exs", "exr", StraightLane([dev / 2, 2], [access, 2], line_types=(n, c))
        )

        net.add_lane(
            "ner", "nes", StraightLane([-2, -access], [-2, -dev / 2], line_types=(s, c))
        )
        net.add_lane(
            "nes",
            "ne",
            SineLane(
                [-2 - a, -dev / 2],
                [-2 - a, -dev / 2 + delta_st],
                a,
                w,
                -np.pi / 2,
                line_types=(c, c),
            ),
        )
        net.add_lane(
            "nx",
            "nxs",
            SineLane(
                [2 + a, dev / 2 - delta_en],
                [2 + a, -dev / 2],
                a,
                w,
                -np.pi / 2 + w * delta_en,
                line_types=(c, c),
            ),
        )
        net.add_lane(
            "nxs", "nxr", StraightLane([2, -dev / 2], [2, -access], line_types=(n, c))
        )

        net.add_lane(
            "wer", "wes", StraightLane([-access, 2], [-dev / 2, 2], line_types=(s, c))
        )
        net.add_lane(
            "wes",
            "we",
            SineLane(
                [-dev / 2, 2 + a],
                [-dev / 2 + delta_st, 2 + a],
                a,
                w,
                -np.pi / 2,
                line_types=(c, c),
            ),
        )
        net.add_lane(
            "wx",
            "wxs",
            SineLane(
                [dev / 2 - delta_en, -2 - a],
                [-dev / 2, -2 - a],
                a,
                w,
                -np.pi / 2 + w * delta_en,
                line_types=(c, c),
            ),
        )
        net.add_lane(
            "wxs", "wxr", StraightLane([-dev / 2, -2], [-access, -2], line_types=(n, c))
        )

        road = Road(
            network=net,
            np_random=self.np_random,
            record_history=self.config["show_trajectories"],
        )
        self.road = road
        
    def _make_vehicles(self):
        position_deviation = 2.0
        speed_deviation = 2.0

        # Ego-vehicle
        ego_lane = self.road.network.get_lane(("ser", "ses", 0))
        ego_vehicle = self.action_type.vehicle_class(
            self.road,
            ego_lane.position(125.0, 0.0),
            speed=8.0,
            heading=ego_lane.heading_at(140.0),
        )
        try:
            ego_vehicle.plan_route_to("nxs")
        except AttributeError:
            pass
        self.road.vehicles.append(ego_vehicle)
        self.vehicle = ego_vehicle

        # Incoming vehicle
        destinations = ["exr", "sxr", "nxr"]
        other_vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])
        vehicle = other_vehicles_type.make_on_lane(
            self.road,
            ("we", "sx", 1),
            longitudinal=5.0 + self.np_random.normal() * position_deviation,
            speed=16 + self.np_random.normal() * speed_deviation,
        )

        if self.config["incoming_vehicle_destination"] is not None:
            destination = destinations[self.config["incoming_vehicle_destination"]]
        else:
            destination = self.np_random.choice(destinations)
        vehicle.plan_route_to(destination)
        vehicle.randomize_behavior()
        self.road.vehicles.append(vehicle)

        # Other vehicles
        for i in list(range(1, 2)) + list(range(-1, 0)):
            vehicle = other_vehicles_type.make_on_lane(
                self.road,
                ("we", "sx", 0),
                longitudinal=20.0 * float(i)
                + self.np_random.normal() * position_deviation,
                speed=16.0 + self.np_random.normal() * speed_deviation,
            )
            vehicle.plan_route_to(self.np_random.choice(destinations))
            vehicle.randomize_behavior()
            self.road.vehicles.append(vehicle)

        # Entering vehicle
        vehicle = other_vehicles_type.make_on_lane(
            self.road,
            ("eer", "ees", 0),
            longitudinal=5.0 + self.np_random.normal() * position_deviation,
            speed=16.0 + self.np_random.normal() * speed_deviation,
        )
        vehicle.plan_route_to(self.np_random.choice(destinations))
        vehicle.randomize_behavior()
        self.road.vehicles.append(vehicle)

In [15]:
from gymnasium.envs.registration import register

register(
    id="MyRoundaboutEnv-v0",
    entry_point="__main__:MyRoundaboutEnv",  # or the module path if in another file
)

In [30]:
TRAIN = False

# Create the environment
env = gym.make("roundabout-v0", render_mode="rgb_array")#, roundabout_radius = 30)
# env = gym.make("MyRoundaboutEnv-v0", render_mode="rgb_array")#, roundabout_radius = 30)

# add disturbances



obs, info = env.reset()

# Create the model
model = DQN(
    "MlpPolicy",
    env,
    policy_kwargs=dict(net_arch=[256, 256]),
    learning_rate=5e-4,
    buffer_size=15000,
    learning_starts=200,
    batch_size=32,
    gamma=0.8,
    train_freq=1,
    gradient_steps=1,
    target_update_interval=50,
    verbose=1,
    tensorboard_log="roundabout_dqn/",
)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [5]:
import numpy as np
print(np.__version__)

2.0.2


In [3]:
# Train the model
if TRAIN:
    model.learn(total_timesteps=int(2e4))
    model.save("roundabout_dqn/model")
    del model

In [7]:
# from google.colab import drive
# drive.mount('/content/drive')


In [None]:
# # Change working directory
# import os
# os.chdir('/content/drive/MyDrive/CS238V-Cars/')  # put your folder path here

# # Check current working directory
# !pwd

/content/drive/MyDrive/CS238V-Cars


In [31]:
# Actuator fuzzing

import numpy as np

def low_level_agent_fuzzing(vehicle, acc_std=0.5, steer_std=0.1, max_steer_std=0.05):
    """Add Gaussian noise to the low-level controllers of a vehicle.

    Args:
        vehicle: ControlledVehicle or MDPVehicle instance
        acc_std: std deviation for acceleration noise [m/sÂ²]
        steer_std: std deviation for steering noise [rad]
    """

    # Backup original functions
    original_speed_control = vehicle.speed_control
    original_steering_control = vehicle.steering_control
    
    # Optional: perturb MAX_STEERING_ANGLE once per vehicle
    vehicle.MAX_STEERING_ANGLE = max(
        0.05,  # avoid zero max steering
        vehicle.MAX_STEERING_ANGLE + np.random.normal(0, max_steer_std)
    )

    # Redefine speed_control with Gaussian noise
    def noisy_speed_control(target_speed):
        base = original_speed_control(target_speed)
        noise = np.random.normal(0, acc_std)
        return base + noise

    # Redefine steering_control with Gaussian noise
    def noisy_steering_control(target_lane_index):
        base = original_steering_control(target_lane_index)
        noise = np.random.normal(0, steer_std)
        return base + noise

    vehicle.speed_control = noisy_speed_control
    vehicle.steering_control = noisy_steering_control


In [33]:
import random
# Run the trained model and record video
model = DQN.load("roundabout_dqn/model", env=env, device="cpu")
# model.set_env(env)
env = RecordVideo(
    env, video_folder="roundabout_dqn/videos/action_controller_fuzz", episode_trigger=lambda e: True
)
env.unwrapped.config["simulation_frequency"] = 15  # Higher FPS for rendering
env.unwrapped.config.update({
     "observation": {
         # default
                    "type": "Kinematics",
                    "absolute": True,
                    
                    "features_range": {
                        "x": [-100, 100],
                        "y": [-100, 100],
                        "vx": [-15, 15], #[-15, 15],
                        "vy": [-15, 15], #[-15, 15],
                    },
                    
                    # # add new
                    "vehicles_count": 5,
                    # "order": "shuffled",
                    
                },
     
     
    # "sensor_noise": 0.02
})
low_level_agent_fuzzing(env.unwrapped.vehicle, acc_std=1, steer_std=5)

env.unwrapped.set_record_video_wrapper(env)

for videos in range(2):
    done = truncated = False
    obs, info = env.reset()
    while not (done or truncated):
        # Predict
        action, _states = model.predict(obs, deterministic=True)
        
        # # # action fuzz
        # if random.random() < 0.4:
        #     available = env.unwrapped.action_type.get_available_actions()
        #     print(available)
        #     action = random.choice(available)

        # # Get reward
        obs, reward, done, truncated, info = env.step(action)
        
        # Render
        env.render()
    env.close()

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


  logger.warn(


In [None]:
# !pip freeze > colab_requirements.txt

In [None]:
# imports
# # import numpy
# # import gymnasium
# # import stable_baselines3
# # import cv2
# # import highway_env
# import torch
# import torchvision
# import torchaudio


# # print("numpy:", numpy.__version__)
# # print("gymnasium:", gymnasium.__version__)
# # print("stable_baselines3:", stable_baselines3.__version__)
# # print("opencv-python:", cv2.__version__)
# # print("highway_env:", highway_env.__version__)

# print("torch:", torch.__version__)
# print("torchvision:", torchvision.__version__)
# print("torchaudio:", torchaudio.__version__)


numpy: 2.0.2
gymnasium: 1.2.3
stable_baselines3: 2.7.1
opencv-python: 4.13.0
highway_env: 1.10.2
