Skip to content

feat: async microsvc transports#43

Merged
patrickleet merged 12 commits into
mainfrom
codex/async-microsvc-transports
May 27, 2026
Merged

feat: async microsvc transports#43
patrickleet merged 12 commits into
mainfrom
codex/async-microsvc-transports

Conversation

@patrickleet
Copy link
Copy Markdown
Collaborator

@patrickleet patrickleet commented May 27, 2026

Summary

  • add microsvc handler metadata, Message, and subscription planning
  • update register_handlers! to derive handler specs from COMMAND, EVENT, and EVENTS constants at registration time
  • remove per-handler SPEC boilerplate and projection-local envelope/subscriber wrappers
  • add fluent registration builders: Service::command("name").handle(handler), Service::command("name").guarded(guard, handler), and matching event/events builders
  • expose both ctx.message() and ctx.input::() to every handler; Message carries optional durable id, raw payload bytes, metadata, correlation_id, and causation_id helpers
  • remove HandlerInput, MessageEnvelope, .envelope(), Service::command_guarded, and Service::commands

Tests

  • cargo test --all-features
  • cargo test --all-features microsvc
  • cargo test --all-features --test distributed_read_model --test distributed_read_model_board
  • git diff --check

Notes

  • GitKB spec specs/async-microsvc-transports is current on KB commit 019e6b20-4175-7d62-b0e6-f5bc63b9d5a9
  • async source/publisher traits, CloudEvents/Knative, Kafka/NATS, and outbox publisher mapping remain future slices

Summary by CodeRabbit

  • New Features

    • Service introspection: query command/event names and handler specifications; builder-style handler registration and bulk registration support.
  • Refactor

    • Unified message envelope dispatch path; subscription now acknowledges and skips unknown events.
    • Context and handler APIs now expose transport message metadata and payload decoding consistently.
  • Documentation

    • Updated transport examples to the builder-style registration and introspection APIs.
  • Tests

    • Tests updated to use new registration, dispatch, and message-introspection patterns.
  • Bug Fixes

    • Bus dispatch no longer rejects non-JSON payloads in tested scenarios.

Review Change Stack

Implements the first slice of [[specs/async-microsvc-transports]].

Adds handler metadata, message envelopes, subscription planning, and projection handler envelope dispatch.
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 27, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Service registration and dispatch were refactored around a transport-visible Message and HandlerSpec model, register_handlers! macro updated to a service-first API, Context now stores Message, and projection services/tests were simplified by removing custom projection wrappers.

Changes

Handler Specification Framework Migration

Layer / File(s) Summary
Message types and service state foundation
src/microsvc/service.rs
Adds Message, MessageKind, DeliveryKind, HandlerNames, HandlerSpec, SubscriptionPlan, and tracks handler_specs on Service<D>.
Handler registration and dispatch refactoring
src/microsvc/service.rs
Introduces HandlerBuilder/handler API, dispatch_message(&Message), invoke path, generalized message conversion helpers, and updates tests for payload decoding and metadata exposure.
register_handlers! macro and public API updates
src/microsvc/mod.rs
Macro becomes service-first, supports command/event/events forms, and re-exports new messaging and handler-spec types.
Context -> Message integration
src/microsvc/context.rs
Context stores a Message, decodes inputs from message.payload_json(), and exposes message()/message_name() accessors.
Remove ProjectionMessage and simplify projection handlers
tests/distributed_read_model/projection_service/handlers/*, tests/distributed_read_model_board/projections_service/handlers/*
Drop local ProjectionMessage types; handlers construct Event from ctx.message() and adapt guard checks.
Projection service registration and subscriber removal
tests/distributed_read_model/projection_service/*, tests/distributed_read_model_board/projections_service/*, tests/distributed_read_model/*
Remove ProjectionSubscriber and subscriber helper; rewrite services to use register_handlers! and pass queue.new_subscriber() to microsvc::subscribe.
Integration test subscription simplification
tests/distributed_read_model/main.rs, tests/distributed_read_model_board/projections_service/mod.rs
Tests updated to import projection services directly and subscribe using queue.new_subscriber() without custom subscriber wrappers.
Docs/examples and HTTP/gRPC updates
src/microsvc/http.rs, src/microsvc/grpc.rs, src/microsvc/mod.rs
Examples switched to chained .command(...).handle(...); health endpoints now use service.command_names(); subscribe docs updated.
Tests and examples
tests/microsvc/*, tests/distributed_read_model/*
Tests updated to use builder-style handler registration and to assert on command_names/subscription_plan and new dispatch behaviors.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

🐰 Hopping through the frame,
Types now guide the game—
HandlerSpec shows the way,
Envelopes route the day!
A rabbit cheers for cleaner wiring.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 inconclusive)

Check name Status Explanation Resolution
Title check ❓ Inconclusive The title 'feat: async microsvc transports' is vague and overly broad. The PR primarily refactors handler registration APIs, message metadata modeling, and subscription planning—not async transport implementation. It does not accurately reflect the main changes in the changeset. Consider a more specific title that highlights the core refactoring, such as 'refactor: microsvc handler registration and message metadata' or 'feat: add microsvc message metadata and fluent handler builders'.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Docstring Coverage ✅ Passed Docstring coverage is 96.15% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch codex/async-microsvc-transports

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/microsvc/service.rs`:
- Around line 208-209: The handlers HashMap is keyed only by name so commands
and events with the same name collide; change the registry key to include
MessageKind (e.g., use (MessageKind, String) as the key) and update all places
that access it: registration logic that inserts into handlers, lookup in
dispatch(), the handles() method, and any code used by subscription_plan() and
subscribe() to use (kind, name) instead of name; also ensure registration either
rejects duplicate (kind,name) entries or overwrites intentionally so HandlerSpec
and handlers remain consistent.
- Around line 693-695: message_to_session currently copies metadata keys as-is
which causes transport-dependent lookup (e.g., X-Hasura-* vs lowercase); update
message_to_session to normalize keys (e.g., lowercase) before building the vars
map so Session::from_map receives normalized keys; locate the function
message_to_session and the local vars HashMap<String,String> built from
message.metadata and transform each key to lowercase (or whatever canonical form
used by src/microsvc/http.rs) before calling Session::from_map.

In `@tests/sagas/handlers/saga/on_inventory_reserved.rs`:
- Around line 4-5: The handler SPEC is incorrectly declared as a command; since
it consumes the InventoryReserved event it should use an event spec. Update the
SPEC declaration (pub const SPEC) to call
sourced_rust::microsvc::HandlerSpec::event(...) instead of
HandlerSpec::command(...), passing the InventoryReserved event identifier used
in this test (replace COMMAND with the appropriate InventoryReserved event
constant/name).

In `@tests/sagas/handlers/saga/on_order_completed.rs`:
- Around line 4-5: The handler is incorrectly registered as a command; update
the SPEC to register it as an event handler by replacing
HandlerSpec::command(COMMAND) with HandlerSpec::event(COMMAND) (and optionally
rename the COMMAND constant to something like EVENT or ORDER_COMPLETED_EVENT for
clarity), ensuring the constant references the OrderCompleted completion event.

In `@tests/sagas/handlers/saga/on_order_created.rs`:
- Around line 4-5: The handler SPEC is currently registered as a command which
breaks event subscription; change the spec to register it as an event consumer
by replacing sourced_rust::microsvc::HandlerSpec::command(COMMAND) with the
event-style spec (e.g., sourced_rust::microsvc::HandlerSpec::event(COMMAND) or
the library's consumer variant) so the on_order_created saga (SPEC / COMMAND) is
treated as an event consumer.

In `@tests/sagas/handlers/saga/on_payment_succeeded.rs`:
- Around line 4-5: SPEC is currently defined as
sourced_rust::microsvc::HandlerSpec::command(COMMAND) but PaymentSucceeded is an
upstream event; change the spec to use HandlerSpec::event(...) instead of
.command(...) so the saga subscribes as an event listener. Update the line
declaring SPEC to call sourced_rust::microsvc::HandlerSpec::event(COMMAND)
(ensuring COMMAND still refers to the PaymentSucceeded message) so subscription
planning treats it as an event.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 41012709-0231-47de-a028-d33f0e92eb08

📥 Commits

Reviewing files that changed from the base of the PR and between c5bd412 and 8c1746b.

📒 Files selected for processing (35)
  • src/microsvc/mod.rs
  • src/microsvc/service.rs
  • tests/distributed_read_model/checkout_saga_service/handlers/record_seat_reserved.rs
  • tests/distributed_read_model/checkout_saga_service/handlers/start.rs
  • tests/distributed_read_model/checkout_saga_service/service.rs
  • tests/distributed_read_model/main.rs
  • tests/distributed_read_model/projection_service/handlers/checkout.rs
  • tests/distributed_read_model/projection_service/handlers/mod.rs
  • tests/distributed_read_model/projection_service/handlers/seat.rs
  • tests/distributed_read_model/projection_service/mod.rs
  • tests/distributed_read_model/projection_service/service.rs
  • tests/distributed_read_model/seat_inventory_service/handlers/add.rs
  • tests/distributed_read_model/seat_inventory_service/handlers/reserve_started_checkout_seat.rs
  • tests/distributed_read_model/seat_inventory_service/service.rs
  • tests/distributed_read_model_board/board_service/handlers/board_add_card.rs
  • tests/distributed_read_model_board/board_service/handlers/board_move_card.rs
  • tests/distributed_read_model_board/board_service/handlers/board_open.rs
  • tests/distributed_read_model_board/board_service/handlers/board_remove_card.rs
  • tests/distributed_read_model_board/projections_service/handlers/board.rs
  • tests/distributed_read_model_board/projections_service/handlers/mod.rs
  • tests/distributed_read_model_board/projections_service/mod.rs
  • tests/microsvc/convention.rs
  • tests/microsvc/handlers/counter_create.rs
  • tests/microsvc/handlers/counter_increment.rs
  • tests/microsvc/handlers/whoami.rs
  • tests/sagas/handlers/inventory/init.rs
  • tests/sagas/handlers/inventory/reserve.rs
  • tests/sagas/handlers/orders/complete.rs
  • tests/sagas/handlers/orders/create.rs
  • tests/sagas/handlers/payments/process.rs
  • tests/sagas/handlers/saga/on_inventory_reserved.rs
  • tests/sagas/handlers/saga/on_order_completed.rs
  • tests/sagas/handlers/saga/on_order_created.rs
  • tests/sagas/handlers/saga/on_payment_succeeded.rs
  • tests/sagas/handlers/saga/start.rs

Comment thread src/microsvc/service.rs Outdated
Comment thread src/microsvc/service.rs Outdated
Comment thread tests/sagas/handlers/saga/on_inventory_reserved.rs Outdated
Comment thread tests/sagas/handlers/saga/on_order_completed.rs Outdated
Comment thread tests/sagas/handlers/saga/on_order_created.rs Outdated
Comment thread tests/sagas/handlers/saga/on_payment_succeeded.rs Outdated
Removes per-handler SPEC constants and has register_handlers! construct HandlerSpec values from COMMAND, EVENT, and EVENTS constants.
Drops the compatibility Service::command, Service::command_guarded, and Service::commands APIs and updates tests/docs to use HandlerSpec registration.
@patrickleet patrickleet changed the title feat: add microsvc handler message specs feat: async microsvc transports May 27, 2026
Adds HandlerBuilder so command/event registration can use .handle(...) or .guarded(...), with envelope selection before registration.
Removes envelope input modes so handlers always get ctx.message() plus ctx.input::<T>() for JSON payload decoding.
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/microsvc/service.rs (1)

791-793: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Normalize metadata keys for consistent session lookups.

message_to_session() preserves metadata key casing, but the HTTP transport lowercases headers before creating Session. This makes ctx.user_id() and ctx.role() transport-dependent when metadata arrives with different casing (e.g., X-Hasura-User-Id vs x-hasura-user-id).

Suggested fix
 fn message_to_session(message: &Message) -> Session {
-    let vars: HashMap<String, String> = message.metadata.iter().cloned().collect();
+    let vars: HashMap<String, String> = message
+        .metadata
+        .iter()
+        .map(|(key, value)| (key.to_ascii_lowercase(), value.clone()))
+        .collect();
     Session::from_map(vars)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/microsvc/service.rs` around lines 791 - 793, message_to_session currently
preserves the original metadata key casing which leads to transport-dependent
lookups (e.g., X-Hasura-User-Id vs x-hasura-user-id); update message_to_session
(where vars: HashMap<String,String> is built and passed to Session::from_map) to
normalize keys the same way HTTP transport does — convert all metadata keys to
lowercase (or apply the same canonical normalization) before inserting into vars
so ctx.user_id()/ctx.role() work consistently regardless of header casing.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/microsvc/service.rs`:
- Around line 243-250: The payload_bitcode function currently uses
bitcode::deserialize while bounding T: serde::de::DeserializeOwned, causing a
trait mismatch; update payload_bitcode to call
bitcode::serde::deserialize(&self.payload) (keeping T:
serde::de::DeserializeOwned) OR alternatively change the generic bound to T:
bitcode::Decode and keep bitcode::deserialize. Locate the function
payload_bitcode in service.rs and replace the deserialize call accordingly
and/or adjust the trait bound to match the chosen bitcode::deserialize variant
so the types align.

---

Duplicate comments:
In `@src/microsvc/service.rs`:
- Around line 791-793: message_to_session currently preserves the original
metadata key casing which leads to transport-dependent lookups (e.g.,
X-Hasura-User-Id vs x-hasura-user-id); update message_to_session (where vars:
HashMap<String,String> is built and passed to Session::from_map) to normalize
keys the same way HTTP transport does — convert all metadata keys to lowercase
(or apply the same canonical normalization) before inserting into vars so
ctx.user_id()/ctx.role() work consistently regardless of header casing.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: addd8cbd-a960-467f-b3b8-4ddf98b31882

📥 Commits

Reviewing files that changed from the base of the PR and between e6cf9ca and 6bdf20a.

📒 Files selected for processing (10)
  • src/microsvc/context.rs
  • src/microsvc/mod.rs
  • src/microsvc/service.rs
  • tests/distributed_read_model/projection_service/handlers/checkout.rs
  • tests/distributed_read_model/projection_service/handlers/mod.rs
  • tests/distributed_read_model/projection_service/handlers/seat.rs
  • tests/distributed_read_model/projection_service/service.rs
  • tests/distributed_read_model_board/projections_service/handlers/board.rs
  • tests/distributed_read_model_board/projections_service/handlers/mod.rs
  • tests/distributed_read_model_board/projections_service/mod.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • tests/distributed_read_model/projection_service/handlers/seat.rs
  • tests/distributed_read_model/projection_service/service.rs
  • tests/distributed_read_model/projection_service/handlers/mod.rs

Comment thread src/microsvc/service.rs
Comment on lines +243 to +250
pub fn payload_bitcode<T: serde::de::DeserializeOwned>(&self) -> Result<T, HandlerError> {
bitcode::deserialize(&self.payload).map_err(|e| {
HandlerError::DecodeFailed(format!(
"invalid bitcode payload for message '{}': {}",
self.name, e
))
})
}
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

bitcode 0.6 rust crate deserialize trait requirements

💡 Result:

In the bitcode crate version 0.6, the requirements for deserialization depend on whether you are using the crate's native bitcode traits or its optional Serde integration [1][2][3]. Native Traits: If you are using bitcode's native API, the function bitcode::deserialize requires the type T to implement the bitcode::Decode trait [4]. To implement Decode for your custom types, the recommended approach is to use the provided derive macro, #[derive(Decode)] [5], which is available when the "derive" feature is enabled (this is part of the default feature set) [2][3]. Serde Integration: The bitcode crate also supports Serde as an optional feature [1][3]. If you choose to use serde::Deserialize (often via bitcode::serde::deserialize), the requirement is that your type T must implement the serde::Deserialize trait [6]. This integration is gated behind the "serde" feature flag in your Cargo.toml [1][3]. Key Implementation Requirements: 1. Feature Flags: Ensure the "derive" feature is enabled in your Cargo.toml to use #[derive(Decode)] [2][3]. If using Serde, you must explicitly enable the "serde" feature [1][3]. 2. Trait Bounds: - For native decoding: The type must satisfy T: Decode [6][4]. - For Serde decoding: The type must satisfy serde::Deserialize [6]. 3. Constraints: The bitcode::Decode trait is not dyn-compatible (object-safe) [5]. For most use cases, adding #[derive(bitcode::Decode, bitcode::Encode)] to your structs or enums is the standard way to satisfy the requirements for bitcode's native binary format [2][3][5].

Citations:


Fix payload_bitcode trait mismatch with bitcode::deserialize
In src/microsvc/service.rs (lines 243-250), payload_bitcode<T: serde::de::DeserializeOwned> calls bitcode::deserialize(&self.payload), but in bitcode 0.6 the native bitcode::deserialize expects T: bitcode::Decode (Serde integration uses bitcode::serde::deserialize for T: serde::Deserialize). Update the call to bitcode::serde::deserialize or change the bound to bitcode::Decode.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/microsvc/service.rs` around lines 243 - 250, The payload_bitcode function
currently uses bitcode::deserialize while bounding T:
serde::de::DeserializeOwned, causing a trait mismatch; update payload_bitcode to
call bitcode::serde::deserialize(&self.payload) (keeping T:
serde::de::DeserializeOwned) OR alternatively change the generic bound to T:
bitcode::Decode and keep bitcode::deserialize. Locate the function
payload_bitcode in service.rs and replace the deserialize call accordingly
and/or adjust the trait bound to match the chosen bitcode::deserialize variant
so the types align.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left this unchanged because this finding does not apply to the crate version/features in this repo. Cargo.toml uses bitcode = 0.6.9 with features = ["serde"], and that crate re-exports the serde serializer/deserializer at the crate root via pub use crate::serde::*. cargo test --all-features compiles and passes with payload_bitcode<T: DeserializeOwned>, so there is no trait-bound mismatch here.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@patrickleet, thanks for the clarification — that makes sense. With bitcode = "0.6.9" and features = ["serde"] enabled, the crate re-exports its serde integration at the crate root (pub use crate::serde::*), so bitcode::deserialize already accepts T: serde::de::DeserializeOwned and there is no trait-bound mismatch. I'll record this for future reviews.


✏️ Learnings added
Learnt from: patrickleet
Repo: patrickleet/sourced_rust PR: 43
File: src/microsvc/service.rs:249-256
Timestamp: 2026-05-27T20:53:23.241Z
Learning: In `src/microsvc/service.rs` (Rust), `payload_bitcode<T: serde::de::DeserializeOwned>` calls `bitcode::deserialize(&self.payload)`. This is valid and compiles correctly because `Cargo.toml` uses `bitcode = "0.6.9"` with `features = ["serde"]`, which re-exports the serde-based deserializer at the crate root via `pub use crate::serde::*`. Do not flag this as a trait-bound mismatch; `cargo test --all-features` passes without changes.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

@patrickleet patrickleet merged commit a0bb3e7 into main May 27, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant