## Emergency Vehicle Autonomous Driving (RL)

## Overview
Goal is to train a autonomous emergency vehicle model (ambulance, police, firefighter, etc.).
Prioritizing speed, collision avoidance, while not so much road laws; like speed and traffic lights.

I'll begin with a more basic model with a few sensors and minimal traffic then incrementally add complexity.

```console
!CarlaUE4.exe -benchmark -fps=2000
```

In [1]:
import carla
import random
import numpy as np
import os

from geopy.distance import geodesic

from gym import Env
from gym.spaces import Discrete, Box, Dict

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback

### Carla Python API
Used to communicate with the server – `CarlaUE4.exe`

In [2]:
# Connect to the CARLA server, only run this after starting the server (see README.md)
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()

In [3]:
map = world.get_map()
random_spawn = random.choice(world.get_map().get_spawn_points()).location
# Convert world coordinates to GNSS
destination = map.transform_to_geolocation(random_spawn)
print(np.array([destination.latitude, destination.longitude, destination.altitude]))

[-0.00083375  0.00095224  0.59999996]


In [4]:
actors_list = []

# get blueprints and spawn points
bp_lib = world.get_blueprint_library()
spawn_points = world.get_map().get_spawn_points()
    
print('Sensors:')
for bp in bp_lib.filter('sensor'):
    print(bp.id[7:], end=' | ')

Sensors:
other.collision | camera.depth | camera.optical_flow | camera.normals | other.lane_invasion | camera.dvs | other.imu | other.gnss | other.obstacle | other.radar | lidar.ray_cast_semantic | lidar.ray_cast | camera.rgb | camera.semantic_segmentation | other.rss | camera.instance_segmentation | 

### Spawning ego vehicle (agent)

In [5]:
def spawn_vehicle():
    # spawn vehicle, ambulance
    vehicle_bp = bp_lib.find('vehicle.ford.ambulance')
    vehicle = world.try_spawn_actor(vehicle_bp, random.choice(spawn_points)) # random locations spawn

    return vehicle

In [6]:
# vehicle = spawn_vehicle()
# actors_list.append(vehicle)

### Sensors to the vehicle, callback functions

Processing RGB image
* Raw data is a 1D flat array of pixels
* Reshape to 4 channel image so instead it's, array[y, x] = RGBA
* Drop the alpha, RGBA -> RGB, irrelevant

Env parameter is for the enviroment class, for when it is called

In [7]:
IM_WIDTH = 480
IM_HEIGHT = 320
IM_FOV = 110

def spawn_rgb_camera(to_attach):
    # RGB Camera
    camera_rgb_bp = bp_lib.find('sensor.camera.rgb')
    camera_rgb_bp.set_attribute('image_size_x', f'{IM_WIDTH}')
    camera_rgb_bp.set_attribute('image_size_y', f'{IM_HEIGHT}')
    camera_rgb_bp.set_attribute('fov', f'{IM_FOV}')
    rgb_transform = carla.Transform(carla.Location(x=2.5, z=1.5))  # Front-mounted
    camera_rgb = world.spawn_actor(camera_rgb_bp, rgb_transform, attach_to=to_attach)
    return camera_rgb

# callback function
def process_rgb_image(image, env):
    '''Process an RGB image'''
    array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) 
    array = np.reshape(array, (image.height, image.width, 4)) 
    env.rgb_image = array[:, :, :3]
    
    # # Display image, uncomment to show
    # cv2.imshow("RGB Camera", array)  # Display using OpenCV
    # cv2.waitKey(1)  # Wait for a key press

In [8]:
# camera_rgb = spawn_rgb_camera(vehicle)
# actors_list.append(camera_rgb)

Processing Lidar data
* Using point cloud data, not image data; set of 3D points for 3D objects
* Reshape to 3D array

In [9]:
def spawn_lidar_sensor(to_attach):
    # LiDAR Sensor
    lidar_bp = bp_lib.find('sensor.lidar.ray_cast')
    lidar_bp.set_attribute('range', '50')
    lidar_transform = carla.Transform(carla.Location(z=2.5)) # Front-mounted
    lidar_sensor = world.spawn_actor(lidar_bp, lidar_transform, attach_to=to_attach)
    return lidar_sensor

# callback function
def process_lidar_pc(point_cloud, env, points=360):
    '''Process a LiDAR point cloud to have a fixed number of points'''
    array = np.frombuffer(point_cloud.raw_data, dtype=np.dtype('f4'))
    positions = array.shape[0] // 3
    max_set_3 = int(positions) * 3
    point_cloud_array = np.reshape(array[:max_set_3], (positions, 3))
    
    if positions > points:
        # Randomly sample
        indices = np.random.choice(positions, points, replace=False)
        env.lidar_pc = point_cloud_array[indices]
        
    elif positions < points:
        # Pad with zeros
        padding = np.zeros((points - positions, 3), dtype=np.float32)
        env.lidar_pc = np.vstack((point_cloud_array, padding))
        
    else:
        env.lidar_pc = point_cloud_array

In [10]:
# lidar_sensor = spawn_lidar_sensor(vehicle)
# actors_list.append(lidar_sensor)

Collision sensor
* Primarily used to detect when a simulation has failed, i.e. the vehicle has crashed

In [11]:
def spawn_collision_sensor(to_attach):
    # Collision Sensor
    collision_bp = bp_lib.find('sensor.other.collision')
    collision_sensor = world.spawn_actor(collision_bp, carla.Transform(), attach_to=to_attach)
    return collision_sensor
   
def process_collision(event, env):
    '''Process a collision event'''
    print("COLLISION:", event)
    env.collided = 1

In [12]:
# collision_sensor = spawn_collision_sensor(vehicle)
# actors_list.append(collision_sensor)

GNSS sensor
* Global position of the vehicle in the world

In [13]:
def spawn_gnss_sensor(to_attach):
    # GNSS Sensor
    gnss_bp = bp_lib.find('sensor.other.gnss')
    gnss_sensor = world.spawn_actor(gnss_bp, carla.Transform(), attach_to=to_attach)
    return gnss_sensor

def process_gnss_data(data, env):
    '''Process GNSS'''
    env.current_location = np.array([data.latitude, data.longitude, data.altitude])


### Destinations
Just like in a real world enviorment, an emergency vehicle doesnt just drive around with no aim.
It needs a goal (destination) for which to head towards.

For this simulation we can make use of a random spawn point from CARLA as the destination. (Implemented in the class)

Using geopy for distance calculation, as its more accurate than euclidean distance

In [14]:
def calculate_distance(start, desti):
    distance = geodesic((start[0], start[1]), (desti[0], desti[1])).meters
    
    return distance

In [15]:
def get_current_location_gnss(actor):
    location = actor.get_location()
    gnss = map.transform_to_geolocation(location)
    current = np.array([gnss.latitude, gnss.longitude, gnss.altitude])
    
    return current

def set_destination_gnss(actor):
    # Ensure its at least 100m away
    
    distance = 0
    while distance < 100:
        random_spawn = random.choice(spawn_points).location
        gnss = map.transform_to_geolocation(random_spawn)  # Convert to GNSS
        destination = np.array([gnss.latitude, gnss.longitude, gnss.altitude])
    
        distance = calculate_distance(get_current_location_gnss(actor), destination)
        
    return destination



## Reinforcement Learning Enviroment Class
We'll use open-ai gym for this

Observation Space (`Dict`):
* RGB Camera Shape: `(IM_WIDTH, IM_HEIGHT, 3)` : `Box` -> Only with visual render
* LiDAR Point Cloud Shape: `(N, 3)`; `N` points, `3` x,y,z : `Box`
* Collision Shape: `1` : `Discrete`
* Current Location Shape: `3` x,y,z : `Box`
* Destination Location Shape: `3` x,y,z : `Box`
* GNSS Location Shape: `3` lat,long,alt : `Box`

Action Space (`Box`):
* Steering: `[-1, 1]`, Left : Right
* Throttle: `[0,1]`, None : Full acceleration

In [16]:
class CarlaEnv(Env):
    
    def __init__(self, visual_render=False):
        '''
        Initialize the Env, and sets up the observation and action spaces detailed above
        '''
        
        self.visual_render = visual_render
        
        # Action space
        self.action_space = Box( # ([steering, throttle])
            low=np.array([-1.0, 0.1]), # minimum throttle is 0.1, to always be in motion, encourage exploration
            high=np.array([1.0, 1.0]),
            dtype=np.float32)
        
        # Observations space
        observation_spaces = {}
        
        if self.visual_render:
            rgb_cam_space = Box(
                low=0,
                high=255,
                shape=(IM_HEIGHT, IM_WIDTH, 3),
                dtype=np.uint8) # uint8 is for images
            observation_spaces['rgb'] = rgb_cam_space
        
        lidar_space = Box(
            low=-100,
            high=100,
            shape=(360, 3),
            dtype=np.float32)
        observation_spaces['lidar'] = lidar_space
        
        collision_space = Discrete(2) # 0 or 1
        observation_spaces['collision'] = collision_space
        
        current_location_space = Box(
            low=np.array([-180, -180, -500]),  # Latitude, Longitude, Altitude
            high=np.array([180, 180, 500]),
            shape=(3,),
            dtype=np.float32
        )
        observation_spaces['current_location'] = current_location_space
        
        destination_space = Box(
            low=np.array([-180, -180, -500]),
            high=np.array([180, 180, 500]),
            shape=(3,),
            dtype=np.float32
        )
        observation_spaces['destination'] = destination_space
        
        self.observation_space = Dict(observation_spaces)
        
        # Initialize variables, to the external functions taking in (env)
        self.collided = 0 # False
        self.rgb_image = np.zeros((IM_HEIGHT, IM_WIDTH, 3), dtype=np.uint8) if self.visual_render else None # Black image
        self.lidar_pc = np.zeros((360, 3), dtype=np.float32) # Empty point cloud
        
        self.actors_list = []
    
    def reset(self):
        '''
        Reset Env, respawn agent, and return initial state
        '''
        
        # Reset flags
        self.collided = 0
        
        # Destroy all previous actors
        if self.actors_list:
            for actor in self.actors_list:
                actor.destroy()
            self.actors_list.clear()
        
        # Spawn actor and sensors using previous functions
        self.vehicle = spawn_vehicle()
        self.lidar_sensor = spawn_lidar_sensor(self.vehicle)
        self.collision_sensor = spawn_collision_sensor(self.vehicle)
        self.gnss_sensor = spawn_gnss_sensor(self.vehicle)
        
        # Add actors to list
        self.actors_list.extend([self.vehicle, self.lidar_sensor, self.collision_sensor, self.gnss_sensor])
        
        # Apply callbacks
        self.lidar_sensor.listen(lambda data: process_lidar_pc(data, self))
        self.collision_sensor.listen(lambda event: process_collision(event, self))
        self.gnss_sensor.listen(lambda data: process_gnss_data(data, self))
        
        # GNSS current and destination        
        self.current_location = get_current_location_gnss(self.vehicle)
        self.destination = set_destination_gnss(self.vehicle)
        
        # Return state
        obs = {
            'lidar': self.lidar_pc,
            'collision': self.collided,
            'current_location': self.current_location,
            'destination': self.destination
        }
        
        # Add RGB image if visual render
        if self.visual_render:
            self.camera_rgb = spawn_rgb_camera(self.vehicle)
            self.actors_list.append(self.camera_rgb)
            self.camera_rgb.listen(lambda data: process_rgb_image(data, self))
            obs['rgb'] = self.rgb_image
        
        # Log dictionary
        log_dict = {
            'visual_render': self.visual_render,
            'current_location': self.current_location,
            'destination': self.destination,
            'distance': calculate_distance(self.current_location, self.destination)
        }
        print(log_dict)
        
        return obs
    
    def get_reward(self):
        '''
        Calculate and return reward; to be used in step function
        ''' 
        
        reward = 0
        
        # Reward for collision
        if self.collided:
            reward -= 200
        
        # Time penalty
        reward -= 0.2
        
        # Faster = better up to 45 m/s
        velocity = self.vehicle.get_velocity()
        speed = np.sqrt(velocity.x**2 + velocity.y**2 + velocity.z**2)  # m/s
        reward += min(speed, 45) / 3 # no extra rewards after 45 m/s (~ 100mph)
        
        # In lane and following traffic rules don't matter as it's an emergency vehicle
        
        # Destination reward
        distance = calculate_distance(self.current_location, self.destination)
        reward += 100 / (distance + 1)
        
        # Problems when models destination is in front of it by chance
        # if distance < 5.0: # ends in m range
        #     reward += 50 
        #     done = 1
        
        return reward
        
    def step(self, action):
        '''
        Take action, return next state, reward, done
        '''
        
        # Apply action
        steering = float(action[0])
        throttle = float(action[1]) * 0 + 1
        
        self.vehicle.apply_control(carla.VehicleControl(throttle=throttle, steer=steering))
        
        # Get reward
        reward = self.get_reward()
        
        # Update current location
        self.current_location = get_current_location_gnss(self.vehicle)
        
        obs = {
            'lidar': self.lidar_pc,
            'collision': self.collided,
            'current_location': self.current_location,
            'destination': self.destination
        }
        
        if self.visual_render:
            obs['rgb'] = self.rgb_image
        
        # Done conditions
        done = self.collided
        
        distance = calculate_distance(self.current_location, self.destination)
        

            
        speed = np.sqrt((self.vehicle.get_velocity().x**2) + (self.vehicle.get_velocity().y**2) + (self.vehicle.get_velocity().z**2))
        
        # Information, speed, current steering, throttle, distance to destination
        # Data, min, max
        info = {
            'reward': reward,
            'speed': speed,
            'steering': steering,
            'throttle': throttle,
            'distance': distance
        }
        
        #print(info)
        
        return obs, reward, done, info

    def render(self):
        '''
        Display a rendering of the environment, from the spectators POV
        This will only work when the server runs in rendering mode
        '''
        try:
            spectator = self.vehicle.get_world().get_spectator()
            transform = self.vehicle.get_transform()
            
            # Make spectator follow vehicle
            spectator.set_transform(carla.Transform(
                transform.location + carla.Location(z=30.0), # Offset so you get a better view of surroundings also
                carla.Rotation(pitch=-60, yaw=transform.rotation.yaw)  # Look at same direction as vehicle
            ))
        except:
            print('Rendering mode likely not enabled, or server not running.')
    
    def close(self):
        '''
        Clean up
        '''
        if self.actors_list:
            for actor in self.actors_list:
                actor.destroy()
            self.actors_list.clear()
            
        print('Environment cleaned up.')

## Traning the Agent
Proximal Policy Optimization (PPO) to train the agent (vehicle); well suited for continuous action spaces like steering, throttle.

### Traffic with python script in PythonAPI/examaples
To begin with the trianing wont incorperate traffic, simplified env, once the model learns basic navigation traffic and other real world elements will be incorperated.

```cmd
!cd ../examples/ && py -3.8 generate_traffic.py
```

In [17]:
# Regular render -> True, Slowest, with spectator
# Off-Screen -> True, Faster, no spectator
# No-Renders -> False, Fastest, no spectator
rendering = False

Testing env for a quick demo, this uses `.sample()` so all the movement are random and not based on any predicitons

In [18]:
# # Initialize environment
# env = CarlaEnv(visual_render=rendering)
# env.reset()

# for ep in range(1,3): 
#     obs = env.reset()
#     done = 0
#     total_reward = 0
    
#     while done == 0:
#         action = env.action_space.sample()
#         obs, reward, done, info = env.step(action)
#         total_reward += reward
        
#         env.render()
        
#         print(info)
        
#     print('Episode Reward:', total_reward)
    
# env.close() # Clean up

The policy used is `MultiInputPolicy` due to the obseration space being a `Dict`

In [19]:
#load model in if exists
log_path = os.path.join("training", "logs")
model_path = os.path.join("training", "models")

try:
    model_ppo = PPO.load('training/models/best_model.zip', device='cuda')
    print("Model loaded successfully")
except:
    print("Model not found, training new model")

Model loaded successfully


`DummyVecEnv` wrapper to vectorize, for `PPO` model training.

*unsolved note: cannot make use of `SubprocVecEnv` due to following error:*
```console
RuntimeError: Pickling of "carla.libcarla.BlueprintLibrary" instances is not enabled (http://www.boost.org/libs/python/doc/v2/pickle.html)
```

In [20]:
# Create environment
env = DummyVecEnv([lambda: CarlaEnv(visual_render=rendering)])

eval_callback = EvalCallback(env,
                             eval_freq=10_000,
                             best_model_save_path=model_path,
                             verbose=1)

  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


In [21]:
try:
    # Train model
    model_ppo = PPO('MultiInputPolicy', env, verbose=1, tensorboard_log=log_path, device='cuda')
    model_ppo.learn(total_timesteps=100_000_000, callback=eval_callback)
except:
    print("Training interrupted. Closing environment.")
finally:
    env.close()
    model_ppo.save(model_path)

Using cpu device
{'visual_render': False, 'current_location': array([-3.78566230e-04,  8.90037847e-04,  5.46100020e-01]), 'destination': array([-0.00062936, -0.00102937,  0.59999996]), 'distance': 215.4593054242264}
Logging to training\logs\PPO_5
COLLISION: CollisionEvent(frame=5352525, timestamp=264443.953958, other_actor=000001F8E7C3DE90)
{'visual_render': False, 'current_location': array([-0.00080702,  0.00098386,  0.48239994]), 'destination': array([-1.47994362e-04, -6.76825518e-04,  5.99999964e-01]), 'distance': 198.71093259276185}
COLLISION: CollisionEvent(frame=5352641, timestamp=264449.753958, other_actor=000001F8E7C3DC50)
{'visual_render': False, 'current_location': array([-1.47994362e-04, -7.62963984e-04,  4.82399940e-01]), 'destination': array([-1.51898174e-04,  3.89627395e-04,  5.99999964e-01]), 'distance': 128.30661143399507}
-----------------------------
| time/              |      |
|    fps             | 1165 |
|    iterations      | 1    |
|    time_elapsed    | 1    |

TO minimize GPU usage by unecessary rendering, I've enabled no render mode, however also made use of the no render script to get a top down 2d, pygame view of the simulation.

### Further training
Increses complexity of the simulation, more realistic

* Traffic:
    ```console
    !py -3.8 PythonAPI/examples/generate_traffic.py
    ```
* Weather
    ```console
    !py -3.8 PythonAPI/examples/dynamic_weather.py
    ```

## Results & Evaluation - In Progress