# OpenDRIVE - Lanelet2 conversion

### 1. Imports

In [206]:
import os
import math
import itertools
import xml.etree.ElementTree as ET
from pprint import pprint
from lxml import etree
from crdesigner.common.config.lanelet2_config import lanelet2_config
from crdesigner.map_conversion.lanelet2.cr2lanelet import CR2LaneletConverter
from commonroad.scenario.scenario import Location, GeoTransformation
from crdesigner.map_conversion.map_conversion_interface import opendrive_to_commonroad
from pathlib import Path

### 2. Declarations

In [None]:
# Input handling
input_dir = Path("./sample_data")
set_list = [
    "naive",
    "CARLA",
    "esmini"
]

# Output handling
output_dir = Path("./output_old")
output_dir.mkdir(exist_ok = True)

# Georeference params
georeference_string = "EPSG:3857"
x_translation = 658761.0
y_translation = 4542599.0

# Scenario handling
location_geotransformation = GeoTransformation(
    georeference_string,
    x_translation,
    y_translation,
    None,
    None
)
scenario_location = Location(
    11,
    0.0,
    0.0,
    location_geotransformation,
    None
)

### 3. Attempt conversion

In [208]:
PointCoords = tuple[float, float]
R = 6378.1370                       # Earth radius

In [209]:
def coords2XY(p: PointCoords):
    
    lat, lon = p[0], p[1]

    x = math.radians(lon) * R * math.cos(math.radians(lat))
    y = math.radians(lat) * R

    return x, y

In [210]:
def dist_2nodes(
    p1: PointCoords, 
    p2: PointCoords
):

    lat1, lon1 = p1[0], p1[1]
    lat2, lon2 = p2[0], p2[1]

    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat / 2) ** 2 + \
        math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2
    dist = 2 * R * math.asin(math.sqrt(a))
    
    return dist

In [211]:
def calAngleTriplePoints(
    p1: PointCoords, 
    p2: PointCoords, 
    p3: PointCoords
):

    a = coords2XY(p1)
    b = coords2XY(p2)
    c = coords2XY(p3)

    v1 = (a[0] - b[0], a[1] - b[1])         # (b, a) vector, or p2 --> p1
    v2 = (c[0] - b[0], c[1] - b[1])         # (b, c) vector, or p2 --> p3

    norm_v1 = math.hypot(*v1)
    norm_v2 = math.hypot(*v2)
    
    if (norm_v1 == 0) or (norm_v2 == 0):    # Tbh this edge case ain't happen tho
        return 180

    dot = v1[0] * v2[0] + v1[1] * v2[1]

    angle = math.degrees(math.acos(
        max(min(dot / (norm_v1 * norm_v2), 1.0), -1.0)
    ))

    return angle

In [212]:
def simplifyWayNodes(
    points: list[PointCoords],
    straight_angle_threshold = 175.0,
    min_segment_dist = 3.0
):

    if (len(points) <= 2):                              # No point in simplifying 2 points
        return points

    simplified_points = [points[0]]

    for i in range(1, len(points) - 1):                 # Second first to second last

        this_angle = calAngleTriplePoints(
            points[i - 1],
            points[i],
            points[i + 1]
        )
        this_dist = dist_2nodes(points[i - 1], points[i])

        if (
            (this_angle < straight_angle_threshold) or
            (this_dist >= min_segment_dist)
        ):
            simplified_points.append(points[i])

    simplified_points.append(points[-1])

    # Fall back on sanity check
    if (len(simplified_points) < 2) and (len(points) >= 2):
        return [points[0], points[-1]]

    return simplified_points

In [213]:
def postprocessDownsamplingOSM(
    osm_root,
    straight_angle_threshold: float,
    min_segment_dist: float
):

    # Map node ID to (lat, lon)
    nodes = {
        node.get("id"): (
            float(node.get("lat")), 
            float(node.get("lon"))
        ) 
        for node in osm_root.findall("node")
    }

    ways = osm_root.findall("way")
    new_node_id_gen = itertools.count(1_000_000)
    used_node_ids = set()
    new_nodes = []

    for way in ways:

        nd_refs = [
            nd.get("ref") 
            for nd in way.findall("nd")
        ]
        coords = [
            nodes[ref] 
            for ref in nd_refs 
            if ref in nodes
        ]

        if (len(coords) < 2):
            print(f"Skipping way {way.get('id')} cuz not enough points.")

        simplified = simplifyWayNodes(coords, straight_angle_threshold)
        
        if (len(simplified) < 2):
            print(f"Skipping way {way.get('id')} cuz its too simple after filtering.")
            continue
        
        # Remove old <nd>
        for nd in way.findall("nd"):
            way.remove(nd)

        # New <node> & <nd> refs

        for lat, lon in simplified:

            node_id = str(next(new_node_id_gen))

            if node_id not in used_node_ids:
                used_node_ids.add(node_id)
                node = etree.Element(
                    "node", 
                    id = node_id, 
                    visible = "true", 
                    version = "1", 
                    lat = str(lat), 
                    lon = str(lon)
                )
                new_nodes.append(node)

            nd = etree.Element("nd", ref = node_id)
            way.append(nd)

    # Remove old <node> elements
    for node in osm_root.findall("node"):
        osm_root.remove(node)

    # Add new resampled nodes
    for node in new_nodes:
        osm_root.append(node)
    print(f"[debug] final osm_root num nodes: {len(osm_root)}")

    return osm_root

In [214]:
STRAIGHT_ANGLE_THRSH = 179.9            # Extremely strict angle threshold (trust me, 179 wasn't enough)
MIN_SEGMENT_DIST = 3.0                  # Minimum segment length accepted

for set_name in set_list:
    print(f"\n=============== Working on {set_name} ===============\n")
    this_set_input_path = input_dir / set_name
    this_set_output_path = output_dir / set_name
    if not (os.path.exists(this_set_output_path)):
        os.makedirs(this_set_output_path)

    for input_file in os.listdir(this_set_input_path):
        input_file_path = this_set_input_path / input_file
        try:
            # Input handling
            print(f"Converting {input_file_path}")

            # Output handling
            output_filename = f"converted_{input_file}.osm"
            output_preprocess_filename = f"preprocess_{input_file}.osm"
            output_preprocess_path = this_set_output_path / output_preprocess_filename
            output_path = this_set_output_path / output_filename

            # Scenario initialization
            scenario = opendrive_to_commonroad(input_file_path)
            scenario.location = scenario_location
            
            # Conversion
            if (scenario):
                l2osm = CR2LaneletConverter(lanelet2_config)
                osm = l2osm(scenario)

                with open(f"{output_preprocess_path}", "wb") as file_out:
                    file_out.write(etree.tostring(
                        osm, 
                        xml_declaration = True, 
                        encoding = "UTF-8", 
                        pretty_print = True
                    ))

                # Here comes my postprocessing
                downsamp_osm = postprocessDownsamplingOSM(
                    osm, 
                    STRAIGHT_ANGLE_THRSH,
                    MIN_SEGMENT_DIST
                )

                with open(output_path, "wb") as file_out:
                    file_out.write(etree.tostring(
                        downsamp_osm, 
                        xml_declaration = True, 
                        encoding = "UTF-8", 
                        pretty_print = True
                    ))
            
            print("Quest complete")

        except Exception as e:
            print(f"Error: {e}")
            continue



Converting sample_data/TIERIV_original/three_straight_lanes.xodr
[debug] final osm_root num nodes: 15
Quest complete


Converting sample_data/esmini/curves_elevation.xodr
[debug] final osm_root num nodes: 5540
Quest complete
Converting sample_data/esmini/curves.xodr
[debug] final osm_root num nodes: 5540
Quest complete
Converting sample_data/esmini/two_plus_one.xodr




[debug] final osm_root num nodes: 438
Quest complete
Converting sample_data/esmini/multi_intersections.xodr


07-Apr-25 23:02:38 - INFO - cr2lanelet::_append_from_sign: lanelet with yield sign has no
07-Apr-25 23:02:38 - INFO - cr2lanelet::_append_from_sign: lanelet with yield sign has no
07-Apr-25 23:02:38 - INFO - cr2lanelet::_append_from_sign: lanelet with yield sign has no
07-Apr-25 23:02:38 - INFO - cr2lanelet::_append_from_sign: lanelet with yield sign has no
07-Apr-25 23:02:38 - INFO - cr2lanelet::_append_from_sign: lanelet with yield sign has no
07-Apr-25 23:02:38 - INFO - cr2lanelet::_append_from_sign: lanelet with yield sign has no
07-Apr-25 23:02:38 - INFO - cr2lanelet::_append_from_sign: lanelet with yield sign has no


[debug] final osm_root num nodes: 9822
Quest complete
Converting sample_data/esmini/striaghtAndCurves.xodr
[debug] final osm_root num nodes: 5540
Quest complete
Converting sample_data/esmini/e6mini-lht.xodr




[debug] final osm_root num nodes: 42
Quest complete
Converting sample_data/esmini/straight_500m_signs.xodr
Error: 'SWE'
Converting sample_data/esmini/curve_r100.xodr
[debug] final osm_root num nodes: 956
Quest complete
Converting sample_data/esmini/soderleden.xodr
Error: int() argument must be a string, a bytes-like object or a number, not 'NoneType'
Converting sample_data/esmini/straight_500m_roadmarks.xodr




[debug] final osm_root num nodes: 11
Quest complete
Converting sample_data/esmini/e6mini.xodr
[debug] final osm_root num nodes: 30
Quest complete
Converting sample_data/esmini/fabriksgatan.xodr
[debug] final osm_root num nodes: 765
Quest complete
Converting sample_data/esmini/crest-curve.xodr
[debug] final osm_root num nodes: 1490
Quest complete
Converting sample_data/esmini/jolengatan.xodr
[debug] final osm_root num nodes: 494
Quest complete
Converting sample_data/esmini/straight_500m.xodr




[debug] final osm_root num nodes: 11
Quest complete
