broker: unify worker request/response correlation (factor out the pending_snapshots pattern) #871

@willwashburn

Context

PR #870 (snapshot + dump-pty) introduced a new pattern in the broker: send a typed request to a wrapped worker over its JSON-over-stdio pipe, wait for a typed response back, route the response to whichever HTTP handler asked for it. That's different from the existing send_input / resize pattern, which is fire-and-forget — the broker forwards to the worker and immediately replies "ok" without waiting.

The snapshot implementation needed:

  • A pending_snapshots: HashMap<RequestId, oneshot::Sender<SnapshotResponse>> in the broker loop.
  • A Snapshot arm in ListenApiRequest that inserts the oneshot into the map keyed by a freshly minted request_id, then sends a snapshot_pty frame to the worker carrying that same request_id.
  • A snapshot_response arm in the worker-frame handler that pulls the oneshot back out of the map by request_id and fires it.
  • A timeout sweep on the reap tick (5s) that drops entries whose worker never replied.

This is ~80 lines of ceremony for one feature. The next feature that needs the same shape will reinvent the same machinery.

What needs it next

From #864 (view / drive / relay clients), almost every new route is request/response:

  • GET /api/spawned/{name}/mode — read the worker's current session mode → request/response
  • GET /api/spawned/{name}/pending — read queued relay messages → request/response
  • POST /api/spawned/{name}/flush (could be fire-and-forget, but probably wants the drained count back) → request/response

That's three more pending_* maps if we keep going as-is.

Proposal

A single broker-side helper:

/// Send a request frame to a worker and await its response.
///
/// The caller picks a `request_kind` (e.g. "snapshot_pty"); the helper
/// mints a `request_id`, frames the request, parks the oneshot in a
/// shared pending map, and enforces the timeout. On success, the result
/// is the JSON payload of the matching response frame.
async fn request_worker(
    state: &mut BrokerState,
    worker_name: &str,
    request_kind: &str,
    payload: serde_json::Value,
    timeout: Duration,
) -> Result<serde_json::Value, RequestWorkerError>;

enum RequestWorkerError {
    WorkerNotFound,
    Timeout,
    WorkerError { code: String, message: String },
    ChannelClosed,
}

Backed by one pending_requests: HashMap<RequestId, PendingRequest> map (replacing pending_snapshots and any future per-feature maps). A PendingRequest carries the oneshot sender + the deadline.
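
A minimal sketch of what that shared map could look like. It is an illustration, not the broker's actual code: `PendingRequests`, `register`, and `sweep` are hypothetical names, std's `mpsc` channel stands in for tokio's `oneshot`, and a plain `String` stands in for `serde_json::Value` so the example has no dependencies.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::{Duration, Instant};

// Stand-ins chosen for this sketch only (assumptions, not broker types).
type RequestId = String;
type Payload = String;
type Reply = Result<Payload, RequestWorkerError>;

#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum RequestWorkerError {
    WorkerNotFound,
    Timeout,
    WorkerError { code: String, message: String },
    ChannelClosed,
}

/// One parked request: where to send the reply, and when to give up.
struct PendingRequest {
    reply: Sender<Reply>,
    deadline: Instant,
}

#[derive(Default)]
struct PendingRequests {
    next_id: u64,
    map: HashMap<RequestId, PendingRequest>,
}

impl PendingRequests {
    /// Mint a request_id, park the reply sender, hand back the receiver.
    fn register(&mut self, now: Instant, timeout: Duration) -> (RequestId, Receiver<Reply>) {
        self.next_id += 1;
        let id = format!("req-{}", self.next_id);
        let (tx, rx) = channel();
        let entry = PendingRequest { reply: tx, deadline: now + timeout };
        self.map.insert(id.clone(), entry);
        (id, rx)
    }

    /// Meant to run on the reap tick: fail every entry whose deadline has
    /// passed and return how many were expired.
    fn sweep(&mut self, now: Instant) -> usize {
        let expired: Vec<RequestId> = self
            .map
            .iter()
            .filter(|(_, p)| p.deadline <= now)
            .map(|(id, _)| id.clone())
            .collect();
        for id in &expired {
            if let Some(p) = self.map.remove(id) {
                // The waiter may already be gone; ignore a failed send.
                let _ = p.reply.send(Err(RequestWorkerError::Timeout));
            }
        }
        expired.len()
    }
}
```

Passing `now` in explicitly keeps the sweep deterministic in tests. This sketch sends an explicit `Timeout` to the waiter; the current snapshot sweep is described as simply dropping the entry, so which behavior to keep is an open choice.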

The worker-frame handler in main.rs grows one generic dispatch:

"snapshot_response" | "mode_response" | "pending_response" | ... => {
    if let Some(req_id) = frame.request_id.as_deref() {
        if let Some(pending) = pending_requests.remove(req_id) {
            let _ = pending.reply.send(Ok(frame.payload));
        }
    }
}

Or even cleaner: one generic response kind like "worker_response" with the original request kind echoed in the payload, so the broker doesn't need a per-feature arm at all — it just routes any frame carrying a request_id we know about.
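
To make the "route by request_id" idea concrete, here is a rough sketch under stated assumptions: `WorkerFrame` and `route_response` are hypothetical names, the frame fields mirror the envelope proposed for src/pty_worker.rs, the payload is a plain `String` rather than `serde_json::Value`, and std's `mpsc` channel stands in for the oneshot. Unlike the per-kind arm above, it also maps a worker-reported error onto `WorkerError`.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

#[derive(Debug, PartialEq)]
enum RequestWorkerError {
    WorkerError { code: String, message: String },
}

type Reply = Result<String, RequestWorkerError>;

/// Hypothetical parsed worker frame (an assumption for this sketch).
#[allow(dead_code)]
struct WorkerFrame {
    kind: String,
    request_id: Option<String>,
    payload: String,
    error: Option<(String, String)>, // (code, message)
}

/// Route any frame whose request_id we know about, regardless of `kind`.
/// Returns true if the frame was consumed as a response.
fn route_response(pending: &mut HashMap<String, Sender<Reply>>, frame: WorkerFrame) -> bool {
    let reply = match frame.request_id.as_deref().and_then(|id| pending.remove(id)) {
        Some(reply) => reply,
        None => return false, // not a response, or nobody is waiting any more
    };
    let result = match frame.error {
        Some((code, message)) => Err(RequestWorkerError::WorkerError { code, message }),
        None => Ok(frame.payload),
    };
    // The requester may have gone away; ignore a failed send.
    let _ = reply.send(result);
    true
}

/// Usage: park a sender, route a matching frame, read the reply back.
/// The payload content here is made up for illustration.
fn demo() -> Reply {
    let mut pending = HashMap::new();
    let (tx, rx) = channel();
    pending.insert("req-1".to_string(), tx);
    let frame = WorkerFrame {
        kind: "worker_response".to_string(),
        request_id: Some("req-1".to_string()),
        payload: "{\"mode\":\"relay\"}".to_string(),
        error: None,
    };
    route_response(&mut pending, frame);
    rx.recv().expect("sender was parked above")
}
```

Returning a bool lets the frame handler fall through to its other arms when a frame is not a known response.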

Files to touch

  • src/main.rs — replace pending_snapshots with the generic pending_requests; add request_worker(...) helper; collapse the per-feature arms in the worker-frame handler.
  • src/listen_api.rs — update the snapshot handler to call request_worker instead of the bespoke Snapshot { ..., reply } variant. Variants for fire-and-forget operations (send_input, resize) stay as-is.
  • src/pty_worker.rs — workers respond with a consistent envelope ({ "kind": "...", "request_id": ..., "payload": {...}, "error": null | {code, message} }).
  • Tests: cover the timeout path (worker never responds), the not-found path (worker died mid-request), the error path (worker returns {error: {...}}).
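
As a rough illustration of how the waiting side could surface those failure paths, the sketch below maps wait outcomes onto `RequestWorkerError`. It assumes a blocking `recv_timeout` on std's `mpsc` channel as a dependency-free stand-in for wrapping a tokio oneshot in a timeout; `await_reply` and `demo_never_replies` are hypothetical names, and only the two variants needed here are reproduced.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum RequestWorkerError {
    Timeout,
    ChannelClosed,
}

type Reply = Result<String, RequestWorkerError>;

/// Wait at most `timeout` for the routed reply and translate the outcome.
fn await_reply(rx: &Receiver<Reply>, timeout: Duration) -> Reply {
    match rx.recv_timeout(timeout) {
        // A reply arrived; it may itself be an Err produced by the router.
        Ok(reply) => reply,
        // The worker never answered within the deadline.
        Err(RecvTimeoutError::Timeout) => Err(RequestWorkerError::Timeout),
        // The pending entry was dropped without a reply.
        Err(RecvTimeoutError::Disconnected) => Err(RequestWorkerError::ChannelClosed),
    }
}

/// Usage: the sender stays alive but silent, so the wait times out.
fn demo_never_replies() -> Reply {
    let (_tx, rx) = channel::<Reply>();
    await_reply(&rx, Duration::from_millis(10))
}
```

Short timeouts like the one in `demo_never_replies` keep a timeout-path test fast without needing a real worker process.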

Effort

Small. ~100 lines of refactor in main.rs + ~30 lines for the helper + ~50 lines of tests. Net likely negative once the existing pending_snapshots code is deleted.

Why now

Cost compounds. Doing this before #864 saves 3 more pending-maps + 3 more bespoke variants. Doing it now also means #864's view/drive/relay work can lean on one well-tested helper instead of three half-tested ones.

Out of scope

  • Fire-and-forget operations (send_input, resize) stay on their existing channel pattern — they don't need correlation. The helper is opt-in for the request/response cases.
  • This doesn't change the externally visible HTTP API; the only protocol change is internal, to the broker-to-worker response envelope.
