In [1]:
!pip install -q gym numpy gymnasium stable-baselines3

You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.[0m[33m
[0m

In [2]:
import carla
import os

client_name = os.environ.get("CLIENT_NAME", "DOES NOT EXIST")
if client_name == "DOES NOT EXIST":
    raise Exception("The environment variable for the container name of the carla server has not been set")

# Connect to the client and retrieve the world object
client = carla.Client(client_name, 2000)
world = client.get_world()

In [3]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces
from time import sleep

class CarlaEnv(gym.Env):
    """
    Custom Environment that follows gym interface.
    This is a env to interface with CARLA simulator
    """

    # Because of jupyter notebook, we cannot implement the GUI ('human' render mode)
    metadata = {"render_modes": ["console"]}
    
    def camera_callback(self, image):
        self.camera1_data = np.array(image.raw_data).reshape(600, 800, 4)[..., [2,1,0,3]].copy()
    
    def lane_callback(self, event):
        self.crossed_lane = True

    def __init__(self, render_mode="console"):
        super(CarlaEnv, self).__init__()
        self.render_mode = render_mode
        self.camera1_data = np.empty((600, 800, 4))
        self.crossed_lane = False

        # Define action and observation space
        # They must be gym.spaces objects
        # In this case it is buckets for throttle, steering and brake
        n_actions = 230 + 11 - 1
        self.action_space = spaces.Discrete(n_actions)
        # The observation will be the RGB camera view (for now)
        self.observation_space = spaces.Box(
            low=0, high=255, shape=(600,800,4), dtype=np.uint8
        )
        
        # Get camera listener working
        cameras = []
        lane_sensors = []
        for actor in world.get_actors():
            if(actor.type_id == 'sensor.camera.rgb'):
                cameras.append(actor)
            elif (actor.type_id == 'sensor.other.lane_invasion'):
                lane_sensors.append(actor)
        self.lane_sensor = lane_sensors[0]
        self.lane_sensor.listen(self.lane_callback)
        self.camera = cameras[0]
        self.camera.listen(self.camera_callback)
        vehicles = []
        for actor in world.get_actors():
            if(actor.type_id == 'vehicle.tesla.model3'):
                vehicles.append(actor)
        for vehicle in vehicles:
            if(vehicle.attributes["role_name"] == "ego_vehicle"):
                self.ego_vehicle = vehicle

    def reset(self, seed=None, options=None):
        """
        Important: the observation must be a numpy array
        :return: (np.array)
        """
        super().reset(seed=seed, options=options)
        # move vehicle to correct location
        initLocation = carla.Location(x=-51.404804, y=-76.343437, z=0.053576)
        
        return self.camera1_data.astype(np.uint8), {}  # empty info dict

    def step(self, action):
        throttle = 0.0
        steer = 0.0
        brake = 0.0
        if(action <= 230 - 1):
            throttle = ((action - 11 - 1) // 21) / 10.0
            steer = (((action - 11 - 1) // 11) / 10.0) - 1.0
        else:
            brake = (action - 230 - 1 - 1) / 10.0
        vehicle_control_cmd = carla.VehicleControl(throttle, steer, brake)
        self.ego_vehicle.apply_control(vehicle_control_cmd)
        
        
        # Let the simulator respond
        sleep(0.5)

        # Optionally we can pass additional info, we are not using that for now
        info = {}
        truncated = False
        reward = 2
        terminated = self.crossed_lane
        if(terminated):
            self.camera.stop()
            self.lane_sensor.stop()
            reward = 0

        return (
            self.camera1_data.astype(np.uint8),
            reward,
            terminated,
            truncated,
            info,
        )

    def render(self):
        # agent is represented as a cross, rest as a dot
        if self.render_mode == "console":
            print("." * self.agent_pos, end="")
            print("x", end="")
            print("." * (self.grid_size - self.agent_pos))

    def close(self):
        pass

In [4]:
from stable_baselines3.common.env_checker import check_env

In [5]:
env = CarlaEnv()
# If the environment don't follow the interface, an error will be thrown
check_env(env, warn=True)

In [6]:
from stable_baselines3 import DQN

In [8]:
model = DQN("MlpPolicy", env, verbose=1, buffer_size=1000)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
