In [12]:
import sys
import os
import pyxodr

In [33]:
import json
import numpy as np
from lxml import etree
import pyxodr
from pyxodr.road_objects.road import Road
from pyxodr.road_objects.lane import Lane,ConnectionPosition,LaneOrientation, TrafficOrientation
from pyxodr.road_objects.junction import Junction
from pyxodr.road_objects.lane_section import LaneSection
from pyxodr.road_objects.network import RoadNetwork
from shapely.geometry import Polygon
from enum import IntEnum


In [86]:
town_name = 'town10HD'
json_file = town_name + '.json'
carla_map_dir = 'C:\\Carla-0.10.0\\CarlaUnreal\\Content\\Carla\\Maps\\OpenDrive'
odr_file = os.path.join(carla_map_dir, town_name + '.xodr')

In [87]:
road_network = RoadNetwork(xodr_file_path=odr_file)
roads = road_network.get_roads()
print(f"Number of roads in the network: {len(roads)}")
print(f"Type: {type(roads[0])}\nRoads: {roads}")

Number of roads in the network: 108
Type: <class 'pyxodr.road_objects.road.Road'>
Roads: [Road_0, Road_1, Road_2, Road_3, Road_4, Road_5, Road_6, Road_7, Road_8, Road_9, Road_10, Road_11, Road_12, Road_13, Road_14, Road_15, Road_16, Road_17, Road_18, Road_19, Road_20, Road_21, Road_22, Road_24, Road_33, Road_41, Road_49, Road_89, Road_90, Road_100, Road_105, Road_111, Road_125, Road_150, Road_151, Road_152, Road_156, Road_160, Road_167, Road_172, Road_179, Road_254, Road_255, Road_256, Road_258, Road_268, Road_284, Road_286, Road_296, Road_315, Road_331, Road_338, Road_344, Road_361, Road_375, Road_382, Road_395, Road_466, Road_467, Road_469, Road_472, Road_474, Road_476, Road_480, Road_486, Road_515, Road_516, Road_565, Road_566, Road_579, Road_584, Road_597, Road_608, Road_617, Road_630, Road_637, Road_644, Road_655, Road_675, Road_676, Road_703, Road_707, Road_712, Road_718, Road_735, Road_736, Road_763, Road_769, Road_780, Road_787, Road_795, Road_801, Road_811, Road_823, Road_832,

Road Edges, Road Lines, Lanes and Sidewalks extraction

In [88]:
class MapType(IntEnum):
    LANE_UNDEFINED = 0
    LANE_FREEWAY = 1
    LANE_SURFACE_STREET = 2
    LANE_BIKE_LANE = 3
    # Original definition skips 4
    ROAD_LINE_UNKNOWN = 5
    ROAD_LINE_BROKEN_SINGLE_WHITE = 6
    ROAD_LINE_SOLID_SINGLE_WHITE = 7
    ROAD_LINE_SOLID_DOUBLE_WHITE = 8
    ROAD_LINE_BROKEN_SINGLE_YELLOW = 9
    ROAD_LINE_BROKEN_DOUBLE_YELLOW = 10
    ROAD_LINE_SOLID_SINGLE_YELLOW = 11
    ROAD_LINE_SOLID_DOUBLE_YELLOW = 12
    ROAD_LINE_PASSING_DOUBLE_YELLOW = 13
    ROAD_EDGE_UNKNOWN = 14
    ROAD_EDGE_BOUNDARY = 15
    ROAD_EDGE_MEDIAN = 16
    STOP_SIGN = 17
    CROSSWALK = 18
    SPEED_BUMP = 19
    DRIVEWAY = 20  # New womd datatype in v1.2.0: Driveway entrances
    UNKNOWN = -1
    NUM_TYPES = 21


In [90]:
# Reset the "roads" key to an empty list while preserving other fields
with open(json_file, "r") as f:
    xodr_json = json.load(f)

xodr_json["roads"] = []

with open(json_file, "w") as f:
    json.dump(xodr_json, f, indent=2)

In [91]:
def save_lane_section_to_json(xodr_json, id, road_edges, road_lines, lanes, sidewalks = []):
    roads = xodr_json.get("roads", [])
    for road_edge in road_edges:
        # edge_polygon = Polygon(road_edge)
        edge_data = {
            "id": id,
            "map_element_id": int(MapType.ROAD_EDGE_BOUNDARY),
            "type": "road_edge",
            "geometry": [{"x": float(pt[0]), "y": float(pt[1]), "z": 0.0} for pt in road_edge]
        }
        roads.append(edge_data)
        id += 1
    for road_line in road_lines:
        line_data = {
            "id": id,
            "map_element_id": int(MapType.ROAD_LINE_BROKEN_SINGLE_WHITE),
            "type": "road_line",
            "geometry": [{"x": float(pt[0]), "y": float(pt[1]), "z": 0.0} for pt in road_line]
        }
        roads.append(line_data)
        id += 1
    for lane in lanes:
        lane_data = {
            "id": id,
            "map_element_id": int(MapType.LANE_SURFACE_STREET),
            "type": "lane",
            "geometry": [{"x": float(pt[0]), "y": float(pt[1]), "z": 0.0} for pt in lane]
        }
        roads.append(lane_data)
        id += 1
    # for sidewalk in sidewalks:
    #     sidewalk_data = {
    #         "id": id,
    #         "map_element_id": int(MapType.LANE_BIKE_LANE),
    #         "type": "sidewalk",
    #         "geometry": [{"x": float(pt[0]), "y": float(pt[1]), "z": 0.0} for pt in sidewalk]
    #     }
    #     roads.append(sidewalk_data)
    #     id += 1
    xodr_json["roads"] = roads
    return id

In [92]:
def get_lane_data(lane, type = "BOUNDARY"):
    if type == "BOUNDARY":
        points = lane.boundary_line
    elif type == "CENTERLINE":
        points = lane.centre_line
    else:
        raise ValueError(f"Unknown lane data type: {type}")
    
    # Check traffic direction
    travel_dir = None
    vector_lane = lane.lane_xml.find(".//userData/vectorLane")
    if vector_lane is not None:
        travel_dir = vector_lane.get("travelDir")
    
    if travel_dir == "backward":
        # Reverse points for backward travel
        points = points[::-1]

    return points

In [93]:
# Go only till last "driving" lane("parking" NTD)
# "median" lane means a road edge(add after all of them appear)
# Add "sidewalk" lane as well

id = 0
roads_json_cnt = [[],[],[]]
print(f"Network has {len(roads)} roads.")
for road_obj in roads:
    # print(f"Road ID: {road_obj.id}")
    lane_sections = road_obj.lane_sections
    # print(f"Lane Sections: {lane_sections}")
    for lane_section in lane_sections:
        # print(f"Lane Section ID: {lane_section.lane_section_ordinal}")
        # print(f"Number of Left Lanes: {len(lane_section.left_lanes)}")
        # print(f"Number of Right Lanes: {len(lane_section.right_lanes)}")
        road_edges = []
        road_lines = []
        lanes = []
        # sidwalks = []
        
        left_immediate_driveable = False
        right_immediate_driveable = False

        # Left Lanes
        add_lane_data = False
        add_edge_data = False
        previous_lane = None
        for i, left_lane in enumerate(lane_section.left_lanes):
            if left_lane.type == 'driving' or left_lane.type == 'parking':
                if i == 0:
                    left_immediate_driveable = True

                if add_lane_data:
                    road_lines.append(get_lane_data(previous_lane, "BOUNDARY"))
                    lanes.append(get_lane_data(previous_lane, "CENTERLINE"))
                # Add outer edge as road edge
                elif add_edge_data:
                    road_edges.append(get_lane_data(previous_lane, "BOUNDARY"))
                add_lane_data = True
                add_edge_data = False
            else:
                # Add inner lane as road edge
                if add_lane_data and i != 0:
                    lanes.append(get_lane_data(previous_lane, "CENTERLINE"))
                    road_edges.append(get_lane_data(previous_lane, "BOUNDARY"))
                add_edge_data = True
                add_lane_data = False
            previous_lane = left_lane

        if add_lane_data:
            lanes.append(get_lane_data(previous_lane, "CENTERLINE"))
            road_edges.append(get_lane_data(previous_lane, "BOUNDARY"))
        # elif add_edge_data:
            # if previous_lane.type == 'sidewalk':
            #     sidwalks.append(get_lane_data(previous_lane, "BOUNDARY"))
        
        # print("LEFT STATS")
        # print(f"Number of Road edges: {len(road_edges)}")
        # print(f"Road lines: {len(road_lines)}")
        # print(f"Lanes: {len(lanes)}")
        # print(f"Sidewalks: {len(sidwalks)}")

        # Right Lanes
        add_lane_data = False
        add_edge_data = False
        previous_lane = None
        for i, right_lane in enumerate(lane_section.right_lanes):
            if right_lane.type == 'driving' or right_lane.type == 'parking':
                if i == 0:
                    right_immediate_driveable = True

                if add_lane_data:
                    road_lines.append(get_lane_data(previous_lane, "BOUNDARY"))
                    lanes.append(get_lane_data(previous_lane, "CENTERLINE"))
                # Add outer edge as road edge
                elif add_edge_data:
                    road_edges.append(get_lane_data(previous_lane, "BOUNDARY"))
                add_lane_data = True
                add_edge_data = False
            else:
                # Add inner lane as road edge
                if add_lane_data and i != 0:
                    lanes.append(get_lane_data(previous_lane, "CENTERLINE"))
                    road_edges.append(get_lane_data(previous_lane, "BOUNDARY"))
                add_edge_data = True
                add_lane_data = False
            previous_lane = right_lane

        if add_lane_data:
            lanes.append(get_lane_data(previous_lane, "CENTERLINE"))
            road_edges.append(get_lane_data(previous_lane, "BOUNDARY"))
        # elif add_edge_data:
            #     if previous_lane.type == 'sidewalk':
            #         sidwalks.append(get_lane_data(previous_lane, "BOUNDARY"))
        
        # print(f"Number of Road edges in {road_obj.id}: {len(road_edges)}")
        # print(f"Road lines in {road_obj.id}: {len(road_lines)}")
        # print(f"Lanes in {road_obj.id}: {len(lanes)}")
        # print(f"Sidewalks in {road_obj.id}: {len(sidwalks)}")

        # If atleast one side has no immediate driveable lane add center as road edge
        if not left_immediate_driveable or not right_immediate_driveable:
            road_edges.append(lane_section.lane_section_reference_line)
        else:
            road_lines.append(lane_section.lane_section_reference_line)
            
        if len(road_lines) == 0 and len(lanes) == 0:
            road_edges = []
        id = save_lane_section_to_json(xodr_json, id, road_edges, road_lines, lanes)
        roads_json_cnt[0].append(len(road_edges))
        roads_json_cnt[1].append(len(road_lines))
        roads_json_cnt[2].append(len(lanes))
        # if len(lanes) == 0 and len(road_lines) != 0:
        #     print(f"Road: {road_obj.id}, Lane Section: {lane_section.lane_section_ordinal}")
        #     print(f"Road edges: {len(road_edges)}, Road lines: {len(road_lines)}, Lanes: {len(lanes)}")
    #     break
    # break
print(f"Total roads JSON count: {sum(roads_json_cnt[0]) + sum(roads_json_cnt[1]) + sum(roads_json_cnt[2])}")
# print(f"Road edges count: {roads_json_cnt[0]}")
# print(f"Road lines count: {roads_json_cnt[1]}")
# print(f"Lanes count: {roads_json_cnt[2]}")

Network has 108 roads.
Total roads JSON count: 430


In [94]:
# Save to file
with open(json_file, "w") as f:
    json.dump(xodr_json, f, indent=2)

In [95]:
print(len(road_edges))
# print(road_edges[0].shape)
# print(road_lines[0].shape)
# print(lanes[0].shape)
# print(sidwalks[0].shape)
print(road_edges)
# print(road_lines)
# print(lanes)
# print(sidwalks)

0
[]


In [96]:
base_path = r"C:\Users\91747\Documents\Academic_NYU\Emerge-Lab\gpudrive\data_utils\carla"

full_path = os.path.join(base_path, json_file)
with open(full_path, "r") as f:
    xodr_json = json.load(f)

print(f"Total number of road elements: {len(xodr_json['roads'])}")
print("\nFirst road element:")
print(xodr_json["roads"][0])

Total number of road elements: 430

First road element:
{'id': 0, 'map_element_id': 15, 'type': 'road_edge', 'geometry': [{'x': 97.73075659661255, 'y': 22.783206836743506, 'z': 0.0}, {'x': 97.72926979992536, 'y': 22.682382543591004, 'z': 0.0}, {'x': 97.72778279791288, 'y': 22.580852389941146, 'z': 0.0}, {'x': 97.72630596461157, 'y': 22.47834470892591, 'z': 0.0}, {'x': 97.72485385511014, 'y': 22.375542026511624, 'z': 0.0}, {'x': 97.72343079011893, 'y': 22.272738937957236, 'z': 0.0}, {'x': 97.72203676975151, 'y': 22.169935451470522, 'z': 0.0}, {'x': 97.72067179411908, 'y': 22.067131575251377, 'z': 0.0}, {'x': 97.71933586333067, 'y': 21.964327317504686, 'z': 0.0}, {'x': 97.71802897749286, 'y': 21.86152268644325, 'z': 0.0}, {'x': 97.71675113670992, 'y': 21.75871769026707, 'z': 0.0}, {'x': 97.71550234108379, 'y': 21.65591233717617, 'z': 0.0}, {'x': 97.7142825907142, 'y': 21.553106635381464, 'z': 0.0}, {'x': 97.71309188569849, 'y': 21.450300593085995, 'z': 0.0}, {'x': 97.71193022613168, 'y':

In [97]:
xodr_roads = xodr_json.get("roads", [])
# Extract all x and y coordinates from the roads
all_x = []
all_y = []

for road in xodr_roads:
    for point in road['geometry']:
        all_x.append(point['x'])
        all_y.append(point['y'])

# Find min and max values
min_x = min(all_x)
max_x = max(all_x)
min_y = min(all_y)
max_y = max(all_y)

print(f"X range: {min_x} to {max_x}")
print(f"Y range: {min_y} to {max_y}")

x_range = [min_x, max_x]
y_range = [min_y, max_y]


X range: -116.34519969394916 to 111.7342640183027
Y range: -142.96397800160645 to 70.48066461278475


Populating The Json

In [None]:
def save_object_to_json(xodr_json, id, x_range, y_range, v_x_range, v_y_range, headings_range, max_positions, object_type="vehicle"):
    # Generate random position within the map bounds of size max_positions
    positions = []
    for _ in range(max_positions):
        x = np.random.uniform(x_range[0], x_range[1])
        y = np.random.uniform(y_range[0], y_range[1])
        positions.append({"x": float(x), "y": float(y), "z": 1.0})
    
    velocities = []
    for _ in range(max_positions):
        v_x = np.random.uniform(v_x_range[0], v_x_range[1])
        v_y = np.random.uniform(v_y_range[0], v_y_range[1])
        velocities.append({"x": float(v_x), "y": float(v_y)})
    
    headings = []
    for _ in range(max_positions):
        heading = np.random.uniform(headings_range[0], headings_range[1])
        headings.append(float(heading))
    
    z = 1.0
    
    # Generate random heading from the specified range
    heading = np.random.uniform(headings_range[0], headings_range[1])
    
    object_data = {
        "position": list(positions),
        "width": 2.0,
        "length": 4.5,
        "height": 1.8,
        "id": id,
        "heading": list(headings),
        "velocity": list(velocities),
        "valid": [True] * max_positions,
        "type": object_type,
        "mark_as_expert": False
    }
    
    objects = xodr_json.get("objects", [])
    objects.append(object_data)
    xodr_json["objects"] = objects
    return id + 1

In [None]:
number_of_objects = 100
max_positions = 91
headings_range = [-np.pi, np.pi]
v_x_range = [-10.0, 10.0]
v_y_range = [-10.0, 10.0]
id = 0

# Reset the "objects" key to an empty list while preserving other fields
xodr_json["objects"] = []

for _ in range(number_of_objects):
    id = save_object_to_json(xodr_json, id, x_range, y_range, v_x_range, v_y_range, headings_range, max_positions)

if len(xodr_json["objects"]) != number_of_objects:
    print("Objects count mismatch: Expected", number_of_objects, "but found", len(xodr_json["objects"]))

In [None]:
# Save to file
with open(full_path, "w") as f:
    json.dump(xodr_json, f, indent=2)

In [None]:
import carla
import numpy as np

# Connect to CARLA
client = carla.Client('localhost', 2000)
# If you want to ensure you're using Town01 (to match your OpenDRIVE file), uncomment the next line:
world = client.load_world('Town01')
# world = client.get_world()
current_map = world.get_map()
print("Currently loaded map:", current_map.name)

# Spawn a vehicle
blueprint_library = world.get_blueprint_library()
vehicle_bp = blueprint_library.filter('vehicle.*')[0]
spawn_points = current_map.get_spawn_points()
vehicle = world.spawn_actor(vehicle_bp, spawn_points[0])

# Enable autopilot
vehicle.set_autopilot(True)

# Log trajectory
trajectory = []
for _ in range(1000):  # Log for 1000 steps
    world.tick()
    transform = vehicle.get_transform()
    velocity = vehicle.get_velocity()
    
    trajectory.append({
        "position": {"x": transform.location.x, "y": transform.location.y, "z": transform.location.z},
        "heading": transform.rotation.yaw,
        "velocity": {"x": velocity.x, "y": velocity.y}
    })

# Save trajectory
with open("expert_trajectory.json", "w") as f:
    json.dump(trajectory, f, indent=2)

RuntimeError: No connection could be made because the target machine actively refused it.

In [None]:
# try:
#     if os.path.exists(odr_file):
#         tree = etree.parse(odr_file)
#         root = tree.getroot()
#         # Find all <road> elements
#         road_elements = root.findall('road')
#         # Example: create a pyxodr Road object for the first road
#         if road_elements:
#             print(type(road_elements))
#             print(type(road_elements[0]))
#             road_obj = Road(road_elements[7])
#             print(f'Successfully created pyxodr Road object: {road_obj}')
#         else:
#             print('No <road> elements found in the file.')
#     else:
#         print('OpenDRIVE file not found:', odr_file)
# except Exception as e:
#     print('Error reading OpenDRIVE file or creating Road object:', e)

In [None]:
print(road_obj.lane_sections)

[Section_0/Road_355]
