Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions examples/so_arm100_smolvla.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""End-to-end SO-ARM100 + LeRobot SmolVLA pipeline.

What this does, top to bottom:

1. Build a SOARM100Adapter from a LeRobot calibration file
2. Export the LeRobot SmolVLA base checkpoint to an ONNX bundle, embedding
the SO-ARM100 calibration so the runtime can stream commands to the arm
3. Verify the export matches the original PyTorch policy on a small LIBERO
sample (signed parity cert lands in ./verify_output/parity.cert.json)
4. Serve the bundle to a physical SO-ARM100 over a USB serial port

The full chain mirrors the README's `Quickstart` exactly — only the
embodiment + hardware port flags are new. Everything else (export, verify,
serve) is the same Reflex surface you'd use for a Franka or UR5.

Hardware requirements:
- SO-ARM100 assembled per https://github.com/TheRobotStudio/SO-ARM100
- 6x Feetech STS3215 servos wired in the canonical 1..6 ID order
- USB-to-serial bridge on /dev/ttyUSB0 (Linux) or /dev/tty.usbserial-* (Mac)

Python requirements:
pip install 'reflex-vla[serve,gpu,monolithic,lerobot]' # GPU host
pip install 'reflex-vla[serve,onnx,lerobot,so100]' # Mac / Pi at the arm

Calibration:
If you already have a LeRobot calibration file (recorded via
`lerobot-calibrate --robot so_follower`), point CAL_PATH at it.
Otherwise, run:
reflex calibrate so_arm100 default --output calib.json
and replace with a real calibration before running on hardware.

Usage:
python examples/so_arm100_smolvla.py

To run only one phase, set the SO_ARM100_PHASE env var to
"export", "verify", or "serve".
"""
from __future__ import annotations

import os
import sys
from pathlib import Path

from tether.embodiments.so_arm100 import SOARM100Adapter

# ─── Config ─────────────────────────────────────────────────────────────────

MODEL_ID = os.environ.get("REFLEX_MODEL_ID", "lerobot/smolvla_base")
CAL_PATH = os.environ.get("CAL_PATH", "calib.json")
BUNDLE_DIR = os.environ.get("BUNDLE_DIR", "bundle/")
VERIFY_OUT = os.environ.get("VERIFY_OUT", "verify_output/")
PORT = os.environ.get("SO_ARM100_PORT", "/dev/ttyUSB0")
PHASE = os.environ.get("SO_ARM100_PHASE", "all").lower()
N_EPISODES = int(os.environ.get("VERIFY_EPISODES", "10"))


def build_adapter() -> SOARM100Adapter:
"""Load the LeRobot calibration into an SOARM100Adapter."""
if not Path(CAL_PATH).exists():
print(
f"[warn] {CAL_PATH} not found; using factory defaults. "
f"Generate one with `reflex calibrate so_arm100 default --output {CAL_PATH}` "
f"or import an existing LeRobot calibration with "
f"`reflex calibrate so_arm100 import <path>`."
)
return SOARM100Adapter.default(port=PORT)
return SOARM100Adapter.from_calibration(CAL_PATH, port=PORT)


def phase_export(adapter: SOARM100Adapter) -> None:
"""Run `reflex export` with the SO-ARM100 calibration embedded.

This drives the same code path as the CLI command:
reflex export lerobot/smolvla_base \
--output bundle/ \
--embodiment so_arm100 \
--calibration calib.json
"""
import subprocess
cmd = [
sys.executable, "-m", "reflex.cli", "export", MODEL_ID,
"--output", BUNDLE_DIR,
"--embodiment", "so_arm100",
]
if Path(CAL_PATH).exists():
cmd += ["--calibration", CAL_PATH]
print("[export]", " ".join(cmd))
subprocess.run(cmd, check=True)

# Sanity: the bundle should now carry our embodiment dir.
bundle_cal = Path(BUNDLE_DIR) / "embodiment" / "so_arm100" / "calibration.json"
assert bundle_cal.exists(), f"Export did not write {bundle_cal}"
print(f"[export] embodiment bundle written: {bundle_cal}")

# Confirm round-trip — load the bundle as if we were the serve runtime.
loaded = SOARM100Adapter.from_bundle(BUNDLE_DIR)
print(
f"[export] loaded back: "
f"{[j.name for j in loaded.config.joints]} ({loaded.action_dim}-DOF)"
)


def phase_verify() -> None:
"""Run `reflex verify` on the exported bundle.

The `--embodiment so_arm100` flag tells verify to record provenance for
the parity cert; the numerical gates are unchanged.
"""
import subprocess
cmd = [
sys.executable, "-m", "reflex.cli", "verify", BUNDLE_DIR,
"--num-episodes", str(N_EPISODES),
"--output", VERIFY_OUT,
"--embodiment", "so_arm100",
]
print("[verify]", " ".join(cmd))
rc = subprocess.run(cmd).returncode
if rc == 0:
print("[verify] PASS")
cert = Path(VERIFY_OUT) / "parity.cert.json"
if cert.exists():
print(f"[verify] signed parity cert: {cert}")
else:
print(f"[verify] verify exited with code {rc} — see {VERIFY_OUT}/PARITY.md")


def phase_serve(adapter: SOARM100Adapter) -> None:
"""Start `reflex serve` against the bundle on the SO-ARM100.

NOTE: this opens a live serial port + runs until killed. Don't fire-and-
forget in headless scripts; we exec the CLI directly so Ctrl+C reaches
the server.
"""
import os
print(
f"[serve] launching: reflex serve {BUNDLE_DIR} "
f"--embodiment so_arm100 --port {PORT}"
)
print("[serve] Ctrl+C to stop. Endpoints: /act /health /config")
os.execvp(
sys.executable,
[
sys.executable, "-m", "reflex.cli", "serve", BUNDLE_DIR,
"--embodiment", "so_arm100",
],
)


def main() -> None:
print(f"[so_arm100_smolvla] phase={PHASE}, model={MODEL_ID}, "
f"bundle={BUNDLE_DIR}, calibration={CAL_PATH}, port={PORT}")
adapter = build_adapter()
print(
f"[so_arm100_smolvla] adapter: {adapter.embodiment_name}, "
f"{adapter.action_dim}-DOF, source="
f"{adapter.config._source_path or '(default)'}"
)

if PHASE in ("all", "export"):
phase_export(adapter)
if PHASE in ("all", "verify"):
phase_verify()
if PHASE in ("all", "serve"):
phase_serve(adapter)


if __name__ == "__main__":
main()
24 changes: 24 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,29 @@ tracing = [
"opentelemetry-sdk>=1.30",
"opentelemetry-exporter-otlp-proto-grpc>=1.30",
]
# LeRobot interop — installs `lerobot>=0.5` so the SO-ARM100 adapter
# (`reflex.embodiments.so_arm100`) can talk to the Feetech bus + import
# LeRobot calibration JSONs. Use this on the host wired to the arm:
#
# pip install 'reflex-vla[lerobot]'
# reflex serve bundle/ --embodiment so_arm100 --port /dev/ttyUSB0
#
# lerobot 0.5.x requires Python >=3.12 (we skip on 3.10/3.11 to avoid pip's
# ResolutionImpossible; users on JetPack 6 Python 3.10 can still construct an
# adapter + run the conversion math, they just can't open a serial bus until
# they upgrade to Python 3.12 or install scservo_sdk directly via the [so100]
# extra).
lerobot = [
"lerobot>=0.5.1; python_version >= '3.12'",
]
# SO-100 hardware extra — installs the scservo_sdk Python package on the
# Raspberry Pi / host wired to the arm. Separate from [lerobot] because the
# legacy auto_soarm path (used by the bench wizard) only needs the SDK, not
# the full LeRobot stack. The two extras compose: `pip install
# 'reflex-vla[so100,lerobot]'` gives you both code paths.
so100 = [
"scservo-sdk>=0.0.4",
]
# Real-Time Chunking adapter (B.3). Pulls in lerobot's RTC module so
# `tether serve --rtc` can wrap inference with the canonical RTCProcessor.
# Lerobot is heavy (torch + vision + datasets); only install when serving
Expand Down Expand Up @@ -309,4 +332,5 @@ testpaths = ["tests"]
asyncio_mode = "strict"
markers = [
"asyncio: mark a coroutine test for pytest-asyncio (strict mode)",
"hardware: requires a physical robot wired in; skipped unless RUN_HARDWARE_TESTS=1",
]
Loading
Loading