In [3]:
import carla
import random
import time
import pygame
import numpy as np

# Global variable to store latest lane invasion info
latest_lane_invasion = {"types": [], "timestamp": 0}

def lane_invasion_callback(event):
    """Record and log the lane markings crossed in a lane-invasion event.

    Rebinds the module-level ``latest_lane_invasion`` to a new dict holding
    the stringified type of every marking in ``event.crossed_lane_markings``
    together with the ``time.time()`` value at which the event was handled.
    """
    global latest_lane_invasion

    crossed = []
    for lane_marking in event.crossed_lane_markings:
        crossed.append(str(lane_marking.type))

    # Publish a brand-new dict rather than mutating the old one in place.
    latest_lane_invasion = {"types": crossed, "timestamp": time.time()}
    print(f"Crossed Lane Markings: {crossed}")

# Connect to the CARLA server
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()

# Clean up existing vehicles so they cannot block the spawn points
for actor in world.get_actors().filter('*vehicle*'):
    actor.destroy()

world_map = world.get_map()
spawn_points = world_map.get_spawn_points()
if not spawn_points:
    raise RuntimeError("The current CARLA map provides no vehicle spawn points.")

# Get spectator and spawn vehicle
spectator = world.get_spectator()
vehicle_bp = world.get_blueprint_library().filter('*model3*')
if len(vehicle_bp) == 0:
    raise RuntimeError("No vehicle blueprint matching '*model3*' was found.")

# try_spawn_actor() returns None instead of raising when the spawn fails
# (e.g. the spot is occupied), so walk the spawn points until one succeeds
# rather than handing a None vehicle to everything that follows.
vehicle = None
for spawn_point in spawn_points:
    vehicle = world.try_spawn_actor(vehicle_bp[0], spawn_point)
    if vehicle is not None:
        break
if vehicle is None:
    raise RuntimeError("Could not spawn the vehicle at any of the map's spawn points.")

# Pause briefly before attaching sensors (kept from the original script)
time.sleep(2)

# Both sensor blueprints come from the same library, so look it up once
blueprints = world.get_blueprint_library()

# Lane invasion sensor: attached to the vehicle with an identity transform,
# reporting every event to lane_invasion_callback
lane_invasion_bp = blueprints.find('sensor.other.lane_invasion')
lane_invasion_sensor = world.spawn_actor(
    lane_invasion_bp,
    carla.Transform(),
    attach_to=vehicle,
)
lane_invasion_sensor.listen(lane_invasion_callback)

# RGB camera used for the top-down view: 800x600 image, 90 degree field of view
camera_bp = blueprints.find('sensor.camera.rgb')
for attribute_name, attribute_value in (
    ('image_size_x', '800'),
    ('image_size_y', '600'),
    ('fov', '90'),
):
    camera_bp.set_attribute(attribute_name, attribute_value)

# Mount the camera 25 meters above the vehicle, pitched to look straight down
camera_location = carla.Location(x=0, y=0, z=25)
camera_rotation = carla.Rotation(pitch=-90)
camera_transform = carla.Transform(camera_location, camera_rotation)
camera = world.spawn_actor(camera_bp, camera_transform, attach_to=vehicle)

# Hand-off buffer between the camera callback and the render loop.
# It holds at most the single newest frame (see camera_callback).
image_queue = []


def camera_callback(image):
    """Convert a camera frame to an RGB array and publish it for rendering.

    ``image.raw_data`` is interpreted as ``image.height * image.width`` pixels
    of four ``uint8`` channels each. The fourth (alpha) channel is dropped and
    the remaining three are reversed from BGR to RGB order, giving an array of
    shape ``(height, width, 3)``.

    Only the most recent frame is kept in ``image_queue``: a consumer that is
    slower than the camera would otherwise let the list grow without bound
    and render increasingly stale images.
    """
    pixels = np.frombuffer(image.raw_data, dtype=np.uint8)
    pixels = np.reshape(pixels, (image.height, image.width, 4))
    rgb = pixels[:, :, :3][:, :, ::-1]  # Drop alpha, then BGR -> RGB

    # Append first and trim second so the list is never left empty by this
    # function; a consumer that just saw a non-empty list can still pop(0).
    image_queue.append(rgb)
    del image_queue[:-1]

# Start streaming camera frames into camera_callback
camera.listen(camera_callback)

# Bring up the pygame window plus the clock and fonts used by the HUD
pygame.init()
window_size = (800, 600)
screen = pygame.display.set_mode(window_size)
pygame.display.set_caption("CARLA Top-Down View - Manual Control")
clock = pygame.time.Clock()
font, small_font = pygame.font.Font(None, 36), pygame.font.Font(None, 24)

# Driving state carried from one frame of the control loop to the next
throttle = steer = brake = 0.0
reverse = False

# Key-binding help, printed once before the loop starts
print(
    "\nControls:",
    "W/S - Throttle/Reverse",
    "A/D - Steering",
    "SPACE - Brake",
    "ESC - Quit\n",
    sep="\n",
)

# Main control loop
running = True
last_frame_surface = None  # Most recent camera frame, reused until a newer one arrives

# Loop-invariant HUD surfaces, built once instead of on every frame
overlay = pygame.Surface((780, 100))
overlay.set_alpha(200)
overlay.fill((50, 50, 50))
instructions = small_font.render(
    "W/S: Throttle | A/D: Steer | SPACE: Brake | ESC: Quit", True, (200, 200, 200)
)

try:
    while running:
        clock.tick(30)  # Cap the loop at 30 iterations per second

        # Handle events
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
            elif event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE:
                running = False

        # Get key presses
        keys = pygame.key.get_pressed()

        # Throttle control
        if keys[pygame.K_w]:
            throttle = min(throttle + 0.1, 1.0)
            reverse = False
        elif keys[pygame.K_s]:
            if throttle > 0 and not reverse:
                # Still driving forward: S first bleeds off the forward throttle.
                throttle = max(throttle - 0.1, 0.0)
            else:
                # Stopped or already reversing: keep ramping the reverse throttle.
                # Without the "not reverse" test above, the throttle bounced
                # between 0.0 and 0.1 and reverse never built up.
                reverse = True
                throttle = min(throttle + 0.1, 1.0)
        else:
            throttle = max(throttle - 0.05, 0.0)

        # Steering control
        if keys[pygame.K_a]:
            steer = max(steer - 0.05, -1.0)
        elif keys[pygame.K_d]:
            steer = min(steer + 0.05, 1.0)
        else:
            steer = steer * 0.9  # Smooth return to center

        # Brake control
        brake = 1.0 if keys[pygame.K_SPACE] else 0.0

        # Apply controls
        control = carla.VehicleControl()
        control.throttle = throttle
        control.steer = steer
        control.brake = brake
        control.reverse = reverse
        vehicle.apply_control(control)

        # Update spectator position to follow vehicle from 30 units above
        vehicle_transform = vehicle.get_transform()
        spectator_transform = carla.Transform(
            vehicle_transform.location + carla.Location(z=30),
            carla.Rotation(pitch=-90)
        )
        spectator.set_transform(spectator_transform)

        # Render the newest camera frame, discarding any older queued frames
        # so the view never lags behind the simulation.
        newest_frame = None
        while image_queue:
            newest_frame = image_queue.pop(0)
        if newest_frame is not None:
            # The frame is (height, width, 3); make_surface expects (width, height, 3).
            last_frame_surface = pygame.surfarray.make_surface(newest_frame.swapaxes(0, 1))

        if last_frame_surface is not None:
            # Re-draw the previous frame when no new one arrived, instead of
            # flashing a black screen.
            screen.blit(last_frame_surface, (0, 0))
        else:
            screen.fill((0, 0, 0))

        # Get vehicle velocity
        velocity = vehicle.get_velocity()
        speed = 3.6 * (velocity.x**2 + velocity.y**2 + velocity.z**2)**0.5  # km/h

        # Draw HUD
        # Speed display
        speed_text = font.render(f"Speed: {speed:.1f} km/h", True, (255, 255, 255))
        screen.blit(speed_text, (10, 10))

        # Control display
        throttle_text = small_font.render(f"Throttle: {throttle:.2f}", True, (0, 255, 0))
        steer_text = small_font.render(f"Steer: {steer:.2f}", True, (255, 255, 0))
        brake_text = small_font.render(f"Brake: {brake:.2f}", True, (255, 0, 0))

        screen.blit(throttle_text, (10, 50))
        screen.blit(steer_text, (10, 75))
        screen.blit(brake_text, (10, 100))

        # Lane invasion display. Take one reference to the dict so "types" and
        # "timestamp" come from the same event even if the callback rebinds
        # the global while this frame is being drawn.
        invasion = latest_lane_invasion
        if invasion["types"]:
            time_since_invasion = time.time() - invasion["timestamp"]

            # Show for 5 seconds after invasion
            if time_since_invasion < 5.0:
                # Background for better visibility
                screen.blit(overlay, (10, 490))

                # Lane invasion warning, fading out over the 5 seconds.
                # The alpha has to be set BEFORE blitting to have any effect.
                warning_text = font.render("LANE INVASION!", True, (255, 0, 0))
                alpha = int(255 * (1 - time_since_invasion / 5.0))
                warning_text.set_alpha(alpha)
                screen.blit(warning_text, (20, 500))

                # Display lane types
                lane_types_str = ", ".join(invasion["types"])
                lane_text = small_font.render(f"Types: {lane_types_str}", True, (255, 255, 255))
                screen.blit(lane_text, (20, 540))

        # Instructions
        screen.blit(instructions, (10, 570))

        pygame.display.flip()
finally:
    # Cleanup runs even if the loop raised or was interrupted, so no actors
    # are left behind in the simulator.
    print("\nCleaning up...")
    for sensor in (camera, lane_invasion_sensor):
        sensor.stop()  # Stop callbacks before the actor goes away
        sensor.destroy()
    vehicle.destroy()
    pygame.quit()
    print("Done!")


Controls:
W/S - Throttle/Reverse
A/D - Steering
SPACE - Brake
ESC - Quit

Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Solid']
Crossed Lane Markings: ['Solid']
Crossed Lane Markings: ['SolidSolid']
Crossed Lane Markings: ['SolidSolid']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Solid']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Solid']
Crossed Lane Markings: ['Solid']
Crossed Lane Markings: ['Solid']
Crossed Lane Markings: ['Solid']
Crossed Lane Markings: ['Broken']
Crossed Lane Markings: ['Solid']
Crossed Lane Markings: ['Broken']
Crossed