diff --git a/.claude/skills/benchmark/SKILL.md b/.claude/skills/benchmark/SKILL.md new file mode 100644 index 00000000..e95ffe05 --- /dev/null +++ b/.claude/skills/benchmark/SKILL.md @@ -0,0 +1,479 @@ +--- +name: benchmark +description: Write benchmark scripts for EmbodiChain modules following project conventions +--- + +# EmbodiChain Benchmark Script Writer + +This skill guides you through writing well-structured benchmark scripts for EmbodiChain modules, covering performance measurement of solvers, samplers, metrics, and other computationally intensive components. + +## Usage + +Invoke this skill when: +- A user asks to write or extend a benchmark script for any EmbodiChain module +- Comparing CPU vs GPU implementations (e.g., Warp CUDA vs pure-Python) +- Measuring throughput of samplers, metrics, FK/IK solvers, or data pipelines +- The file path contains `scripts/benchmark/` or the word "benchmark" appears in the request + +## Key Conventions + +### File Location + +Place benchmark scripts under: + +``` +scripts/benchmark//.py +``` + +Examples: +- `scripts/benchmark/robotics/kinematic_solver/opw_solver.py` +- `scripts/benchmark/workspace_analyzer/benchmark_workspace_analyzer.py` + +### File Header + +Every benchmark file **must** begin with the Apache 2.0 copyright header followed by a module-level docstring: + +```python +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- + +"""One-line summary of what this benchmark measures. + +Longer description of the optimizations or comparisons being evaluated. +Run: python -m scripts.benchmark.. +""" +``` + +--- + +## Steps + +### 1. Identify What to Benchmark + +Ask yourself: +- **What implementations are being compared?** (e.g., Warp CUDA vs. CPU, vectorized vs. loop-based) +- **What is the primary metric?** (wall-clock time, mean error, throughput) +- **What sample sizes cover realistic usage?** Typically: `[100, 1000, 10000, 100000]` + +### 2. Structure the Script + +Use one helper function per concern, then a single orchestrator: + +``` +benchmark_() # e.g., benchmark_halton_sampler() +benchmark_() # e.g., benchmark_density_metric() +... +run_all_benchmarks() # calls all of the above + prints header/footer +``` + +### 3. Write Individual Benchmark Functions + +Each benchmark function follows this pattern: + +```python +def benchmark_(): + """One-line description of what is being measured.""" + from embodichain. import SomeClass, SomeCfg + + # --- Setup (not timed) --- + cfg = SomeCfg(...) + obj = cfg.init_solver(...) # or SomeClass(cfg) + + print("\n=== Benchmark ===") + for n in [100, 1000, 10000, 100000]: + # Prepare inputs (not timed) + inputs = ... + + # --- Timed block --- + start = time.perf_counter() + result = obj.compute(inputs) # or obj.get_ik(...) etc. + elapsed = time.perf_counter() - start + + print(f" n={n:>7d}: {elapsed*1000:>10.2f} ms (...)") +``` + +Key rules: +- Use `time.perf_counter()` for high-resolution wall-clock timing, **not** `time.time()`. +- Only time the core computation — exclude setup, data preparation, and print statements. +- Print results in milliseconds (`elapsed * 1000`) with consistent column alignment using `>` format specs. 
+ +> **Exception**: When benchmarking GPU (Warp/CUDA) code alongside a CPU baseline, it is acceptable to use `time.time()` for coarser comparison timing, as seen in `opw_solver.py`. Prefer `time.perf_counter()` for CPU-only benchmarks. + +### 4. Comparing Two Implementations + +When the benchmark compares two backends (e.g., Warp CUDA vs. Python OPW): + +```python +def check_(solver_a, solver_b, n_samples=1000): + """Run both solvers and return timing + accuracy metrics.""" + # shared input generation + qpos = ... + + # --- Solver A (e.g., Warp CUDA) --- + start = time.time() + success_a, result_a = solver_a.get_ik(xpos, ...) + time_a = time.time() - start + t_err_a, r_err_a = get_poses_err(...) + + # --- Solver B (e.g., CPU) --- + start = time.time() + success_b, result_b = solver_b.get_ik(xpos, ...) + time_b = time.time() - start + t_err_b, r_err_b = get_poses_err(...) + + return time_a, t_err_a, r_err_a, time_b, t_err_b, r_err_b + + +def benchmark_(): + cfg = ... + solver_a = cfg.init_solver(device=torch.device("cuda"), ...) + solver_b = cfg.init_solver(device=torch.device("cpu"), ...) + + for n in [100, 1000, 10000, 100000]: + time_a, t_err_a, r_err_a, time_b, t_err_b, r_err_b = check_( + solver_a, solver_b, n_samples=n + ) + print(f"**** Test over {n} samples:") + print(f"===Impl A time: {time_a * 1000:.6f} ms") + print(f" Translation mean error: {t_err_a * 1000:.6f} mm") + print(f" Rotation mean error: {r_err_a * 180 / np.pi:.6f} degrees") + print(f"===Impl B time: {time_b * 1000:.6f} ms") + ... +``` + +### 5. 
Report Accuracy Alongside Speed + +For FK/IK solvers, always verify correctness by running FK on the IK output and measuring pose error: + +```python +def get_pose_err(matrix_a: np.ndarray, matrix_b: np.ndarray) -> tuple[float, float]: + """Return (translation_error_m, rotation_error_rad).""" + t_err = np.linalg.norm(matrix_a[:3, 3] - matrix_b[:3, 3]) + relative_rot = matrix_a[:3, :3].T @ matrix_b[:3, :3] + cos_angle = np.clip((np.trace(relative_rot) - 1) / 2.0, -1.0, 1.0) + r_err = np.arccos(cos_angle) + return t_err, r_err + + +def get_poses_err( + matrix_a_list: list[np.ndarray], matrix_b_list: list[np.ndarray] +) -> tuple[float, float]: + t_errs, r_errs = [], [] + for a, b in zip(matrix_a_list, matrix_b_list): + t, r = get_pose_err(a, b) + t_errs.append(t) + r_errs.append(r) + return float(np.mean(t_errs)), float(np.mean(r_errs)) +``` + +### 6. Handle Benchmarks That Require External Resources + +If a benchmark requires a live simulation, robot, or GPU device that may not be available, **skip gracefully** rather than raising an error: + +```python +def benchmark_batch_fk(): + """Benchmark batch FK (requires GPU robot setup).""" + print("\n=== Batch FK Benchmark (requires robot/simulation) ===") + print(" Skipped -- requires live SimulationManager and Robot.") + print(" To run manually, integrate with your robot setup:") + print(" analyzer.compute_workspace_points(joint_configs, batch_size=512)") +``` + +### 7. Write the Orchestrator + +```python +def run_all_benchmarks(): + """Run all benchmarks and print summary.""" + print("=" * 60) + print(" Performance Benchmarks") + print("=" * 60) + + benchmark_component_a() + benchmark_component_b() + # ... + + print("\n" + "=" * 60) + print("Benchmarks complete.") + print("=" * 60) + + +if __name__ == "__main__": + run_all_benchmarks() +``` + +### 8. Save Results to One Markdown Report (Required) + +Every benchmark script must write its final results to **one Markdown file** after execution. 
+ +- Output directory recommendation: `outputs/benchmarks/` +- File naming recommendation: `<benchmark_name>_<timestamp>.md` +- Requirement: output **exactly three Markdown tables** in the report + 1. `Time & Memory` table (cost time + memory columns) + 2. `Success & Other Metrics` table (success rate + quality/accuracy/extra metrics) + 3. `Leaderboard` table (algorithm ranking by overall success rate, descending) +- `Leaderboard` coverage rule: include **all algorithms evaluated in the current benchmark scope**. If a provided leaderboard artifact is incomplete, backfill missing algorithms from aggregate summaries before rendering. + +Use this pattern: + +```python +from datetime import datetime +from pathlib import Path + + +def write_markdown_report( + benchmark_name: str, + perf_rows: list[dict[str, object]], + metric_rows: list[dict[str, object]], + leaderboard_rows: list[dict[str, object]], + notes: list[str] | None = None, +) -> Path: + """Write benchmark results into a single markdown report file.""" + output_dir = Path("outputs/benchmarks") + output_dir.mkdir(parents=True, exist_ok=True) + + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + report_path = output_dir / f"{benchmark_name}_{ts}.md" + + lines: list[str] = [ + f"# {benchmark_name} Benchmark Report", + "", + f"Generated at: {datetime.now().isoformat(timespec='seconds')}", + "", + "## Time & Memory", + "", + ] + + if perf_rows: + perf_headers = list(perf_rows[0].keys()) + lines.append("| " + " | ".join(perf_headers) + " |") + lines.append("| " + " | ".join(["---"] * len(perf_headers)) + " |") + for row in perf_rows: + lines.append("| " + " | ".join(str(row[h]) for h in perf_headers) + " |") + else: + lines.append("No time/memory rows were produced.") + + lines.extend(["", "## Success & Other Metrics", ""]) + + if metric_rows: + metric_headers = list(metric_rows[0].keys()) + lines.append("| " + " | ".join(metric_headers) + " |") + lines.append("| " + " | ".join(["---"] * len(metric_headers)) + " |") + for row in metric_rows: + 
lines.append( + "| " + " | ".join(str(row[h]) for h in metric_headers) + " |" + ) + else: + lines.append("No success/metric rows were produced.") + + lines.extend(["", "## Leaderboard", ""]) + + if leaderboard_rows: + leaderboard_headers = list(leaderboard_rows[0].keys()) + lines.append("| " + " | ".join(leaderboard_headers) + " |") + lines.append("| " + " | ".join(["---"] * len(leaderboard_headers)) + " |") + for row in leaderboard_rows: + lines.append( + "| " + " | ".join(str(row[h]) for h in leaderboard_headers) + " |" + ) + else: + lines.append("No leaderboard rows were produced.") + + if notes: + lines.extend(["", "## Notes", ""]) + lines.extend([f"- {note}" for note in notes]) + + report_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + return report_path +``` + +And call it at the end of `run_all_benchmarks()`: + +```python +def run_all_benchmarks() -> None: + perf_rows: list[dict[str, object]] = [] + metric_rows: list[dict[str, object]] = [] + + perf_part, metric_part = benchmark_halton_sampler() + perf_rows.extend(perf_part) + metric_rows.extend(metric_part) + perf_part, metric_part = benchmark_density_metric() + perf_rows.extend(perf_part) + metric_rows.extend(metric_part) + # ... + + leaderboard_rows = build_leaderboard_rows(metric_rows) + # `build_leaderboard_rows` should aggregate per algorithm and sort by + # overall success rate in descending order. 
+ + report_path = write_markdown_report( + benchmark_name="workspace_analyzer", + perf_rows=perf_rows, + metric_rows=metric_rows, + leaderboard_rows=leaderboard_rows, + notes=["CPU/GPU memory fields are deltas measured around timed calls."], + ) + print(f"Markdown report saved: {report_path}") +``` + +--- + +## Output Format Reference + +| Scenario | Print format | +|----------|-------------| +| Single implementation, many sizes | `n={n:>7d}: {elapsed*1000:>10.2f} ms \| CPU Δ={...:+.1f} MB GPU Δ={...:+.1f} MB peak GPU={...:.1f} MB` | +| Two implementations compared | `=== time: {ms:.6f} ms` then error & memory lines indented 3 spaces | +| Markdown report path | `Markdown report saved: outputs/benchmarks/<benchmark_name>_<timestamp>.md` | +| Markdown table 1 (Time & Memory) | `\| sample_size \| impl \| cost_time_ms \| cpu_delta_mb \| gpu_delta_mb \| peak_gpu_mb \|` | +| Markdown table 2 (Success & Metrics) | `\| sample_size \| impl \| success_rate \| translation_err_mm \| rotation_err_deg \| ... \|` | +| Markdown table 3 (Leaderboard) | `\| rank \| algorithm \| overall_success_rate \| ... \|` (sorted by `overall_success_rate` descending) | +| Section header | `\n=== Benchmark ===` | +| Top-level separator | `"=" * 60` | + +--- + +## Measuring Memory Usage + +Always measure **both GPU VRAM and CPU RAM** alongside wall-clock time. Use the helpers below. 
+ +### GPU VRAM (via PyTorch CUDA) + +```python +import torch + +def get_gpu_memory_mb() -> float: + """Return current GPU VRAM allocated by PyTorch in MB.""" + if torch.cuda.is_available(): + return torch.cuda.memory_allocated() / 1024 ** 2 + return 0.0 + +# Usage pattern inside a benchmark loop: +torch.cuda.reset_peak_memory_stats() # reset peak counter before timed block +mem_before = get_gpu_memory_mb() + +start = time.perf_counter() +result = obj.compute(inputs) +elapsed = time.perf_counter() - start + +mem_after = get_gpu_memory_mb() +peak_vram = torch.cuda.max_memory_allocated() / 1024 ** 2 # peak during timed block + +print( + f" n={n:>7d}: {elapsed*1000:>10.2f} ms | " + f"VRAM delta={mem_after - mem_before:+.1f} MB peak={peak_vram:.1f} MB" +) +``` + +### CPU RAM (via `psutil`) + +```python +import psutil, os + +def get_cpu_memory_mb() -> float: + """Return current process RSS (resident set size) in MB.""" + process = psutil.Process(os.getpid()) + return process.memory_info().rss / 1024 ** 2 + +# Usage pattern: +mem_before = get_cpu_memory_mb() + +start = time.perf_counter() +result = obj.compute(inputs) +elapsed = time.perf_counter() - start + +mem_after = get_cpu_memory_mb() + +print( + f" n={n:>7d}: {elapsed*1000:>10.2f} ms | " + f"RAM delta={mem_after - mem_before:+.1f} MB" +) +``` + +### Combined Helper (recommended) + +For benchmarks that use both CPU and GPU, combine into a single snapshot: + +```python +import os, psutil, torch + +def memory_snapshot() -> dict: + """Return a dict with current CPU RSS and GPU allocated memory in MB.""" + process = psutil.Process(os.getpid()) + cpu_mb = process.memory_info().rss / 1024 ** 2 + gpu_mb = torch.cuda.memory_allocated() / 1024 ** 2 if torch.cuda.is_available() else 0.0 + return {"cpu_mb": cpu_mb, "gpu_mb": gpu_mb} + +# Usage: +torch.cuda.reset_peak_memory_stats() +before = memory_snapshot() + +start = time.perf_counter() +result = obj.compute(inputs) +elapsed = time.perf_counter() - start + +after = 
memory_snapshot() +peak_gpu = torch.cuda.max_memory_allocated() / 1024 ** 2 + +print( + f" n={n:>7d}: {elapsed*1000:>10.2f} ms | " + f"CPU Δ={after['cpu_mb'] - before['cpu_mb']:+.1f} MB " + f"GPU Δ={after['gpu_mb'] - before['gpu_mb']:+.1f} MB peak GPU={peak_gpu:.1f} MB" +) +``` + +> Add `psutil` to the project's dev-dependencies if not already present (`pip install psutil`). + +--- + +## Common Imports + +```python +import os +import time +import psutil +import numpy as np +import torch +import warp as wp # only when GPU kernels are benchmarked +from scipy.spatial.transform import Rotation # only when needed +from typing import Tuple, List # or use built-in generics (Python ≥ 3.10) +``` + +--- + +## Quick Checklist + +Before finishing a benchmark script: + +- [ ] Apache 2.0 copyright header is present +- [ ] Module-level docstring with `Run:` line +- [ ] Each function has a one-line docstring +- [ ] Setup code is **outside** the timed block +- [ ] Timing uses `time.perf_counter()` (or `time.time()` when comparing GPU/CPU coarsely) +- [ ] CPU RAM measured with `psutil` (delta MB before/after timed block) +- [ ] GPU VRAM measured with `torch.cuda.memory_allocated()` + `torch.cuda.max_memory_allocated()` (delta + peak) +- [ ] `torch.cuda.reset_peak_memory_stats()` called before each timed block +- [ ] Accuracy metrics reported alongside timing (for solver benchmarks) +- [ ] Graceful skip for benchmarks that need unavailable hardware +- [ ] `run_all_benchmarks()` orchestrator with formatted separators +- [ ] Results are written to exactly one Markdown report file per run +- [ ] Report contains exactly three Markdown tables: `Time & Memory`, `Success & Other Metrics`, and `Leaderboard` +- [ ] `Time & Memory` table includes `cost_time_ms`, `cpu_delta_mb`, `gpu_delta_mb`, `peak_gpu_mb` +- [ ] `Success & Other Metrics` table includes `success_rate` and domain-specific quality metrics +- [ ] `Leaderboard` table ranks algorithms by overall success rate in descending order +- 
[ ] `Leaderboard` table includes all benchmarked algorithms (missing entries are backfilled from aggregate summaries if needed) +- [ ] Console log includes final report path +- [ ] `if __name__ == "__main__":` entry point +- [ ] `black .` formatting applied diff --git a/embodichain/lab/sim/solvers/base_solver.py b/embodichain/lab/sim/solvers/base_solver.py index 40c61af5..98b84807 100644 --- a/embodichain/lab/sim/solvers/base_solver.py +++ b/embodichain/lab/sim/solvers/base_solver.py @@ -313,12 +313,32 @@ def set_qpos_limits( ) return False - self.lower_qpos_limits = torch.tensor( - lower_qpos_limits, dtype=float, device=self.device - ) - self.upper_qpos_limits = torch.tensor( - upper_qpos_limits, dtype=float, device=self.device - ) + if isinstance(lower_qpos_limits, list) or isinstance( + lower_qpos_limits, np.ndarray + ): + self.lower_qpos_limits = torch.tensor( + lower_qpos_limits, dtype=float, device=self.device + ) + elif isinstance(lower_qpos_limits, torch.Tensor): + self.lower_qpos_limits = lower_qpos_limits.clone().to(device=self.device) + else: + logger.log_error( + f"Invalid type for lower_qpos_limits: {type(lower_qpos_limits)}. Must be list, np.ndarray, or torch.Tensor." + ) + + if isinstance(upper_qpos_limits, list) or isinstance( + upper_qpos_limits, np.ndarray + ): + self.upper_qpos_limits = torch.tensor( + upper_qpos_limits, dtype=float, device=self.device + ) + elif isinstance(upper_qpos_limits, torch.Tensor): + self.upper_qpos_limits = upper_qpos_limits.clone().to(device=self.device) + else: + logger.log_error( + f"Invalid type for upper_qpos_limits: {type(upper_qpos_limits)}. Must be list, np.ndarray, or torch.Tensor." 
+ ) + return True def get_qpos_limits(self) -> dict: diff --git a/scripts/benchmark/__main__.py b/scripts/benchmark/__main__.py index fb38235b..ee9eac0a 100644 --- a/scripts/benchmark/__main__.py +++ b/scripts/benchmark/__main__.py @@ -20,7 +20,7 @@ python -m scripts.benchmark rl --tasks push_cube --algorithms ppo --suite default python -m scripts.benchmark rl --rebuild-report-only - python -m scripts.benchmark robotics-kinematic-solver + python -m scripts.benchmark robotics-kinematic-solver -s pytorch """ from __future__ import annotations @@ -29,6 +29,22 @@ import sys +def _run_robotics_kinematic_solver_cli(args: argparse.Namespace) -> None: + """Run robotics kinematic solver benchmark with forwarded CLI args.""" + from scripts.benchmark.robotics.kinematic_solver.run_benchmark import ( + run_all_benchmarks, + ) + + run_all_benchmarks(selected_solvers=args.solvers) + + +def _run_rl_cli(_: argparse.Namespace) -> None: + """Run RL benchmark CLI entrypoint.""" + from scripts.benchmark.rl.run_benchmark import main as rl_main + + rl_main() + + def main() -> None: """Dispatch to the appropriate benchmark sub-command CLI.""" parser = argparse.ArgumentParser( @@ -42,20 +58,22 @@ def main() -> None: "rl", help="Run RL benchmark: train, evaluate, aggregate, and report results.", ) - from scripts.benchmark.rl.run_benchmark import main as rl_main - - rl_parser.set_defaults(func=rl_main) + rl_parser.set_defaults(func=_run_rl_cli) # -- robotics-kinematic-solver ------------------------------------------- robotics_ks_parser = subparsers.add_parser( "robotics-kinematic-solver", help="Benchmark the OPW kinematic solver (FK/IK accuracy and speed).", ) - from scripts.benchmark.robotics.kinematic_solver.opw_solver import ( - benchmark_opw_solver, + robotics_ks_parser.add_argument( + "--solvers", + "-s", + nargs="+", + choices=("opw", "pytorch", "all"), + default=["all"], + help="Solvers to benchmark. 
Use one or more of: opw, pytorch, all.", ) - - robotics_ks_parser.set_defaults(func=benchmark_opw_solver) + robotics_ks_parser.set_defaults(func=_run_robotics_kinematic_solver_cli) # -- Parse --------------------------------------------------------------- # If no sub-command is given, print help and exit. @@ -73,7 +91,7 @@ def main() -> None: original_argv = sys.argv sys.argv = subcommand_argv try: - known.func() + known.func(known) finally: sys.argv = original_argv else: diff --git a/scripts/benchmark/rl/reporting.py b/scripts/benchmark/rl/reporting.py index cfdd7a3c..635123df 100644 --- a/scripts/benchmark/rl/reporting.py +++ b/scripts/benchmark/rl/reporting.py @@ -16,6 +16,9 @@ from __future__ import annotations +import math +from collections import defaultdict +from datetime import datetime from pathlib import Path from typing import Any @@ -26,22 +29,81 @@ def _fmt(value: Any, digits: int = 3) -> str: return str(value) -def _group_aggregate_results_by_task( +def _safe_divide(numerator: float, denominator: float) -> float: + if denominator <= 0: + return float("nan") + return numerator / denominator + + +def _sortable_success_rate(item: dict[str, Any]) -> float: + value = float(item.get("avg_success_rate", float("nan"))) + if math.isnan(value): + return float("-inf") + return value + + +def _build_report_leaderboard_rows( + leaderboard: list[dict[str, Any]], aggregate_results: list[dict[str, Any]], -) -> dict[str, list[dict[str, Any]]]: - grouped: dict[str, list[dict[str, Any]]] = {} +) -> list[dict[str, Any]]: + """Build complete leaderboard rows and sort by overall success rate.""" + by_algorithm: dict[str, dict[str, Any]] = {} + for item in leaderboard: + algorithm = str(item.get("algorithm", "")) + if not algorithm: + continue + by_algorithm[algorithm] = dict(item) + + grouped_aggregate: dict[str, list[dict[str, Any]]] = defaultdict(list) for item in aggregate_results: - grouped.setdefault(item["task"], []).append(item) - for task_results in 
grouped.values(): - task_results.sort( - key=lambda item: ( - -float(item.get("final_success_rate_stable_mean", float("-inf"))), - -float(item.get("final_success_rate_mean", float("-inf"))), - float(item.get("steps_to_success_threshold_mean", float("inf"))), - item["algorithm"], - ) - ) - return dict(sorted(grouped.items())) + algorithm = str(item.get("algorithm", "")) + if not algorithm: + continue + grouped_aggregate[algorithm].append(item) + + for algorithm, items in grouped_aggregate.items(): + if algorithm in by_algorithm: + continue + + success_values = [ + float(entry["final_success_rate_mean"]) + for entry in items + if isinstance(entry.get("final_success_rate_mean"), (int, float)) + and not math.isnan(float(entry["final_success_rate_mean"])) + ] + stable_success_values = [ + float(entry["final_success_rate_stable_mean"]) + for entry in items + if isinstance(entry.get("final_success_rate_stable_mean"), (int, float)) + and not math.isnan(float(entry["final_success_rate_stable_mean"])) + ] + by_algorithm[algorithm] = { + "algorithm": algorithm, + "avg_success_rate": ( + sum(success_values) / len(success_values) + if success_values + else float("nan") + ), + "avg_success_rate_stable": ( + sum(stable_success_values) / len(stable_success_values) + if stable_success_values + else float("nan") + ), + "score": ( + sum(stable_success_values) / len(stable_success_values) + if stable_success_values + else float("nan") + ), + "tasks_covered": len(items), + } + + return sorted( + by_algorithm.values(), + key=lambda item: ( + -_sortable_success_rate(item), + str(item.get("algorithm", "")), + ), + ) def generate_markdown_report( @@ -52,13 +114,24 @@ def generate_markdown_report( protocol: dict[str, Any] | None, output_path: str | Path, ) -> Path: - """Write a markdown benchmark report to disk.""" + """Write a benchmark markdown report with exactly three tables.""" output = Path(output_path) output.parent.mkdir(parents=True, exist_ok=True) + ordered_runs = sorted( + 
run_results, + key=lambda item: ( + str(item.get("task", "")), + str(item.get("algorithm", "")), + int(item.get("seed", 0)), + ), + ) + lines = [ "# RL Benchmark Report", "", + f"Generated at: {datetime.now().isoformat(timespec='seconds')}", + "", "## Benchmark Overview", "", ] @@ -80,175 +153,99 @@ def generate_markdown_report( ) lines.extend( [ - "## Leaderboard", + "## Time & Memory", "", - "| Rank | Algorithm | Score | Steps To Threshold (Sustained) | Success Rate Std | Avg Success Rate | Avg Stable Success Rate | Avg Final Reward | Tasks |", - "| ---: | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", + "| task | algorithm | seed | cost_time_ms | cpu_delta_mb | gpu_delta_mb | peak_gpu_mb | training_fps | env_fps |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", ] ) - for item in leaderboard: + for result in ordered_runs: + train_steps = float(result.get("train_steps", float("nan"))) + training_fps = float(result.get("training_fps", float("nan"))) + cost_time_ms = _safe_divide(train_steps, training_fps) * 1000.0 lines.append( - "| {rank} | {algorithm} | {score} | {steps} | {std} | {success} | {stable_success} | {reward} | {tasks} |".format( - rank=item["rank"], - algorithm=item["algorithm"], - score=_fmt(item.get("score", float("nan"))), - steps=_fmt(item.get("steps_to_success_threshold", float("nan"))), - std=_fmt(item.get("success_rate_std", float("nan"))), - success=_fmt(item.get("avg_success_rate", float("nan"))), - stable_success=_fmt(item.get("avg_success_rate_stable", float("nan"))), - reward=_fmt(item.get("avg_final_reward", float("nan"))), - tasks=item.get("tasks_covered", 0), + "| {task} | {algorithm} | {seed} | {cost_time_ms} | {cpu_delta} | {gpu_delta} | {peak_gpu} | {train_fps} | {env_fps} |".format( + task=result["task"], + algorithm=result["algorithm"], + seed=result["seed"], + cost_time_ms=_fmt(cost_time_ms), + cpu_delta=_fmt(result.get("cpu_delta_mb", "n/a")), + gpu_delta=_fmt(result.get("gpu_delta_mb", "n/a")), + 
peak_gpu=_fmt(result.get("peak_gpu_memory_mb", float("nan"))), + train_fps=_fmt(result.get("training_fps", float("nan"))), + env_fps=_fmt(result.get("environment_fps", float("nan")), digits=2), ) ) lines.extend( [ "", - "## Aggregate Results", + "## Success & Other Metrics", "", - "| Task | Algorithm | Runs | Final Reward | Final Success Rate | Final Stable Success Rate | Training FPS | Env FPS |", - "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: |", + "| task | algorithm | seed | success_rate | stable_success_rate | steps_to_threshold | first_hit | final_reward | final_episode_length |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", ] ) - for item in aggregate_results: + for result in ordered_runs: lines.append( - "| {task} | {algorithm} | {num_runs} | {reward} | {success} | {stable_success} | {train_fps} | {env_fps} |".format( - task=item["task"], - algorithm=item["algorithm"], - num_runs=item["num_runs"], - reward=_fmt(item.get("final_reward_mean", float("nan"))), - success=_fmt(item.get("final_success_rate_mean", float("nan"))), + "| {task} | {algorithm} | {seed} | {success} | {stable_success} | {steps} | {first_hit} | {reward} | {episode_len} |".format( + task=result["task"], + algorithm=result["algorithm"], + seed=result["seed"], + success=_fmt(result.get("final_success_rate", float("nan"))), stable_success=_fmt( - item.get("final_success_rate_stable_mean", float("nan")) - ), - train_fps=_fmt(item.get("training_fps_mean", float("nan"))), - env_fps=_fmt(item.get("environment_fps_mean", float("nan"))), - ) - ) - - lines.extend( - [ - "", - "## Per-Task Comparison", - "", - "Each table compares different algorithms on the same task.", - "", - ] - ) - for task, task_results in _group_aggregate_results_by_task( - aggregate_results - ).items(): - lines.extend( - [ - f"### {task}", - "", - "| Algorithm | Runs | Final Stable Success Rate | Final Success Rate | Steps To Threshold (Sustained) | Success Rate Std | Final Reward | Training 
FPS | Env FPS |", - "| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", - ] - ) - for item in task_results: - lines.append( - "| {algorithm} | {num_runs} | {stable_success} | {success} | {steps} | {std} | {reward} | {train_fps} | {env_fps} |".format( - algorithm=item["algorithm"], - num_runs=item["num_runs"], - stable_success=_fmt( - item.get("final_success_rate_stable_mean", float("nan")) - ), - success=_fmt(item.get("final_success_rate_mean", float("nan"))), - steps=_fmt( - item.get("steps_to_success_threshold_mean", float("nan")) - ), - std=_fmt(item.get("final_success_rate_std", float("nan"))), - reward=_fmt(item.get("final_reward_mean", float("nan"))), - train_fps=_fmt(item.get("training_fps_mean", float("nan"))), - env_fps=_fmt(item.get("environment_fps_mean", float("nan"))), - ) - ) - lines.append("") - - lines.extend( - [ - "", - "## Plots", - "", - ] - ) - for plot_name, plot_path in sorted(plot_artifacts.items()): - relative = Path(plot_path).relative_to(output.parent) - lines.append(f"### {plot_name}") - lines.append("") - lines.append(f"![{plot_name}]({relative.as_posix()})") - lines.append("") - lines.extend( - [ - "## Stability Analysis", - "", - "| Task | Algorithm | Success Rate Mean | Stable Success Rate Mean | Success Rate Std | Steps To Threshold Mean | First Hit Mean |", - "| --- | --- | ---: | ---: | ---: | ---: | ---: |", - ] - ) - for item in aggregate_results: - lines.append( - "| {task} | {algorithm} | {mean_value} | {stable_mean} | {std_value} | {steps} | {first_hit} |".format( - task=item["task"], - algorithm=item["algorithm"], - mean_value=_fmt(item.get("final_success_rate_mean", float("nan"))), - stable_mean=_fmt( - item.get("final_success_rate_stable_mean", float("nan")) + result.get("final_success_rate_stable", float("nan")) ), - std_value=_fmt(item.get("final_success_rate_std", float("nan"))), - steps=_fmt(item.get("steps_to_success_threshold_mean", float("nan"))), + steps=_fmt(result.get("steps_to_success_threshold", 
float("nan"))), first_hit=_fmt( - item.get("steps_to_success_threshold_first_hit_mean", float("nan")) + result.get("steps_to_success_threshold_first_hit", float("nan")) ), + reward=_fmt(result.get("final_reward", float("nan"))), + episode_len=_fmt(result.get("final_episode_length", float("nan"))), ) ) + + leaderboard_by_success = _build_report_leaderboard_rows( + leaderboard=leaderboard, + aggregate_results=aggregate_results, + ) lines.extend( [ "", - "## System Performance", + "## Leaderboard", "", - "| Task | Algorithm | Training FPS | Env FPS | Peak GPU Memory (MB) |", - "| --- | --- | ---: | ---: | ---: |", + "| rank | algorithm | overall_success_rate | stable_success_rate | score | tasks_covered |", + "| ---: | --- | ---: | ---: | ---: | ---: |", ] ) - for item in aggregate_results: + for rank, item in enumerate(leaderboard_by_success, start=1): lines.append( - "| {task} | {algorithm} | {train_fps} | {env_fps} | {mem} |".format( - task=item["task"], - algorithm=item["algorithm"], - train_fps=_fmt(item.get("training_fps_mean", float("nan"))), - env_fps=_fmt(item.get("environment_fps_mean", float("nan"))), - mem=_fmt(item.get("peak_gpu_memory_mb_mean", float("nan"))), + "| {rank} | {algorithm} | {success} | {stable_success} | {score} | {tasks} |".format( + rank=rank, + algorithm=item.get("algorithm", "n/a"), + success=_fmt(item.get("avg_success_rate", float("nan"))), + stable_success=_fmt(item.get("avg_success_rate_stable", float("nan"))), + score=_fmt(item.get("score", float("nan"))), + tasks=item.get("tasks_covered", 0), ) ) - lines.extend( - [ - "", - "## Per-Run Results", - "", - "| Task | Algorithm | Seed | Final Reward | Final Success Rate | Final Stable Success Rate | Steps To Threshold | First Hit | Checkpoint |", - "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | --- |", - ] - ) - for result in sorted( - run_results, key=lambda item: (item["task"], item["algorithm"], item["seed"]) - ): + + lines.extend(["", "## Notes", ""]) + if 
leaderboard_by_success: + top = leaderboard_by_success[0] lines.append( - "| {task} | {algorithm} | {seed} | {reward} | {success} | {stable_success} | {steps} | {first_hit} | `{checkpoint}` |".format( - task=result["task"], - algorithm=result["algorithm"], - seed=result["seed"], - reward=_fmt(result.get("final_reward", float("nan"))), - success=_fmt(result.get("final_success_rate", float("nan"))), - stable_success=_fmt( - result.get("final_success_rate_stable", float("nan")) - ), - steps=result.get("steps_to_success_threshold", "n/a"), - first_hit=result.get("steps_to_success_threshold_first_hit", "n/a"), - checkpoint=result.get("checkpoint_path", ""), - ) + "- Top algorithm by overall success rate: " + f"`{top.get('algorithm', 'n/a')}` " + f"(success_rate={_fmt(top.get('avg_success_rate', float('nan')))})." ) + if aggregate_results: + lines.append(f"- Aggregate summaries available: `{len(aggregate_results)}`.") + + if plot_artifacts: + lines.extend(["", "## Plots", ""]) + for plot_name, plot_path in sorted(plot_artifacts.items()): + relative = Path(plot_path).relative_to(output.parent) + lines.append(f"- {plot_name}: ![{plot_name}]({relative.as_posix()})") output.write_text("\n".join(lines) + "\n", encoding="utf-8") return output @@ -258,19 +255,26 @@ def generate_leaderboard_markdown( leaderboard: list[dict[str, Any]], output_path: str | Path, ) -> Path: - """Write a dedicated leaderboard markdown artifact.""" + """Write a dedicated leaderboard markdown artifact sorted by success rate.""" output = Path(output_path) output.parent.mkdir(parents=True, exist_ok=True) + leaderboard_by_success = sorted( + leaderboard, + key=lambda item: ( + -_sortable_success_rate(item), + str(item.get("algorithm", "")), + ), + ) lines = [ "# Benchmark Leaderboard", "", "| Rank | Algorithm | Score | Steps To Threshold (Sustained) | Success Rate Std | Avg Success Rate | Avg Stable Success Rate | Avg Final Reward | Tasks |", "| ---: | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: 
|", ] - for item in leaderboard: + for rank, item in enumerate(leaderboard_by_success, start=1): lines.append( "| {rank} | {algorithm} | {score} | {steps} | {std} | {success} | {stable_success} | {reward} | {tasks} |".format( - rank=item["rank"], + rank=rank, algorithm=item["algorithm"], score=_fmt(item.get("score", float("nan"))), steps=_fmt(item.get("steps_to_success_threshold", float("nan"))), diff --git a/scripts/benchmark/rl/run_benchmark.py b/scripts/benchmark/rl/run_benchmark.py index 1d8f3ed4..bd85e12f 100644 --- a/scripts/benchmark/rl/run_benchmark.py +++ b/scripts/benchmark/rl/run_benchmark.py @@ -14,6 +14,11 @@ # limitations under the License. # ---------------------------------------------------------------------------- +"""Run RL benchmark training/evaluation and generate one markdown report. + +Run: python -m scripts.benchmark.rl.run_benchmark +""" + from __future__ import annotations import argparse @@ -73,9 +78,16 @@ def main() -> None: if args.rebuild_report_only: run_results = runner.collect_existing_run_results() if not run_results: - raise SystemExit( - "No compatible existing benchmark results were found for the requested jobs." - ) + training_runs = runner.collect_existing_training_runs() + if training_runs: + run_results = runner.run_evaluation(training_runs) + else: + raise SystemExit( + "No compatible existing benchmark results were found for the requested jobs under " + f"{runner.output_root / 'runs'}. " + "Run once without --rebuild-report-only to generate artifacts, " + "or pass --output-root to the directory containing existing runs." 
+ ) else: existing_results = ( runner.collect_existing_run_results() if args.skip_existing else [] @@ -87,7 +99,7 @@ def main() -> None: aggregate_result = runner.aggregate_results(run_results) leaderboard = runner.update_leaderboard(aggregate_result, run_results) report_path = runner.generate_report(run_results, aggregate_result, leaderboard) - print(f"Benchmark report written to: {report_path}") + print(f"Markdown report saved: {report_path}") if __name__ == "__main__": diff --git a/scripts/benchmark/rl/runner.py b/scripts/benchmark/rl/runner.py index 75913a2f..84dcda87 100644 --- a/scripts/benchmark/rl/runner.py +++ b/scripts/benchmark/rl/runner.py @@ -207,6 +207,17 @@ def collect_existing_run_results(self) -> list[dict[str, Any]]: results.append(record) return results + def collect_existing_training_runs(self) -> list[dict[str, Any]]: + """Load compatible existing training artifacts for the requested jobs.""" + records: list[dict[str, Any]] = [] + for task_name, algorithm_name, seed in self._iter_jobs(): + record = self._load_existing_training_record( + task_name, algorithm_name, seed + ) + if record is not None: + records.append(record) + return records + def merge_run_results( self, *result_sets: list[dict[str, Any]], diff --git a/scripts/benchmark/robotics/kinematic_solver/opw_solver.py b/scripts/benchmark/robotics/kinematic_solver/opw_solver.py deleted file mode 100644 index 78f7e3d7..00000000 --- a/scripts/benchmark/robotics/kinematic_solver/opw_solver.py +++ /dev/null @@ -1,166 +0,0 @@ -# ---------------------------------------------------------------------------- -# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ---------------------------------------------------------------------------- - -import torch -import numpy as np -import warp as wp -from scipy.spatial.transform import Rotation -from embodichain.lab.sim.solvers.opw_solver import OPWSolver, OPWSolverCfg -from typing import Tuple, List -import time - - -LOWER_LIMITS = [-2.618, 0.0, -2.967, -1.745, -1.22, -2.0944] -UPPER_LIMITS = [2.618, 3.14159, 0.0, 1.745, 1.22, 2.0944] - - -def get_pose_err(matrix_a: np.ndarray, matrix_b: np.ndarray) -> Tuple[float, float]: - t_err = np.linalg.norm(matrix_a[:3, 3] - matrix_b[:3, 3]) - relative_rot = matrix_a[:3, :3].T @ matrix_b[:3, :3] - cos_angle = (np.trace(relative_rot) - 1) / 2.0 - cos_angle = np.clip(cos_angle, -1.0, 1.0) - r_err = np.arccos(cos_angle) - return t_err, r_err - - -def get_poses_err( - matrix_a_list: List[np.ndarray], matrix_b_list: List[np.ndarray] -) -> Tuple[float, float]: - t_errs = [] - r_errs = [] - for mat_a, mat_b in zip(matrix_a_list, matrix_b_list): - t_err, r_err = get_pose_err(mat_a, mat_b) - t_errs.append(t_err) - r_errs.append(r_err) - return np.mean(t_errs), np.mean(r_errs) - - -def check_opw_solver(solver_warp, solver_py_opw, n_samples=1000): - DOF = 6 - qpos_np = np.random.uniform( - low=np.array(LOWER_LIMITS) - + 5.1 / 180.0 * np.pi, # add a margin to avoid sampling near the joint limits - high=np.array(UPPER_LIMITS) + -5.1 / 180.0 * np.pi, - size=(n_samples, DOF), - ).astype(float) - - qpos = torch.tensor(qpos_np, device=torch.device("cuda"), dtype=torch.float32) - xpos = solver_warp.get_fk(qpos) - qpos_seed = torch.tensor( - [0.0, 
0.0, 0.0, 0.0, 0.0, 0.0], - device=torch.device("cuda"), - dtype=torch.float32, - ) - - warp_ik_start_time = time.time() - warp_ik_success, warp_ik_qpos = solver_warp.get_ik( - xpos, - qpos_seed=qpos_seed, - initial_guess=qpos, - # return_all_solutions=True, - ) - warp_cost_time = time.time() - warp_ik_start_time - - # TODO: debug code - # warp_ik_success_np = warp_ik_success.cpu().numpy() - # warp_ik_failure_indices = np.where(warp_ik_success_np == False)[0] - # if len(warp_ik_failure_indices) > 0: - # failure_qpos = qpos_np[warp_ik_failure_indices] - # failure_xpos = xpos.cpu().numpy()[warp_ik_failure_indices] - # print("=====warp_ik_failure_qpos:\n", repr(failure_qpos)) - # print("=====warp_ik_failure_xpos:\n", repr(failure_xpos)) - - # print("=====xpos:\n", repr(xpos.cpu().numpy())) - # print("=====warp_ik_qpos:\n", repr(warp_ik_qpos.cpu().numpy())) - # print("=====warp_ik_success:\n", repr(warp_ik_success.cpu().numpy())) - - check_xpos = solver_warp.get_fk(warp_ik_qpos) - warp_t_mean_err, warp_r_mean_err = get_poses_err( - [x.cpu().numpy() for x in xpos], - [x.cpu().numpy() for x in check_xpos], - ) - - py_opw_ik_start_time = time.time() - py_opw_ik_success, py_opw_ik_qpos = solver_py_opw.get_ik( - xpos, qpos_seed=qpos_seed, initial_guess=qpos - ) - py_opw_cost_time = time.time() - py_opw_ik_start_time - - check_xpos = solver_warp.get_fk(py_opw_ik_qpos.to(torch.device("cuda"))) - py_opw_t_mean_err, py_opw_r_mean_err = get_poses_err( - [x.cpu().numpy() for x in xpos], - [x.cpu().numpy() for x in check_xpos], - ) - - return ( - warp_cost_time, - warp_t_mean_err, - warp_r_mean_err, - py_opw_cost_time, - py_opw_t_mean_err, - py_opw_r_mean_err, - ) - - -def benchmark_opw_solver(): - cfg = OPWSolverCfg( - joint_names=("J1", "J2", "J3", "J4", "J5", "J6"), - user_qpos_limits=(LOWER_LIMITS, UPPER_LIMITS), - ) - cfg.a1 = 400.333 - cfg.a2 = -251.449 - cfg.b = 0.0 - cfg.c1 = 830 - cfg.c2 = 1177.556 - cfg.c3 = 1443.593 - cfg.c4 = 230 - cfg.offsets = ( - 0.0, - 
82.21350356417211 * np.pi / 180.0, - -167.21710113148163 * np.pi / 180.0, - 0.0, - 0.0, - 0.0, - ) - cfg.flip_axes = (True, False, True, True, False, True) - cfg.has_parallelogram = False - - # TODO: Set pk_serial_chain to "" to ignore pk_serial_chain for OPW. - solver_warp = cfg.init_solver(device=torch.device("cuda"), pk_serial_chain="") - solver_py_opw = cfg.init_solver(device=torch.device("cpu"), pk_serial_chain="") - - n_samples = [100, 1000, 10000, 100000] - for n_sample in n_samples: - # check_opw_solver(solver_warp, solver_py_opw, device=device, n_samples=n_sample) - ( - warp_cost_time, - warp_t_mean_err, - warp_r_mean_err, - py_opw_cost_time, - py_opw_t_mean_err, - py_opw_r_mean_err, - ) = check_opw_solver(solver_warp, solver_py_opw, n_samples=n_sample) - print(f"*******warp cuda OPW Solver FK/IK test over {n_sample} samples:") - print(f"===Warp IK time: {warp_cost_time * 1000:.6f} ms") - print(f" Translation mean error: {warp_t_mean_err*1000:.6f} mm") - print(f" Rotation mean error: {warp_r_mean_err*180/np.pi:.6f} degrees") - print(f"===warp cpu IK time: {py_opw_cost_time * 1000:.6f} ms") - print(f" Translation mean error: {py_opw_t_mean_err*1000:.6f} mm") - print(f" Rotation mean error: {py_opw_r_mean_err*180/np.pi:.6f} degrees") - - -if __name__ == "__main__": - benchmark_opw_solver() diff --git a/scripts/benchmark/robotics/kinematic_solver/run_benchmark.py b/scripts/benchmark/robotics/kinematic_solver/run_benchmark.py new file mode 100644 index 00000000..3cf426f5 --- /dev/null +++ b/scripts/benchmark/robotics/kinematic_solver/run_benchmark.py @@ -0,0 +1,713 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- + +"""Unified benchmark for OPW and Pytorch kinematic solvers. + +Measures IK wall-clock latency, pose accuracy, success rate, and memory usage +across OPW (Warp CUDA vs CPU) and Pytorch solver (CPU vs optional CUDA). +Run: python -m scripts.benchmark.robotics.kinematic_solver.run_benchmark +""" + +from __future__ import annotations + +import argparse +import os +import time +from datetime import datetime +from pathlib import Path + +import numpy as np +import psutil +import torch + +from embodichain.data import get_data_path +from embodichain.lab.sim.solvers.opw_solver import OPWSolverCfg +from embodichain.lab.sim.solvers.pytorch_solver import PytorchSolver, PytorchSolverCfg + +OPW_LOWER_LIMITS = [-2.618, 0.0, -2.967, -1.745, -1.22, -2.0944] +OPW_UPPER_LIMITS = [2.618, 3.14159, 0.0, 1.745, 1.22, 2.0944] +PYTORCH_LOWER_LIMITS = [-6.2832, -6.2832, -3.1416, -6.2832, -6.2832, -6.2832] +PYTORCH_UPPER_LIMITS = [6.2832, 6.2832, 3.1416, 6.2832, 6.2832, 6.2832] +SAMPLE_SIZES = [100, 1000, 10000] +SUPPORTED_SOLVERS = ("opw", "pytorch") + + +def _parse_args() -> argparse.Namespace: + """Parse command line arguments for selecting benchmark solvers.""" + parser = argparse.ArgumentParser( + description="Run kinematic solver benchmarks for selected solver backends." + ) + parser.add_argument( + "--solvers", + "-s", + nargs="+", + choices=(*SUPPORTED_SOLVERS, "all"), + default=["all"], + help=( + "Solvers to benchmark. Use one or more of: opw, pytorch, all. 
" + "Default: all" + ), + ) + return parser.parse_args() + + +def _normalize_selected_solvers(selected_solvers: list[str] | None) -> set[str]: + """Normalize selected solver names to a canonical set.""" + if not selected_solvers or "all" in selected_solvers: + return set(SUPPORTED_SOLVERS) + return {solver for solver in selected_solvers if solver in SUPPORTED_SOLVERS} + + +def _sync_cuda() -> None: + """Synchronize CUDA stream when available.""" + if torch.cuda.is_available(): + torch.cuda.synchronize() + + +def _reset_peak_gpu_memory() -> None: + """Reset PyTorch peak GPU memory stats when CUDA is available.""" + if torch.cuda.is_available(): + torch.cuda.reset_peak_memory_stats() + + +def _peak_gpu_memory_mb() -> float: + """Return peak GPU memory allocated by PyTorch in MB.""" + if not torch.cuda.is_available(): + return 0.0 + return torch.cuda.max_memory_allocated() / 1024**2 + + +def _memory_snapshot() -> dict[str, float]: + """Return current process memory usage snapshot in MB.""" + process = psutil.Process(os.getpid()) + cpu_mb = process.memory_info().rss / 1024**2 + gpu_mb = ( + torch.cuda.memory_allocated() / 1024**2 if torch.cuda.is_available() else 0.0 + ) + return {"cpu_mb": cpu_mb, "gpu_mb": gpu_mb} + + +def _format_markdown_table(rows: list[dict[str, object]]) -> list[str]: + """Format rows into a markdown table.""" + if not rows: + return ["No data."] + + headers = list(rows[0].keys()) + lines = [ + "| " + " | ".join(headers) + " |", + "| " + " | ".join(["---"] * len(headers)) + " |", + ] + for row in rows: + lines.append("| " + " | ".join(str(row[h]) for h in headers) + " |") + return lines + + +def _build_leaderboard_rows( + metric_rows: list[dict[str, object]], +) -> list[dict[str, object]]: + """Aggregate and rank algorithms by overall success rate.""" + aggregate: dict[str, dict[str, float]] = {} + for row in metric_rows: + impl = str(row["impl"]) + if impl not in aggregate: + aggregate[impl] = { + "success_sum": 0.0, + "t_err_sum": 0.0, + 
"r_err_sum": 0.0, + "count": 0.0, + } + + aggregate[impl]["success_sum"] += float(row["success_rate"]) + aggregate[impl]["t_err_sum"] += float(row["translation_err_mm"]) + aggregate[impl]["r_err_sum"] += float(row["rotation_err_deg"]) + aggregate[impl]["count"] += 1.0 + + ranked = sorted( + aggregate.items(), + key=lambda item: item[1]["success_sum"] / max(item[1]["count"], 1.0), + reverse=True, + ) + + leaderboard_rows: list[dict[str, object]] = [] + for rank, (algorithm, stats) in enumerate(ranked, start=1): + count = max(stats["count"], 1.0) + leaderboard_rows.append( + { + "rank": rank, + "algorithm": algorithm, + "overall_success_rate": f"{stats['success_sum'] / count:.2%}", + "avg_translation_err_mm": f"{stats['t_err_sum'] / count:.6f}", + "avg_rotation_err_deg": f"{stats['r_err_sum'] / count:.6f}", + } + ) + return leaderboard_rows + + +def _write_markdown_report( + benchmark_name: str, + perf_rows: list[dict[str, object]], + metric_rows: list[dict[str, object]], + leaderboard_rows: list[dict[str, object]], + notes: list[str] | None = None, +) -> Path: + """Write benchmark results to a markdown report with three tables.""" + output_dir = Path("outputs/benchmarks") + output_dir.mkdir(parents=True, exist_ok=True) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + report_path = output_dir / f"{benchmark_name}_{timestamp}.md" + + lines: list[str] = [ + f"# {benchmark_name} Benchmark Report", + "", + f"Generated at: {datetime.now().isoformat(timespec='seconds')}", + "", + "## Time & Memory", + "", + ] + lines.extend(_format_markdown_table(perf_rows)) + lines.extend(["", "## Success & Other Metrics", ""]) + lines.extend(_format_markdown_table(metric_rows)) + + lines.extend(["", "## Leaderboard", ""]) + lines.extend(_format_markdown_table(leaderboard_rows)) + + if notes: + lines.extend(["", "## Notes", ""]) + lines.extend([f"- {note}" for note in notes]) + + report_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + return report_path + + +def 
get_pose_err( + matrix_a: np.ndarray | torch.Tensor, + matrix_b: np.ndarray | torch.Tensor, +) -> tuple[torch.Tensor, torch.Tensor]: + """Return translation and rotation errors between paired poses. + + Supports either a single 4x4 pose or a batch with shape (N, 4, 4). + """ + tensor_a = torch.as_tensor(matrix_a, dtype=torch.float64) + tensor_b = torch.as_tensor(matrix_b, dtype=torch.float64, device=tensor_a.device) + + if tensor_a.ndim == 2: + tensor_a = tensor_a.unsqueeze(0) + if tensor_b.ndim == 2: + tensor_b = tensor_b.unsqueeze(0) + + t_err = torch.linalg.norm(tensor_a[:, :3, 3] - tensor_b[:, :3, 3], dim=-1) + + relative_rot = torch.matmul( + tensor_a[:, :3, :3].transpose(-1, -2), + tensor_b[:, :3, :3], + ) + trace = torch.diagonal(relative_rot, dim1=-2, dim2=-1).sum(dim=-1) + cos_angle = torch.clamp((trace - 1.0) / 2.0, min=-1.0, max=1.0) + r_err = torch.arccos(cos_angle) + return t_err, r_err + + +def _timed_ik_call( + solver, xpos: torch.Tensor, qpos_seed: torch.Tensor, initial_guess: torch.Tensor +) -> tuple[float, dict[str, float], float, torch.Tensor, torch.Tensor]: + """Run a timed IK call and return elapsed seconds, memory deltas, and outputs.""" + _reset_peak_gpu_memory() + mem_before = _memory_snapshot() + _sync_cuda() + + start = time.perf_counter() + ik_success, ik_qpos = solver.get_ik( + xpos, + qpos_seed=qpos_seed, + initial_guess=initial_guess, + ) + _sync_cuda() + elapsed = time.perf_counter() - start + + mem_after = _memory_snapshot() + deltas = { + "cpu_mb": mem_after["cpu_mb"] - mem_before["cpu_mb"], + "gpu_mb": mem_after["gpu_mb"] - mem_before["gpu_mb"], + } + return elapsed, deltas, _peak_gpu_memory_mb(), ik_success, ik_qpos + + +def _init_pytorch_solver(device: torch.device) -> PytorchSolver: + """Initialize Pytorch kinematic solver on the target device.""" + solver_cfg = PytorchSolverCfg( + urdf_path=get_data_path("UniversalRobots/UR10/UR10.urdf"), + end_link_name="ee_link", + root_link_name="base_link", + joint_names=["J1", "J2", "J3", 
"J4", "J5", "J6"], + user_qpos_limits=[PYTORCH_LOWER_LIMITS, PYTORCH_UPPER_LIMITS], + ) + return PytorchSolver(solver_cfg, device=device) + + +def _sample_qpos( + n_samples: int, + lower_limits: list[float], + upper_limits: list[float], + margin: float, + device: torch.device, + dtype: torch.dtype, +) -> torch.Tensor: + """Sample joint positions with margin from lower/upper limits.""" + qpos_np = np.random.uniform( + low=np.array(lower_limits) + margin, + high=np.array(upper_limits) - margin, + size=(n_samples, 6), + ).astype(float) + return torch.tensor(qpos_np, device=device, dtype=dtype) + + +def _timed_pytorch_ik_call( + solver: PytorchSolver, + fk_xpos: torch.Tensor, + qpos_seed: torch.Tensor, +) -> tuple[float, dict[str, float], float, torch.Tensor, torch.Tensor]: + """Run a timed Pytorch IK call and return elapsed/memory/outputs.""" + _reset_peak_gpu_memory() + mem_before = _memory_snapshot() + _sync_cuda() + + start = time.perf_counter() + ik_success, ik_qpos = solver.get_ik( + fk_xpos, + joint_seed=qpos_seed, + return_all_solutions=False, + ) + _sync_cuda() + elapsed = time.perf_counter() - start + + mem_after = _memory_snapshot() + deltas = { + "cpu_mb": mem_after["cpu_mb"] - mem_before["cpu_mb"], + "gpu_mb": mem_after["gpu_mb"] - mem_before["gpu_mb"], + } + return elapsed, deltas, _peak_gpu_memory_mb(), ik_success, ik_qpos[:, 0, :] + + +def check_opw_solver( + solver_warp, solver_py_opw, n_samples: int = 1000 +) -> dict[str, float]: + """Run Warp and CPU OPW IK/FK checks and return timing, memory, and accuracy.""" + dof = 6 + qpos_np = np.random.uniform( + low=np.array(OPW_LOWER_LIMITS) + + 5.1 / 180.0 * np.pi, # add a margin to avoid sampling near the joint limits + high=np.array(OPW_UPPER_LIMITS) + -5.1 / 180.0 * np.pi, + size=(n_samples, dof), + ).astype(float) + + qpos_cuda = torch.tensor(qpos_np, device=torch.device("cuda"), dtype=torch.float32) + xpos_cuda = solver_warp.get_fk(qpos_cuda) + qpos_seed = torch.tensor( + [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], 
+ device=torch.device("cuda"), + dtype=torch.float32, + ) + + ( + warp_elapsed, + warp_mem, + warp_peak_gpu, + warp_ik_success, + warp_ik_qpos, + ) = _timed_ik_call( + solver=solver_warp, + xpos=xpos_cuda, + qpos_seed=qpos_seed, + initial_guess=qpos_cuda, + ) + + check_xpos = solver_warp.get_fk(warp_ik_qpos) + warp_t_err, warp_r_err = get_pose_err(xpos_cuda, check_xpos) + warp_t_mean_err, warp_r_mean_err = ( + warp_t_err.mean().item(), + warp_r_err.mean().item(), + ) + + xpos_cpu = xpos_cuda.to(torch.device("cpu")) + qpos_seed_cpu = qpos_seed.to(torch.device("cpu")) + qpos_cpu = qpos_cuda.to(torch.device("cpu")) + + ( + cpu_elapsed, + cpu_mem, + cpu_peak_gpu, + py_opw_ik_success, + py_opw_ik_qpos, + ) = _timed_ik_call( + solver=solver_py_opw, + xpos=xpos_cpu, + qpos_seed=qpos_seed_cpu, + initial_guess=qpos_cpu, + ) + + check_xpos = solver_warp.get_fk(py_opw_ik_qpos.to(torch.device("cuda"))) + py_opw_t_err, py_opw_r_err = get_pose_err(xpos_cpu, check_xpos) + py_opw_t_mean_err, py_opw_r_mean_err = ( + py_opw_t_err.mean().item(), + py_opw_r_err.mean().item(), + ) + + warp_success_rate = float(warp_ik_success.float().mean().item()) + cpu_success_rate = float(py_opw_ik_success.float().mean().item()) + + return { + "warp_ms": warp_elapsed * 1000.0, + "warp_t_err_mm": warp_t_mean_err * 1000.0, + "warp_r_err_deg": warp_r_mean_err * 180.0 / np.pi, + "warp_success_rate": warp_success_rate, + "warp_cpu_delta_mb": warp_mem["cpu_mb"], + "warp_gpu_delta_mb": warp_mem["gpu_mb"], + "warp_peak_gpu_mb": warp_peak_gpu, + "cpu_ms": cpu_elapsed * 1000.0, + "cpu_t_err_mm": py_opw_t_mean_err * 1000.0, + "cpu_r_err_deg": py_opw_r_mean_err * 180.0 / np.pi, + "cpu_success_rate": cpu_success_rate, + "cpu_cpu_delta_mb": cpu_mem["cpu_mb"], + "cpu_gpu_delta_mb": cpu_mem["gpu_mb"], + "cpu_peak_gpu_mb": cpu_peak_gpu, + } + + +def benchmark_pytorch_solver() -> ( + tuple[list[dict[str, object]], list[dict[str, object]]] +): + """Benchmark Pytorch solver for CPU and optional CUDA implementations.""" 
+ perf_rows: list[dict[str, object]] = [] + metric_rows: list[dict[str, object]] = [] + + cpu_solver = _init_pytorch_solver(device=torch.device("cpu")) + has_cuda = torch.cuda.is_available() + cuda_solver = ( + _init_pytorch_solver(device=torch.device("cuda")) if has_cuda else None + ) + + print("\n=== Pytorch Kinematic Benchmark ===") + if not has_cuda: + print(" CUDA unavailable; CUDA benchmark is skipped.") + + for n_sample in SAMPLE_SIZES: + print(f"**** Test over {n_sample} samples:") + + qpos_cpu = _sample_qpos( + n_samples=n_sample, + lower_limits=PYTORCH_LOWER_LIMITS, + upper_limits=PYTORCH_UPPER_LIMITS, + margin=1e-1, + device=torch.device("cpu"), + dtype=torch.float64, + ) + fk_xpos_cpu = cpu_solver.get_fk(qpos_cpu) + ( + cpu_elapsed, + cpu_mem, + cpu_peak_gpu, + cpu_success, + cpu_ik_qpos, + ) = _timed_pytorch_ik_call(cpu_solver, fk_xpos_cpu, qpos_cpu) + check_xpos_cpu = cpu_solver.get_fk(cpu_ik_qpos) + cpu_t_err, cpu_r_err = get_pose_err(fk_xpos_cpu, check_xpos_cpu) + + cpu_result = { + "cost_time_ms": cpu_elapsed * 1000.0, + "cpu_delta_mb": cpu_mem["cpu_mb"], + "gpu_delta_mb": cpu_mem["gpu_mb"], + "peak_gpu_mb": cpu_peak_gpu, + "success_rate": float(cpu_success.float().mean().item()), + "translation_err_mm": cpu_t_err.mean().item() * 1000.0, + "rotation_err_deg": cpu_r_err.mean().item() * 180.0 / np.pi, + } + + perf_rows.append( + { + "sample_size": n_sample, + "impl": "pytorch_cpu", + "component": "pytorch_ik", + "cost_time_ms": f"{cpu_result['cost_time_ms']:.6f}", + "cpu_delta_mb": f"{cpu_result['cpu_delta_mb']:.6f}", + "gpu_delta_mb": f"{cpu_result['gpu_delta_mb']:.6f}", + "peak_gpu_mb": f"{cpu_result['peak_gpu_mb']:.6f}", + } + ) + metric_rows.append( + { + "sample_size": n_sample, + "impl": "pytorch_cpu", + "component": "pytorch_ik", + "success_rate": f"{cpu_result['success_rate']:.6f}", + "translation_err_mm": f"{cpu_result['translation_err_mm']:.6f}", + "rotation_err_deg": f"{cpu_result['rotation_err_deg']:.6f}", + } + ) + + print(f"===Pytorch 
CPU IK time: {cpu_result['cost_time_ms']:.6f} ms") + print(f" Translation mean error: {cpu_result['translation_err_mm']:.6f} mm") + print( + f" Rotation mean error: {cpu_result['rotation_err_deg']:.6f} degrees" + ) + print(f" Success rate: {cpu_result['success_rate'] * 100.0:.2f}%") + print( + " " + f"CPU Δ={cpu_result['cpu_delta_mb']:+.1f} MB " + f"GPU Δ={cpu_result['gpu_delta_mb']:+.1f} MB " + f"peak GPU={cpu_result['peak_gpu_mb']:.1f} MB" + ) + + if has_cuda and cuda_solver is not None: + qpos_cuda = qpos_cpu.to(torch.device("cuda")) + fk_xpos_cuda = cuda_solver.get_fk(qpos_cuda) + ( + cuda_elapsed, + cuda_mem, + cuda_peak_gpu, + cuda_success, + cuda_ik_qpos, + ) = _timed_pytorch_ik_call(cuda_solver, fk_xpos_cuda, qpos_cuda) + check_xpos_cuda = cuda_solver.get_fk(cuda_ik_qpos) + cuda_t_err, cuda_r_err = get_pose_err(fk_xpos_cuda, check_xpos_cuda) + + cuda_result = { + "cost_time_ms": cuda_elapsed * 1000.0, + "cpu_delta_mb": cuda_mem["cpu_mb"], + "gpu_delta_mb": cuda_mem["gpu_mb"], + "peak_gpu_mb": cuda_peak_gpu, + "success_rate": float(cuda_success.float().mean().item()), + "translation_err_mm": cuda_t_err.mean().item() * 1000.0, + "rotation_err_deg": cuda_r_err.mean().item() * 180.0 / np.pi, + } + + perf_rows.append( + { + "sample_size": n_sample, + "impl": "pytorch_cuda", + "component": "pytorch_ik", + "cost_time_ms": f"{cuda_result['cost_time_ms']:.6f}", + "cpu_delta_mb": f"{cuda_result['cpu_delta_mb']:.6f}", + "gpu_delta_mb": f"{cuda_result['gpu_delta_mb']:.6f}", + "peak_gpu_mb": f"{cuda_result['peak_gpu_mb']:.6f}", + } + ) + metric_rows.append( + { + "sample_size": n_sample, + "impl": "pytorch_cuda", + "component": "pytorch_ik", + "success_rate": f"{cuda_result['success_rate']:.6f}", + "translation_err_mm": f"{cuda_result['translation_err_mm']:.6f}", + "rotation_err_deg": f"{cuda_result['rotation_err_deg']:.6f}", + } + ) + + print(f"===Pytorch CUDA IK time: {cuda_result['cost_time_ms']:.6f} ms") + print( + f" Translation mean error: 
{cuda_result['translation_err_mm']:.6f} mm" + ) + print( + f" Rotation mean error: {cuda_result['rotation_err_deg']:.6f} degrees" + ) + print( + f" Success rate: {cuda_result['success_rate'] * 100.0:.2f}%" + ) + print( + " " + f"CPU Δ={cuda_result['cpu_delta_mb']:+.1f} MB " + f"GPU Δ={cuda_result['gpu_delta_mb']:+.1f} MB " + f"peak GPU={cuda_result['peak_gpu_mb']:.1f} MB" + ) + + return perf_rows, metric_rows + + +def benchmark_opw_solver() -> tuple[list[dict[str, object]], list[dict[str, object]]]: + """Benchmark OPW solver for multiple sample sizes.""" + if not torch.cuda.is_available(): + print("\n=== OPW Solver Benchmark ===") + print(" Skipped -- requires CUDA for Warp implementation comparison.") + return [], [ + { + "sample_size": "N/A", + "impl": "opw_solver", + "component": "opw_ik", + "success_rate": "N/A", + "other_metrics": "skipped: requires CUDA for Warp comparison", + } + ] + + cfg = OPWSolverCfg( + joint_names=("J1", "J2", "J3", "J4", "J5", "J6"), + user_qpos_limits=(OPW_LOWER_LIMITS, OPW_UPPER_LIMITS), + ) + cfg.a1 = 400.333 + cfg.a2 = -251.449 + cfg.b = 0.0 + cfg.c1 = 830 + cfg.c2 = 1177.556 + cfg.c3 = 1443.593 + cfg.c4 = 230 + cfg.offsets = ( + 0.0, + 82.21350356417211 * np.pi / 180.0, + -167.21710113148163 * np.pi / 180.0, + 0.0, + 0.0, + 0.0, + ) + cfg.flip_axes = (True, False, True, True, False, True) + cfg.has_parallelogram = False + + solver_warp = cfg.init_solver(device=torch.device("cuda"), pk_serial_chain="") + solver_py_opw = cfg.init_solver(device=torch.device("cpu"), pk_serial_chain="") + + print("\n=== OPW Solver Benchmark ===") + perf_rows: list[dict[str, object]] = [] + metric_rows: list[dict[str, object]] = [] + + for n_sample in SAMPLE_SIZES: + result = check_opw_solver(solver_warp, solver_py_opw, n_samples=n_sample) + print(f"**** Test over {n_sample} samples:") + print(f"===Warp CUDA IK time: {result['warp_ms']:.6f} ms") + print(f" Translation mean error: {result['warp_t_err_mm']:.6f} mm") + print(f" Rotation mean error: 
{result['warp_r_err_deg']:.6f} degrees") + print(f" Success rate: {result['warp_success_rate'] * 100.0:.2f}%") + print( + " " + f"CPU Δ={result['warp_cpu_delta_mb']:+.1f} MB " + f"GPU Δ={result['warp_gpu_delta_mb']:+.1f} MB " + f"peak GPU={result['warp_peak_gpu_mb']:.1f} MB" + ) + print(f"===CPU OPW IK time: {result['cpu_ms']:.6f} ms") + print(f" Translation mean error: {result['cpu_t_err_mm']:.6f} mm") + print(f" Rotation mean error: {result['cpu_r_err_deg']:.6f} degrees") + print(f" Success rate: {result['cpu_success_rate'] * 100.0:.2f}%") + print( + " " + f"CPU Δ={result['cpu_cpu_delta_mb']:+.1f} MB " + f"GPU Δ={result['cpu_gpu_delta_mb']:+.1f} MB " + f"peak GPU={result['cpu_peak_gpu_mb']:.1f} MB" + ) + + perf_rows.append( + { + "sample_size": n_sample, + "impl": "opw_cuda", + "component": "opw_ik", + "cost_time_ms": f"{result['warp_ms']:.6f}", + "cpu_delta_mb": f"{result['warp_cpu_delta_mb']:.6f}", + "gpu_delta_mb": f"{result['warp_gpu_delta_mb']:.6f}", + "peak_gpu_mb": f"{result['warp_peak_gpu_mb']:.6f}", + } + ) + perf_rows.append( + { + "sample_size": n_sample, + "impl": "opw_cpu", + "component": "opw_ik", + "cost_time_ms": f"{result['cpu_ms']:.6f}", + "cpu_delta_mb": f"{result['cpu_cpu_delta_mb']:.6f}", + "gpu_delta_mb": f"{result['cpu_gpu_delta_mb']:.6f}", + "peak_gpu_mb": f"{result['cpu_peak_gpu_mb']:.6f}", + } + ) + metric_rows.append( + { + "sample_size": n_sample, + "impl": "opw_cuda", + "component": "opw_ik", + "success_rate": f"{result['warp_success_rate']:.6f}", + "translation_err_mm": f"{result['warp_t_err_mm']:.6f}", + "rotation_err_deg": f"{result['warp_r_err_deg']:.6f}", + } + ) + metric_rows.append( + { + "sample_size": n_sample, + "impl": "opw_cpu", + "component": "opw_ik", + "success_rate": f"{result['cpu_success_rate']:.6f}", + "translation_err_mm": f"{result['cpu_t_err_mm']:.6f}", + "rotation_err_deg": f"{result['cpu_r_err_deg']:.6f}", + } + ) + + return perf_rows, metric_rows + + +def run_all_benchmarks(selected_solvers: list[str] | None = 
None) -> None: + """Run unified OPW + Pytorch kinematic solver benchmarks.""" + solvers_to_run = _normalize_selected_solvers(selected_solvers) + + print("=" * 60) + print("Kinematic Solver Performance Benchmarks") + print("=" * 60) + + print("\nSelected solvers:", ", ".join(sorted(solvers_to_run))) + + print("\nConfiguration differences:") + print( + "- OPW solver: analytic OPW parameters via OPWSolverCfg with " + "opw-specific joint limits." + ) + print("- Pytorch solver: UR10 URDF-based PytorchSolver with " "UR10 joint limits.") + + perf_rows: list[dict[str, object]] = [] + metric_rows: list[dict[str, object]] = [] + + if "opw" in solvers_to_run: + opw_perf_rows, opw_metric_rows = benchmark_opw_solver() + perf_rows.extend(opw_perf_rows) + metric_rows.extend(opw_metric_rows) + + if "pytorch" in solvers_to_run: + pytorch_perf_rows, pytorch_metric_rows = benchmark_pytorch_solver() + perf_rows.extend(pytorch_perf_rows) + metric_rows.extend(pytorch_metric_rows) + + leaderboard_rows = _build_leaderboard_rows(metric_rows) + + benchmark_name = "kinematic_solver" + + print("\n" + "=" * 60) + print("Benchmarks complete.") + print("=" * 60) + + report_path = _write_markdown_report( + benchmark_name=benchmark_name, + perf_rows=perf_rows, + metric_rows=metric_rows, + leaderboard_rows=leaderboard_rows, + notes=[ + "CPU/GPU memory fields are deltas measured around timed calls.", + "This report contains exactly three tables: Time & Memory, Success & Other Metrics, and Leaderboard.", + ] + + ( + [ + "OPW and Pytorch solvers use different initialization paths and different lower/upper joint limits." 
+ ] + if solvers_to_run == set(SUPPORTED_SOLVERS) + else [] + ), + ) + print(f"Markdown report saved: {report_path}") + + +if __name__ == "__main__": + args = _parse_args() + run_all_benchmarks(selected_solvers=args.solvers) diff --git a/scripts/benchmark/workspace_analyzer/benchmark_workspace_analyzer.py b/scripts/benchmark/workspace_analyzer/benchmark_workspace_analyzer.py index bd6f3393..67185059 100644 --- a/scripts/benchmark/workspace_analyzer/benchmark_workspace_analyzer.py +++ b/scripts/benchmark/workspace_analyzer/benchmark_workspace_analyzer.py @@ -14,18 +14,142 @@ # limitations under the License. # ---------------------------------------------------------------------------- +from __future__ import annotations + """Benchmark script for workspace analyzer performance optimizations. Measures each optimization independently across multiple sample sizes. Run: python -m scripts.benchmark.workspace_analyzer.benchmark_workspace_analyzer """ +import os import time +from datetime import datetime +from pathlib import Path + import numpy as np +import psutil import torch +SAMPLE_SIZES_SMALL = [100, 1000, 10000, 50000] +SAMPLE_SIZES_MEDIUM = [1000, 10000, 100000, 500000] + + +def _sync_cuda() -> None: + """Synchronize CUDA stream when available.""" + if torch.cuda.is_available(): + torch.cuda.synchronize() + + +def _reset_peak_gpu_memory() -> None: + """Reset PyTorch peak GPU memory stats when CUDA is available.""" + if torch.cuda.is_available(): + torch.cuda.reset_peak_memory_stats() + + +def _peak_gpu_memory_mb() -> float: + """Return peak GPU memory allocated by PyTorch in MB.""" + if not torch.cuda.is_available(): + return 0.0 + return torch.cuda.max_memory_allocated() / 1024**2 + + +def _memory_snapshot() -> dict[str, float]: + """Return current process memory usage snapshot in MB.""" + process = psutil.Process(os.getpid()) + cpu_mb = process.memory_info().rss / 1024**2 + gpu_mb = ( + torch.cuda.memory_allocated() / 1024**2 if torch.cuda.is_available() else 0.0 + 
) + return {"cpu_mb": cpu_mb, "gpu_mb": gpu_mb} + + +def _time_call(callable_fn) -> tuple[float, dict[str, float], float, object]: + """Time a callable and return elapsed seconds, memory deltas, and result.""" + _reset_peak_gpu_memory() + before = _memory_snapshot() + _sync_cuda() + + start = time.perf_counter() + result = callable_fn() + _sync_cuda() + elapsed = time.perf_counter() - start + + after = _memory_snapshot() + deltas = { + "cpu_mb": after["cpu_mb"] - before["cpu_mb"], + "gpu_mb": after["gpu_mb"] - before["gpu_mb"], + } + return elapsed, deltas, _peak_gpu_memory_mb(), result + + +def _format_perf_line( + n: int, + elapsed_s: float, + memory_delta: dict[str, float], + peak_gpu_mb: float, + extra_info: str, +) -> str: + """Format one benchmark output line with aligned fields.""" + return ( + f" n={n:>7d}: {elapsed_s * 1000:>10.2f} ms | " + f"CPU Δ={memory_delta['cpu_mb']:+.1f} MB " + f"GPU Δ={memory_delta['gpu_mb']:+.1f} MB " + f"peak GPU={peak_gpu_mb:.1f} MB" + (f" | {extra_info}" if extra_info else "") + ) + -def benchmark_halton_sampler(): +def _format_markdown_table(rows: list[dict[str, object]]) -> list[str]: + """Format rows into a markdown table.""" + if not rows: + return ["No data."] + + headers = list(rows[0].keys()) + lines = [ + "| " + " | ".join(headers) + " |", + "| " + " | ".join(["---"] * len(headers)) + " |", + ] + for row in rows: + lines.append("| " + " | ".join(str(row[h]) for h in headers) + " |") + return lines + + +def _write_markdown_report( + benchmark_name: str, + perf_rows: list[dict[str, object]], + metric_rows: list[dict[str, object]], + notes: list[str] | None = None, +) -> Path: + """Write benchmark results to a markdown report with two tables.""" + output_dir = Path("outputs/benchmarks") + output_dir.mkdir(parents=True, exist_ok=True) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + report_path = output_dir / f"{benchmark_name}_{timestamp}.md" + + lines: list[str] = [ + f"# {benchmark_name} Benchmark Report", + 
"", + f"Generated at: {datetime.now().isoformat(timespec='seconds')}", + "", + "## Time & Memory", + "", + ] + lines.extend(_format_markdown_table(perf_rows)) + lines.extend(["", "## Success & Other Metrics", ""]) + lines.extend(_format_markdown_table(metric_rows)) + + if notes: + lines.extend(["", "## Notes", ""]) + lines.extend([f"- {note}" for note in notes]) + + report_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + return report_path + + +def benchmark_halton_sampler() -> ( + tuple[list[dict[str, object]], list[dict[str, object]]] +): """Benchmark Halton sampler: vectorized vs loop-based.""" from embodichain.lab.sim.utility.workspace_analyzer.samplers.halton_sampler import ( HaltonSampler, @@ -45,14 +169,51 @@ def benchmark_halton_sampler(): ) print("\n=== Halton Sampler Benchmark ===") + perf_rows: list[dict[str, object]] = [] + metric_rows: list[dict[str, object]] = [] + for n in [100, 1000, 10000, 100000]: - start = time.perf_counter() - samples = sampler.sample(num_samples=n, bounds=bounds) - elapsed = time.perf_counter() - start - print(f" n={n:>7d}: {elapsed*1000:>10.2f} ms ({samples.shape})") + elapsed, mem_delta, peak_gpu, samples = _time_call( + lambda: sampler.sample(num_samples=n, bounds=bounds) + ) + elapsed_ms = elapsed * 1000.0 + print( + _format_perf_line( + n=n, + elapsed_s=elapsed, + memory_delta=mem_delta, + peak_gpu_mb=peak_gpu, + extra_info=f"shape={tuple(samples.shape)}", + ) + ) + + perf_rows.append( + { + "sample_size": n, + "impl": "workspace_analyzer", + "component": "halton_sampler", + "cost_time_ms": f"{elapsed_ms:.6f}", + "cpu_delta_mb": f"{mem_delta['cpu_mb']:.6f}", + "gpu_delta_mb": f"{mem_delta['gpu_mb']:.6f}", + "peak_gpu_mb": f"{peak_gpu:.6f}", + } + ) + metric_rows.append( + { + "sample_size": n, + "impl": "workspace_analyzer", + "component": "halton_sampler", + "success_rate": "N/A", + "other_metrics": f"shape={tuple(samples.shape)}", + } + ) + return perf_rows, metric_rows -def benchmark_density_metric(): + +def 
benchmark_density_metric() -> ( + tuple[list[dict[str, object]], list[dict[str, object]]] +): """Benchmark density metric: KDTree vs brute-force.""" from embodichain.lab.sim.utility.workspace_analyzer.metrics.density_metric import ( DensityMetric, @@ -65,19 +226,51 @@ def benchmark_density_metric(): metric = DensityMetric(config) print("\n=== Density Metric Benchmark ===") - for n in [100, 1000, 10000, 50000]: + perf_rows: list[dict[str, object]] = [] + metric_rows: list[dict[str, object]] = [] + + for n in SAMPLE_SIZES_SMALL: points = np.random.randn(n, 3).astype(np.float32) * 0.5 - start = time.perf_counter() - result = metric.compute(points) - elapsed = time.perf_counter() - start + elapsed, mem_delta, peak_gpu, result = _time_call( + lambda: metric.compute(points) + ) + elapsed_ms = elapsed * 1000.0 print( - f" n={n:>7d}: {elapsed*1000:>10.2f} ms " - f"(mean_density={result['mean_density']:.2f})" + _format_perf_line( + n=n, + elapsed_s=elapsed, + memory_delta=mem_delta, + peak_gpu_mb=peak_gpu, + extra_info=f"mean_density={result['mean_density']:.2f}", + ) + ) + + perf_rows.append( + { + "sample_size": n, + "impl": "workspace_analyzer", + "component": "density_metric", + "cost_time_ms": f"{elapsed_ms:.6f}", + "cpu_delta_mb": f"{mem_delta['cpu_mb']:.6f}", + "gpu_delta_mb": f"{mem_delta['gpu_mb']:.6f}", + "peak_gpu_mb": f"{peak_gpu:.6f}", + } + ) + metric_rows.append( + { + "sample_size": n, + "impl": "workspace_analyzer", + "component": "density_metric", + "success_rate": "N/A", + "other_metrics": f"mean_density={result['mean_density']:.6f}", + } ) + return perf_rows, metric_rows -def benchmark_voxelization(): + +def benchmark_voxelization() -> tuple[list[dict[str, object]], list[dict[str, object]]]: """Benchmark voxelization: np.unique vs dict-based.""" from embodichain.lab.sim.utility.workspace_analyzer.metrics.reachability_metric import ( ReachabilityMetric, @@ -90,19 +283,57 @@ def benchmark_voxelization(): metric = ReachabilityMetric(config) print("\n=== 
Voxelization Benchmark ===") - for n in [1000, 10000, 100000, 500000]: + perf_rows: list[dict[str, object]] = [] + metric_rows: list[dict[str, object]] = [] + + for n in SAMPLE_SIZES_MEDIUM: points = np.random.randn(n, 3).astype(np.float32) * 0.5 - start = time.perf_counter() - result = metric.compute(points) - elapsed = time.perf_counter() - start + elapsed, mem_delta, peak_gpu, result = _time_call( + lambda: metric.compute(points) + ) + elapsed_ms = elapsed * 1000.0 print( - f" n={n:>7d}: {elapsed*1000:>10.2f} ms " - f"(volume={result['volume']:.4f}, voxels={result['num_voxels']})" + _format_perf_line( + n=n, + elapsed_s=elapsed, + memory_delta=mem_delta, + peak_gpu_mb=peak_gpu, + extra_info=( + f"volume={result['volume']:.4f}, " f"voxels={result['num_voxels']}" + ), + ) ) + perf_rows.append( + { + "sample_size": n, + "impl": "workspace_analyzer", + "component": "voxelization", + "cost_time_ms": f"{elapsed_ms:.6f}", + "cpu_delta_mb": f"{mem_delta['cpu_mb']:.6f}", + "gpu_delta_mb": f"{mem_delta['gpu_mb']:.6f}", + "peak_gpu_mb": f"{peak_gpu:.6f}", + } + ) + metric_rows.append( + { + "sample_size": n, + "impl": "workspace_analyzer", + "component": "voxelization", + "success_rate": "N/A", + "other_metrics": ( + f"volume={result['volume']:.6f}, num_voxels={result['num_voxels']}" + ), + } + ) + + return perf_rows, metric_rows + -def benchmark_manipulability(): +def benchmark_manipulability() -> ( + tuple[list[dict[str, object]], list[dict[str, object]]] +): """Benchmark manipulability: batch vs per-sample.""" from embodichain.lab.sim.utility.workspace_analyzer.metrics.manipulability_metric import ( ManipulabilityMetric, @@ -115,20 +346,54 @@ def benchmark_manipulability(): metric = ManipulabilityMetric(config) print("\n=== Manipulability Metric Benchmark ===") - for n in [100, 1000, 10000, 50000]: + perf_rows: list[dict[str, object]] = [] + metric_rows: list[dict[str, object]] = [] + + for n in SAMPLE_SIZES_SMALL: points = np.random.randn(n, 3).astype(np.float32) * 0.5 
jacobians = np.random.randn(n, 6, 6).astype(np.float32) * 0.1 - start = time.perf_counter() - result = metric.compute(points, jacobians=jacobians) - elapsed = time.perf_counter() - start + elapsed, mem_delta, peak_gpu, result = _time_call( + lambda: metric.compute(points, jacobians=jacobians) + ) + elapsed_ms = elapsed * 1000.0 print( - f" n={n:>7d}: {elapsed*1000:>10.2f} ms " - f"(mean_manip={result['mean_manipulability']:.6f})" + _format_perf_line( + n=n, + elapsed_s=elapsed, + memory_delta=mem_delta, + peak_gpu_mb=peak_gpu, + extra_info=f"mean_manip={result['mean_manipulability']:.6f}", + ) + ) + + perf_rows.append( + { + "sample_size": n, + "impl": "workspace_analyzer", + "component": "manipulability_metric", + "cost_time_ms": f"{elapsed_ms:.6f}", + "cpu_delta_mb": f"{mem_delta['cpu_mb']:.6f}", + "gpu_delta_mb": f"{mem_delta['gpu_mb']:.6f}", + "peak_gpu_mb": f"{peak_gpu:.6f}", + } + ) + metric_rows.append( + { + "sample_size": n, + "impl": "workspace_analyzer", + "component": "manipulability_metric", + "success_rate": "N/A", + "other_metrics": ( + f"mean_manipulability={result['mean_manipulability']:.6f}" + ), + } ) + return perf_rows, metric_rows + -def benchmark_batch_fk(): +def benchmark_batch_fk() -> tuple[list[dict[str, object]], list[dict[str, object]]]: """Benchmark batch FK vs sequential FK (requires GPU robot setup). This benchmark requires a running simulation with a robot. 
@@ -138,9 +403,18 @@ def benchmark_batch_fk(): print(" Skipped -- requires live SimulationManager and Robot.") print(" To run manually, integrate with your robot setup:") print(" analyzer.compute_workspace_points(joint_configs, batch_size=512)") - - -def benchmark_batch_ik(): + return [], [ + { + "sample_size": "N/A", + "impl": "workspace_analyzer", + "component": "batch_fk", + "success_rate": "N/A", + "other_metrics": "skipped: requires live SimulationManager and Robot", + } + ] + + +def benchmark_batch_ik() -> tuple[list[dict[str, object]], list[dict[str, object]]]: """Benchmark batch IK vs sequential IK (requires GPU robot setup). This benchmark requires a running simulation with a robot. @@ -150,25 +424,65 @@ def benchmark_batch_ik(): print(" Skipped -- requires live SimulationManager and Robot.") print(" To run manually, integrate with your robot setup:") print(" analyzer.compute_reachability(cartesian_points, batch_size=512)") - - -def run_all_benchmarks(): + return [], [ + { + "sample_size": "N/A", + "impl": "workspace_analyzer", + "component": "batch_ik", + "success_rate": "N/A", + "other_metrics": "skipped: requires live SimulationManager and Robot", + } + ] + + +def run_all_benchmarks() -> None: """Run all benchmarks and print summary.""" print("=" * 60) print("Workspace Analyzer Performance Benchmarks") print("=" * 60) - benchmark_halton_sampler() - benchmark_density_metric() - benchmark_voxelization() - benchmark_manipulability() - benchmark_batch_fk() - benchmark_batch_ik() + perf_rows: list[dict[str, object]] = [] + metric_rows: list[dict[str, object]] = [] + + perf_part, metric_part = benchmark_halton_sampler() + perf_rows.extend(perf_part) + metric_rows.extend(metric_part) + + perf_part, metric_part = benchmark_density_metric() + perf_rows.extend(perf_part) + metric_rows.extend(metric_part) + + perf_part, metric_part = benchmark_voxelization() + perf_rows.extend(perf_part) + metric_rows.extend(metric_part) + + perf_part, metric_part = 
benchmark_manipulability() + perf_rows.extend(perf_part) + metric_rows.extend(metric_part) + + perf_part, metric_part = benchmark_batch_fk() + perf_rows.extend(perf_part) + metric_rows.extend(metric_part) + + perf_part, metric_part = benchmark_batch_ik() + perf_rows.extend(perf_part) + metric_rows.extend(metric_part) print("\n" + "=" * 60) print("Benchmarks complete.") print("=" * 60) + report_path = _write_markdown_report( + benchmark_name="workspace_analyzer", + perf_rows=perf_rows, + metric_rows=metric_rows, + notes=[ + "CPU/GPU memory fields are deltas measured around timed calls.", + "This report contains exactly two tables: Time & Memory, and Success & Other Metrics.", + ], + ) + print(f"Markdown report saved: {report_path}") + if __name__ == "__main__": run_all_benchmarks() diff --git a/tests/benchmark/test_reporting.py b/tests/benchmark/test_reporting.py index feb53274..55784b11 100644 --- a/tests/benchmark/test_reporting.py +++ b/tests/benchmark/test_reporting.py @@ -88,18 +88,10 @@ def test_generate_markdown_report_writes_expected_sections(tmp_path): {"device": "cpu", "iterations": 10}, output_path, ) - report = output_path.read_text(encoding="utf-8") assert "RL Benchmark Report" in report assert "Benchmark Overview" in report assert "Leaderboard" in report assert "Plots" in report - assert "Stability Analysis" in report - assert "System Performance" in report - assert "Aggregate Results" in report - assert "Per-Task Comparison" in report - assert "Per-Run Results" in report - assert "Final Stable Success Rate" in report - assert "Each table compares different algorithms on the same task." in report assert "cart_pole" in report assert "grpo" in report