In [1]:
from enum import Enum

import carla
import networkx as nx
import numpy as np
import pandas as pd
import pickle
import scipy.spatial.distance as distance

class LaneChange(Enum):
    """Enumerator to identify lane change availability.

    Plain-Python stand-in for the value read from a CARLA waypoint's
    ``lane_change`` attribute. The graph-building cell converts the entry
    waypoint's value to one of these members and stores it as the
    ``lanechange`` attribute of the corresponding graph edge, so the pickled
    graph does not hold CARLA objects for this field.
    """
    NONE = 0   # no lane change available
    RIGHT = 1  # lane change available to the right only
    LEFT = 2   # lane change available to the left only
    BOTH = 3   # lane change available to either side

class LaneType(Enum):
    """Enumerator labelling a graph edge as straight or curved.

    Stored as the ``type`` attribute of each graph edge. The graph-building
    cell in this notebook currently assigns ``CURVE`` to every edge.
    """
    STRAIGHT = 1
    CURVE = 2

The majority of this code is adapted from the CARLA examples, specifically `/path/to/carla/PythonAPI/carla/agents/navigation/global_route_planner.py`.

In [2]:
## ---- configuration ---------------------------------------------------------
MAPNAME = "Town10HD"
GRAPH_RESOLUTION = 2  # spacing between sampled waypoints, in carla-meter

CARLA_HOST = "localhost"
CARLA_PORT = 2000
CARLA_TIMEOUT_SECONDS = 10.0

## ---- connect to the simulator and load the map -----------------------------
client = carla.Client(CARLA_HOST, CARLA_PORT)
client.set_timeout(CARLA_TIMEOUT_SECONDS)
world = client.load_world(MAPNAME)

## map_topology is iterated below as (entry waypoint, exit waypoint) pairs
world_map = world.get_map()
map_topology = world_map.get_topology()

In [None]:
## The default topology from the CARLA API creates only the minimal number of nodes
## (one entry/exit waypoint pair per segment). piecewise_topology refines every
## segment by sampling a waypoint each GRAPH_RESOLUTION carla-meter along it.
##
## Each element of piecewise_topology is a dict:
##     "entry":       carla.Waypoint              entry waypoint of the segment
##     "exit":        carla.Waypoint              exit waypoint of the segment
##     "entryxyz":    Tuple[float, float, float]  entry location rounded to 0 decimals
##     "exitxyz":     Tuple[float, float, float]  exit location rounded to 0 decimals
##     "path":        List[Tuple[float, float]]   (x, y) of the sampled waypoints between
##                                                entry and exit (both excluded)
##     "length":      float                       sampled steps plus the remaining
##                                                straight-line distance to the exit
##     "speed_limit": Optional[float]             lowest landmark value found, else None
piecewise_topology = []

for segment in map_topology:
    wp1, wp2 = segment[0], segment[1]
    l1, l2 = wp1.transform.location, wp2.transform.location

    # rounding off to avoid floating point imprecision
    x1, y1, z1, x2, y2, z2 = np.round([l1.x, l1.y, l1.z, l2.x, l2.y, l2.z], 0)

    seg_dict = dict()
    seg_dict['entry'], seg_dict['exit'] = wp1, wp2
    seg_dict['entryxyz'], seg_dict['exitxyz'] = (x1, y1, z1), (x2, y2, z2)
    seg_dict['path'] = []

    length = 0.0
    speedlimit = float("inf")

    ## refine the waypoint between entry and exit excluding the entry and exit
    endloc = wp2.transform.location
    startwp = wp1

    while startwp.transform.location.distance(endloc) > GRAPH_RESOLUTION:
        ## Landmarks are carla.Landmark objects, which cannot be ordered directly;
        ## compare their numeric `value` instead.
        ## (`RecomendedSpeed` is the identifier as spelled by the CARLA API.)
        landmarks = startwp.get_landmarks_of_type(GRAPH_RESOLUTION, carla.LandmarkType.RecomendedSpeed)
        if landmarks:
            speedlimit = min(speedlimit, min(landmark.value for landmark in landmarks))

        ## stop refining if the lane ends before the exit waypoint is reached
        next_wps = startwp.next(GRAPH_RESOLUTION)
        if not next_wps:
            break
        startwp = next_wps[0]

        seg_dict["path"].append((startwp.transform.location.x, startwp.transform.location.y))
        length += GRAPH_RESOLUTION

    ## remaining gap between the last sampled waypoint and the exit
    length += startwp.transform.location.distance(endloc)

    ## speed limit will be float("inf") if there are no speed limit along the waypoints
    ## set it to none instead of infinity in that case
    seg_dict["length"] = length
    seg_dict["speed_limit"] = speedlimit if speedlimit != float("inf") else None

    piecewise_topology.append(seg_dict)

In [None]:
## Build a directed graph from the refined topology.
##   node: one per unique rounded (x, y, z) entry/exit location,
##         attribute "vertex" = (int x, int y)
##   edge: one per segment, carrying its length, the sampled (x, y) path, the exact
##         entry/exit (x, y), the lane type, lane-change availability and speed limit
graph = nx.DiGraph()  # networkx is imported as `nx`
id_map = dict() ## {(x,y,z): id, ... }

for segment in piecewise_topology:
    entry_xyz, exit_xyz = segment['entryxyz'], segment['exitxyz']
    path = segment['path']
    entry_wp, exit_wp = segment['entry'], segment['exit']

    for vertex in entry_xyz, exit_xyz:
        # Adding unique nodes and populating id_map
        if vertex not in id_map:
            new_id = len(id_map)
            id_map[vertex] = new_id
            graph.add_node(new_id, vertex=(int(vertex[0]), int(vertex[1])))

    n1 = id_map[entry_xyz]
    n2 = id_map[exit_xyz]

    # Exact (unrounded) 2D locations of the segment ends, stored on the edge
    entry_2d_loc = (entry_wp.transform.location.x, entry_wp.transform.location.y)
    exit_2d_loc = (exit_wp.transform.location.x, exit_wp.transform.location.y)

    # If the `entry` allows lane change, then the whole path until next waypoint is allowed to lane change
    carla_lanechange = entry_wp.lane_change
    if carla_lanechange == carla.LaneChange.NONE:
        lanechange = LaneChange.NONE
    elif carla_lanechange == carla.LaneChange.Right:
        lanechange = LaneChange.RIGHT
    elif carla_lanechange == carla.LaneChange.Left:
        lanechange = LaneChange.LEFT
    elif carla_lanechange == carla.LaneChange.Both:
        lanechange = LaneChange.BOTH
    else:
        # Fail loudly instead of silently reusing the previous segment's value
        raise ValueError("Unexpected lane change value: %r" % (carla_lanechange,))

    # Adding edge with attributes
    # NOTE(review): every edge is tagged LaneType.CURVE; LaneType.STRAIGHT is never
    # assigned here -- confirm whether straight segments should be detected.
    graph.add_edge(
        n1, n2,
        length=segment["length"],
        path=path,
        entry=entry_2d_loc,
        exit=exit_2d_loc,
        type=LaneType.CURVE,
        lanechange=lanechange,
        speed_limit=segment["speed_limit"]
    )

In [None]:
## Visual check: draw every edge in the simulator as a polyline running from the
## entry node, through the sampled path points, to the exit node. Each edge gets
## its own random RGB colour with alpha fixed at 10.
for src_id, dst_id, attrs in graph.edges.data():
    src_xy = graph.nodes[src_id]["vertex"]
    dst_xy = graph.nodes[dst_id]["vertex"]
    polyline = [src_xy, *attrs["path"], dst_xy]

    rgba = np.random.randint(0, 256, 3).tolist()
    rgba.append(10)
    edge_color = carla.Color(*rgba)

    # consecutive point pairs; all lines are drawn at z = 1
    for point_a, point_b in zip(polyline, polyline[1:]):
        line_start = carla.Location(point_a[0], point_a[1], 1)
        line_end = carla.Location(point_b[0], point_b[1], 1)
        world.debug.draw_line(line_start, line_end, color=edge_color, thickness=0.02)

In [None]:
## Serialise the graph to "<MAPNAME>_graph_map.pickle" in the working directory.
graph_pickle_path = "%s_graph_map.pickle" % MAPNAME
with open(graph_pickle_path, "wb") as pickle_file:
    pickle_file.write(pickle.dumps(graph))

## Check saved graph

In [3]:
## Reload a pickled graph and sanity-check its type, first edge and first node.
## The graph is a networkx object, so it is read with `pickle` (mirroring the save
## cell) rather than through pandas.
## NOTE(review): this path points at a Town01 graph under ../dataset, whereas the
## save cell writes "<MAPNAME>_graph_map.pickle" to the working directory --
## confirm which file is meant to be checked.
## Unpickling can execute arbitrary code; only load files you produced yourself.
with open("../dataset/Town01_graph_map.pickle", "rb") as f:
    graph = pickle.load(f)
print(type(graph))
print(isinstance(graph, nx.DiGraph))
a = next(iter(graph.edges.data()))
print(a)
print(graph.nodes[0])

<class 'networkx.classes.digraph.DiGraph'>
True
(0, 1, {'length': 22.180007934570312, 'path': [(146.990234375, 59.493106842041016), (148.990234375, 59.492862701416016), (150.990234375, 59.492618560791016), (152.990234375, 59.492374420166016), (154.990234375, 59.49213409423828), (156.990234375, 59.49188995361328), (158.990234375, 59.49164581298828), (160.990234375, 59.49140167236328), (162.990234375, 59.49115753173828), (164.990234375, 59.49091339111328), (166.990234375, 59.49066925048828)], 'entry': (144.990234375, 59.493350982666016), 'exit': (167.1702423095703, 59.49064636230469), 'type': <LaneType.CURVE: 2>, 'lanechange': <LaneChange.NONE: 0>, 'speed_limit': None})
{'vertex': (145, 59)}


## Check planned route

In [None]:
def heuristic(start_idx, goal_idx):
    """Return the Euclidean distance between two graph nodes.

    Looks up the ``vertex`` coordinates of both node ids in the module-level
    ``graph`` and returns the norm of their difference.

    Args:
        start_idx: id of the first node in ``graph``.
        goal_idx: id of the second node in ``graph``.

    Returns:
        The straight-line distance between the two ``vertex`` coordinates.
    """
    start_xy, goal_xy = (
        np.asarray(graph.nodes[node_id]["vertex"]) for node_id in (start_idx, goal_idx)
    )
    offset = start_xy - goal_xy
    return np.linalg.norm(offset)

In [None]:
## A* shortest path from node 0 to node 5, weighted by the "length" edge attribute.
route = nx.astar_path(
    graph,
    source=0,
    target=5,
    heuristic=heuristic,
    weight="length",
)

In [None]:
## Pull the exact entry point and the sampled path stored on edge 0 -> 1.
edge_data = graph.get_edge_data(0, 1)
entry, path = (edge_data[key] for key in ("entry", "path"))

In [None]:
## Look up the reverse direction (1 -> 0) of the edge inspected above.
graph.get_edge_data(u=1, v=0)

In [None]:
## Print the id of every node that node 0 has an outgoing edge to.
successor_ids = list(graph.successors(0))
for successor_id in successor_ids:
    print(successor_id)