Skip to content

Add relational read-model ORM write plans#35

Merged
patrickleet merged 26 commits into
mainfrom
feat/orm
May 24, 2026
Merged

Add relational read-model ORM write plans#35
patrickleet merged 26 commits into
mainfrom
feat/orm

Conversation

@patrickleet
Copy link
Copy Markdown
Collaborator

@patrickleet patrickleet commented May 23, 2026

Summary

  • add relational read-model metadata, schema registry/bootstrap contracts, sessions, and write-plan support
  • add direct ReadModel helper attributes for collection/table/column/id/index/unique, including compound index metadata
  • add distributed read-model idempotency/document conformance coverage and reorganize the distributed read-model test into account, projection, and query service modules

Verification

  • cargo fmt --all -- --check
  • cargo test --workspace
  • cargo test -p sourced_rust_macros read_model
  • cargo test --test distributed_read_model
  • cargo test --test distributed_read_model --features http
  • cargo test --test distributed_read_model --features grpc
  • cargo test --test read_model_metadata
  • cargo test --test read_model_schema_bootstrap
  • cargo test --test read_model_session
  • cargo clippy -p sourced_rust --lib -- -D warnings
  • cargo clippy -p sourced_rust_macros --lib -- -D warnings
  • cargo clippy --test distributed_read_model -- -D warnings
  • cargo clippy --test read_model_metadata -- -D warnings
  • cargo clippy --test read_model_schema_bootstrap -- -D warnings
  • cargo clippy --test read_model_session -- -D warnings

Summary by CodeRabbit

  • New Features

    • Session-based read-model commits with processed-message idempotency, staged commit APIs, and session+aggregate commit extension.
    • First-class normalized relational read-models: schema registry/validation, relational includes, typed row mutations, and convenience primary-key lookup API.
    • Expanded derive/attribute support for read-model mapping.
  • Documentation

    • README/docs updated to clarify document vs relational models, atomic commits, sessions, and consistency guidance.
  • Tests

    • Extensive new tests and integration examples for sessions, schemas, projections, idempotency, and relationship includes.

Review Change Stack

Implements [[specs/read-model-orm-unification]].

Covers [[tasks/read-model-orm-01-inventory]], [[tasks/read-model-orm-02-metadata]], [[tasks/read-model-orm-03-session-write-plan]], [[tasks/read-model-orm-04-commit-builder-bridge]], [[tasks/read-model-orm-05-compat-conformance]], [[tasks/read-model-orm-06-distributed-idempotency]], [[tasks/read-model-orm-07-schema-bootstrap]], and [[tasks/read-model-orm-08-test-migration-docs]].
Adds direct ReadModel helper attributes for collection, table, column, id, field indexes, unique indexes, and struct-level compound indexes. Updates distributed, metadata, session, schema, and docs coverage.

Implements [[tasks/read-model-orm-09-compound-indexes]].
Moves the distributed read-model integration test into account_service, projections_service, and query_service modules while preserving existing behavior.
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 23, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 4c1bcfe6-ce6d-474e-a3f5-862dcd6a210a

📥 Commits

Reviewing files that changed from the base of the PR and between 5dc2b77 and 8b1ce83.

📒 Files selected for processing (12)
  • tests/distributed_read_model/catalog_service/handlers/product_add.rs
  • tests/distributed_read_model/catalog_service/models/product.rs
  • tests/distributed_read_model/inventory_service/handlers/release.rs
  • tests/distributed_read_model/inventory_service/handlers/reserve.rs
  • tests/distributed_read_model/inventory_service/models/inventory.rs
  • tests/distributed_read_model/order_service/handlers/order_add_line.rs
  • tests/distributed_read_model/order_service/handlers/order_change_quantity.rs
  • tests/distributed_read_model/order_service/handlers/order_remove_line.rs
  • tests/distributed_read_model/order_service/models/order.rs
  • tests/distributed_read_model/projections_service/handlers/order.rs
  • tests/distributed_read_model_board/projections_service/handlers/board.rs
  • tests/read_model_distributed_idempotency/main.rs

📝 Walkthrough

Walkthrough

Adds ReadModelSession write plans, relational schema/registry, derive macro attribute expansion, plan-based commit pipeline, store/repo session support, commit-builder staged commits, updated locking APIs, and extensive tests and docs.

Changes

Read-model sessions and relational schema

Layer / File(s) Summary
Docs and examples
README.md, docs/read-models.md
Rewrites read-model docs; distinguishes document vs relational models, adds session/commit examples, projector guidance, schema registry, locking helpers, and non-goals.
ReadModel derive macro
sourced_rust_macros/src/lib.rs, sourced_rust_macros/src/read_model.rs
Derive expanded to accept collection, table, column, id, index, unique attributes; generates relational schema/row/key conversion and relationship metadata.
Relational metadata & registry
src/read_model/metadata.rs, src/read_model/schema.rs
Introduces Column/Index/ForeignKey/PrimaryKey/ReadModelSchema, validation rules, schema registry, adapter artifact/verification/bootstrap types and trait.
Session planning & UoW
src/read_model/session.rs
Adds ReadModelSession, ReadModelWritePlan, mutation types (Document/Row/Patch/Delete), deterministic ordering, processed-message marks, and unit-of-work include/diff/save_changes flow.
In-memory store & plan applier
src/read_model/in_memory.rs
Adds StoredRow, ProcessedMessageSet, apply_document_write_plan/apply_read_model_write_plan, relational_rows, processed_messages, and RelationalReadModelQueryStore support.
Queued & HashMap stores
src/read_model/queued.rs, src/hashmap_repo/repository.rs
Queued store forwards get_by_primary_key, locks derived from write-plan lock keys; HashMapRepository applies write plans and publishes processed-message marks; both implement ReadModelSessionStore.
Commit builder & batch
src/commit_builder/mod.rs, src/repository/batch.rs
CommitBuilder stages ReadModelWritePlan list, adds StagedCommitBuilder and ReadModelSessionCommitExt, replaces ReadModelWrite with ReadModelWritePlan in CommitBatch and repository batch paths.
Public API re-exports
src/lib.rs, src/read_model/mod.rs, src/read_model/store.rs, src/read_model/repository.rs
Adds many read-model/session/schema types to crate exports; ReadModelStore gains get_by_primary_key; ReadModelRepository gains get_by_primary_key.
Tests & examples
tests/*
Extensive test additions/updates: session/unit-of-work, schema bootstrap, metadata, document conformance, distributed projection/services (catalog/order/inventory/payment/sagas/board), idempotency, and integration scenarios.

Sequence Diagram

sequenceDiagram
  participant Producer
  participant ReadModelSession
  participant CommitBuilder
  participant Store
  participant Repository
  Producer->>ReadModelSession: stage documents/rows + mark processed
  ReadModelSession->>CommitBuilder: produce ReadModelWritePlan
  CommitBuilder->>Store: commit(write_plan)
  Store-->>CommitBuilder: ReadModelCommitOutcome (applied/skipped)
  CommitBuilder->>Repository: optionally chain aggregate commit
Loading

Estimated code review effort
🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

I hop through schemas, keys in a row,
Plans in my paws where projections grow.
Locks gently lifted, duplicates slipped,
Sessions committed, idempotent-tripped.
Docs and macros stitched — the tests all glow. 🥕🐇

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/orm

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (2)
src/read_model/session.rs (1)

163-167: 💤 Low value

Consider using length-prefixed format for consistency with relational lock keys.

The key() method uses a simple collection:id format, which could collide if either contains : (e.g., collection "a:b" with id "c" vs collection "a" with id "b:c"). Relational mutations use key_fingerprint() with length-prefixing to avoid this.

Since collection names are typically static constants, this is unlikely to cause issues in practice, but a consistent approach would prevent subtle bugs if IDs ever contain colons.

♻️ Optional: Use fingerprint-style format
 impl DocumentMutation {
     pub fn key(&self) -> String {
-        format!("{}:{}", self.collection, self.id)
+        format!("{}:{}|{}:{}", 
+            self.collection.len(), self.collection,
+            self.id.len(), self.id)
     }
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/read_model/session.rs` around lines 163 - 167, The current
DocumentMutation::key() builds "collection:id" which can collide when either
contains ':', so change key() to use a length-prefixed/fingerprint style like
the existing key_fingerprint() used for relational mutations: build the key by
prefixing the collection name with its byte/char length (e.g.
"{len}:{collection}:{id}") or delegate to the shared key_fingerprint helper so
the format matches other lock keys and avoids collisions when collection or id
contain ':'.
src/read_model/queued.rs (1)

268-287: 💤 Low value

Lock release iterates non-deduped keys while acquisition dedupes.

lock_ids_in_order (line 277) sorts and deduplicates keys before locking. However, the release loop (lines 281-283) iterates the original read_model_keys vector which may contain duplicates. This causes redundant release() calls for the same key.

While release() is idempotent and this doesn't cause correctness issues, it's inconsistent with acquisition and slightly wasteful.

♻️ Suggested improvement for consistency
     fn commit_write_plan(
         &self,
         plan: ReadModelWritePlan,
     ) -> Result<ReadModelCommitOutcome, ReadModelError> {
-        let read_model_keys: Vec<String> = plan
+        let mut read_model_keys: Vec<String> = plan
             .mutations
             .iter()
             .map(|mutation| mutation.lock_key())
             .collect();
+        read_model_keys.sort_unstable();
+        read_model_keys.dedup();
         let _locks = self.lock_ids_in_order(&read_model_keys)?;

         let result = self.inner.commit_write_plan(plan);
         if result.is_ok() {
             for key in read_model_keys {
                 self.release(&key);
             }
         }

         result
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/read_model/queued.rs` around lines 268 - 287, commit_write_plan collects
read_model_keys, but lock_ids_in_order returns a sorted/deduped list which is
stored in _locks; change the release loop to iterate the deduped lock list
returned by lock_ids_in_order (the value currently assigned to _locks) instead
of the original read_model_keys to avoid redundant release() calls—i.e., capture
the result of lock_ids_in_order (rename _locks to something like deduped_keys)
and call self.release(&key) for each key in that deduped list.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@sourced_rust_macros/src/read_model.rs`:
- Around line 174-176: The generated to_row() uses
row.insert_serde(`#column_name`, &self.#ident) for all fields, which serializes
Vec<u8> as JSON arrays despite column_type_tokens emitting ColumnType::Bytes;
update the code that builds row_inserts so when column_type_tokens indicates
ColumnType::Bytes (i.e., the Rust field type is Vec<u8>) you generate
row.insert_bytes(`#column_name`, &self.#ident) (or the library method that
produces RowValue::Bytes) instead of insert_serde; adjust the same logic in the
other generation block that produces the same inserts (the one referenced around
the other occurrence) so binary columns are bound as RowValue::Bytes
consistently.
- Around line 470-476: Duplicate relationship attributes (foreign_key/through)
are currently overwritten; update the parsing in the meta handling (the branch
where meta.path.is_ident("foreign_key") and the similar "through" branch) to
detect duplicates and return a parse error instead of silently replacing values:
when attrs.relationship.is_some() use relationship_mut(&mut attrs,
"foreign_key")? to get the relationship and if its .foreign_key is already
Some(...) produce and return a syn::Error (e.g.,
syn::Error::new_spanned(meta.path, "duplicate foreign_key attribute"));
likewise, if attrs.relationship.is_none() and pending_foreign_key is already
Some(...) return an error for duplicate pending foreign_key; apply the same
duplicate-detection logic for the "through" handling (checking
relationship.through and pending_through) so repeated attributes fail fast.

In `@src/read_model/in_memory.rs`:
- Around line 39-74: apply_document_write_plan currently validates the plan
against document_capabilities but then silently ignores any non-Document
variants in the loop over plan.mutations; change the function to explicitly
detect unsupported mutation kinds before applying changes (after
plan.validate_for returns) by iterating plan.mutations and if any variant is not
ReadModelMutation::Document return a clear ReadModelError (or map to an existing
UnsupportedMutation error) referencing the offending mutation or capability
mismatch, otherwise proceed to apply Document mutations as now; ensure the error
path references apply_document_write_plan, ReadModelMutation, and
document_capabilities so callers get an explicit failure instead of silent drop.

In `@src/read_model/metadata.rs`:
- Around line 242-257: In validate() ensure duplicate explicit index names are
rejected early: iterate self.indexes and track seen index.name values (skip
None/unnamed), and if an index.name is already in the set return
Err(ReadModelError::Metadata(...)) with a clear message like "read model `{}`
declares duplicate index name `{}`"; update the validation loop that currently
checks index.columns to also check names before proceeding to column membership
checks.

---

Nitpick comments:
In `@src/read_model/queued.rs`:
- Around line 268-287: commit_write_plan collects read_model_keys, but
lock_ids_in_order returns a sorted/deduped list which is stored in _locks;
change the release loop to iterate the deduped lock list returned by
lock_ids_in_order (the value currently assigned to _locks) instead of the
original read_model_keys to avoid redundant release() calls—i.e., capture the
result of lock_ids_in_order (rename _locks to something like deduped_keys) and
call self.release(&key) for each key in that deduped list.

In `@src/read_model/session.rs`:
- Around line 163-167: The current DocumentMutation::key() builds
"collection:id" which can collide when either contains ':', so change key() to
use a length-prefixed/fingerprint style like the existing key_fingerprint() used
for relational mutations: build the key by prefixing the collection name with
its byte/char length (e.g. "{len}:{collection}:{id}") or delegate to the shared
key_fingerprint helper so the format matches other lock keys and avoids
collisions when collection or id contain ':'.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 4faa0400-f5f8-4cc1-9194-2c8638ec6c93

📥 Commits

Reviewing files that changed from the base of the PR and between dc885dc and 5f23048.

📒 Files selected for processing (43)
  • README.md
  • docs/read-models.md
  • sourced_rust_macros/src/lib.rs
  • sourced_rust_macros/src/read_model.rs
  • src/commit_builder/mod.rs
  • src/hashmap_repo/repository.rs
  • src/lib.rs
  • src/read_model/in_memory.rs
  • src/read_model/metadata.rs
  • src/read_model/mod.rs
  • src/read_model/queued.rs
  • src/read_model/repository.rs
  • src/read_model/schema.rs
  • src/read_model/session.rs
  • src/read_model/store.rs
  • src/repository/batch.rs
  • src/repository/mod.rs
  • src/snapshot/repository.rs
  • tests/bomberman/commands.rs
  • tests/bomberman/main.rs
  • tests/bomberman/sim.rs
  • tests/bomberman/views.rs
  • tests/distributed_read_model/account_service/account.rs
  • tests/distributed_read_model/account_service/handlers/account_deposit.rs
  • tests/distributed_read_model/account_service/handlers/account_open.rs
  • tests/distributed_read_model/account_service/handlers/mod.rs
  • tests/distributed_read_model/account_service/mod.rs
  • tests/distributed_read_model/handlers/mod.rs
  • tests/distributed_read_model/main.rs
  • tests/distributed_read_model/models/aggregates/mod.rs
  • tests/distributed_read_model/models/mod.rs
  • tests/distributed_read_model/models/readmodels/mod.rs
  • tests/distributed_read_model/projections_service/mod.rs
  • tests/distributed_read_model/query_process.rs
  • tests/distributed_read_model/query_service/account_summary.rs
  • tests/distributed_read_model/query_service/mod.rs
  • tests/read_model_commit_bridge/main.rs
  • tests/read_model_distributed_idempotency/main.rs
  • tests/read_model_document_conformance/main.rs
  • tests/read_model_metadata/main.rs
  • tests/read_model_schema_bootstrap/main.rs
  • tests/read_model_session/main.rs
  • tests/read_models/main.rs
💤 Files with no reviewable changes (5)
  • tests/distributed_read_model/models/mod.rs
  • tests/distributed_read_model/models/readmodels/mod.rs
  • tests/distributed_read_model/handlers/mod.rs
  • tests/distributed_read_model/query_process.rs
  • tests/distributed_read_model/models/aggregates/mod.rs

Comment thread sourced_rust_macros/src/read_model.rs Outdated
Comment thread sourced_rust_macros/src/read_model.rs
Comment thread src/read_model/in_memory.rs
Comment thread src/read_model/metadata.rs
patrickleet and others added 8 commits May 23, 2026 20:26
Make `save_changes` reconcile included collections to the struct: an owned
has_many child dropped from the loaded Vec is deleted, lowering to an explicit
DeleteRow with the loaded expected version. belongs_to clear-to-None stays a
no-op on the target. Safe because has_many includes load the complete owned set.

Replaces the prior "removal does not delete by default" behavior, which was
asymmetric (auto-persisted adds/edits but silently dropped removals).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rework tests/distributed_read_model into a Catalog + Order CQRS slice over
normalized relational read models (ProductView, OrderView has_many
OrderLineView belongs_to ProductView, JSONB columns), and add a kanban
Board + Cards example. Add an order-fulfillment saga (inventory, payment,
saga orchestrator) driving confirm/cancel with a compensation path, projected
into an OrderFulfillmentStepView has_many child for a multi-include query.

Conventions: each write service is a microsvc::Service with service.rs +
handlers/ (one file per message) + models/ (aggregate); the projection service
is one dispatcher organized into handler modules; published domain events are
lowercase dot-namespaced. Services publish via the outbox and subscribe via
microsvc::subscribe — no bespoke transport.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

♻️ Duplicate comments (2)
sourced_rust_macros/src/read_model.rs (2)

507-513: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject duplicate foreign_key / through relationship metadata during parse.

In nested #[readmodel(...)] parsing, repeated relationship keys can still overwrite earlier values instead of failing fast.

Proposed change
                 } else if meta.path.is_ident("foreign_key") {
                     let value = meta.value()?.parse::<LitStr>()?.value();
                     if attrs.relationship.is_some() {
-                        relationship_mut(&mut attrs, "foreign_key")?.foreign_key = Some(value);
+                        let relationship = relationship_mut(&mut attrs, "foreign_key")?;
+                        if relationship.foreign_key.is_some() {
+                            return Err(meta.error("relationship foreign_key declared more than once"));
+                        }
+                        relationship.foreign_key = Some(value);
                     } else {
+                        if pending_foreign_key.is_some() {
+                            return Err(meta.error("relationship foreign_key declared more than once"));
+                        }
                         pending_foreign_key = Some(value);
                     }
...
                 } else if meta.path.is_ident("through") {
                     let through = meta.value()?.parse::<LitStr>()?.value();
                     if attrs.relationship.is_some() {
-                        relationship_mut(&mut attrs, "through")?.through = Some(through);
+                        let relationship = relationship_mut(&mut attrs, "through")?;
+                        if relationship.through.is_some() {
+                            return Err(meta.error("relationship through declared more than once"));
+                        }
+                        relationship.through = Some(through);
                     } else {
+                        if pending_through.is_some() {
+                            return Err(meta.error("relationship through declared more than once"));
+                        }
                         pending_through = Some(through);
                     }

Also applies to: 540-546

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@sourced_rust_macros/src/read_model.rs` around lines 507 - 513, The parser
currently allows repeated relationship metadata keys (e.g., "foreign_key" and
"through") to overwrite earlier values; update the parsing logic in the
readmodel attribute handling (the branch checking
meta.path.is_ident("foreign_key") and the similar "through" branch) to detect
duplicates and return an error instead of overwriting: when
meta.path.is_ident("foreign_key") or "through", check both attrs.relationship
(via relationship_mut(&mut attrs, "...")) and the corresponding pending_*
variable (e.g., pending_foreign_key / pending_through) and if a value is already
set in either place, return a syn::Error indicating a duplicate relationship
key; otherwise set the value as you currently do. Ensure you use the same error
reporting mechanism used elsewhere so parsing fails fast on duplicate keys.

180-182: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Generate RowValue::Bytes for Vec<u8> fields instead of insert_serde.

ColumnType::Bytes is emitted for Vec<u8>, but this insertion path serializes bytes as JSON arrays, which breaks adapter expectations for binary columns.

Proposed change
-        row_inserts.push(quote! {
-            row.insert_serde(`#column_name`, &self.#ident)?;
-        });
+        let value_ty = option_inner_type(&field.ty).unwrap_or(&field.ty);
+        let is_vec_u8 = last_type_segment(value_ty)
+            .is_some_and(|segment| segment.ident == "Vec" && vec_inner_is_u8(segment));
+        if is_vec_u8 {
+            if option_inner_type(&field.ty).is_some() {
+                row_inserts.push(quote! {
+                    row.insert(
+                        `#column_name`,
+                        self.#ident
+                            .as_ref()
+                            .map(|v| sourced_rust::RowValue::Bytes(v.clone()))
+                            .unwrap_or(sourced_rust::RowValue::Null),
+                    );
+                });
+            } else {
+                row_inserts.push(quote! {
+                    row.insert(`#column_name`, sourced_rust::RowValue::Bytes(self.#ident.clone()));
+                });
+            }
+        } else {
+            row_inserts.push(quote! {
+                row.insert_serde(`#column_name`, &self.#ident)?;
+            });
+        }
#!/bin/bash
# Verify that bytes typing exists while row insertion still uses serde for all fields.
rg -n -C2 'ColumnType::Bytes|row\.insert_serde\(`#column_name`' sourced_rust_macros/src/read_model.rs
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@sourced_rust_macros/src/read_model.rs` around lines 180 - 182, The generated
code currently uses row.insert_serde(`#column_name`, &self.#ident)? for all fields
which causes Vec<u8> (ColumnType::Bytes) to be serialized as JSON arrays; change
the row insertion generation in read_model.rs so that when the field type is
Vec<u8> (i.e., when you emit ColumnType::Bytes) you emit code to insert a binary
value via RowValue::Bytes (or the equivalent binary insert helper) instead of
insert_serde: update the logic that builds row_inserts to branch on the column
type/field type, detect Vec<u8> (ColumnType::Bytes) and generate an insertion
using RowValue::Bytes with the field identifier (`#ident`) and the column name
(`#column_name`) while leaving other types to use row.insert_serde.
🧹 Nitpick comments (1)
tests/read_model_distributed_idempotency/main.rs (1)

117-117: ⚡ Quick win

Assert on error variants instead of error message text.

These assertions are brittle because they depend on wording; matching the enum variant is more stable.

Proposed change
-    assert!(err.to_string().contains("not found"));
+    assert!(matches!(err, sourced_rust::ReadModelError::NotFound { .. }));
...
-    assert!(err.to_string().contains("not found"));
+    assert!(matches!(err, sourced_rust::ReadModelError::NotFound { .. }));

Also applies to: 152-152

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/read_model_distributed_idempotency/main.rs` at line 117, Replace
brittle string-based checks on the error text (the
assert!(err.to_string().contains("not found")) assertions) with checks that
match the actual error enum variant (use matches! or pattern matching against
the concrete error type returned by the operation) so the test asserts the
specific variant (e.g., NotFound) rather than message contents; update both
occurrences around the failing calls that bind err to assert matches!(err,
YourErrorType::NotFound { .. }) or use if let YourErrorType::NotFound { .. } =
err { } else { panic!(...) } and import the error enum type if needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/distributed_read_model_board/projections_service/handlers/board.rs`:
- Around line 80-87: The current event_version(Event) silently returns 0 on
parse failure; change event_version to return Result<i64, anyhow::Error> (or a
suitable error type) and return an explicit error when Event.id's trailing
segment fails to parse instead of unwrap_or(0); update callers to propagate or
handle the Result. Locate the event_version function and replace the
rsplit/...parse().ok().unwrap_or(0) chain with parse().map_err(|e|
anyhow::anyhow!("invalid event version in id {}: {}", event.id, e))? (or
equivalent) so malformed IDs surface as failures rather than defaulting to 0.

In `@tests/distributed_read_model/catalog_service/models/product.rs`:
- Around line 13-18: The add method currently allows non-positive unit_cents;
mirror the validation in reprice by enforcing unit_cents > 0 inside add (in
tests/distributed_read_model/catalog_service/models/product.rs) and reject the
event the same way reprice does (e.g., return the same Err/Result or
panic/validation error used in reprice) so creation and updates share the same
pricing rule; update the add function to check unit_cents <= 0 and handle it
identically to reprice.

In `@tests/distributed_read_model/inventory_service/models/inventory.rs`:
- Around line 14-30: Validate and reject non-positive quantities and prevent
releasing more than reserved: in set_stock(&mut self, sku, quantity) ensure
quantity >= 0 before assigning available (or clamp to 0) so stock cannot be
initialized negative; in reserve(&mut self, quantity) add a guard to require
quantity > 0 and that self.available >= quantity (otherwise no change / early
return) before decrementing self.available and incrementing self.reserved; in
release(&mut self, quantity) require quantity > 0 and that self.reserved >=
quantity (otherwise no change / early return) and then decrement self.reserved
and increment self.available accordingly (don’t simply apply .max(0) which can
inflate available when releasing more than reserved).

In `@tests/distributed_read_model/order_service/models/order.rs`:
- Around line 29-54: add guards to prevent mutations when the order is not open
and to reject non-positive money/quantity values: in add_line (fn add_line)
validate unit_cents > 0 and quantity > 0 before mutating or emitting the event
(return early or no-op if invalid) and keep the existing when =
self.status.as_str() == "open" condition; in change_quantity (fn
change_quantity) add the same open-order guard (when = self.status.as_str() ==
"open") and require quantity > 0 before updating line.quantity; in remove_line
(fn remove_line) add the open-order guard (when = self.status.as_str() ==
"open") so removals are no-ops after submit. Ensure all checks are performed
before mutating self.lines so invalid inputs never change state.

In `@tests/distributed_read_model/projections_service/handlers/order.rs`:
- Around line 103-109: The helper event_version currently hides malformed IDs by
returning 0 via unwrap_or(0); change event_version to return a Result<i64, E>
(e.g., Result<i64, std::num::ParseIntError> or anyhow::Error) instead of i64,
parse the last segment with raw.parse()? and propagate parse errors rather than
defaulting, and update callers of event_version to handle the Result
(map/expect/propagate) so malformed event.id values fail fast instead of being
coerced to version 0; keep the function name event_version and ensure error
types are consistent with the surrounding codebase.

---

Duplicate comments:
In `@sourced_rust_macros/src/read_model.rs`:
- Around line 507-513: The parser currently allows repeated relationship
metadata keys (e.g., "foreign_key" and "through") to overwrite earlier values;
update the parsing logic in the readmodel attribute handling (the branch
checking meta.path.is_ident("foreign_key") and the similar "through" branch) to
detect duplicates and return an error instead of overwriting: when
meta.path.is_ident("foreign_key") or "through", check both attrs.relationship
(via relationship_mut(&mut attrs, "...")) and the corresponding pending_*
variable (e.g., pending_foreign_key / pending_through) and if a value is already
set in either place, return a syn::Error indicating a duplicate relationship
key; otherwise set the value as you currently do. Ensure you use the same error
reporting mechanism used elsewhere so parsing fails fast on duplicate keys.
- Around line 180-182: The generated code currently uses
row.insert_serde(`#column_name`, &self.#ident)? for all fields which causes
Vec<u8> (ColumnType::Bytes) to be serialized as JSON arrays; change the row
insertion generation in read_model.rs so that when the field type is Vec<u8>
(i.e., when you emit ColumnType::Bytes) you emit code to insert a binary value
via RowValue::Bytes (or the equivalent binary insert helper) instead of
insert_serde: update the logic that builds row_inserts to branch on the column
type/field type, detect Vec<u8> (ColumnType::Bytes) and generate an insertion
using RowValue::Bytes with the field identifier (`#ident`) and the column name
(`#column_name`) while leaving other types to use row.insert_serde.

---

Nitpick comments:
In `@tests/read_model_distributed_idempotency/main.rs`:
- Line 117: Replace brittle string-based checks on the error text (the
assert!(err.to_string().contains("not found")) assertions) with checks that
match the actual error enum variant (use matches! or pattern matching against
the concrete error type returned by the operation) so the test asserts the
specific variant (e.g., NotFound) rather than message contents; update both
occurrences around the failing calls that bind err to assert matches!(err,
YourErrorType::NotFound { .. }) or use if let YourErrorType::NotFound { .. } =
err { } else { panic!(...) } and import the error enum type if needed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 90853aba-e465-4667-b315-f2a306deff81

📥 Commits

Reviewing files that changed from the base of the PR and between 5f23048 and 7b719b6.

📒 Files selected for processing (83)
  • docs/read-models.md
  • sourced_rust_macros/src/read_model.rs
  • src/lib.rs
  • src/read_model/in_memory.rs
  • src/read_model/metadata.rs
  • src/read_model/mod.rs
  • src/read_model/session.rs
  • tests/distributed_read_model/catalog_service/handlers/mod.rs
  • tests/distributed_read_model/catalog_service/handlers/product_add.rs
  • tests/distributed_read_model/catalog_service/handlers/product_reprice.rs
  • tests/distributed_read_model/catalog_service/mod.rs
  • tests/distributed_read_model/catalog_service/models/mod.rs
  • tests/distributed_read_model/catalog_service/models/product.rs
  • tests/distributed_read_model/catalog_service/service.rs
  • tests/distributed_read_model/fulfillment.rs
  • tests/distributed_read_model/inventory_service/handlers/mod.rs
  • tests/distributed_read_model/inventory_service/handlers/release.rs
  • tests/distributed_read_model/inventory_service/handlers/reserve.rs
  • tests/distributed_read_model/inventory_service/mod.rs
  • tests/distributed_read_model/inventory_service/models/inventory.rs
  • tests/distributed_read_model/inventory_service/models/mod.rs
  • tests/distributed_read_model/inventory_service/service.rs
  • tests/distributed_read_model/main.rs
  • tests/distributed_read_model/order_fulfillment_saga_service/handlers/mod.rs
  • tests/distributed_read_model/order_fulfillment_saga_service/handlers/record_inventory_released.rs
  • tests/distributed_read_model/order_fulfillment_saga_service/handlers/record_inventory_reserved.rs
  • tests/distributed_read_model/order_fulfillment_saga_service/handlers/record_payment_declined.rs
  • tests/distributed_read_model/order_fulfillment_saga_service/handlers/record_payment_succeeded.rs
  • tests/distributed_read_model/order_fulfillment_saga_service/handlers/start.rs
  • tests/distributed_read_model/order_fulfillment_saga_service/mod.rs
  • tests/distributed_read_model/order_fulfillment_saga_service/models/mod.rs
  • tests/distributed_read_model/order_fulfillment_saga_service/models/saga.rs
  • tests/distributed_read_model/order_fulfillment_saga_service/service.rs
  • tests/distributed_read_model/order_service/handlers/mod.rs
  • tests/distributed_read_model/order_service/handlers/order_add_line.rs
  • tests/distributed_read_model/order_service/handlers/order_cancel.rs
  • tests/distributed_read_model/order_service/handlers/order_change_quantity.rs
  • tests/distributed_read_model/order_service/handlers/order_confirm.rs
  • tests/distributed_read_model/order_service/handlers/order_place.rs
  • tests/distributed_read_model/order_service/handlers/order_remove_line.rs
  • tests/distributed_read_model/order_service/handlers/order_submit.rs
  • tests/distributed_read_model/order_service/mod.rs
  • tests/distributed_read_model/order_service/models/mod.rs
  • tests/distributed_read_model/order_service/models/order.rs
  • tests/distributed_read_model/order_service/service.rs
  • tests/distributed_read_model/payment_service/handlers/charge.rs
  • tests/distributed_read_model/payment_service/handlers/mod.rs
  • tests/distributed_read_model/payment_service/mod.rs
  • tests/distributed_read_model/payment_service/models/mod.rs
  • tests/distributed_read_model/payment_service/models/payment.rs
  • tests/distributed_read_model/payment_service/service.rs
  • tests/distributed_read_model/projections_service/handlers/fulfillment.rs
  • tests/distributed_read_model/projections_service/handlers/mod.rs
  • tests/distributed_read_model/projections_service/handlers/order.rs
  • tests/distributed_read_model/projections_service/handlers/product.rs
  • tests/distributed_read_model/projections_service/mod.rs
  • tests/distributed_read_model/projections_service/service.rs
  • tests/distributed_read_model/query_service/mod.rs
  • tests/distributed_read_model/read_models/mod.rs
  • tests/distributed_read_model/read_models/order_fulfillment_step_view.rs
  • tests/distributed_read_model/read_models/order_line_view.rs
  • tests/distributed_read_model/read_models/order_view.rs
  • tests/distributed_read_model/read_models/product_view.rs
  • tests/distributed_read_model_board/board_service/handlers/board_add_card.rs
  • tests/distributed_read_model_board/board_service/handlers/board_move_card.rs
  • tests/distributed_read_model_board/board_service/handlers/board_open.rs
  • tests/distributed_read_model_board/board_service/handlers/board_remove_card.rs
  • tests/distributed_read_model_board/board_service/handlers/mod.rs
  • tests/distributed_read_model_board/board_service/mod.rs
  • tests/distributed_read_model_board/board_service/models/board.rs
  • tests/distributed_read_model_board/board_service/models/mod.rs
  • tests/distributed_read_model_board/board_service/service.rs
  • tests/distributed_read_model_board/main.rs
  • tests/distributed_read_model_board/projections_service/handlers/board.rs
  • tests/distributed_read_model_board/projections_service/handlers/mod.rs
  • tests/distributed_read_model_board/projections_service/mod.rs
  • tests/distributed_read_model_board/query_service/mod.rs
  • tests/distributed_read_model_board/read_models/board_view.rs
  • tests/distributed_read_model_board/read_models/card_view.rs
  • tests/distributed_read_model_board/read_models/mod.rs
  • tests/read_model_distributed_idempotency/main.rs
  • tests/read_model_relationship_includes/main.rs
  • tests/read_model_session/main.rs
✅ Files skipped from review due to trivial changes (10)
  • tests/distributed_read_model/order_fulfillment_saga_service/models/mod.rs
  • tests/distributed_read_model/order_service/models/mod.rs
  • tests/distributed_read_model/catalog_service/models/mod.rs
  • tests/distributed_read_model/inventory_service/models/mod.rs
  • tests/distributed_read_model/inventory_service/handlers/reserve.rs
  • tests/distributed_read_model/order_fulfillment_saga_service/handlers/mod.rs
  • tests/distributed_read_model/payment_service/mod.rs
  • tests/distributed_read_model_board/board_service/handlers/mod.rs
  • tests/distributed_read_model_board/board_service/mod.rs
  • tests/distributed_read_model_board/read_models/mod.rs

Comment thread tests/distributed_read_model/catalog_service/models/product.rs Outdated
Comment thread tests/distributed_read_model/inventory_service/models/inventory.rs
Comment thread tests/distributed_read_model/order_service/models/order.rs Outdated
Comment thread tests/distributed_read_model/projections_service/handlers/order.rs Outdated
@patrickleet patrickleet merged commit 9c6f5c2 into main May 24, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant