In [1]:
import carla
import random
import time
import numpy as np
import cv2
import matplotlib.pyplot as plt
from ultralytics import YOLO
import math

In [None]:
# Sensor Configuration Parameters
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
LIDAR_CHANNELS = 32
RADAR_RANGE = 100  # meters
YOLO_MODEL_PATH = 'yolov8n.pt'

class SensorProcessor:
    def __init__(self):
        self.yolo_model = YOLO(YOLO_MODEL_PATH)
        self.collision_detected = False
        self.lidar_data = None
        self.radar_data = None
        self.frame_number = 0
        self.simulation_running = True  # Flag to control simulation
        
        # Initialize plots for RADAR and LIDAR
        plt.ion()
        self.fig, (self.ax1, self.ax2) = plt.subplots(1, 2, figsize=(12, 5))
        self.lidar_scatter = self.ax1.scatter([], [], s=1)
        self.radar_scatter = self.ax2.scatter([], [], s=1)
        
        self.ax1.set_xlim(-50, 50)
        self.ax1.set_ylim(0, 100)
        self.ax1.set_xlabel('Azimuth (degrees)')
        self.ax1.set_ylabel('Depth (meters)')
        self.ax1.set_title('LIDAR Data Visualization')
        
        self.ax2.set_xlim(-50, 50)
        self.ax2.set_ylim(0, RADAR_RANGE)
        self.ax2.set_xlabel('Azimuth (degrees)')
        self.ax2.set_ylabel('Depth (meters)')
        self.ax2.set_title('RADAR Data Visualization')
        
    def camera_callback(self, image):
        try:
            img = np.reshape(np.frombuffer(image.raw_data, dtype=np.uint8),
                             (image.height, image.width, 4))[:, :, :3]
            results = self.yolo_model(img)
            annotated_img = results[0].plot()
            for det in results[0].boxes:
                cls = self.yolo_model.names[int(det.cls)]
                conf = float(det.conf)
                if cls == "person" and conf > 0.7:
                    self.handle_pedestrian_detection()
            cv2.imshow('Autonomous View', annotated_img)
            key = cv2.waitKey(1)
            if key == 27:  # Esc key
                self.simulation_running = False
        except Exception as e:
            print(f"Camera error: {str(e)}")

    def lidar_callback(self, point_cloud):
        try:
            points = np.frombuffer(point_cloud.raw_data, dtype=np.dtype('f4'))
            points = np.reshape(points, (-1, 4))[:, :3]
            self.lidar_data = points
            azimuth = np.degrees(np.arctan2(points[:, 1], points[:, 0]))
            depth = np.sqrt(points[:, 0]**2 + points[:, 1]**2)
            self.lidar_scatter.set_offsets(np.column_stack((azimuth, depth)))
            self.fig.canvas.flush_events()
            self.process_lidar_data()
        except Exception as e:
            print(f"LIDAR error: {str(e)}")

    def radar_callback(self, radar_data):
        try:
            points = np.frombuffer(radar_data.raw_data, dtype=np.dtype('f4'))
            points = np.reshape(points, (-1, 4))
            self.radar_data = points
            azimuth = np.degrees(points[:, 1])
            depth = points[:, 0]
            self.radar_scatter.set_offsets(np.column_stack((azimuth, depth)))
            self.fig.canvas.flush_events()
            self.process_radar_data()
        except Exception as e:
            print(f"RADAR error: {str(e)}")

    def process_lidar_data(self):
        if self.lidar_data is not None:
            points = self.lidar_data[(self.lidar_data[:, 0] > 0) & 
                                     (np.abs(self.lidar_data[:, 1]) < 5) &
                                     (self.lidar_data[:, 0] < 50)]
            if len(points) > 0:
                closest = np.min(points[:, 0])
                self.adjust_speed_based_on_distance(closest)

    def process_radar_data(self):
        if self.radar_data is not None:
            frontal_points = self.radar_data[(np.abs(np.degrees(self.radar_data[:, 1])) < 30) &
                                             (self.radar_data[:, 0] < RADAR_RANGE)]
            if len(frontal_points) > 0:
                closest = frontal_points[np.argmin(frontal_points[:, 0])]
                relative_velocity = closest[3]
                self.adjust_speed_based_on_velocity(relative_velocity)

    def handle_pedestrian_detection(self):
        print("Pedestrian detected! Emergency braking.")
        vehicle.apply_control(carla.VehicleControl(brake=1.0))

    def adjust_speed_based_on_distance(self, distance):
        if distance < 10:
            throttle = max(0.0, 1 - (1 / distance))
            brake = 0.5 if distance < 5 else 0.1
            control = carla.VehicleControl(throttle=throttle, brake=brake)
            vehicle.apply_control(control)

    def adjust_speed_based_on_velocity(self, relative_velocity):
        if relative_velocity < -3:
            vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.5))

# Initialize CARLA
client = carla.Client('localhost', 2000)
client.set_timeout(20.0)
world = client.get_world()

# Set synchronous mode
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.05
world.apply_settings(settings)

# Setup Traffic Manager
traffic_manager = client.get_trafficmanager()
traffic_manager.set_synchronous_mode(True)
traffic_manager.global_percentage_speed_difference(-20)

# Spawn vehicle
blueprint_library = world.get_blueprint_library()
vehicle_bp = blueprint_library.filter('vehicle.lincoln.mkz')[0]
spawn_point = random.choice(world.get_map().get_spawn_points())
vehicle = world.spawn_actor(vehicle_bp, spawn_point)
vehicle.set_autopilot(True, traffic_manager.get_port())

# Create sensor processor
sensor_processor = SensorProcessor()

# Setup Camera
camera_bp = blueprint_library.find('sensor.camera.rgb')
camera_bp.set_attribute('image_size_x', str(CAMERA_WIDTH))
camera_bp.set_attribute('image_size_y', str(CAMERA_HEIGHT))
camera_transform = carla.Transform(carla.Location(x=1.5, z=2.4))
camera = world.spawn_actor(camera_bp, camera_transform, attach_to=vehicle)
camera.listen(sensor_processor.camera_callback)

# Setup LIDAR
lidar_bp = blueprint_library.find('sensor.lidar.ray_cast')
lidar_bp.set_attribute('channels', str(LIDAR_CHANNELS))
lidar_bp.set_attribute('range', '100')
lidar_bp.set_attribute('points_per_second', '100000')
lidar_transform = carla.Transform(carla.Location(z=2.5))
lidar = world.spawn_actor(lidar_bp, lidar_transform, attach_to=vehicle)
lidar.listen(sensor_processor.lidar_callback)

# Setup RADAR
radar_bp = blueprint_library.find('sensor.other.radar')
radar_bp.set_attribute('horizontal_fov', '30')
radar_bp.set_attribute('vertical_fov', '30')
radar_bp.set_attribute('range', str(RADAR_RANGE))
radar_transform = carla.Transform(carla.Location(x=2.5, z=1.0))
radar = world.spawn_actor(radar_bp, radar_transform, attach_to=vehicle)
radar.listen(sensor_processor.radar_callback)

# Spectator Camera Setup
spectator = world.get_spectator()

def update_spectator():
    vehicle_transform = vehicle.get_transform()
    vehicle_location = vehicle_transform.location
    local_offset = carla.Location(x=-6, y=0, z=8)
    camera_location = vehicle_transform.transform(local_offset)
    direction = vehicle_location - camera_location
    horizontal_distance = math.sqrt(direction.x**2 + direction.y**2)
    if horizontal_distance > 0:
        yaw = math.degrees(math.atan2(direction.y, direction.x))
        pitch = -math.degrees(math.atan2(-direction.z, horizontal_distance))
    else:
        yaw = vehicle_transform.rotation.yaw
        pitch = -60 if direction.z > 0 else 60
    spectator.set_transform(carla.Transform(camera_location, carla.Rotation(pitch=pitch, yaw=yaw)))

# Main loop with Esc key to stop
try:
    while sensor_processor.simulation_running:
        world.tick()
        update_spectator()  # Update spectator camera
except KeyboardInterrupt:
    print("Simulation stopped by user")

finally:
    settings.synchronous_mode = False
    world.apply_settings(settings)
    for actor in [camera, lidar, radar, vehicle]:
        if actor and actor.is_alive:
            actor.destroy()
    cv2.destroyAllWindows()
    sensor_processor.fig.canvas.flush_events()  # Ensure final plot update
    plt.ioff()
    plt.show()  # Keeps plots open until manually closed
    print("Simulation cleaned up")