Full code to make all waypoints from road sections 

In [1]:
#all imports
import carla #the sim library itself
import time # to set a delay after each photo
import numpy as np #in this example to change image representation - re-shaping
import math
import sys
import random
import glob
import os
try:
    sys.path.append(glob.glob('./carla/dist/carla-0.9.14-py3.7-win-amd64.egg')[0])
except IndexError:
    print('Couldn\'t import Carla egg properly')
import carla

try:
    sys.path.insert(0,r'C:\Users\ashokkumar\source\repos\AD\carla')
except IndexError:
    pass

from carla import ColorConverter as cc
from agents.navigation.behavior_agent import BehaviorAgent   # type: ignore
from agents.navigation.basic_agent import BasicAgent  # type: ignore
from scipy.spatial import KDTree

SECONDS_PER_EPISODE = 25

SECONDS_PER_EPISODE = 10  # seconds per episode maybe needs to be removed later
FIXED_DELTA_SECONDS = 0.02 # seconds per frame
SHOW_PREVIEW = True # Show preview for debugging
NO_RENDERING = False # No rendering for training
SYNCRONOUST_MODE = False # Synchronous mode for training
SPIN = 10   # angle of spin for initail spawn
HEIGHT = 480 # height of image
WIDTH = 640 # width of image
SHOW_CAM = SHOW_PREVIEW     # render camera
front_camera = None
CAMERA_POS_Z = 1.3      # camera position similar to tesla model 3
CAMERA_POS_X = 1.4    # camera position similar to tesla model 3
im_width = WIDTH
im_height = HEIGHT


In [2]:
## INITIALIZE THE SIMULATOR

# connect to the sim 
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()
world = client.load_world('Town02')

settings = world.get_settings()
settings.no_rendering_mode = NO_RENDERING
settings.synchronous_mode = SYNCRONOUST_MODE
settings.fixed_delta_seconds = FIXED_DELTA_SECONDS
world.apply_settings(settings)
blueprint_library = world.get_blueprint_library()
actor_list = []
vehicle = None
collision_hist = []

# get map look at the map
if SHOW_CAM:
    spectator = world.get_spectator()

town_map = world.get_map()
roads = town_map.get_topology()



In [3]:
# Define the waypoint and KDTree generation logic

town_map = world.get_map()
waypoints = town_map.generate_waypoints(4)
waypoint_positions = np.array([(wp.transform.location.x, wp.transform.location.y) for wp in waypoints])
waypoint_tree = KDTree(waypoint_positions)
spawn_position = None        
start_waypoint = None
dest_waypoint = None
observation = None

print(f"Generated {len(waypoints)} waypoints")
# draw all point in the sim for 60 seconds
for wp in waypoints:
    world.debug.draw_string(wp.transform.location, '^', draw_shadow=False,
        color=carla.Color(r=0, g=0, b=255), life_time=20.0,
        persistent_lines=True)



Generated 770 waypoints


In [4]:
## CLEANUP THE SIMULATOR
def cleanup():
        for actor in actor_list:
            if actor.is_alive:
                try:
                    actor.destroy()
                except Exception as e:
                    print(f"Failed to destroy actor {actor.id}: {e}")


In [5]:
## SPAWN THE VEHICLE
def spawn_vehicle():
    spawn_points = world.get_map().get_spawn_points()
    transform = random.choice(spawn_points)
    blueprint_library = world.get_blueprint_library()
    model_3 = blueprint_library.find('vehicle.tesla.model3')
    vehicle = None
    while vehicle is None:
        try:
            vehicle = world.spawn_actor(model_3, transform)
        except Exception as e:
            print(f"Failed to spawn vehicle: {e}")
            time.sleep(0.2)  # Brief delay before retrying
    return vehicle

In [145]:
def initial_waypoint(initial_pos):
    # Query the KD-tree for the closest waypoint to initial_pos
    _, closest_index = waypoint_tree.query([initial_pos.x, initial_pos.y])
    closest_waypoint = waypoints[closest_index]
    return closest_waypoint

def next_waypoint(start_waypoint, look_ahead=2.0):
    next_wps = start_waypoint.next(look_ahead)
    if next_wps:
        return next_wps[0]
    else:
        # Handle the case where no next waypoint is found
        print("No next waypoint found")
        return None
    
def draw_waypoint(waypoint, color=carla.Color(255, 0, 0), lt=20.0):
    world.debug.draw_string(waypoint.transform.location, 'O', draw_shadow=False,
        color=color, life_time=lt, persistent_lines=True)
    
def vector3d_to_location(vector):
    return carla.Location(x=vector.x, y=vector.y, z=vector.z)

def draw_arrow(start_location, end_location,color=carla.Color(255, 0, 0), lt=20.0):
    if start_location and end_location:
        start_location = vector3d_to_location(start_location)
        end_location = vector3d_to_location(end_location)
        end_location += carla.Location(z=2)
        start_location += carla.Location(z=2)
        world.debug.draw_arrow(start_location, end_location, thickness=0.1, arrow_size=0.3, color=color, life_time=lt)

def draw_heading_arrow(location, yaw, length=5.0, color=carla.Color(255, 0, 0), life_time=5.0):
    yaw_corrected = (yaw) % 360

    # Convert yaw from degrees to radians
    yaw_rad = np.deg2rad(yaw_corrected)

    # Compute the direction vector based on the corrected yaw (heading)
    direction_x = np.cos(yaw_rad)
    direction_y = np.sin(yaw_rad)

    # Calculate the end location of the arrow using the direction vector
    end_location = carla.Location(
        x=location.x + length * direction_x,
        y=location.y + length * direction_y,
        z=2
    )
    location.z = 2
    # Draw the arrow in the CARLA world
    world.debug.draw_arrow(
        location,                 # Start point (the given location)
        end_location,             # End point (direction of the yaw)
        thickness=0.1,            # Arrow thickness
        arrow_size=0.3,           # Arrowhead size
        color=color,              # Color of the arrow
        life_time=life_time       # How long the arrow will stay in the world (in seconds)
    )

def draw_relative_heading_arrow(vehicle_transform, waypoint_transform, length=5.0, color=carla.Color(0, 255, 0), life_time=5.0):
# Extract vehicle and waypoint yaw in degrees
    vehicle_yaw = vehicle_transform.rotation.yaw
    waypoint_yaw = waypoint_transform.rotation.yaw

    # Calculate the relative heading (difference between vehicle and waypoint yaw)
    relative_heading = (vehicle_yaw - waypoint_yaw) % 360
    if relative_heading > 180:
        relative_heading -= 360

    # The correct heading to draw the arrow is the vehicle's global yaw minus the relative heading
    absolute_heading = vehicle_yaw - relative_heading

    # Convert the absolute heading to radians
    absolute_heading_rad = np.deg2rad(absolute_heading)

    # Get the vehicle's current location
    vehicle_location = vehicle_transform.location

    # Compute the arrow's end location based on the absolute heading direction
    arrow_end_location = carla.Location(
        x=vehicle_location.x + length * np.cos(absolute_heading_rad),
        y=vehicle_location.y + length * np.sin(absolute_heading_rad),
        z=vehicle_location.z
    )

    # Draw an arrow in the CARLA world from the vehicle location in the direction of absolute heading
    world.debug.draw_arrow(
        vehicle_location,             # Start point (vehicle's current location)
        arrow_end_location,           # End point (direction of the relative heading)
        thickness=0.1,                # Arrow thickness
        arrow_size=0.3,               # Arrowhead size
        color=color,                  # Color of the arrow
        life_time=life_time           # How long the arrow will stay in the world (in seconds)
    )




In [121]:
## OBSERVATION LOGIC - 1
def get_speed(v, max_speed=50.0):
    if not isinstance(v, carla.Vector3D):
        print("Invalid velocity vector provided.")
        return 0
    speed = int(3.6 * np.sqrt(v.x**2 + v.y**2 + v.z**2))
    normalized_speed = np.clip(speed / max_speed, -1, 1)  # Normalize speed to [-1, 1]
    return normalized_speed

def get_relative_heading( vehicle_transform, waypoint_transform):
    # Extract vehicle and waypoint yaw in degrees
    vehicle_yaw = vehicle_transform.rotation.yaw
    waypoint_yaw = waypoint_transform.rotation.yaw

    # Check if the current waypoint is part of a turn
    future_waypoint = next_waypoint(dest_waypoint, 10)  # Look ahead 10 waypoints

    if is_turn_or_intersection(dest_waypoint, future_waypoint,25):
        # Calculate average heading for waypoints in the turn
        turn_headings = []
        for i in range(1, 5):
            waypoint = next_waypoint(dest_waypoint, i)
            if waypoint:
                turn_headings.append(waypoint.transform.rotation.yaw)
            else:
                break
        
        if turn_headings:
            # Calculate average heading of the turn
            avg_heading = np.mean(turn_headings)
            # Compute the **absolute heading** difference
            relative_heading = (avg_heading - vehicle_yaw) % 360
            
            # Normalize to [-180, 180] range
            if relative_heading > 180:
                relative_heading -= 360
            normalized_heading = relative_heading / 180.0
            return np.clip(normalized_heading, -1, 1)

    # Compute the **absolute heading** (difference between waypoint and vehicle yaw)
    relative_heading = (waypoint_yaw - vehicle_yaw) % 360
    
    # Normalize to [-180, 180] range
    if relative_heading > 180:
        relative_heading -= 360
    normalized_heading = relative_heading / 180.0
    return np.clip(normalized_heading, -1, 1)

def is_turn_or_intersection(wp, next_wp, turn_threshold):
    # Extract yaw angles from waypoints
    yaw1 = wp.transform.rotation.yaw
    yaw2 = next_wp.transform.rotation.yaw

    # Calculate the absolute difference in yaw angles
    yaw_diff = abs(yaw1 - yaw2) % 360
    yaw_diff = yaw_diff if yaw_diff <= 180 else 360 - yaw_diff
    if yaw_diff > turn_threshold:
        print("Turn detected")
    # Determine if the yaw difference exceeds the turn threshold
    return yaw_diff > turn_threshold

In [13]:

def get_lateral_distance(vehicle_transform, waypoint, max_distance=2.0):
    # Get vehicle and waypoint positions
    waypoint_location = waypoint.transform.location
    waypoint_forward_vector = waypoint.transform.get_forward_vector()
    vehicle_location = vehicle_transform.location
    # Calculate the vector from the waypoint to the vehicle
    vehicle_vector = vehicle_location - waypoint_location

    # Convert the CARLA location objects to NumPy arrays for easier vector math
    waypoint_forward_np = np.array([waypoint_forward_vector.x, waypoint_forward_vector.y])
    vehicle_vector_np = np.array([vehicle_vector.x, vehicle_vector.y])

    # Normalize the waypoint's forward vector to get the direction
    waypoint_forward_np = waypoint_forward_np / np.linalg.norm(waypoint_forward_np)

    # Project the vehicle vector onto the waypoint's forward vector
    projection_length = np.dot(vehicle_vector_np, waypoint_forward_np)

    # Get the projection point (the closest point on the waypoint's direction line)
    projection_point = projection_length * waypoint_forward_np

    # Calculate the lateral distance as the distance between the vehicle's position and the projection point
    lateral_vector = vehicle_vector_np - projection_point
    lateral_distance = np.linalg.norm(lateral_vector)

    # Normalize the lateral distance to the range [-1, 1]
    normalized_lateral_distance = np.clip(lateral_distance / max_distance, -1, 1)

    return normalized_lateral_distance
    
def get_lateral_distance_to_center(vehicle_transform, waypoint, max_distance=2.0):
    # Get the vehicle and waypoint positions
    waypoint_location = waypoint.transform.location
    waypoint_forward_vector = waypoint.transform.get_forward_vector()
    vehicle_location = vehicle_transform.location

    # Convert CARLA location objects to NumPy arrays for easier vector calculations
    waypoint_location_np = np.array([waypoint_location.x, waypoint_location.y])
    waypoint_forward_np = np.array([waypoint_forward_vector.x, waypoint_forward_vector.y])
    vehicle_location_np = np.array([vehicle_location.x, vehicle_location.y])

    # Normalize the waypoint's forward vector (this is the direction of the lane)
    norm = np.linalg.norm(waypoint_forward_np)
    if norm == 0:
        return 0  # Prevent division by zero if the forward vector length is zero
    waypoint_forward_np = waypoint_forward_np / norm

    # Vector from the waypoint (center of the lane) to the vehicle
    vehicle_vector_np = vehicle_location_np - waypoint_location_np

    # Project the vehicle vector onto the waypoint's forward vector (lane direction)
    projection_length = np.dot(vehicle_vector_np, waypoint_forward_np)
    projection_point = projection_length * waypoint_forward_np

    # Calculate the lateral distance between the vehicle and the center line
    lateral_vector = vehicle_vector_np - projection_point
    lateral_distance = np.linalg.norm(lateral_vector)

    # Normalize the lateral distance to the range [-1, 1]
    normalized_lateral_distance = np.clip(lateral_distance / max_distance, -1, 1)

    # Check if the vehicle is to the left or right of the lane center by using a cross product
    cross_product = np.cross(waypoint_forward_np, vehicle_vector_np)
    if cross_product < 0:
        # If the cross product is negative, the vehicle is to the right of the lane
        normalized_lateral_distance *= -1

    return normalized_lateral_distance, lateral_distance

In [14]:
## OBSERVATION LOGIC - 2

def get_observation(vehicle_transform, dest_waypoint):
        if vehicle and vehicle.is_alive:
                velocity = vehicle.get_velocity()
                speed = get_speed(velocity)
        else:
                speed = 0  # Default speed if vehicle is not available
        lateral_distance = get_lateral_distance(vehicle_transform, dest_waypoint)
        heading = get_relative_heading(vehicle_transform, dest_waypoint.transform)

        print(f"lateral_distance: {lateral_distance}, speed: {speed}, heading: {heading}")
        obs = np.array([
                lateral_distance,
                speed,
                heading,
        ], dtype=np.float32)
        return obs

In [15]:
## RESET THE SIMULATOR
cleanup()
vehicle = spawn_vehicle()
actor_list.append(vehicle)
world.tick()
vehicle_transform = vehicle.get_transform()

print("Vehicle spawned at location ", vehicle_transform.location)

spectator.set_transform(carla.Transform(vehicle_transform.location + carla.Location(z=40), carla.Rotation(pitch=-90)))

Vehicle spawned at location  Location(x=162.920029, y=237.429962, z=0.490200)


In [111]:
vehicle.set_transform(dest_waypoint.transform)
vehicle_transform = vehicle.get_transform()
world.wait_for_tick()

<carla.libcarla.WorldSnapshot at 0x1718c591430>

In [112]:
start_waypoint = initial_waypoint(vehicle_transform.location)
dest_waypoint = next_waypoint(start_waypoint, look_ahead=5.0)
print("Start waypoint: ", start_waypoint.transform.location)
print("Dest waypoint: ", dest_waypoint.transform.location) 

draw_waypoint(start_waypoint, carla.Color(0, 255, 0))
draw_waypoint(dest_waypoint, carla.Color(255, 0, 0))

Start waypoint:  Location(x=146.521240, y=236.932526, z=0.000000)
Dest waypoint:  Location(x=141.521240, y=236.929489, z=0.000000)


In [124]:
## Speed Control

#vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0.0))
speed = get_speed(vehicle.get_velocity())
print (vehicle.get_velocity())
print("Speed: ", speed)
vehicle_transform = vehicle.get_transform()
#vehicle_transform.rotation.yaw += 90
vehicle.set_transform(start_waypoint.transform)
print("Vehicle moved to location ", vehicle.get_transform().location)
vehicle_transform = vehicle.get_transform()
print(vehicle_transform.location)

Vector3D(x=0.025454, y=0.000005, z=-0.000002)
Speed:  0.0
Vehicle moved to location  Location(x=150.902588, y=236.935059, z=0.221678)
Location(x=150.902588, y=236.935059, z=0.221678)


In [141]:
dest_waypoint = next_waypoint(start_waypoint, look_ahead=5.0)
print("Start waypoint: ", start_waypoint.transform)    
print("Dest waypoint: ", dest_waypoint.transform)
draw_waypoint(dest_waypoint, carla.Color(255, 0, 0))
draw_waypoint(start_waypoint, carla.Color(0, 255, 0))

Start waypoint:  Transform(Location(x=146.521240, y=236.932526, z=0.000000), Rotation(pitch=0.000000, yaw=-179.965195, roll=0.000000))
Dest waypoint:  Transform(Location(x=141.521240, y=236.929489, z=0.000000), Rotation(pitch=0.000000, yaw=-539.965149, roll=0.000000))


In [135]:
turn_waypoint = next_waypoint(start_waypoint, 10)
is_turn_or_intersection(start_waypoint,turn_waypoint,25)
print("next_waypoint: ", turn_waypoint.transform)
draw_waypoint(turn_waypoint, carla.Color(0, 0, 255))
heading = get_relative_heading(vehicle_transform, dest_waypoint.transform)
print(f"Heading: {heading:.3f}")


Turn detected
next_waypoint:  Transform(Location(x=137.983292, y=236.027344, z=0.000000), Rotation(pitch=0.000000, yaw=-143.621689, roll=0.000000))
Turn detected
Heading: -0.418


In [148]:
def next_waypoint( start_waypoint, look_ahead=5.0):
    next_wps = start_waypoint.next(look_ahead)
    if next_wps:
        return next_wps[0]
    else:
        # Handle the case where no next waypoint is found
        return None

In [153]:
def get_relative_heading_new( vehicle_transform, waypoint_transform):
    # Extract vehicle and waypoint yaw in degrees
    vehicle_yaw = vehicle_transform.rotation.yaw
    waypoint_yaw = waypoint_transform.rotation.yaw
    
    # Look ahead 10 meters (or further until the turn ends)
    future_waypoint = next_waypoint(dest_waypoint, 10)  # Look ahead 10 meters

    # Check if the current waypoint is part of a turn
    if is_turn_or_intersection(dest_waypoint, future_waypoint, 25):
        # Gather headings for waypoints in the turn until the road straightens out
        turn_headings = []
        current_waypoint = dest_waypoint
        distance_lookahead = 2.0  # Step size for looking ahead in meters
        
        while True:
            next_waypoint = next_waypoint(current_waypoint, distance_lookahead)
            
            # Check if we found a valid next waypoint
            if not next_waypoint:
                break
            
            # Calculate yaw difference between current and next waypoint
            yaw_diff = abs(current_waypoint.transform.rotation.yaw - next_waypoint.transform.rotation.yaw) % 360
            yaw_diff = yaw_diff if yaw_diff <= 180 else 360 - yaw_diff
            
            # If the yaw difference is small, assume the road has straightened out
            if yaw_diff < 10:  # Threshold for straightening out
                break
            
            # Append the yaw of the next waypoint
            turn_headings.append(next_waypoint.transform.rotation.yaw)
            current_waypoint = next_waypoint  # Move to the next waypoint

        if turn_headings:
            # Calculate the average heading of the turn
            avg_heading = np.mean(turn_headings)
            
            # Check for left or right turn
            if avg_heading > vehicle_yaw:
                # Positive relative heading, indicating a right turn
                turn_direction = 1.0
            else:
                # Negative relative heading, indicating a left turn
                turn_direction = -1.0
            
            # Compute the **absolute heading** difference between average heading and vehicle yaw
            relative_heading = (avg_heading - vehicle_yaw) % 360
            
            # Normalize to [-180, 180] range
            if relative_heading > 180:
                relative_heading -= 360
            
            # Adjust the heading based on the turn direction (positive for right, negative for left)
            normalized_heading = turn_direction * (relative_heading / 180.0)
            return np.clip(normalized_heading, -1, 1)

    # If not a turn, compute the **absolute heading** (difference between waypoint and vehicle yaw)
    relative_heading = (waypoint_yaw - vehicle_yaw) % 360

    # Normalize to [-180, 180] range
    if relative_heading > 180:
        relative_heading -= 360

    normalized_heading = relative_heading / 180.0
    return np.clip(normalized_heading, -1, 1)


In [154]:
turn_waypoint = next_waypoint(start_waypoint, 10)
is_turn_or_intersection(start_waypoint,turn_waypoint,25)
print("turn_waypoint: ", turn_waypoint.transform)
draw_waypoint(turn_waypoint, carla.Color(0, 0, 255))
heading = get_relative_heading(vehicle_transform, dest_waypoint.transform)
print(f"Heading: {heading:.3f}")
draw_heading_arrow(vehicle_transform.location, vehicle_transform.rotation.yaw, length=5.0, color=carla.Color(255, 0, 0), life_time=5.0)
draw_heading_arrow(vehicle_transform.location, heading*360, length=5.0, color=carla.Color(0, 255, 0), life_time=5.0)

Turn detected
turn_waypoint:  Transform(Location(x=137.983292, y=236.027344, z=0.000000), Rotation(pitch=0.000000, yaw=-143.621689, roll=0.000000))


UnboundLocalError: local variable 'next_waypoint' referenced before assignment