In [3]:
import carla
import random

# Load map and set waypoints

In [14]:
client = carla.Client('localhost', 2000)
world = client.load_world('Town04')
map = world.get_map()

waypoints = map.generate_waypoints(distance=2.0)

for waypoint in waypoints:
    world.debug.draw_point(waypoint.transform.location, size=0.1, color=carla.Color(255, 0, 0), life_time=0)

In [15]:
len(waypoints)

16919

In [16]:
intersections = []
for waypoint in waypoints:
    if waypoint.is_intersection == True:
        intersections.append(waypoint)

In [17]:
def is_intersection(waypoint):
    return len(waypoint.next(0.1)) > 1

# Separate waypoints into intersections and non-intersections
intersection_waypoints = [wp for wp in waypoints if is_intersection(wp)]
non_intersection_waypoints = [wp for wp in waypoints if not is_intersection(wp)]

In [19]:
len(intersection_waypoints)

30

In [18]:
len(non_intersection_waypoints)

16889

In [23]:
def generate_route_with_turns(carla_map, min_length, max_length, num_turns):
    for _ in range(1000):  # Try up to 100 times to find a valid route
        route = []
        turn_count = 0
        
        # Randomly select a starting waypoint from non-intersections
        current_wp = random.choice(non_intersection_waypoints)
        route.append(current_wp)
        
        while len(route) < max_length:
            # Determine if the current waypoint is at an intersection
            if is_intersection(current_wp):
                turn_count += 1
            
            # Get next waypoints
            next_wps = current_wp.next(2.0)
            if not next_wps:
                break  # Dead end reached
            
            # At intersections, randomly choose a turn
            if is_intersection(current_wp):
                current_wp = random.choice(next_wps)
            else:
                # Continue straight if possible
                straight_wps = [wp for wp in next_wps if wp.lane_id == current_wp.lane_id]
                if straight_wps:
                    current_wp = straight_wps[0]
                else:
                    current_wp = random.choice(next_wps)
            
            route.append(current_wp)
            
            # Check if route meets criteria
            if len(route) >= min_length and turn_count >= num_turns:
                break
        
        # Verify route meets the exact number of turns required
        if turn_count == num_turns and min_length <= len(route) <= max_length:
            return route  # Valid route found
    
    print("Could not find a route meeting the criteria.")
    return None


In [25]:
# Parameters
min_length_1_turn = 25
max_length_1_turn = 34
min_length_2_turns = 35
max_length_2_turns = 49

# Generate a route with 1 turn
route_1_turn = generate_route_with_turns(
    map, min_length_1_turn, max_length_1_turn, num_turns=1
)

# Generate a route with 2 turns
route_2_turns = generate_route_with_turns(
    map, min_length_2_turns, max_length_2_turns, num_turns= 1
)


In [26]:
route_1_turn

[<carla.libcarla.Waypoint at 0x7f64dcb45120>,
 <carla.libcarla.Waypoint at 0x7f64dda90c80>,
 <carla.libcarla.Waypoint at 0x7f64dda90c10>,
 <carla.libcarla.Waypoint at 0x7f64dd9675f0>,
 <carla.libcarla.Waypoint at 0x7f64dd967740>,
 <carla.libcarla.Waypoint at 0x7f64ddab6b30>,
 <carla.libcarla.Waypoint at 0x7f64ddab6c80>,
 <carla.libcarla.Waypoint at 0x7f64ddab6eb0>,
 <carla.libcarla.Waypoint at 0x7f64ddab6ba0>,
 <carla.libcarla.Waypoint at 0x7f64ddab6e40>,
 <carla.libcarla.Waypoint at 0x7f64ddab6cf0>,
 <carla.libcarla.Waypoint at 0x7f64ddab6d60>,
 <carla.libcarla.Waypoint at 0x7f64ddab6ac0>,
 <carla.libcarla.Waypoint at 0x7f64ddab6c10>,
 <carla.libcarla.Waypoint at 0x7f64ddab6dd0>,
 <carla.libcarla.Waypoint at 0x7f64ddab6f90>,
 <carla.libcarla.Waypoint at 0x7f64ddab6f20>,
 <carla.libcarla.Waypoint at 0x7f64dd9549e0>,
 <carla.libcarla.Waypoint at 0x7f64dd954a50>,
 <carla.libcarla.Waypoint at 0x7f64dd954cf0>,
 <carla.libcarla.Waypoint at 0x7f64dd954190>,
 <carla.libcarla.Waypoint at 0x7f6

In [29]:
# 可视化路线上的路点
for waypoint in route_2_turns:
    world.debug.draw_point(waypoint.transform.location, size=0.2, color=carla.Color(0, 255, 0), life_time=0)


In [30]:
def get_number_of_obstacles(route_length):
    if route_length > 40:
        return 2
    else:
        return 1


In [33]:
def spawn_obstacles_along_route(world, route, num_obstacles):
    obstacle_waypoints = random.sample(route[5:-5], num_obstacles)  # Avoid start and end points
    obstacles = []
    
    blueprint_library = world.get_blueprint_library()
    pedestrian_bps = list(blueprint_library.filter('walker.pedestrian.*'))
    vehicle_bps = list(blueprint_library.filter('vehicle.*.*'))
    obstacle_bps = pedestrian_bps + vehicle_bps

    for wp in obstacle_waypoints:
        obstacle_bp = random.choice(obstacle_bps)
        transform = wp.transform
        obstacle = world.try_spawn_actor(obstacle_bp, transform)
        if obstacle:
            obstacles.append(obstacle)
    return obstacles


In [34]:
# For route with 1 turn
num_obstacles = get_number_of_obstacles(len(route_1_turn))
obstacles_route_1 = spawn_obstacles_along_route(world, route_1_turn, num_obstacles)

# For route with 2 turns
num_obstacles = get_number_of_obstacles(len(route_2_turns))
obstacles_route_2 = spawn_obstacles_along_route(world, route_2_turns, num_obstacles)


In [35]:
obstacles_route_1

[<carla.libcarla.Walker at 0x7f64dd98e970>]

In [36]:
for obs in obstacles_route_1:
        world.debug.draw_box(obs.bounding_box, obs.get_transform().rotation, 0.1, carla.Color(0, 0, 255), life_time=0.0)


In [8]:
def determine_action(current_wp, next_wp, obstacles):
    # Logic to determine action based on waypoint orientation and road options
    # This is simplified and may need adjustments
    if next_wp.road_id != current_wp.road_id:
        # Likely a turn
        yaw_diff = next_wp.transform.rotation.yaw - current_wp.transform.rotation.yaw
        if yaw_diff > 10:
            return 'Left'
        elif yaw_diff < -10:
            return 'Right'
    if current_wp in obstacles:
        return 'Stop'
    return 'Forward'

In [9]:
from agents.navigation.local_planner import RoadOption
def spawn_landmark(name, transform):
    landmark_bp = blueprint_library.find(name)
    landmark = world.spawn_actor(landmark_bp, transform)
    return landmark

In [10]:
def get_nearby_obstacles(waypoint, world, threshold_distance=1.0):
    """
    Identifies obstacles near the given waypoint.

    Parameters:
    - waypoint: The waypoint to check around.
    - world: The CARLA world instance.
    - threshold_distance: The maximum distance to consider an obstacle as nearby.

    Returns:
    - obstacles: A list of dictionaries containing obstacle information.
    """
    obstacles = []
    waypoint_location = waypoint.transform.location

    # Get all actors in the world
    actors = world.get_actors()

    # Define obstacle types to consider
    obstacle_types = ['vehicle.', 'walker.pedestrian.', 'static.prop.']

    for actor in actors:
        # Check if the actor is one of the obstacle types
        if any(actor.type_id.startswith(obstacle_type) for obstacle_type in obstacle_types):
            # Calculate distance between the actor and the waypoint
            actor_location = actor.get_location()
            distance = actor_location.distance(waypoint_location)
            if distance < threshold_distance:
                # Collect obstacle information
                obstacle_info = {
                    'id': actor.id,
                    'type': actor.type_id,
                    'location': actor_location,
                    'distance': distance
                }
                obstacles.append(obstacle_info)

    return obstacles


In [11]:
def get_nearby_landmarks(waypoint):
    landmarks = []
    actors = world.get_actors()
    for actor in actors:
        if actor.type_id.startswith('static.prop') or actor.type_id.startswith('static.building'):
            distance = actor.get_location().distance(waypoint.transform.location)
            if distance < 1.0:
                landmarks.append(actor)
    return landmarks


In [16]:
# List to store all obstacles in the environment
obstacles = []

# Example function to spawn obstacles and add them to the list
def spawn_obstacle_at_waypoint(waypoint, obstacle_type='walker.pedestrian.*'):
    blueprint_library = world.get_blueprint_library()
    obstacle_bp = random.choice(blueprint_library.filter(obstacle_type))
    transform = waypoint.transform
    obstacle = world.spawn_actor(obstacle_bp, transform)
    obstacles.append(obstacle)
    return obstacle


In [17]:
obstacles

[]

In [13]:
route = generate_complex_route(map, 50)

In [15]:
len(route)

50

In [16]:
route_data = []
for wp in route:
    
    data_point = {
        'waypoint': wp,
        'action': determine_action(wp, route[i + 1] if i + 1 < len(route) else None, obstacles),
        'obstacles': get_nearby_obstacles(wp),
        'landmarks': get_nearby_landmarks(wp),
        'road_type': wp.lane_type  # or other relevant info
    }
    route_data.append(data_point)


TypeError: determine_action() missing 2 required positional arguments: 'next_wp' and 'obstacles'

In [None]:
blueprint_library = world.get_blueprint_library()

# Spawn a pedestrian
walker_bp = random.choice(blueprint_library.filter('walker.pedestrian.*'))
spawn_point = random.choice(world.get_map().get_spawn_points())
pedestrian = world.spawn_actor(walker_bp, spawn_point)

# Spawn a vehicle
vehicle_bp = random.choice(blueprint_library.filter('vehicle.*.*'))
vehicle_spawn_point = random.choice(world.get_map().get_spawn_points())
other_vehicle = world.spawn_actor(vehicle_bp, vehicle_spawn_point)
other_vehicle.set_autopilot(True)


In [18]:
import carla
import random

# Initialize the CARLA client and world
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()
blueprint_library = world.get_blueprint_library()
map = world.get_map()

# Generate the route
n = 50  # Number of waypoints
route = generate_complex_route(map, n)

# List to store all obstacles
obstacles = []

# Function to spawn obstacles and add them to the obstacles list
def spawn_obstacle_at_waypoint(waypoint, obstacle_type='walker.pedestrian.*'):
    obstacle_bp = random.choice(blueprint_library.filter(obstacle_type))
    transform = waypoint.transform
    obstacle = world.try_spawn_actor(obstacle_bp, transform)
    if obstacle:
        obstacles.append(obstacle)
        return obstacle
    else:
        return None

# Spawn obstacles at selected waypoints along the route
for wp in route:
    if random.random() < 0.1:  # 10% chance to spawn an obstacle at a waypoint
        spawn_obstacle_at_waypoint(wp)

# Data collection loop
route_data = []
for i, wp in enumerate(route):
    # Get nearby obstacles at the current waypoint
    obstacles_nearby = get_nearby_obstacles(wp, obstacles, threshold_distance=15.0)

    # Get nearby landmarks
    landmarks_nearby = get_nearby_landmarks(wp)

    # Record road type or other environmental data
    road_type = wp.lane_type

    # Determine the action at this waypoint
    next_wp = route[i + 1] if i + 1 < len(route) else None
    action = determine_action(wp, next_wp, obstacles)

    # Create the data point
    data_point = {
        'waypoint': wp,
        'action': action,
        'obstacles': obstacles_nearby,
        'landmarks': landmarks_nearby,
        'road_type': road_type
    }

    # Generate the instruction using the LLM
    instruction = generate_instruction(action, data_point)
    data_point['instruction'] = instruction

    # Add the data point to the route data
    route_data.append(data_point)

# Remember to clean up actors after simulation
for obstacle in obstacles:
    obstacle.destroy()


AttributeError: 'list' object has no attribute 'get_actors'