In [1]:
import numpy as np
import pandas as pd
import networkx as nx

In [2]:
%config InlineBackend.figure_format='retina'
%matplotlib qt 
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (8, 6)
import matplotlib.pyplot as plt

In [3]:
from CarlaBEV.envs.utils import load_planning_map, load_map

planning_map = load_planning_map()
rgbmap, _ = load_map(size=1024)

print(f"Image Size: {planning_map.shape}")
print(f"Image Size: {rgbmap.shape}")

Image Size: (1280, 1024)
Image Size: (10240, 8192, 3)


In [16]:
# Check map
plt.imshow(rgbmap)
plt.show()

In [17]:
# Example: Patch a white rectangle at (y0, x0) with height h and width w

# If rgbmap is float [0,1], use 1.0; if uint8 [0,255], use 255
if rgbmap.dtype == np.uint8:
    white = 255
else:
    white = 1.0

y0, x0 = 7409, 6792    # Top-left corner of the rectangle (change as needed)
h, w = 5, 5       # Height and width of the rectangle (change as needed)
rgbmap[y0:y0+h, x0:x0+w, :] = white

y0, x0 = 3466, 6792    # Top-left corner of the rectangle (change as needed)
h, w = 5, 5       # Height and width of the rectangle (change as needed)
rgbmap[y0:y0+h, x0:x0+w, :] = white

y0, x0 = 7256, 3492    # Top-left corner of the rectangle (change as needed)
h, w = 5, 5       # Height and width of the rectangle (change as needed)
rgbmap[y0:y0+h, x0:x0+w, :] = white

y0, x0 = 3406, 4689    # Top-left corner of the rectangle (change as needed)
h, w = 5, 5       # Height and width of the rectangle (change as needed)
rgbmap[y0:y0+h, x0:x0+w, :] = white

plt.imshow(rgbmap)
plt.title("Patched Map")
plt.show()

In [9]:
intersections = [
    (8642, 1564),
    (8654, 6755),
    (7250, 1552),
    (7241, 2446),
    (7242, 3652),
    (7242, 4704),
    (7257, 6773),
    (6199, 1552),
    (6197, 2439),
    (3349, 1545),
    (3350, 2456),
    (3350, 3639),
    (3335, 4714),
    (3315, 6773),
    (2456, 1563),
    (2446, 6757),
 ]

In [10]:
map_edges = {}
for i, coord in enumerate(intersections):
    x, y = coord
    map_edges[i] = (x, y)

In [26]:
from skimage.color import rgb2gray
from skimage.morphology import skeletonize

def get_midlane_coords(rgbmap):
    gray = rgb2gray(rgbmap)
    binary = gray > 0.99  # Adjust threshold if needed
    # Skeletonize to get lane centerlines
    skeleton = skeletonize(binary)
    # Get coordinates of skeleton pixels (centerline points)
    centerline_coords = np.argwhere(skeleton)
    return centerline_coords

def sample_equidistant_points(path_coords, step=100):
    """Given a list of (y, x) coords, return equidistant points along the path."""
    path_coords = np.array(path_coords)
    # Compute cumulative distance along the path
    deltas = np.diff(path_coords, axis=0)
    dists = np.sqrt((deltas**2).sum(axis=1))
    cumdist = np.insert(np.cumsum(dists), 0, 0)
    total_dist = cumdist[-1]
    n_points = int(total_dist // step)
    if n_points < 2:
        return path_coords  # Path too short, return as is
    sample_dists = np.linspace(0, total_dist, n_points)
    sampled_points = np.empty((n_points, 2))
    sampled_points[0] = path_coords[0]
    sampled_points[-1] = path_coords[-1]
    j = 1
    for i in range(1, n_points-1):
        d = sample_dists[i]
        while cumdist[j] < d:
            j += 1
        ratio = (d - cumdist[j-1]) / (cumdist[j] - cumdist[j-1])
        sampled_points[i] = path_coords[j-1] + ratio * (path_coords[j] - path_coords[j-1])
    return sampled_points.astype(int)

def get_route_waypoints(pixel_graph, node_a, node_b ):
    # 2. For each consecutive pair, find pixel path and discretize
    pixel_path = nx.shortest_path(pixel_graph, node_a, node_b)
    waypoints = sample_equidistant_points(pixel_path, step=300)  # step in pixels
    return waypoints

def compute_lane_offsets(waypoints, lane_width=6.0):
    """
    Given centerline waypoints (N,2), return left and right lane waypoints.
    lane_width: total width of both lanes (in pixels, adjust as needed)
    """
    waypoints = np.array(waypoints)
    left_lane = []
    right_lane = []
    side_1, side_2 = [], []
    offset = lane_width / 2.0
    offset_ped = lane_width + 10

    for i in range(len(waypoints)):
        # Compute direction vector
        if i == 0:
            direction = waypoints[i+1] - waypoints[i]
        elif i == len(waypoints) - 1:
            direction = waypoints[i] - waypoints[i-1]
        else:
            direction = waypoints[i+1] - waypoints[i-1]
        direction = direction / np.linalg.norm(direction)
        # Normal vector (perpendicular, 2D)
        normal = np.array([-direction[1], direction[0]])
        # Offset points
        left_lane.append(waypoints[i] + offset * normal)
        right_lane.append(waypoints[i] - offset * normal)
        side_1.append(waypoints[i] + offset_ped * normal)
        side_2.append(waypoints[i] - offset_ped * normal)
    return np.array(left_lane), np.array(right_lane), np.array(side_1), np.array(side_2)


In [12]:
def show_route(rgbmap, waypoints):
    # 3. Visualize the full route
    plt.imshow(rgbmap)
    for idx, p in enumerate(intersections):
        plt.scatter(p[1], p[0], c='red', s=80, zorder=3)
        plt.annotate(str(idx), (p[1], p[0]), color='k', fontsize=12, weight='bold', zorder=4)
    plt.plot(waypoints[:,1], waypoints[:,0], 'orange', lw=2, zorder=5, label='Discretized Route')
    plt.scatter(waypoints[:,1], waypoints[:,0], c='b', s=1, zorder=6, label='Waypoints')
    plt.axis('off')
    plt.legend()
    plt.show()

In [18]:
centerline_coords = get_midlane_coords(rgbmap)

In [19]:
# 1. Find the closest centerline point for each intersection
intersection_nodes = []
for intersection in intersections:
    dists = np.linalg.norm(centerline_coords - intersection, axis=1)
    idx_min = np.argmin(dists)
    intersection_nodes.append(tuple(centerline_coords[idx_min]))

In [20]:
plt.imshow(rgbmap)
plt.scatter(centerline_coords[:,1], centerline_coords[:,0], c='gray', s=0.1, label='All Centerlines')
for idx, p in enumerate(intersections):
    plt.scatter(p[1], p[0], c='red', s=20)
    plt.annotate(str(idx), (p[1], p[0]), color='k', fontsize=10, weight='bold')
plt.legend()
plt.title("Closest Centerlines & Midpoints Between Intersections")
plt.show()

In [21]:
# 2. Build a pixel adjacency graph for centerline_coords (for pathfinding only)
pixel_graph = nx.Graph()
centerline_set = set(map(tuple, centerline_coords))
for y, x in centerline_coords:
    for dy in [-1, 0, 1]:
        for dx in [-1, 0, 1]:
            if dy == 0 and dx == 0:
                continue
            neighbor = (y + dy, x + dx)
            if neighbor in centerline_set:
                pixel_graph.add_edge((y, x), neighbor)

# 3. Precompute a set of "blocked" pixels for each intersection (within a radius)
block_radius = 100  # pixels
blocked_pixels = []
for node in intersection_nodes:
    y0, x0 = node
    pixels = set()
    for dy in range(-block_radius, block_radius+1):
        for dx in range(-block_radius, block_radius+1):
            if dy**2 + dx**2 <= block_radius**2:
                pixels.add((y0+dy, x0+dx))
    blocked_pixels.append(pixels)

# 4. Build the intersection graph
G_intersections = nx.Graph()
for idx in range(len(intersections)):
    G_intersections.add_node(idx, pos=intersections[idx])

for i, node_a in enumerate(intersection_nodes):
    for j, node_b in enumerate(intersection_nodes):
        if i < j:
            try:
                path = nx.shortest_path(pixel_graph, node_a, node_b)
                path_set = set(path[1:-1])
                blocked = False
                for k, block in enumerate(blocked_pixels):
                    if k != i and k != j and path_set & block:
                        blocked = True
                        break
                if not blocked:
                    G_intersections.add_edge(i, j)
            except nx.NetworkXNoPath:
                continue

print("Direct intersection pairs (by index):", list(G_intersections.edges))

# Visualization
plt.imshow(rgbmap)
for idx, p in enumerate(intersections):
    plt.scatter(p[1], p[0], c='red', s=20)
    plt.annotate(str(idx), (p[1], p[0]), color='k', fontsize=10, weight='bold')
for i, j in G_intersections.edges:
    a = intersections[i]
    b = intersections[j]
    plt.plot([a[1], b[1]], [a[0], b[0]], 'c-', lw=2)
plt.title("Intersection Graph (nodes=intersections, edges=direct paths)")
plt.show()

Direct intersection pairs (by index): [(0, 1), (0, 2), (1, 6), (2, 3), (2, 7), (3, 4), (3, 8), (4, 5), (4, 11), (5, 6), (5, 12), (6, 13), (7, 8), (7, 9), (8, 10), (9, 10), (9, 14), (10, 11), (11, 12), (12, 13), (13, 15), (14, 15)]


In [11]:
options = {
    "font_size": 18,
    "node_size": 1000,
    "node_color": "white",
    "edgecolors": "black",
    "linewidths": 5,
    "width": 5,
}
nx.draw_networkx(G_intersections, map_edges, **options)

# Set margins for the axes so that nodes aren't clipped
ax = plt.gca()
ax.margins(0.20)
plt.axis("off")
plt.show()

In [31]:
# Usage
lane_width_pixels = 80  # Adjust to your map's scale

# Example: Discretize all edges in G_intersections
edge_waypoints = {}
edge_lanes_r = {}
edge_lanes_l = {}
for i, j in G_intersections.edges:
    node_a = intersection_nodes[i]
    node_b = intersection_nodes[j]
    # Get the pixel path between intersections
    path = nx.shortest_path(pixel_graph, node_a, node_b)
    # Sample equidistant points (e.g., every n pixels)
    waypoints = sample_equidistant_points(path, step=250)

    left_lane, right_lane, side1, side2 = compute_lane_offsets(waypoints, lane_width=lane_width_pixels)
    edge_waypoints[(i, j)] = waypoints
    edge_lanes_r[(i, j)] = right_lane 
    edge_lanes_l[(i, j)] = left_lane 

# Visualize one edge's waypoints
plt.imshow(rgbmap)
for idx, p in enumerate(intersections):
    plt.scatter(p[1], p[0], c='red', s=50)

for (i, j), waypoints in edge_waypoints.items():
    plt.plot(waypoints[:,1], waypoints[:,0], 'k.', markersize=4)

for (i, j), waypoints in edge_lanes_l.items():
    plt.plot(waypoints[:,1], waypoints[:,0], 'b-', markersize=4)

for (i, j), waypoints in edge_lanes_r.items():
    plt.plot(waypoints[:,1], waypoints[:,0], 'g-', markersize=4)

plt.title("Equidistant Waypoints Along Edges")
plt.axis('off')
plt.show()

In [9]:
def add_lane_waypoints_to_graph(G, centerline, left_lane, right_lane,
                                 connect_lateral=True,
                                 connect_diagonal=False,
                                 cost_forward=1.0,
                                 cost_lateral=3.0,
                                 cost_diagonal=5.0):
    """
    Adds a lane-level mesh to the existing graph G.
    centerline, left_lane, right_lane: lists of (x, y) waypoints, sorted along road.
    """
    N = len(centerline)
    assert len(left_lane) == N and len(right_lane) == N, "Waypoint lists must be the same length."

    for i in range(N):
        c_node = f"C_{i}"
        l_node = f"L_{i}"
        r_node = f"R_{i}"

        G.add_node(c_node, pos=centerline[i], lane="center")
        G.add_node(l_node, pos=left_lane[i], lane="left")
        G.add_node(r_node, pos=right_lane[i], lane="right")

        # Forward connections (longitudinal)
        if i < N - 1:
            G.add_edge(c_node, f"C_{i+1}", cost=cost_forward)
            G.add_edge(l_node, f"L_{i+1}", cost=cost_forward)
            G.add_edge(r_node, f"R_{i+1}", cost=cost_forward)

        # Lateral connections (lane changes)
        if connect_lateral:
            G.add_edge(c_node, l_node, cost=cost_lateral)
            G.add_edge(c_node, r_node, cost=cost_lateral)
            G.add_edge(l_node, c_node, cost=cost_lateral)
            G.add_edge(r_node, c_node, cost=cost_lateral)

        # Diagonal transitions (lane change with forward motion)
        if connect_diagonal and i < N - 1:
            G.add_edge(l_node, f"C_{i+1}", cost=cost_diagonal)
            G.add_edge(r_node, f"C_{i+1}", cost=cost_diagonal)

    return G


In [None]:
import networkx as nx
import matplotlib.pyplot as plt

# --- Parameters ---
lane_width_pixels = 80  # Adjust to your map's scale
sample_step = 250       # Step size for waypoint sampling

# --- Containers ---
edge_waypoints = {}
edge_lanes_r = {}
edge_lanes_l = {}

# --- Full planning graph ---
G_planning = G_intersections.copy()

# --- Function to add waypoints and lane mesh to G_planning ---
def add_lane_waypoints_to_graph(G, edge_id, centerline, left_lane, right_lane,
                                connect_lateral=True,
                                connect_diagonal=False,
                                cost_forward=1.0,
                                cost_lateral=3.0,
                                cost_diagonal=5.0):
    N = len(centerline)
    edge_label = f"{edge_id[0]}-{edge_id[1]}"

    for i in range(N):
        c_node = f"{edge_label}_C_{i}"
        l_node = f"{edge_label}_L_{i}"
        r_node = f"{edge_label}_R_{i}"

        G.add_node(c_node, pos=tuple(centerline[i]), lane="center")
        G.add_node(l_node, pos=tuple(left_lane[i]), lane="left")
        G.add_node(r_node, pos=tuple(right_lane[i]), lane="right")

        if i < N - 1:
            G.add_edge(c_node, f"{edge_label}_C_{i+1}", cost=cost_forward)
            G.add_edge(l_node, f"{edge_label}_L_{i+1}", cost=cost_forward)
            G.add_edge(r_node, f"{edge_label}_R_{i+1}", cost=cost_forward)

        if connect_lateral:
            G.add_edge(c_node, l_node, cost=cost_lateral)
            G.add_edge(c_node, r_node, cost=cost_lateral)
            G.add_edge(l_node, c_node, cost=cost_lateral)
            G.add_edge(r_node, c_node, cost=cost_lateral)

        if connect_diagonal and i < N - 1:
            G.add_edge(l_node, f"{edge_label}_C_{i+1}", cost=cost_diagonal)
            G.add_edge(r_node, f"{edge_label}_C_{i+1}", cost=cost_diagonal)

# --- Main loop ---
for i, j in G_intersections.edges:
    node_a = intersection_nodes[i]
    node_b = intersection_nodes[j]

    # Get path in pixel_graph and sample waypoints
    path = nx.shortest_path(pixel_graph, node_a, node_b)
    waypoints = sample_equidistant_points(path, step=sample_step)

    # Compute lane offsets
    left_lane, right_lane, _, _ = compute_lane_offsets(waypoints, lane_width=lane_width_pixels)

    # Save for visualization/debugging
    edge_waypoints[(i, j)] = waypoints
    edge_lanes_l[(i, j)] = left_lane
    edge_lanes_r[(i, j)] = right_lane

    # Add to planning graph
    add_lane_waypoints_to_graph(
        G_planning,
        edge_id=(i, j),
        centerline=waypoints,
        left_lane=left_lane,
        right_lane=right_lane,
        connect_lateral=True,
        connect_diagonal=True
    )



In [None]:
import networkx as nx
import matplotlib.pyplot as plt

# --- Parameters ---
lane_width_pixels = 80  # Adjust to your map's scale
sample_step = 250       # Step size for waypoint sampling

# --- Containers ---
edge_waypoints = {}
edge_lanes_r = {}
edge_lanes_l = {}

# --- Full planning graph ---
G_planning = G_intersections.copy()

# --- Function to add waypoints and lane mesh to G_planning ---
def add_lane_waypoints_to_graph(G, edge_id, centerline, left_lane, right_lane,
                                connect_lateral=True,
                                connect_diagonal=False,
                                cost_forward=1.0,
                                cost_lateral=3.0,
                                cost_diagonal=5.0):
    N = len(centerline)
    edge_label = f"{edge_id[0]}-{edge_id[1]}"

    for i in range(N):
        c_node = f"{edge_label}_C_{i}"
        l_node = f"{edge_label}_L_{i}"
        r_node = f"{edge_label}_R_{i}"

        G.add_node(c_node, pos=tuple(centerline[i]), lane="center")
        G.add_node(l_node, pos=tuple(left_lane[i]), lane="left")
        G.add_node(r_node, pos=tuple(right_lane[i]), lane="right")

        if i < N - 1:
            G.add_edge(c_node, f"{edge_label}_C_{i+1}", cost=cost_forward)
            G.add_edge(l_node, f"{edge_label}_L_{i+1}", cost=cost_forward)
            G.add_edge(r_node, f"{edge_label}_R_{i+1}", cost=cost_forward)

        if connect_lateral:
            G.add_edge(c_node, l_node, cost=cost_lateral)
            G.add_edge(c_node, r_node, cost=cost_lateral)
            G.add_edge(l_node, c_node, cost=cost_lateral)
            G.add_edge(r_node, c_node, cost=cost_lateral)

        if connect_diagonal and i < N - 1:
            G.add_edge(l_node, f"{edge_label}_C_{i+1}", cost=cost_diagonal)
            G.add_edge(r_node, f"{edge_label}_C_{i+1}", cost=cost_diagonal)

# --- Main loop ---
for i, j in G_intersections.edges:
    node_a = intersection_nodes[i]
    node_b = intersection_nodes[j]

    # Get path in pixel_graph and sample waypoints
    path = nx.shortest_path(pixel_graph, node_a, node_b)
    waypoints = sample_equidistant_points(path, step=sample_step)

    # Compute lane offsets
    left_lane, right_lane, _, _ = compute_lane_offsets(waypoints, lane_width=lane_width_pixels)

    # Save for visualization/debugging
    edge_waypoints[(i, j)] = waypoints
    edge_lanes_l[(i, j)] = left_lane
    edge_lanes_r[(i, j)] = right_lane

    # Add to planning graph
    add_lane_waypoints_to_graph(
        G_planning,
        edge_id=(i, j),
        centerline=waypoints,
        left_lane=left_lane,
        right_lane=right_lane,
        connect_lateral=True,
        connect_diagonal=True
    )


merge_threshold = 10
# --- Merge redundant nodes ---
pos = nx.get_node_attributes(G_planning, 'pos')
positions = list(pos.items())

for i in range(len(positions)):
    node_i, pos_i = positions[i]
    for j in range(i+1, len(positions)):
        node_j, pos_j = positions[j]
        if np.linalg.norm(np.array(pos_i) - np.array(pos_j)) < merge_threshold:
            # Merge node_j into node_i
            if node_j in G_planning and node_i in G_planning:
                nx.relabel_nodes(G_planning, {node_j: node_i}, copy=False)

In [None]:
# --- Optional: Visualization ---
plt.imshow(rgbmap)
for idx, p in enumerate(intersections):
    plt.scatter(p[1], p[0], c='red', s=50)

for (i, j), waypoints in edge_waypoints.items():
    plt.plot(waypoints[:,1], waypoints[:,0], 'k.', markersize=4)

for (i, j), waypoints in edge_lanes_l.items():
    plt.plot(waypoints[:,1], waypoints[:,0], 'b-', linewidth=1)

for (i, j), waypoints in edge_lanes_r.items():
    plt.plot(waypoints[:,1], waypoints[:,0], 'g-', linewidth=1)

plt.title("Equidistant Waypoints with Lane Offsets")
plt.axis('off')
plt.show()

In [35]:
# --- Draw the planning graph with nx.draw_networkx() ---
pos = nx.get_node_attributes(G_planning, 'pos')
plt.figure(figsize=(12, 12))
nx.draw_networkx(G_planning, pos=pos, node_size=10, with_labels=False, edge_color='gray')
plt.title("Lane-level Planning Graph")
plt.axis('equal')
plt.show()

In [None]:
import pickle

# Save
with open("planning_graph.pkl", "wb") as f:
    pickle.dump(G_planning, f)

# Load
with open("planning_graph.pkl", "rb") as f:
    G_planning = pickle.load(f)


In [None]:
import random 

# --- Function to generate and display a random route ---
def display_random_route(G, pos, rgbmap, max_distance=1000):
    nodes = list(pos.keys())
    while True:
        start = random.choice(nodes)
        end = random.choice(nodes)
        if start != end:
            p1 = np.array(pos[start])
            p2 = np.array(pos[end])
            if np.linalg.norm(p1 - p2) < max_distance:
                try:
                    path = nx.shortest_path(G, start, end, weight='cost')
                    coords = np.array([pos[n] for n in path])
                    break
                except nx.NetworkXNoPath:
                    continue

    plt.figure(figsize=(10, 10))
    plt.imshow(rgbmap)
    plt.plot(coords[:, 1], coords[:, 0], 'r-', linewidth=2)
    plt.scatter([coords[0, 1], coords[-1, 1]], [coords[0, 0], coords[-1, 0]], c=['green', 'red'], s=60)
    plt.title("Random Route on Lane-level Planning Graph")
    plt.axis('off')
    plt.show()

# --- Display one random route ---
display_random_route(G_planning, pos, rgbmap)

In [6]:
import random
import numpy as np


def plot_route_on_map(merged_path_coords):
    """
    Plots the route on top of the map.
    """
    plt.figure(figsize=(10, 10))
    plt.imshow(rgbmap)
    
    # Draw route
    if merged_path_coords.shape[0] > 1:
        plt.plot(merged_path_coords[:, 1], merged_path_coords[:, 0], 'r-', linewidth=3, label='Route')
        plt.scatter(merged_path_coords[0, 1], merged_path_coords[0, 0], c='green', s=100, label='Start')
        plt.scatter(merged_path_coords[-1, 1], merged_path_coords[-1, 0], c='red', s=100, label='End')

    plt.title("Shortest Path on Lane: Merged Route")
    plt.legend()
    plt.axis('off')
    plt.show()


In [4]:
from CarlaBEV.src.planning.map_graph import MapGraph
map_graph = MapGraph(graph_path='planning_graph.pkl')

In [11]:
# Find a route in the center lane within 2000 pixels
original_path, merged_path = map_graph.find_route(
    lane_type='left',
    max_distance=2000,
    merge_threshold=1
)

plot_route_on_map(merged_path)