refactor: extract the bus into a standalone crate::bus module#51
Conversation
Decouple the consume path from the concrete microsvc::Service:
- Add `MessageRouter` { handles, subscription_plan, dispatch } in
transport/router.rs — the trait run_source and the BusConsumer adapters
depend on instead of Service<D>.
- impl MessageRouter for Service<D> (microsvc/message_router.rs); the
HandlerError -> TransportError classification happens on the microsvc side
so the runner only sees an already-classified error.
- run_source<R: MessageRouter, S, I: Send>: drop the <D> Service generic,
keep the <I> inbox-hook generic (RunOptions::inbox still works).
- BusConsumer::listen/subscribe take Arc<impl MessageRouter>; rewrite all five
adapters (in_memory/nats/rabbitmq/kafka/postgres) + rabbit::ensure_subscription
to derive topology from subscription_plan() instead of command_names()/
event_names().
No behavior change. 226 lib + 414 integration + 12 in-memory conformance tests
green; cargo check --all-features clean.
Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]]
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Handlers is the second MessageRouter impl and the standalone, Service-free
way to consume the bus: register a closure per (kind, name) and run it with
bus.listen/bus.subscribe — the Rust analog of Node's bus.listen('x', fn).
- Reuses the AsyncMessageHandler HRTB boxed-future pattern (over &Message, no
Context/deps/guards); idempotent-only by design.
- A Service<()> facade is impossible (bus must not depend on microsvc), so
Handlers is its own engine.
- Tests prove full InMemoryBus publish->subscribe and send->listen round trips
with no Service.
Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]]
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ule (Phase 3) Move the canonical transport vocabulary out of microsvc::service into a dependency-free transport::message module, so the bus stops returning a microsvc error type from payload decoding: - New transport/message.rs: Message, MessageKind, and a bus-core PayloadDecodeError. payload_json/payload_bitcode now return PayloadDecodeError instead of microsvc::HandlerError. - microsvc::error gains From<PayloadDecodeError> for HandlerError; Context::input maps back, so handler signatures are unchanged. - Re-export Message/MessageKind/PayloadDecodeError from both transport and microsvc, so every `crate::microsvc::Message` consumer is unaffected. No behavior change. 229 lib + full default integration suite + 12 conformance green; cargo check --all-features clean. Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The bus is no longer embedded under microsvc: `src/microsvc/transport/` is now `src/bus/` (crate::bus). microsvc keeps a transitional `pub use crate::bus as transport;` alias so existing `microsvc::transport::…` paths (incl. tests) keep resolving; call sites and the bus's own upward imports are cleaned in P4b. Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…b/1) SubscriptionPlan is bus vocabulary (consumed by MessageRouter + adapters); move it out of microsvc::service into bus::message. Re-exported from microsvc for source compatibility. Implements [[tasks/bus-decomposition-phase1]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
OutboxDispatcher/OutboxSource depend on crate::outbox + crate::outbox_worker, so they belong with the worker (which depends up on the bus's publisher/source traits), not in bus core. Re-exported at crate root; the two test crates that used them via microsvc::transport now import from the crate root. Implements [[tasks/bus-decomposition-phase1]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…P4b/3) Mechanical: bus production files now reference the canonical types via super:: (crate::bus) instead of crate::microsvc::. Remaining crate::microsvc references in bus are error.rs (B4), knative ingress (B5), and Service-based test modules (B6). Implements [[tasks/bus-decomposition-phase1]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- bus/error.rs no longer names HandlerError: the From<HandlerError> conversion and classification move to microsvc (HandlerError::transport_error_kind); From<RepositoryError> for TransportError moves to the outbox bridge (which knows both types). Bus core's error.rs is now microsvc-free. - Knative split: the Service-coupled HTTP ingress (cloud_events_router / ingress_handler / CloudEvent parsing) moves to microsvc::knative_ingress; bus/knative.rs keeps only the Message/SubscriptionPlan-only manifest helpers (knative_triggers, sanitize_k8s_name). Two test crates updated. 229 lib tests green; cargo check --all-features clean. Implements [[tasks/bus-decomposition-phase1]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…(P4b/5) - Rewrite the runner and in_memory_bus unit tests to register handlers via the dependency-free Handlers builder instead of microsvc::Service, so the bus has zero crate::microsvc references (production AND tests). - Fix the rabbitmq_transport test to pass &Service (ensure_subscription now takes &impl MessageRouter, which doesn't deref-coerce &Arc). - cargo fmt. src/bus/ now imports nothing from microsvc. 229 lib + 493 default integration tests green; cargo test --all-features --no-run clean. Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…rate::bus Remove the transitional pub use crate::bus as transport; alias and move every remaining reference to crate::bus / sourced_rust::bus: the message_router and context internal imports, the outbox_source test, all 11 test crates, and the README + async-transports docs. microsvc::transport no longer exists anywhere. 493 default integration tests green; cargo test --all-features --no-run clean. Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Point the bus module-doc links at the types' new homes after the extraction: OutboxDispatcher/OutboxDispatchOutcome/OutboxSource -> crate root re-exports, cloud_events_router -> crate::microsvc. Brings cargo doc back to parity with main (4 pre-existing warnings, zero new). Found by the branch-wide review. Implements [[tasks/bus-decomposition-phase1]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (5)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughThis PR refactors the async transport layer to decouple bus consumers from a concrete ChangesBus Transport Refactor
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (3)
src/bus/handlers.rs (1)
110-122: ⚡ Quick winMake
subscription_plan()deterministic and linear-time.
HashMapiteration order is nondeterministic, and thebucket.iter().any(...)check is redundant here because keys are already unique by(kind, name). Build buckets directly and sort before returning.♻️ Proposed refactor
fn subscription_plan(&self) -> SubscriptionPlan { - let mut plan = SubscriptionPlan::default(); - for (kind, name) in self.handlers.keys() { - let bucket = match kind { - MessageKind::Command => &mut plan.commands, - MessageKind::Event => &mut plan.events, - }; - if !bucket.iter().any(|existing| existing == name) { - bucket.push(name.clone()); - } - } - plan + let mut plan = SubscriptionPlan::default(); + for (kind, name) in self.handlers.keys() { + match kind { + MessageKind::Command => plan.commands.push(name.clone()), + MessageKind::Event => plan.events.push(name.clone()), + } + } + plan.commands.sort(); + plan.events.sort(); + plan }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/bus/handlers.rs` around lines 110 - 122, The subscription_plan() currently iterates a HashMap and does an O(n^2) duplicate check which yields nondeterministic order; change it to build the command and event buckets directly by iterating self.handlers.keys() and pushing names into the appropriate bucket (no .any() duplicate check since keys are unique by (kind,name)), then sort the buckets (e.g., plan.commands.sort(); plan.events.sort()) before returning so the result is deterministic and overall linearithmic instead of quadratic; update the function subscription_plan, operate on SubscriptionPlan::default()'s commands and events, and match on MessageKind to decide which bucket to push into.tests/microsvc/transport_subscribe.rs (1)
8-10: ⚡ Quick winImport
MessageandMessageKindfromsourced_rust::busin this bus-facing test.This test still builds transport messages through
sourced_rust::microsvc, so it doesn't fully cover the new top-level bus API. Pulling those types fromsourced_rust::bushere would make the migration complete and keep this test aligned with the boundary the PR is establishing.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/microsvc/transport_subscribe.rs` around lines 8 - 10, Update the test imports so Message and MessageKind are pulled from the top-level bus API instead of microsvc: remove Message and MessageKind from the sourced_rust::microsvc import and import them from sourced_rust::bus (alongside Bus, BusConsumer, InMemoryBus, RunOptions) while leaving Service (and other existing imports) intact; this ensures the test constructs transport messages using sourced_rust::bus types (symbols: Message, MessageKind, Service, Bus, BusConsumer).src/bus/rabbit_bus.rs (1)
148-157: ⚡ Quick winShort-circuit empty event plans before declaring RabbitMQ resources.
ensure_subscriptionandsubscribestill perform broker setup on a no-op event plan. Move the empty-plan check ahead of declarations to avoid unnecessary AMQP writes and connection work.♻️ Suggested change
pub async fn ensure_subscription<R: MessageRouter>( &self, router: &R, ) -> Result<(), TransportError> { + let plan = router.subscription_plan(); + if plan.events.is_empty() { + return Ok(()); + } self.declare_events_exchange(&self.channel).await?; let queue = self.group_queue(); self.declare_queue(&self.channel, &queue).await?; - let plan = router.subscription_plan(); for name in &plan.events { self.channel .queue_bind( &queue, &self.events_exchange, @@ async fn subscribe<R: MessageRouter>( &self, router: Arc<R>, options: RunOptions, ) -> Result<(), TransportError> { self.ensure_subscription(router.as_ref()).await?; - if router.subscription_plan().events.is_empty() { - return Ok(()); - } let channel = connect_channel(&self.uri).await?; let source = RabbitBusSource { channel, queues: vec![self.group_queue()], // Events are published with routing key == the bare event name.Also applies to: 230-233
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/bus/rabbit_bus.rs` around lines 148 - 157, The functions ensure_subscription and subscribe currently perform RabbitMQ setup even when the router.subscription_plan() has no events; to fix, compute let plan = router.subscription_plan() early in both ensure_subscription and subscribe, return Ok(()) immediately if plan.events.is_empty() (short-circuit), and only proceed to call declare_events_exchange, declare_queue, and iterate plan.events when non-empty — update references in ensure_subscription and subscribe to check plan.events before any channel or broker declaration to avoid unnecessary AMQP work.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/bus/knative.rs`:
- Around line 40-44: The current knative_triggers function can produce duplicate
metadata.name after sanitize_k8s_name because different event strings can
normalize to the same DNS label; change the trigger name generation so you
append a short deterministic suffix derived from the original event (e.g., a 6–8
char hex of a stable hash like SHA-1 or CRC32 of event) to the base
"{subscriber_service}-{event}" before calling sanitize_k8s_name, ensuring
uniqueness while keeping names deterministic; update the trigger_name creation
in knative_triggers to compute that short hash suffix and include it in the
formatted name.
In `@src/bus/mod.rs`:
- Line 141: The module docs still describe run_source dispatching via
Service::dispatch_message but the public API now exports MessageRouter; update
the header docs to describe router-based dispatch instead: replace or augment
the overview that references run_source and Service::dispatch_message with
language that explains messages are dispatched through the MessageRouter
abstraction (mention how run_source interacts with or delegates to MessageRouter
if applicable), and update any examples/links in the doc comments to reference
MessageRouter rather than Service::dispatch_message so rustdoc reflects the new
public surface.
---
Nitpick comments:
In `@src/bus/handlers.rs`:
- Around line 110-122: The subscription_plan() currently iterates a HashMap and
does an O(n^2) duplicate check which yields nondeterministic order; change it to
build the command and event buckets directly by iterating self.handlers.keys()
and pushing names into the appropriate bucket (no .any() duplicate check since
keys are unique by (kind,name)), then sort the buckets (e.g.,
plan.commands.sort(); plan.events.sort()) before returning so the result is
deterministic and overall linearithmic instead of quadratic; update the function
subscription_plan, operate on SubscriptionPlan::default()'s commands and events,
and match on MessageKind to decide which bucket to push into.
In `@src/bus/rabbit_bus.rs`:
- Around line 148-157: The functions ensure_subscription and subscribe currently
perform RabbitMQ setup even when the router.subscription_plan() has no events;
to fix, compute let plan = router.subscription_plan() early in both
ensure_subscription and subscribe, return Ok(()) immediately if
plan.events.is_empty() (short-circuit), and only proceed to call
declare_events_exchange, declare_queue, and iterate plan.events when non-empty —
update references in ensure_subscription and subscribe to check plan.events
before any channel or broker declaration to avoid unnecessary AMQP work.
In `@tests/microsvc/transport_subscribe.rs`:
- Around line 8-10: Update the test imports so Message and MessageKind are
pulled from the top-level bus API instead of microsvc: remove Message and
MessageKind from the sourced_rust::microsvc import and import them from
sourced_rust::bus (alongside Bus, BusConsumer, InMemoryBus, RunOptions) while
leaving Service (and other existing imports) intact; this ensures the test
constructs transport messages using sourced_rust::bus types (symbols: Message,
MessageKind, Service, Bus, BusConsumer).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 1ffc2cb8-6399-4324-a06b-1e830250d21f
📒 Files selected for processing (46)
README.mddocs/async-transports.mdsrc/bus/bus.rssrc/bus/capabilities.rssrc/bus/error.rssrc/bus/failure_policy.rssrc/bus/handlers.rssrc/bus/in_memory_bus.rssrc/bus/kafka.rssrc/bus/kafka_bus.rssrc/bus/knative.rssrc/bus/knative_bus.rssrc/bus/message.rssrc/bus/mod.rssrc/bus/nats.rssrc/bus/nats_bus.rssrc/bus/postgres_bus.rssrc/bus/publisher.rssrc/bus/rabbit_bus.rssrc/bus/rabbitmq.rssrc/bus/router.rssrc/bus/run_options.rssrc/bus/runner.rssrc/bus/source.rssrc/bus/stable_id.rssrc/lib.rssrc/microsvc/context.rssrc/microsvc/error.rssrc/microsvc/knative_ingress.rssrc/microsvc/message_router.rssrc/microsvc/mod.rssrc/microsvc/service.rssrc/outbox_worker/mod.rssrc/outbox_worker/outbox_dispatch.rssrc/outbox_worker/outbox_source.rstests/distributed_read_model/main.rstests/distributed_read_model_board/main.rstests/kafka_transport/main.rstests/knative_cloudevents/main.rstests/microsvc/transport_listen.rstests/microsvc/transport_subscribe.rstests/nats_transport/main.rstests/postgres_transport/main.rstests/rabbitmq_transport/main.rstests/sagas/microsvc_saga.rstests/transport_conformance/mod.rs
💤 Files with no reviewable changes (1)
- src/bus/error.rs
Distinct CloudEvent types can normalize to the same RFC-1123 label (e.g. `order.created` and `order-created`, or a command and an event of the same name), so the generated `Trigger` `metadata.name`s could collide and the second would clobber the first on apply — silently dropping a subscription. Add `unique_k8s_name`, which sanitizes and then de-duplicates against the names already emitted (numeric suffix, capped at 63 chars; the `type:` filter still carries the raw event name, so routing is unchanged). Thread one dedup set through both generators: `knative_triggers` and `KnativeBus::manifests` (commands + events share the set). Addresses CodeRabbit review on PR #51. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Leftover formatting from the clean-break import sweep (rustfmt sorts the bus import before the microsvc one). No behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
After the MessageRouter seam landed, the module header still said run_source dispatches through `Service::dispatch_message`. Update it to reference the `MessageRouter` consume seam (implemented by microsvc::Service and the dependency-free Handlers builder) and `MessageRouter::dispatch`, so rustdoc matches the new public surface. Addresses CodeRabbit review on PR #51. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
What
Extracts the message bus out of
src/microsvc/transport/into a standalone top-levelsrc/bus/module that imports nothing frommicrosvc.microsvcnow depends on the bus, not the reverse. Behavior-preserving — pure restructuring.Implements
specs/bus-module-decomposition(hardened after an adversarial design review) /tasks/bus-decomposition-phase1.Why
Three goals, all delivered:
Servicevia the new dependency-freeHandlersbuilder.bus.subscribe(Handlers::new().on_event("seat.reserved", |msg| async { … }), opts).How (phased, each commit compiles + green)
MessageRouterseam —run_source+ all fiveBusConsumeradapters consume a smallMessageRouter { handles, subscription_plan, dispatch }trait instead of the concreteService<D>.Service<D>implements it; classification ofHandlerErrorhappens on the microsvc side.Handlers— a second, dependency-freeMessageRouterimpl (the Rust analog of Node'sbus.listen('x', fn)); idempotent-only by design.Message/MessageKind/SubscriptionPlan→ bus core, plus a bus-corePayloadDecodeErrorso payload decoding no longer returnsmicrosvc::HandlerError(mapped back viaFromon the microsvc side).bus::errornames no microsvc types;From<HandlerError>/classification live in microsvc,From<RepositoryError>with the outbox bridge.outbox_worker(OutboxDispatcher/OutboxSource, re-exported at the crate root).Service-coupled HTTP ingress (cloud_events_router) →microsvc::knative_ingress; theMessage/SubscriptionPlan-only manifest helpers stay in the bus.microsvc::transportalias is removed; all call sites (incl. tests + docs) usecrate::bus.Verification
cargo test --all-features --no-runclean (all broker/Knative test crates compile).--no-default-features [--features …]) compile.grepconfirmssrc/bus/imports nothing fromcrate::microsvc.cargo docat parity withmain(4 pre-existing warnings, zero new).Public API moves (intended)
cloud_events_router→sourced_rust::microsvc::cloud_events_router(was under the transport module).OutboxDispatcher/OutboxSource→sourced_rust::crate root.Message/MessageKind/SubscriptionPlan/PayloadDecodeErrorre-exported from bothsourced_rust::busandsourced_rust::microsvc.Deferred (tracked in the spec)
The per-transport crate split (
sourced-bus-*) is deferred — feature-gating already provides compile isolation, so its only added benefit is independent versioning. Revisit when a consumer needs it.🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Documentation
Refactor
Tests