# Offline Problem Generation Tests

Use this notebook to exercise the automated offline-problem generation pipeline that lives in
`motion_planning.offline_problems.Main`. The cells below discover the available graph assets,
call the batch generator for a small configuration, and inspect the recorded CSV artifacts.

In [1]:
from __future__ import annotations

import os
from pathlib import Path

def find_repo_root(start: Path) -> Path:
    for candidate in (start, *start.parents):
        if (candidate / "requirements.txt").exists() and (candidate / "src" / "motion_planning").exists():
            return candidate
    raise RuntimeError(f"Could not locate repository root from {start}")

PROJECT_ROOT = find_repo_root(Path.cwd())
SRC_DIR = PROJECT_ROOT / "src"
RESULTS_DIR = PROJECT_ROOT / "results"

pythonpath = os.environ.get("PYTHONPATH", "")
if str(SRC_DIR) not in pythonpath.split(os.pathsep):
    os.environ["PYTHONPATH"] = str(SRC_DIR) + (os.pathsep + pythonpath if pythonpath else "")

PROJECT_ROOT, SRC_DIR, RESULTS_DIR

(PosixPath('/home/abdulrahman/competitive-online-algorithms-motion-planning'),
 PosixPath('/home/abdulrahman/competitive-online-algorithms-motion-planning/src'),
 PosixPath('/home/abdulrahman/competitive-online-algorithms-motion-planning/results'))

In [2]:
from pathlib import Path

graph_dir = RESULTS_DIR / "graphs"
available_graphs = sorted(graph_dir.glob("graph_*.gpickle"))
scenarios = [p.stem.replace("graph_", "", 1) for p in available_graphs]
scenarios

['Town01_T-Intersection',
 'Town03_Roundabout',
 'Town05_Highway1',
 'Town05_Highway2',
 'Town05_Highway3',
 'Town05_Multiple Intersections']

## Helper to run the offline generator

The function below mirrors the CLI interface from `Main.py` so we can iterate quickly from the
notebook without spawning a separate process.

In [3]:
from typing import Iterable, Sequence
from pathlib import Path
import sys
# add <project>/src to sys.path
sys.path.insert(0, str(Path.cwd().resolve().parents[1]))
# now imports work
from motion_planning.offline_problems import Main as offline_main

def run_offline_generation(
    scenarios: Iterable[str] | None = None,
    *,
    risk_budgets: Sequence[float] = (5.0,),
    risk_levels: Sequence[str] = ("low",),
    num_epochs: int = 3,
    overwrite: bool = False,
    log_level: str = "INFO",
) -> None:
    arg_list: list[str] = [
        "--graphs-dir",
        str(RESULTS_DIR / "graphs"),
        "--routes-dir",
        str(RESULTS_DIR / "routes"),
        "--risk-budgets",
        *[str(b) for b in risk_budgets],
        "--risk-levels",
        *[str(level) for level in risk_levels],
        "--num-epochs",
        str(num_epochs),
        "--log-level",
        log_level,
    ]

    if scenarios:
        arg_list.extend(["--scenarios", *list(scenarios)])
    if overwrite:
        arg_list.append("--overwrite")

    args = offline_main.parse_args(arg_list)
    offline_main.run_generation(args)

## Smoke test on a single scenario

Pick one of the available graphs (first one by default) and generate offline problems for a
reduced sweep (single budget + risk level) to keep the runtime short.

In [4]:
if not scenarios:
    raise RuntimeError("No graphs found in results/graphs. Generate graphs before running this test.")

sample_scenario = scenarios[0]
print(f"Running offline generation for: {sample_scenario}")
run_offline_generation(
    [sample_scenario],
    risk_budgets=(10.0,),
    risk_levels=("high",),
    num_epochs=5,
    overwrite=True,
    log_level="INFO",
)

Running offline generation for: Town01_T-Intersection
Now, starting generation an offline problem: to get all candidates and rb candidates.
goal indecies inside helper function:  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
False
generating risks!!!
now generating from v_0_0 to v_7_6!!!
risk cap is 1
Set parameter WLSAccessID
Set parameter WLSSecret
Set parameter LicenseID to value 2649771
Academic license 2649771 - for non-commercial use only - registered to ab___@gmail.com
path signature: (('v_0_0', 'v_1_4', 'v_2_5', 'v_3_5', 'v_4_6', 'v_5_6', 'v_6_6', 'v_7_6'), (30, 25, 30, 30, 30, 30, 30))
risk cap is 2
Set parameter WLSAccessID
Set parameter WLSSecret
Set parameter LicenseID to value 2649771
Academic license 2649771 - for non-commercial use only - registered to ab___@gmail.com
path signature: (('v_0_0', 'v_1_1', 'v_2_2', 'v_3_2', 'v_4_3', 'v_5_4', 'v_6_5', 'v_7_6'), (30, 30, 30, 30, 30, 30, 30))
risk cap is 3
Set parameter WLSAccessID
Set paramete

## Inspect the recorded artifacts

Load the most recent candidate CSV emitted by the recorder to verify the run above and inspect a
preview of the stored offline-problem data.

In [5]:
# from IPython.display import display
# import pandas as pd

# problem_dir = RESULTS_DIR / "data" / "offline problems" / "problem details"
# candidate_files = sorted(problem_dir.glob("*_candidates.csv"), key=lambda p: p.stat().st_mtime, reverse=True)

# if not candidate_files:
#     raise RuntimeError("No candidate CSVs found. Run the generation cell first.")

# latest_candidates = candidate_files[0]
# display(latest_candidates)
# pd.read_csv(latest_candidates).head()

In [6]:
# # Smoke test for generate_all_candidate_paths_under_budget
# import sys
# if 'SRC_DIR' not in globals():
#     raise RuntimeError('Run the initialization cell that defines SRC_DIR before executing this smoke test.')
# if str(SRC_DIR) not in sys.path:
#     sys.path.insert(0, str(SRC_DIR))
# from motion_planning.graph_construction.graph_assets import GRAPH_FILE_HANDLER
# from motion_planning.offline_problems.Main import build_goal_indices, adjusted_epoch_count
# from motion_planning.offline_problems.generate_candidates import generate_all_candidate_paths_under_budget
# from motion_planning.offline_problems.utils import (
#     get_decision_nodes,
#     get_start_goal_node_pairs,
# )

# if not scenarios:
#     raise RuntimeError("No planning graphs available in results/graphs.")
# sample_scenario = scenarios[0]
# graph_handler = GRAPH_FILE_HANDLER()
# graph = graph_handler.load_graph(RESULTS_DIR / "graphs", sample_scenario)
# reference_route = graph_handler.load_reference_path_csv(RESULTS_DIR / "routes", sample_scenario)

# goal_indices = build_goal_indices(graph)
# if len(goal_indices) < 5:
#     raise RuntimeError("Graph does not have enough layers for the smoke test.")
# num_epochs = adjusted_epoch_count(4, goal_indices)
# target_epoch_index = min(3, num_epochs)
# target_epoch_index = min(target_epoch_index, len(goal_indices) - 2)
# decision_epochs = [goal_indices[target_epoch_index]]
# decision_nodes = get_decision_nodes(graph, decision_epochs, reference_route)
# start_goal_pairs = get_start_goal_node_pairs(graph, decision_nodes)
# start_node, sub_goal_node = start_goal_pairs[0]
# print(f"Testing {sample_scenario}: {start_node.id} -> {sub_goal_node.id} (layer {sub_goal_node.layer})")

# risk_budget = 5.0
# all_candidates = generate_all_candidate_paths_under_budget(
#     graph=graph,
#     start_node=start_node,
#     sub_goal_node=sub_goal_node,
#     delta_risk=risk_budget,
#     max_simple_paths=100,
#     enable_dp=True,
# )

# print(f"Generated {len(all_candidates)} candidate paths under risk budget {risk_budget}.")
# for candidate in all_candidates[:3]:
#     if candidate.solution_edge_speeds:
#         path_nodes = [edge[0].id for edge in candidate.solution_edge_speeds]
#         path_nodes.append(candidate.solution_edge_speeds[-1][1].id)
#     else:
#         path_nodes = [start_node.id]
#     print(f"  risk={candidate.risk:.2f} cost={candidate.cost:.2f} path={path_nodes}")
# if len(all_candidates) > 3:
#     print("  ...")


In [7]:
all_candidates[48].risk_list

NameError: name 'all_candidates' is not defined