# Adaptive Cruise Control with Depth Camera in CARLA

This notebook implements an ACC system that only uses synchronous mode during the simulation phase, not during vehicle spawning.

## 1. Import Dependencies

In [1]:
import glob
import os
import sys
import time
import numpy as np
import cv2
import math
import random
import queue

try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass

import carla

# Define synchronous mode manager class - will only be used during PID simulation
class CarlaSyncMode(object):
    """
    Context manager to synchronize output from different sensors. Synchronous
    mode is enabled as long as we are inside this context
    """
    def __init__(self, world, *sensors, **kwargs):
        self.world = world
        self.sensors = sensors
        self.frame = None
        self.delta_seconds = 1.0 / kwargs.get('fps', 20)
        self._queues = []
        self._settings = None

    def __enter__(self):
        self._settings = self.world.get_settings()
        self.frame = self.world.apply_settings(carla.WorldSettings(
            no_rendering_mode=False,
            synchronous_mode=True,
            fixed_delta_seconds=self.delta_seconds))

        def make_queue(register_event):
            q = queue.Queue()
            register_event(q.put)
            self._queues.append(q)

        make_queue(self.world.on_tick)
        for sensor in self.sensors:
            make_queue(sensor.listen)
        return self

    def tick(self, timeout):
        self.frame = self.world.tick()
        data = [self._retrieve_data(q, timeout) for q in self._queues]
        assert all(x.frame == self.frame for x in data)
        return data

    def __exit__(self, *args, **kwargs):
        self.world.apply_settings(self._settings)

    def _retrieve_data(self, sensor_queue, timeout):
        while True:
            data = sensor_queue.get(timeout=timeout)
            if data.frame == self.frame:
                return data

print("Dependencies imported successfully!")

Dependencies imported successfully!


## 2. Connect to CARLA

In [2]:
# Initialize lists to track actors and sensors
actor_list = []
sensor_list = []

# Connect to CARLA server (using explicit IPv4 address)
client = carla.Client('127.0.0.1', 2000)
client.set_timeout(20.0)  # Increase timeout for better stability

# Check client and server versions
client_version = client.get_client_version()
server_version = client.get_server_version()
print(f"Connected to CARLA: Client version {client_version}, Server version {server_version}")

# Get world
world = client.get_world()

# Get blueprint library
blueprints = world.get_blueprint_library()

# Store original settings to restore later
original_settings = world.get_settings()

print("Connected to CARLA successfully!")

Connected to CARLA: Client version 0.9.15-228-geeb507e58, Server version 0.9.15-228-geeb507e58
Connected to CARLA successfully!


## 3. Set Up Specific Spawn Points

In [3]:
# Get spawn points
spawn_points = world.get_map().get_spawn_points()
if not spawn_points:
    print("No spawn points available.")
else:
    # Set up specific spawn points
    ego_spawn_point = spawn_points[20]
    lead_spawn_point = carla.Transform(
        ego_spawn_point.location - carla.Location(x=-15),
        ego_spawn_point.rotation
    )
    print(f"Spawn points set up successfully!")
    print(f"Ego vehicle spawn location: {ego_spawn_point.location}")
    print(f"Lead vehicle spawn location: {lead_spawn_point.location}")

Spawn points set up successfully!
Ego vehicle spawn location: Location(x=48.546078, y=140.975540, z=0.600001)
Lead vehicle spawn location: Location(x=63.546078, y=140.975540, z=0.600001)


## 4. Transfer to That Location (Set Spectator View)

In [4]:
# Move spectator to view the spawn points
spectator = world.get_spectator()
spectator_transform = carla.Transform(
    ego_spawn_point.location + carla.Location(z=50, x=-15), 
    carla.Rotation(pitch=-90)
)
spectator.set_transform(spectator_transform)
print("Spectator view set successfully!")

Spectator view set successfully!


## 5. Spawn Vehicles

In [5]:
# Important: Use asynchronous mode for vehicle spawning (synchronous mode crashes)
settings = world.get_settings()
settings.synchronous_mode = False  # Make sure we're in asynchronous mode
world.apply_settings(settings)

# Create ego vehicle
ego_blueprint = blueprints.filter('vehicle.dodge.charger_2020')[0]
ego_vehicle = world.spawn_actor(ego_blueprint, ego_spawn_point)
ego_vehicle.set_autopilot(False)
actor_list.append(ego_vehicle)
print(f"Ego vehicle spawned: {ego_vehicle.type_id}")

# Create lead vehicle
lead_blueprint = blueprints.filter('vehicle.jeep.wrangler_rubicon')[0]
lead_vehicle = world.spawn_actor(lead_blueprint, lead_spawn_point)
actor_list.append(lead_vehicle)
print(f"Lead vehicle spawned: {lead_vehicle.type_id}")

# Wait a moment for vehicle spawning to complete
time.sleep(1.0)

print("Vehicles spawned successfully!")

Ego vehicle spawned: vehicle.dodge.charger_2020
Lead vehicle spawned: vehicle.jeep.wrangler_rubicon
Vehicles spawned successfully!


In [None]:
ego_vehicle.destroy()
lead_vehicle.destroy()

## 6. Setup Camera and Distance Measurement Logic

In [6]:
def process_depth_image(depth_image, prev_lead_x=None):
    """
    Process the depth image from CARLA's depth camera to extract distance information
    with improved lead vehicle tracking during turns
    
    Args:
        depth_image: Image from CARLA depth camera
        prev_lead_x: Previous x-position of lead vehicle in image (for tracking)
        
    Returns:
        processed_image, distance_to_object, lead_vehicle_x_position
    """
    # Convert depth image to numpy array
    depth_array = np.frombuffer(depth_image.raw_data, dtype=np.dtype("uint8"))
    depth_array = np.reshape(depth_array, (depth_image.height, depth_image.width, 4))
    
    # Extract RGB channels (first 3 channels)
    depth_array = depth_array[:, :, :3]
    
    # According to CARLA documentation:
    # normalized = (R + G * 256 + B * 256 * 256) / (256 * 256 * 256 - 1)
    # in_meters = 1000 * normalized
    
    # Extract R, G, B channels
    r = depth_array[:, :, 2].astype(np.float32)  # Red channel
    g = depth_array[:, :, 1].astype(np.float32)  # Green channel
    b = depth_array[:, :, 0].astype(np.float32)  # Blue channel
    
    # Calculate normalized depth
    normalized = (r + g * 256 + b * 256 * 256) / (256 * 256 * 256 - 1)
    
    # Convert to distance in meters
    depth_meters = 1000 * normalized
    
    # Create grayscale visualization (closer=black, farther=white)
    depth_visualization = (normalized * 255).astype(np.uint8)
    
    # Convert to 3-channel grayscale for display compatibility
    depth_visualization = cv2.cvtColor(depth_visualization, cv2.COLOR_GRAY2BGR)
    
    # Setup basic ROI dimensions
    center_h = depth_image.height // 2
    center_w = depth_image.width // 2
    roi_h = 100  # Height of ROI
    roi_w = 200  # Width of ROI
    
    # Dynamic ROI adjustment based on previous lead vehicle position
    if prev_lead_x is not None and prev_lead_x > 1:
        x_shift = int((prev_lead_x - center_w) * 0.3)  # Smooth tracking with 0.3 factor
        roi_w_start = max(0, center_w - roi_w//2 + x_shift)
        roi_w_end = min(depth_image.width, center_w + roi_w//2 + x_shift)
    else:
        roi_w_start = center_w - roi_w//2
        roi_w_end = center_w + roi_w//2

    # Extract ROI from depth image
    roi = depth_meters[
        center_h - roi_h//2:center_h + roi_h//2,
        roi_w_start:roi_w_end
    ]
    
    # Draw ROI rectangle on visualization
    cv2.rectangle(
        depth_visualization, 
        (roi_w_start, center_h - roi_h//2), 
        (roi_w_end, center_h + roi_h//2), 
        (0, 255, 0), 
        2
    )
    
    # Filter unrealistic distances (0 and > 100m)
    roi_filtered = roi.copy()
    roi_filtered[roi_filtered < 0.1] = 100  # Set very close/invalid pixels to far distance
    roi_filtered[roi_filtered > 100] = 100  # Cap maximum distance
    
    # Find minimum distance to object within filtered ROI
    min_distance = np.min(roi_filtered)
    if min_distance >= 100:
        min_distance = 100  # No object detected within reasonable range
    
    # Find position of closest object for tracking in next frame
    lead_x = None
    if roi_filtered.size > 0:
        min_idx = np.unravel_index(np.argmin(roi_filtered), roi_filtered.shape)
        lead_x = roi_w_start + min_idx[1]  # Horizontal position of lead vehicle
        
        # Draw a marker at the closest point
        marker_y = center_h - roi_h//2 + min_idx[0]
        cv2.circle(depth_visualization, (lead_x, marker_y), 5, (0, 0, 255), -1)
    
    return depth_visualization, min_distance, lead_x


# Add RGB camera to ego vehicle
camera_bp = blueprints.find('sensor.camera.rgb')
camera_bp.set_attribute('image_size_x', '800')
camera_bp.set_attribute('image_size_y', '600')
camera_bp.set_attribute('fov', '110')
camera_transform = carla.Transform(carla.Location(x=1.5, z=2.4))
camera_rgb = world.spawn_actor(camera_bp, camera_transform, attach_to=ego_vehicle)
sensor_list.append(camera_rgb)

# Add depth camera to ego vehicle for distance measurement
depth_camera_bp = blueprints.find('sensor.camera.depth')
depth_camera_bp.set_attribute('image_size_x', '800')
depth_camera_bp.set_attribute('image_size_y', '600')
depth_camera_bp.set_attribute('fov', '110')
depth_camera = world.spawn_actor(depth_camera_bp, camera_transform, attach_to=ego_vehicle)
sensor_list.append(depth_camera)

# Variables to store camera data
image_w = int(camera_bp.get_attribute('image_size_x').as_int())
image_h = int(camera_bp.get_attribute('image_size_y').as_int())

# Wait for a moment to make sure sensors are properly initialized
time.sleep(0.5)

print("Cameras and distance measurement set up successfully!")

Cameras and distance measurement set up successfully!


## 7. Set Up PID Control for Ego Vehicle

In [7]:
# PID Controller for vehicle control
class PIDController:
    def __init__(self, Kp, Ki, Kd, dt):
        self.Kp = Kp
        self.Ki = Ki
        self.Kd = Kd
        self.dt = dt
        self.previous_error = 0
        self.integral = 0
        self.integral_max = 10.0  # Anti-windup limit
        
    def step(self, error):
        # Proportional term
        P = self.Kp * error
        
        # Integral term with anti-windup
        self.integral += error * self.dt
        self.integral = max(-self.integral_max, min(self.integral_max, self.integral))  # Clamp integral
        I = self.Ki * self.integral
        
        # Derivative term
        derivative = (error - self.previous_error) / self.dt
        D = self.Kd * derivative
        
        # Total output
        output = P + I + D
        
        # Save error for next iteration
        self.previous_error = error
        
        return output
    
    def reset(self):
        self.previous_error = 0
        self.integral = 0

# Combined PID controller for vehicle control
class VehiclePIDController:
    def __init__(self, args_lateral, args_longitudinal, max_throttle=0.75, max_brake=0.3, max_steering=0.8):
        self.max_throttle = max_throttle
        self.max_brake = max_brake
        self.max_steering = max_steering
        
        self.lon_controller = PIDController(
            args_longitudinal['Kp'],
            args_longitudinal['Ki'],
            args_longitudinal['Kd'],
            args_longitudinal['dt']
        )
        
        self.lat_controller = PIDController(
            args_lateral['Kp'],
            args_lateral['Ki'],
            args_lateral['Kd'],
            args_lateral['dt']
        )
    
    def run_step(self, target_speed, current_speed, waypoint, vehicle_transform, lead_vehicle=None):
        """
        Execute one step of control with improved lead vehicle tracking during turns
        
        lead_vehicle: Optional lead vehicle actor to follow during turns
        """
        # Longitudinal control - adjust speed
        acceleration = self.lon_controller.step(target_speed - current_speed)
        
        if acceleration >= 0:
            throttle = min(acceleration, self.max_throttle)
            brake = 0
        else:
            throttle = 0
            brake = min(abs(acceleration), self.max_brake)
        
        # Lateral control - adjust steering
        # Calculate steering based on waypoint or lead vehicle position
        v_begin = vehicle_transform.location
        v_end = v_begin + carla.Location(x=math.cos(math.radians(vehicle_transform.rotation.yaw)),
                                        y=math.sin(math.radians(vehicle_transform.rotation.yaw)))
        
        v_vec = np.array([v_end.x - v_begin.x, v_end.y - v_begin.y, 0.0])
        
        # Use lead vehicle position for navigation if available and within range
        if lead_vehicle is not None:
            lead_loc = lead_vehicle.get_location()
            # Calculate distance to lead vehicle
            dist_to_lead = math.sqrt((lead_loc.x - v_begin.x)**2 + (lead_loc.y - v_begin.y)**2)
            
            if dist_to_lead < 50.0:  # Only use lead vehicle for navigation when within 50m
                # Add prediction to lead vehicle position based on its velocity
                lead_vel = lead_vehicle.get_velocity()
                prediction_time = min(dist_to_lead * 0.1, 1.0)  # Adjust prediction based on distance
                
                predicted_x = lead_loc.x + lead_vel.x * prediction_time
                predicted_y = lead_loc.y + lead_vel.y * prediction_time
                
                # Use predicted position for steering 
                w_vec = np.array([predicted_x - v_begin.x, predicted_y - v_begin.y, 0.0])
            else:
                # Default to waypoint if lead vehicle is too far
                w_vec = np.array([waypoint.transform.location.x - v_begin.x,
                                 waypoint.transform.location.y - v_begin.y,
                                 0.0])
        else:
            # Default to waypoint if no lead vehicle is provided
            w_vec = np.array([waypoint.transform.location.x - v_begin.x,
                             waypoint.transform.location.y - v_begin.y,
                             0.0])
        
        # Normalize vectors
        v_vec_norm = np.linalg.norm(v_vec)
        w_vec_norm = np.linalg.norm(w_vec)
        
        if v_vec_norm > 0 and w_vec_norm > 0:
            v_vec = v_vec / v_vec_norm
            w_vec = w_vec / w_vec_norm
            
            # Calculate the angle
            dot = np.clip(np.dot(v_vec, w_vec), -1.0, 1.0)
            angle = math.acos(dot)
            
            # Determine sign of the angle
            cross = np.cross(v_vec, w_vec)
            if cross[2] < 0:
                angle = -angle
        else:
            angle = 0.0
        
        # Apply PID to the calculated angle
        steering = self.lat_controller.step(angle)
        steering = max(-self.max_steering, min(self.max_steering, steering))
        
        control = carla.VehicleControl()
        control.throttle = throttle
        control.brake = brake
        control.steer = steering
        
        return control
    
    def reset(self):
        self.lon_controller.reset()
        self.lat_controller.reset()

# Set up PID controller parameters with improved values for turns
args_lateral = {
    'Kp':1,      # Increased for better turn responsiveness
    'Ki': 0.01,     # Reduced to prevent overshoot
    'Kd': 0.15,     # Increased for better damping
    'dt': 0.02      # Faster update rate
}

args_longitudinal = {
    'Kp': 0.5,
    'Ki': 0.1,
    'Kd': 0.005,
    'dt': 0.05
}

vehicle_controller = VehiclePIDController(
    args_lateral,
    args_longitudinal,
    max_throttle=0.75,
    max_brake=0.3,
    max_steering=0.8
)

# Set target distance for ACC
target_distance = 10.0  # meters

print("PID controller set up for ego vehicle!")

PID controller set up for ego vehicle!


## 8. Implement Autopilot on Lead Vehicle

In [8]:
# Make sure we're still in asynchronous mode for setting up autopilot
if world.get_settings().synchronous_mode:
    settings = world.get_settings()
    settings.synchronous_mode = False
    world.apply_settings(settings)

# Set lead vehicle to autopilot mode
lead_vehicle.set_autopilot(True)

# Adjust traffic manager parameters for the lead vehicle
traffic_manager = client.get_trafficmanager()
traffic_manager.global_percentage_speed_difference(30.0)  # Reduce speed by 30%
traffic_manager.vehicle_percentage_speed_difference(lead_vehicle, 30.0)
traffic_manager.distance_to_leading_vehicle(lead_vehicle, 2.0)

# Wait for a moment to ensure autopilot is properly activated
time.sleep(0.5)

print("Autopilot activated for lead vehicle!")

Autopilot activated for lead vehicle!


## 9. Implement PID Control on Ego Vehicle and Display Results with OpenCV

Now we'll enable synchronous mode ONLY for the PID simulation part

In [9]:
# Initialize Video Writer
video_filename = "acc.avi"
frame_width = 640
frame_height = 360
fps = 30  # Frames per second

# Define VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'XVID')  # Codec for .avi format
video_writer = cv2.VideoWriter(video_filename, fourcc, fps, (frame_width, frame_height))

# Check if VideoWriter opened successfully
if not video_writer.isOpened():
    print("Error: VideoWriter failed to open.")
else:
    print(f"Recording started: Saving video to {video_filename}")

try:
    # Create a window for the display
    cv2.namedWindow('Ego Vehicle Camera with Depth', cv2.WINDOW_AUTOSIZE)

    # Initialize variables for lead vehicle tracking
    prev_lead_x = None

    # Enter synchronous mode for the simulation
    with CarlaSyncMode(world, camera_rgb, depth_camera, fps=20) as sync_mode:
        print("Entered synchronous mode for PID simulation")

        # Main control loop
        iteration_count = 0
        max_iterations = 2000  # Run for 2000 iterations max

        print("Starting adaptive cruise control. Press 'q' or 'ESC' to quit.")

        # Main loop
        while iteration_count < max_iterations:
            iteration_count += 1

            # Get synchronized data
            snapshot, rgb_data, depth_data = sync_mode.tick(timeout=2.0)

            # Process RGB image
            rgb_array = np.frombuffer(rgb_data.raw_data, dtype=np.dtype("uint8"))
            rgb_array = np.reshape(rgb_array, (image_h, image_w, 4))
            rgb_array = rgb_array[:, :, :3]  # Remove alpha channel
            rgb_image = rgb_array

            # Process depth image with dynamic ROI (pass previous lead position)
            depth_visualization, current_distance, lead_x = process_depth_image(depth_data, prev_lead_x)
            prev_lead_x = lead_x  # Update previous lead position for next iteration

            # Get current speed of ego vehicle (m/s)
            current_speed = ego_vehicle.get_velocity().length()

            # Get lead vehicle speed (m/s)
            lead_speed = lead_vehicle.get_velocity().length()

            # Calculate target speed based on distance error
            distance_error = current_distance - target_distance

            # Adaptive cruise control logic
            if distance_error > 0:  # Too far, need to catch up
                target_speed = min(lead_speed + 0.5 * distance_error, 30.0)  # Cap max speed
            else:  # Too close, need to slow down
                target_speed = max(lead_speed + 0.5 * distance_error, 0.0)  # Don't go below 0

            # Get base waypoint for ego vehicle
            ego_waypoint = world.get_map().get_waypoint(ego_vehicle.get_location())

            # Get lead vehicle location for better path planning
            lead_transform = lead_vehicle.get_transform()
            lead_location = lead_transform.location

            # Decide which waypoint to follow based on distance
            straight_distance = math.sqrt((lead_location.x - ego_vehicle.get_location().x)**2 + 
                                         (lead_location.y - ego_vehicle.get_location().y)**2)

            if straight_distance < 50.0:  # Close enough to follow lead vehicle
                predicted_location = carla.Location(
                    lead_location.x + lead_vehicle.get_velocity().x * 0.5,
                    lead_location.y + lead_vehicle.get_velocity().y * 0.5,
                    lead_location.z
                )
                target_waypoint = world.get_map().get_waypoint(predicted_location)
            else:
                target_waypoint = ego_waypoint.next(5.0)[0]  # Look 5 meters ahead

            # Get control command from PID controller
            control = vehicle_controller.run_step(
                target_speed, 
                current_speed,
                target_waypoint,
                ego_vehicle.get_transform(),
                lead_vehicle
            )

            # Apply control to ego vehicle
            ego_vehicle.apply_control(control)

            # Prepare display image
            if rgb_image is not None and depth_visualization is not None:
                display_img = cv2.resize(rgb_image, (frame_width, frame_height))

                # Resize depth visualization to a smaller size (240x180)
                depth_small = cv2.resize(depth_visualization, (240, 180))

                # Position depth visualization at bottom right corner
                h, w = depth_small.shape[:2]
                display_img[frame_height - h : frame_height, frame_width - w : frame_width] = depth_small

                # Add HUD information
                font = cv2.FONT_HERSHEY_SIMPLEX
                cv2.putText(display_img, f'Distance: {current_distance:.2f}m', (10, 30), font, 0.7, (0, 255, 0), 2)
                cv2.putText(display_img, f'Speed: {current_speed * 3.6:.2f} km/h', (10, 60), font, 0.7, (0, 255, 0), 2)
                cv2.putText(display_img, f'Target Speed: {target_speed * 3.6:.2f} km/h', (10, 90), font, 0.7, (0, 255, 0), 2)
                cv2.putText(display_img, f'Iteration: {iteration_count}/{max_iterations}', (10, 120), font, 0.7, (0, 255, 0), 2)

                # Show the image
                cv2.imshow('Ego Vehicle Camera with Depth', display_img)

                # Write frame to video file
                video_writer.write(display_img)

            # Check for key presses (quit if 'q' or ESC is pressed)
            key = cv2.waitKey(1) & 0xFF
            if key in (27, ord('q')):
                break

        print(f"ACC simulation completed after {iteration_count} iterations!")

except Exception as e:
    print(f"An error occurred: {e}")

finally:
    print("Cleaning up...")

    # Ensure VideoWriter is released
    if video_writer.isOpened():
        video_writer.release()
        print(f"Video saved successfully as {video_filename}")

    # Close any remaining OpenCV windows
    cv2.destroyAllWindows()

    # Make sure we're in asynchronous mode for cleanup
    settings = world.get_settings()
    if settings.synchronous_mode:
        settings.synchronous_mode = False
        world.apply_settings(settings)

    # Turn off autopilot for lead vehicle before destroying
    if lead_vehicle.is_alive:
        try:
            lead_vehicle.set_autopilot(False)
        except:
            pass

    # Destroy all sensors
    for sensor in sensor_list:
        if sensor.is_alive:
            sensor.destroy()

    # Destroy all actors
    for actor in actor_list:
        if actor.is_alive:
            actor.destroy()

    # Restore original settings
    world.apply_settings(original_settings)

    print("Cleanup complete!")


Recording started: Saving video to acc.avi
Entered synchronous mode for PID simulation
Starting adaptive cruise control. Press 'q' or 'ESC' to quit.
ACC simulation completed after 2000 iterations!
Cleaning up...
Video saved successfully as acc.avi
Cleanup complete!


## 10. Cleanup Resources

In [None]:
# Cleanup function
def cleanup():
    print("Cleaning up...")
    
    # Make sure we're in asynchronous mode for cleanup
    settings = world.get_settings()
    if settings.synchronous_mode:
        settings.synchronous_mode = False
        world.apply_settings(settings)
    
    # Close any remaining OpenCV windows
    cv2.destroyAllWindows()
    
    # Turn off autopilot for lead vehicle before destroying
    if lead_vehicle.is_alive:
        try:
            lead_vehicle.set_autopilot(False)
        except:
            pass
            
    # Destroy all sensors
    for sensor in sensor_list:
        if sensor.is_alive:
            sensor.destroy()
    
    # Destroy all actors
    for actor in actor_list:
        if actor.is_alive:
            actor.destroy()
    
    print("Cleanup complete!")

# Execute cleanup
cleanup()