In [1]:
## Libraries import

import carla, time, pygame, cv2, math, random, os, traceback
# import paho.mqtt.client as mqtt
import numpy as np
import tkinter as tk

pygame 2.6.1 (SDL 2.28.4, Python 3.7.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
## Carla set up

try:
    client = carla.Client('localhost', 2000)
    client.set_timeout(10.0)
    world = client.get_world()
except Exception as e:
    print(f"Failed to connect to CARLA: {e}")
    exit(1)

if not all(os.path.exists(f) for f in ["yolov4-tiny.weights", "yolov4-tiny.cfg"]):
    print("YOLO model files not found!")
    exit(1)

world = client.get_world()
spectator = world.get_spectator()
carla_settings = world.get_settings()

capture = True
stored_image = None

In [3]:
## Utliity functions

def spawn_vehicle(world, vehicle_index=0, spawn_index=0):
    blueprint_library = world.get_blueprint_library()
    vehicle_bp = blueprint_library.filter('vehicle.*')[vehicle_index]
    spawn_point = world.get_map().get_spawn_points()[spawn_index]
    vehicle = world.spawn_actor(vehicle_bp, spawn_point)
    return vehicle

def spawn_walker(world):
    blueprint_library = world.get_blueprint_library()
    
    # Get a random pedestrian blueprint
    walker_bp = random.choice(blueprint_library.filter('walker.pedestrian.*'))
    
    # Set a random speed
    speed = random.uniform(0.8, 2.0)
    walker_bp.set_attribute('speed', str(speed))
    
    # Get a random spawn point
    spawn_points = world.get_map().get_spawn_points()
    spawn_point = random.choice(spawn_points) if spawn_points else carla.Transform()
    
    # Spawn the pedestrian
    walker = world.try_spawn_actor(walker_bp, spawn_point)
    if not walker:
        return None, None  # Return None if spawning failed
    
    # Spawn the AI controller
    controller_bp = blueprint_library.find('controller.ai.walker')
    controller = world.try_spawn_actor(controller_bp, carla.Transform(), walker)
    
    if controller:
        # Start the pedestrian's movement
        controller.start()
        destination = world.get_random_location_from_navigation()
        if destination:
            controller.go_to_location(destination)
        controller.set_max_speed(speed)  # Set the max speed
    
    return walker, controller

def callback(image):
    image.convert(carla.ColorConverter.CityScapesPalette)
    time.sleep(3)
    image.save_to_disk('output/%.6d.png' % image.frame)

def render(image, display):
    if image is not None:
        array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
        array = np.reshape(array, (image.height, image.width, 4))
        array = array[:,:,:3]
        array = array[:,:,::-1]
        raw_image = cv2.cvtColor(array, cv2.COLOR_BGR2RGB)
        return raw_image
    
def image_processing(image, target_size):
    ih, iw = target_size
    h, w, _ = image.shape

    scale = min(iw / w, ih / h)
    nw, nh = int(scale * w), int(scale * h)

    image_resized = cv2.resize(image, (nw, nh), interpolation = cv2.INTER_LINEAR)

    image_padded = np.full((ih, iw, 3), 128.0)
    dw, dh = (iw - nw) // 2, (ih - nh) // 2
    image_padded[dh:nh+dh, dw:nw+dw, :] = image_resized

    image_padded = image_padded / 255.0
    return image_padded.astype(np.float32)

def move_spectator_to(transform, spectator, distance=5.0, x=0, y=0, z=4, yaw=0, pitch=-30, roll=0):
    back_location = transform.location - transform.get_forward_vector() * distance

    back_location.x += x
    back_location.y += y
    back_location.z += z
    transform.rotation.yaw += yaw
    transform.rotation.pitch = pitch
    transform.rotation.roll = roll

    spectator_transform = carla.Transform(back_location, transform.rotation)

    spectator.set_transform(spectator_transform)

In [4]:
## warning system (using MQTT)

def brake(vehicle):
    """Apply emergency braking to the vehicle."""
    control = carla.VehicleControl()
    control.throttle = 0.0
    control.brake = 1.0
    vehicle.apply_control(control)

# mqtt_client = mqtt.Client()
# mqtt_client.connect("localhost", 1883, 60)

def send_warning(vehicle):
    # mqtt_client.publish("warning", "Warning! Collision imminent!")
    brake(vehicle)

In [None]:
## Main utility functions

VIEW_WIDTH = tk.Tk().winfo_screenwidth()//3
VIEW_HEIGHT = tk.Tk().winfo_screenheight()//3
VIEW_FOV = 90

BB_COLOR = (248, 64, 24)

def camera_blueprint():
    """
    Returns camera blueprint.
    """
    camera_bp = world.get_blueprint_library().find('sensor.camera.rgb')
    camera_bp.set_attribute('image_size_x', str(VIEW_WIDTH))
    camera_bp.set_attribute('image_size_y', str(VIEW_HEIGHT))
    camera_bp.set_attribute('fov', str(VIEW_FOV))

    return camera_bp

def setup_car():
    """
    Spawns actor-vehicle to be controled.
    """
    car_bp = world.get_blueprint_library().filter('vehicle.*')[0]
    location = random.choice(world.get_map().get_spawn_points())
    car = world.spawn_actor(car_bp, location)
    return car

def callback(image):
    """
    Callback function to be called when new image is received.
    """
    image.convert(carla.ColorConverter.CityScapesPalette)
    time.sleep(3)
    image.save_to_disk('output/%.6d.png' % image.frame)

def set_image(img):
    global stored_image
    global capture
    if capture:
        stored_image = img
        capture = False

stored_rgb_image = None
stored_depth_image = None

def set_rgb_image(image):
    """Stores the latest RGB image."""
    global stored_rgb_image
    stored_rgb_image = image

def set_depth_image(image):
    """Stores the latest Depth image."""
    global stored_depth_image
    stored_depth_image = image


def setup_camera(car):
    """
    Spawns both an RGB and a Depth camera on the vehicle.
    """
    camera_transform = carla.Transform(carla.Location(x=1.6, z=1.7), carla.Rotation(pitch=0))

    # Get camera blueprints
    blueprint_library = world.get_blueprint_library()
    
    # RGB Camera
    rgb_bp = blueprint_library.find('sensor.camera.rgb')
    rgb_bp.set_attribute('image_size_x', str(VIEW_WIDTH))
    rgb_bp.set_attribute('image_size_y', str(VIEW_HEIGHT))
    rgb_camera = world.spawn_actor(rgb_bp, camera_transform, attach_to=car)
    rgb_camera.listen(lambda image: set_rgb_image(image))

    # Depth Camera
    depth_bp = blueprint_library.find('sensor.camera.depth')
    depth_bp.set_attribute('image_size_x', str(VIEW_WIDTH))
    depth_bp.set_attribute('image_size_y', str(VIEW_HEIGHT))
    depth_camera = world.spawn_actor(depth_bp, camera_transform, attach_to=car)
    depth_camera.listen(lambda image: set_depth_image(image))

    # Camera calibration for image processing
    calibration = np.identity(3)
    calibration[0, 2] = VIEW_WIDTH / 2.0
    calibration[1, 2] = VIEW_HEIGHT / 2.0
    calibration[0, 0] = calibration[1, 1] = VIEW_WIDTH / (2.0 * np.tan(VIEW_FOV * np.pi / 360.0))
    rgb_camera.calibration = calibration
    depth_camera.calibration = calibration

    return rgb_camera, depth_camera

def get_depth_at_pixel(x, y):
    """
    Retrieves the depth value (in meters) for a given pixel (x, y).
    Assumes the stored_depth_image is in BGRA format.
    """
    if stored_depth_image is None:
        return None

    if x < 0 or x >= stored_depth_image.width or y < 0 or y >= stored_depth_image.height:
        raise ValueError("Pixel coordinates are out of bounds.")

    depth_array = np.frombuffer(stored_depth_image.raw_data, dtype=np.uint8)
    depth_array = np.reshape(depth_array, (stored_depth_image.height, stored_depth_image.width, 4))

    blue  = depth_array[y, x, 0]
    green = depth_array[y, x, 1]
    red   = depth_array[y, x, 2]

    normalized_depth = (red + green * 256 + blue * 256**2) / (256**3 - 1)

    depth_in_meters = normalized_depth * 1000.0

    return depth_in_meters

def control_car(car):
    """
    Applies control to main car based on pygame pressed keys.
    Will return True If ESCAPE is hit, otherwise False to end main loop.
    """
    # move_spectator_to(car.get_transform(), spectator)
    control = car.get_control()

    control.brake = 0.0
    control.steer = 0.0

    for event in pygame.event.get():
        if event.type == pygame.KEYDOWN:
            key = event.key
            # print(key)
            if key == pygame.K_ESCAPE:
                return True # TODO:change
            if key == pygame.K_w:
                control.reverse = False
                control.throttle = 0.6
            if key == pygame.K_s:
                control.reverse = True
                control.throttle = 0.6
            if key == pygame.K_a:
                control.throttle = 0.4
                control.steer = -0.45
            if key == pygame.K_d:
                control.throttle = 0.4
                control.steer = 0.45

        car.apply_control(control)

def pedestrian_safety_monitoring(vehicle, results, reaction_time=1.5, deceleration=7.5):
    """
    Monitors pedestrian safety using vehicle speed, braking distance, and depth camera data.
    
    - Stops the vehicle if a pedestrian is too close.
    - Uses depth camera to get accurate pedestrian distances.
    """
    # Get vehicle speed and compute minimum safe stopping distance
    velocity = vehicle.get_velocity()
    vehicle_speed = math.sqrt((velocity.x**2 + velocity.y**2 + velocity.z**2))

    reaction_distance = vehicle_speed * reaction_time
    braking_distance = (vehicle_speed ** 2) / (2 * deceleration)
    # min_safe_distance = (reaction_distance + braking_distance) * 10
    min_safe_distance = 4 # m
    safe_distance_threshold = 1.5 # m

    object_speed = 0  # Assume pedestrian speed is negligible
    relative_speed = max(vehicle_speed - object_speed, 0.01)  # Avoid division by zero

    for confidence, bbox, centroid in results:
        depth = get_depth_at_pixel(int(centroid[0]), int(centroid[1]))
        if depth is None or depth > 100:
            continue

        distance = depth

        # print(f"Distance {distance:.2f}m, Minimum Safe Distance: {min_safe_distance:.2f}m")
        control = vehicle.get_control()
        if distance < min_safe_distance and vehicle_speed > 0.0:
            control.throttle = 0.0
            control.brake = 0.5
            vehicle.apply_control(control)
            print("Emergency braking")
            time.sleep(2)
        else:
            control_car(vehicle)

def pedestrian_detection(image, model, layer_name, vehicle):
    """
    Detects pedestrians in the image using YOLOv3 model.
    """
    NMS_THRESHOLD = 0.3
    MIN_CONFIDENCE = 0.2
    (H, W) = image.shape[:2]
    results = []
    personidz = 0
    # constructu blob and this will retirn the bounding boxes and confidence values
    blob = cv2.dnn.blobFromImage(image, 1 / 255.0, (416, 416),
                                swapRB=True, crop=False)
    model.setInput(blob)
    layerOutputs = model.forward(layer_name)

    boxes = []
    centroids = []
    confidences = []

    # LayerOutputs is a list of lists containing outputs. Each list in layer output contains details about single prediction like its bounding box confidence
    for output in layerOutputs:
        for detection in output:

            scores = detection[5:]
            classID = np.argmax(scores)
            confidence = scores[classID]

            if classID == personidz and confidence > MIN_CONFIDENCE:
                box = detection[0:4] * np.array([W, H, W, H])
                (centerX, centerY, width, height) = box.astype("int")

                x = int(centerX - (width / 2))
                y = int(centerY - (height / 2))

                boxes.append([x, y, int(width), int(height)])
                centroids.append((centerX, centerY))
                confidences.append(float(confidence))
    # apply non-maxima suppression to suppress weak, overlapping
    # bounding boxes
    idzs = cv2.dnn.NMSBoxes(boxes, confidences, MIN_CONFIDENCE, NMS_THRESHOLD)
    # ensure at least one detection exists
    if len(idzs) > 0:
        # loop over the indexes we are keeping
        for i in idzs.flatten():
            # extract the bounding box coordinates
            (x, y) = (boxes[i][0], boxes[i][1])
            (w, h) = (boxes[i][2], boxes[i][3])
            # update our results list to consist of the person
            # prediction probability, bounding box coordinates,
            # and the centroid
            res = (confidences[i], (x, y, x + w, y + h), centroids[i])
            results.append(res)
    # return the list of results
    pedestrian_safety_monitoring(vehicle, results)
    return results

In [8]:
def main():
    try:
        # Spawn vehicle
        vehicle = setup_car()
        time.sleep(2)

        # Spawn camera
        rgb_camera, depth_camera = setup_camera(vehicle)
    
        # Spawn test pedestrians (still)
        walkers = []
        controllers = []
        for _ in range(50):
            walker, controller = spawn_walker(world)
            if walker and controller:
                walkers.append(walker)
                controllers.append(controller)

        pygame.init()
        pygame.display.set_mode((200,200))

        display = pygame.display.set_mode((VIEW_WIDTH, VIEW_HEIGHT), pygame.HWSURFACE | pygame.DOUBLEBUF)

        input_size = 416
        
        # labels_path = "coco.names"
        # LABELS = open(labels_path).read().strip().split("\n")

        weights_path = "yolov4-tiny.weights"
        config_path = "yolov4-tiny.cfg"
        model = cv2.dnn.readNetFromDarknet(config_path, weights_path)

        layer_name = model.getLayerNames()
        layer_name = [layer_name[i - 1] for i in model.getUnconnectedOutLayers()]
        writer = None

        while True:
            world.tick()
            move_spectator_to(vehicle.get_transform(), spectator)
            # Wait for both RGB and Depth images to be available
            while stored_rgb_image is None or stored_depth_image is None:
                world.tick()

            # Convert images for YOLO processing
            raw_image = render(stored_rgb_image, display)
            raw_image = cv2.cvtColor(raw_image, cv2.COLOR_BGR2RGB)
            # frame_size = raw_image.shape[:2]
            image_data = image_processing(np.copy(raw_image), [input_size, input_size])
            image_data = image_data[np.newaxis, ...]

            results = pedestrian_detection(raw_image, model, layer_name, vehicle)

            for res in results:
                cv2.rectangle(raw_image, (res[1][0], res[1][1]), (res[1][2], res[1][3]), (0,255,0), 2)
            cv2.imshow("Detection", raw_image)

            pygame.display.flip()
            control_car(vehicle)
            # time.sleep(0.1)
    except KeyboardInterrupt:
        print('\nSimulation interrupted by user')

    finally:
        print('Cleaning up...')
        pygame.quit()
        cv2.destroyAllWindows()
        actors = world.get_actors()
        # Cleanup
        if 'rgb_camera' in locals():
            rgb_camera.destroy()
        if 'depth_camera' in locals():
            depth_camera.destroy()
        actors = world.get_actors()
        for actor in actors:
            if isinstance(actor, (carla.Vehicle, carla.Walker)):
                actor.destroy()
        print('Done.')

if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        traceback.print_exc() 
        print(f'Exception occurred: {e}')

Emergency braking
Emergency braking

Simulation interrupted by user
Cleaning up...
Done.


In [10]:
actors = world.get_actors()
for actor in actors:
    if isinstance(actor, carla.Vehicle) or isinstance(actor, carla.Walker):
        actor.destroy()