In [1]:
import carla
import math
import random
import time
import numpy as np
import cv2
import subprocess
import threading
import os
from pynput import keyboard
import tensorflow as tf
from tensorflow.keras.models import load_model

# Define output directories (if needed for video or logging)
output_dir = r"C:\Users\okeiy\OneDrive - University of Salford\Documents\Sch Notes\Dissertation\Codes\Output"
os.makedirs(output_dir, exist_ok=True)

# Connect to CARLA and get the world
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()
bp_lib = world.get_blueprint_library()
spawn_points = world.get_map().get_spawn_points()

# Spawn the ego vehicle
vehicle_bp = bp_lib.find('vehicle.dodge.charger')
vehicle = world.try_spawn_actor(vehicle_bp, random.choice(spawn_points))

# Set the spectator view (optional)
spectator = world.get_spectator()
transform = carla.Transform(vehicle.get_transform().transform(carla.Location(x=-4, z=2.5)),
                             vehicle.get_transform().rotation)
spectator.set_transform(transform)

# Start traffic simulation processes (traffic lights and additional traffic)
traffic_light_process = subprocess.Popen(['python', 'traffic_light_controller.py'])
traffic_process = subprocess.Popen(['python', 'generate_traffic.py', '--number-of-vehicles', '30', '--number-of-walkers', '50'])
time.sleep(5)

# Set up traffic manager (if needed)
traffic_manager = client.get_trafficmanager()

# Attach an RGB camera to the vehicle
camera_bp = bp_lib.find('sensor.camera.rgb')
camera_init_trans = carla.Transform(carla.Location(z=1.6, x=0.1))
camera = world.spawn_actor(camera_bp, camera_init_trans, attach_to=vehicle)
time.sleep(0.2)
spectator.set_transform(camera.get_transform())

# Global dictionaries for sensor data and control
image_w = camera_bp.get_attribute('image_size_x').as_int()
image_h = camera_bp.get_attribute('image_size_y').as_int()
camera_data = {'image': np.zeros((image_h, image_w, 4), dtype=np.uint8)}
# We'll use lane detection only from the camera, so LiDAR is not used in this example.
current_control = carla.VehicleControl()

# Disable CARLA's built-in autopilot
vehicle.set_autopilot(False)

In [2]:
# Camera callback: update camera_data
def camera_callback(image, data_dict):
    data_dict['image'] = np.copy(image.raw_data).reshape((image.height, image.width, 4)).astype(np.uint8)

def start_camera():
    camera.listen(lambda image: camera_callback(image, camera_data))

# Start the camera listener in its own thread
camera_thread = threading.Thread(target=start_camera, daemon=True)
camera_thread.start()

# (Optional) Set up a keyboard listener for quitting, etc.
key_states = {'q': False}
def on_press(key):
    try:
        if key.char in key_states:
            key_states[key.char] = True
    except AttributeError:
        pass

def on_release(key):
    try:
        if key.char in key_states:
            key_states[key.char] = False
    except AttributeError:
        pass

keyboard_listener = keyboard.Listener(on_press=on_press, on_release=on_release)
keyboard_listener.start()

In [3]:
import cv2
import numpy as np

def detect_lanes(image):
    """
    Given an RGB image, apply edge detection and Hough Transform to detect lane lines.
    Returns the detected lines (if any).
    """
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blur, 50, 150)
    
    # Define a region-of-interest mask (focus on lower half of the image)
    height, width = edges.shape
    mask = np.zeros_like(edges)
    polygon = np.array([[
        (0, height),
        (width, height),
        (width, int(height * 0.6)),
        (0, int(height * 0.6))
    ]], np.int32)
    cv2.fillPoly(mask, polygon, 255)
    masked_edges = cv2.bitwise_and(edges, mask)
    
    # Use Hough Transform to detect lines
    lines = cv2.HoughLinesP(masked_edges, 1, np.pi/180, threshold=50, minLineLength=50, maxLineGap=150)
    return lines

def compute_lane_center_error(lines, image):
    """
    Given detected lines and the original image, computes a simple lane center error.
    This function divides lines into left and right, averages their x-coordinates at the bottom,
    and computes the error relative to the image center.
    Returns a normalized error in [-1, 1] (negative means lane center is to the right).
    """
    if lines is None:
        return 0.0
    left_lines, right_lines = [], []
    for line in lines:
        for x1, y1, x2, y2 in line:
            # Compute slope (avoid division by zero)
            slope = (y2 - y1) / (x2 - x1 + 1e-6)
            if slope < -0.5:
                left_lines.append((x1, y1, x2, y2))
            elif slope > 0.5:
                right_lines.append((x1, y1, x2, y2))
    
    def average_line(lines):
        if len(lines) == 0:
            return None
        x_coords = []
        for x1, y1, x2, y2 in lines:
            x_coords.append(x1)
            x_coords.append(x2)
        return np.mean(x_coords)
    
    left_avg = average_line(left_lines)
    right_avg = average_line(right_lines)
    
    if left_avg is None or right_avg is None:
        return 0.0  # Cannot determine lane center reliably
    
    lane_center = (left_avg + right_avg) / 2.0
    image_center = image.shape[1] / 2.0
    error = (image_center - lane_center) / image_center  # normalized error
    return error

def show_camera_with_lanes():
    cv2.namedWindow('RGB Camera with Lanes', cv2.WINDOW_AUTOSIZE)
    while True:
        # Ensure we have a valid image
        if camera_data['image'] is None or np.sum(camera_data['image']) == 0:
            time.sleep(0.01)
            continue
        # Convert the BGRA image to RGB
        rgb_image = cv2.cvtColor(camera_data['image'], cv2.COLOR_BGRA2RGB)
        # Detect lanes
        lines = detect_lanes(rgb_image)
        lane_image = rgb_image.copy()
        if lines is not None:
            for line in lines:
                x1, y1, x2, y2 = line[0]
                cv2.line(lane_image, (x1, y1), (x2, y2), (0, 255, 0), 5)
        # Compute lane center error for debugging
        error = compute_lane_center_error(lines, rgb_image)
        cv2.putText(lane_image, f"Lane Center Error: {error:.2f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2)
        # Convert back to BGRA for display if needed
        output_image = cv2.cvtColor(lane_image, cv2.COLOR_RGB2BGRA)
        cv2.imshow('RGB Camera with Lanes', output_image)
        if cv2.waitKey(10) & 0xFF == ord('q') or key_states['q']:
            break
    cv2.destroyAllWindows()

# Start lane detection display in a separate thread
lane_display_thread = threading.Thread(target=show_camera_with_lanes, daemon=True)
lane_display_thread.start()

Exception in thread Thread-5 (show_camera_with_lanes):
Traceback (most recent call last):
  File "C:\Users\okeiy\.conda\envs\carla-sim\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "C:\Users\okeiy\.conda\envs\carla-sim\lib\site-packages\ipykernel\ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "C:\Users\okeiy\.conda\envs\carla-sim\lib\threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\okeiy\AppData\Local\Temp\ipykernel_29636\1667400073.py", line 83, in show_camera_with_lanes
NameError: name 'cv2' is not defined


In [4]:
# In this example, we use lane detection error to adjust steering.
# You could also combine this with CNN predictions. Here, we override steering using a simple proportional controller.
def autonomous_drive_with_lane_detection():
    global current_control
    while True:
        # Wait for a valid camera image
        if camera_data['image'] is None or np.sum(camera_data['image']) == 0:
            time.sleep(0.01)
            continue
        
        # Preprocess the image (for lane detection, use the same conversion as before)
        rgb_image = cv2.cvtColor(camera_data['image'], cv2.COLOR_BGRA2RGB)
        
        # Use lane detection to compute lane center error
        lines = detect_lanes(rgb_image)
        error = compute_lane_center_error(lines, rgb_image)
        # For debugging, print the error
        print("Lane center error:", error)
        
        # Use a simple proportional controller to compute steering.
        # For example, if error is positive, lane center is to the left, so steer right (positive steering).
        # Adjust gain (e.g., 0.5) as needed.
        steering = 0.5 * error
        
        # Set a constant throttle.
        throttle = 0.5
        brake = 0.0
        
        # Optionally, you can add conditions to brake if the error is very high.
        if abs(error) > 0.5:
            throttle = 0.0
            brake = 0.5
        
        current_control.throttle = throttle
        current_control.brake = brake
        current_control.steer = steering
        
        vehicle.apply_control(current_control)
        time.sleep(0.05)

# Run the autonomous driving loop with lane detection in a separate thread.
auto_drive_lane_thread = threading.Thread(target=autonomous_drive_with_lane_detection, daemon=True)
auto_drive_lane_thread.start()

Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0872115384615384
Lane center error: 0.0872115384615384
Lane center error: 0.0872115384615384
Lane center error: 0.0872115384615384
Lane center error: 0.0872115384615384
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane center error: 0.0
Lane 

Exception in thread Thread-6 (autonomous_drive_with_lane_detection):
Traceback (most recent call last):
  File "C:\Users\okeiy\.conda\envs\carla-sim\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "C:\Users\okeiy\.conda\envs\carla-sim\lib\site-packages\ipykernel\ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "C:\Users\okeiy\.conda\envs\carla-sim\lib\threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\okeiy\AppData\Local\Temp\ipykernel_29636\3062162965.py", line 15, in autonomous_drive_with_lane_detection
  File "C:\Users\okeiy\AppData\Local\Temp\ipykernel_29636\1667400073.py", line 15, in detect_lanes
NameError: name 'np' is not defined
