Skip to content

feat(broker): per-agent session mode + pending-queue routes (#864 sub-2)#884

Merged
willwashburn merged 3 commits into
mainfrom
feat/864-session-mode
May 18, 2026
Merged

feat(broker): per-agent session mode + pending-queue routes (#864 sub-2)#884
willwashburn merged 3 commits into
mainfrom
feat/864-session-mode

Conversation

@willwashburn
Copy link
Copy Markdown
Member

Summary

Adds per-worker SessionMode (relay | human, default relay) plus
four new broker HTTP routes that the upcoming agent-relay drive /
agent-relay relay clients (sub-PR 3) will call. When a worker is in
human mode, inbound relay messages — both /api/send and the
relaycast inbound feed — get parked in a per-worker FIFO pending queue
(cap 256, FIFO eviction with a tracing::warn!) instead of being
injected into the PTY. The four new routes are GET/PUT
/api/spawned/{name}/mode, GET /api/spawned/{name}/pending, and POST
/api/spawned/{name}/flush. Mode flips that transition human → relay
auto-drain the pending queue through the existing inject path before
returning, so callers can never strand messages by flipping back.

Part of #864. (sub-PR 1 is the agent-relay view client landing in
PR #880; sub-PRs 3 and 4 add drive / relay client verbs and will be
opened separately.)

In scope

  • New SessionMode enum and PendingRelayMessage / SessionState
    helpers in src/types.rs (cap = 256; SessionDispatch outcome
    enum; pure accept_inbound gating function).
  • Per-worker HashMap<String, SessionState> in src/main.rs,
    cleaned up on every worker teardown path (Release,
    relaycast_release, worker_exited, worker_permanently_dead,
    reap_exited).
  • Four new ListenApiRequest variants + Axum handlers in
    src/listen_api.rs, mirroring the existing snapshot route shape.
    Typed SessionRouteError for 404 mapping; PUT returns
    { "mode", "flushed" }, GET pending returns FIFO { "pending": [...] }.
  • Broker arms in src/main.rs that own the lookup / state mutation /
    auto-drain.
  • Inject-path gate at the two inbound choke points (ListenApiRequest::Send
    HTTP path and the relaycast inbound queue_and_try_delivery loop).
    Internal broker-driven injections (worker_ready initial task,
    continuity restore) bypass the gate intentionally.
  • Docs: new "Session mode" section + worked example in
    web/content/docs/reference-broker-api.mdx.

Out of scope (per the issue)

  • No agent-relay drive / agent-relay relay CLI clients — that's
    sub-PR 3.
  • No mode persistence: mode resets to relay on broker restart and
    the pending queue is dropped.
  • No telemetry events for mode changes (can add
    agent_session_mode_changed / agent_pending_drained later behind
    a follow-up issue if anyone wants them).
  • No relaycast notification that a message was queued vs injected —
    the sender's MCP tool result is unchanged either way.
  • No changes to the existing /api/send response shape; callers
    cannot tell whether the broker queued or injected.

Test plan

  • cargo fmt --check passes
  • cargo clippy --all-targets --all-features -- -D warnings passes
  • cargo test — 642 passing (up from 626 baseline), 0 failures
  • Unit tests for SessionState::accept_inbound covering relay
    pass-through, human-mode FIFO ordering, cap eviction, snapshot
    non-mutation, and wire-string round-trip.
  • Axum route tests for each of the four endpoints: success path,
    typed-error 404, PUT bad-body 400 short-circuits without
    enqueueing, and an auth-required smoke test covering all four
    routes.
  • Manual broker smoke run: spawn an agent, PUT human, send two
    messages, GET pending shows them, POST flush drains them,
    PUT relay returns flushed: 0 afterwards.

🤖 Generated with Claude Code

Add server-side `SessionMode` (relay | human) per worker and four new
HTTP routes that back the upcoming `agent-relay drive` client: GET/PUT
`/api/spawned/{name}/mode`, GET `/api/spawned/{name}/pending`, POST
`/api/spawned/{name}/flush`. When a worker is in human mode the broker
parks inbound relay messages (both `/api/send` and inbound relaycast)
in a per-worker FIFO pending queue (cap 256, FIFO eviction) instead of
injecting them; mode flips back to relay auto-drain the queue.

Part of #864.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 18, 2026

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: abe5a121-eaa0-4582-a113-ea2386b1d82a

📥 Commits

Reviewing files that changed from the base of the PR and between 38abde4 and 689eed0.

📒 Files selected for processing (1)
  • src/main.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/main.rs

📝 Walkthrough

Walkthrough

This PR adds session mode control to the relay broker, allowing inbound messages to be auto-injected (relay) or queued in a per-worker FIFO buffer (human) for later draining. It exposes GET/PUT /mode, GET /pending, and POST /flush endpoints, wires gating into HTTP and relaycast inbound paths, preserves full routing metadata for queued messages, and cleans up per-worker session state across lifecycle events.

Changes

Session Mode Feature

Layer / File(s) Summary
Session Mode Type System
src/types.rs
SessionMode enum (relay/human) with serde wire strings, PendingRelayMessage capturing full routing context plus queued_at_ms and optional event_id, SessionState FIFO queue with eviction (MAX_PENDING_PER_WORKER), SessionDispatch outcomes, and unit tests validating queue behavior.
Broker Session State Initialization & Lifecycle
src/main.rs
Adds session_states: HashMap<String, SessionState> and removes entries on HTTP release, relaycast release, worker exit, supervisor permanent death, and non-supervised exit to avoid stale per-worker mode/queue.
Session Gating Core Logic
src/main.rs
Implements GateOutcome, InboundContext, gate_inbound_for_session_mode to enqueue or allow injection, and inject_pending_relay_message to reinject queued messages using original routing fields; applies gating to HTTP send and relaycast inbound paths.
HTTP API Contract & Routing
src/listen_api.rs, src/main.rs
Adds ListenApiRequest variants GetSessionMode, SetSessionMode, GetPending, FlushPending, public SessionRouteError and SetSessionModeOk, wires four authenticated routes, validates/serializes payloads, forwards typed oneshot RPCs to broker, and maps route errors to HTTP responses.
Tests & Documentation
src/listen_api.rs, web/content/docs/reference-broker-api.mdx
Extends auth tests to cover new endpoints (forwarding, invalid-mode rejection, 404 mapping, pending serialization including optional event_id, flush response), and updates API docs with session-mode semantics, events, and a worked takeover example.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

Possibly related PRs

  • AgentWorkforce/relay#873: Overlapping changes to ListenApiRequest and broker request plumbing used by the new session-mode endpoints.

Suggested reviewers

  • khaliqgant

Poem

🐰 I queued a whisper, soft and slow,
the human watch made messages grow.
Switch me back and they tumble through —
flushed and delivered, old becomes new.
Hop, inspect, then drain: a rabbit's view.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main change: adding per-agent session mode and pending-queue routes as part of issue #864 sub-2.
Description check ✅ Passed The description covers all required sections with clear summary, detailed scope/out-of-scope breakdown, and test plan with most items checked including cargo fmt/clippy and cargo test passing.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/864-session-mode

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c48e88488b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread src/main.rs Outdated
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 potential issue.

View 5 additional findings in Devin Review.

Open in Devin Review

Comment thread src/types.rs
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/types.rs (1)

15-41: ⚡ Quick win

Add serde serialization round-trip test to verify consistency.

The dual wire-format approach (serde rename_all="snake_case" plus manual as_wire_str()/parse()) creates a maintainability risk: if the enum variants or serde configuration change, the manual methods could diverge from serde's serialized output. The existing test parse_round_trips_wire_strings only verifies the manual methods, not serde.

✅ Suggested test to verify serde consistency

Add this test to the session_tests module:

#[test]
fn serde_matches_wire_str() {
    // Verify serde serialization produces the same strings as as_wire_str()
    let relay_json = serde_json::to_string(&SessionMode::Relay).unwrap();
    assert_eq!(relay_json, r#""relay""#);
    assert_eq!(SessionMode::Relay.as_wire_str(), "relay");
    
    let human_json = serde_json::to_string(&SessionMode::Human).unwrap();
    assert_eq!(human_json, r#""human""#);
    assert_eq!(SessionMode::Human.as_wire_str(), "human");
    
    // Verify serde deserialization accepts the same strings as parse()
    let relay: SessionMode = serde_json::from_str(r#""relay""#).unwrap();
    assert_eq!(relay, SessionMode::Relay);
    assert_eq!(SessionMode::parse("relay"), Some(SessionMode::Relay));
    
    let human: SessionMode = serde_json::from_str(r#""human""#).unwrap();
    assert_eq!(human, SessionMode::Human);
    assert_eq!(SessionMode::parse("human"), Some(SessionMode::Human));
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/types.rs` around lines 15 - 41, Add a test in the session_tests module
that ensures serde's JSON serialization/deserialization of the SessionMode enum
matches the manual wire helpers: verify
serde_json::to_string(&SessionMode::Relay/ Human) equals the strings returned by
SessionMode::as_wire_str(), and verify
serde_json::from_str(r#""relay""#)/r#""human""#) produces the same variants as
SessionMode::parse("relay"/"human"); this keeps the serde rename_all behavior
and the manual as_wire_str()/parse() in sync.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/main.rs`:
- Around line 4198-4281: The queued PendingRelayMessage currently only stores
from/body/event_id, so flushes recreate deliveries with wrong
target/thread/workspace/priority/mode; extend PendingRelayMessage to include
original target, thread_id, workspace_id, priority, and MessageInjectionMode
(and any other delivery metadata), update gate_inbound_for_session_mode where
you build PendingRelayMessage (and SessionState::accept_inbound usage) to
populate these fields from the incoming delivery, and change
inject_pending_relay_message to pass those preserved values into
queue_and_try_delivery_raw (use the saved target instead of worker_name, supply
saved thread/workspace/priority/mode and preserve event_id) so replayed messages
match the original delivery exactly.

---

Nitpick comments:
In `@src/types.rs`:
- Around line 15-41: Add a test in the session_tests module that ensures serde's
JSON serialization/deserialization of the SessionMode enum matches the manual
wire helpers: verify serde_json::to_string(&SessionMode::Relay/ Human) equals
the strings returned by SessionMode::as_wire_str(), and verify
serde_json::from_str(r#""relay""#)/r#""human""#) produces the same variants as
SessionMode::parse("relay"/"human"); this keeps the serde rename_all behavior
and the manual as_wire_str()/parse() in sync.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: a56ff090-66f1-49fa-a6b9-01b8b99b867a

📥 Commits

Reviewing files that changed from the base of the PR and between df45489 and c48e884.

📒 Files selected for processing (4)
  • src/listen_api.rs
  • src/main.rs
  • src/types.rs
  • web/content/docs/reference-broker-api.mdx

Comment thread src/main.rs
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 4 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="src/main.rs">

<violation number="1" location="src/main.rs:4267">
P1: Flushed pending messages lose routing metadata (`target`, `thread_id`, `workspace_id`, `workspace_alias`). When messages queued during human mode are later injected, the agent receives them with `target` set to the worker's own name (instead of e.g. `#general`) and no thread/workspace context. This breaks in-thread replies and channel-aware agent logic. `PendingRelayMessage` should be extended to capture these fields at queue time so `inject_pending_relay_message` can faithfully reproduce the original delivery.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Fix all with cubic | Re-trigger cubic

Comment thread src/main.rs
Two parallel changes to PR #884, bundled because they touch the same
queue + dispatch paths:

(1) Preserve full routing metadata across the human-mode queue.

P1 finding from review (cubic / Devin / CodeRabbit / Codex all
agreed): `PendingRelayMessage` only stored `from` / `body` /
`event_id`, so flushed messages were re-injected with `target` set
to the worker's own name and no thread / workspace / priority /
mode context. A `#general` message queued in human mode came back
as a direct-to-worker DM on drain. Channel-aware agent logic,
thread replies, and workspace attribution all broke silently.

Extends `PendingRelayMessage` with `target`, `thread_id`,
`workspace_id`, `workspace_alias`, `priority`, and `mode`. New
`InboundContext<'_>` bundles the args into one call so both gate
sites (`/api/send` and the inbound-relaycast feed) capture the
same context the existing `queue_and_try_delivery_raw` would have
seen at non-queued delivery time.

`inject_pending_relay_message` now reads target / thread /
workspace / priority / mode straight off the queued
`PendingRelayMessage` — a drained message is byte-for-byte
equivalent to the original delivery.

`GET /api/spawned/{name}/pending` surfaces the new fields too;
docs page (`web/content/docs/reference-broker-api.mdx`) updated
with the expanded JSON shape.

(2) Add telemetry events for mode change / pending drain / queue.

Three new event kinds, all routed through the existing `send_event`
path so they appear on `/ws` like every other broker event:

  - `delivery_queued` — per-message event when an inbound is
    parked in the human-mode queue instead of injected. Payload:
    `event_id`, `from`, `target`, `reason: "session_mode_human"`.
  - `agent_session_mode_changed` — fires when the mode flips via
    `PUT /api/spawned/{name}/mode`. Payload: `previous_mode`,
    `mode`.
  - `agent_pending_drained` — fires when the queue is drained
    (auto-drain on human → relay, or explicit `/flush`). Payload:
    `count`, `reason` (`mode_transition` | `explicit_flush`).

(3) Bonus: serde round-trip test for `SessionMode`.

Nit from CodeRabbit: keep `as_wire_str` / `parse` in sync with the
`#[serde(rename_all = "snake_case")]` derive. New
`session_mode_wire_format_matches_serde_round_trip` test guards
against drift between the manual helpers and serde.

Plus a direct regression test
`pending_message_preserves_full_routing_context` that asserts the
full struct round-trips through the queue unchanged.

Test status: full broker suite **644 passed** (was 642 baseline
before this commit, +2 net for the new tests). `cargo fmt --check`
+ `cargo clippy --all-targets --all-features -- -D warnings` clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@willwashburn
Copy link
Copy Markdown
Member Author

Pushed `38abde43` addressing both the review feedback and the missing telemetry events from the original brief. Recap by reviewer:

@cubic-dev-ai (P1) / @devin-ai-integration (🔴 critical) / @coderabbitai (🟠 major) / @chatgpt-codex-connector (P1) — all flagged the same bug: PendingRelayMessage dropped routing metadata, so flushed channel/thread/workspace messages came back as direct-to-worker DMs with default priority.

  • Fixed. Extended PendingRelayMessage with target, thread_id, workspace_id, workspace_alias, priority, and mode. New InboundContext<'_> bundles the args at both gate sites (/api/send + inbound-relaycast feed). inject_pending_relay_message now reads straight off the queued struct — drained messages are byte-for-byte equivalent to the original delivery. GET .../pending JSON shape extended to surface the new fields. Direct regression test: pending_message_preserves_full_routing_context.

@coderabbitai (nit) — add a serde round-trip test for SessionMode to keep as_wire_str / parse in sync with #[serde(rename_all = "snake_case")].

  • Added. session_mode_wire_format_matches_serde_round_trip covers both directions for both variants.

Telemetry events (separate ask from the human reviewer — original brief had punted these):

  • Added 3 events on /ws:
    • `delivery_queued` — per-message, fires when an inbound is parked in the human-mode queue. Payload: `event_id`, `from`, `target`, `reason: "session_mode_human"`.
    • `agent_session_mode_changed` — fires on mode flip via `PUT .../mode`. Payload: `previous_mode`, `mode`.
    • `agent_pending_drained` — fires on queue drain. Payload: `count`, `reason` (`mode_transition` | `explicit_flush`).

Docs (`web/content/docs/reference-broker-api.mdx`) updated with the expanded pending JSON shape AND the three new event kinds in the `/ws` event-kinds table.

Test status: full broker suite 644 passed, 0 failed (was 642 baseline before this commit, +2 net for the new round-trip and context-preservation tests). `cargo fmt --check` + `cargo clippy --all-targets --all-features -- -D warnings` clean.

All four inline review threads marked as resolved on the GitHub UI.

@chatgpt-codex-connector
Copy link
Copy Markdown

To use Codex here, create an environment for this repo.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 18, 2026

@willwashburn Great work closing out all the threads in a single clean commit. A few highlights worth calling out:

  • Routing-context fix is the right approach — InboundContext<'_> as a bundle at the gate sites is clean, and having inject_pending_relay_message read directly off the queued struct eliminates any risk of future divergence between live and flushed delivery paths. The regression test makes it easy to catch regressions if the struct grows again.
  • Serde round-trip test (session_mode_wire_format_matches_serde_round_trip) — exactly what was asked for; both variants, both directions. ✅
  • Telemetry trio (delivery_queued / agent_session_mode_changed / agent_pending_drained) covers all the observable state transitions on the /ws feed. The reason field on agent_pending_drained (mode_transition | explicit_flush) is a nice touch for consumers who need to distinguish the two drain paths.
  • 644 passed, fmt + clippy clean — no regressions introduced.

Everything looks good from my side. Happy to approve.

(ノ◕ヮ◕)ノ*:・゚✧

✅ Actions performed

Comments resolved. Approval is disabled; enable reviews.request_changes_workflow to allow explicit top-level @coderabbitai resolve or @coderabbitai approve commands.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant