feat: big refactor for actual persistence - psql, sqlite; outbox refactor; snapshot cleanup; microsvc context#37
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds StreamIdentity and async repository trait surface, extends macros to accept durable aggregate_type strings, implements async wrappers and full async backends (HashMap, SQLite, Postgres), refactors outbox ownership/workers, updates crate exports, adds docs/migrations, and broad Tokio integration and conformance tests. ChangesAsync Repository Boundary and Backends
Sequence Diagram(s)sequenceDiagram
participant Client
participant AsyncRepo as AsyncRepository
participant SQLDB as SQLxDB
Client->>AsyncRepo: get_stream(StreamIdentity)
AsyncRepo->>SQLDB: query aggregate_events by (aggregate_type, aggregate_id)
SQLDB-->>AsyncRepo: rows -> EventRecord[]
AsyncRepo->>Client: hydrated Entity/Aggregate
Client->>AsyncRepo: commit_batch_async(AsyncCommitBatch)
AsyncRepo->>SQLDB: begin transaction
AsyncRepo->>SQLDB: validate prepared appends, reject duplicates
AsyncRepo->>SQLDB: insert events, insert outbox rows, upsert snapshots
SQLDB-->>AsyncRepo: commit / unique-violation
AsyncRepo->>Client: Result (ok | ConcurrentWrite | ModelError)
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
Poem
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
|
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (2)
src/aggregate/async_aggregate.rs (1)
63-74: 💤 Low valueConsider documenting the ordering/correspondence semantics of
get_all.The method collects identities from
ids, callsget_streams, and hydrates whatever entities are returned. If some IDs don't exist, the result may have fewer items thanids, and the order may not correspond to the input order. This could be surprising to callers who expect a 1:1 positional match.If this is the intended behavior (skip missing aggregates), consider adding a doc comment clarifying it. Alternatively, if callers need position correspondence, returning
Vec<Option<A>>would be more explicit.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/aggregate/async_aggregate.rs` around lines 63 - 74, The get_all method currently maps ids -> stream_identity_for::<A>(id), calls self.repo.get_streams(&identities).await and hydrates returned entities, but the result may omit missing IDs and lose positional correspondence with the input ids; update the behavior and document it: either add a doc comment on async_aggregate::get_all (and mention stream_identity_for, get_streams, hydrate) to state that missing aggregates are skipped and order is not guaranteed, or change the signature to return Vec<Option<A>> and implement logic to preserve input order by mapping each id to its corresponding hydrate result (None for missing), using stream_identity_for::<A> to build lookups and get_streams to fetch entities. Ensure tests and call sites are updated accordingly.sourced_rust_macros/src/lib.rs (1)
698-708: ⚡ Quick winValidate
aggregate_typeliterals during macro parsing.These literals are known at compile time; rejecting empty/whitespace/reserved-delimiter values here avoids deferred runtime failures (
InvalidStreamIdentity).🧩 Proposed patch
use syn::{ @@ }; +fn validate_aggregate_type_literal(lit: &LitStr) -> syn::Result<()> { + let value = lit.value(); + if value.trim().is_empty() { + return Err(syn::Error::new_spanned( + lit, + "`aggregate_type` must not be empty or whitespace", + )); + } + if value.contains('\u{1f}') { + return Err(syn::Error::new_spanned( + lit, + "`aggregate_type` must not contain the reserved stream delimiter (U+001F)", + )); + } + Ok(()) +} + impl Parse for AggregateInput { @@ - input.parse::<Token![=]>()?; - aggregate_type = Some(input.parse::<LitStr>()?); + input.parse::<Token![=]>()?; + let lit = input.parse::<LitStr>()?; + validate_aggregate_type_literal(&lit)?; + aggregate_type = Some(lit); } @@ if kw == "events" { @@ } else if kw == "aggregate_type" { input.parse::<Ident>()?; input.parse::<Token![=]>()?; - aggregate_type = Some(input.parse::<LitStr>()?); + let lit = input.parse::<LitStr>()?; + validate_aggregate_type_literal(&lit)?; + aggregate_type = Some(lit); } else if kw == "enqueue" {Also applies to: 820-836
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@sourced_rust_macros/src/lib.rs` around lines 698 - 708, When parsing the aggregate_type literal in the macro (the block that sets aggregate_type using input.parse::<LitStr>()), validate the parsed string value immediately: if it's empty, only whitespace, or contains reserved delimiter characters used for stream identity, return a syn::Error::new with the LitStr span and a clear message (e.g. "invalid aggregate_type: empty/whitespace/reserved characters") instead of accepting it; apply the same validation to the other equivalent parsing block around the aggregate_type handling at the later location (the similar input.parse::<LitStr>() usage between lines ~820-836) so malformed literals are rejected at macro-expansion time.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/async-repositories.md`:
- Around line 3-4: Replace the placeholder name "Distributed" in the sentence
shown (the word Distributed in the docs/async-repositories.md content) with the
actual crate/project name used in this repo so the documentation reflects the
current project; update the sentence "Distributed keeps the synchronous
in-memory repository API intact..." to use the real project/crate identifier
consistently.
In `@src/repository/identity.rs`:
- Around line 25-39: The validation for stream identity components
(aggregate_type and aggregate_id) currently only checks for emptiness; update
the checks in the function that returns RepositoryError::InvalidStreamIdentity
so they also reject the storage-key delimiter character '\u{1f}' (and any other
reserved delimiters if used) by treating strings containing that codepoint as
invalid and returning RepositoryError::InvalidStreamIdentity with an appropriate
reason; apply the same guard to the other validation location referenced (lines
57-59) so both aggregate_type and aggregate_id cannot contain '\u{1f}' before
composing storage keys.
---
Nitpick comments:
In `@sourced_rust_macros/src/lib.rs`:
- Around line 698-708: When parsing the aggregate_type literal in the macro (the
block that sets aggregate_type using input.parse::<LitStr>()), validate the
parsed string value immediately: if it's empty, only whitespace, or contains
reserved delimiter characters used for stream identity, return a syn::Error::new
with the LitStr span and a clear message (e.g. "invalid aggregate_type:
empty/whitespace/reserved characters") instead of accepting it; apply the same
validation to the other equivalent parsing block around the aggregate_type
handling at the later location (the similar input.parse::<LitStr>() usage
between lines ~820-836) so malformed literals are rejected at macro-expansion
time.
In `@src/aggregate/async_aggregate.rs`:
- Around line 63-74: The get_all method currently maps ids ->
stream_identity_for::<A>(id), calls self.repo.get_streams(&identities).await and
hydrates returned entities, but the result may omit missing IDs and lose
positional correspondence with the input ids; update the behavior and document
it: either add a doc comment on async_aggregate::get_all (and mention
stream_identity_for, get_streams, hydrate) to state that missing aggregates are
skipped and order is not guaranteed, or change the signature to return
Vec<Option<A>> and implement logic to preserve input order by mapping each id to
its corresponding hydrate result (None for missing), using
stream_identity_for::<A> to build lookups and get_streams to fetch entities.
Ensure tests and call sites are updated accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 6489368a-8e2f-4433-9743-b19e88b78a62
📒 Files selected for processing (17)
docs/async-repositories.mddocs/research-and-roadmap.mdsourced_rust_macros/src/lib.rssrc/aggregate/aggregate.rssrc/aggregate/async_aggregate.rssrc/aggregate/mod.rssrc/hashmap_repo/repository.rssrc/lib.rssrc/outbox_worker/repository_ext.rssrc/read_model/in_memory.rssrc/read_model/session.rssrc/repository/async_repository.rssrc/repository/error.rssrc/repository/identity.rssrc/repository/mod.rssrc/snapshot/in_memory.rstests/async_repository/main.rs
Implements [[tasks/extract-shared-sqlx-repository-helpers]]
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/postgres_repo/mod.rs`:
- Around line 400-404: The code swallows failures from stream_version_pool by
using unwrap_or_default(), turning DB lookup errors into actual=0 and
misreporting outages as concurrency conflicts; change the call to
stream_version_pool(pool, identity).await to handle its Result explicitly: if it
returns Ok(actual) proceed to return Err(RepositoryError::ConcurrentWrite {
actual, /* expected... */ }), but if it returns Err(e) propagate or map that
error (e.g., return Err(mapped RepositoryError::Storage or forward the original
error) instead of defaulting); update the branch that currently uses
is_postgres_unique_violation, stream_version_pool, and
RepositoryError::ConcurrentWrite to remove unwrap_or_default and return the
stream lookup failure when it occurs.
In `@src/sqlite_repo/mod.rs`:
- Around line 315-325: The update is vulnerable to a TOCTOU race because you
check expected_version before calling update_document_in_tx, which performs an
UPDATE only by (collection, id); change update_document_in_tx to accept the
expected_version (or make a new helper) and include "AND version = ?" in the SQL
UPDATE predicate, then have it return the number of rows affected; in the caller
(where next_document_version, update_document_in_tx, commit_read_model_tx are
used) check rows_affected == 0 and map that to
ReadModelError::ConcurrencyConflict (using M::COLLECTION, model.id(), expected
and actual as before) instead of relying on the separate pre-check so the
read-model update becomes atomic.
- Around line 947-958: The function system_time_from_storage currently
constructs a Duration with parsed nanos which can panic if nanos >=
1_000_000_000; add a guard after parsing that checks nanos < 1_000_000_000 (and
return UNIX_EPOCH or otherwise handle the invalid value) before calling
Duration::new(secs, nanos) so invalid stored values don't cause a panic.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: a35b7fb4-d32e-4866-bc5c-6f68e2b00476
📒 Files selected for processing (11)
Cargo.tomlcompose.yamldocs/async-repositories.mdmigrations/postgres/0001_initial.sqlmigrations/sqlite/0001_initial.sqlsrc/lib.rssrc/postgres_repo/mod.rssrc/sqlite_repo/mod.rssrc/sqlx_repo/mod.rstests/postgres_repository/main.rstests/sqlite_repository/main.rs
✅ Files skipped from review due to trivial changes (3)
- migrations/sqlite/0001_initial.sql
- migrations/postgres/0001_initial.sql
- docs/async-repositories.md
Implements GitKB task tasks/implement-persistent-repository-conformance-harness.
Keeps the persistent repository conformance fixture closer to public aggregate usage.
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tests/persistent_repository_conformance/scenario.rs (1)
104-106:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winMake
get_allassertion order-insensitive to avoid backend-specific flakiness.These assertions assume deterministic ordering from
get_all, which is often not guaranteed across SQL backends.Suggested fix
- assert_eq!(loaded[0].entity.id(), first_id); - assert_eq!(loaded[1].entity.id(), second_id); + let mut actual_ids = loaded + .iter() + .map(|seat| seat.entity.id().to_string()) + .collect::<Vec<_>>(); + actual_ids.sort(); + let mut expected_ids = vec![first_id, second_id]; + expected_ids.sort(); + assert_eq!(actual_ids, expected_ids);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/persistent_repository_conformance/scenario.rs` around lines 104 - 106, Test assumes deterministic ordering from get_all; make the assertion order-insensitive by comparing the returned entities by identity rather than by index. In the scenario test where loaded, first_id and second_id are used, replace the index-based asserts with an unordered comparison: map loaded to a collection of ids (from loaded[*].entity.id()), convert to a HashSet (or sort both vectors) and assert that this set equals the set containing first_id and second_id; update the assertions that reference loaded[0]/loaded[1] accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/persistent_repository_conformance/checkout_saga.rs`:
- Around line 26-39: Tighten the event guard on record_seat_reserved so the saga
won’t accept a seat payload that conflicts with an already-associated seat: in
the #[event(... when = ...)] expression add checks that either the saga has no
reserved seat/category yet or the incoming seat_id and seat_category match the
saga’s existing values (e.g. require (self.reserved_seat_id.is_empty() ||
self.reserved_seat_id == seat_id) && (self.seat_category.is_empty() ||
self.seat_category == seat_category) in addition to the existing checkout/status
checks for the record_seat_reserved handler).
In `@tests/persistent_repository_conformance/seat.rs`:
- Around line 23-30: The reserve transition's guard must also verify the
provided seat identifiers match the aggregate so events cannot drift; update the
#[event(... when = ...)] guard on the reserve function to require matching
identifiers (e.g., add self.seat_id == seat_id && self.seat_category ==
seat_category) in addition to the existing status and checkout_id checks,
ensuring the event only fires when the incoming seat_id/seat_category align with
the aggregate's self.seat_id/self.seat_category.
---
Outside diff comments:
In `@tests/persistent_repository_conformance/scenario.rs`:
- Around line 104-106: Test assumes deterministic ordering from get_all; make
the assertion order-insensitive by comparing the returned entities by identity
rather than by index. In the scenario test where loaded, first_id and second_id
are used, replace the index-based asserts with an unordered comparison: map
loaded to a collection of ids (from loaded[*].entity.id()), convert to a HashSet
(or sort both vectors) and assert that this set equals the set containing
first_id and second_id; update the assertions that reference loaded[0]/loaded[1]
accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 7b3da5e8-7919-4e85-a50d-5f8508052504
📒 Files selected for processing (4)
tests/persistent_repository_conformance/checkout.rstests/persistent_repository_conformance/checkout_saga.rstests/persistent_repository_conformance/scenario.rstests/persistent_repository_conformance/seat.rs
💤 Files with no reviewable changes (1)
- tests/persistent_repository_conformance/checkout.rs
Implements [[specs/outbox-delivery-table]]. - Persist outbox messages as batch rows instead of aggregate streams. - Add SQLite/Postgres outbox tables and worker claim/update paths. - Use owned message commit APIs and exact caller-provided message IDs. - Add shared conformance coverage for atomic commit and worker lifecycle.
|
Outbox delivery-table update pushed in e9bcd6a. Summary:
Verification:
|
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/commit_builder/mod.rs (1)
232-243:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftPropagate outbox source in staged commits for parity with non-staged commits.
CommitBuilder::commitsets outbox source (Line 108), butStagedCommitBuilder::commitforwards messages unchanged. In staged flows (aggregate(...).outbox(...).commit()), outbox rows can lose aggregate-source linkage.Suggested direction
pub fn commit(mut self) -> Result<(), RepositoryError> where R: TransactionalCommit, { self.check_staged()?; + + // Ensure staged flow applies the same source-link behavior as CommitBuilder::commit. + // Example approach: + // - when exactly one staged aggregate exists, set outbox source from it + // - or require caller-provided source and validate before commit + // (implementation depends on available OutboxMessage source setter APIs) let mut entity_refs: Vec<&mut Entity> = self.entities.iter_mut().collect(); entity_refs.extend(self.staged_entities); self.repo.commit_batch(CommitBatch { entities: entity_refs, outbox_messages: self.outbox_messages, read_model_plans: self.read_model_plans, snapshots: Vec::new(), }) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/commit_builder/mod.rs` around lines 232 - 243, StagedCommitBuilder::commit is forwarding outbox_messages unchanged which loses the aggregate/source linkage that CommitBuilder::commit sets; before calling self.repo.commit_batch(CommitBatch { ... }) map/transform self.outbox_messages the same way as in CommitBuilder::commit to populate the outbox message source (use the same field/logic used there, e.g. set message.source = self.source or derive from the staged entity/aggregate id), then pass the transformed outbox_messages into CommitBatch so staged commits preserve the outbox source.tests/bomberman/commands.rs (1)
221-230:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftKill-event attribution can be wrong when multiple detonations happen in one tick.
Lines 222–230 use the most recent detonation for every killed player, so
killed_by_bomb/bomb_ownercan be incorrect for kills caused by earlier detonations in the same tick.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/bomberman/commands.rs` around lines 221 - 230, The current code always uses saga.detonations.last() (variable det) for every killed player which can misattribute kills when multiple detonations occur in the same tick; instead locate the specific detonation that actually caused this killed_id by searching saga.detonations (e.g., saga.detonations.iter().find(...)) for a detonation that lists or affects that player (match on a field like affected_players, killed_players, bomb_id, or timestamp relative to the kill event) and use that det to populate "killed_by_bomb" and "bomb_owner" in the OutboxMessage::create payload, falling back to "unknown" only if no matching detonation is found.
🧹 Nitpick comments (2)
tests/todos/main.rs (1)
73-74: ⚡ Quick winDon’t discard commit results in test setup paths.
At Line 73 and Line 94, ignoring commit results can hide the actual failure point and make test failures noisier downstream.
Tighten test failure signals
- let _ = repo.outbox(init_message).commit(&mut todo); + repo.outbox(init_message).commit(&mut todo).unwrap(); @@ - let _ = repo.outbox(complete_message).commit(&mut retrieved_todo); + repo.outbox(complete_message) + .commit(&mut retrieved_todo) + .unwrap();Also applies to: 94-95
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/todos/main.rs` around lines 73 - 74, Replace the ignored commit results for test setup (the calls to repo.outbox(init_message).commit(&mut todo) and the similar call around line 94) with explicit error handling so failures surface immediately; for example, call .expect("commit failed in test setup") or propagate the Result instead of using let _ = ..., ensuring the commit() return value is checked and the test aborts with a clear message if the commit fails.migrations/postgres/0001_initial.sql (1)
39-68: ⚡ Quick winAdd status-dependent CHECK constraints to prevent impossible outbox states.
Current constraints validate individual columns, but they don’t guard cross-field state consistency (e.g.,
status='pending'withclaimed_byset). Enforcing these invariants at DB level will protect worker lifecycle correctness.Schema hardening example
CREATE TABLE IF NOT EXISTS outbox_messages ( @@ CHECK (status IN ('pending', 'in_flight', 'published', 'failed')), - CHECK (attempts >= 0) + CHECK (attempts >= 0), + CHECK ( + (status <> 'in_flight') + OR (claimed_by IS NOT NULL AND claimed_until IS NOT NULL) + ), + CHECK ( + (status = 'in_flight') + OR (claimed_by IS NULL AND claimed_until IS NULL) + ), + CHECK ( + (status = 'published') = (published_at IS NOT NULL) + ), + CHECK ( + (status = 'failed') = (failed_at IS NOT NULL) + ) );🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@migrations/postgres/0001_initial.sql` around lines 39 - 68, Add table-level CHECK constraints to outbox_messages to enforce cross-column invariants for status-driven states: add a CHECK using a CASE/WHEN expression that for status='pending' requires claimed_by IS NULL AND claimed_until IS NULL AND published_at IS NULL AND failed_at IS NULL AND attempts = 0; for status='in_flight' requires claimed_by IS NOT NULL AND claimed_until IS NOT NULL AND published_at IS NULL AND failed_at IS NULL; for status='published' requires published_at IS NOT NULL AND failed_at IS NULL; and for status='failed' requires failed_at IS NOT NULL. Modify the CREATE TABLE outbox_messages definition to include these CHECK(...) constraints referencing status, claimed_by, claimed_until, published_at, failed_at, and attempts so impossible combinations are rejected by the DB.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
@.agents/skills/supabase-postgres-best-practices/references/security-rls-basics.md:
- Around line 8-10: Update the wording to use consistent hyphenation: change the
heading "Enable Row Level Security for Multi-Tenant Data" to "Enable Row-Level
Security for Multi-Tenant Data" and replace occurrences of the phrase "Row Level
Security" in the body with "Row-Level Security" (search for the exact string
"Row Level Security" and update each instance) so the document consistently uses
the hyphenated form.
In @.agents/skills/supabase-postgres-best-practices/SKILL.md:
- Around line 44-48: Add a language tag to the fenced code block in
.agents/skills/supabase-postgres-best-practices/SKILL.md (the triple-backtick
block listing the three reference files) to satisfy markdownlint MD040; update
the opening fence from ``` to ```text so the block is explicitly marked as plain
text.
In `@src/outbox_worker/repository_ext.rs`:
- Around line 263-269: The async outbox ordering currently sorts only by
created_at (messages.sort_by_key(|message| message.created_at)), which yields
unstable order when timestamps tie and diverges from the sync path; change the
sort to deterministically tie-break on id as well (e.g., sort by (created_at,
id) or use sort_by with created_at then id) so the messages Vec produced from
storage.values() has the same ordering as the sync implementation; update the
messages.sort_by_key call to a composite sort that includes message.id as the
secondary key.
In `@src/outbox/message.rs`:
- Around line 336-345: The re-initialization block that sets self.id,
self.event_type, self.payload, etc. must also clear delivery and source state to
avoid carrying stale state; update the same initializer (the block that assigns
self.id/event_type/payload) to reset self.attempts = 0, self.last_error = None,
self.worker_id = None, self.leased_until = None and clear any source-related
fields (e.g., self.source_id/self.source_type or whichever source_* fields
exist) so a reused OutboxMessage instance starts with fresh delivery/source
state.
In `@src/postgres_repo/mod.rs`:
- Around line 864-917: The outbox_message_from_row function selects
correlation_id and causation_id but never maps them into the returned
OutboxMessage; modify the constructor near the end (the Ok(OutboxMessage { ...
}) block) to extract both fields from the row using
row.try_get("correlation_id") and row.try_get("causation_id"), mapping errors
with repository_storage_error (e.g., "decode outbox correlation_id row" and
"decode outbox causation_id row"), and include the resulting values as the
correlation_id and causation_id fields on the OutboxMessage so the metadata is
preserved.
In `@src/sqlite_repo/mod.rs`:
- Around line 925-987: outbox_message_from_row is selecting correlation_id and
causation_id (via outbox_message_select_sql) but never assigns them to the
OutboxMessage; update outbox_message_from_row to read the columns (e.g.
row.try_get::<Option<String>, _>("correlation_id") and
row.try_get::<Option<String>, _>("causation_id")), map as needed
(as_deref()/map(...) or transpose()) and assign into message.correlation_id and
message.causation_id (or into the metadata map if OutboxMessage stores them
there) just before returning (after computing source_sequence and before
Ok(message)); ensure you use repository_storage_error for any try_get errors and
preserve the Option semantics and types that OutboxMessage expects.
---
Outside diff comments:
In `@src/commit_builder/mod.rs`:
- Around line 232-243: StagedCommitBuilder::commit is forwarding outbox_messages
unchanged which loses the aggregate/source linkage that CommitBuilder::commit
sets; before calling self.repo.commit_batch(CommitBatch { ... }) map/transform
self.outbox_messages the same way as in CommitBuilder::commit to populate the
outbox message source (use the same field/logic used there, e.g. set
message.source = self.source or derive from the staged entity/aggregate id),
then pass the transformed outbox_messages into CommitBatch so staged commits
preserve the outbox source.
In `@tests/bomberman/commands.rs`:
- Around line 221-230: The current code always uses saga.detonations.last()
(variable det) for every killed player which can misattribute kills when
multiple detonations occur in the same tick; instead locate the specific
detonation that actually caused this killed_id by searching saga.detonations
(e.g., saga.detonations.iter().find(...)) for a detonation that lists or affects
that player (match on a field like affected_players, killed_players, bomb_id, or
timestamp relative to the kill event) and use that det to populate
"killed_by_bomb" and "bomb_owner" in the OutboxMessage::create payload, falling
back to "unknown" only if no matching detonation is found.
---
Nitpick comments:
In `@migrations/postgres/0001_initial.sql`:
- Around line 39-68: Add table-level CHECK constraints to outbox_messages to
enforce cross-column invariants for status-driven states: add a CHECK using a
CASE/WHEN expression that for status='pending' requires claimed_by IS NULL AND
claimed_until IS NULL AND published_at IS NULL AND failed_at IS NULL AND
attempts = 0; for status='in_flight' requires claimed_by IS NOT NULL AND
claimed_until IS NOT NULL AND published_at IS NULL AND failed_at IS NULL; for
status='published' requires published_at IS NOT NULL AND failed_at IS NULL; and
for status='failed' requires failed_at IS NOT NULL. Modify the CREATE TABLE
outbox_messages definition to include these CHECK(...) constraints referencing
status, claimed_by, claimed_until, published_at, failed_at, and attempts so
impossible combinations are rejected by the DB.
In `@tests/todos/main.rs`:
- Around line 73-74: Replace the ignored commit results for test setup (the
calls to repo.outbox(init_message).commit(&mut todo) and the similar call around
line 94) with explicit error handling so failures surface immediately; for
example, call .expect("commit failed in test setup") or propagate the Result
instead of using let _ = ..., ensuring the commit() return value is checked and
the test aborts with a clear message if the commit fails.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: d65501a4-50e8-4990-87ec-373603962123
📒 Files selected for processing (89)
.agents/skills/supabase-postgres-best-practices/SKILL.md.agents/skills/supabase-postgres-best-practices/references/_contributing.md.agents/skills/supabase-postgres-best-practices/references/_sections.md.agents/skills/supabase-postgres-best-practices/references/_template.md.agents/skills/supabase-postgres-best-practices/references/advanced-full-text-search.md.agents/skills/supabase-postgres-best-practices/references/advanced-jsonb-indexing.md.agents/skills/supabase-postgres-best-practices/references/conn-idle-timeout.md.agents/skills/supabase-postgres-best-practices/references/conn-limits.md.agents/skills/supabase-postgres-best-practices/references/conn-pooling.md.agents/skills/supabase-postgres-best-practices/references/conn-prepared-statements.md.agents/skills/supabase-postgres-best-practices/references/data-batch-inserts.md.agents/skills/supabase-postgres-best-practices/references/data-n-plus-one.md.agents/skills/supabase-postgres-best-practices/references/data-pagination.md.agents/skills/supabase-postgres-best-practices/references/data-upsert.md.agents/skills/supabase-postgres-best-practices/references/lock-advisory.md.agents/skills/supabase-postgres-best-practices/references/lock-deadlock-prevention.md.agents/skills/supabase-postgres-best-practices/references/lock-short-transactions.md.agents/skills/supabase-postgres-best-practices/references/lock-skip-locked.md.agents/skills/supabase-postgres-best-practices/references/monitor-explain-analyze.md.agents/skills/supabase-postgres-best-practices/references/monitor-pg-stat-statements.md.agents/skills/supabase-postgres-best-practices/references/monitor-vacuum-analyze.md.agents/skills/supabase-postgres-best-practices/references/query-composite-indexes.md.agents/skills/supabase-postgres-best-practices/references/query-covering-indexes.md.agents/skills/supabase-postgres-best-practices/references/query-index-types.md.agents/skills/supabase-postgres-best-practices/references/query-missing-indexes.md.agents/skills/supabase-postgres-best-practices/references/query-partial-indexes.md.agents/skills/supabase-postgres-best-practices/references/schema-constraints.md.agents/skills/supabase-postgres-best-practices/references/schema-data-types.md.agents/skills/supabase-postgres-best-practices/references/schema-foreign-key-indexes.md.agents/skills/supabase-postgres-best-practices/references/schema-lowercase-identifiers.md.agents/skills/supabase-postgres-best-practices/references/schema-partitioning.md.agents/skills/supabase-postgres-best-practices/references/schema-primary-keys.md.agents/skills/supabase-postgres-best-practices/references/security-privileges.md.agents/skills/supabase-postgres-best-practices/references/security-rls-basics.md.agents/skills/supabase-postgres-best-practices/references/security-rls-performance.md.claude/skills/supabase-postgres-best-practicesREADME.mdmigrations/postgres/0001_initial.sqlmigrations/sqlite/0001_initial.sqlskills-lock.jsonsrc/bus/mod.rssrc/commit_builder/mod.rssrc/hashmap_repo/repository.rssrc/lib.rssrc/outbox/commit.rssrc/outbox/message.rssrc/outbox/mod.rssrc/outbox_worker/mod.rssrc/outbox_worker/repository_ext.rssrc/outbox_worker/thread.rssrc/outbox_worker/worker.rssrc/postgres_repo/mod.rssrc/repository/async_repository.rssrc/repository/batch.rssrc/repository/error.rssrc/snapshot/repository.rssrc/sqlite_repo/mod.rssrc/sqlx_repo/mod.rstests/async_repository/main.rstests/bomberman/commands.rstests/distributed_read_model/checkout_saga_service/handlers/record_seat_reserved.rstests/distributed_read_model/checkout_saga_service/handlers/start.rstests/distributed_read_model/seat_inventory_service/handlers/add.rstests/distributed_read_model/seat_inventory_service/handlers/reserve_started_checkout_seat.rstests/distributed_read_model_board/board_service/handlers/board_add_card.rstests/distributed_read_model_board/board_service/handlers/board_move_card.rstests/distributed_read_model_board/board_service/handlers/board_open.rstests/distributed_read_model_board/board_service/handlers/board_remove_card.rstests/distributed_read_model_board/projections_service/handlers/board.rstests/hashmap_repository_conformance/main.rstests/microsvc/handlers/counter_create.rstests/microsvc/handlers/counter_increment.rstests/persistent_repository_conformance/outbox.rstests/persistent_repository_conformance/scenario.rstests/postgres_repository/main.rstests/postgres_repository_conformance/main.rstests/sagas/distributed.rstests/sagas/handlers/inventory/reserve.rstests/sagas/handlers/orders/complete.rstests/sagas/handlers/orders/create.rstests/sagas/handlers/payments/process.rstests/sagas/handlers/saga/on_inventory_reserved.rstests/sagas/handlers/saga/on_order_created.rstests/sagas/handlers/saga/on_payment_succeeded.rstests/sagas/handlers/saga/start.rstests/sourced_snapshot/main.rstests/sqlite_repository/main.rstests/sqlite_repository_conformance/main.rstests/todos/main.rs
✅ Files skipped from review due to trivial changes (34)
- .agents/skills/supabase-postgres-best-practices/references/query-missing-indexes.md
- .agents/skills/supabase-postgres-best-practices/references/data-batch-inserts.md
- .agents/skills/supabase-postgres-best-practices/references/schema-constraints.md
- .agents/skills/supabase-postgres-best-practices/references/data-pagination.md
- .agents/skills/supabase-postgres-best-practices/references/query-partial-indexes.md
- .agents/skills/supabase-postgres-best-practices/references/lock-short-transactions.md
- .agents/skills/supabase-postgres-best-practices/references/data-n-plus-one.md
- .agents/skills/supabase-postgres-best-practices/references/query-index-types.md
- .agents/skills/supabase-postgres-best-practices/references/conn-limits.md
- .agents/skills/supabase-postgres-best-practices/references/lock-skip-locked.md
- .agents/skills/supabase-postgres-best-practices/references/lock-advisory.md
- .claude/skills/supabase-postgres-best-practices
- .agents/skills/supabase-postgres-best-practices/references/_template.md
- .agents/skills/supabase-postgres-best-practices/references/data-upsert.md
- .agents/skills/supabase-postgres-best-practices/references/security-rls-performance.md
- src/bus/mod.rs
- .agents/skills/supabase-postgres-best-practices/references/conn-prepared-statements.md
- .agents/skills/supabase-postgres-best-practices/references/schema-lowercase-identifiers.md
- .agents/skills/supabase-postgres-best-practices/references/monitor-explain-analyze.md
- .agents/skills/supabase-postgres-best-practices/references/query-composite-indexes.md
- .agents/skills/supabase-postgres-best-practices/references/conn-pooling.md
- .agents/skills/supabase-postgres-best-practices/references/schema-primary-keys.md
- .agents/skills/supabase-postgres-best-practices/references/_sections.md
- .agents/skills/supabase-postgres-best-practices/references/schema-data-types.md
- src/outbox_worker/worker.rs
- .agents/skills/supabase-postgres-best-practices/references/schema-foreign-key-indexes.md
- .agents/skills/supabase-postgres-best-practices/references/conn-idle-timeout.md
- .agents/skills/supabase-postgres-best-practices/references/monitor-pg-stat-statements.md
- .agents/skills/supabase-postgres-best-practices/references/advanced-jsonb-indexing.md
- .agents/skills/supabase-postgres-best-practices/references/query-covering-indexes.md
- skills-lock.json
- .agents/skills/supabase-postgres-best-practices/references/lock-deadlock-prevention.md
- .agents/skills/supabase-postgres-best-practices/references/monitor-vacuum-analyze.md
- .agents/skills/supabase-postgres-best-practices/references/schema-partitioning.md
Implements specs/outbox-worker-store-boundary
|
@coderabbitai addressed the open review comments on this branch. Implemented the still-valid inline, outside-diff, and nitpick items across docs, stream identity validation, macro parsing, SQLite/Postgres persistence, outbox state/source/metadata handling, conformance tests, Todo setup commits, and Bomberman kill attribution. Notes:
Validation passed:
|
|
Triggering a re-review to verify the updated state of the PR. 🐇✨ ✅ Actions performedComments resolved. Approval is disabled; enable Full review triggered. |
Implements [[address-coderabbit-review-comments]]
Summary
Verification
Note: cargo clippy --all-targets --all-features -- -D warnings still reports pre-existing warnings in unrelated integration tests under tests/todos, tests/sagas, and tests/bomberman.
Summary by CodeRabbit
New Features
Documentation
Database
Tests