Problem Statement: End-to-End Autonomous Driving Pipeline
Context
The advancement of autonomous vehicles promises to revolutionize transportation by enhancing safety, efficiency, and accessibility. However, developing a reliable autonomous driving system requires integrating complex subsystems for perception, planning, and control, all while ensuring real-time performance and robustness in dynamic environments.
Problem
The primary challenge is to design and implement an end-to-end pipeline for autonomous driving that enables a vehicle to navigate safely from a starting point to a destination in a simulated urban environment. The system must:

Accurately perceive the environment using sensor data (e.g., camera and LiDAR) to detect objects and obstacles.
Plan a safe and efficient path, accounting for static and dynamic obstacles while adhering to traffic rules.
Execute precise vehicle control to follow the planned path, maintaining stability and passenger comfort.
Operate in real-time with minimal latency to respond to changing conditions.

Objectives

Develop a modular pipeline that integrates sensor data acquisition, perception, path planning, and vehicle control.
Ensure the system can detect and classify objects (e.g., vehicles, pedestrians) with high accuracy using camera-based object detection.
Identify obstacles in 3D space using LiDAR data to enhance environmental understanding.
Generate a collision-free path that aligns with a global route and adapts to local obstacles.
Implement a control mechanism to smoothly execute the planned path with appropriate speed and steering.
Validate the pipeline in a simulated environment (e.g., CARLA) to demonstrate functionality and robustness.

Constraints

The pipeline must process sensor data and generate control commands within a 100ms cycle to ensure real-time performance.
The system should operate within the computational limits of a standard GPU-enabled platform (e.g., NVIDIA RTX 3080).
The solution must prioritize safety by avoiding collisions and adhering to speed limits (e.g., 30 km/h in urban settings).
The pipeline should be developed using open-source tools and libraries (e.g., Python, OpenCV, YOLOv5, Open3D) to ensure reproducibility.

Success Criteria

The vehicle successfully navigates a predefined route in a simulated urban environment without collisions.
Object detection achieves at least 90% precision and recall for critical classes (e.g., vehicles, pedestrians).
The system maintains a maximum deviation of 0.5 meters from the planned path during navigation.
Control commands ensure smooth operation with acceleration/deceleration within ±2 m/s² for passenger comfort.
The pipeline operates reliably for at least 10 minutes of continuous driving in varied simulated conditions (e.g., traffic, obstacles).

Scope
This problem focuses on a simulated environment to demonstrate core autonomous driving capabilities. Real-world deployment considerations, such as hardware redundancy, regulatory compliance, and extreme weather handling, are out of scope but noted for future extensions.

import numpy as np
import cv2
import open3d as o3d
from typing import List, Tuple
import carla
import torch
from yolov5 import YOLOv5  # Assuming YOLOv5 for object detection
import time


In [None]:

# Configuration
class Config:
    SENSOR_TICK = 0.1  # Sensor update frequency (seconds)
    VEHICLE_SPEED_LIMIT = 30.0  # km/h
    LIDAR_RANGE = 50.0  # meters
    CAMERA_FOV = 90.0  # degrees
    WAYPOINT_SPACING = 2.0  # meters

In [None]:
# Sensor Data Acquisition
class SensorSuite:
    def __init__(self, vehicle: carla.Vehicle, world: carla.World):
        self.vehicle = vehicle
        self.world = world
        self.camera = None
        self.lidar = None
        self.setup_sensors()

In [None]:
def setup_sensors(self):
        # Camera setup
        camera_bp = self.world.get_blueprint_library().find('sensor.camera.rgb')
        camera_bp.set_attribute('image_size_x', '1920')
        camera_bp.set_attribute('image_size_y', '1080')
        camera_bp.set_attribute('fov', str(Config.CAMERA_FOV))
        camera_transform = carla.Transform(carla.Location(x=1.5, z=2.4))
        self.camera = self.world.spawn_actor(camera_bp, camera_transform, attach_to=self.vehicle)
        self.camera.listen(lambda image: self.process_camera(image))

        # LiDAR setup
        lidar_bp = self.world.get_blueprint_library().find('sensor.lidar.ray_cast')
        lidar_bp.set_attribute('range', str(Config.LIDAR_RANGE))
        lidar_transform = carla.Transform(carla.Location(x=1.5, z=2.4))
        self.lidar = self.world.spawn_actor(lidar_bp, lidar_transform, attach_to=self.vehicle)
        self.lidar.listen(lambda point_cloud: self.process_lidar(point_cloud))

In [None]:
def process_camera(self, image):
        # Convert CARLA image to OpenCV format
        array = np.frombuffer(image.raw_data, dtype=np.uint8)
        array = array.reshape((image.height, image.width, 4))
        self.latest_camera = cv2.cvtColor(array, cv2.COLOR_BGRA2BGR)

In [None]:
def process_lidar(self, point_cloud):
        # Convert CARLA point cloud to Open3D format
        points = np.array([[p.x, p.y, p.z] for p in point_cloud])
        self.latest_lidar = o3d.geometry.PointCloud()
        self.latest_lidar.points = o3d.utility.Vector3dVector(points)

In [None]:
# Perception Module
class Perception:
    def __init__(self):
        # Initialize YOLOv5 model for object detection
        self.model = YOLOv5('yolov5s.pt', device='cuda' if torch.cuda.is_available() else 'cpu')
        self.objects = []

    def detect_objects(self, image: np.ndarray) -> List[dict]:
        # Run YOLOv5 inference
        results = self.model.predict(image)
        self.objects = []
        for det in results.xyxy[0]:  # [x1, y1, x2, y2, conf, cls]
            x1, y1, x2, y2, conf, cls = det
            if conf > 0.5:  # Confidence threshold
                self.objects.append({
                    'class': results.names[int(cls)],
                    'bbox': [x1, y1, x2, y2],
                    'confidence': conf
                })
        return self.objects

In [None]:
def process_lidar(self, point_cloud: o3d.geometry.PointCloud) -> List[Tuple[float, float, float]]:
        # Cluster points to detect obstacles
        labels = np.array(point_cloud.cluster_dbscan(eps=0.5, min_points=10))
        obstacles = []
        for label in np.unique(labels):
            if label != -1:  # Ignore noise
                cluster_points = np.asarray(point_cloud.points)[labels == label]
                centroid = np.mean(cluster_points, axis=0)
                obstacles.append(tuple(centroid))
        return obstacles

In [None]:
# Path Planning
class PathPlanner:
    def __init__(self, world: carla.World):
        self.world = world
        self.map = world.get_map()
        self.global_path = []

    def generate_global_path(self, start: carla.Location, end: carla.Location):
        # Use CARLA's global planner
        start_waypoint = self.map.get_waypoint(start)
        end_waypoint = self.map.get_waypoint(end)
        self.global_path = start_waypoint.get_route_to(end_waypoint, spacing=Config.WAYPOINT_SPACING)
        return self.global_path

    def local_path(self, current_pose: carla.Transform, obstacles: List[Tuple[float, float, float]]) -> List[carla.Waypoint]:
        # Simple local planner: Follow global path, avoid obstacles
        local_path = []
        for waypoint in self.global_path[:10]:  # Look ahead 10 waypoints
            safe = True
            for obs in obstacles:
                dist = np.sqrt((waypoint.transform.location.x - obs[0])**2 + (waypoint.transform.location.y - obs[1])**2)
                if dist < 2.0:  # Obstacle too close
                    safe = False
                    break
            if safe:
                local_path.append(waypoint)
        return local_path

In [None]:
# Vehicle Control
class Controller:
    def __init__(self, vehicle: carla.Vehicle):
        self.vehicle = vehicle
        self.target_speed = Config.VEHICLE_SPEED_LIMIT / 3.6  # Convert km/h to m/s

    def compute_control(self, current_pose: carla.Transform, target_waypoint: carla.Waypoint) -> carla.VehicleControl:
        # Simple PID controller for throttle and steering
        control = carla.VehicleControl()
        
        # Steering
        target_x, target_y = target_waypoint.transform.location.x, target_waypoint.transform.location.y
        current_x, current_y = current_pose.location.x, current_pose.location.y
        angle = np.arctan2(target_y - current_y, target_x - current_x)
        current_yaw = np.deg2rad(current_pose.rotation.yaw)
        steering = np.clip(angle - current_yaw, -1.0, 1.0)
        control.steer = steering

        # Throttle
        current_speed = np.sqrt(self.vehicle.get_velocity().x**2 + self.vehicle.get_velocity().y**2)
        throttle = 0.5 * (self.target_speed - current_speed)  # Simple proportional control
        control.throttle = np.clip(throttle, 0.0, 1.0)
        control.brake = 0.0 if throttle > 0 else 0.5

        return control

In [None]:
# Main Pipeline
class AutonomousDrivingPipeline:
    def __init__(self, host='localhost', port=2000):
        self.client = carla.Client(host, port)
        self.client.set_timeout(10.0)
        self.world = self.client.get_world()
        self.vehicle = None
        self.sensors = None
        self.perception = Perception()
        self.planner = PathPlanner(self.world)
        self.controller = None

    def initialize(self):
        # Spawn vehicle
        blueprint_library = self.world.get_blueprint_library()
        vehicle_bp = blueprint_library.filter('model3')[0]
        spawn_point = self.world.get_map().get_spawn_points()[0]
        self.vehicle = self.world.spawn_actor(vehicle_bp, spawn_point)
        self.sensors = SensorSuite(self.vehicle, self.world)
        self.controller = Controller(self.vehicle)

        # Set destination and generate global path
        destination = carla.Location(x=100, y=100, z=0)  # Example destination
        self.planner.generate_global_path(self.vehicle.get_location(), destination)

    def run_step(self):
        # Perception
        if hasattr(self.sensors, 'latest_camera'):
            objects = self.perception.detect_objects(self.sensors.latest_camera)
        else:
            objects = []
        if hasattr(self.sensors, 'latest_lidar'):
            obstacles = self.perception.process_lidar(self.sensors.latest_lidar)
        else:
            obstacles = []

        # Planning
        local_path = self.planner.local_path(self.vehicle.get_transform(), obstacles)
        if not local_path:
            return  # Stop if no safe path

        # Control
        control = self.controller.compute_control(self.vehicle.get_transform(), local_path[0])
        self.vehicle.apply_control(control)

    def run(self):
        self.initialize()
        try:
            while True:
                self.run_step()
                time.sleep(Config.SENSOR_TICK)
        finally:
            # Cleanup
            if self.sensors.camera:
                self.sensors.camera.destroy()
            if self.sensors.lidar:
                self.sensors.lidar.destroy()
            if self.vehicle:
                self.vehicle.destroy()

if __name__ == '__main__':
    pipeline = AutonomousDrivingPipeline()
    pipeline.run()