Separate outbox worker store boundary#40
Conversation
Implements specs/outbox-worker-store-boundary
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (2)
📝 WalkthroughWalkthroughThis PR refactors the outbox pattern from repository-extension traits to dedicated ChangesOutbox Store API Refactoring
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
|
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/outbox_worker/store.rs (1)
242-274:⚠️ Potential issue | 🟠 Major | ⚡ Quick winAlign claim ordering with SQL stores (created_at, then message_id).
Line 253/357 currently sorts by
message_idonly, while SQL store implementations claim bycreated_at ASC, message_id ASC(src/postgres_repo/mod.rsline range 348-352 andsrc/sqlite_repo/mod.rsline range 557-561 in provided snippets). This backend divergence can reorder delivery and produce inconsistent behavior across repositories.Suggested fix
- let mut ids = storage.keys().cloned().collect::<Vec<_>>(); - ids.sort(); + let mut ids = storage + .values() + .map(|m| (m.created_at, m.id().to_string())) + .collect::<Vec<_>>(); + ids.sort_by(|(l_ts, l_id), (r_ts, r_id)| l_ts.cmp(r_ts).then_with(|| l_id.cmp(r_id))); let mut claimed = Vec::new(); - for id in ids { + for (_, id) in ids { let Some(message) = storage.get_mut(&id) else { continue; };Also applies to: 342-381
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/outbox_worker/store.rs` around lines 242 - 274, The claim ordering currently sorts only by message id in the claim function (fn claim) causing divergence from SQL backends; change the iteration to collect entries from self.storage (iterate storage.iter() or storage.keys().cloned() together with their messages) and sort by (message.created_at, message_id) — i.e., order by created_at ASC then message_id ASC — before claiming; apply the same fix to the other similar loop noted around lines 342-381 so both places use created_at then id for consistent ordering across repositories (use the OutboxMessage.created_at field and the message id key from the map when building the sort key).
🧹 Nitpick comments (3)
docs/read-models.md (1)
264-264: ⚡ Quick winConsider showing the import or full path for
outbox_message_schema().The function
outbox_message_schema()is called without context. Readers may benefit from seeing where this function is imported from or its full path.📘 Suggested clarification
```rust +use sourced_rust::outbox::outbox_message_schema; + let mut registry = TableSchemaRegistry::new(); registry.register_schema(outbox_message_schema())?;Or use the full path inline:
-registry.register_schema(outbox_message_schema())?; +registry.register_schema(sourced_rust::outbox::outbox_message_schema())?;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docs/read-models.md` at line 264, Show where outbox_message_schema() comes from by adding its import or using its full path when registering the schema: update the example that creates let mut registry = TableSchemaRegistry::new(); to either include a use line that imports outbox_message_schema (e.g., use sourced_rust::outbox::outbox_message_schema;) or call registry.register_schema(sourced_rust::outbox::outbox_message_schema()) to make the function origin explicit.src/hashmap_repo/repository.rs (1)
411-416: ⚡ Quick winExtract this outbox write validation into one shared helper.
This is duplicated from
src/sqlx_repo/mod.rs, so HashMap and SQL-backed repositories can quietly diverge on validation behavior. Pulling it into a shared outbox/table utility keeps store conformance aligned.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/hashmap_repo/repository.rs` around lines 411 - 416, The validate_outbox_table_write function duplicates logic present in src/sqlx_repo/mod.rs; extract its body into a shared helper (e.g., outbox::validate_outbox_message_write or outbox::outbox_write_validator) and replace both validate_outbox_table_write (in hashmap repo) and the equivalent code in sqlx_repo with a call to that shared helper; keep the same behavior (calling crate::outbox::outbox_message_insert_plan(...).and_then(|plan| plan.validate().map(|()| plan)).map(|_| ()).map_err(|err| RepositoryError::Model(err.to_string()))) and ensure the helper returns Result<(), RepositoryError> so callers (including validate_outbox_table_write) simply delegate to it.src/outbox_worker/store.rs (1)
323-338: ⚡ Quick winMake async status listing ordering deterministic with sync path.
Line 337 sorts only by
created_at; syncmessages_by_statusalso tie-breaks bymessage_id(Line 234-238). Adding the same tie-breaker avoids unstable ordering when timestamps collide.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/outbox_worker/store.rs` around lines 323 - 338, The async listing in messages_by_status_async currently sorts only by created_at, causing nondeterministic ordering when timestamps tie; mirror the sync messages_by_status tie-breaker by sorting by (created_at, message_id) instead of solely created_at. Update messages.sort_by_key(...) to use a composite key or use sort_by comparing created_at then message_id so ordering matches the sync path (referencing messages_by_status, OutboxMessage.created_at and OutboxMessage.message_id).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/outbox_worker/thread.rs`:
- Around line 111-118: Remove the unnecessary Clone bound from the
OutboxWorkerThread spawn APIs: update the generic constraints on spawn and
spawn_with_id so S: OutboxStore + Send + 'static (drop S: Clone) and similarly
remove Clone where spawn delegates to spawn_with_id; confirm there are no
store.clone() calls in methods like record_publish_claim,
record_publish_complete, or record_publish_failure and adjust any signatures
that currently require Clone so valid OutboxStore implementations that are not
Clone compile.
In `@src/outbox/table.rs`:
- Around line 26-33: The temporal columns (created_at, next_available_at,
claimed_until, published_at, failed_at) are defined as
ColumnType::UnsignedInteger in the table schema (see the table_column(...) calls
in src/outbox/table.rs) but runtime SQL in postgres_repo and sqlite_repo uses
timestamp semantics (to_timestamp(...), CURRENT_TIMESTAMP, timestamp
comparisons), causing a type mismatch; change those table_column(...)
definitions to a proper timestamp type (or the project’s canonical DB timestamp
ColumnType) so the schema matches the SQL paths, and update any related
migration/bootstrap artifacts and tests to use timestamps consistently (also
review usages in src/postgres_repo/mod.rs and src/sqlite_repo/mod.rs to ensure
they read/write the same timestamp representation).
In `@src/sqlite_repo/mod.rs`:
- Around line 127-136: The bootstrap_table_schema_for_dev loop (and the sibling
method around lines 171-180) incorrectly maps SQL execution failures using
read_model_storage_error; update those sqlx::query(...).execute(...).await
.map_err(...) calls to use the table-schema/table-store specific error mapper
used by the Postgres implementation (replace read_model_storage_error(...) with
the table-specific mapper function used elsewhere in the codebase, e.g. the
TableStoreError mapper from the Postgres module), so failures are converted into
TableStoreError instead of read-model errors.
---
Outside diff comments:
In `@src/outbox_worker/store.rs`:
- Around line 242-274: The claim ordering currently sorts only by message id in
the claim function (fn claim) causing divergence from SQL backends; change the
iteration to collect entries from self.storage (iterate storage.iter() or
storage.keys().cloned() together with their messages) and sort by
(message.created_at, message_id) — i.e., order by created_at ASC then message_id
ASC — before claiming; apply the same fix to the other similar loop noted around
lines 342-381 so both places use created_at then id for consistent ordering
across repositories (use the OutboxMessage.created_at field and the message id
key from the map when building the sort key).
---
Nitpick comments:
In `@docs/read-models.md`:
- Line 264: Show where outbox_message_schema() comes from by adding its import
or using its full path when registering the schema: update the example that
creates let mut registry = TableSchemaRegistry::new(); to either include a use
line that imports outbox_message_schema (e.g., use
sourced_rust::outbox::outbox_message_schema;) or call
registry.register_schema(sourced_rust::outbox::outbox_message_schema()) to make
the function origin explicit.
In `@src/hashmap_repo/repository.rs`:
- Around line 411-416: The validate_outbox_table_write function duplicates logic
present in src/sqlx_repo/mod.rs; extract its body into a shared helper (e.g.,
outbox::validate_outbox_message_write or outbox::outbox_write_validator) and
replace both validate_outbox_table_write (in hashmap repo) and the equivalent
code in sqlx_repo with a call to that shared helper; keep the same behavior
(calling crate::outbox::outbox_message_insert_plan(...).and_then(|plan|
plan.validate().map(|()| plan)).map(|_| ()).map_err(|err|
RepositoryError::Model(err.to_string()))) and ensure the helper returns
Result<(), RepositoryError> so callers (including validate_outbox_table_write)
simply delegate to it.
In `@src/outbox_worker/store.rs`:
- Around line 323-338: The async listing in messages_by_status_async currently
sorts only by created_at, causing nondeterministic ordering when timestamps tie;
mirror the sync messages_by_status tie-breaker by sorting by (created_at,
message_id) instead of solely created_at. Update messages.sort_by_key(...) to
use a composite key or use sort_by comparing created_at then message_id so
ordering matches the sync path (referencing messages_by_status,
OutboxMessage.created_at and OutboxMessage.message_id).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: bef64870-9c08-4b6d-93be-eec3c19794ea
📒 Files selected for processing (35)
README.mddocs/async-repositories.mddocs/read-models.mdsrc/bus/mod.rssrc/hashmap_repo/mod.rssrc/hashmap_repo/repository.rssrc/lib.rssrc/outbox/commit.rssrc/outbox/mod.rssrc/outbox/table.rssrc/outbox_worker/mod.rssrc/outbox_worker/store.rssrc/outbox_worker/thread.rssrc/outbox_worker/worker.rssrc/postgres_repo/mod.rssrc/repository/async_repository.rssrc/repository/mod.rssrc/sqlite_repo/mod.rssrc/sqlx_repo/mod.rssrc/table/mod.rssrc/table/sql.rstests/async_repository/main.rstests/bomberman/main.rstests/distributed_read_model/main.rstests/distributed_read_model_board/main.rstests/hashmap_repository_conformance/main.rstests/microsvc/convention.rstests/persistent_repository_conformance/outbox.rstests/postgres_repository_conformance/main.rstests/sagas/distributed.rstests/sagas/microsvc_saga.rstests/sourced_snapshot/main.rstests/sqlite_repository/main.rstests/sqlite_repository_conformance/main.rstests/todos/main.rs
|
@coderabbitai I also addressed the non-inline review items in
Follow-up cleanup in Validation run after these commits:
|
|
Triggering a review of the new commits. ✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/outbox/table.rs (1)
17-39:⚠️ Potential issue | 🟠 Major | ⚡ Quick winAdd
updated_atto the generated outbox schema.The runtime SQL outbox paths mutate
updated_aton claim/complete/release/fail, butoutbox_message_schema()never declares that column. Anyoutbox_messagestable bootstrapped from this schema will therefore fail on the first worker update with a missing-column error. Addupdated_athere and make sure the insert/default path is consistent with it.🛠 Minimal schema fix
columns: vec![ table_column("message_id", ColumnType::Text, false), table_column("event_type", ColumnType::Text, false), table_column("payload", ColumnType::Bytes, false), table_column("payload_codec", ColumnType::Text, false), table_column("payload_codec_version", ColumnType::UnsignedInteger, false), table_column("destination", ColumnType::Text, true), table_column("metadata", ColumnType::Json, false), table_column("status", ColumnType::Text, false), table_column("created_at", ColumnType::Timestamp, false), table_column("next_available_at", ColumnType::Timestamp, false), table_column("claimed_by", ColumnType::Text, true), table_column("claimed_until", ColumnType::Timestamp, true), table_column("attempts", ColumnType::UnsignedInteger, false), table_column("last_error", ColumnType::Text, true), table_column("published_at", ColumnType::Timestamp, true), table_column("failed_at", ColumnType::Timestamp, true), + table_column("updated_at", ColumnType::Timestamp, true), table_column("source_aggregate_type", ColumnType::Text, true), table_column("source_aggregate_id", ColumnType::Text, true), table_column("source_sequence", ColumnType::UnsignedInteger, true), table_column("correlation_id", ColumnType::Text, true), table_column("causation_id", ColumnType::Text, true), ],🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/outbox/table.rs` around lines 17 - 39, outbox_message_schema() currently omits the updated_at column while runtime SQL outbox mutates updated_at on claim/complete/release/fail; add table_column("updated_at", ColumnType::Timestamp, false) to the columns vector in outbox_message_schema() (near the existing created_at/next_available_at columns) and ensure any insert/default logic that sets created_at also sets updated_at (or gives it the same default) so bootstrap schema and runtime updates use the same column and won’t cause a missing-column error.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@src/outbox/table.rs`:
- Around line 17-39: outbox_message_schema() currently omits the updated_at
column while runtime SQL outbox mutates updated_at on
claim/complete/release/fail; add table_column("updated_at",
ColumnType::Timestamp, false) to the columns vector in outbox_message_schema()
(near the existing created_at/next_available_at columns) and ensure any
insert/default logic that sets created_at also sets updated_at (or gives it the
same default) so bootstrap schema and runtime updates use the same column and
won’t cause a missing-column error.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 8095e36a-11cf-4e67-b13b-bd79ba8411d7
📒 Files selected for processing (17)
docs/read-models.mdsrc/hashmap_repo/repository.rssrc/outbox/mod.rssrc/outbox/table.rssrc/outbox_worker/store.rssrc/outbox_worker/thread.rssrc/read_model/metadata.rssrc/sqlite_repo/mod.rssrc/sqlx_repo/mod.rssrc/table/sql.rstests/bomberman/commands.rstests/bomberman/domain/game_map.rstests/read_model_schema_bootstrap/main.rstests/sagas/order/mod.rstests/sagas/order/order_aggregate.rstests/sagas/order/payment.rstests/sagas/order/saga.rs
✅ Files skipped from review due to trivial changes (2)
- tests/sagas/order/payment.rs
- docs/read-models.md
|
@coderabbitai Addressed the latest
Validation:
|
|
Triggering a review of commit ✅ Actions performedReview triggered.
|
Summary
Verification
Note: cargo clippy --all-targets --all-features -- -D warnings still fails on existing Bomberman/saga test lint debt unrelated to this change.
Summary by CodeRabbit
New Features
Documentation
Breaking Changes