Skip to content

feat!: async microsvc transports + persistence matrix + async-only consolidation#47

Merged
patrickleet merged 39 commits into
mainfrom
feat/bus-transports
May 30, 2026
Merged

feat!: async microsvc transports + persistence matrix + async-only consolidation#47
patrickleet merged 39 commits into
mainfrom
feat/bus-transports

Conversation

@patrickleet
Copy link
Copy Markdown
Collaborator

@patrickleet patrickleet commented May 29, 2026

The umbrella PR to main. Builds the async microsvc transport layer, proves the distributed read-model flow across a transport×persistence matrix, and consolidates the crate to a single async-only repository + bus surface (removing the sync/async mix that caused subtle combination bugs).

184 files, +13,390 / −8,045 (39 commits). Each slice was implemented, put through adversarial multi-agent review with fixes applied, and committed separately. cargo test (default, no brokers): 490 passed / 0 failed; broker/feature cells verified against real infra; clippy --all-targets clean.


1. Async microservice transports

microsvc owns handler registration + dispatch; transport adapters own receive/ack/retry/publish. Shared vocabulary lives in microsvc::transport.

  • Core: TransportError, FailurePolicy/FailureAction, RunOptions/InboxHook, TransportCapabilities, stable-id rules; the AsyncMessageSource/ReceivedMessage + run_source receive loop; AsyncMessagePublisher, OutboxMessageMessage, OutboxDispatcher (claim→publish→complete); a reusable conformance harness + in-memory reference.
  • Bus facade: Bus (produce) + BusConsumer (listen/subscribe) over InMemoryBus/NatsBus/PostgresBus/RabbitBus/KafkaBus/KnativeBus.
  • Consumer inbox: a CommitBatch participant (consumer_inbox table) giving effectively-once (consumer, message_id) semantics across in-memory/SQLite/Postgres.
  • Adapters (feature-gated, verified against real brokers via docker): Postgres (OutboxSource, SKIP LOCKED + lease), NATS JetStream, RabbitMQ, Kafka, and Knative/HTTP CloudEvents ingress (200/503/422 mapping; knative_triggers() from subscription_plan()).
  • CI: compose.yaml + reusable .github/workflows/integration-*.yaml broker jobs on PRs and push-to-main.

2. Transport × persistence matrix

The gold-standard seat-checkout saga (domain flow → read-model projection → query) runs across a 6×3 grid{ InMemoryBus, NatsBus, RabbitBus, KafkaBus, PostgresBus, Knative } × { HashMap, SQLite, Postgres } — plus the HTTP/gRPC command surface, proving the abstraction holds over every transport and store. The same generic async flow + project_message_async + query helpers drive all cells.

3. Async-only consolidation (breaking)

End state: one async bus facade, one async repository surface; HashMap/SQLite/Postgres all behave the same.

  • Legacy sync bus removedsrc/bus/ (~1.4k lines), OutboxWorkerThread, the bus-gated service.rs surface + EventMessage bridges, and the bus Cargo feature (http/grpc no longer depend on it). Projection handlers decoupled from bus::Event.
  • Async ergonomic buildersReadModelWorkspace gains load_async/commit_async/workspace_async(); QueuedRepository gains a runtime-agnostic waker-based async lock + the full async repo surface, so .queued_async().async_aggregate::<T>() serializes per-aggregate get/commit (an adversarial review caught + fixed two latent unlock defects).
  • Async handler modelHandlerFn is a boxed Send future with an AsyncHandler HRTB trait, so async fn handle(ctx: &Context<'_, D>) registers directly; dispatch/invoke are async; guards stay sync.
  • Sync repository API deletedGet/Commit/Repository/GetOne/GetMany/TransactionalCommit, sync SnapshotStore/ReadModelWritePlanStore/RelationalReadModelQueryStore, sync AggregateRepository/QueuedRepository/CommitBuilder/OutboxCommit, and the now-unused sync lock module — across all backends.

⚠️ Breaking changes

  • Handlers are now async fn; Service::dispatch/dispatch_message/dispatch_request are async.
  • The synchronous repository/read-model/snapshot/lock APIs are removed — use the Async* equivalents, .queued_async(), and .async_aggregate().
  • The legacy bus::* module and OutboxWorkerThread are gone; use microsvc::transport (InMemoryBus + OutboxSource/run_source).

🤖 Generated with Claude Code

patrickleet and others added 3 commits May 28, 2026 16:28
Establishes the shared async transport layer in three reviewed slices:

- Core contracts: TransportError (retryable/permanent), FailurePolicy/FailureAction,
  RunOptions/ConsumerDeliveryMode/InboxHook, TransportCapabilities, stable-id rules.
- Source runner: AsyncMessageSource/ReceivedMessage + run_source (ack after handler
  success, retryable->nack, permanent->failure policy, ack-and-ignore unhandled,
  graceful stop, no swallowed errors).
- Publisher/outbox bridge: AsyncMessagePublisher, OutboxMessage->Message mapping
  (reserved x-sourced- metadata namespace), OutboxDispatcher (dispatch_ids/
  dispatch_batch sharing claim->publish->complete), claim-by-id across
  HashMap/SQLite/Postgres, From<RepositoryError> for TransportError.

Not feature-gated; executor-agnostic (no tokio dependency). Verified with
cargo test --all-features (242 lib unit tests + integration/conformance suites),
fmt, clippy, and doc-link checks.

Implements [[tasks/async-transport-core-contracts]],
[[tasks/async-message-source-runner]], and
[[tasks/async-message-publisher-outbox]] under
[[tasks/async-transport-implementation]].

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Adds tests/transport_conformance/mod.rs (adapter-neutral fakes: FakeSource,
FakeReceived, FakePublisher, recording service, plus source-runner and outbox
dispatcher contract fns) and the tests/transport_in_memory target that runs the
full contract against the in-memory reference. Concrete adapters reuse the
harness via #[path]. Adds OutboxDispatcher::publisher()/store() accessors.

Implements [[tasks/async-transport-conformance-tests]] under
[[tasks/async-transport-implementation]].

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 29, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Removes distributed processed-message idempotency (DB, API, stores, repos, tests) and adds a new async transport subsystem: core traits, runner, publishers, adapters (NATS/RabbitMQ/Knative), outbox dispatcher/source with explicit-ID claiming, conformance tests, and CI/compose updates.

Changes

Processed Message Idempotency Removal

Layer / File(s) Summary
Database schema cleanup
migrations/postgres/0001_initial.sql, migrations/sqlite/0001_initial.sql
Drop read_model_processed_messages table from initial migrations; Postgres adds aggregate_events(recorded_at) index.
Read-model session and API surface
src/read_model/session.rs, src/read_model/mod.rs, src/repository/async_repository.rs, src/lib.rs
Remove ProcessedMessageMark and related API: ReadModelWritePlan now only contains mutations; ReadModelCommitOutcome simplified; ReadModelAdapterCapabilities drops processed_messages; is_processed/is_processed_async removed; mark_processed APIs removed and re-exports cleared.
In-memory and HashMap store implementations
src/read_model/in_memory.rs, src/hashmap_repo/repository.rs
Remove processed_messages state, staging, and checks; adapt apply/commit signatures; drop is_processed delegations; update unit tests accordingly.
Postgres and SQLite repositories
src/postgres_repo/mod.rs, src/sqlite_repo/mod.rs
Remove SQL-level processed-message deduplication and insertion; sql_read_model_capabilities drops the flag; always commit read-model transactions after applying plans; add optional message_ids filtering to outbox claim queries.
Outbox & table plumbing
src/outbox/table.rs
Construct TableWritePlan with only mutations vector; tests adjusted accordingly.
Tests and examples
various tests/*
Remove or simplify tests that asserted processed-message marking or duplicate-skip behavior; delete the distributed-idempotency test file and adjust many projection/handler tests to no longer call mark_processed.
Exports
src/lib.rs
Remove ProcessedMessageMark from crate re-exports.

Async Transport Subsystem

Layer / File(s) Summary
Transport module foundation and core contracts
src/microsvc/mod.rs, src/microsvc/transport/mod.rs, src/microsvc/transport/source.rs
Add transport submodule; define AsyncMessageSource and ReceivedMessage contracts and document receive/publish semantics.
Runner and run options
src/microsvc/transport/runner.rs, src/microsvc/transport/run_options.rs, src/microsvc/transport/stable_id.rs
Implement run_source loop with failure-policy routing, inbox vs idempotent delivery modes, and stable message-id validation; include unit tests for ordering, failure routing, and inbox behavior.
Transport error / failure policy
src/microsvc/transport/error.rs, src/microsvc/transport/failure_policy.rs
Add TransportError/TransportErrorKind with classifier and mapping to FailureAction via FailurePolicy::resolve.
Publisher and outbox dispatcher/source
src/microsvc/transport/publisher.rs, src/microsvc/transport/outbox_dispatch.rs, src/microsvc/transport/outbox_source.rs
Add AsyncMessagePublisher trait and default publish_batch; add OutboxDispatcher to claim → map → publish → settle outbox rows and OutboxSource to expose outbox rows as an AsyncMessageSource.
Transport capabilities and adapters
src/microsvc/transport/capabilities.rs, src/microsvc/transport/nats.rs, src/microsvc/transport/rabbitmq.rs, src/microsvc/transport/knative.rs
Record per-transport capabilities; add NATS JetStream, RabbitMQ, and Knative/CloudEvents adapters with publish/receive/settle implementations and adapter tests.
Conformance tests, feature flags, CI, compose
tests/transport_conformance/*, tests/transport_in_memory/*, tests/nats_transport/*, tests/rabbitmq_transport/*, tests/knative_cloudevents/*, Cargo.toml, .github/workflows/on-pr-quality.yaml, compose.yaml
Add reusable transport conformance harness, in-memory and adapter integration tests, new Cargo feature flags for adapters, CI job update to run postgres transport tests, and compose services for RabbitMQ/Kafka/NATS.

Outbox Worker Enhancement

Layer / File(s) Summary
Outbox claim-by-ids
src/outbox_worker/store.rs, tests/persistent_repository_conformance/outbox.rs
Add optional message_ids: Option<Vec<String>> to ClaimOutboxMessages, implement ClaimOutboxMessages::for_ids(...), and adapt sync/async stores to honor explicit-id claim filters; add conformance tests verifying only requested IDs are claimed.

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

"I nibble through rows and headers bright,
No processed marks left in morning light,
Runners hum, publishers hop in time,
Outbox claims and settles one message, one chime,
A rabbit cheers the tests that pass tonight." 🐰✨

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/bus-transports

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/read_model/session.rs`:
- Around line 59-68: ReadModelCommitOutcome currently always reports applied
(ReadModelCommitOutcome::applied() / was_applied() -> true) which removes the
storage-side replay/dedupe signal; restore a proper commit outcome enum or add
explicit skipped/replay semantics and wire them into the storage adapters:
implement a non-applied outcome (e.g., Skipped), make
ReadModelCommitOutcome::was_applied return true only for the applied variant,
and ensure the storage implementations call mark_processed/was_skipped and
populate/read the read_model_processed_messages behavior used by Postgres/SQLite
adapters so the write-plan commit boundary acts as a replay barrier; update the
methods ReadModelCommitOutcome, applied, was_applied, mark_processed,
was_skipped and ensure read_model_processed_messages handling is present in the
storage adapter code paths or alternatively make projector inbox/stable-id
enforcement explicit in the public API so non-idempotent handlers are rejected.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 2d2a8743-5492-4d7d-99bf-616acca9dbe0

📥 Commits

Reviewing files that changed from the base of the PR and between c4656dc and bb10271.

📒 Files selected for processing (45)
  • migrations/postgres/0001_initial.sql
  • migrations/sqlite/0001_initial.sql
  • src/commit_builder/mod.rs
  • src/hashmap_repo/repository.rs
  • src/lib.rs
  • src/microsvc/mod.rs
  • src/microsvc/transport/capabilities.rs
  • src/microsvc/transport/error.rs
  • src/microsvc/transport/failure_policy.rs
  • src/microsvc/transport/mod.rs
  • src/microsvc/transport/outbox_dispatch.rs
  • src/microsvc/transport/publisher.rs
  • src/microsvc/transport/run_options.rs
  • src/microsvc/transport/runner.rs
  • src/microsvc/transport/source.rs
  • src/microsvc/transport/stable_id.rs
  • src/outbox/table.rs
  • src/outbox_worker/store.rs
  • src/postgres_repo/mod.rs
  • src/read_model/in_memory.rs
  • src/read_model/mod.rs
  • src/read_model/session.rs
  • src/repository/async_repository.rs
  • src/sqlite_repo/mod.rs
  • tests/async_repository/main.rs
  • tests/distributed_read_model/main.rs
  • tests/distributed_read_model/projection_service/handlers/checkout.rs
  • tests/distributed_read_model/projection_service/handlers/seat.rs
  • tests/distributed_read_model/projection_service/mod.rs
  • tests/distributed_read_model/projection_service/service.rs
  • tests/distributed_read_model_board/main.rs
  • tests/distributed_read_model_board/projections_service/handlers/board.rs
  • tests/distributed_read_model_board/projections_service/mod.rs
  • tests/hashmap_repository_conformance/main.rs
  • tests/persistent_repository_conformance/outbox.rs
  • tests/persistent_repository_conformance/read_models.rs
  • tests/postgres_repository/main.rs
  • tests/postgres_repository_conformance/main.rs
  • tests/read_model_distributed_idempotency/main.rs
  • tests/read_model_relationship_includes/main.rs
  • tests/read_model_session/main.rs
  • tests/sqlite_repository/main.rs
  • tests/sqlite_repository_conformance/main.rs
  • tests/transport_conformance/mod.rs
  • tests/transport_in_memory/main.rs
💤 Files with no reviewable changes (9)
  • tests/read_model_distributed_idempotency/main.rs
  • tests/distributed_read_model/projection_service/service.rs
  • src/commit_builder/mod.rs
  • migrations/sqlite/0001_initial.sql
  • tests/distributed_read_model_board/projections_service/mod.rs
  • tests/distributed_read_model_board/projections_service/handlers/board.rs
  • tests/read_model_relationship_includes/main.rs
  • migrations/postgres/0001_initial.sql
  • src/repository/async_repository.rs

Comment thread src/read_model/session.rs
OutboxSource<S: AsyncOutboxStore> turns any outbox store into an
AsyncMessageSource (claim -> Message -> settle by row status:
ack=complete, nack=release-for-retry, dead-letter/park=fail).
OutboxSource<PostgresOutboxStore> is the Postgres starter transport
(outbox-backed mode, FOR UPDATE SKIP LOCKED + lease, no new table).

Adds tests/postgres_transport integration tests (verified against real
Postgres: drain, concurrent SKIP-LOCKED claim safety, retry, dead-letter),
wires postgres_transport into the Postgres CI job, and adds RabbitMQ/Kafka/
NATS services to compose.yaml for local integration testing.

sqlxmq was evaluated (owner suggestion) and not adopted: its push-based
JobRegistry conflicts with our pull-based AsyncMessageSource/run_source
boundary; patterns borrowed per the spec, not the crate.

Implements [[tasks/postgres-transport-adapter-first-pass]] under
[[tasks/async-transport-implementation]].

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@patrickleet
Copy link
Copy Markdown
Collaborator Author

Progress update: conformance harness + Postgres transport (OutboxSource, outbox-backed) landed and verified against real Postgres (concurrent SKIP-LOCKED claim safety, retry, dead-letter). compose.yaml now defines postgres/rabbitmq/kafka/nats for local integration testing, and the Postgres transport runs in CI. Next: NATS/RabbitMQ adapters + CI jobs, Knative ingress, docs cutover.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/microsvc/transport/outbox_source.rs (1)

171-188: ⚡ Quick win

Clarify or replace the hand-rolled block_on to avoid hangs when futures can suspend.

outbox_source.rs’s helper busy-spins on Poll::Pending and never registers wakeups. The current tests don’t hit Pending because they use HashMapOutboxStore, whose AsyncOutboxStore methods are pure async move blocks with no .await (so they complete synchronously). If these tests ever switch to stores like SqliteOutboxStore/PostgresOutboxStore (which do .await), they can hang—so either use a real test executor or explicitly document/enforce the “never suspends” precondition for this helper.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/microsvc/transport/outbox_source.rs` around lines 171 - 188, The custom
block_on in outbox_source.rs busy-loops and never registers wakeups, causing
hangs when futures suspend; replace it with a real executor call (e.g. use
futures::executor::block_on or accept a runtime handle and call
tokio::runtime::Handle::block_on) in the block_on function so suspended futures
are properly woken and polled; alternatively change the API around
AsyncOutboxStore consumers to run on an injected executor or document/enforce
that AsyncOutboxStore methods never .await and add tests/assertions to prevent
using stores (like SqliteOutboxStore/PostgresOutboxStore) that suspend. Ensure
references to the function name block_on in outbox_source.rs are updated to use
the chosen executor approach.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/microsvc/transport/outbox_source.rs`:
- Around line 62-69: The builder currently allows lease == Duration::ZERO and
batch_size == 0 which silently misconfigures behavior (recv() will behave
incorrectly); add upfront validation in the builder methods `with_lease` and
`with_batch_size` to reject invalid values: in `with_lease(lease: Duration)`
assert/validate that `lease > Duration::ZERO` and return/panic with a clear
message if not, and in `with_batch_size(batch_size: usize)` assert/validate that
`batch_size > 0` and return/panic with a clear message if not; keep these checks
inside the existing `with_lease` and `with_batch_size` functions so callers fail
fast rather than causing `recv()` to silently misbehave.

---

Nitpick comments:
In `@src/microsvc/transport/outbox_source.rs`:
- Around line 171-188: The custom block_on in outbox_source.rs busy-loops and
never registers wakeups, causing hangs when futures suspend; replace it with a
real executor call (e.g. use futures::executor::block_on or accept a runtime
handle and call tokio::runtime::Handle::block_on) in the block_on function so
suspended futures are properly woken and polled; alternatively change the API
around AsyncOutboxStore consumers to run on an injected executor or
document/enforce that AsyncOutboxStore methods never .await and add
tests/assertions to prevent using stores (like
SqliteOutboxStore/PostgresOutboxStore) that suspend. Ensure references to the
function name block_on in outbox_source.rs are updated to use the chosen
executor approach.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: e64e78d8-31d7-4cae-b5b8-0ceb073d0b9b

📥 Commits

Reviewing files that changed from the base of the PR and between bb10271 and 70298e1.

📒 Files selected for processing (5)
  • .github/workflows/on-pr-quality.yaml
  • compose.yaml
  • src/microsvc/transport/mod.rs
  • src/microsvc/transport/outbox_source.rs
  • tests/postgres_transport/main.rs
✅ Files skipped from review due to trivial changes (1)
  • .github/workflows/on-pr-quality.yaml
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/microsvc/transport/mod.rs

Comment thread src/microsvc/transport/outbox_source.rs
NatsPublisher (publish ack threshold) and NatsJetStreamSource (durable pull
consumer; ack/nak/term settle) behind the nats feature, over the shared
AsyncMessagePublisher/AsyncMessageSource/run_source boundary. Stable id +
metadata ride as headers (Nats-Msg-Id is also the JetStream dedup key).

Adds tests/nats_transport (verified against nats:2.10 -js: round-trip +
metadata preservation), a nats CI job, and the nats compose service.

Implements [[tasks/nats-transport-adapter]] under
[[tasks/async-transport-implementation]].

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
tests/nats_transport/main.rs (1)

62-72: 💤 Low value

Box::leak creates intentional memory leaks.

Using Box::leak(subject.clone().into_boxed_str()) to obtain a &'static str works but leaks memory on each test run. Since these are integration tests that run a bounded number of times, the practical impact is negligible. However, if the Service::event() API could accept &str or String instead of &'static str, this pattern wouldn't be necessary.

Also applies to: 108-120

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/nats_transport/main.rs` around lines 62 - 72, The test leaks memory by
using Box::leak(subject.clone().into_boxed_str()) to produce a &'static str for
Service::event; change the Service::event API to accept an owned String or a
generic Into<Cow<'static, str>>/impl Into<Cow<'_, str>> (or &str) so callers can
pass subject.clone() or subject.as_str() without leaking, then update the test
calls that use Box::leak (including the other occurrence at lines 108-120) to
pass the String/&str directly to Service::event.
src/microsvc/transport/nats.rs (1)

171-186: 💤 Low value

Consider documenting or making batch size configurable.

Fetching one message at a time (max_messages(1)) is straightforward but may limit throughput in high-volume scenarios. This is acceptable for a first implementation, but a future enhancement could allow configuring a prefetch buffer or larger batch size with internal iteration.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/microsvc/transport/nats.rs` around lines 171 - 186, The recv method
currently hardcodes batch().max_messages(1) which limits throughput; make the
batch size configurable by adding a field (e.g., batch_size: usize or u32) to
the NATS transport struct and replace the literal 1 in recv with that field (use
consumer.batch().max_messages(self.batch_size) and keep
expires(self.fetch_timeout) as-is), update the constructor/initialization to
accept a configurable default (and document the new field in the struct
comment), and ensure NatsReceived::from_jetstream is still applied per message
when iterating the returned batch; consider validating a sensible default and
bounds for batch_size.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@src/microsvc/transport/nats.rs`:
- Around line 171-186: The recv method currently hardcodes
batch().max_messages(1) which limits throughput; make the batch size
configurable by adding a field (e.g., batch_size: usize or u32) to the NATS
transport struct and replace the literal 1 in recv with that field (use
consumer.batch().max_messages(self.batch_size) and keep
expires(self.fetch_timeout) as-is), update the constructor/initialization to
accept a configurable default (and document the new field in the struct
comment), and ensure NatsReceived::from_jetstream is still applied per message
when iterating the returned batch; consider validating a sensible default and
bounds for batch_size.

In `@tests/nats_transport/main.rs`:
- Around line 62-72: The test leaks memory by using
Box::leak(subject.clone().into_boxed_str()) to produce a &'static str for
Service::event; change the Service::event API to accept an owned String or a
generic Into<Cow<'static, str>>/impl Into<Cow<'_, str>> (or &str) so callers can
pass subject.clone() or subject.as_str() without leaking, then update the test
calls that use Box::leak (including the other occurrence at lines 108-120) to
pass the String/&str directly to Service::event.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 1e64c495-1c45-4533-b7ac-cbb2d7b89eaf

📥 Commits

Reviewing files that changed from the base of the PR and between 70298e1 and b7ca692.

📒 Files selected for processing (5)
  • .github/workflows/on-pr-quality.yaml
  • Cargo.toml
  • src/microsvc/transport/mod.rs
  • src/microsvc/transport/nats.rs
  • tests/nats_transport/main.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/microsvc/transport/mod.rs
  • .github/workflows/on-pr-quality.yaml

patrickleet and others added 2 commits May 28, 2026 22:11
RabbitPublisher (publisher-confirm threshold) and RabbitSource (basic_get;
ack/nack-requeue/reject settle) behind the rabbitmq feature, over the shared
transport traits. Stable id via the AMQP message_id property, metadata+kind via
headers.

Adds tests/rabbitmq_transport (verified against rabbitmq:3.13), a rabbitmq CI
job (service container), and updates the module docs for the NATS/RabbitMQ
adapters.

Implements [[tasks/rabbitmq-transport-adapter]] under
[[tasks/async-transport-implementation]].

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
cloud_events_router parses binary + structured CloudEvents into the canonical
Message and calls Service::dispatch_message (the same boundary as run_source).
HTTP response is the ack: 200 success, 503 retryable, 422 permanent, 400
malformed. knative_triggers() renders Trigger YAML from subscription_plan().
Retry/DLQ is platform-managed by Knative here (not this crate's FailurePolicy).

Adds tests/knative_cloudevents (6 in-process HTTP integration tests). Behind the
http feature.

Implements [[tasks/knative-cloudevents-ingress]] under
[[tasks/async-transport-implementation]].

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/microsvc/transport/rabbitmq.rs`:
- Around line 120-137: The connect function currently declares the queue with
FieldTable::default() which causes messages rejected with requeue: false (in
your dead_letter/park handlers) to be discarded; update the connect function
(the connect method that calls connect_channel and queue_declare, and returns
Self::new) to either (A) build and pass a FieldTable that includes the
dead-letter/parking args (e.g., "x-dead-letter-exchange" and, if needed,
"x-dead-letter-routing-key"/"x-dead-letter-queue" entries) derived from
configuration or env and use that instead of FieldTable::default(), or (B)
validate that DLX/parking config exists and return a clear TransportError
(instead of declaring without args) when it does not; ensure you still wrap AMQP
errors with retryable("amqp queue_declare", err) and return the error type used
by connect (TransportError).
- Around line 63-80: message_properties currently only writes headers and
message_id, so content_type is lost; update message_properties to set AMQP
BasicProperties.content_type when Message.content_type is present (use
properties = properties.with_content_type(ShortString::from(ct)) where ct is
message.content_type.as_str()), and also update RabbitReceived::from_delivery to
read delivery.properties.content_type (if present) and pass that through to
Message::new instead of hardcoding "application/json" so the original
content_type survives the round-trip.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 2b995cbd-5a86-44d7-96f9-4fdb04832b45

📥 Commits

Reviewing files that changed from the base of the PR and between b7ca692 and a5b9a8b.

📒 Files selected for processing (5)
  • .github/workflows/on-pr-quality.yaml
  • Cargo.toml
  • src/microsvc/transport/mod.rs
  • src/microsvc/transport/rabbitmq.rs
  • tests/rabbitmq_transport/main.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/microsvc/transport/mod.rs

Comment thread src/microsvc/transport/rabbitmq.rs Outdated
Comment on lines +120 to +137
/// Connect, declare a durable queue, and poll it. The default exchange routes
/// a message published with routing key == `queue` into this queue, so the
/// queue name is the message name consumers subscribe to.
pub async fn connect(uri: &str, queue: &str) -> Result<Self, TransportError> {
let channel = connect_channel(uri).await?;
channel
.queue_declare(
queue,
QueueDeclareOptions {
durable: true,
..Default::default()
},
FieldTable::default(),
)
.await
.map_err(|err| retryable("amqp queue_declare", err))?;
Ok(Self::new(channel, queue))
}
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot May 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Fix RabbitMQ default connect to avoid silently discarding dead_letter/park failures

connect declares the durable queue with FieldTable::default() (no x-dead-letter-exchange) while the dead_letter/park handlers reject with requeue: false (lines ~211-225). With no DLX configured, messages routed to dead_letter/park will be discarded instead of dead-lettered/parked (lines ~120-137).

Update the default connect path to either (a) declare DLX/parking-related queue args (e.g., x-dead-letter-exchange, and any required routing-key args) or (b) fail loudly with a clear TransportError when DLX/parking isn’t configured rather than rejecting without requeue.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/microsvc/transport/rabbitmq.rs` around lines 120 - 137, The connect
function currently declares the queue with FieldTable::default() which causes
messages rejected with requeue: false (in your dead_letter/park handlers) to be
discarded; update the connect function (the connect method that calls
connect_channel and queue_declare, and returns Self::new) to either (A) build
and pass a FieldTable that includes the dead-letter/parking args (e.g.,
"x-dead-letter-exchange" and, if needed,
"x-dead-letter-routing-key"/"x-dead-letter-queue" entries) derived from
configuration or env and use that instead of FieldTable::default(), or (B)
validate that DLX/parking config exists and return a clear TransportError
(instead of declaring without args) when it does not; ensure you still wrap AMQP
errors with retryable("amqp queue_declare", err) and return the error type used
by connect (TransportError).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skipping this one, with reason. dead_letter/park reject with requeue: false, which is the AMQP signal for "stop redelivery." With no DLX configured the broker drops the message, which is the correct terminal outcome for "give up on this message" under at-least-once delivery — not a silent bug. The handler doc already states this ("routes to the queue's dead-letter exchange if one is configured, otherwise drops").

Forcing a DLX declaration (or failing loudly) in the default connect would impose a dead-letter/parking topology this thin adapter shouldn't own and would break the common no-DLX dev/test path. A configurable DLX/parking exchange is a reasonable enhancement, but it's a follow-up rather than part of this PR.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@patrickleet, that's a sound rationale. The AMQP reject(requeue: false) semantics are well-defined — broker drops without a DLX, which is the correct terminal outcome for this thin adapter under at-least-once delivery. Noted that the doc already covers the conditional DLX routing. I'll keep this in mind for future reviews of this adapter.

Would you like me to open a follow-up GitHub issue to track the configurable DLX/parking-exchange enhancement for a later PR?

(╥_╥)🐇


✏️ Learnings added
Learnt from: patrickleet
Repo: patrickleet/sourced_rust PR: 47
File: src/microsvc/transport/rabbitmq.rs:122-139
Timestamp: 2026-05-29T06:09:28.874Z
Learning: In `src/microsvc/transport/rabbitmq.rs`, `RabbitSource::connect` intentionally declares the queue with `FieldTable::default()` (no `x-dead-letter-exchange`). The `dead_letter`/`park` settlement methods reject with `requeue: false`, which causes the broker to drop the message when no DLX is configured — this is the correct terminal outcome under at-least-once delivery, not a silent bug. Configurable DLX/parking-exchange support is a planned follow-up, not part of the initial PR. The doc comments already state this conditional behavior.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/microsvc/transport/knative.rs (1)

86-92: 💤 Low value

Consider validating ce-source as required per CloudEvents spec.

The CloudEvents specification requires source as a mandatory context attribute alongside id and type. Currently, only ce-id and ce-type are validated, so malformed events missing ce-source will be accepted. If strict CloudEvents compliance is desired, add validation for ce-source.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/microsvc/transport/knative.rs` around lines 86 - 92, The parse_binary
function currently validates ce-id and ce-type but not ce-source; add validation
to require the ce-source header per the CloudEvents spec by reading
header(headers, "ce-source") and returning an Err (e.g., ok_or("missing
ce-source header")) when absent, then use the resulting source value like the
existing id/name/content_type variables so malformed events without ce-source
are rejected.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/microsvc/transport/knative.rs`:
- Around line 190-193: The trigger_name built in knative_triggers (using
subscriber_service and event) can produce invalid Kubernetes names; add a
sanitize function (e.g., sanitize_k8s_name) and call it when composing
trigger_name to enforce RFC1123: convert to lowercase, replace any
non-alphanumeric characters with '-', collapse repeated '-' runs, trim
leading/trailing '-' so the name starts/ends with alphanumeric, and truncate to
63 characters (preserving uniqueness if needed); update the trigger_name
construction (and any other places using subscriber_service or event for
resource names) to use this sanitizer before formatting.

---

Nitpick comments:
In `@src/microsvc/transport/knative.rs`:
- Around line 86-92: The parse_binary function currently validates ce-id and
ce-type but not ce-source; add validation to require the ce-source header per
the CloudEvents spec by reading header(headers, "ce-source") and returning an
Err (e.g., ok_or("missing ce-source header")) when absent, then use the
resulting source value like the existing id/name/content_type variables so
malformed events without ce-source are rejected.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 5c0ce1d2-ba21-424e-b169-fc46a2537bc8

📥 Commits

Reviewing files that changed from the base of the PR and between a5b9a8b and 512b688.

📒 Files selected for processing (3)
  • src/microsvc/transport/knative.rs
  • src/microsvc/transport/mod.rs
  • tests/knative_cloudevents/main.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/microsvc/transport/mod.rs

Comment thread src/microsvc/transport/knative.rs Outdated
patrickleet and others added 3 commits May 28, 2026 22:29
KafkaPublisher (acks=all producer ack threshold) and KafkaSource (consumer
group, auto-commit off; ack=commit offset, nack=seek-back, dead-letter/park=
commit-skip) behind the kafka feature (rdkafka/librdkafka via cmake). recv rides
through transient broker-transport errors within a fetch-timeout budget.

Adds tests/kafka_transport (verified against apache/kafka:3.8.0 KRaft), a kafka
CI job (KRaft service container), and the kafka compose service.

Implements [[tasks/kafka-transport-adapter]] under
[[tasks/async-transport-implementation]].

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Moves the postgres/nats/rabbitmq/kafka integration jobs into reusable
workflow_call files (.github/workflows/integration-*.yaml) referenced via local
./ paths from both on-pr-quality and on-push-main-version-and-tag. The push-to-
main pipeline now runs all broker integration tests and gates version-and-tag on
them. Validated with actionlint.

Relates to [[tasks/async-transport-implementation]].

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
docs/async-transports.md documents the transport layer: core contracts, the
two confirmation thresholds, the source runner, the publisher/outbox dispatcher,
all five adapters (in-memory, Postgres, NATS, RabbitMQ, Kafka, Knative), and how
to run the conformance + broker integration tests.

Progresses [[tasks/transport-docs-examples-cutover]] under
[[tasks/async-transport-implementation]].

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@patrickleet patrickleet changed the title feat: async microservice transports (foundation + conformance, WIP) feat: async microservice transports May 29, 2026
@patrickleet
Copy link
Copy Markdown
Collaborator Author

All broker adapters (NATS, RabbitMQ, Kafka) + Postgres transport + Knative ingress are implemented and verified locally against real services (full cargo test --all-features + the four integration suites green: 34 test binaries, 0 failures). CI integration jobs were extracted into reusable ./.github/workflows/integration-*.yaml and now run on PRs and push-to-main. Remaining: the examples cutover / legacy-bus removal (breaking — needs your sign-off) and a Postgres poll/NOTIFY consumer daemon.

patrickleet and others added 7 commits May 28, 2026 23:48
Introduce the ergonomic servicebus-style surface over the async
transport traits: `Bus` (produce — send/publish/send_message/
publish_message) and `BusConsumer` (consume — listen/subscribe,
generic over the service data, deriving message names from the
service's command/event handlers and running through run_source).

Knative will implement only `Bus` (it consumes via generated Triggers
+ the HTTP ingress); pull transports implement both.

InMemoryBus is the dev/test reference implementation: competing-
consumer queues back send/listen (point-to-point, each message popped
once) and retained per-subscriber-cursor logs back publish/subscribe
(fan-out — every subscriber sees every event), the in-memory shape of
the Postgres-as-log fan-out model. 5 unit tests cover both semantics
plus unknown-command-ignored and handler-error-via-failure-policy.

Implements [[tasks/build-transport-bus-facade]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
NatsBus implements Bus + BusConsumer over one JetStream stream bound to
`{namespace}.>`:

- send  → subject `{ns}.cmd.{name}` (command)
- publish → subject `{ns}.evt.{name}` (event)
- listen → durable pull consumer `{group}_cmd` filtered to the service's
  command subjects; replicas sharing a group share the durable, so
  JetStream load-balances — point-to-point / competing-consumer.
- subscribe → durable `{group}_evt`; each distinct group gets its own
  durable on the shared stream, so every group sees every event — fan-out.

Adds NatsJetStreamSource::with_strip_prefix (default off, backwards
compatible) so the dispatched message name is the bare name once the
`{ns}.cmd.`/`{ns}.evt.` subject prefix is removed.

Two integration tests prove competing-across-a-group (each command
handled exactly once by concurrent replicas) and fan-out-across-groups
(every group sees every event) against a live JetStream server. Also
makes tests/nats_transport unique() process-unique so re-runs against a
persistent server don't collide with leftover stream/consumer state.

Implements [[tasks/build-transport-bus-facade]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
PostgresBus implements Bus + BusConsumer as a complete single-DB bus:

- send/listen (point-to-point): a bus_queue work table claimed
  FOR UPDATE SKIP LOCKED under a lease, so replicas sharing a group
  compete — each command handled once (ack=delete, nack=release,
  dead_letter/park=delete).
- publish/subscribe (fan-out): Postgres as a log — append-only bus_log
  (monotonic seq, retained) + per-consumer bus_offset (consumer →
  last_seq). publish appends; each group reads seq > last_seq for its
  event names in order and advances its own offset, so every group sees
  every event. ack advances the offset (the effectively-once point),
  nack leaves it for redelivery, dead_letter/park skips it.

ensure_tables() provisions the three tables (mirrors NatsBus::ensure_stream).

Implementation note: uses the spec's sanctioned claim-lease backend, not
sqlxmq. Decision #8 listed sqlxmq as recommended with claim-lease as the
no-dependency alternative; sqlxmq owns an always-on push JobRunner loop
that doesn't compose with the facade's uniform drain-to-idle run_source
model (a claim-lease source returns Ok(None) when empty and stops). The
module header records this; sqlxmq stays a viable future backend.

Two integration tests prove competing-across-a-group (each command once
via concurrent replicas) and fan-out-across-groups (each group reads
every event), against a live Postgres.

Implements [[tasks/build-transport-bus-facade]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
RabbitBus implements Bus + BusConsumer over two AMQP exchange shapes:

- send/listen (point-to-point): the default exchange routes to a durable
  queue {ns}.cmd.{name}; send declares the queue and publishes with a
  publisher confirm. Replicas sharing a queue compete (AMQP round-robin).
- publish/subscribe (fan-out): a durable topic exchange {ns}.events;
  publish routes by event name; each subscriber declares its own queue
  {ns}.evt.{group} bound to the exchange for its event names, so every
  group receives every event.

The message name is resolved from the delivery routing key (stripping
the {ns}.cmd. prefix for commands). Exposes pub(super) connect_channel /
message_properties / RabbitReceived::from_delivery_with_name from the
adapter, and a public RabbitBus::ensure_subscription so a producer can
bind all subscriber queues before publishing (a topic exchange drops
events with no matching binding).

Two integration tests prove competing-across-a-group and
fan-out-across-groups against a live broker. Makes tests/rabbitmq_transport
unique() process-unique so durable queue/exchange names don't collide
across runs.

Implements [[tasks/build-transport-bus-facade]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
KafkaBus implements Bus + BusConsumer; point-to-point vs fan-out is a
consumer-group choice:

- send/listen (point-to-point): commands → topics {ns}.cmd.{name};
  listen joins a shared group {ns}.{group}.cmd, so Kafka distributes
  partitions across members — each record handled by one replica.
- publish/subscribe (fan-out): events → topics {ns}.evt.{name};
  subscribe joins a per-service group {ns}.{group}.evt. Kafka delivers
  every record to every group, so each distinct group sees every event.

Adds KafkaSource::with_strip_prefix (default off) so the dispatched
message name is the topic minus its {ns}.cmd./{ns}.evt. prefix.

Two integration tests against a live broker: point-to-point is proven
deterministically (a second consumer in the same group reads nothing —
the group's offset is committed past the end), which avoids the
rebalance-redelivery flakiness a concurrent two-replica race would have;
fan-out is proven across two distinct groups each reading from earliest.

Implements [[tasks/build-transport-bus-facade]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Per the locked spec (Decision #6), KnativeBus implements only Bus and
NOT BusConsumer — Knative is a GitOps/HTTP transport with no in-process
consume loop:

- produce: send/publish POST a binary-mode CloudEvent to a broker-ingress
  URL ({ingress_base}/{namespace}/{broker}); publish targets the service's
  own {source}-events broker, send targets a downstream {commands_broker}.
  A message without an id is rejected (CloudEvents mandates `id`).
- consume = deploy-time artifacts: manifests(&plan, &subscriptions)
  renders role-based Broker + per-name Trigger YAML — own {source}-commands
  broker + command triggers if it handles commands, own {source}-events
  broker if publishes_events, and a Trigger per subscribed event on its
  producer's broker, with /cloudevent/<type> subscriber URIs. A .local(addr)
  builder switches subscribers to a kubefwd address.

Adds the per-type /cloudevent/{type} route to cloud_events_router
(Decision #7) and reqwest (optional, default-features off) under the http
feature for the POST.

Four tests: produce round-trips through a local cloud_events_router into
dispatch_message; missing id rejected; manifests render brokers/triggers;
pure-consumer (publishes_events=false) owns no broker and uses the local
subscriber URI.

Implements [[tasks/build-transport-bus-facade]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add a "Bus facade" section to docs/async-transports.md: the Bus /
BusConsumer surface, a transport-swap example (same service + handlers,
one constructor line changes), the per-transport competing-vs-fan-out
topology table, and the Knative Bus-only + manifests note. Refresh Status.

Implements [[tasks/build-transport-bus-facade]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@patrickleet
Copy link
Copy Markdown
Collaborator Author

Added: the Bus facade (send/listen + publish/subscribe)

Built the servicebus-style ergonomic surface on top of the transport adapters (tasks/build-transport-bus-facade, spec specs/transport-bus-facade):

  • Bus (produce: send/publish/send_message/publish_message) + BusConsumer (consume: listen/subscribe, generic over the service, deriving names from the service's command/event handlers and running through the shared run_source). Handler code and dispatch_message are unchanged across transports.
  • Six *Bus types, each shining in its native topology — point-to-point vs fan-out is a consumer-group/identity choice, not a flattened trait:
    • InMemoryBus — competing queue / retained-log-per-cursor (dev/test reference).
    • NatsBus — shared durable consumer (listen) / per-group durable (subscribe).
    • PostgresBusbus_queue FOR UPDATE SKIP LOCKED (listen) / bus_log + per-consumer bus_offset Kafka-style fan-out (subscribe).
    • RabbitBus — default exchange → durable queue (listen) / topic exchange → per-group queue (subscribe).
    • KafkaBus — shared consumer group (listen) / group-per-service (subscribe).
    • KnativeBusBus-only (produce → broker-ingress POST); consume = manifests(&plan, &subscriptions) role-based Broker/Trigger YAML + cloud_events_router (per-type /cloudevent/<type> routes).

Each transport's integration binary gained a competing-consumer test (one delivery across a shared group) and a fan-out test (every group sees every event), verified against the real broker. cargo test --all-features = 612 passed / 0 failed against live NATS / Postgres / RabbitMQ / Kafka.

docs/async-transports.md documents the facade + a transport-swap example.

Note on Decision #8: PostgresBus uses the spec's sanctioned claim-lease work queue rather than sqlxmq — sqlxmq's always-on push JobRunner doesn't compose with the facade's uniform drain-to-idle run_source model that every other *Bus shares (a claim-lease source returns Ok(None) when empty and stops cleanly). The reasoning is recorded in the postgres_bus.rs module header; sqlxmq remains a documented future backend.

🤖 Generated with Claude Code

patrickleet and others added 3 commits May 29, 2026 00:55
The kafka, postgres (--all-features), and coverage (--all-features) jobs
compile rdkafka-sys, whose bundled librdkafka cmake build enables curl
(OAUTHBEARER OIDC) when it finds the runner's libcurl runtime, then fails
with `curl/curl.h: No such file or directory` because the dev headers
aren't installed. Install libcurl4-openssl-dev in those three jobs.
nats/rabbitmq use narrow feature sets, build no rdkafka, and were green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address the CodeRabbit review on PR #47. was_applied() is hardcoded true
not by accident: the read_model_processed_messages dedupe table and the
skipped_duplicate outcome were deliberately removed (see
specs/consumer-inbox-design.md, 2026-05-28) because coupling delivery
dedupe to the read-model projection contract was the wrong boundary.
Replay safety is now a projection convention (idempotent handlers +
per-row ExpectedVersion OCC); a first-class replay barrier returns with
the consumer inbox as a CommitBatch participant. Document this on the
type so the always-true was_applied() isn't misread as a lost signal.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Three valid review findings:

- outbox_source: reject zero lease / batch_size in the builder. A zero
  batch makes recv() return Ok(None) forever; a zero lease makes claimed
  rows immediately re-claimable. Both are silent misconfigurations on a
  public builder — assert up front. (+ #[should_panic] tests)

- rabbitmq: preserve Message.content_type across the AMQP round-trip.
  message_properties now sets AMQP content_type, and from_delivery reads
  it back instead of letting Message::new hardcode application/json. Also
  fixes the same loss for RabbitBus (shares both paths). (+ round-trip
  test asserting a non-JSON content type survives)

- knative: sanitize generated Trigger names to RFC 1123 (lowercase,
  alphanumeric/'-', no leading/trailing '-', <=63 chars) via a shared
  sanitize_k8s_name helper, applied in both knative_triggers and
  KnativeBus::trigger_yaml. CloudEvent types can contain dots/uppercase
  that are invalid in k8s resource names. (+ unit tests)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
patrickleet and others added 3 commits May 29, 2026 01:41
…field

First slices of the consumer inbox (tasks/model-consumer-inbox-across-
persistence-implementations, per specs/consumer-inbox-design):

- New src/repository/inbox.rs: InboxReceipt { consumer, message_id,
  processed_at } and InboxOutcome { Processed, Duplicate } (Duplicate is
  success, never an error). Relocates the naming/semantics of the removed
  read_model_processed_messages into a first-class, non-read-model type.
- Add inbox_receipts: Vec<InboxReceipt> to both CommitBatch (sync) and
  AsyncCommitBatch (async), defaulted empty in new()/empty() and at every
  literal construction site. Trait signatures unchanged; empty everywhere
  so behavior is unchanged (257 lib tests green).

The receipt is a commit-batch participant so it commits atomically with
handler effects (the effect fence); storage writes + runner wiring follow
in subsequent slices.

Implements [[model-consumer-inbox-across-persistence-implementa]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Storage model for the consumer inbox (per specs/consumer-inbox-design):

- Migration: consumer_inbox operational table (PK (consumer, message_id),
  processed_at default) in both Postgres and SQLite. In-memory backend
  gains an inbox_store set.
- commit_batch / commit_batch_async write batch.inbox_receipts inside the
  existing commit transaction (the effect fence). The (consumer, message_id)
  primary key is the dedupe gate: a duplicate receipt rolls the whole batch
  back via the new RepositoryError::DuplicateInboxReceipt, so a redelivery's
  effects are never double-applied (in-memory checks the staged set; SQL maps
  the unique violation).
- New AsyncInboxStore::inbox_contains_async pre-check, implemented for
  in-memory / SQLite / Postgres, so a consumer can skip an already-processed
  message before opening a transaction.

Tests prove record + pre-check + dedupe + atomic rollback (a batch with a
duplicate and a fresh receipt rolls back whole) on all three backends:
in-memory unit test, SQLite, and live Postgres.

Implements [[model-consumer-inbox-across-persistence-implementa]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address the adversarial review of the consumer-inbox storage model:

- Parity (was: in-memory accepted empty consumer/message_id while the SQL
  CHECK rejected with a generic Model error): add InboxReceipt::validate()
  + RepositoryError::InvalidInboxReceipt, enforced in all commit paths, so
  every backend rejects an empty receipt with the same typed error (the SQL
  CHECK is now a backstop).
- Move inbox coverage into the shared persistent_repository_conformance
  suite (new inbox.rs), run against in-memory / SQLite / Postgres, replacing
  the hand-duplicated per-backend tests. The scenario now proves the fence
  against a REAL effect: a batch with a duplicate receipt AND an outbox write
  rolls the outbox write back too (effectively-once), plus multi-distinct
  receipts, cross-consumer independence, and empty-receipt rejection.
- Clarify that InboxReceipt.processed_at is advisory (stamped server-side,
  not persisted from the field today).

Deferred (low, noted): a barrier-based concurrent same-key commit test — the
fence is the consumer_inbox PRIMARY KEY (a well-defined engine contract) and
the unique-violation→rollback mapping is covered sequentially.

Implements [[model-consumer-inbox-across-persistence-implementa]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
patrickleet and others added 16 commits May 29, 2026 22:08
First slice of the distributed read-model matrix (async bus facade only,
no sync path). Ungate the generic async flow helpers so they are the
primary path, and add run_checkout_over_bus<B: Bus + BusConsumer, R>:
drive the seat-checkout domain flow + read-model projection + query on
persistence R, route the events over transport B, and assert the
projected checkout screen. Validated cell: HashMapRepository × InMemoryBus.

Refs [[tasks/transport-persistence-matrix]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The distributed read-model seat-checkout scenario now runs across every
async transport × persistence backend, all green against live brokers:

  transports : InMemoryBus, NatsBus, RabbitBus, KafkaBus, PostgresBus, Knative
  persistence: HashMapRepository, SqliteRepository, PostgresRepository

12 matrix cells (broker/DB cells skip when their env var is unset):
in-memory & sqlite over each of InMemory/NATS/Rabbit/Kafka/Knative,
in-memory & postgres-persistence over a Postgres bus / in-memory bus.

Knative is a first-class transport cell: KnativeBus POSTs CloudEvents to a
local cloud_events_router serving the projection sink (the HTTP/gRPC command
ingress is this same Knative surface) — no broker needed. RabbitMQ binds the
subscription before publishing (topic exchange drops unrouted events); NATS
ensures the stream; Postgres bus ensures its tables.

Shared helpers: build_collector (the transport sink), run_checkout_over_bus
(pull buses), run_checkout_over_knative (HTTP), project_and_assert_checkout.
All on the async bus facade — no sync path.

Refs [[tasks/transport-persistence-matrix]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ndard test onto the async bus

Refactor (not delete) the gold-standard seat_checkout_saga test onto the
async InMemoryBus: same services, choreography, projection, query, and
assertions — the legacy InMemoryQueue/OutboxWorkerThread/Subscribable
wiring is replaced by publish_pending_outbox (claim→publish→complete bridge)
+ bus.subscribe hops. The projection_service/query_service modules are kept.

Complete the matrix to the full 6×3 grid (18 cells), all green against live
brokers: { HashMap, SQLite, Postgres } persistence × { InMemoryBus, NatsBus,
RabbitBus, KafkaBus, PostgresBus, Knative } transport. Postgres-persistence
fixtures + Postgres-bus pairings added; broker/DB cells skip without env.

Full distributed_read_model suite: 23 passed (refactored sync test + 18
matrix cells + 2 async flow tests + HTTP/gRPC command tests).

Refs [[tasks/transport-persistence-matrix]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
First step of Phase 1 (legacy sync bus removal): the pub/sub transport test
now publishes events to InMemoryBus and drains them via bus.subscribe,
instead of Bus::from_queue(InMemoryQueue) + microsvc::subscribe. Proves the
migration pattern; the legacy bus src stays until all consumers are migrated.

Refs [[tasks/async-only-consolidation]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…arity)

The load -> mutate -> sync -> commit workspace ergonomic existed only over
the sync store traits; the async path used the bare write-plan builder. This
restores parity: the mutation/sync/diff surface is store-independent, so the
same `ReadModelWorkspace` now gains `load_async`/`commit_async` over the
`Async{ReadModelWritePlanStore,RelationalReadModelQueryStore}` traits, plus
`AsyncReadModelLoadBuilder` and `AsyncReadModelWorkspaceExt::workspace_async()`.

No struct extraction or duplicated diff logic: `load`/`commit` move to small
sync- and async-bound impl blocks; everything else stays shared and unbounded.

Proven with async mirrors of the include-hydration and sync-roundtrip tests
on `InMemoryReadModelStore` (impls both async store traits). Sync workspace
API and its tests unchanged.

Part of [[tasks/async-only-consolidation]] (Phase 2).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…on over the async surface

Async paths previously bypassed QueuedRepository entirely (AsyncCommitBuilder
commits straight through commit_batch_async), so two concurrent async commits
to the same aggregate could interleave. This restores the queueing ability for
async: `repo.queued_async().async_aggregate::<T>()` serializes per-aggregate
get/commit exactly like the sync `.queued().aggregate::<T>()`.

Lock primitive (runtime-agnostic — no tokio dep, matching the crate's RPITIT
async surface):
- AsyncLock / AsyncLockManager traits + InMemoryAsyncLock / InMemoryAsyncLockManager,
  a hand-rolled waker-based async mutex (try_lock/unlock stay sync; only acquire awaits).

QueuedRepository<R, AsyncLockManager> (struct/Clone bound moved to the impls so an
async lock manager is accepted):
- AsyncGetStream / AsyncTransactionalCommit with the sync locking contract:
  reads acquire+hold the per-stream lock, commit releases on success and holds on
  error, multi-locks acquired in sorted/deduped order. Keyed by StreamIdentity::storage_key
  consistently across get/commit/unlock.
- Non-locking forwards (drop-in completeness): AsyncSnapshotStore,
  AsyncReadModelWritePlanStore, AsyncRelationalReadModelQueryStore, AsyncInboxStore.
- AsyncGetWithOpts / AsyncGetAllWithOpts (no_lock opt-out) + AsyncUnlockableRepository.
- Queueable::queued_async() / queued_async_with(); AsyncAggregateRepository gains
  get_with/peek/get_all_with/peek_all/abort/unlock mirroring the sync layer.

Adversarial review (3 lenses) found two latent defects in unlock(), both fixed:
waking wakers while holding the std Mutex guard could (1) poison/brick the lock if
a waker panics and (2) deadlock if a waker synchronously re-polls. unlock() now
drains under the guard and wakes outside it; regression tests cover both.

Tests: async lock unit tests (incl. re-entrant + panicking waker regressions) and
tests/queued_repo_async (mutual exclusion, per-aggregate granularity, no_lock peek,
abort release). Sync QueuedRepository API and its tests unchanged.

Part of [[tasks/async-only-consolidation]] (Phase 2).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Replaces the legacy `Bus::from_queue`/`microsvc::listen` queue tests with the
async `InMemoryBus` + `BusConsumer::listen` (competing-consumer queues keyed by
command name). The legacy `stats.handled`/`stats.failed` handle has no async
analogue, so:
- success is asserted via domain outcomes (committed aggregate state), not counts;
- failure tolerance is asserted by showing the consumer drains past a failing
  message and still processes the rest;
- metadata->Session is verified through `whoami` over the bus (works via
  run_source -> dispatch_message -> message_to_session), with a negative control
  under FailurePolicy::Stop;
- arbitrary queue names ("counters"/"creates") become command-name routing, so
  two services on one bus consume disjoint command queues without competing.

Confirms Phase 1 needs no new runtime capability — metadata->Session already
works and the stats gap is a test-rewrite. microsvc crate: 15 passed.

Part of [[tasks/async-only-consolidation]] (Phase 1).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…InMemoryBus

Replaces the threaded legacy-bus choreography (InMemoryQueue +
OutboxWorkerThread::spawn_routed + microsvc::listen per service + sleep-poll)
with a deterministic, thread-free drive over the async InMemoryBus:

- `publish_pending_outbox` claims each service's outbox and forwards messages by
  destination — worker-addressed messages are point-to-point commands
  (send_message → consumed via `listen`), saga-addressed messages are events
  (publish_message → consumed via `subscribe`).
- Each round uses a FRESH bus (the in-memory topic log is retained across reads,
  so a shared bus would re-deliver every prior event to the saga), forwards the
  pending outbox backlog, then drains the consumers. The loop ends when no
  service has pending work — i.e. the saga reached Completed.

The `stats.handled` assertions (no async analogue) are dropped in favor of the
existing domain assertions (saga/order Completed, inventory 95 available / 5
reserved, payment successful). Test 1 (saga_orchestrated) was already bus-free
and is unchanged. Both tests pass.

Part of [[tasks/async-only-consolidation]] (Phase 1).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ed.rs)

tests/sagas/distributed.rs drove the order-fulfillment saga over the raw
`bus::Bus`/`Subscribable` API with hand-spawned threads and manual aggregate
handling (bus.subscribe(&[names]) -> events.recv() loops). The async InMemoryBus
has no raw-receiver equivalent — listen/subscribe are Service-driven — so the
file cannot be faithfully migrated; a rewrite would duplicate the async
microsvc_saga::saga_distributed test (same saga) plus the matrix metadata
coverage. Removed as superseded (owner-confirmed): no coverage is lost.

Also drops the now-unused event payloads in tests/sagas/order/events.rs (only
distributed.rs constructed them). sagas crate: 7 passed, no warnings.

Part of [[tasks/async-only-consolidation]] (Phase 1).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…board onto InMemoryBus

The projection handlers of BOTH the gold-standard matrix and the board still
decoded via bus::Event (`Event::try_from(ctx.message())` + event.decode/
json_decode + event.id) — so the legacy bus could not be removed without
touching the gold-standard test. Decoupled both (refactor, not delete):

- decode straight from ctx.message().payload(): serde_json::from_slice for the
  matrix (JSON), BitcodePayloadCodec::decode for the board (bitcode — identical
  bytes to the old event.decode()); match on ctx.message().name(); event id from
  ctx.message().id(). Dropped the bus::Event `event()` helper from both
  projection handlers/mod.rs.
- board main.rs: replaced InMemoryQueue + OutboxWorkerThread + the threaded
  start_board_projection_service + wait_for_* polling with publish_pending_outbox
  (fan-out events) + a single bus.subscribe; the projection's monotonic
  source_version guard makes the per-event-type drain order-independent. Added
  projections_service::load_board (direct read) replacing the poll loop.

matrix: 2 passed (in-memory cell + refactored saga, both exercise the decode);
board: 3 passed; sagas: 7 passed. clippy/fmt clean.

Part of [[tasks/async-only-consolidation]] (Phase 1 — last test migration before
the src removal).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The legacy sync bus is fully superseded by the async bus facade (InMemoryBus +
the BusConsumer listen/subscribe + OutboxSource) and had no remaining consumers
after the test migrations. Removed:

- `src/bus/` entirely (Bus/Subscribable/InMemoryQueue/Listener/Sender/EventBus/
  Event/Publisher, ~1.4k lines).
- `OutboxWorkerThread` + WorkerStats + OutboxWorkerJoinError (the threaded
  outbox->bus bridge) and `src/outbox_worker/thread.rs`.
- The bus-gated `microsvc::service` surface: `dispatch_event`,
  `dispatch_listened_event`, `subscribe`/`listen`, `TransportHandle` +
  `TransportStats`/`TransportJoinError`, and the `From<&Event> for Message` /
  `TryFrom<&Message> for Event` / `from_bus_event` bridges (+ their unit tests).
- The `bus` Cargo feature (out of `default`); `http`/`grpc` no longer depend on
  it — they use the unconditional `microsvc::Message`, confirmed by building
  `--features http,grpc`.
- The bus-gated crate-root re-exports (`InMemoryQueue`, `bus::Message`,
  the threaded-worker types).

All consumers were migrated first (transport_subscribe/listen, microsvc_saga,
the board) or removed as superseded (sagas/distributed.rs), and both projection
handlers were decoupled from `bus::Event`. Default test sweep: 238 lib + all
integration crates green; `--features http,grpc` builds; clippy/fmt clean.

Closes Phase 1 of [[tasks/async-only-consolidation]] — one async bus facade,
no sync bus path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Converts the microsvc handler model from sync to async, the foundation for
dropping the sync repository API so all backends are async-only (the sync/async
mix was the source of subtle bugs).

Core (lib green; integration test crates migrated in follow-up commits):
- HandlerFn is now `dyn for<'a> Fn(&'a Context<'a, D>) -> Pin<Box<dyn Future<
  Output=Result<Value, HandlerError>> + Send + 'a>>`, with an `AsyncHandler<'a,D>`
  HRTB helper trait so `async fn handle(ctx: &Context<D>)` registers directly.
  Guards stay synchronous.
- Service::dispatch / dispatch_message / dispatch_request / invoke are async.
- dependencies.rs: HasRepo/HasReadModelStore now resolve via the ASYNC repo +
  read-model traits (+ HasRepo for AsyncAggregateRepository / AsyncSnapshotAggregateRepository).
- run_source + the http/grpc/knative transports await dispatch.
- src unit tests converted (async-closure handlers + awaited dispatch).

Handler authors write `async fn handle`; closures need an explicit ctx type
annotation and must extract owned values before the `async move` (the future
cannot borrow ctx across the await — an HRTB-closure limitation).

cargo build (default + --features http,grpc) green; 238 lib tests pass.
NOTE: tests/ integration crates still use the sync handler API and are migrated
in the following commits (all-or-nothing handler switch).

Part of [[tasks/async-only-consolidation]] (Phase 3).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Completes the integration half of the async handler switch: all 21 test crates
(microsvc, sagas, the gold-standard distributed_read_model matrix, the board,
the transport conformance crates, and the ~15 direct-repo crates) now use the
async handler + async repo API exclusively:

- handlers are `async fn handle(ctx: &Context<'_, D>)` with awaited
  ctx.repo().get/commit/peek and ctx.repo().outbox(msg).commit(&mut a).await;
  read-model handlers use workspace_async()/load_async()/commit_async().await.
- services build with .queued_async().async_aggregate(); inline handler closures
  use the `|ctx: &Context<D>| { extract ctx reads; async move { ... } }` form.
- test bodies await dispatch and the now-async repo reads.

Guards stay synchronous. Assertions and domain logic unchanged. The sync repo
trait surface is still present (deleted next); 502 default tests pass, the gold
-standard matrix's gated cells compile, http/grpc/sqlite cells pass.

Part of [[tasks/async-only-consolidation]] (Phase 3).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Deletes the entire synchronous repository/read-model/snapshot trait surface,
now unused after the async handler switch. This eliminates the sync/async mix
that was the source of subtle combination bugs: there is exactly one (async)
path for every backend.

Removed (traits + all backend impls + re-exports):
- repository: Get/Commit/Repository (repository.rs), GetOne/GetMany/Gettable
  (gettable.rs), the TransactionalCommit trait (batch.rs; CommitBatch kept).
- snapshot: sync SnapshotStore + sync SnapshotAggregateRepository/SnapshotOutboxCommit.
- read_model: sync ReadModelWritePlanStore/RelationalReadModelQueryStore, the sync
  ReadModelWorkspace load/commit impl, ReadModelLoadBuilder, ReadModelWorkspaceExt,
  and ReadModelWritePlanBuilder::commit (async equivalents kept).
- aggregate: GetAggregate/GetAllAggregates/CommitAggregate + the sync
  AggregateRepository/AggregateBuilder (AsyncAggregateRepository/Builder kept).
- commit_builder: SyncCommitBuilder/SyncStagedCommitBuilder/exts.
- outbox: SyncOutboxCommit/SyncOutboxCommitExt (outbox_sync/commit_sync).
- hashmap/postgres/sqlite/in-memory backends: their sync impls.
- queued_repo: the sync QueuedRepository impls + sync Queueable::queued; the sync
  lock module (Lock/LockManager/InMemoryLock/InMemoryLockManager) is now fully
  unused and deleted (Async lock variants kept; LockError kept).
- src/ unit tests that exercised the removed sync surface, converted to async.

Also converted 5 remaining fully-sync integration crates the earlier sweep
missed (bomberman [19 files], read_model_relationship_includes,
read_model_commit_bridge, sourced_upcasting, transport_conformance's store_outbox).

cargo test: 490 passed / 0 failed; --features http,grpc / postgres / sqlite all
build; clippy clean; no sync trait remains in src/.

Completes Phase 3 of [[tasks/async-only-consolidation]] — HashMap/SQLite/Postgres
all async-only and consistent.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
It is only used by the postgres/sqlite-gated matrix cells, so it (and its
`TableSchemaRegistry` import) tripped a dead-code warning on the default build.
Gate both with cfg(any(feature = "postgres", feature = "sqlite")) to match the
call sites. Default clippy is now fully clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ion)

- src/ unit tests (outbox/commit, snapshot/in_memory, snapshot/repository,
  read_model/in_memory, commit_builder, outbox_worker/store, hashmap_repo):
  replace the custom busy-poll `block_on` (no-op waker, ignores Poll::Pending —
  would spin on any yielding future) with `#[tokio::test]`. Transport modules
  keep their intentionally runtime-free block_on.
- board projection handler: `event_version` returns Result<_, HandlerError>
  instead of panicking on a malformed message id; the handler propagates with
  `?`. Its unit test now asserts the error path.
- tests/todos: the bulk-commit roundtrip now asserts the commit succeeds and that
  exactly 3 todos are present (was: ignored result + an `if !empty` that masked
  failures). The concurrency-race commits (deliberately may lose the lock) keep
  their `let _ =`.

490 tests pass; clippy --all-targets clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@patrickleet patrickleet changed the title feat: async microservice transports feat!: async microsvc transports + persistence matrix + async-only consolidation May 30, 2026
@patrickleet patrickleet merged commit fb6f1e8 into main May 30, 2026
7 checks passed
patrickleet added a commit that referenced this pull request May 30, 2026
Address the CodeRabbit review on PR #47. was_applied() is hardcoded true
not by accident: the read_model_processed_messages dedupe table and the
skipped_duplicate outcome were deliberately removed (see
specs/consumer-inbox-design.md, 2026-05-28) because coupling delivery
dedupe to the read-model projection contract was the wrong boundary.
Replay safety is now a projection convention (idempotent handlers +
per-row ExpectedVersion OCC); a first-class replay barrier returns with
the consumer inbox as a CommitBatch participant. Document this on the
type so the always-true was_applied() isn't misread as a lost signal.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
patrickleet added a commit that referenced this pull request May 30, 2026
Three valid review findings:

- outbox_source: reject zero lease / batch_size in the builder. A zero
  batch makes recv() return Ok(None) forever; a zero lease makes claimed
  rows immediately re-claimable. Both are silent misconfigurations on a
  public builder — assert up front. (+ #[should_panic] tests)

- rabbitmq: preserve Message.content_type across the AMQP round-trip.
  message_properties now sets AMQP content_type, and from_delivery reads
  it back instead of letting Message::new hardcode application/json. Also
  fixes the same loss for RabbitBus (shares both paths). (+ round-trip
  test asserting a non-JSON content type survives)

- knative: sanitize generated Trigger names to RFC 1123 (lowercase,
  alphanumeric/'-', no leading/trailing '-', <=63 chars) via a shared
  sanitize_k8s_name helper, applied in both knative_triggers and
  KnativeBus::trigger_yaml. CloudEvent types can contain dots/uppercase
  that are invalid in k8s resource names. (+ unit tests)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant