Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
3186c2b
refactor: remove old pattern
patrickleet May 28, 2026
9f0dfc1
feat(transport): async transport foundation in microsvc::transport
patrickleet May 29, 2026
bb10271
test(transport): reusable conformance harness + in-memory reference run
patrickleet May 29, 2026
70298e1
feat(transport): Postgres durable receive via OutboxSource
patrickleet May 29, 2026
b7ca692
feat(transport): NATS JetStream adapter + integration tests + CI
patrickleet May 29, 2026
a5b9a8b
feat(transport): RabbitMQ (AMQP) adapter + integration tests + CI
patrickleet May 29, 2026
512b688
feat(transport): Knative CloudEvents HTTP ingress
patrickleet May 29, 2026
8a78405
feat(transport): Kafka adapter + integration tests + CI
patrickleet May 29, 2026
2b1b372
ci: extract integration tests to reusable workflows; run on push-to-main
patrickleet May 29, 2026
efbf5bc
docs(transport): add async transports guide
patrickleet May 29, 2026
5bab4ec
feat(transport): add Bus/BusConsumer facade + InMemoryBus
patrickleet May 29, 2026
d59ec24
feat(transport): add NatsBus (send/listen + publish/subscribe)
patrickleet May 29, 2026
5e80467
feat(transport): add PostgresBus (work queue + log/offset fan-out)
patrickleet May 29, 2026
0121e45
feat(transport): add RabbitBus (send/listen + publish/subscribe)
patrickleet May 29, 2026
14de16a
feat(transport): add KafkaBus (send/listen + publish/subscribe)
patrickleet May 29, 2026
9cc8b40
feat(transport): add KnativeBus (Bus produce + manifest generation)
patrickleet May 29, 2026
e19fb3a
docs(transport): document the bus facade + transport-swap example
patrickleet May 29, 2026
f528994
ci: install libcurl dev headers for the rdkafka build
patrickleet May 29, 2026
f91c9fc
docs(read-model): clarify ReadModelCommitOutcome is an intentional stub
patrickleet May 29, 2026
4ef3a6c
fix(transport): address CodeRabbit review on PR #47
patrickleet May 29, 2026
6213cc8
feat(inbox): add InboxReceipt/InboxOutcome + CommitBatch participant …
patrickleet May 29, 2026
266e7ee
feat(inbox): persist consumer_inbox receipts across all backends
patrickleet May 29, 2026
bb2209e
test+fix(inbox): empty-receipt parity + shared conformance scenario
patrickleet May 29, 2026
94f69f7
test(matrix): generic transport×persistence harness + in-memory cell
patrickleet May 29, 2026
bb6230f
test(matrix): full transport×persistence matrix over the bus facade
patrickleet May 29, 2026
b35f771
test(matrix): complete transport×persistence grid + refactor gold-sta…
patrickleet May 29, 2026
6eb448f
test(cutover): migrate transport_subscribe onto the async InMemoryBus
patrickleet May 29, 2026
1cfaef4
feat(read-model): async ReadModelWorkspace (load_async/commit_async p…
patrickleet May 29, 2026
b0b0e79
feat(queued-repo): async QueuedRepository — per-aggregate serializati…
patrickleet May 29, 2026
f9d175f
test(cutover): migrate transport_listen onto the async InMemoryBus
patrickleet May 29, 2026
a87edcf
test(cutover): migrate microsvc_saga distributed test onto the async …
patrickleet May 29, 2026
cf18eda
test(cutover): remove superseded raw-legacy-bus saga tests (distribut…
patrickleet May 29, 2026
c0bd6a5
test(cutover): decouple projection handlers from bus::Event; migrate …
patrickleet May 29, 2026
a64f92c
refactor!: remove the legacy sync bus
patrickleet May 29, 2026
27f58ea
feat(microsvc)!: async handler model (core) — handlers become async fn
patrickleet May 30, 2026
5e70ac1
test(microsvc)!: migrate all integration test crates to async handlers
patrickleet May 30, 2026
5a67159
refactor!: remove the sync repository API — the crate is now async-only
patrickleet May 30, 2026
e2da936
test: gate matrix table_schema_registry helper to postgres/sqlite
patrickleet May 30, 2026
47466ea
test: address CodeRabbit review (block_on, handler panic, weak assert…
patrickleet May 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions .github/workflows/integration-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
name: Kafka Integration Tests

# Reusable workflow: referenced via `uses: ./.github/workflows/integration-kafka.yaml`
# from both the PR-quality and push-to-main pipelines.
on:
workflow_call:

jobs:
kafka:
name: Kafka Integration Tests
runs-on: ubuntu-latest
services:
kafka:
image: apache/kafka:3.8.0
ports:
- 9092:9092
env:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
options: >-
--health-cmd "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092"
--health-interval 10s
--health-timeout 10s
--health-retries 20
--health-start-period 20s
env:
CARGO_TERM_COLOR: always
KAFKA_BROKERS: localhost:9092
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
with:
persist-credentials: false
- uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8
with:
toolchain: stable
# ubuntu-latest ships cmake + gcc, which rdkafka's cmake-build needs. It
# also has libcurl's runtime but not its headers; librdkafka's bundled
# build enables curl (OAUTHBEARER OIDC) when it finds the lib and then
# needs curl/curl.h, so install the dev package.
- name: Install librdkafka build dependencies
run: sudo apt-get update && sudo apt-get install -y libcurl4-openssl-dev
- name: Run Kafka transport integration tests
run: cargo test --test kafka_transport --features kafka --verbose
32 changes: 32 additions & 0 deletions .github/workflows/integration-nats.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: NATS JetStream Integration Tests

# Reusable workflow: referenced via `uses: ./.github/workflows/integration-nats.yaml`
# from both the PR-quality and push-to-main pipelines.
on:
workflow_call:

jobs:
nats:
name: NATS JetStream Integration Tests
runs-on: ubuntu-latest
env:
CARGO_TERM_COLOR: always
NATS_URL: nats://localhost:4222
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
with:
persist-credentials: false
- uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8
with:
toolchain: stable
# NATS service containers can't override the entrypoint to enable
# JetStream, so start it as a step instead.
- name: Start NATS (JetStream)
run: |
docker run -d --name nats -p 4222:4222 nats:2.10-alpine -js
for i in $(seq 1 30); do
if nc -z localhost 4222; then echo "nats up"; break; fi
sleep 1
done
- name: Run NATS transport integration tests
run: cargo test --test nats_transport --features nats --verbose
46 changes: 46 additions & 0 deletions .github/workflows/integration-postgres.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: Postgres Integration Tests

# Reusable workflow: referenced via `uses: ./.github/workflows/integration-postgres.yaml`
# from both the PR-quality and push-to-main pipelines.
on:
workflow_call:

jobs:
postgres:
name: Postgres Integration Tests
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16-alpine
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: sourced_rust_test
ports:
- 5432:5432
options: >-
--health-cmd "pg_isready -U postgres -d sourced_rust_test"
--health-interval 5s
--health-timeout 5s
--health-retries 10
env:
CARGO_TERM_COLOR: always
DATABASE_URL: postgres://postgres:postgres@localhost:5432/sourced_rust_test
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
with:
persist-credentials: false
- uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8
with:
toolchain: stable
# These tests run with --all-features, which compiles rdkafka; librdkafka's
# bundled cmake build needs curl/curl.h (the runner has libcurl's runtime
# but not its headers).
- name: Install librdkafka build dependencies
run: sudo apt-get update && sudo apt-get install -y libcurl4-openssl-dev
- name: Run Postgres integration tests
run: |
cargo test --test postgres_repository --all-features --verbose
cargo test --test postgres_repository_conformance --all-features --verbose
cargo test --test postgres_transport --all-features --verbose
cargo test --test distributed_read_model --all-features --verbose
33 changes: 33 additions & 0 deletions .github/workflows/integration-rabbitmq.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
name: RabbitMQ Integration Tests

# Reusable workflow: referenced via `uses: ./.github/workflows/integration-rabbitmq.yaml`
# from both the PR-quality and push-to-main pipelines.
on:
workflow_call:

jobs:
rabbitmq:
name: RabbitMQ Integration Tests
runs-on: ubuntu-latest
services:
rabbitmq:
image: rabbitmq:3.13-management-alpine
ports:
- 5672:5672
options: >-
--health-cmd "rabbitmq-diagnostics -q ping"
--health-interval 5s
--health-timeout 5s
--health-retries 20
env:
CARGO_TERM_COLOR: always
AMQP_URL: amqp://guest:guest@localhost:5672/%2f
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
with:
persist-credentials: false
- uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8
with:
toolchain: stable
- name: Run RabbitMQ transport integration tests
run: cargo test --test rabbitmq_transport --features rabbitmq --verbose
41 changes: 10 additions & 31 deletions .github/workflows/on-pr-quality.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,13 @@ jobs:
cargo_incremental: true

postgres:
name: Postgres Integration Tests
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16-alpine
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: sourced_rust_test
ports:
- 5432:5432
options: >-
--health-cmd "pg_isready -U postgres -d sourced_rust_test"
--health-interval 5s
--health-timeout 5s
--health-retries 10
env:
CARGO_TERM_COLOR: always
DATABASE_URL: postgres://postgres:postgres@localhost:5432/sourced_rust_test
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
with:
persist-credentials: false
- uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8
with:
toolchain: stable
- name: Run Postgres integration tests
run: |
cargo test --test postgres_repository --all-features --verbose
cargo test --test postgres_repository_conformance --all-features --verbose
cargo test --test distributed_read_model --all-features --verbose
uses: ./.github/workflows/integration-postgres.yaml

nats:
uses: ./.github/workflows/integration-nats.yaml

rabbitmq:
uses: ./.github/workflows/integration-rabbitmq.yaml

kafka:
uses: ./.github/workflows/integration-kafka.yaml
43 changes: 11 additions & 32 deletions .github/workflows/on-push-main-version-and-tag.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,22 @@ jobs:
cargo_incremental: true

postgres:
name: Postgres Integration Tests
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16-alpine
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: sourced_rust_test
ports:
- 5432:5432
options: >-
--health-cmd "pg_isready -U postgres -d sourced_rust_test"
--health-interval 5s
--health-timeout 5s
--health-retries 10
env:
CARGO_TERM_COLOR: always
DATABASE_URL: postgres://postgres:postgres@localhost:5432/sourced_rust_test
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
with:
persist-credentials: false
- uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8
with:
toolchain: stable
- name: Run Postgres integration tests
run: |
cargo test --test postgres_repository --all-features --verbose
cargo test --test postgres_repository_conformance --all-features --verbose
cargo test --test distributed_read_model --all-features --verbose
uses: ./.github/workflows/integration-postgres.yaml

nats:
uses: ./.github/workflows/integration-nats.yaml

rabbitmq:
uses: ./.github/workflows/integration-rabbitmq.yaml

kafka:
uses: ./.github/workflows/integration-kafka.yaml

# This uses commit logs and tags from git to determine the next version number and create a tag for the release.
# Some commits such as chore: will not trigger a version bump and tag; this is by design.
version-and-tag:
name: Version and Tag
needs: [quality, postgres]
needs: [quality, postgres, nats, rabbitmq, kafka]
uses: unbounded-tech/workflow-vnext-tag/.github/workflows/workflow.yaml@v1.20.2
secrets:
DEPLOY_KEY: ${{ secrets.DEPLOY_KEY }}
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ jobs:
with:
components: llvm-tools-preview
- uses: taiki-e/install-action@cargo-llvm-cov
# --all-features compiles rdkafka; librdkafka's bundled cmake build needs
# curl/curl.h (the runner has libcurl's runtime but not its headers).
- name: Install librdkafka build dependencies
run: sudo apt-get update && sudo apt-get install -y libcurl4-openssl-dev
- name: Generate coverage report
run: cargo llvm-cov --all-features --verbose --lcov --output-path lcov.info
- name: Upload coverage artifact
Expand Down
15 changes: 11 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,24 @@ categories = ["data-structures", "database"]
path = "src/lib.rs"

[features]
default = ["emitter", "bus"]
default = ["emitter"]
emitter = ["dep:event-emitter-rs"]
bus = []
http = ["bus", "dep:axum", "dep:tokio"]
grpc = ["bus", "dep:tonic", "dep:prost", "dep:tokio"]
http = ["dep:axum", "dep:reqwest", "dep:tokio"]
grpc = ["dep:tonic", "dep:prost", "dep:tokio"]
postgres = ["dep:sqlx", "dep:tokio", "sqlx/postgres", "sqlx/runtime-tokio"]
sqlite = ["dep:sqlx", "dep:tokio", "sqlx/runtime-tokio", "sqlx/sqlite"]
nats = ["dep:async-nats", "dep:futures", "dep:tokio"]
rabbitmq = ["dep:lapin", "dep:futures", "dep:tokio"]
kafka = ["dep:rdkafka", "dep:tokio"]

[dependencies]
async-nats = { version = "0.38", optional = true }
axum = { version = "0.7", optional = true }
base64 = "0.22.1"
futures = { version = "0.3", optional = true }
lapin = { version = "2", optional = true }
rdkafka = { version = "0.36", features = ["cmake-build", "tokio"], optional = true }
reqwest = { version = "0.12", default-features = false, optional = true }
bitcode = { version = "0.6.9", features = ["serde"] }
event-emitter-rs = { version = "0.1.4", optional = true }
serde = { version = "1.0.210", features = ["derive"] }
Expand Down
61 changes: 61 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# Local dependencies for development and transport integration tests.
#
# docker compose up -d # start all
# docker compose up -d postgres # just one
#
# Then point each test's env var at the service and run the feature-gated target
# (each integration test skips when its env var is unset):
#
# DATABASE_URL=postgres://sourced:sourced@localhost:5432/sourced_rust \
# cargo test --test postgres_transport --features postgres
# AMQP_URL=amqp://guest:guest@localhost:5672/%2f \
# cargo test --test rabbitmq_transport --features rabbitmq
# KAFKA_BROKERS=localhost:9092 \
# cargo test --test kafka_transport --features kafka
# NATS_URL=nats://localhost:4222 \
# cargo test --test nats_transport --features nats
services:
postgres:
image: postgres:16
Expand All @@ -12,3 +28,48 @@ services:
interval: 2s
timeout: 5s
retries: 20

rabbitmq:
image: rabbitmq:3.13-management-alpine
ports:
- "5672:5672" # AMQP
- "15672:15672" # management UI
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
interval: 5s
timeout: 5s
retries: 20

kafka:
image: apache/kafka:3.8.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
healthcheck:
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 || exit 1"]
interval: 10s
timeout: 10s
retries: 30

nats:
image: nats:2.10-alpine
command: ["-js", "-m", "8222"] # JetStream + HTTP monitoring (for healthcheck)
ports:
- "4222:4222"
- "8222:8222"
healthcheck:
test: ["CMD-SHELL", "wget -q -O - http://localhost:8222/healthz || exit 1"]
interval: 5s
timeout: 5s
retries: 20
Loading
Loading