refactor(broker): split main entrypoint #906

Merged
willwashburn merged 8 commits into main from codex/issue-875-split-broker-main on May 19, 2026

@willwashburn willwashburn commented May 19, 2026

Summary

  • Move agent-relay-broker command parsing and dispatch out of main.rs into cli/mod.rs
  • Split broker runtime into a runtime/ module with focused startup, event loop, HTTP API, Relaycast, worker-event, delivery, session, path, and utility modules
  • Introduce BrokerRuntime so long-lived state is explicit and RuntimeEvent dispatch keeps the select loop small
  • Update runtime contract tests to follow the new handler files while preserving existing behavior
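
As a rough illustration of the third bullet, the sketch below shows how an explicit state struct plus an event enum can reduce a loop body to a single dispatch call. It uses `std::sync::mpsc` rather than the broker's async select loop, and every field, variant, and method here (`handled_requests`, `ApiRequest`, `WorkerExited`, `Tick`, `Shutdown`, `handle`) is hypothetical. Only the names `BrokerRuntime` and `RuntimeEvent` come from this PR.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical events; the real RuntimeEvent variants are not shown on this page.
#[derive(Debug)]
pub enum RuntimeEvent {
    ApiRequest(String),
    WorkerExited { name: String },
    Tick,
    Shutdown,
}

/// Long-lived state lives in one struct instead of loop-local variables.
#[derive(Default)]
pub struct BrokerRuntime {
    pub handled_requests: Vec<String>,
    pub exited_workers: Vec<String>,
    pub ticks: u64,
}

impl BrokerRuntime {
    /// Handles one event; returns false when the loop should stop.
    pub fn handle(&mut self, event: RuntimeEvent) -> bool {
        match event {
            RuntimeEvent::ApiRequest(path) => self.handled_requests.push(path),
            RuntimeEvent::WorkerExited { name } => self.exited_workers.push(name),
            RuntimeEvent::Tick => self.ticks += 1,
            RuntimeEvent::Shutdown => return false,
        }
        true
    }

    /// The loop only receives and dispatches; all logic sits in `handle`.
    pub fn run(&mut self, events: Receiver<RuntimeEvent>) {
        while let Ok(event) = events.recv() {
            if !self.handle(event) {
                break;
            }
        }
    }
}
```

Keeping the per-event logic in `handle` means each arm can be unit-tested without running the loop itself.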

Closes #875

Verification

  • cargo check -p agent-relay-broker
  • cargo test -p agent-relay-broker
  • cargo clippy -p agent-relay-broker -- -D warnings
  • cargo fmt -- --check

@willwashburn willwashburn requested a review from khaliqgant as a code owner May 19, 2026 00:56

coderabbitai Bot commented May 19, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Splits the broker binary into a CLI module (crates/broker/src/cli/mod.rs) and a restructured runtime (crates/broker/src/runtime/*), adds runtime submodules (connection, delivery, headless, io, messages, paths, session, spawn_spec, system, util, event loop, API, maintenance, relaycast/worker handlers), updates imports, and adds trajectories and tests.

Changes

Broker CLI + Runtime Split

| Layer / File(s) | Summary |
|---|---|
| Trajectory artifacts: `.trajectories/completed/*`, `.trajectories/index.json` | Add completed trajectory JSON/MD and trace artifacts and update the trajectories index with new completed entries. |
| CLI entrypoint and commands: `crates/broker/src/cli/mod.rs` | New cli::run() initializes tracing, parses commands via clap, emits telemetry, and dispatches subcommands; defines DumpPty/McpArgs/Init/Pty/Headless command structs and conversions. |
| Import refactors across broker: `crates/broker/src/cli_mcp_args.rs`, `crates/broker/src/pty_worker.rs`, `crates/broker/src/routing.rs`, `crates/broker/src/worker.rs`, `crates/broker/src/wrap.rs` | Replace wildcard imports with explicit imports and update references to the new CLI/runtime module locations. |
| Runtime module entry: `crates/broker/src/runtime/mod.rs` | Add runtime module with constants, tracing guard, submodule declarations, and crate-visible re-exports for runtime components. |
| Runtime: connection & dump-pty: `crates/broker/src/runtime/connection.rs` | Add BrokerConnection, discover_broker_connection, and run_dump_pty to resolve broker URL/API key and fetch PTY snapshots. |
| Runtime: pending-delivery lifecycle: `crates/broker/src/runtime/delivery.rs` | Implement pending delivery persistence, inbound queuing, flush/inject flows, retry logic, and worker teardown/clear helpers. |
| Runtime: headless worker: `crates/broker/src/runtime/headless.rs` | Implement headless provider mapping and run_headless_worker protocol loop with subprocess spawn, streaming, and delivery lifecycle events. |
| Runtime: protocol I/O helpers: `crates/broker/src/runtime/io.rs` | Add send_frame, send_error, send_event, and emit_http_api_event_with_timeout for outbound protocol frames. |
| Runtime: messages & dedup: `crates/broker/src/runtime/messages.rs` | Add message/thread extraction, timestamp/sort-key logic, relaycast control dedup keys, thread naming, unread calculations, and bounded history. |
| Runtime: paths & locking: `crates/broker/src/runtime/paths.rs` | Add RuntimePaths, continuity path derivation, ephemeral/runtime path creation, per-broker lock handling with stale-lock recovery, and WS base URL derivation. |
| Runtime: session & startup API: `crates/broker/src/runtime/session.rs` | Add session/runtime types, startup health endpoint, connect_relay to initialize multi-workspace RelaySession, and related helpers. |
| Runtime: spawn spec: `crates/broker/src/runtime/spawn_spec.rs` | Add runtime_label and build_http_api_spawn_spec to build AgentSpec from transport/CLI inputs. |
| Runtime: system helpers: `crates/broker/src/runtime/system.rs` | Add terminal-size helpers, Linux memory-by-pid, and build_agent_metrics. |
| Runtime: util & env helpers: `crates/broker/src/runtime/util.rs` | Add tracing init, env-timeout parsing, channel defaults, normalize_channel, agent-state event builders/publishers, terminal size wrapper, and MCP id extraction. |
| Runtime tests: `crates/broker/src/runtime/tests.rs` | Add comprehensive test suite covering queueing, message grouping, dedup, prompt detection, spawn-spec/model flag helpers, path derivation, and utilities. |
| Runtime: API, event loop, maintenance, relaycast/worker handlers: `crates/broker/src/runtime/{api,event_loop,maintenance,relaycast_events,worker_events}.rs` | Add BrokerRuntime event-loop, API request handler, maintenance tick handling (retries/restarts), relaycast inbound mapping and control handling, and worker-event processing including continuity and exit cleanup. |

Sequence Diagram(s)

sequenceDiagram
  participant Cli as Cli::run()
  participant Init as run_init()
  participant Relay as Relaycast (Auth/HTTP)
  participant Runtime as BrokerRuntime
  participant Worker as WorkerRegistry
  Cli->>Init: parse args & invoke run_init / subcommand
  Init->>Relay: connect_relay (AuthClient / startup session)
  Relay-->>Init: workspace/session info
  Init->>Runtime: construct BrokerRuntime & start run()
  Runtime->>Worker: queue_and_try_delivery / retry_pending_delivery
  Worker-->>Runtime: delivery result / events
  Runtime->>Relay: relaycast pre-register / mark online/offline

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • khaliqgant

"🐰
I hopped through code to split the main,
Parsers neat and runtime kept sane,
Clap commands tucked in tidy rows,
Imports trimmed where the breezy wind blows,
Issue #875 now ready to gain."

devin-ai-integration[bot]

This comment was marked as resolved.

coderabbitai[bot]

This comment was marked as resolved.

@willwashburn willwashburn force-pushed the codex/issue-875-split-broker-main branch from 74b8327 to cb59df6 Compare May 19, 2026 01:10
coderabbitai[bot]

This comment was marked as resolved.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (1)
crates/broker/src/runtime/tests.rs (1)

1318-1337: 🏗️ Heavy lift

Avoid mirroring spawn-flag logic inside tests.

compute_bypass_flag and compute_model_flag duplicate production decision logic, so tests can drift from real behavior. Prefer asserting against extracted production helpers used by WorkerRegistry::spawn().

Also applies to: 1664-1676
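
One way to read this nitpick, sketched under heavy assumptions: move the flag decision into a small production helper that both the spawn path and the tests call, so the tests cannot drift. The helper name `bypass_flag_for`, the function `build_spawn_args`, and the `example-cli` / `--example-bypass` strings are all placeholders. The real decision logic inside WorkerRegistry::spawn() is not shown on this page.

```rust
/// Hypothetical production helper holding the flag decision in one place.
pub fn bypass_flag_for(cli: &str, bypass_requested: bool) -> Option<&'static str> {
    if !bypass_requested {
        return None;
    }
    match cli {
        // Placeholder mapping; the real CLI names and flags are not shown here.
        "example-cli" => Some("--example-bypass"),
        _ => None,
    }
}

/// Stand-in for the spawn path: it consumes the helper instead of re-deriving the flag.
pub fn build_spawn_args(cli: &str, bypass_requested: bool) -> Vec<String> {
    let mut args = vec![cli.to_string()];
    if let Some(flag) = bypass_flag_for(cli, bypass_requested) {
        args.push(flag.to_string());
    }
    args
}
```

A test would then assert on `bypass_flag_for` directly rather than on a test-local copy of the same branching.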

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/runtime/tests.rs` around lines 1318 - 1337, The test-local
functions compute_bypass_flag and compute_model_flag duplicate production logic
and should be removed; update the tests to call the shared decision helpers used
by WorkerRegistry::spawn() instead of mirroring logic. Replace calls to
compute_bypass_flag/compute_model_flag in tests.rs with the real helper
functions (the same helper(s) that WorkerRegistry::spawn() uses to decide
flags/model), import them into the test module, and update assertions to reflect
the helper outputs; remove the duplicated functions at both locations (around
the shown block and lines 1664–1676).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/broker/src/runtime/messages.rs`:
- Around line 278-289: The parse_sort_key_from_raw_timestamp function mixes
units: numeric strings are returned as raw i64 (seconds-like) while RFC3339
parsing yields milliseconds, so normalize and/or document the behavior; modify
parse_sort_key_from_raw_timestamp to, after parsing trimmed.parse::<i64>(),
apply a seconds-vs-milliseconds heuristic (e.g., if value < 4_102_444_800 then
treat as seconds and multiply by 1000) so the function always returns
milliseconds, keep the RFC3339 branch unchanged, and add/update a doc comment on
parse_sort_key_from_raw_timestamp explaining that returned values are
milliseconds and update relevant tests to expect milliseconds.
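
A minimal sketch of the numeric branch of that heuristic, assuming the cutoff quoted in the comment (4_102_444_800, which is 2100-01-01T00:00:00Z expressed in seconds). The function name `numeric_sort_key_millis` is hypothetical, and the RFC3339 branch of the real function is left out because it is not shown here.

```rust
/// Values below this are treated as seconds, values at or above it as milliseconds.
const SECONDS_CUTOFF: i64 = 4_102_444_800;

/// Returns a sort key in milliseconds for a numeric timestamp string,
/// or None when the input is not an integer or the conversion overflows.
pub fn numeric_sort_key_millis(raw: &str) -> Option<i64> {
    let value = raw.trim().parse::<i64>().ok()?;
    if value < SECONDS_CUTOFF {
        // Seconds-like input: scale up so callers always see milliseconds.
        value.checked_mul(1000)
    } else {
        Some(value)
    }
}
```

Note that a genuine millisecond timestamp earlier than mid-February 1970 would be misread as seconds by this rule, which is an accepted limitation of any such cutoff.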

In `@crates/broker/src/runtime/paths.rs`:
- Around line 37-48: ensure_ephemeral_paths currently builds the temp root only
from std::process::id(), causing multiple ephemeral brokers in the same process
to collide; use the unused _broker_name parameter and/or a per-instance random
id to make the directory unique. Update the root construction in
ensure_ephemeral_paths to include a sanitized broker identifier and a randomly
generated component (e.g., UUID or secure random hex) — e.g., join
format!("agent-relay-ephemeral-{}-{}", std::process::id(), uuid) or include
broker_name in the format — then create_dir_all on that new root and keep
building RuntimePaths (state, pending, _lock) from it so each broker instance
gets its own state.json/pending.json files.
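
The root construction could look roughly like the sketch below. It swaps the suggested UUID for a process-wide atomic counter so the example stays within the standard library; a random component would be the stronger choice if stale directories from a reused PID are a concern. The names `ephemeral_root`, `sanitize`, and `NEXT_INSTANCE` are hypothetical, and the caller would still run create_dir_all on the result.

```rust
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};

/// Per-process counter so two ephemeral brokers in one process get different roots.
static NEXT_INSTANCE: AtomicU64 = AtomicU64::new(0);

/// Replaces anything outside [A-Za-z0-9_-] so the name is safe as a path component.
fn sanitize(name: &str) -> String {
    name.chars()
        .map(|c| if c.is_ascii_alphanumeric() || c == '-' || c == '_' { c } else { '_' })
        .collect()
}

/// Builds a temp root from the PID, the sanitized broker name, and an instance number.
pub fn ephemeral_root(broker_name: &str) -> PathBuf {
    let instance = NEXT_INSTANCE.fetch_add(1, Ordering::Relaxed);
    std::env::temp_dir().join(format!(
        "agent-relay-ephemeral-{}-{}-{}",
        std::process::id(),
        sanitize(broker_name),
        instance
    ))
}
```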

In `@crates/broker/src/runtime/session.rs`:
- Around line 168-193: Remove token-derived data from the debug artifact by
deleting the token_prefix line and any use of self_token when building
identity_debug in session.rs (the formatted string assigned to identity_debug).
Ensure the formatted output no longer references &self_token or slices thereof
and only includes non-token fields (agent_name, requested_name, agent_id,
default_workspace, workspace_count, timestamp); keep the existing
AGENT_RELAY_NO_DEBUG_FILES gating unchanged so files are still prevented when
the env var is set.
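
A sketch of what the token-free artifact might look like, assuming string-typed fields and a simple key=value layout. The struct `IdentityDebug` and both function names are hypothetical; the field list follows the one given in the comment, and the gating helper only models "the variable is set" as described there.

```rust
/// Hypothetical view of the non-token identity fields listed in the review.
pub struct IdentityDebug<'a> {
    pub agent_name: &'a str,
    pub requested_name: &'a str,
    pub agent_id: &'a str,
    pub default_workspace: &'a str,
    pub workspace_count: usize,
    pub timestamp: &'a str,
}

/// Formats the debug artifact; no token or token prefix is accepted or emitted.
pub fn format_identity_debug(info: &IdentityDebug<'_>) -> String {
    format!(
        "agent_name={}\nrequested_name={}\nagent_id={}\ndefault_workspace={}\nworkspace_count={}\ntimestamp={}\n",
        info.agent_name,
        info.requested_name,
        info.agent_id,
        info.default_workspace,
        info.workspace_count,
        info.timestamp
    )
}

/// Debug files are allowed only when AGENT_RELAY_NO_DEBUG_FILES is unset.
/// The caller passes the variable's value so this stays free of global state.
pub fn debug_files_enabled(no_debug_files: Option<&str>) -> bool {
    no_debug_files.is_none()
}
```

Because the struct has no token field at all, a later change cannot leak token data into the file without first changing the type.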

In `@crates/broker/src/runtime/tests.rs`:
- Around line 453-475: This test mutates process-global environment variables
causing flakiness in parallel runs; protect env changes by acquiring a global
test mutex before modifying env and releasing after (use a static
Lazy<Mutex<()>> via once_cell or lazy_static), so in
contract_startup_429_fixture_requires_degraded_health_status lock the mutex
before std::env::set_var and hold it until after std::env::remove_var (or use a
RAII guard to ensure removal in a finally/drop), and apply the same pattern to
the other env-mutating tests (the ones referenced around the other fixture
block) so all tests that set AGENT_RELAY_STARTUP_ERROR_CODE (and similar env
keys) serialize their execution.
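
The serialization pattern can be sketched with the standard library alone: `Mutex::new` is usable in a `static`, so this example uses that in place of the once_cell or lazy_static wrappers mentioned above. The type `EnvVarGuard` and the static `ENV_LOCK` are hypothetical names. The `unsafe` blocks are there because the environment setters are unsafe functions in the 2024 edition; on older editions they are merely redundant.

```rust
use std::sync::{Mutex, MutexGuard};

/// Shared by every test that touches process-global environment variables.
static ENV_LOCK: Mutex<()> = Mutex::new(());

/// Sets a variable while holding the lock and removes it again on drop.
pub struct EnvVarGuard {
    key: &'static str,
    _lock: MutexGuard<'static, ()>,
}

impl EnvVarGuard {
    pub fn set(key: &'static str, value: &str) -> Self {
        // Recover from poisoning so one failed test does not cascade into others.
        let lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        // Safe only to the extent that all env-mutating tests go through this guard.
        unsafe { std::env::set_var(key, value) };
        EnvVarGuard { key, _lock: lock }
    }
}

impl Drop for EnvVarGuard {
    fn drop(&mut self) {
        // Runs before the lock field is released, so cleanup is still serialized.
        unsafe { std::env::remove_var(self.key) };
    }
}
```

A test would bind `let _guard = EnvVarGuard::set("AGENT_RELAY_STARTUP_ERROR_CODE", "429");` at its top and rely on scope exit for cleanup, including when an assertion panics.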

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 4231966b-9535-4bfa-b89b-73523704da1f

📥 Commits

Reviewing files that changed from the base of the PR and between d52c147 and 450740c.

📒 Files selected for processing (17)
  • .trajectories/completed/2026-05/traj_47akjihewlow.json
  • .trajectories/completed/2026-05/traj_47akjihewlow.md
  • .trajectories/completed/2026-05/traj_47akjihewlow.trace.json
  • .trajectories/index.json
  • crates/broker/src/runtime/connection.rs
  • crates/broker/src/runtime/delivery.rs
  • crates/broker/src/runtime/headless.rs
  • crates/broker/src/runtime/init.rs
  • crates/broker/src/runtime/io.rs
  • crates/broker/src/runtime/messages.rs
  • crates/broker/src/runtime/mod.rs
  • crates/broker/src/runtime/paths.rs
  • crates/broker/src/runtime/session.rs
  • crates/broker/src/runtime/spawn_spec.rs
  • crates/broker/src/runtime/system.rs
  • crates/broker/src/runtime/tests.rs
  • crates/broker/src/runtime/util.rs
✅ Files skipped from review due to trivial changes (3)
  • .trajectories/completed/2026-05/traj_47akjihewlow.json
  • .trajectories/completed/2026-05/traj_47akjihewlow.md
  • .trajectories/completed/2026-05/traj_47akjihewlow.trace.json
🚧 Files skipped from review as they are similar to previous changes (1)
  • .trajectories/index.json

Comment thread crates/broker/src/runtime/messages.rs
Comment thread crates/broker/src/runtime/paths.rs Outdated
Comment thread crates/broker/src/runtime/session.rs
Comment thread crates/broker/src/runtime/tests.rs
@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 5

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/broker/src/runtime/api.rs`:
- Around line 1042-1087: The SubscribeChannels/UnsubscribeChannels branches only
mutate handle.spec.channels locally; after updating that vector you must
propagate changes upstream and into persisted agent state: for
SubscribeChannels, after pushing new channels (added), call the
workspace/relaycast subscription API to subscribe the worker's workspace to the
newly added channels (e.g., a helper like subscribe_worker_channels(name,
&added) or workers.workspace.subscribe_channels(&name, &added)), and also update
state.agents (the persisted agent entry for name) to include the new channels
when running in persist mode; for UnsubscribeChannels, after retain() completes
call the workspace/relaycast unsubscription API to remove those channels and
mirror the remaining channel list into state.agents (or remove entries) so
persisted state matches handle.spec.channels. Ensure both branches handle empty
added/removed lists gracefully and send the reply only after upstream and
state.agents are updated.
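
The subscribe half of that flow might be structured as in the sketch below. Everything here is an assumption made for illustration: the `ChannelUpstream` trait stands in for whatever workspace/relaycast call the broker actually exposes, `persisted` stands in for state.agents, and `RecordingUpstream` exists only so the example is self-contained. The sketch calls upstream before touching local state so that a failed call leaves nothing half-applied, which is a design choice of this example rather than something the comment prescribes.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the upstream subscription API.
pub trait ChannelUpstream {
    fn subscribe(&mut self, agent: &str, channels: &[String]) -> Result<(), String>;
}

/// Test double that records calls instead of talking to a server.
#[derive(Default)]
pub struct RecordingUpstream {
    pub calls: Vec<(String, Vec<String>)>,
}

impl ChannelUpstream for RecordingUpstream {
    fn subscribe(&mut self, agent: &str, channels: &[String]) -> Result<(), String> {
        self.calls.push((agent.to_string(), channels.to_vec()));
        Ok(())
    }
}

/// Adds the channels not yet present, propagates them upstream, then mirrors
/// the live list into persisted state. Returns the channels actually added.
pub fn subscribe_channels<U: ChannelUpstream>(
    live: &mut Vec<String>,
    persisted: &mut HashMap<String, Vec<String>>,
    upstream: &mut U,
    agent: &str,
    requested: &[String],
) -> Result<Vec<String>, String> {
    let mut added: Vec<String> = Vec::new();
    for channel in requested {
        if !live.contains(channel) && !added.contains(channel) {
            added.push(channel.clone());
        }
    }
    if added.is_empty() {
        // Nothing new: skip the upstream call and leave state untouched.
        return Ok(added);
    }
    upstream.subscribe(agent, &added)?;
    live.extend(added.iter().cloned());
    persisted.insert(agent.to_string(), live.clone());
    Ok(added)
}
```

The reply to the API caller would be sent only after this function returns, which matches the ordering requested in the comment.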

In `@crates/broker/src/runtime/event_loop.rs`:
- Around line 60-71: The select is repeatedly polling closed channels (api_rx,
ws_inbound_rx, worker_event_rx) which returns None immediately and spins; wrap
each of these fields as Option<...> (e.g. self.api_rx: Option<Receiver<...>>)
and only poll them when Some, using Option::as_mut()/take() in the select arms;
when recv() yields None, set that Option to None (drop the receiver) and emit
the appropriate single-time closed event (e.g. RuntimeEvent::ApiClosed) so the
arm stops being polled thereafter; apply the same pattern for ws_inbound_rx and
worker_event_rx (also at the other select block referenced) so closed receivers
are disabled instead of repeatedly selected.
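
The "disable on close" idea can be shown with a synchronous analogue. The sketch below uses `std::sync::mpsc` and `try_recv` rather than the broker's async receivers, and the names `Polled` and `poll_once` are hypothetical. In an async select loop the same effect is usually achieved by guarding each arm on whether the receiver is still present.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Outcome of polling one optional receiver.
#[derive(Debug, PartialEq)]
pub enum Polled<T> {
    Item(T),
    Empty,
    /// Reported exactly once, at the moment the channel is found closed.
    Closed,
    /// The receiver was dropped earlier and is no longer polled.
    Disabled,
}

/// Polls the receiver if it is still present and drops it on disconnect,
/// so a closed channel cannot be selected again on the next iteration.
pub fn poll_once<T>(slot: &mut Option<Receiver<T>>) -> Polled<T> {
    let Some(rx) = slot.as_ref() else {
        return Polled::Disabled;
    };
    match rx.try_recv() {
        Ok(item) => Polled::Item(item),
        Err(TryRecvError::Empty) => Polled::Empty,
        Err(TryRecvError::Disconnected) => {
            *slot = None;
            Polled::Closed
        }
    }
}
```

The single `Closed` result is where a one-time event such as RuntimeEvent::ApiClosed would be emitted.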

In `@crates/broker/src/runtime/maintenance.rs`:
- Around line 304-343: After a successful workers.spawn (inside the Ok(_)
branch) refresh the persisted agent metadata for this agent name so the saved
PID/start time/restart_count are updated and not left pointing at the crashed
process; specifically, update the state.agents entry (or the persistence layer
holding agent metadata) for name with the new runtime fields (new PID, new start
timestamp, updated restart_count) right after workers.supervisor.on_restarted
and workers.metrics.on_restart and before sending the agent_restarted
event/publishing the agent state transition (so run_init won’t later reap the
live agent due to a stale persisted PID).
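
A minimal sketch of the metadata refresh, assuming the persisted entry carries a PID, a start timestamp, and a restart counter. The struct `PersistedAgent`, its field names, and `record_restart` are hypothetical; the real shape of state.agents is not shown on this page.

```rust
use std::collections::HashMap;

/// Hypothetical persisted record for one agent.
#[derive(Debug, Clone, PartialEq)]
pub struct PersistedAgent {
    pub pid: u32,
    pub started_at_ms: u64,
    pub restart_count: u32,
}

/// Overwrites the stale runtime fields after a successful respawn.
/// Returns false when no persisted entry exists for `name`.
pub fn record_restart(
    agents: &mut HashMap<String, PersistedAgent>,
    name: &str,
    new_pid: u32,
    now_ms: u64,
) -> bool {
    match agents.get_mut(name) {
        Some(entry) => {
            entry.pid = new_pid;
            entry.started_at_ms = now_ms;
            entry.restart_count += 1;
            true
        }
        None => false,
    }
}
```

Calling this inside the successful-spawn branch, before the agent_restarted event goes out, keeps the persisted PID pointing at the live process.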
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 9aee47e7-eabd-45fb-b2e7-6571f22eab21

📥 Commits

Reviewing files that changed from the base of the PR and between 450740c and 7672f70.

📒 Files selected for processing (11)
  • .trajectories/completed/2026-05/traj_x37bhga2j5ph.json
  • .trajectories/completed/2026-05/traj_x37bhga2j5ph.md
  • .trajectories/index.json
  • crates/broker/src/runtime/api.rs
  • crates/broker/src/runtime/event_loop.rs
  • crates/broker/src/runtime/init.rs
  • crates/broker/src/runtime/maintenance.rs
  • crates/broker/src/runtime/mod.rs
  • crates/broker/src/runtime/relaycast_events.rs
  • crates/broker/src/runtime/tests.rs
  • crates/broker/src/runtime/worker_events.rs
✅ Files skipped from review due to trivial changes (2)
  • .trajectories/completed/2026-05/traj_x37bhga2j5ph.json
  • .trajectories/index.json
🚧 Files skipped from review as they are similar to previous changes (2)
  • crates/broker/src/runtime/mod.rs
  • crates/broker/src/runtime/tests.rs

Comment thread crates/broker/src/runtime/api.rs
Comment thread crates/broker/src/runtime/event_loop.rs
Comment thread crates/broker/src/runtime/maintenance.rs
Comment thread crates/broker/src/runtime/relaycast_events.rs
Comment thread crates/broker/src/runtime/worker_events.rs Outdated
@devin-ai-integration devin-ai-integration Bot left a comment

Devin Review found 1 new potential issue.

View 6 additional findings in Devin Review.

Comment thread crates/broker/src/runtime/api.rs
@barryollama barryollama left a comment

Code Review Summary

Verdict: ✅ APPROVE

This is a well-executed refactoring that successfully decomposes the monolithic ~7,700-line main.rs into a clean, modular structure.

Highlights

Clean module organization:

  • event_loop.rs — central event dispatch with BrokerRuntime struct
  • init.rs — startup/initialization logic
  • api.rs — HTTP request handlers
  • headless.rs, delivery.rs, worker_events.rs, relaycast_events.rs — focused domain modules
  • session.rs, paths.rs, connection.rs — supporting utilities

Simplified main.rs: Now just 29 lines — module declarations and a delegating main().

Tests preserved: All 369 tests pass (357 + 12 continuity tests), confirming no regressions.

Clean build: cargo check and cargo clippy -D warnings both pass.

Architecture

The BrokerRuntime struct provides explicit long-lived state, and the RuntimeEvent enum keeps the tokio::select! loop clean and manageable. The delegation pattern from cli/mod.rs to runtime/ modules is well-structured.

Good work on this organizational improvement — the broker code is now much more maintainable.

@willwashburn willwashburn merged commit 6378fdb into main May 19, 2026
39 checks passed
@willwashburn willwashburn deleted the codex/issue-875-split-broker-main branch May 19, 2026 02:43
Development

Successfully merging this pull request may close these issues.

refactor(broker): split main.rs into CLI and runtime modules

2 participants