Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 222 additions & 1 deletion website/docs/tutorial/inference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,225 @@ title: Inference
sidebar_position: 3
---

## ...
# Inference

---

## Overview

After training, a policy is deployed as a **policy server** process that:

1. Receives an observation bundle (cameras + joint positions) packed as an Arrow IPC file
2. Runs the model
3. Returns a JSON action chunk (a sequence of 16-DOF joint positions)

The robot runtime is a [Dora](https://dora-rs.ai/) dataflow.

---

## Dataflow Architecture

```
Hardware
arm_right (250 Hz) ──┐
arm_left (250 Hz) ──┤
camera_wrist_right ──┤ observer ──► quittable-observer
camera_wrist_left ──┤ (30 Hz) │
camera_head ──┤ observation (Arrow IPC)
camera_ceiling ──┘ │
┌─────────────────────────┐
│ policy-server │ ← YOU WRITE THIS
│ (local or docker) │
└─────────────────────────┘
actions JSON
actions-executor
```

## The Policy Server Contract

This is the what one needs to implement when adapting to a new model.

### Transport

The node connects to a UNIX socket (path set via `$SOCKET`). The Dora node
`dora-openarm-local-policy-server` handles the socket I/O for you; your
model code lives in the process it launches.

For local development, the server listens on the socket:

```
SOCKET=/dev/shm/policy-server.socket dora run dataflow-inference.yaml --uv
```

### Observation Input

Each request arrives as a JSON line over the socket:

```json
{
"name": "inference",
"data_path": "/dev/shm/obs_12345.arrow",
"metadata": {
"timestamp": 1716000000123456789,
"camera_head.height": 600,
"camera_head.width": 960,
"camera_ceiling.height": 600,
"camera_ceiling.width": 960,
"camera_wrist_right.height": 600,
"camera_wrist_right.width": 960,
"camera_wrist_left.height": 600,
"camera_wrist_left.width": 960
}
}
```

Open the Arrow IPC file and parse it:

```python
import pyarrow as pa
import numpy as np

with pa.OSFile(request["data_path"], "rb") as f:
with pa.ipc.open_file(f) as reader:
observations = reader.get_batch(0).to_struct_array()

last = observations[-1]
metadata = request["metadata"]

# Camera frames — shape (H, W, 3), uint8
def read_camera(name):
return (
last[name].values.to_numpy(zero_copy_only=False)
.reshape(metadata[f"{name}.height"], metadata[f"{name}.width"], 3)
)

frames = {
"head": read_camera("camera_head"),
"ceiling": read_camera("camera_ceiling"),
"right_wrist": read_camera("camera_wrist_right"),
"left_wrist": read_camera("camera_wrist_left"),
}

# Joint positions — float32, shape (16,)
# Layout: right_arm[7] | right_gripper[1] | left_arm[7] | left_gripper[1]
pos_dim = len(last["position"])
qpos = (
last["position"].values.to_numpy(zero_copy_only=False)
.reshape(pos_dim)
Comment on lines +111 to +114
.astype(np.float32)
)
```
Comment on lines +84 to +117

### Action Output

Write a single JSON line back to the socket:

```json
{
"interval": 33333333,
"cutoff_hz": 5,
"positions": [
[q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, q10, q11, q12, q13, q14, q15],
...
]
}
```

| Field | Type | Meaning |
|---|---|---|
| `interval` | int (ns) | Time between consecutive position steps. `int(1e9 / 30)` ≈ 33 ms for 30 Hz |
| `cutoff_hz` | number, optional | Execute only the first N steps of the chunk before the next inference for receding horizon control |
| `positions` | `List[List[float]]` length T | Each inner list is 16 floats: `right_arm[7] + right_gripper[1] + left_arm[7] + left_gripper[1]` |

If you want to skip inference this tick (e.g. rate-limiting), return an empty positions list:

```json
{ "positions": [] }
```

---

We will provide more full-fledged policy server and dataflow examples soon.

---

## Dataflow YAML — Key Nodes

The three nodes you care about most in the YAML. Everything else (ticks, cameras,
arms) is boilerplate you copy verbatim.

```yaml
nodes:
# --- observer: collects arms + cameras into one Arrow IPC bundle ---
- id: observer
path: dora-openarm-observer
inputs:
tick: quittable-tick-observer/tick # 30 Hz gate
arm_right: arm-right/position
arm_left: arm-left/position
camera_wrist_right: camera-wrist-right/image
camera_wrist_left: camera-wrist-left/image
camera_head: camera-head/image
camera_ceiling: camera-ceiling/image
phase_classifier_result: phase-classifier/result
outputs:
- observation # Arrow IPC file path + metadata, written to /dev/shm

# --- quittable-observer: gate controlled by controller ---
- id: quittable-observer
path: dora-openarm-quitter
inputs:
command: controller/command
observation: observer/observation
outputs:
- observation

# --- policy-server: YOUR node ---
- id: policy-server
path: dora-openarm-local-policy-server
env:
SOCKET: /dev/shm/policy-server.socket # must match your running server
inputs:
observation: quittable-observer/observation
outputs:
- actions # JSON: {interval, cutoff_hz, positions: [[16D], ...]}

# --- actions-executor: unpacks the chunk and drives the arms ---
- id: actions-executor
path: dora-openarm-actions-executor
args: "--upsample-on --filter-on"
inputs:
actions: policy-server/actions
outputs:
- move_position_right # → arm-right
- move_position_left # → arm-left
```
Comment on lines +157 to +202

The `observation` output of `quittable-observer` is what your server receives as
`request["data_path"]`. The `actions` output is what you write back via the socket.

---

## Running

### Prerequisites

```bash
uv venv .venv-inference --python=3.12
. .venv-inference/bin/activate
uv pip install "dora-rs-cli==0.5.0" "dora-rs==0.5.0"
dora build inference/dataflow-inference.yaml --uv
```

### Local policy server (debug)

Start the policy server first, then run the dataflow:

```bash
SOCKET=/dev/shm/policy-server.socket \
dora run inference/dataflow-inference.yaml --uv
```