#### Imports

In [None]:

import json
import math
import random
import time
import queue
import shutil 
import os
import sys
from enum import Enum
from datetime import datetime

import carla
import numpy as np
import urdf_parser_py.urdf as urdf
from scipy.spatial.transform import Rotation   
import matplotlib.pyplot as plt
from matplotlib import cm
import open3d as o3d
import cv2

from classes.CARLASemantics import SemanticColors, SemanticTags

In [2]:
try:
    CARLA_ADDITIONAL_PYTHON_API_PATH = "/home/erkoiv/Documents/CARLA_0.9.13/PythonAPI/carla"
    if CARLA_ADDITIONAL_PYTHON_API_PATH not in sys.path:
        sys.path.append(CARLA_ADDITIONAL_PYTHON_API_PATH)
    from agents.navigation.basic_agent import BasicAgent 
except Exception as error:
    raise ImportError(f"FATAL ERROR: Unable to import CARLA autonomous driving agent BasicAgent due to missing PythonAPI. The API is included in the simulator installation package (not included with \"import carla\"). Setup the CARLA simulator repository and add the correct PythonAPI path above. ({error})")

#### Connect to CARLA server

In [3]:
client = carla.Client('localhost', 2000)
client.set_timeout(10)
world = client.get_world()
map = world.get_map()

blueprint_library = world.get_blueprint_library()
spawn_points = world.get_map().get_spawn_points()

In [4]:
def reload_world():
    global world, map, blueprint_library, spawn_points, traffic_manager
    world = client.reload_world()
    map = world.get_map()
    blueprint_library = world.get_blueprint_library()
    spawn_points = map.get_spawn_points()
    traffic_manager = client.get_trafficmanager()

def load_world(map_name="Town01", timeout=10.0):
    global world, map, blueprint_library, spawn_points, traffic_manager
    client.set_timeout(timeout)
    world = client.load_world(map_name)
    map = world.get_map()
    blueprint_library = world.get_blueprint_library()
    spawn_points = map.get_spawn_points()
    traffic_manager = client.get_trafficmanager()

load_world(map_name="Town03")

In [5]:
class CarlaSyncMode(object):
    """
        with CarlaSyncMode(world, sensors) as sync_mode:
            while True:
                data = sync_mode.tick(timeout=1.0)

    """

    def __init__(self, world, sensors, **kwargs):
        self.world = world
        self.sensors = sensors
        self.frame = None
        self.delta_seconds = 1.0 / kwargs.get('fps', 5)
        self._queues = []
        self._settings = None

    def __enter__(self):
        self._settings = self.world.get_settings()
        self.frame = self.world.apply_settings(carla.WorldSettings(
            no_rendering_mode=False,
            synchronous_mode=True,
            fixed_delta_seconds=self.delta_seconds,
            ))

        def make_queue(register_event):
            q = queue.Queue()
            register_event(q.put)
            self._queues.append(q)

        make_queue(self.world.on_tick)
        for sensor in self.sensors:
            make_queue(sensor.listen)
        return self

    def tick(self, timeout=300):
        self.frame = self.world.tick()
        data = [self._retrieve_data(q, timeout) for q in self._queues]
        assert all(x.frame == self.frame for x in data)
        return data

    def __exit__(self, *args, **kwargs):
        self.world.apply_settings(self._settings)

    def _retrieve_data(self, sensor_queue, timeout):
        while True:
            data = sensor_queue.get(timeout=timeout)
            if data.frame == self.frame:
                return data

#### CARLA transformation functions

In [6]:
def matrix_to_transform(matrix):
    # Ensure matrix is a NumPy array
    if not isinstance(matrix, np.ndarray):
        matrix = np.array(matrix)

    # Extract translation
    location = carla.Location(x=matrix[0, 3], y=(-matrix[1, 3]), z=matrix[2, 3])

    roll, pitch, yaw = Rotation.from_matrix(matrix[:3, :3]).as_euler('xyz', degrees=True)
    rotation = carla.Rotation(pitch=(-pitch), yaw=(-yaw), roll=roll)
    
    # Create and return carla.Transform
    return carla.Transform(location, rotation)


def build_transform_matrix(rotation, translation):
    m = np.eye(4)
    m[:3, :3] = rotation
    m[:3, 3] = translation
    return m


def rotation_matrix(axis, angle):
    """
    Create a rotation matrix for a given axis and angle.
    """
    if axis == 'x':
        return np.array([
            [1, 0, 0, 0],
            [0, np.cos(angle), -np.sin(angle), 0],
            [0, np.sin(angle), np.cos(angle), 0],
            [0, 0, 0, 1]
        ])
    elif axis == 'y':
        return np.array([
            [np.cos(angle), 0, np.sin(angle), 0],
            [0, 1, 0, 0],
            [-np.sin(angle), 0, np.cos(angle), 0],
            [0, 0, 0, 1]
        ])
    elif axis == 'z':
        return np.array([
            [np.cos(angle), -np.sin(angle), 0, 0],
            [np.sin(angle), np.cos(angle), 0, 0],
            [0, 0, 1, 0],
            [0, 0, 0, 1]
        ])

def reflection_matrix():
    """
    Create a reflection matrix to flip the Y-axis.
    """
    return np.array([
        [1, 0, 0, 0],
        [0, -1, 0, 0],
        [0, 0, 1, 0],
        [0, 0, 0, 1]
    ])

def transform_to_carla(sensor_type, transformation):
    """
    Convert sensor transformation to CARLA format.
    """
    sensor_type = sensor_type.lower().strip()
    if sensor_type in ['camera', 'sensor.camera', 'sensor.camera.rgb', 'sensor.camera.semantic_segmentation', 'sensor.camera.depth']:
        rotation1 = rotation_matrix('z', np.pi / 2)
        rotation2 = rotation_matrix('y', -np.pi / 2)
        rotation = np.dot(rotation1, rotation2)
    elif sensor_type in ['lidar', 'sensor.lidar', 'sensor.lidar.ray_cast', 'sensor.lidar.ray_cast_semantic']:
        rotation = rotation_matrix('z', np.pi / 2)
    elif sensor_type in ['radar', 'sensor.other.radar']:
        rotation = np.eye(4)
    else:
        raise ValueError("Unknown sensor type")
    tf = np.dot(transformation, rotation)
    return tf

def transform_from_carla(sensor_type, transformation):
    """
    Convert sensor transformation from CARLA format.
    """
    sensor_type = sensor_type.lower().strip()
    if sensor_type in ['camera', 'sensor.camera', 'sensor.camera.rgb', 'sensor.camera.semantic_segmentation', 'sensor.camera.depth']:
        rotation1 = rotation_matrix('z', -np.pi / 2)
        rotation2 = rotation_matrix('y', np.pi / 2)
        rotation = np.dot(rotation2, rotation1)
    elif sensor_type in ['lidar', 'sensor.lidar', 'sensor.lidar.ray_cast', 'sensor.lidar.ray_cast_semantic']:  
        rotation = rotation_matrix('z', -np.pi / 2)
    elif sensor_type in ['radar', 'sensor.other.radar']:
        rotation = np.eye(4)
    else:
        raise ValueError("Unknown sensor type")
    tf = np.dot(transformation, rotation)
    return tf

### Run path validation in CARLA

In [7]:
planned_path_json = dict()
with open('config/town3.path.json') as path_file:
    planned_path_json = json.load(path_file)
planned_path = planned_path_json["route"]

def get_agent_path(coordinates):
    path = []
    for x, y, z in coordinates:
        path.append(carla.Location(x=x, y=y, z=z))
    return path

In [8]:
def validate_path():
    reload_world()

    cv2.namedWindow("Press Q to stop simulation")
    cv2.imshow("Press Q to stop simulation", np.zeros((1,1)))

    blueprint_name = "vehicle.micro.microlino"
    blueprint = blueprint_library.find(blueprint_name)
    transform = spawn_points[0]
    validation_vehicle = world.spawn_actor(blueprint, transform)

    blueprint = blueprint_library.find("sensor.camera.rgb")
    blueprint.set_attribute('image_size_x', str(720))
    blueprint.set_attribute('image_size_y', str(480))

    transform_matrix = carla.Transform(carla.Location(x=1.7, y=0.0, z=1.5), carla.Rotation(roll=0, pitch=0, yaw=0)).get_matrix()
    transform = matrix_to_transform(transform_matrix)
    sensor = world.spawn_actor(blueprint, transform, attach_to=validation_vehicle)
    validation_vehicle_control_agent = BasicAgent(validation_vehicle, target_speed=17)
    validation_vehicle_control_agent.ignore_traffic_lights(active=True)
    agent_path = get_agent_path(planned_path)
    validation_vehicle_control_agent.set_destination(agent_path.pop(0))

    visited_path = []
    try:
        with CarlaSyncMode(world, [sensor], fps=10) as sync_mode:
            print("Driving to the start of the path...")
            while validation_vehicle_control_agent.done() is False:
                if (cv2.waitKey(1) == ord('q')):
                    break
                
                simulated_results = sync_mode.tick()[1:]
                cam_sensor = simulated_results[0]
                cam_sensor_image =  np.reshape(np.copy(cam_sensor.raw_data), (cam_sensor.height, cam_sensor.width, 4))
                cv2.imshow("CAM_FRONT", cam_sensor_image)

                validation_vehicle.apply_control(validation_vehicle_control_agent.run_step())
            print("Arrived at the start of the path!")

            print("Driving on path...")
            while True:
                if (cv2.waitKey(1) == ord('q')):
                    break

                simulated_results = sync_mode.tick()[1:]
                cam_sensor = simulated_results[0]
                cam_sensor_image =  np.reshape(np.copy(cam_sensor.raw_data), (cam_sensor.height, cam_sensor.width, 4))
                cv2.imshow("CAM_FRONT", cam_sensor_image)
                
                if validation_vehicle_control_agent.done():
                    print(f"{datetime.now()} Checkpoint reached. Ego vehicle has reached {len(planned_path) - len(agent_path)}/{len(planned_path)} planned path points.")
                    visited_path.append(validation_vehicle.get_location())
                    if (agent_path == []):
                        break
                    validation_vehicle_control_agent.set_destination(agent_path.pop(0))
                validation_vehicle.apply_control(validation_vehicle_control_agent.run_step())
            print("Drive finished!")
    except RuntimeError as error:
        print(error)
    finally:
        cv2.destroyAllWindows()
        validation_vehicle.destroy()

    assert len(visited_path) == len(planned_path), "Validation failed. The vehicle could not complete the planned path."

skip_validation = True
if not skip_validation:
    validate_path()

### Prepare CARLA world for full simulation

#### Add ego vehicle (with intrinsics and extrinsics from external configuration)

In [9]:
class URDFParser:
    def __init__(self, urdf_file):
        self.urdf_file = urdf_file
        self.robot = urdf.URDF.from_xml_file(urdf_file)
        self.root = self.robot.get_root()

    def compute_chain_transform(self, chain):
        transform = np.eye(4)
        
        for joint in chain:
            if joint not in self.robot.joint_map:
                continue
            
            joint_info = self.robot.joint_map[joint]
            rpy = joint_info.origin.rpy
            xyz = joint_info.origin.xyz
            rotation = Rotation.from_euler('xyz', rpy).as_matrix()
            translation = np.array(xyz)
            T = build_transform_matrix(rotation, translation)
            transform = np.dot(transform, T)
        
        return transform

    def get_T_from_to(self, start_frame, end_frame):
        chain_1 = self.robot.get_chain(self.root, start_frame)
        chain_2 = self.robot.get_chain(self.root, end_frame)
        T1 = self.compute_chain_transform(chain_1)
        T2 = self.compute_chain_transform(chain_2)
        return np.dot(np.linalg.inv(T1), T2)

In [10]:
reload_world()

blueprint_name = "vehicle.micro.microlino"
blueprint = blueprint_library.find(blueprint_name)
blueprint.set_attribute('role_name','ego')
transform = spawn_points[0]
vehicle = world.spawn_actor(blueprint, transform)
vehicle_control_agent = BasicAgent(vehicle)
vehicle_control_agent.ignore_traffic_lights(active=True)

sensor_names = []
sensor_types = []
sensors = []

axes = []

extrinsics = URDFParser('config/nuscenes.extrinsics.urdf')
intrinsics = dict()
with open('config/nuscenes.intrinsics.json') as intrinsics_file:
    intrinsics = json.load(intrinsics_file)

for sensor_configuration in extrinsics.robot.links:
    sensor_name = sensor_configuration.name

    if "CAM_" in sensor_name:
        blueprint_name = "sensor.camera.rgb"
        sensor_type = blueprint_name
        blueprint = blueprint_library.find(blueprint_name)
        sensor_intrinsics = intrinsics.get(sensor_name, dict())

        def calculate_fov(focal_length, image_width):
            fov_radians = 2 * np.arctan(image_width / (2 * focal_length))
            fov_degrees = np.degrees(fov_radians)
            return fov_degrees
        image_width = str(sensor_intrinsics.get("w", 1600))
        blueprint.set_attribute('image_size_x', image_width)
        image_height = str(sensor_intrinsics.get("h", 900))
        blueprint.set_attribute('image_size_y', image_height)
        focal_distance = float(sensor_intrinsics.get("fl"))
        field_of_view = str(calculate_fov(focal_distance, float(image_width)))
        blueprint.set_attribute('fov', field_of_view)
        
        transform_matrix = extrinsics.compute_chain_transform(extrinsics.robot.get_chain(extrinsics.root, sensor_name))
        transform_matrix = transform_to_carla("camera", transform_matrix)
        transform = matrix_to_transform(transform_matrix)
        attached_to = vehicle
        sensor = world.spawn_actor(blueprint, transform, attach_to=attached_to)
        sensors.append(sensor)
        sensor_types.append(sensor_type)
        sensor_names.append(sensor_name)

        ## Spawn additional DEPTH and SEMANTIC cams
        
        blueprint_name = "sensor.camera.depth"
        sensor_type = blueprint_name
        blueprint = blueprint_library.find(blueprint_name)
        blueprint.set_attribute('image_size_x', image_width)
        blueprint.set_attribute('image_size_y', image_height)
        blueprint.set_attribute('fov', field_of_view)
        attached_to = vehicle
        sensor = world.spawn_actor(blueprint, transform, attach_to=attached_to)
        sensors.append(sensor)
        sensor_types.append(sensor_type)
        sensor_names.append(f"DEPTH_{sensor_name}")

        blueprint_name = "sensor.camera.semantic_segmentation"
        sensor_type = blueprint_name
        blueprint = blueprint_library.find(blueprint_name)
        blueprint.set_attribute('image_size_x', image_width)
        blueprint.set_attribute('image_size_y', image_height)
        blueprint.set_attribute('fov', field_of_view)
        attached_to = vehicle
        sensor = world.spawn_actor(blueprint, transform, attach_to=attached_to)
        sensors.append(sensor)
        sensor_types.append(sensor_type)
        sensor_names.append(f"SEMANTIC_{sensor_name}")

    elif "RADAR_" in sensor_name:
        blueprint_name = "sensor.other.radar"
        sensor_type = blueprint_name
        blueprint = blueprint_library.find(blueprint_name)     
        blueprint.set_attribute('horizontal_fov', str(30.0)) 
        blueprint.set_attribute('vertical_fov', str(30.0)) 
        blueprint.set_attribute('points_per_second', str(1e5))
        
        transform_matrix = extrinsics.get_T_from_to("center", sensor_name)
        transform_matrix = transform_to_carla('radar', transform_matrix)
        transform = matrix_to_transform(transform_matrix)
        attached_to = vehicle
        sensor = world.spawn_actor(blueprint, transform, attach_to=attached_to)
        sensors.append(sensor)
        sensor_types.append(sensor_type)
        sensor_names.append(sensor_name)

    elif "LIDAR_" in sensor_name:
        blueprint_name = "sensor.lidar.ray_cast_semantic"
        sensor_type = blueprint_name
        blueprint = blueprint_library.find(blueprint_name)
        blueprint.set_attribute("channels", str(64))
        blueprint.set_attribute("points_per_second", str(112000))
        blueprint.set_attribute("range", str(100))
        
        transform_matrix = extrinsics.get_T_from_to("center", sensor_name)
        transform_matrix = transform_to_carla('lidar', transform_matrix)
        transform = matrix_to_transform(transform_matrix)
        attached_to = vehicle
        sensor = world.spawn_actor(blueprint, transform, attach_to=attached_to)
        sensors.append(sensor)
        sensor_types.append(sensor_type)
        sensor_names.append(sensor_name)

In [None]:
print(f"Number of sensors spawned: {len(sensors)}")
print(sensor_names)
print(sensor_types)

#### Add traffic (non-ego vehicles and pedestrians) to simulation

In [12]:
def add_vehicles_to_simulation(n_vehicles: int=0):
    """
    Choos n_vehicles amount of vehicles from Carla's Blueprint Library and
    spawn them at random points on the map. Each of those vehicles are set
    on autopilot and controlled by Carla's Traffic Manager.
    
    NB! If randomly chosen spawn points collide, the vehicle is not spawned
    so there might be fewer vehicles than set by n_vehicles.

    Inputs:
        n_vehicles - amount of vehicles to spawn
    """
    if n_vehicles == 0:
        return
    
    vehicle_blueprints = blueprint_library.filter('vehicle')
    
    npcs = []
    collisions = 0
    for _ in range(n_vehicles):
        vehicle_bp = random.choice(vehicle_blueprints)
        spawn_point = random.choice(spawn_points)
        npc = world.try_spawn_actor(vehicle_bp, spawn_point)
        if npc:
            npcs.append(npc)
        else:
            collisions += 1
    
    if collisions:
        print(f"Failed to spawn {collisions}/{n_vehicles} vehicles because of collisions")
    
    port = traffic_manager.get_port()
    for npc in npcs:
        npc.set_autopilot(True, port)

def add_pedestrians_to_simulation(n_pedestrians: int=0,
                                  min_speed: float=1.0,
                                  max_speed: float=2.0) -> None:
    """
    Choose n_pedestrians amount of random pedestrians from Carla's
    Blueprint Library, attach them to AI and spawn them to random 
    positions in the simulation. Set a random location for them to 
    walk to and speed between min_speed and max_speed for them to
    walk at.

    NB! If the randomly chosen spawn points collide, a pedestrian
    is not spawned, so there might be fewer pedestrians than 
    n_pedestrians in the simulation.

    Inputs:
        n_pedestrians - number of pedestrians to spawn
        min_speed - minimum walking speed for a pedestrian
        max_speed - maximum walking speed for a pedestrian
    """
    if n_pedestrians == 0:
        return
    
    pedestrian_blueprints = blueprint_library.filter("walker.pedestrian.*")
    points = []
    for i in range(n_pedestrians):
        point = carla.Transform()
        point.location = world.get_random_location_from_navigation()
        if (point.location != None):
            points.append(point)
    batch = []

    # Create a batch of spawn commands
    for point in points:
        pedestrian_bp = random.choice(pedestrian_blueprints)
        batch.append(carla.command.SpawnActor(pedestrian_bp, point))

    # Apply the batch of commands
    results = client.apply_batch_sync(batch, True)
    walkers_list = []
    collisions = 0
    for i in range(len(results)):
        if results[i].error:
            collisions += 1
        else:
            walkers_list.append({"id": results[i].actor_id})

    if collisions:
        print(f"Failed to spawn {collisions}/{n_pedestrians} pedestrians because of collisions")

    # Create a batch of spawn commands for AI controllers
    batch = []
    walker_controller_bp = blueprint_library.find('controller.ai.walker')
    for i in range(len(walkers_list)):
        batch.append(carla.command.SpawnActor(walker_controller_bp, carla.Transform(), walkers_list[i]["id"]))

    # Apply the batch of commands
    results = client.apply_batch_sync(batch, True)
    for i in range(len(results)):
        if results[i].error:
            print(f"Spawning pedestrian AI failed: {results[i].error}")
        else:
            walkers_list[i]["con"] = results[i].actor_id

    all_ids = []
    for i in range(len(walkers_list)):
        all_ids.append(walkers_list[i]["con"])
        all_ids.append(walkers_list[i]["id"])
    all_actors = world.get_actors(all_ids)

    for i in range(0, len(all_actors), 2):
        all_actors[i].start()
        all_actors[i].go_to_location(world.get_random_location_from_navigation())
        all_actors[i].set_max_speed(random.uniform(min_speed, max_speed))

add_vehicles_to_simulation(n_vehicles=10)
add_pedestrians_to_simulation()

### Run full simulation in CARLA

##### Filesystem methods

In [13]:
def delete_all_in_directory(target_directory):
    if os.path.exists(target_directory):
        for filename in os.listdir(target_directory):
            file_path = os.path.join(target_directory, filename)
            try:
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.remove(file_path) 
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
            except Exception as e:
                print(f'Failed to delete {file_path}. Reason: {e}')


def create_filename_from_timestamp(timestamp):
    SECONDS_TO_NANOSECONDS = 1000000000
    filename = str(math.trunc(timestamp * SECONDS_TO_NANOSECONDS))
    return filename


##### General sensor processing methods

In [14]:
def save_sensor_position(raw_data, target_directory, sensor_type=None):
    transform_matrix = raw_data.transform.get_matrix()
    if sensor_type is not None:
        transform_matrix = transform_from_carla(sensor_type, transform_matrix)
    if not os.path.exists(target_directory):
        os.makedirs(target_directory, exist_ok=True)
    filename = create_filename_from_timestamp(raw_data.timestamp) + ".npy"
    np.save(f"{target_directory}/{filename}", transform_matrix)

##### RBG Camera Methods

In [15]:
def save_rgb_image(raw_data, target_directory):    
    directory = os.path.dirname(target_directory)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
    rgb_image = np.reshape(np.copy(raw_data.raw_data), (raw_data.height, raw_data.width, 4))
    filename = create_filename_from_timestamp(raw_data.timestamp) + ".png"
    cv2.imwrite(f"{directory}/{filename}", rgb_image)

##### Depth Camera Methods

In [16]:
def save_depth_image(raw_data, target_directory):
    directory = os.path.dirname(target_directory)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
    filename = create_filename_from_timestamp(raw_data.timestamp) + ".png"
    raw_data.save_to_disk(f"{directory}/{filename}")

##### Semantic Camera Methods

In [17]:
def save_semantic_image(raw_data, target_directory):
    directory = os.path.dirname(target_directory)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
    filename = create_filename_from_timestamp(raw_data.timestamp) + ".png"
    raw_data.save_to_disk(f"{directory}/{filename}")

##### Radar Methods

In [18]:
def save_radar_readings(raw_data, target_directory):
    directory = os.path.dirname(target_directory)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)

    radar_points_list = []
    for measurement in raw_data:
        azi = math.degrees(measurement.azimuth)
        alt = math.degrees(measurement.altitude)
        fw_vec = carla.Vector3D(x=measurement.depth)
        carla.Transform(
            carla.Location(),
            carla.Rotation(pitch=alt,yaw=azi,roll=0)
        ).transform(fw_vec)
        radar_points_list.append([fw_vec.x, fw_vec.y, fw_vec.z])
    
    points = np.array(radar_points_list)
    point_cloud = o3d.geometry.PointCloud()
    point_cloud.points = o3d.utility.Vector3dVector(points)
    
    filename = create_filename_from_timestamp(raw_data.timestamp) + ".ply"
    o3d.io.write_point_cloud(f"{directory}/{filename}", point_cloud)

##### Lidar methods

In [19]:
def visualize_lidar(raw_data, window_name):
    # Get the LiDAR point cloud from the data
    lidar_data = raw_data.raw_data
    lidar_data = np.frombuffer(lidar_data, dtype=np.dtype('f4'))
    lidar_data = np.reshape(lidar_data, (int(lidar_data.shape[0] / 4), 4))

    # Extract X, Y, Z coordinates and intensity values
    points_xyz = lidar_data[:, :3]
    #intensity = lidar_data[:, 3]

    # Intensity scaling factor
    intensity_scale = 2.0  # Adjust this value to control the brightness

    # Create a 2D histogram with a predetermined size
    width, height = 360, 360
    lidar_image_array = np.zeros((height, width))

    # Scale and shift X and Y coordinates to fit within the histogram size
    def normalize(data):
        shifted = data - np.min(data)
        normal = shifted / np.max(shifted)
        return normal
    X = (normalize(points_xyz[:, 0]) * (width-1)).astype(int)
    Y = (normalize(points_xyz[:, 1]) * (height-1)).astype(int)

    # Assign scaled intensity values to the corresponding pixel in the histogram
    lidar_image_array[Y, X] = 255

    # Display the processed image using Pygame
    cv2.imshow(window_name, lidar_image_array)

def save_lidar_readings(raw_data, target_directory):
    directory = os.path.dirname(target_directory)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
    filename = create_filename_from_timestamp(raw_data.timestamp) + ".ply"
    raw_data.save_to_disk(f"{directory}/{filename}")

##### Semantic Lidar Methods

In [20]:
def filter_semantic_lidar_readings(semantic_lidar_data):
    filtered_lidar_data = np.copy(semantic_lidar_data)
    
    unwanted_data_tags = np.array([
        SemanticTags.ROADLINE.value, SemanticTags.ROAD.value, SemanticTags.SIDEWALK.value,
        SemanticTags.GROUND.value, SemanticTags.WATER.value, SemanticTags.TERRAIN.value
    ]) 
    if len(unwanted_data_tags) > 0:
        data_tags = filtered_lidar_data[:, 3].flatten()
        filter_mask = np.isin(data_tags, unwanted_data_tags, invert=True)
        filtered_lidar_data = filtered_lidar_data[filter_mask]
    
    z_range = None
    if z_range:
        z_min, z_max = z_range
        z_values = filtered_lidar_data[:, 2:3].flatten()
        filter_mask = np.where(z_min <= z_values, True, False) * np.where(z_values <= z_max, True, False)
        filtered_lidar_data = filtered_lidar_data[filter_mask]
    
    return filtered_lidar_data

def save_semantic_lidar_readings(raw_data, target_directory):
    directory = os.path.dirname(target_directory)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)

    lidar_data = np.array([(detection.point.x, detection.point.y, detection.point.z, int(detection.object_tag)) for detection in raw_data])
    lidar_data = filter_semantic_lidar_readings(lidar_data)
    points = lidar_data[:, :3]
    # Create an Open3D point cloud object
    point_cloud = o3d.geometry.PointCloud()
    point_cloud.points = o3d.utility.Vector3dVector(points)

    # Save the point cloud to a .ply file
    filename = create_filename_from_timestamp(raw_data.timestamp) + ".ply"
    o3d.io.write_point_cloud(f"{target_directory}/{filename}", point_cloud)

In [None]:
SIMULATED_DATA_DIRECTORY = "./generated_data"
delete_all_in_directory(SIMULATED_DATA_DIRECTORY)

cv2.namedWindow("Press Q to stop simulation")
cv2.imshow("Press Q to stop simulation", np.zeros((1,1)))

agent_path = get_agent_path(planned_path)
vehicle_control_agent.set_destination(agent_path.pop(0))
try:
    with CarlaSyncMode(world, sensors, fps=10) as sync_mode:
        print("Driving to the start of the path...")
        while vehicle_control_agent.done() is False:
            if cv2.waitKey(1) == ord('q'):
                break
            sync_mode.tick()
            vehicle.apply_control(vehicle_control_agent.run_step())
        print("Arrived at the start of the path!")

        print("Driving on path...")
        while True:
            simulation_results = sync_mode.tick(timeout=300.0)[1:]
            for i in range(len(simulation_results)):
                sensor = sensors[i]
                sensor_data = simulation_results[i]
                sensor_name = sensor_names[i].replace("base_link_to_", "")
                sensor_type = sensor_types[i]
                if ("camera.rgb" in sensor_type):
                    save_rgb_image(sensor_data, f"generated_data/{sensor_name}/")
                elif ("camera.semantic_segmentation" in sensor_type):
                    save_semantic_image(sensor_data, f"generated_data/{sensor_name}/")
                elif ("sensor.other.radar" in sensor_type):
                    save_radar_readings(sensor_data, f"generated_data/{sensor_name}/")
                elif ("camera.depth" in sensor_type):
                    save_depth_image(sensor_data, f"generated_data/{sensor_name}/") 
                elif ("sensor.lidar.ray_cast_semantic" in sensor_type):
                    save_semantic_lidar_readings(sensor_data, f"generated_data/{sensor_name}/")
                elif ("sensor.lidar" in sensor_type):
                    save_lidar_readings(sensor_data, f"generated_data/{sensor_name}/")
                save_sensor_position(sensor_data, f"generated_data/{sensor_name}/", sensor_type=sensor_type)
            
            if cv2.waitKey(1) == ord('q'):
                break

            if vehicle_control_agent.done():
                print(f"{datetime.now()} Checkpoint reached. Ego vehicle has reached {len(planned_path) - len(agent_path)}/{len(planned_path)} planned path points.")
                if len(agent_path) == 0:
                    break
                vehicle_control_agent.set_destination(agent_path.pop(0))
            vehicle.apply_control(vehicle_control_agent.run_step())

        print("Driving on path finished!")
finally:
    cv2.destroyAllWindows()
    for sensor in sensors:
        sensor.stop()