In [1]:
# === Configuration ===
import struct
import pandas as pd
import numpy as np
from scipy.spatial import KDTree
import os

In [None]:
# === File Parsing Code ===
def parse_ideal_line(path):
    with open(path, "rb") as f:
        version = struct.unpack("<i", f.read(4))[0]
        if version != 7:
            raise ValueError(f"Unsupported spline version: {version}")

        point_count = struct.unpack("<i", f.read(4))[0]
        lap_time = struct.unpack("<i", f.read(4))[0]
        sample_count = struct.unpack("<i", f.read(4))[0]

        # AiPoint: position (vec3), length, id
        points = []
        for _ in range(point_count):
            x, y, z = struct.unpack("<fff", f.read(12))
            length = struct.unpack("<f", f.read(4))[0]
            point_id = struct.unpack("<i", f.read(4))[0]
            points.append([x, y, z, length, point_id])

        extra_count = struct.unpack("<i", f.read(4))[0]
        if extra_count != point_count:
            raise ValueError("Mismatch between point count and extra data count.")

        # AiPointExtra: 18 floats = 72 bytes
        extras = []
        for _ in range(extra_count):
            data = struct.unpack("<" + "f" * 18, f.read(72))
            extras.append(list(data))

    columns = [
        "x", "y", "z", "length", "id",
        "speed", "gas", "brake", "obsolete_lat_g", "radius",
        "side_left", "side_right", "camber", "direction",
        "normal_x", "normal_y", "normal_z",
        "extra_length",
        "forward_x", "forward_y", "forward_z",
        "tag", "grade"
    ]

    df = pd.DataFrame([p + e for p, e in zip(points, extras)], columns=columns)
    return df

def parse_kn5_road_vertices(kn5_path, road_keywords=None):
    """
    Extracts road-related mesh vertices from a .kn5 file.
    Filters meshes by material or shader names using keywords.
    """
    if road_keywords is None:
        road_keywords = ["road", "asphalt", "track", "surface", "pitlane", "curb"]

    with open(kn5_path, "rb") as f:
        magic = f.read(6)
        version = struct.unpack("<I", f.read(4))[0]

        if version > 5:
            f.read(4)  # skip extra header if present

        # TEXTURES
        texture_count = struct.unpack("<i", f.read(4))[0]
        for _ in range(texture_count):
            f.read(4)  # texture type
            name_len = struct.unpack("<i", f.read(4))[0]
            f.read(name_len)
            tex_size = struct.unpack("<i", f.read(4))[0]
            f.read(tex_size)

        # MATERIALS
        material_count = struct.unpack("<i", f.read(4))[0]
        materials = []
        for _ in range(material_count):
            name_len = struct.unpack("<i", f.read(4))[0]
            name = f.read(name_len).decode("utf-8").lower()
            shader_len = struct.unpack("<i", f.read(4))[0]
            shader = f.read(shader_len).decode("utf-8").lower()
            f.read(2)  # unknown short
            if version > 4:
                f.read(4)
            prop_count = struct.unpack("<i", f.read(4))[0]
            for _ in range(prop_count):
                pname_len = struct.unpack("<i", f.read(4))[0]
                f.read(pname_len)
                f.read(4)
                f.read(36)
            sample_count = struct.unpack("<i", f.read(4))[0]
            for _ in range(sample_count):
                sname_len = struct.unpack("<i", f.read(4))[0]
                f.read(sname_len)
                f.read(4)
                tname_len = struct.unpack("<i", f.read(4))[0]
                f.read(tname_len)
            materials.append((name, shader))

        # MESHES
        mesh_vertices = []

        def matches_road(mat_name, shader_name):
            return any(k in mat_name for k in road_keywords) or any(k in shader_name for k in road_keywords)

        def read_string():
            strlen = struct.unpack("<i", f.read(4))[0]
            return f.read(strlen).decode("utf-8")

        def read_vec3():
            return struct.unpack("<3f", f.read(12))

        def read_node():
            node_type = struct.unpack("<i", f.read(4))[0]
            name = read_string()
            child_count = struct.unpack("<i", f.read(4))[0]
            f.read(1)

            if node_type == 1:  # Dummy node
                f.read(64)
            elif node_type in [2, 3]:  # Mesh or Animated Mesh
                f.read(3)
                vertex_count = struct.unpack("<i", f.read(4))[0]
                positions = []
                for _ in range(vertex_count):
                    pos = read_vec3()
                    f.read(12 + 8 + 12)  # skip normals, UVs, tangents
                    positions.append(pos)
                idx_count = struct.unpack("<i", f.read(4))[0]
                f.read(idx_count * 2)  # indices
                mat_id = struct.unpack("<i", f.read(4))[0]
                f.read(29 if node_type == 2 else 12)

                if 0 <= mat_id < len(materials):
                    mat_name, shader = materials[mat_id]
                    if matches_road(mat_name, shader):
                        mesh_vertices.extend(positions)

            for _ in range(child_count):
                read_node()

        read_node()

    return pd.DataFrame(mesh_vertices, columns=["x", "y", "z"])

def estimate_track_edges(kn5_path, spline_df, offset=6):
    """
    Projects points perpendicular to the racing line, snaps to nearest road mesh vertex.
    Returns left/right edge DataFrame with timestamp.
    """
    mesh_df = parse_kn5_road_vertices(kn5_path)
    mesh_tree = KDTree(mesh_df[["x", "z"]].values)

    left_pts = []
    right_pts = []

    for i in range(1, len(spline_df) - 1):
        x1, y1 = spline_df.iloc[i - 1][["x", "z"]]
        x2, y2 = spline_df.iloc[i + 1][["x", "z"]]
        dx, dy = x2 - x1, y2 - y1
        norm = np.hypot(dx, dy)
        if norm == 0:
            continue

        perp = np.array([-dy, dx]) / norm
        cx, cy = spline_df.iloc[i][["x", "z"]]
        left_query = np.array([cx, cy]) + perp * offset
        right_query = np.array([cx, cy]) - perp * offset

        _, left_idx = mesh_tree.query(left_query)
        _, right_idx = mesh_tree.query(right_query)

        left_pts.append(mesh_df.iloc[left_idx].values)
        right_pts.append(mesh_df.iloc[right_idx].values)

    return pd.DataFrame({
        "timestamp": np.arange(len(left_pts)),
        "left_x": [pt[0] for pt in left_pts],
        "left_y": [pt[1] for pt in left_pts],
        "left_z": [pt[2] for pt in left_pts],
        "right_x": [pt[0] for pt in right_pts],
        "right_y": [pt[1] for pt in right_pts],
        "right_z": [pt[2] for pt in right_pts],
    })

# === Pipeline Helpers ===
def find_kn5_file(track_path, track_name):
    """Find the .kn5 file that matches the track name inside the track folder."""
    expected_kn5 = f"{track_name}.kn5"
    kn5_path = os.path.join(track_path, expected_kn5)
    return kn5_path if os.path.isfile(kn5_path) else None

def find_ai_files(track_path):
    """Finds all valid layout pairs: (fast_lane.ai, ideal_line.ai)"""

    layouts = []

    # Case 1: single-layout in root (ai/ and data/ inside track root)
    root_fast = os.path.join(track_path, "ai", "fast_lane.ai")
    root_ideal = os.path.join(track_path, "data", "ideal_line.ai")
    if os.path.isfile(root_fast) and os.path.isfile(root_ideal):
        layouts.append((root_fast, root_ideal))

    # Case 2: multiple layouts in subfolders
    for sub in os.listdir(track_path):
        layout_path = os.path.join(track_path, sub)
        if not os.path.isdir(layout_path):
            continue

        fast_path = os.path.join(layout_path, "ai", "fast_lane.ai")
        ideal_path = os.path.join(layout_path, "data", "ideal_line.ai")

        if os.path.isfile(fast_path) and os.path.isfile(ideal_path):
            layouts.append((fast_path, ideal_path))

    return layouts

# === Per-track processor ===
def process_track(track_name, tracks_root, output_root):
    track_path = os.path.join(tracks_root, track_name)
    if not os.path.isdir(track_path):
        return

    kn5_path = find_kn5_file(track_path, track_name)
    if not kn5_path:
        print(f"No KN5 file for {track_name}, skipping...")
        return

    layouts = find_ai_files(track_path)
    if not layouts:
        print(f"No valid layouts found for {track_name}, skipping...")
        return

    track_output_dir = os.path.join(output_root, track_name)
    os.makedirs(track_output_dir, exist_ok=True)

    print(f"\nProcessing {track_name} with {len(layouts)} layout(s)...")

    for i, (fast_path, ideal_path) in enumerate(layouts):
        layout_name = f"layout{i+1}"
        layout_dir = os.path.join(track_output_dir, layout_name)
        os.makedirs(layout_dir, exist_ok=True)

        try:
            centerline_df = parse_ideal_line(fast_path)
            edges_df = estimate_track_edges(kn5_path, centerline_df, offset=6.0)

            ideal_df = parse_ideal_line(ideal_path)

            ideal_df.to_csv(os.path.join(layout_dir, "ideal_line.csv"), index=False)
            edges_df.to_csv(os.path.join(layout_dir, "track_edges.csv"), index=False)

            print(f"Saved {layout_name}.")
        except Exception as e:
            print(f"Failed to process {layout_name}: {e}")

    print(f"Finished {track_name}.")

# === All Tracks Entry Point ===
def process_all_tracks(tracks_root, output_root):
    for track_name in os.listdir(tracks_root):
        process_track(track_name, tracks_root, output_root)
    print("Finished all tracks.")

In [5]:
# Dataset Creation Pipeline.
TRACKS_ROOT = "../data/assetto_corsa_tracks"
OUTPUT_ROOT = "../data/extracted_track_data"

if __name__ == "__main__":
    process_all_tracks(TRACKS_ROOT, OUTPUT_ROOT)


Processing imola with 1 layout(s)...
Failed to process layout1: Unsupported spline version: 2
Finished imola.

Processing ks_barcelona with 2 layout(s)...
Saved layout1.
Saved layout2.
Finished ks_barcelona.

Processing ks_black_cat_county with 3 layout(s)...
Saved layout1.
Saved layout2.
Saved layout3.
Finished ks_black_cat_county.

Processing ks_brands_hatch with 2 layout(s)...
Saved layout1.
Saved layout2.
Finished ks_brands_hatch.
No valid layouts found for ks_drag, skipping...

Processing ks_highlands with 4 layout(s)...
Saved layout1.
Saved layout2.
Saved layout3.
Saved layout4.
Finished ks_highlands.

Processing ks_laguna_seca with 1 layout(s)...
Saved layout1.
Finished ks_laguna_seca.

Processing ks_monza66 with 3 layout(s)...
Failed to process layout1: single positional indexer is out-of-bounds
Failed to process layout2: single positional indexer is out-of-bounds
Failed to process layout3: single positional indexer is out-of-bounds
Finished ks_monza66.

Processing ks_nordschl