# Blind Spot Detection (BSD) with Active Intervention

*Project for the course Smart Vehicular Systems (Academic Year 2024/2025).*

**Name:** Pablo Sebastian

**Surname:** Vargas Grateron

**Email:** pablo.vargasgrateron@studio.unibo.it

This project implements a blind spot detection system using [CARLA simulator](https://carla.org/) (Version 0.9.15) and MQTT for communication. The system detects vehicles in the blind spot of a simulated car and can trigger an active intervention if necessary.

> Before running the code, ensure that the CARLA server is running and the MQTT broker is accessible.

## Libraries and setup

To execute the code, you need to install the libraries listed in the `requirements.txt` file. You can install them using:
```bash
pip install -r requirements.txt
```

For the local imports, ensure that the `manual_control.py` and `manual_control_steeringwheel.py` files are in the same directory as this notebook or adjust the import path accordingly.

> The local imports can be found in the CARLA repository under the `PythonAPI/examples` directory.

In [None]:
import carla, time, pygame, math, random, cv2, os
import numpy as np
import paho.mqtt.client as mqtt
from ultralytics import YOLO

# Local imports
import manual_control
import manual_control_steeringwheel

Set the following boolean to `False` if you want to use the keyboard control instead of the steering wheel, or set it to `True` if you want to use the steering wheel control.
Set the `CAR_MODEL` to the desired vehicle model you want to use in the simulation. You can find a list of available vehicle models in [this list](https://github.com/carla-simulator/carla/blob/ue5-dev/Docs/catalogue_vehicles.md#audi-tt).

> Remember to set the `wheel_config.ini` file in the same directory as this notebook to configure the steering wheel settings.

In [None]:
IS_STEERING_WHEEL = False
CAR_MODEL = "vehicle.audi_tt"

This cell configures the CARLA server connection.

In [None]:
CARLA_HOST = 'localhost'
CARLA_PORT = 2000
CARLA_TIMEOUT = 10.0

carla_client = carla.Client(CARLA_HOST, CARLA_PORT)
carla_client.set_timeout(CARLA_TIMEOUT)

world = carla_client.get_world()
spectator = world.get_spectator()

This cell configures the MQTT client. We will use the public broker `test.mosquitto.org` for testing purposes. In a production environment, you should use a secure and private MQTT broker.

In [None]:
MQTT_BROKER = "test.mosquitto.org"
MQTT_PORT = 1883
MQTT_TOPIC = "svs_blind_spot_detection"

mqtt_client = mqtt.Client()
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)

Further setup for the MQTT client, including connection and message handling.

In [None]:
def mqtt_on_connect(client, userdata, flags, rc):
    """
    Callback function triggered when MQTT client connects to broker
    - Automatically subscribes to the BSD topic once connected
    - rc codes: 0=success, 1=wrong protocol, 2=invalid client ID, etc.
    """
    print(f"[MQTT] Connected (code {rc})")
    client.subscribe(MQTT_TOPIC)

def mqtt_on_message(client, userdata, message):
    """
    Callback function triggered when a message is received on subscribed topics
    - Decodes incoming messages and prints them
    """
    decoded_message = message.payload.decode()
    print(f"[MQTT] Message on {message.topic} -> {decoded_message}")

def mqtt_send_message(topic, message):
    """
    Send a message to specified MQTT topic
    - Used to publish BSD alerts, status updates, or intervention commands
    - Message format: string (will be encoded to bytes automatically)
    """
    mqtt_client.publish(topic, message)
    print(f"[MQTT] Sent message: {message} to topic: {topic}")

mqtt_client.on_connect = mqtt_on_connect
mqtt_client.on_message = mqtt_on_message    

mqtt_client.loop_start()

## Functions

### CARLA basic functions (From the laboratory)

In [None]:
def move_spectator_to(transform, distance=5.0, x=0, y=0, z=4, yaw=0, pitch=-30, roll=0):
    back_location = transform.location - transform.get_forward_vector() * distance
    
    back_location.x += x
    back_location.y += y
    back_location.z += z
    transform.rotation.yaw += yaw
    transform.rotation.pitch = pitch
    transform.rotation.roll = roll
    
    spectator_transform = carla.Transform(back_location, transform.rotation)
    
    spectator.set_transform(spectator_transform)

def spawn_vehicle(vehicle_index=0, spawn_index=0, pattern='vehicle.*'):
    blueprint_library = world.get_blueprint_library()
    vehicle_bp = blueprint_library.filter(pattern)[vehicle_index]
    spawn_point = world.get_map().get_spawn_points()[spawn_index]
    vehicle = world.spawn_actor(vehicle_bp, spawn_point)
    return vehicle

def draw_on_screen(world, transform, content='O', color=carla.Color(0, 255, 0), life_time=20):
    world.debug.draw_string(transform.location, content, color=color, life_time=life_time)

### Computer vision functions

We will implement functions to capture images from the car's cameras and detect vehicles using [YOLOv5](https://docs.ultralytics.com/models/yolov5/) model. We will use the nano version for real-time processing.

> Make sure to download the YOLOv5 nano model and place it in the `cv_model` directory (Or change the path in the code below).

In [None]:
MODEL_DIR = os.path.join(os.getcwd(), "cv_model")
MODEL_NAME = "yolov5nu.pt"

cv_model_path = os.path.join(MODEL_DIR, MODEL_NAME)
cv_model = YOLO(cv_model_path)

def detect_vehicles(image, model=cv_model, conf=0.25):
    """
    Detect vehicles (car, truck, bus, motorcycle) in an image using YOLOv5.

    Args:
        image (numpy.ndarray): Input image in RGB format.
        model: YOLOv5 model.
        conf (float): Confidence threshold (default: 0.25).

    Returns:
        list of dict: Detected vehicles with bounding box and class info.
    """
    results = model(image)
    
    # Results in tensor [x1, y1, x2, y2, conf, class]
    predictions = results.xyxy[0].cpu().numpy()
    detections = []

    for pred in predictions:
        x1, y1, x2, y2, confidence, class_id = pred
        if confidence >= conf:
            label = model.names[int(class_id)]
            if label in ['car', 'truck', 'bus', 'motorcycle']:
                detections.append({
                    'xmin': int(x1),
                    'ymin': int(y1),
                    'xmax': int(x2),
                    'ymax': int(y2),
                    'confidence': float(confidence),
                    'class_id': int(class_id),
                    'label': label
                })

    return detections

def carla_image_to_array(image):
    """
    Convert CARLA raw image data to NumPy RGB array for YOLO processing
    
    Args:
        image: CARLA camera sensor image object with raw_data attribute
    
    Returns:
        numpy.ndarray: RGB image array (height, width, 3) suitable for YOLO
    """
    array = np.frombuffer(image.raw_data, dtype=np.uint8)
    array = array.reshape((image.height, image.width, 4))
    array = array[:, :, :3]
    array = array[:, :, [2, 1, 0]] # Convert BGR to RGB
    return array

def process_camera_image(image):
    """
    Complete pipeline: Convert CARLA image and detect vehicles for BSD system
    
    Args:
        image: CARLA camera sensor image object
    
    Returns:
        tuple: (rgb_array, detections)
            - rgb_array: Converted RGB image as NumPy array
            - detections: List of vehicle detections from YOLO
    """
    rgb_array = carla_image_to_array(image)
    detections = detect_vehicles(rgb_array)
    
    if detections:
        print(f"Detected {len(detections)} vehicles:")
        for det in detections:
            print(f"  - {det['label']}: {det['confidence']:.2f} confidence")
    
    return rgb_array, detections

### Sensors functions

The next cells implement the functions to spawn:

- The **camera sensors** to capture image to further process with YOLOv5 and detect vehicles.
- The **radar sensor** to detect vehicles in the blind spot.

In [None]:
def spawn_camera(attach_to=None, transform=carla.Transform(carla.Location(x=0.0, y=0.0, z=1.0), carla.Rotation(pitch=0)), width=800, height=600):
    """
    Spawn an RGB camera sensor in the CARLA world for vehicle detection
    
    Args:
        attach_to: Vehicle or object to attach camera to (None = spawn freely)
        transform: Camera position and orientation relative to attachment point
        width: Image width in pixels (default: 800)
        height: Image height in pixels (default: 600)
    
    Returns:
        carla.Sensor: RGB camera sensor object for capturing images
    """
    camera_bp = world.get_blueprint_library().find('sensor.camera.rgb')
    camera_bp.set_attribute('image_size_x', str(width))
    camera_bp.set_attribute('image_size_y', str(height))
    camera = world.spawn_actor(camera_bp, transform, attach_to=attach_to)
    return camera

def spawn_radar(attach_to=None, transform=carla.Transform(carla.Location(x=0.0, y=0.0, z=1.0), carla.Rotation(pitch=0)), range=100, horizontal_fov=30, vertical_fov=30):
    """
    Spawn a radar sensor in the CARLA world for object detection
    
    Args:
        attach_to: Vehicle or object to attach radar to (None = spawn freely)
        transform: Radar position and orientation relative to attachment point
        range: Detection range in meters (default: 100m)
        horizontal_fov: Horizontal field of view in degrees (default: 30°)
        vertical_fov: Vertical field of view in degrees (default: 30°)
    
    Returns:
        carla.Sensor: Radar sensor object for detecting nearby objects
    """
    radar_bp = world.get_blueprint_library().find('sensor.other.radar')
    radar_bp.set_attribute('horizontal_fov', str(horizontal_fov))
    radar_bp.set_attribute('vertical_fov', str(vertical_fov))
    radar_bp.set_attribute('range', str(range))
    radar = world.spawn_actor(radar_bp, transform, attach_to=attach_to)
    return radar

## Main loop code

### Car setup

The following cell contains the setup for the car and its sensors. It includes the car's position, orientation, and the sensors to be used for blind spot detection.

### Game loop

The following cell contains the game loop and the main function to run the CARLA simulation. It initializes the world, spawns the car, and starts the sensors.

In [None]:
def gameLoop(args, client=None, sim_world=None):
    pygame.init()
    pygame.font.init()
    world = None
    original_settings = None

    try:
        if client is None:
            client = carla.Client(args.host, args.port)
            client.set_timeout(10.0)
        else:
            print("[GAMELOOP] -> Using existing CARLA client instance")

        if sim_world is None:
            sim_world = client.get_world()
        else:
            print("[GAMELOOP] -> Using existing CARLA world instance")

        if args.sync:
            original_settings = sim_world.get_settings()
            settings = sim_world.get_settings()
            if not settings.synchronous_mode:
                settings.synchronous_mode = True
                settings.fixed_delta_seconds = 0.05
            sim_world.apply_settings(settings)

            traffic_manager = client.get_trafficmanager()
            traffic_manager.set_synchronous_mode(True)

        if args.autopilot and not sim_world.get_settings().synchronous_mode:
            print("[GAMELOOP] -> WARNING: You are currently in asynchronous mode and could "
                  "experience some issues with the traffic simulation")

        display = pygame.display.set_mode(
            (args.width, args.height),
            pygame.HWSURFACE | pygame.DOUBLEBUF)
        display.fill((0,0,0))
        pygame.display.flip()

        hud = manual_control.HUD(args.width, args.height)
        world = manual_control.World(sim_world, hud, args)
        controller = None

        # Create controller based on user input
        if args.steering_wheel:
            controller = manual_control_steeringwheel.DualControl(world, args.autopilot)
        else:
            controller = manual_control.KeyboardControl(world, args.autopilot)

        if args.sync:
            sim_world.tick()
        else:
            sim_world.wait_for_tick()

        ## ========================================
        ## ======== GAME LOGIC STARTS HERE ========
        ## ========================================

        clock = pygame.time.Clock()
        while True:
            if args.sync:
                sim_world.tick()
            clock.tick_busy_loop(args.fps)
            if controller.parse_events(client, world, clock, args.sync):
                return
            world.tick(clock)
            world.render(display)
            pygame.display.flip()
 
    finally:

        if original_settings:
            sim_world.apply_settings(original_settings)

        if (world and world.recording_enabled):
            client.stop_recorder()

        if world is not None:
            world.destroy()

        pygame.quit()


This cell contains the configuration for the game loop, including argument parsing and logging setup.

> To configure the game loop, you can modify the `gameSetup` function to include additional parameters or change the default values.

In [None]:
def gameSetup(client=None, sim_world=None):
    argparser = manual_control.argparse.ArgumentParser(
        description='CARLA Manual Control Client')
    argparser.add_argument(
        '-v', '--verbose',
        action='store_true',
        dest='debug',
        help='print debug information')
    argparser.add_argument(
        '--host',
        metavar='H',
        default='127.0.0.1',
        help='IP of the host server (default: 127.0.0.1)')
    argparser.add_argument(
        '-p', '--port',
        metavar='P',
        default=2000,
        type=int,
        help='TCP port to listen to (default: 2000)')
    argparser.add_argument(
        '-a', '--autopilot',
        action='store_true',
        help='enable autopilot')
    argparser.add_argument(
        '--res',
        metavar='WIDTHxHEIGHT',
        default='1280x720',
        help='window resolution (default: 1280x720)')
    argparser.add_argument(
        '--filter',
        metavar='PATTERN',
        default='vehicle.*',
        help='actor filter (default: "vehicle.*")')
    argparser.add_argument(
        '--generation',
        metavar='G',
        default='2',
        help='restrict to certain actor generation (values: "1","2","All" - default: "2")')
    argparser.add_argument(
        '--rolename',
        metavar='NAME',
        default='hero',
        help='actor role name (default: "hero")')
    argparser.add_argument(
        '--gamma',
        default=2.2,
        type=float,
        help='Gamma correction of the camera (default: 2.2)')
    argparser.add_argument(
        '--sync',
        action='store_true',
        help='Activate synchronous mode execution')
    args = argparser.parse_args()

    args.width, args.height = [int(x) for x in args.res.split('x')]

    args.fps = 60

    args.steering_wheel = IS_STEERING_WHEEL

    args.filter = CAR_MODEL

    log_level = manual_control.logging.DEBUG if args.debug else manual_control.logging.INFO
    manual_control.logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level)

    manual_control.logging.info('[LOG] -> Listening to server %s:%s', args.host, args.port)

    print(__doc__)

    try:
        gameLoop(args, client=client, sim_world=sim_world)

    except KeyboardInterrupt:
        print('\n[GAMELOOP] -> Cancelled by user. Bye!')

### Execution

In [None]:
gameSetup(client=carla_client, sim_world=world)