In [1]:
import carla
import random
import time
import os
from queue import Queue
import networkx as nx
import math
import re

In [15]:
client = carla.Client('localhost', 2000)
# Set timeout to limit the networking operations so that the these don't block the client forever.
# An error will be returned if connection fails.
client.set_timeout(5.0)
# Available maps are Town 01,02,03,04,05,06,07,10
#    world = client.get_world()
print(client.get_available_maps())
#    world = client.get_world()
world = client.load_world('Town01')

# settings = world.get_settings()
# settings.no_rendering_mode = True
# world.apply_settings(settings)

['/Game/Carla/Maps/Town01', '/Game/Carla/Maps/Town01_Opt', '/Game/Carla/Maps/Town02', '/Game/Carla/Maps/Town02_Opt', '/Game/Carla/Maps/Town03', '/Game/Carla/Maps/Town03_Opt', '/Game/Carla/Maps/Town04', '/Game/Carla/Maps/Town04_Opt', '/Game/Carla/Maps/Town05', '/Game/Carla/Maps/Town05_Opt', '/Game/Carla/Maps/Town10HD', '/Game/Carla/Maps/Town10HD_Opt']


In [4]:
actor_list = []
sensor_list = []
blueprint_library = world.get_blueprint_library()
vehicle_bp = blueprint_library.find('vehicle.audi.tt')
vehicle_bp.set_attribute('role_name', 'ego')
spawn_point = random.choice(world.get_map().get_spawn_points())
vehicle = world.spawn_actor(vehicle_bp, spawn_point)
actor_list.append(vehicle)
spectator = world.get_spectator()
spectator.set_transform(vehicle.get_transform())

In [5]:
waypoint = world.get_map().get_waypoint(vehicle.get_location(),project_to_road=True, lane_type=(carla.LaneType.Driving | carla.LaneType.Shoulder | carla.LaneType.Sidewalk))
print("Current lane type: " + str(waypoint.lane_type))
# Check current lane change allowed
print("Current Lane change:  " + str(waypoint.lane_change))
# Left and Right lane markings
print("L lane marking type: " + str(waypoint.left_lane_marking.type))
print("L lane marking change: " + str(waypoint.left_lane_marking.lane_change))
print("R lane marking type: " + str(waypoint.right_lane_marking.type))
print("R lane marking change: " + str(waypoint.right_lane_marking.lane_change))
# ...

Current lane type: Driving
Current Lane change:  NONE
L lane marking type: Broken
L lane marking change: NONE
R lane marking type: NONE
R lane marking change: NONE


In [3]:
spawn_points = world.get_map().get_spawn_points()

In [4]:
_map = world.get_map()

In [5]:
positive_direction = []
for i in range(len(spawn_points)):
    if _map.get_waypoint(spawn_points[i].location).lane_id > 0:
        positive_direction.append(spawn_points[i])

In [6]:
start = positive_direction[4]
end = positive_direction[20]

In [7]:
topology = world.get_map().get_topology()
G = nx.DiGraph() # undirected graph does not work 

In [8]:
waypoints_info = {}
for i in range(len(topology)):
    waypoint1 = topology[i][0]
    waypoint2 = topology[i][1]
    waypoint1_info = "%s-%s-%s" % (waypoint1.road_id, waypoint1.section_id, waypoint1.lane_id)
    waypoint2_info = "%s-%s-%s" % (waypoint2.road_id, waypoint2.section_id, waypoint2.lane_id)
    distance = math.sqrt(math.pow((waypoint2.transform.location.x - waypoint1.transform.location.x), 2) +
                                 math.pow((waypoint2.transform.location.y - waypoint1.transform.location.y), 2))
    # G.add_edge(waypoint1_info, waypoint2_info)
    waypoints_info[waypoint1_info] = waypoint1
    G.add_weighted_edges_from([(waypoint1_info, waypoint2_info, distance)])

In [9]:
start_waypoint = _map.get_waypoint(start.location)
start_info = "%s-%s-%s" % (start_waypoint.road_id, start_waypoint.section_id, start_waypoint.lane_id)
end_waypoint = _map.get_waypoint(end.location)
end_info = "%s-%s-%s" % (end_waypoint.road_id, end_waypoint.section_id, end_waypoint.lane_id)
path = nx.astar_path(G, start_info, end_info)
print(path)

['10-0-1', '341-0-1', '25-0-1', '75-0-1', '1-0-1', '27-0-1', '16-0--1', '243-0--1', '17-0--1', '296-0--1', '18-0--1', '218-0--1', '19-0--1', '169-0--1', '6-0-1']


In [None]:
RouteList = []
#    list_to_end = start_waypoint.next_until_lane_end(0.5)
#     for j in range(len(list_to_end)):
#         temp_waypoint = list_to_end[j]
#         temp_loc = temp_waypoint.transform.location
#         RouteList.append(temp_loc)

for i in range(len(path)):
    _waypoint = waypoints_info[path[i]]
    list_to_end = _waypoint.next_until_lane_end(0.5)
#        world.debug.draw_string(_waypoint.transform.location, '%f' % _waypoint.lane_width, draw_shadow=False,
#                                color=carla.Color(r=255, g=0, b=0), life_time=100.0)

    

In [None]:
    for j in range(1,len(list_to_end)):
        temp_waypoint = list_to_end[j]
        temp_loc = temp_waypoint.transform.location
#            RouteList.append(temp_loc)
#            world.debug.draw_string(temp_loc, '%f' % temp_waypoint.transform.rotation.yaw, draw_shadow=False,
#                                color=carla.Color(r=0, g=255, b=0), life_time=100.0)
#        time.sleep(5)
#    return RouteList, path

In [29]:
print(waypoints_info[path[1]].road_id,waypoints_info[path[1]].lane_id,waypoints_info[path[1]].lane_id)

341

In [16]:
def generate_routelist(start, end, G, waypoints_info, _map):
    start_waypoint = _map.get_waypoint(start.location)
    start_info = "%s-%s-%s" % (start_waypoint.road_id, start_waypoint.section_id, start_waypoint.lane_id)
    end_waypoint = _map.get_waypoint(end.location)
    end_info = "%s-%s-%s" % (end_waypoint.road_id, end_waypoint.section_id, end_waypoint.lane_id)
    path = nx.astar_path(G, start_info, end_info)
    print(path)
    RouteList = []
#    list_to_end = start_waypoint.next_until_lane_end(0.5)
#     for j in range(len(list_to_end)):
#         temp_waypoint = list_to_end[j]
#         temp_loc = temp_waypoint.transform.location
#         RouteList.append(temp_loc)
    
    for i in range(len(path)):
        _waypoint = waypoints_info[path[i]]
        list_to_end = _waypoint.next_until_lane_end(0.5)
        world.debug.draw_string(_waypoint.transform.location, '%s-%s-%s' % (_waypoint.road_id,_waypoint.section_id, _waypoint.lane_id), draw_shadow=False,
                                color=carla.Color(r=255, g=0, b=0), life_time=100.0)

#         for j in range(1,len(list_to_end)):
#             temp_waypoint = list_to_end[j]
#             temp_loc = temp_waypoint.transform.location
#             RouteList.append(temp_loc)
#             world.debug.draw_string(temp_loc, '%s-%s-%s' % (temp_waypoint.road_id,temp_waypoint.section_id, temp_waypoint.lane_id), draw_shadow=False,
#                                color=carla.Color(r=0, g=255, b=0), life_time=100.0)
        time.sleep(3)
#    return RouteList, path

In [17]:
generate_routelist(start, end, G, waypoints_info, _map)

['10-0-1', '341-0-1', '25-0-1', '75-0-1', '1-0-1', '27-0-1', '16-0--1', '243-0--1', '17-0--1', '296-0--1', '18-0--1', '218-0--1', '19-0--1', '169-0--1', '6-0-1']


In [35]:
route, path= generate_routelist(start, end, G, waypoints_info, _map)

['19-0--1', '218-0--1', '18-0--1', '296-0--1', '17-0--1', '243-0--1', '16-0--1', '253-0--1', '10-0-1', '232-0-1', '17-0-1']


In [32]:
for wp in path:
    world.debug.draw_string(wp, 'O', draw_shadow=False,
                            color=carla.Color(r=0, g=255, b=0), life_time=50.0)
    

In [38]:
def draw_waypoints(waypoints, path, life_time=0):
    for wp in waypoints:
        for pa in path:
            road_id = eval(re.split('[-]+',pa)[0])
            section_id = eval(re.split('[-]+',pa)[1])
            lane_id = eval(re.split('[-]+',pa)[2])
            if(wp.road_id == road_id and wp.section_id == section_id and wp.lane_id == lane_id):
                world.debug.draw_string(wp.transform.location, 'O', draw_shadow=False,
                                   color=carla.Color(r=0, g=255, b=0), life_time=life_time)
                break 

In [39]:
draw_waypoints(waypoints,path,50)

In [None]:
current_w = map.get_waypoint(vehicle.get_location())
while True:

    next_w = map.get_waypoint(vehicle.get_location(), lane_type=carla.LaneType.Driving | carla.LaneType.Shoulder | carla.LaneType.Sidewalk )
    # Check if the vehicle is moving
    if next_w.id != current_w.id:
        vector = vehicle.get_velocity()
        # Check if the vehicle is on a sidewalk
        if current_w.lane_type == carla.LaneType.Sidewalk:
            draw_waypoint_union(debug, current_w, next_w, cyan if current_w.is_junction else red, 60)
        else:
            draw_waypoint_union(debug, current_w, next_w, cyan if current_w.is_junction else green, 60)
        debug.draw_string(current_w.transform.location, str('%15.0f km/h' % (3.6 * math.sqrt(vector.x**2 + vector.y**2 + vector.z**2))), False, orange, 60)
        draw_transform(debug, current_w.transform, white, 60)

    # Update the current waypoint and sleep for some time
    current_w = next_w
    time.sleep(args.tick_time)