In [1]:
import carla
import math
import random
import time
import numpy as np
import cv2
import subprocess
import open3d as o3d
import matplotlib

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
# Create a new client instance
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)

world = client.get_world()

bp_lib = world.get_blueprint_library()
spawn_points = world.get_map().get_spawn_points()

vehicle_bp = bp_lib.find('vehicle.dodge.charger')
vehicle = world.try_spawn_actor(vehicle_bp, random.choice(spawn_points))

spectator = world.get_spectator()
transform = carla.Transform(vehicle.get_transform().transform(carla.Location(x=-4, z=2.5)), vehicle.get_transform().rotation)
spectator.set_transform(transform)

In [3]:
# Start the traffic light controller in the background
traffic_light_process = subprocess.Popen(['python', 'traffic_light_controller.py'])
traffic_process = subprocess.Popen(['python', 'generate_traffic.py', '--number-of-vehicles', '30', '--number-of-walkers', '50'])
time.sleep(5)

# Set up traffic manager
traffic_manager = client.get_trafficmanager()
vehicle.set_autopilot(True, traffic_manager.get_port())

In [4]:
VIRIDIS = np.array(matplotlib.colormaps.get_cmap('plasma').colors)
VID_RANGE = np.linspace(0.0, 1.0, VIRIDIS.shape[0])
COOL_RANGE = np.linspace(0.0, 1.0, VIRIDIS.shape[0])
COOL = np.array(matplotlib.colormaps.get_cmap('winter')(COOL_RANGE))
COOL = COOL[:, :3]  # Remove alpha channel

camera_init_trans = carla.Transform(carla.Location(x=0.0, y=0.0, z=1.6), carla.Rotation(pitch=0.0))

def set_camera_attributes(camera_bp):
    camera_bp.set_attribute('image_size_x', '800')
    camera_bp.set_attribute('image_size_y', '600')
    camera_bp.set_attribute('fov', '90')

# RGB Camera
rgb_camera_bp = bp_lib.find('sensor.camera.rgb')
set_camera_attributes(rgb_camera_bp)
rgb_camera = world.spawn_actor(rgb_camera_bp, camera_init_trans, attach_to=vehicle)

# Semantic Segmentation Camera
sem_camera_bp = bp_lib.find('sensor.camera.semantic_segmentation')
set_camera_attributes(sem_camera_bp)
sem_camera = world.spawn_actor(sem_camera_bp, camera_init_trans, attach_to=vehicle)

# Instance Segmentation Camera
inst_camera_bp = bp_lib.find('sensor.camera.instance_segmentation')
set_camera_attributes(inst_camera_bp)
inst_camera = world.spawn_actor(inst_camera_bp, camera_init_trans, attach_to=vehicle)

# Depth Camera
depth_camera_bp = bp_lib.find('sensor.camera.depth')
set_camera_attributes(depth_camera_bp)
depth_camera = world.spawn_actor(depth_camera_bp, camera_init_trans, attach_to=vehicle)
# GNSS Sensor
gnss_bp = bp_lib.find('sensor.other.gnss')
gnss_sensor = world.spawn_actor(gnss_bp, carla.Transform(), attach_to=vehicle)

# IMU Sensor
imu_bp = bp_lib.find('sensor.other.imu')
imu_sensor = world.spawn_actor(imu_bp, carla.Transform(), attach_to=vehicle)

# Collision Sensor
collision_bp = bp_lib.find('sensor.other.collision')
collision_sensor = world.spawn_actor(collision_bp, carla.Transform(), attach_to=vehicle)

# Obstacle Sensor – note the adjusted transform
obstacle_bp = bp_lib.find('sensor.other.obstacle')
obstacle_bp.set_attribute('hit_radius', '2')
obstacle_bp.set_attribute('distance', '50')
obstacle_sensor = world.spawn_actor(obstacle_bp, 
                                    carla.Transform(carla.Location(x=2.5, z=1.5)),
                                    attach_to=vehicle)

In [5]:
# ---------------------------
# Initialize image data storage and sensor data dictionary
# ---------------------------
image_w = 800
image_h = 600
sensor_data = {
    'rgb_image': np.zeros((image_h, image_w, 4), dtype=np.uint8),
    'sem_image': np.zeros((image_h, image_w, 4), dtype=np.uint8),
    'inst_image': np.zeros((image_h, image_w, 4), dtype=np.uint8),
    'depth_image': np.zeros((image_h, image_w, 4), dtype=np.uint8),
    # Extra sensor fields:
    'gnss': [0, 0],
    'imu': {
        'gyro': carla.Vector3D(),
        'accel': carla.Vector3D(),
        'compass': 0
    },
    'collision': False,
    'obstacle': []
}

# ---------------------------
# Define helper functions for overlaying labels and sensor info
# ---------------------------
def add_label(image, label, position=(10, 50), font_scale=0.8, color=(255, 255, 255)):
    labeled_image = image.copy()
    cv2.putText(labeled_image, label, position, cv2.FONT_HERSHEY_SIMPLEX, 
                font_scale, color, 2, cv2.LINE_AA)
    return labeled_image

# Camera image callbacks
def rgb_callback(image, data_dict):
    img = np.reshape(np.copy(image.raw_data), (image.height, image.width, 4))
    # Save a plain copy (without overlays)
    data_dict['rgb_image_raw'] = img.copy()
    # Create a copy with label for the tiled view
    data_dict['rgb_image'] = add_label(img, "RGB Camera")

def sem_callback(image, data_dict):
    image.convert(carla.ColorConverter.CityScapesPalette)
    img = np.reshape(np.copy(image.raw_data), (image.height, image.width, 4))
    data_dict['sem_image'] = add_label(img, "Semantic Segmentation")

def inst_callback(image, data_dict):
    img = np.reshape(np.copy(image.raw_data), (image.height, image.width, 4))
    data_dict['inst_image'] = add_label(img, "Instance Segmentation")

def depth_callback(image, data_dict):
    image.convert(carla.ColorConverter.LogarithmicDepth)
    img = np.reshape(np.copy(image.raw_data), (image.height, image.width, 4))
    data_dict['depth_image'] = add_label(img, "Depth Camera")

# Additional sensor callbacks
def gnss_callback(data, data_dict):
    data_dict['gnss'] = [data.latitude, data.longitude]

def imu_callback(data, data_dict):
    data_dict['imu'] = {
        'gyro': data.gyroscope,
        'accel': data.accelerometer,
        'compass': data.compass
    }

def collision_callback(event, data_dict):
    data_dict['collision'] = True

# For projecting obstacle positions onto the image we need an intrinsic matrix.
def build_projection_matrix(w, h, fov):
    focal = w / (2.0 * np.tan(fov * np.pi / 360.0))
    K = np.identity(3)
    K[0, 0] = K[1, 1] = focal
    K[0, 2] = w / 2.0
    K[1, 2] = h / 2.0
    return K

def get_image_point(loc, K, w2c):
    # Create a homogeneous point
    point = np.array([loc.x, loc.y, loc.z, 1])
    point_camera = np.dot(w2c, point)
    # Adjust coordinate order (if necessary)
    point_camera = [point_camera[1], -point_camera[2], point_camera[0]]
    point_img = np.dot(K, point_camera)
    point_img[0] /= point_img[2]
    point_img[1] /= point_img[2]
    return tuple(map(int, point_img[0:2]))

def obstacle_callback(event, data_dict, camera, K, image_w, image_h):
    # Only consider vehicles or walkers
    if not ('vehicle' in event.other_actor.type_id or 'walker' in event.other_actor.type_id):
        return

    data_dict.setdefault('obstacle', []).append({
        'transform': event.other_actor.type_id,
        'frame': event.frame
    })

    world_2_camera = np.array(camera.get_transform().get_inverse_matrix())
    obstacle_location = event.other_actor.get_location()
    image_point = get_image_point(obstacle_location, K, world_2_camera)
    if 'rgb_image' not in data_dict or data_dict['rgb_image'] is None:
        return
    if 0 < image_point[0] < image_w and 0 < image_point[1] < image_h:
        cv2.circle(data_dict['rgb_image'], tuple(image_point), 10, (0, 0, 255), 2)

In [6]:
# A simple compass drawing function
def draw_compass(img, theta):
    compass_center = (700, 100)
    compass_size = 50
    cardinal_directions = [
        ('N', [0, -1]),
        ('E', [1, 0]),
        ('S', [0, 1]),
        ('W', [-1, 0])
    ]
    for label, offset in cardinal_directions:
        cv2.putText(
            img, label,
            (int(compass_center[0] + 1.2 * compass_size * offset[0]), 
             int(compass_center[1] + 1.2 * compass_size * offset[1])),
            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 200), 2
        )
    compass_point = (
        int(compass_center[0] + compass_size * math.sin(theta)),
        int(compass_center[1] - compass_size * math.cos(theta))
    )
    cv2.line(img, compass_center, compass_point, (0, 140, 255), 3)

rgb_camera.listen(lambda image: rgb_callback(image, sensor_data))
sem_camera.listen(lambda image: sem_callback(image, sensor_data))
inst_camera.listen(lambda image: inst_callback(image, sensor_data))
depth_camera.listen(lambda image: depth_callback(image, sensor_data))
time.sleep(5)

gnss_sensor.listen(lambda data: gnss_callback(data, sensor_data))
imu_sensor.listen(lambda data: imu_callback(data, sensor_data))
collision_sensor.listen(lambda event: collision_callback(event, sensor_data))
obstacle_sensor.listen(lambda event: obstacle_callback(event, sensor_data, rgb_camera,
                                                         build_projection_matrix(image_w, image_h, 90),
                                                         image_w, image_h))

lidar_bp = bp_lib.find('sensor.lidar.ray_cast_semantic')
lidar_bp.set_attribute('range', '100.0')
lidar_bp.set_attribute('upper_fov', '15.0')
lidar_bp.set_attribute('lower_fov', '-25.0')
lidar_bp.set_attribute('channels', '64')
lidar_bp.set_attribute('rotation_frequency', '20.0')
lidar_bp.set_attribute('points_per_second', '500000')
lidar_init_trans = carla.Transform(carla.Location(z=2))
lidar = world.spawn_actor(lidar_bp, lidar_init_trans, attach_to=vehicle)
time.sleep(5)

radar_bp = bp_lib.find('sensor.other.radar')
radar_bp.set_attribute('horizontal_fov', '30.0')
radar_bp.set_attribute('vertical_fov', '30.0')
radar_bp.set_attribute('points_per_second', '10000')
radar_init_trans = carla.Transform(carla.Location(z=2))
radar = world.spawn_actor(radar_bp, radar_init_trans, attach_to=vehicle)

lidar_pcd = o3d.geometry.PointCloud()
radar_pcd = o3d.geometry.PointCloud()

def get_random_color():
    color = np.random.rand(3)
    while np.linalg.norm(color) < 0.3:
        color = np.random.rand(3)
    return color

def lidar_callback(point_cloud, pcd):
    data = np.copy(np.frombuffer(point_cloud.raw_data, dtype=np.dtype('f4')))
    if data.shape[0] == 0:
        return
    data = np.reshape(data, (-1, 6))
    points = data[:, :3]
    colors = np.array([get_random_color() for _ in range(points.shape[0])])
    pcd.points = o3d.utility.Vector3dVector(points)
    pcd.colors = o3d.utility.Vector3dVector(colors)

def radar_callback(data, pcd):
    radar_data = np.zeros((len(data), 4))
    for i, detection in enumerate(data):
        x = detection.depth * math.cos(detection.altitude) * math.cos(detection.azimuth)
        y = detection.depth * math.cos(detection.altitude) * math.sin(detection.azimuth)
        z = detection.depth * math.sin(detection.altitude)
        radar_data[i, :] = [x, y, z, detection.velocity]
    intensity = np.abs(radar_data[:, -1])
    intensity_safe = np.clip(intensity, 1e-6, None)
    intensity_col = 1.0 - np.log(intensity_safe) / np.log(np.exp(-0.004 * 100))
    int_color = np.c_[
        np.interp(intensity_col, COOL_RANGE, COOL[:, 0]),
        np.interp(intensity_col, COOL_RANGE, COOL[:, 1]),
        np.interp(intensity_col, COOL_RANGE, COOL[:, 2])
    ]
    points = radar_data[:, :3]
    points[:, :1] = -points[:, :1]
    pcd.points = o3d.utility.Vector3dVector(points)
    pcd.colors = o3d.utility.Vector3dVector(int_color)

lidar.listen(lambda data: lidar_callback(data, lidar_pcd))
radar.listen(lambda data: radar_callback(data, radar_pcd))
cv2.namedWindow('All Cameras', cv2.WINDOW_NORMAL)
cv2.resizeWindow('All Cameras', 1280, 960)
cv2.namedWindow('Duplicate RGB', cv2.WINDOW_NORMAL)
cv2.resizeWindow('Duplicate RGB', 800, 600)

def preprocess_image(img):
    if img.shape[2] == 4:
        img = img[:, :, :3]
    return img

vis = o3d.visualization.Visualizer()
vis.create_window(window_name='Lidar View', width=960, height=540, left=400, top=270)
vis.get_render_option().background_color = [0.05, 0.05, 0.05]
vis.get_render_option().point_size = 1
vis.get_render_option().show_coordinate_frame = True

def add_open3d_axis(vis):
    axis = o3d.geometry.LineSet()
    axis.points = o3d.utility.Vector3dVector(np.array([
        [0.0, 0.0, 0.0],
        [0.1, 0.0, 0.0],
        [0.0, 0.1, 0.0],
        [0.0, 0.0, 0.1]
    ]))
    axis.lines = o3d.utility.Vector2iVector(np.array([
        [0, 1],
        [0, 2],
        [0, 3]
    ]))
    axis.colors = o3d.utility.Vector3dVector(np.array([
        [1, 0, 0],
        [0, 1, 0],
        [0, 0, 1]
    ]))
    vis.add_geometry(axis)

In [7]:
add_open3d_axis(vis)

lidar_added = False
radar_added = False

# Counter for collision display duration
collision_counter = 20

while True:
    try:
        # First, create a copy of the current raw RGB image (without overlays)
        if 'rgb_image_raw' in sensor_data:
            duplicate_rgb = sensor_data['rgb_image_raw'].copy()
        else:
            duplicate_rgb = np.zeros((image_h, image_w, 4), dtype=np.uint8)

        rgb_img = sensor_data['rgb_image'].copy()
        font = cv2.FONT_HERSHEY_PLAIN # cv2.FONT_HERSHEY_SIMPLEX
        fontScale = 0.5
        fontScale_2 = 1.
        fontColor = (255, 255, 255)
        thickness = 2

        start_y = 500  # starting y coordinate for the sensor overlays
        line_spacing = 20  # space between lines

        lat_val = sensor_data['gnss'][0] * 100000
        long_val = sensor_data['gnss'][1] * 100000

        cv2.putText(rgb_img, 
                    'Lat: ' + f"{lat_val:.2f}",
                    (10, start_y), font, fontScale_2, fontColor, thickness, cv2.LINE_AA)
        cv2.putText(rgb_img, 
                    'Long: ' + f"{long_val:.2f}",
                    (10, start_y + line_spacing), font, fontScale_2, fontColor, thickness, cv2.LINE_AA)

        accel = sensor_data['imu']['accel'] - carla.Vector3D(x=0, y=0, z=9.81)
        accel_val = accel.length() * 100

        cv2.putText(rgb_img, 
                    'Accel: ' + f"{accel_val:.2f}",
                    (10, start_y + 2 * line_spacing), font, fontScale_2, fontColor, thickness, cv2.LINE_AA)
        
        gyro_val = sensor_data['imu']['gyro'].length() * 100
        cv2.putText(rgb_img, 
                    'Gyro: ' + f"{gyro_val:.2f}",
                    (10, start_y + 3 * line_spacing), font, fontScale_2, fontColor, thickness, cv2.LINE_AA)

        
        compass_val = sensor_data['imu']['compass'] * 10
        cv2.putText(rgb_img, 
                    'Compass: ' + f"{compass_val:.2f}",
                    (10, start_y + 4 * line_spacing), font, fontScale_2, fontColor, thickness, cv2.LINE_AA)
        
        draw_compass(rgb_img, sensor_data['imu']['compass'])

        # Display collision alert (if any)
        if sensor_data['collision']:
            collision_counter -= 1
            if collision_counter <= 1:
                sensor_data['collision'] = False
            cv2.putText(rgb_img, 
                        'COLLISION',  
                        (250, 300), font, 2, (255, 255, 255), 3, cv2.LINE_AA)
        else:
            collision_counter = 20

        # Update the overlay version of the rgb_image
        sensor_data['rgb_image'] = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2BGRA)

        # Process and prepare images for display (drop alpha channel if needed)
        processed_data = {key: preprocess_image(img) for key, img in sensor_data.items() if 'image' in key and key != 'rgb_image_raw'}

        # Create a tiled display:
        # Top row: RGB (with overlays) and Semantic
        top_row = np.concatenate([processed_data['rgb_image'], processed_data['sem_image']], axis=1)
        # Bottom row: Depth and Instance
        bottom_row = np.concatenate([processed_data['depth_image'], processed_data['inst_image']], axis=1)
        tiled = np.concatenate((top_row, bottom_row), axis=0)
        cv2.imshow('All Cameras', tiled)
        # Show the plain duplicate RGB window (without overlays)
        cv2.imshow('Duplicate RGB', preprocess_image(duplicate_rgb))
        
        # Update Open3D visualizer with LiDAR and Radar point clouds
        if not lidar_added:
            vis.add_geometry(lidar_pcd)
            lidar_added = True
        if not radar_added:
            vis.add_geometry(radar_pcd)
            radar_added = True
        vis.update_geometry(lidar_pcd)
        vis.update_geometry(radar_pcd)
        vis.poll_events()
        vis.update_renderer()

        if cv2.waitKey(1) == ord('q'):
            break

        time.sleep(0.005)

    except Exception as e:
        print("Error:", e)
        break

cv2.destroyAllWindows()
vis.destroy_window()

