From d2669cd67a452d125e86fa1c6186c502486199da Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Wed, 27 May 2026 12:44:32 -0500 Subject: [PATCH 1/4] feat: microsvc repo + readmodel context config --- README.md | 59 ++++---- docs/async-repositories.md | 2 +- docs/read-models.md | 43 +++--- src/commit_builder/mod.rs | 68 ++++----- src/hashmap_repo/repository.rs | 20 +-- src/lib.rs | 15 +- src/microsvc/context.rs | 55 +++++--- src/microsvc/dependencies.rs | 131 ++++++++++++++++++ src/microsvc/grpc.rs | 26 ++-- src/microsvc/http.rs | 16 +-- src/microsvc/mod.rs | 23 +-- src/microsvc/service.rs | 100 +++++++++---- src/read_model/in_memory.rs | 14 +- src/read_model/metadata.rs | 2 +- src/read_model/mod.rs | 20 +-- src/read_model/queued.rs | 4 +- src/read_model/session.rs | 131 ++++++++++++------ src/repository/async_repository.rs | 2 +- src/repository/mod.rs | 2 +- src/sqlite_repo/mod.rs | 4 +- tests/async_repository/main.rs | 6 +- .../checkout_saga_service/service.rs | 2 +- tests/distributed_read_model/main.rs | 6 +- .../projection_service/handlers/checkout.rs | 32 +++-- .../projection_service/handlers/mod.rs | 6 +- .../projection_service/handlers/seat.rs | 26 ++-- .../projection_service/mod.rs | 2 +- .../projection_service/service.rs | 14 +- .../query_service/mod.rs | 6 +- .../seat_inventory_service/service.rs | 2 +- .../board_service/service.rs | 2 +- tests/distributed_read_model_board/main.rs | 12 +- .../projections_service/handlers/board.rs | 54 +++++--- .../projections_service/handlers/mod.rs | 43 ++++-- .../projections_service/mod.rs | 119 ++++++++++------ .../query_service/mod.rs | 6 +- tests/microsvc/basic.rs | 2 +- tests/microsvc/convention.rs | 12 +- tests/microsvc/session.rs | 5 +- tests/microsvc/transport_grpc.rs | 2 +- tests/microsvc/transport_http.rs | 2 +- tests/microsvc/transport_listen.rs | 6 +- tests/microsvc/transport_subscribe.rs | 2 +- tests/postgres_repository/main.rs | 4 +- tests/read_model_commit_bridge/main.rs | 6 +- .../main.rs | 16 +-- tests/read_model_document_conformance/main.rs | 15 +- .../read_model_relationship_includes/main.rs | 72 +++++----- tests/read_model_session/main.rs | 48 +++---- tests/read_models/main.rs | 8 +- tests/sagas/microsvc_saga.rs | 18 +-- tests/sqlite_repository/main.rs | 12 +- 52 files changed, 817 insertions(+), 488 deletions(-) create mode 100644 src/microsvc/dependencies.rs diff --git a/README.md b/README.md index 5ed093b..db950f9 100644 --- a/README.md +++ b/README.md @@ -90,7 +90,7 @@ Each handler is a module with `COMMAND`, `guard`, and `handle`: // handlers/todo_create.rs use serde::Deserialize; use serde_json::{json, Value}; -use sourced_rust::microsvc::{Context, HandlerError}; +use sourced_rust::microsvc::{Context, HandlerError, HasRepo}; use sourced_rust::{AggregateBuilder, OutboxCommitExt, OutboxMessage, Repository}; pub const COMMAND: &str = "todo.create"; @@ -98,11 +98,15 @@ pub const COMMAND: &str = "todo.create"; #[derive(Deserialize)] struct Input { id: String, user_id: String, task: String } -pub fn guard(ctx: &Context) -> bool { +pub fn guard(ctx: &Context) -> bool { ctx.has_fields(&["id", "user_id", "task"]) } -pub fn handle(ctx: &Context) -> Result { +pub fn handle(ctx: &Context) -> Result +where + D: HasRepo, + D::Repo: Repository + Clone, +{ let input = ctx.input::()?; let repo = ctx.repo().clone().aggregate::(); @@ -126,7 +130,7 @@ use sourced_rust::{microsvc, HashMapRepository, Queueable}; fn main() -> Result<(), Box> { let service = Arc::new(sourced_rust::register_handlers!( - microsvc::Service::new(HashMapRepository::new().queued()), + microsvc::Service::with_repo(HashMapRepository::new().queued()), handlers::todo_create, handlers::todo_complete, )); @@ -929,11 +933,11 @@ let _stats = worker.stop()?; ## Microservice Framework (`microsvc`) -The `microsvc` module provides a convention-based command handler framework for building microservices. Register command handlers on a `Service`, then expose them over HTTP, bus transports, or direct dispatch. +The `microsvc` module provides a convention-based command handler framework for building microservices. Register command handlers on a `Service`, then expose them over HTTP, bus transports, or direct dispatch. ### Defining a Service -A `Service` is generic over a repository type. Register commands with closures or handler modules: +A `Service` is generic over a dependency type. Use `Service::with_repo` for aggregate command handlers, `Service::with_read_model_store` for projection handlers, and `Service::with_repo_and_read_model_store` when a handler genuinely needs both: ```rust use std::sync::Arc; @@ -941,7 +945,7 @@ use sourced_rust::{microsvc, HashMapRepository, AggregateBuilder, Queueable}; use serde_json::json; let service = Arc::new( - microsvc::Service::new(HashMapRepository::new().queued()) + microsvc::Service::with_repo(HashMapRepository::new().queued()) .command("counter.create", |ctx| { let input = ctx.input::()?; let counter_repo = ctx.repo().clone().aggregate::(); @@ -978,7 +982,7 @@ Add input validation with `command_guarded`. The guard runs before the handler use serde_json::json; use sourced_rust::{microsvc, HashMapRepository, Queueable}; -let service = microsvc::Service::new(HashMapRepository::new().queued()) +let service = microsvc::Service::with_repo(HashMapRepository::new().queued()) .command_guarded( "admin.reset", |ctx| ctx.role() == Some("admin"), @@ -994,6 +998,7 @@ For larger services, organize handlers into separate files following a conventio // src/handlers/counter_create.rs use serde::Deserialize; use serde_json::{json, Value}; +use sourced_rust::microsvc::HasRepo; use sourced_rust::{microsvc, AggregateBuilder, Repository}; pub const COMMAND: &str = "counter.create"; @@ -1003,13 +1008,15 @@ struct Input { id: String, } -pub fn guard(ctx: µsvc::Context) -> bool { +pub fn guard(ctx: µsvc::Context) -> bool { ctx.has_fields(&["id"]) } -pub fn handle( - ctx: µsvc::Context, -) -> Result { +pub fn handle(ctx: µsvc::Context) -> Result +where + D: HasRepo, + D::Repo: Repository + Clone, +{ let input = ctx.input::()?; let counter_repo = ctx.repo().clone().aggregate::(); let mut counter = Counter::default(); @@ -1023,7 +1030,7 @@ Register them with the `register_handlers!` macro: ```rust let service = sourced_rust::register_handlers!( - microsvc::Service::new(HashMapRepository::new().queued()), + microsvc::Service::with_repo(HashMapRepository::new().queued()), handlers::counter_create, handlers::counter_increment, ); @@ -1039,7 +1046,7 @@ use serde_json::json; use sourced_rust::{microsvc, HashMapRepository, Queueable}; let service = Arc::new( - microsvc::Service::new(HashMapRepository::new().queued()) + microsvc::Service::with_repo(HashMapRepository::new().queued()) .command("counter.create", |ctx| { /* ... */ Ok(json!({ "id": "c1" })) }) ); @@ -1083,7 +1090,7 @@ use serde_json::json; use sourced_rust::{microsvc, HashMapRepository, Queueable}; let service = Arc::new( - microsvc::Service::new(HashMapRepository::new().queued()) + microsvc::Service::with_repo(HashMapRepository::new().queued()) .command("counter.create", |ctx| { /* ... */ Ok(json!({ "id": "c1" })) }) ); @@ -1132,7 +1139,7 @@ use sourced_rust::{microsvc, bus::{InMemoryQueue, Sender, Event}, HashMapReposit let queue = InMemoryQueue::new(); let service = Arc::new( - microsvc::Service::new(HashMapRepository::new().queued()) + microsvc::Service::with_repo(HashMapRepository::new().queued()) .command("counter.create", |ctx| { /* ... */ Ok(json!({ "id": "c1" })) }) ); @@ -1163,7 +1170,7 @@ A single service can handle commands from multiple transports simultaneously — ```rust let service = Arc::new( - microsvc::Service::new(HashMapRepository::new().queued()) + microsvc::Service::with_repo(HashMapRepository::new().queued()) .command("counter.create", |ctx| { /* ... */ Ok(json!({})) }) ); @@ -1194,7 +1201,7 @@ microsvc::serve(service.clone(), "0.0.0.0:3000").await?; ## Read Models -Read models are query-optimized projections derived from aggregates, event records, or published messages. Document read models store a whole view in a document payload column; normalized relational read models use table metadata plus `ReadModelSession` write plans. +Read models are query-optimized projections derived from aggregates, event records, or published messages. Document read models store a whole view in a document payload column; normalized relational read models use table metadata plus `ReadModelWritePlanBuilder` write plans. ### Defining a Read Model @@ -1231,22 +1238,22 @@ repo.readmodel(&view).commit(&mut game)?; // Return `view` to the client — it reflects the committed state ``` -For relational read models, stage structured row mutations in a session: +For relational read models, build a structured read-model write plan: ```rust -use sourced_rust::{ReadModelSession, ReadModelSessionCommitExt}; +use sourced_rust::{ReadModelWritePlanBuilder, ReadModelWritePlanCommitExt}; -let mut read_models = ReadModelSession::new(); -read_models.save(&player_view)?; -read_models.save_related(&player_view, "weapons", &weapon_view)?; +let mut read_models = ReadModelWritePlanBuilder::new(); +read_models.upsert(&player_view)?; +read_models.upsert_related(&player_view, "weapons", &weapon_view)?; repo.read_models(read_models).commit(&mut game)?; ``` -Distributed projectors can commit the same session shape directly against a read-model adapter and mark messages processed in the same adapter transaction: +Distributed projectors can commit the same write-plan shape directly against a read-model adapter and mark messages processed in the same adapter transaction: ```rust -let mut read_models = ReadModelSession::new(); +let mut read_models = ReadModelWritePlanBuilder::new(); read_models.document(&view)?.mark_processed("game-view-projector", event_id); let outcome = read_models.commit(&read_store)?; ``` @@ -1255,7 +1262,7 @@ This is a deliberate consistency tradeoff. The read model is in sync with the ag Bomberman `BoardView` remains a document-row example backed by a whole-view payload, not a normalized relational ORM example. -See [`docs/read-models.md`](docs/read-models.md) for the full guide, including relational metadata, document rows, session commits, schema bootstrap, distributed idempotency, and non-goals. +See [`docs/read-models.md`](docs/read-models.md) for the full guide, including relational metadata, document rows, workspace commits, schema bootstrap, distributed idempotency, and non-goals. ## Snapshots diff --git a/docs/async-repositories.md b/docs/async-repositories.md index 473a762..8571411 100644 --- a/docs/async-repositories.md +++ b/docs/async-repositories.md @@ -17,7 +17,7 @@ persistence should override it with an explicit durable name through - `AsyncGetStream` loads one or more event streams by full identity. - `AsyncTransactionalCommit` commits `AsyncCommitBatch` values with stream writes, read-model write plans, and snapshots under one backend transaction. -- `AsyncReadModelStore`, `AsyncReadModelSessionStore`, and +- `AsyncReadModelStore`, `AsyncReadModelWritePlanStore`, and `AsyncRelationalReadModelQueryStore` mirror the current document and relational read-model surfaces for async adapters. - `AsyncSnapshotStore` keys rebuildable snapshot cache records by full stream diff --git a/docs/read-models.md b/docs/read-models.md index c23ccdd..b80577f 100644 --- a/docs/read-models.md +++ b/docs/read-models.md @@ -8,9 +8,9 @@ The current implementation keeps these paths explicit: | Path | API | Use when | |---|---|---| -| Document rows | `ReadModelStore`, `.readmodel(&view)`, `ReadModelSession::document` | Whole-view JSON documents backed by a document payload column | -| Relational write mapping | `RelationalReadModel`, `ReadModelSession`, `ReadModelWritePlan` | Normalized tables, composite keys, foreign keys, JSONB columns | -| Explicit relationship includes | `store.session().load(...).include(...).one()`, `save_changes` | Internal primary-key reads with declared one-level relationships | +| Document rows | `ReadModelStore`, `.readmodel(&view)`, `ReadModelWritePlanBuilder::document` | Whole-view JSON documents backed by a document payload column | +| Relational write mapping | `RelationalReadModel`, `ReadModelWritePlanBuilder`, `ReadModelWritePlan` | Normalized tables, composite keys, foreign keys, JSONB columns | +| Explicit relationship includes | `store.workspace().load(...).include(...).one()`, `sync` | Internal primary-key reads with declared one-level relationships | | Schema lifecycle | `ReadModelSchemaRegistry`, `ReadModelSchemaAdapter` | Migration artifact generation, startup verification, explicit dev/test bootstrap | ## Document Read Models @@ -100,7 +100,7 @@ pub struct PlayerWeaponView { The derive emits `RelationalReadModel` metadata, row conversion, primary-key metadata, JSONB column metadata, indexes, and an adapter-owned version column. -Composite and delegated keys are represented in the schema and in session row +Composite and delegated keys are represented in the schema and in write-plan row mutations. Use `#[index]` or `#[index("index_name")]` for a secondary field index. Use @@ -113,17 +113,17 @@ For compound indexes, put `#[index(columns = ["field_a", "field_b"])]` or Relationship includes are primary-key anchored and opt-in. Register the relational schemas with an adapter, load one root row, ask for each relationship -explicitly, mutate the hydrated struct, then save the tracked changes: +explicitly, mutate the hydrated struct, then sync the tracked workspace: ```rust -use sourced_rust::{InMemoryReadModelStore, ReadModelUnitOfWorkExt, RowKey, RowValue}; +use sourced_rust::{InMemoryReadModelStore, ReadModelWorkspaceExt, RowKey, RowValue}; let store = InMemoryReadModelStore::new(); store.register_schema::()?; store.register_schema::()?; -let mut read_models = store.session(); -let mut player = read_models +let mut workspace = store.workspace(); +let mut player = workspace .load::(RowKey::new([("player_id", RowValue::String("player-1".into()))])) .include("weapons") .one()? @@ -137,14 +137,14 @@ player.weapons.push(PlayerWeaponView { acquired_at: "2026-05-23".into(), }); -read_models.save_changes(player)?; -read_models.commit()?; +workspace.sync(player)?; +workspace.commit()?; ``` `has_many` relationships hydrate `Vec` fields. `belongs_to` relationships hydrate `Option` fields. -`save_changes` makes storage match the struct: added items are inserted, changed +`sync` makes storage match the struct: added items are inserted, changed items updated, and **removed items deleted**. For an included `has_many` collection, dropping a child from the `Vec` deletes that child row (the loaded collection is the complete owned set, so the struct is the source of truth). @@ -163,15 +163,16 @@ for normalized Postgres read models. ## Command-Side Atomic Writes -Use `ReadModelSession` when a command or projector stages multiple document or -normalized row mutations. The current repository APIs are synchronous: +Use `ReadModelWritePlanBuilder` when a command or projector stages multiple +document or normalized row mutations. The current repository APIs are +synchronous: ```rust -use sourced_rust::{ReadModelSession, ReadModelSessionCommitExt}; +use sourced_rust::{ReadModelWritePlanBuilder, ReadModelWritePlanCommitExt}; -let mut read_models = ReadModelSession::new(); -read_models.save(&player)?; -read_models.save_related(&player, "weapons", &weapon)?; +let mut read_models = ReadModelWritePlanBuilder::new(); +read_models.upsert(&player)?; +read_models.upsert_related(&player, "weapons", &weapon)?; repo.read_models(read_models).commit(&mut aggregate)?; ``` @@ -207,18 +208,18 @@ repo.readmodel(&board_view).commit(&mut game)?; ## Standalone Distributed Projectors -A read-model service can commit a session without owning an aggregate +A read-model service can commit a write plan without owning an aggregate repository: ```rust -use sourced_rust::{ReadModelError, ReadModelSession, ReadModelSessionStore}; +use sourced_rust::{ReadModelError, ReadModelWritePlanBuilder, ReadModelWritePlanStore}; fn project_message( - store: &impl ReadModelSessionStore, + store: &impl ReadModelWritePlanStore, event_id: &str, view: &GameView, ) -> Result<(), ReadModelError> { - let mut read_models = ReadModelSession::new(); + let mut read_models = ReadModelWritePlanBuilder::new(); read_models .document(view)? .mark_processed("game-view-projector", event_id); diff --git a/src/commit_builder/mod.rs b/src/commit_builder/mod.rs index e9d2f6e..15d0887 100644 --- a/src/commit_builder/mod.rs +++ b/src/commit_builder/mod.rs @@ -1,4 +1,4 @@ -//! CommitBuilder - chain read models, sessions, outbox, and aggregates into one transactional batch. +//! CommitBuilder - chain read models, write plans, outbox, and aggregates into one transactional batch. //! //! ## Example //! @@ -9,10 +9,10 @@ //! .outbox(message) //! .commit(&mut game)?; //! -//! // Relational/session read models. -//! let mut read_models = sourced_rust::ReadModelSession::new(); -//! read_models.save(&player)?; -//! read_models.save_related(&player, "weapons", &weapon)?; +//! // Relational read-model write plans. +//! let mut read_models = sourced_rust::ReadModelWritePlanBuilder::new(); +//! read_models.upsert(&player)?; +//! read_models.upsert_related(&player, "weapons", &weapon)?; //! //! repo //! .read_models(read_models) @@ -34,7 +34,7 @@ use crate::aggregate::Aggregate; use crate::entity::Entity; use crate::outbox::OutboxMessage; -use crate::read_model::{ReadModel, ReadModelError, ReadModelSession, ReadModelWritePlan}; +use crate::read_model::{ReadModel, ReadModelError, ReadModelWritePlan, ReadModelWritePlanBuilder}; use crate::repository::{CommitBatch, RepositoryError, TransactionalCommit}; /// Builder for chaining multiple items into a single transactional commit batch. @@ -125,13 +125,13 @@ impl<'a, R> CommitBuilder<'a, R> { self } - /// Add a read-model session to the commit. - pub fn read_models(mut self, session: ReadModelSession) -> Self { + /// Add a read-model write plan builder to the commit. + pub fn read_models(mut self, read_models: ReadModelWritePlanBuilder) -> Self { if self.error.is_some() { return self; } - match session.into_write_plan() { + match read_models.into_write_plan() { Ok(plan) => self.read_model_plans.push(plan), Err(err) => self.error = Some(err.into()), } @@ -258,12 +258,12 @@ impl<'a, R> StagedCommitBuilder<'a, R> { self } - pub fn read_models(mut self, session: ReadModelSession) -> Self { + pub fn read_models(mut self, read_models: ReadModelWritePlanBuilder) -> Self { if self.error.is_some() { return self; } - match session.into_write_plan() { + match read_models.into_write_plan() { Ok(plan) => self.read_model_plans.push(plan), Err(err) => self.error = Some(err.into()), } @@ -328,8 +328,8 @@ pub trait CommitBuilderExt: TransactionalCommit + Sized { impl CommitBuilderExt for R {} fn document_plan(model: &M) -> Result { - let mut session = ReadModelSession::new(); - session.document(model).map_err(|err| match err { + let mut read_models = ReadModelWritePlanBuilder::new(); + read_models.document(model).map_err(|err| match err { ReadModelError::Serde(message) => RepositoryError::Model(format!( "failed to serialize read model {}:{}: {}", M::COLLECTION, @@ -338,18 +338,18 @@ fn document_plan(model: &M) -> Result other.into(), })?; - Ok(session.into_write_plan()?) + Ok(read_models.into_write_plan()?) } -/// Extension trait for the new relational read-model session commit entrypoints. +/// Extension trait for relational read-model write-plan commit entrypoints. /// /// Kept separate from `CommitBuilderExt` so the existing /// `ReadModelsExt::read_models::()` query accessor remains unambiguous unless -/// callers explicitly opt into the session starter. -pub trait ReadModelSessionCommitExt: TransactionalCommit + Sized { - /// Start a commit builder chain with a relational read-model session. - fn read_models(&self, session: ReadModelSession) -> CommitBuilder<'_, Self> { - CommitBuilder::new(self).read_models(session) +/// callers explicitly opt into the write-plan starter. +pub trait ReadModelWritePlanCommitExt: TransactionalCommit + Sized { + /// Start a commit builder chain with a relational read-model write plan. + fn read_models(&self, read_models: ReadModelWritePlanBuilder) -> CommitBuilder<'_, Self> { + CommitBuilder::new(self).read_models(read_models) } /// Start a staged commit builder with an aggregate. @@ -361,7 +361,7 @@ pub trait ReadModelSessionCommitExt: TransactionalCommit + Sized { } } -impl ReadModelSessionCommitExt for R {} +impl ReadModelWritePlanCommitExt for R {} #[cfg(test)] mod tests { @@ -491,8 +491,8 @@ mod tests { } } - fn raw_session(view: &TestView) -> crate::read_model::ReadModelSession { - let mut session = crate::read_model::ReadModelSession::new(); + fn raw_session(view: &TestView) -> crate::read_model::ReadModelWritePlanBuilder { + let mut session = crate::read_model::ReadModelWritePlanBuilder::new(); session.document(view).unwrap(); session } @@ -557,7 +557,7 @@ mod tests { let mut agg = TestAggregate::default(); agg.touch(); - ReadModelSessionCommitExt::read_models(&repo, raw_session(&view)) + ReadModelWritePlanCommitExt::read_models(&repo, raw_session(&view)) .commit(&mut agg) .unwrap(); @@ -734,7 +734,7 @@ mod tests { agg.touch(); match order { - 0 => ReadModelSessionCommitExt::read_models(&repo, raw_session(&view)) + 0 => ReadModelWritePlanCommitExt::read_models(&repo, raw_session(&view)) .outbox(outbox) .aggregate(&mut agg) .commit() @@ -745,7 +745,7 @@ mod tests { .aggregate(&mut agg) .commit() .unwrap(), - _ => ReadModelSessionCommitExt::aggregate(&repo, &mut agg) + _ => ReadModelWritePlanCommitExt::aggregate(&repo, &mut agg) .read_models(raw_session(&view)) .outbox(outbox) .commit() @@ -771,7 +771,7 @@ mod tests { agg.touch(); let outbox = OutboxMessage::create("sourced-msg", "TestEvent", b"{}".to_vec()).unwrap(); - ReadModelSessionCommitExt::aggregate(&repo, &mut agg) + ReadModelWritePlanCommitExt::aggregate(&repo, &mut agg) .outbox(outbox) .commit() .unwrap(); @@ -801,7 +801,7 @@ mod tests { agg2.touch(); agg2.entity.set_id("agg-2"); - ReadModelSessionCommitExt::read_models(&repo, raw_session(&view)) + ReadModelWritePlanCommitExt::read_models(&repo, raw_session(&view)) .aggregate(&mut agg1) .aggregate(&mut agg2) .commit() @@ -839,7 +839,7 @@ mod tests { let mut agg = TestAggregate::default(); agg.touch(); - ReadModelSessionCommitExt::read_models(&repo, raw_session(&updated)) + ReadModelWritePlanCommitExt::read_models(&repo, raw_session(&updated)) .commit(&mut agg) .unwrap(); @@ -854,12 +854,12 @@ mod tests { #[test] fn invalid_session_plan_does_not_commit_aggregate() { let repo = RecordingBatchRepo::default(); - let mut session = crate::read_model::ReadModelSession::new(); + let mut session = crate::read_model::ReadModelWritePlanBuilder::new(); session.mark_processed("", "message-1"); let mut agg = TestAggregate::default(); agg.touch(); - let err = ReadModelSessionCommitExt::read_models(&repo, session) + let err = ReadModelWritePlanCommitExt::read_models(&repo, session) .commit(&mut agg) .unwrap_err(); @@ -877,12 +877,12 @@ mod tests { id: "relational".into(), counter: 3, }; - let mut session = crate::read_model::ReadModelSession::new(); - session.save(&view).unwrap(); + let mut session = crate::read_model::ReadModelWritePlanBuilder::new(); + session.upsert(&view).unwrap(); let mut agg = TestAggregate::default(); agg.touch(); - let err = ReadModelSessionCommitExt::read_models(&repo, session) + let err = ReadModelWritePlanCommitExt::read_models(&repo, session) .commit(&mut agg) .unwrap_err(); diff --git a/src/hashmap_repo/repository.rs b/src/hashmap_repo/repository.rs index 814853a..fe3c408 100644 --- a/src/hashmap_repo/repository.rs +++ b/src/hashmap_repo/repository.rs @@ -15,10 +15,10 @@ use crate::outbox::OutboxMessage; use crate::read_model::in_memory::apply_document_write_plan; use crate::read_model::{ InMemoryReadModelStore, ReadModel, ReadModelAdapterCapabilities, ReadModelCommitOutcome, - ReadModelError, ReadModelSessionStore, ReadModelStore, ReadModelWritePlan, Versioned, + ReadModelError, ReadModelStore, ReadModelWritePlan, ReadModelWritePlanStore, Versioned, }; use crate::repository::{ - AsyncCommitBatch, AsyncGetStream, AsyncReadModelSessionStore, AsyncReadModelStore, + AsyncCommitBatch, AsyncGetStream, AsyncReadModelStore, AsyncReadModelWritePlanStore, AsyncSnapshotStore, AsyncSnapshotWrite, AsyncStreamWrite, AsyncTransactionalCommit, Commit, CommitBatch, GetMany, GetOne, PreparedEventAppend, RepositoryError, SnapshotWrite, StreamIdentity, TransactionalCommit, @@ -596,33 +596,33 @@ impl AsyncReadModelStore for HashMapRepository { } } -impl ReadModelSessionStore for HashMapRepository { +impl ReadModelWritePlanStore for HashMapRepository { fn read_model_capabilities(&self) -> ReadModelAdapterCapabilities { - ReadModelSessionStore::read_model_capabilities(&self.model_store) + ReadModelWritePlanStore::read_model_capabilities(&self.model_store) } fn commit_write_plan( &self, plan: ReadModelWritePlan, ) -> Result { - ReadModelSessionStore::commit_write_plan(&self.model_store, plan) + ReadModelWritePlanStore::commit_write_plan(&self.model_store, plan) } fn is_processed(&self, consumer_name: &str, message_id: &str) -> Result { - ReadModelSessionStore::is_processed(&self.model_store, consumer_name, message_id) + ReadModelWritePlanStore::is_processed(&self.model_store, consumer_name, message_id) } } -impl AsyncReadModelSessionStore for HashMapRepository { +impl AsyncReadModelWritePlanStore for HashMapRepository { fn read_model_capabilities_async(&self) -> ReadModelAdapterCapabilities { - ReadModelSessionStore::read_model_capabilities(self) + ReadModelWritePlanStore::read_model_capabilities(self) } fn commit_write_plan_async( &self, plan: ReadModelWritePlan, ) -> impl Future> + Send + '_ { - async move { ReadModelSessionStore::commit_write_plan(self, plan) } + async move { ReadModelWritePlanStore::commit_write_plan(self, plan) } } fn is_processed_async<'a>( @@ -630,7 +630,7 @@ impl AsyncReadModelSessionStore for HashMapRepository { consumer_name: &'a str, message_id: &'a str, ) -> impl Future> + Send + 'a { - async move { ReadModelSessionStore::is_processed(self, consumer_name, message_id) } + async move { ReadModelWritePlanStore::is_processed(self, consumer_name, message_id) } } } diff --git a/src/lib.rs b/src/lib.rs index a30942a..7260350 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,7 +39,7 @@ pub type SourcedResult = std::result::Result; // Re-export repository traits at crate root for convenience pub use repository::{ - AsyncCommitBatch, AsyncGetStream, AsyncReadModelSessionStore, AsyncReadModelStore, + AsyncCommitBatch, AsyncGetStream, AsyncReadModelStore, AsyncReadModelWritePlanStore, AsyncRelationalReadModelQueryStore, AsyncRepository, AsyncSnapshotStore, AsyncSnapshotWrite, AsyncStreamWrite, AsyncTransactionalCommit, Commit, CommitBatch, Get, GetMany, GetOne, Gettable, PreparedEventAppend, Repository, RepositoryError, SnapshotWrite, StreamIdentity, @@ -120,12 +120,11 @@ pub use read_model::{ ReadModelLoadRequest, ReadModelMigrationArtifact, ReadModelMutation, ReadModelQueryCapabilities, ReadModelSchema, ReadModelSchemaAdapter, ReadModelSchemaAdapterCapabilities, ReadModelSchemaBootstrap, ReadModelSchemaIssue, - ReadModelSchemaIssueKind, ReadModelSchemaRegistry, ReadModelSchemaVerification, - ReadModelSession, ReadModelSessionStore, ReadModelSessionUnitOfWork, ReadModelStore, - ReadModelUnitOfWorkExt, ReadModelWritePlan, ReadModelsExt, RelationalReadModel, - RelationalReadModelIncludes, RelationalReadModelQueryStore, RelationshipDef, RelationshipKind, - RowKey, RowMutation, RowPatch, RowValue, RowValues, RowWriteMode, Versioned, - DEFAULT_READ_MODEL_VERSION_COLUMN, + ReadModelSchemaIssueKind, ReadModelSchemaRegistry, ReadModelSchemaVerification, ReadModelStore, + ReadModelWorkspace, ReadModelWorkspaceExt, ReadModelWritePlan, ReadModelWritePlanBuilder, + ReadModelWritePlanStore, ReadModelsExt, RelationalReadModel, RelationalReadModelIncludes, + RelationalReadModelQueryStore, RelationshipDef, RelationshipKind, RowKey, RowMutation, + RowPatch, RowValue, RowValues, RowWriteMode, Versioned, DEFAULT_READ_MODEL_VERSION_COLUMN, }; // Neutral table/row primitives shared by read models and operational tables. @@ -141,7 +140,7 @@ pub use table::{ // CommitBuilder: transactional batches of read models, outbox, and aggregates pub use commit_builder::{ - CommitBuilder, CommitBuilderExt, ReadModelSessionCommitExt, StagedCommitBuilder, + CommitBuilder, CommitBuilderExt, ReadModelWritePlanCommitExt, StagedCommitBuilder, }; // Snapshot: state snapshot payloads and rebuildable cache records for hydration diff --git a/src/microsvc/context.rs b/src/microsvc/context.rs index 2ad132d..50c4feb 100644 --- a/src/microsvc/context.rs +++ b/src/microsvc/context.rs @@ -1,49 +1,56 @@ //! Context passed to command handlers. //! -//! Carries the parsed input, session variables, and a reference to the -//! repository. Handlers access everything they need through the context. +//! Carries the parsed input, session variables, and a reference to the service +//! dependencies. Handlers access everything they need through the context. use serde::de::DeserializeOwned; use serde_json::Value; +use super::dependencies::{HasReadModelStore, HasRepo}; use super::error::HandlerError; use super::session::Session; /// The context passed to every command handler. /// -/// Generic over `R` (the repository type) so handlers can access -/// whatever repository implementation the service is configured with. +/// Generic over `D` (the service dependency type) so handlers can access the +/// repository, read-model store, or custom dependencies the service is +/// configured with. /// /// ## Example /// /// ```ignore -/// pub fn handle( -/// ctx: &Context, +/// pub fn handle( +/// ctx: &Context, /// ) -> Result { /// let user_id = ctx.user_id()?; /// let input = ctx.input::()?; -/// // ... +/// let repo = ctx.repo(); /// } /// ``` -pub struct Context<'a, R> { +pub struct Context<'a, D> { /// The command name being handled. command_name: String, /// Raw JSON input from the request. input: Value, /// Session variables (user ID, role, etc.). session: Session, - /// Reference to the repository. - repo: &'a R, + /// Reference to the service dependencies. + dependencies: &'a D, } -impl<'a, R> Context<'a, R> { +impl<'a, D> Context<'a, D> { /// Create a new context. - pub(crate) fn new(command_name: String, input: Value, session: Session, repo: &'a R) -> Self { + pub(crate) fn new( + command_name: String, + input: Value, + session: Session, + dependencies: &'a D, + ) -> Self { Self { command_name, input, session, - repo, + dependencies, } } @@ -80,9 +87,25 @@ impl<'a, R> Context<'a, R> { self.session.role() } - /// Get a reference to the repository. - pub fn repo(&self) -> &R { - self.repo + /// Get a reference to the service dependencies. + pub fn dependencies(&self) -> &D { + self.dependencies + } + + /// Get the aggregate repository for handlers whose dependencies expose one. + pub fn repo(&self) -> &D::Repo + where + D: HasRepo, + { + self.dependencies.repo() + } + + /// Get the read-model store for handlers whose dependencies expose one. + pub fn read_model_store(&self) -> &D::ReadModelStore + where + D: HasReadModelStore, + { + self.dependencies.read_model_store() } /// Check if the raw input contains a field. diff --git a/src/microsvc/dependencies.rs b/src/microsvc/dependencies.rs new file mode 100644 index 0000000..383f618 --- /dev/null +++ b/src/microsvc/dependencies.rs @@ -0,0 +1,131 @@ +//! Typed dependency wrappers for microsvc handlers. + +use crate::aggregate::AggregateRepository; +use crate::read_model::{ReadModelWritePlanStore, RelationalReadModelQueryStore}; +use crate::repository::Repository; +use crate::snapshot::SnapshotAggregateRepository; + +/// Dependency capability for services that expose an aggregate repository. +pub trait HasRepo { + type Repo; + + fn repo(&self) -> &Self::Repo; +} + +/// Dependency capability for services that expose a read-model store. +pub trait HasReadModelStore { + type ReadModelStore; + + fn read_model_store(&self) -> &Self::ReadModelStore; +} + +impl HasRepo for R +where + R: Repository, +{ + type Repo = R; + + fn repo(&self) -> &Self::Repo { + self + } +} + +impl HasRepo for AggregateRepository { + type Repo = Self; + + fn repo(&self) -> &Self::Repo { + self + } +} + +impl HasRepo for SnapshotAggregateRepository { + type Repo = Self; + + fn repo(&self) -> &Self::Repo { + self + } +} + +impl HasReadModelStore for S +where + S: ReadModelWritePlanStore + RelationalReadModelQueryStore, +{ + type ReadModelStore = S; + + fn read_model_store(&self) -> &Self::ReadModelStore { + self + } +} + +/// Dependencies for a service that only needs an aggregate repository. +#[derive(Clone)] +pub struct RepoDependencies { + repo: R, +} + +impl RepoDependencies { + pub fn new(repo: R) -> Self { + Self { repo } + } +} + +impl HasRepo for RepoDependencies { + type Repo = R; + + fn repo(&self) -> &Self::Repo { + &self.repo + } +} + +/// Dependencies for a service that only needs a read-model store. +#[derive(Clone)] +pub struct ReadModelStoreDependencies { + read_model_store: S, +} + +impl ReadModelStoreDependencies { + pub fn new(read_model_store: S) -> Self { + Self { read_model_store } + } +} + +impl HasReadModelStore for ReadModelStoreDependencies { + type ReadModelStore = S; + + fn read_model_store(&self) -> &Self::ReadModelStore { + &self.read_model_store + } +} + +/// Dependencies for a service that needs both an aggregate repository and a +/// read-model store. +#[derive(Clone)] +pub struct RepoReadModelDependencies { + repo: R, + read_model_store: S, +} + +impl RepoReadModelDependencies { + pub fn new(repo: R, read_model_store: S) -> Self { + Self { + repo, + read_model_store, + } + } +} + +impl HasRepo for RepoReadModelDependencies { + type Repo = R; + + fn repo(&self) -> &Self::Repo { + &self.repo + } +} + +impl HasReadModelStore for RepoReadModelDependencies { + type ReadModelStore = S; + + fn read_model_store(&self) -> &Self::ReadModelStore { + &self.read_model_store + } +} diff --git a/src/microsvc/grpc.rs b/src/microsvc/grpc.rs index 7b0b92b..6ea2c6c 100644 --- a/src/microsvc/grpc.rs +++ b/src/microsvc/grpc.rs @@ -15,7 +15,7 @@ //! use sourced_rust::{microsvc, HashMapRepository}; //! //! let service = Arc::new( -//! microsvc::Service::new(HashMapRepository::new()) +//! microsvc::Service::with_repo(HashMapRepository::new()) //! .command("counter.create", |ctx| { /* ... */ }) //! ); //! @@ -127,20 +127,20 @@ impl From for GrpcServeError { // Handler implementation // --------------------------------------------------------------------------- -/// gRPC handler that wraps a `Service` and implements the generated +/// gRPC handler that wraps a `Service` and implements the generated /// `CommandService` trait. Mirrors the HTTP transport pattern. -pub struct GrpcHandler { - service: Arc>, +pub struct GrpcHandler { + service: Arc>, } -impl GrpcHandler { - pub fn new(service: Arc>) -> Self { +impl GrpcHandler { + pub fn new(service: Arc>) -> Self { Self { service } } } #[tonic::async_trait] -impl CommandService for GrpcHandler { +impl CommandService for GrpcHandler { async fn dispatch( &self, request: Request, @@ -224,10 +224,10 @@ fn build_session( // Convenience constructors // --------------------------------------------------------------------------- -/// Create a `CommandServiceServer` from a shared `Service`. -pub fn grpc_server( - service: Arc>, -) -> CommandServiceServer> { +/// Create a `CommandServiceServer` from a shared `Service`. +pub fn grpc_server( + service: Arc>, +) -> CommandServiceServer> { CommandServiceServer::new(GrpcHandler::new(service)) } @@ -236,8 +236,8 @@ pub fn grpc_server( /// Returns [`GrpcServeError::InvalidAddress`] when `addr` is not a valid socket /// address, and [`GrpcServeError::Transport`] for errors returned by tonic while /// serving. -pub async fn serve_grpc( - service: Arc>, +pub async fn serve_grpc( + service: Arc>, addr: &str, ) -> Result<(), GrpcServeError> { let addr: SocketAddr = addr diff --git a/src/microsvc/http.rs b/src/microsvc/http.rs index 399f946..5c4e2f3 100644 --- a/src/microsvc/http.rs +++ b/src/microsvc/http.rs @@ -14,7 +14,7 @@ //! use sourced_rust::{microsvc, HashMapRepository}; //! //! let service = Arc::new( -//! microsvc::Service::new(HashMapRepository::new()) +//! microsvc::Service::with_repo(HashMapRepository::new()) //! .command("counter.create", |ctx| { /* ... */ }) //! ); //! @@ -39,7 +39,7 @@ use super::service::Service; use super::session::Session; /// Build an axum `Router` that dispatches commands via the given service. -pub fn router(service: Arc>) -> Router { +pub fn router(service: Arc>) -> Router { Router::new() .route("/health", get(health_handler)) .route("/:command", axum::routing::post(command_handler)) @@ -47,8 +47,8 @@ pub fn router(service: Arc>) -> Router { } /// Serve the service over HTTP at the given address (e.g. `"0.0.0.0:3000"`). -pub async fn serve( - service: Arc>, +pub async fn serve( + service: Arc>, addr: &str, ) -> Result<(), std::io::Error> { let app = router(service); @@ -57,16 +57,16 @@ pub async fn serve( } /// `GET /health` — returns `{ "ok": true, "commands": [...] }`. -async fn health_handler( - State(service): State>>, +async fn health_handler( + State(service): State>>, ) -> impl IntoResponse { let commands: Vec<&str> = service.commands(); Json(json!({ "ok": true, "commands": commands })) } /// `POST /:command` — dispatch a command with JSON body and headers as session. -async fn command_handler( - State(service): State>>, +async fn command_handler( + State(service): State>>, Path(command): Path, headers: HeaderMap, Json(input): Json, diff --git a/src/microsvc/mod.rs b/src/microsvc/mod.rs index 5a8ac9a..489e763 100644 --- a/src/microsvc/mod.rs +++ b/src/microsvc/mod.rs @@ -1,8 +1,8 @@ //! microsvc — Convention-based microservice command handler framework. //! //! Build microservices by registering command handlers on a `Service`. -//! Each handler receives a `Context` with access to the input payload, -//! session variables, and the repository. +//! Each handler receives a `Context` with access to the input payload, +//! session variables, and the service dependencies. //! //! ## Quick Start //! @@ -12,7 +12,7 @@ //! use serde_json::json; //! //! let service = Arc::new( -//! microsvc::Service::new(HashMapRepository::new()) +//! microsvc::Service::with_repo(HashMapRepository::new()) //! .command("order.create", |ctx| { //! let input = ctx.input::()?; //! Ok(json!({ "id": input.id })) @@ -35,13 +35,15 @@ //! //! pub const COMMAND: &str = "order.create"; //! -//! pub fn guard(ctx: µsvc::Context) -> bool { +//! pub fn guard(ctx: µsvc::Context) -> bool { //! ctx.has_fields(&["id", "product_id"]) //! } //! -//! pub fn handle( -//! ctx: µsvc::Context, -//! ) -> Result { +//! pub fn handle(ctx: µsvc::Context) -> Result +//! where +//! D: microsvc::HasRepo, +//! D::Repo: CommitAggregate, +//! { //! let input = ctx.input::()?; //! let mut order = Order::default(); //! order.create(input.id); @@ -51,11 +53,16 @@ //! ``` mod context; +mod dependencies; mod error; mod service; mod session; pub use context::Context; +pub use dependencies::{ + HasReadModelStore, HasRepo, ReadModelStoreDependencies, RepoDependencies, + RepoReadModelDependencies, +}; pub use error::HandlerError; pub use service::{CommandRequest, CommandResponse, Service}; pub use session::Session; @@ -86,7 +93,7 @@ pub use grpc::{grpc_server, serve_grpc, GrpcServeError}; /// # Example /// ```ignore /// let service = sourced_rust::register_handlers!( -/// microsvc::Service::new(HashMapRepository::new()), +/// microsvc::Service::with_repo(HashMapRepository::new()), /// handlers::counter_create, /// handlers::counter_increment, /// ); diff --git a/src/microsvc/service.rs b/src/microsvc/service.rs index d77d8d7..ba71bcd 100644 --- a/src/microsvc/service.rs +++ b/src/microsvc/service.rs @@ -1,7 +1,7 @@ //! Service — command handler registry and dispatch for microsvc. //! -//! `Service` holds a repository and a set of named command handlers. -//! Each handler receives a `Context` and returns `Result`. +//! `Service` holds service dependencies and a set of named command handlers. +//! Each handler receives a `Context` and returns `Result`. //! //! ## Example //! @@ -24,42 +24,60 @@ use std::{error::Error, fmt}; use serde_json::Value; use super::context::Context; +use super::dependencies::{HasReadModelStore, HasRepo, RepoReadModelDependencies}; use super::error::HandlerError; use super::session::Session; -type GuardFn = dyn Fn(&Context) -> bool + Send + Sync; -type HandlerFn = dyn Fn(&Context) -> Result + Send + Sync; +type GuardFn = dyn Fn(&Context) -> bool + Send + Sync; +type HandlerFn = dyn Fn(&Context) -> Result + Send + Sync; /// A registered command handler with optional guard. -struct CommandHandler { - guard: Option>>, - handle: Box>, +struct CommandHandler { + guard: Option>>, + handle: Box>, } /// A microservice that routes commands to handler functions. /// -/// Generic over `R`, the repository type. Handlers receive a `Context` -/// and can access the repo via `ctx.repo()`. -pub struct Service { - repo: R, - handlers: HashMap>, +/// Generic over `D`, the service dependency type. Prefer +/// [`Service::with_repo`], [`Service::with_read_model_store`], or +/// [`Service::with_repo_and_read_model_store`] for common dependency shapes. +pub struct Service { + dependencies: D, + handlers: HashMap>, } -impl Service { - /// Create a new service with the given repository. - pub fn new(repo: R) -> Self { +impl Service { + /// Create a new service with custom dependencies. + pub fn new(dependencies: D) -> Self { Self { - repo, + dependencies, handlers: HashMap::new(), } } + /// Create a service whose dependency type is an aggregate repository. + pub fn with_repo(repo: D) -> Self + where + D: HasRepo, + { + Self::new(repo) + } + + /// Create a service whose dependency type is a read-model store. + pub fn with_read_model_store(read_model_store: D) -> Self + where + D: HasReadModelStore, + { + Self::new(read_model_store) + } + /// Register a command handler. /// /// Uses builder pattern — returns `self` for chaining. pub fn command(mut self, name: &str, handler: F) -> Self where - F: Fn(&Context) -> Result + Send + Sync + 'static, + F: Fn(&Context) -> Result + Send + Sync + 'static, { self.handlers.insert( name.to_string(), @@ -77,8 +95,8 @@ impl Service { /// the command is rejected with `HandlerError::GuardRejected`. pub fn command_guarded(mut self, name: &str, guard: G, handler: F) -> Self where - G: Fn(&Context) -> bool + Send + Sync + 'static, - F: Fn(&Context) -> Result + Send + Sync + 'static, + G: Fn(&Context) -> bool + Send + Sync + 'static, + F: Fn(&Context) -> Result + Send + Sync + 'static, { self.handlers.insert( name.to_string(), @@ -105,7 +123,7 @@ impl Service { .get(command) .ok_or_else(|| HandlerError::UnknownCommand(command.to_string()))?; - let ctx = Context::new(command.to_string(), input, session, &self.repo); + let ctx = Context::new(command.to_string(), input, session, &self.dependencies); // Run guard if present if let Some(guard) = &handler.guard { @@ -150,9 +168,33 @@ impl Service { self.handlers.keys().map(|s| s.as_str()).collect() } - /// Get a reference to the repository. - pub fn repo(&self) -> &R { - &self.repo + /// Get a reference to the service dependencies. + pub fn dependencies(&self) -> &D { + &self.dependencies + } + + /// Get the aggregate repository for services whose dependencies expose one. + pub fn repo(&self) -> &D::Repo + where + D: HasRepo, + { + self.dependencies.repo() + } + + /// Get the read-model store for services whose dependencies expose one. + pub fn read_model_store(&self) -> &D::ReadModelStore + where + D: HasReadModelStore, + { + self.dependencies.read_model_store() + } +} + +impl Service> { + /// Create a service whose handlers need both an aggregate repository and a + /// read-model store. + pub fn with_repo_and_read_model_store(repo: R, read_model_store: S) -> Self { + Self::new(RepoReadModelDependencies::new(repo, read_model_store)) } } @@ -259,14 +301,14 @@ impl Drop for TransportHandle { /// let stats = handle.stop()?; /// ``` #[cfg(feature = "bus")] -pub fn listen( - service: std::sync::Arc>, +pub fn listen( + service: std::sync::Arc>, queue_name: &str, listener: L, poll_interval: std::time::Duration, ) -> TransportHandle where - R: Send + Sync + 'static, + D: Send + Sync + 'static, L: crate::bus::Listener + 'static, { let queue_name = queue_name.to_string(); @@ -332,13 +374,13 @@ where /// let stats = handle.stop()?; /// ``` #[cfg(feature = "bus")] -pub fn subscribe( - service: std::sync::Arc>, +pub fn subscribe( + service: std::sync::Arc>, subscriber: S, poll_interval: std::time::Duration, ) -> TransportHandle where - R: Send + Sync + 'static, + D: Send + Sync + 'static, S: crate::bus::Subscriber + 'static, { let (stop_tx, stop_rx) = std::sync::mpsc::channel(); diff --git a/src/read_model/in_memory.rs b/src/read_model/in_memory.rs index 615e60c..9c556a7 100644 --- a/src/read_model/in_memory.rs +++ b/src/read_model/in_memory.rs @@ -16,12 +16,12 @@ use super::{ ExpectedVersion, PatchMode, ProcessedMessageMark, ReadModel, ReadModelAdapterCapabilities, ReadModelCommitOutcome, ReadModelError, ReadModelIncludeRows, ReadModelLoadGraph, ReadModelLoadRequest, ReadModelMutation, ReadModelQueryCapabilities, ReadModelSchema, - ReadModelSchemaRegistry, ReadModelSessionStore, ReadModelStore, ReadModelWritePlan, + ReadModelSchemaRegistry, ReadModelStore, ReadModelWritePlan, ReadModelWritePlanStore, RelationalReadModel, RelationalReadModelQueryStore, RelationshipDef, RelationshipKind, RowKey, RowValue, RowValues, RowWriteMode, Versioned, }; use crate::repository::{ - AsyncReadModelSessionStore, AsyncReadModelStore, AsyncRelationalReadModelQueryStore, + AsyncReadModelStore, AsyncReadModelWritePlanStore, AsyncRelationalReadModelQueryStore, }; /// Internal stored representation of a read model. @@ -420,7 +420,7 @@ impl InMemoryReadModelStore { } } -impl ReadModelSessionStore for InMemoryReadModelStore { +impl ReadModelWritePlanStore for InMemoryReadModelStore { fn read_model_capabilities(&self) -> ReadModelAdapterCapabilities { relational_capabilities() } @@ -470,16 +470,16 @@ impl ReadModelSessionStore for InMemoryReadModelStore { } } -impl AsyncReadModelSessionStore for InMemoryReadModelStore { +impl AsyncReadModelWritePlanStore for InMemoryReadModelStore { fn read_model_capabilities_async(&self) -> ReadModelAdapterCapabilities { - ReadModelSessionStore::read_model_capabilities(self) + ReadModelWritePlanStore::read_model_capabilities(self) } fn commit_write_plan_async( &self, plan: ReadModelWritePlan, ) -> impl Future> + Send + '_ { - async move { ReadModelSessionStore::commit_write_plan(self, plan) } + async move { ReadModelWritePlanStore::commit_write_plan(self, plan) } } fn is_processed_async<'a>( @@ -487,7 +487,7 @@ impl AsyncReadModelSessionStore for InMemoryReadModelStore { consumer_name: &'a str, message_id: &'a str, ) -> impl Future> + Send + 'a { - async move { ReadModelSessionStore::is_processed(self, consumer_name, message_id) } + async move { ReadModelWritePlanStore::is_processed(self, consumer_name, message_id) } } } diff --git a/src/read_model/metadata.rs b/src/read_model/metadata.rs index d0da462..b4ef724 100644 --- a/src/read_model/metadata.rs +++ b/src/read_model/metadata.rs @@ -141,7 +141,7 @@ impl IndexDef { } } -/// Relationship category for later session/write-plan lowering. +/// Relationship category for later workspace/write-plan lowering. #[derive(Clone, Debug, PartialEq, Eq)] pub enum RelationshipKind { HasMany, diff --git a/src/read_model/mod.rs b/src/read_model/mod.rs index e0eb9b2..f1dc427 100644 --- a/src/read_model/mod.rs +++ b/src/read_model/mod.rs @@ -4,7 +4,7 @@ //! //! - document rows through [`ReadModelStore`] and collection/id JSON payloads; //! - normalized relational rows through [`RelationalReadModel`], -//! [`ReadModelSession`], [`ReadModelWritePlan`], and schema metadata. +//! [`ReadModelWritePlanBuilder`], [`ReadModelWritePlan`], and schema metadata. //! //! Document views can use typed key/value CRUD: //! @@ -27,19 +27,19 @@ //! Relational models stage explicit row mutations: //! //! ```ignore -//! use sourced_rust::{ReadModelSession, ReadModelSessionCommitExt}; +//! use sourced_rust::{ReadModelWritePlanBuilder, ReadModelWritePlanCommitExt}; //! -//! let mut read_models = ReadModelSession::new(); -//! read_models.save(&player)?; -//! read_models.save_related(&player, "weapons", &weapon)?; +//! let mut read_models = ReadModelWritePlanBuilder::new(); +//! read_models.upsert(&player)?; +//! read_models.upsert_related(&player, "weapons", &weapon)?; //! repo.read_models(read_models).commit(&mut aggregate)?; //! ``` //! -//! Distributed projectors can commit a session directly against a read-model +//! Distributed projectors can commit a write plan directly against a read-model //! adapter and mark messages processed in the same adapter transaction: //! //! ```ignore -//! let mut read_models = ReadModelSession::new(); +//! let mut read_models = ReadModelWritePlanBuilder::new(); //! read_models.document(&view)?.mark_processed("projection", event_id); //! let outcome = read_models.commit(&read_store)?; //! ``` @@ -143,8 +143,8 @@ pub use session::{ DeleteRowMutation, DocumentMutation, ExpectedVersion, PatchMode, PatchRowMutation, ProcessedMessageMark, ReadModelAdapterCapabilities, ReadModelCommitOutcome, ReadModelIncludeRows, ReadModelLoadGraph, ReadModelLoadRequest, ReadModelMutation, - ReadModelQueryCapabilities, ReadModelSession, ReadModelSessionStore, - ReadModelSessionUnitOfWork, ReadModelUnitOfWorkExt, ReadModelWritePlan, - RelationalReadModelQueryStore, RowMutation, RowPatch, RowWriteMode, + ReadModelQueryCapabilities, ReadModelWorkspace, ReadModelWorkspaceExt, ReadModelWritePlan, + ReadModelWritePlanBuilder, ReadModelWritePlanStore, RelationalReadModelQueryStore, RowMutation, + RowPatch, RowWriteMode, }; pub use store::ReadModelStore; diff --git a/src/read_model/queued.rs b/src/read_model/queued.rs index 8f5c8bf..d45d6ac 100644 --- a/src/read_model/queued.rs +++ b/src/read_model/queued.rs @@ -14,7 +14,7 @@ use crate::repository::{Commit, CommitBatch, RepositoryError, TransactionalCommi use super::session::document_key; use super::{ ReadModel, ReadModelAdapterCapabilities, ReadModelCommitOutcome, ReadModelError, - ReadModelSessionStore, ReadModelStore, ReadModelWritePlan, Versioned, + ReadModelStore, ReadModelWritePlan, ReadModelWritePlanStore, Versioned, }; /// A `ReadModelStore` wrapper that provides per-instance locking. @@ -259,7 +259,7 @@ impl TransactionalCommit for QueuedReadM } } -impl ReadModelSessionStore +impl ReadModelWritePlanStore for QueuedReadModelStore { fn read_model_capabilities(&self) -> ReadModelAdapterCapabilities { diff --git a/src/read_model/session.rs b/src/read_model/session.rs index 04fbb03..5dd2658 100644 --- a/src/read_model/session.rs +++ b/src/read_model/session.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use serde::Serialize; -use crate::repository::AsyncReadModelSessionStore; +use crate::repository::AsyncReadModelWritePlanStore; use super::{ ReadModel, ReadModelError, ReadModelSchema, RelationalReadModel, RelationalReadModelIncludes, @@ -93,8 +93,8 @@ impl ReadModelCommitOutcome { } } -/// Adapter contract for committing read-model sessions without an aggregate repository. -pub trait ReadModelSessionStore: Send + Sync { +/// Adapter contract for committing read-model write plans without an aggregate repository. +pub trait ReadModelWritePlanStore: Send + Sync { fn read_model_capabilities(&self) -> ReadModelAdapterCapabilities; fn commit_write_plan( @@ -417,16 +417,16 @@ struct StagedMutation { mutation: ReadModelMutation, } -/// Short-lived unit of work that stages read-model mutations before commit. +/// Detached builder for read-model write plans that are applied at commit. #[derive(Clone, Debug, Default)] -pub struct ReadModelSession { +pub struct ReadModelWritePlanBuilder { mutations: Vec, processed_messages: Vec, expected_versions: BTreeMap, next_sequence: u64, } -impl ReadModelSession { +impl ReadModelWritePlanBuilder { pub fn new() -> Self { Self::default() } @@ -514,18 +514,11 @@ impl ReadModelSession { ) } - pub fn save(&mut self, model: &M) -> Result<&mut Self, ReadModelError> - where - M: RelationalReadModel, - { - self.stage_full_row(model, RowWriteMode::Upsert, None) - } - pub fn upsert(&mut self, model: &M) -> Result<&mut Self, ReadModelError> where M: RelationalReadModel, { - self.save(model) + self.stage_full_row(model, RowWriteMode::Upsert, None) } pub fn insert_related( @@ -541,7 +534,7 @@ impl ReadModelSession { self.stage_related_row(parent, relationship_field, child, RowWriteMode::Insert) } - pub fn save_related( + pub fn upsert_related( &mut self, parent: &P, relationship_field: &str, @@ -640,14 +633,14 @@ impl ReadModelSession { pub fn commit(self, store: &S) -> Result where - S: ReadModelSessionStore + ?Sized, + S: ReadModelWritePlanStore + ?Sized, { store.commit_write_plan(self.into_write_plan()?) } pub async fn commit_async(self, store: &S) -> Result where - S: AsyncReadModelSessionStore + ?Sized, + S: AsyncReadModelWritePlanStore + ?Sized, { store.commit_write_plan_async(self.into_write_plan()?).await } @@ -797,21 +790,21 @@ struct TrackedModelBaseline { includes: BTreeMap, } -/// Store-bound read-model unit of work for load, mutate, save-changes, commit workflows. -pub struct ReadModelSessionUnitOfWork<'a, S> { +/// Store-bound read-model workspace for load, mutate, sync, commit workflows. +pub struct ReadModelWorkspace<'a, S> { store: &'a S, - writes: ReadModelSession, + writes: ReadModelWritePlanBuilder, baselines: Vec, } -impl<'a, S> ReadModelSessionUnitOfWork<'a, S> +impl<'a, S> ReadModelWorkspace<'a, S> where - S: ReadModelSessionStore + RelationalReadModelQueryStore, + S: ReadModelWritePlanStore + RelationalReadModelQueryStore, { pub fn new(store: &'a S) -> Self { Self { store, - writes: ReadModelSession::new(), + writes: ReadModelWritePlanBuilder::new(), baselines: Vec::new(), } } @@ -832,7 +825,7 @@ where } } - pub fn save_changes(&mut self, model: M) -> Result<&mut Self, ReadModelError> + pub fn sync(&mut self, model: M) -> Result<&mut Self, ReadModelError> where M: RelationalReadModel + RelationalReadModelIncludes, { @@ -853,7 +846,7 @@ where .cloned() .ok_or_else(|| { ReadModelError::Metadata(format!( - "read model `{}` has no tracked baseline for save_changes", + "read model `{}` has no tracked baseline for sync", schema.model_name )) })?; @@ -875,11 +868,11 @@ where Ok(self) } - pub fn save(&mut self, model: &M) -> Result<&mut Self, ReadModelError> + pub fn upsert(&mut self, model: &M) -> Result<&mut Self, ReadModelError> where M: RelationalReadModel, { - self.writes.save(model)?; + self.writes.upsert(model)?; Ok(self) } @@ -891,6 +884,36 @@ where Ok(self) } + pub fn upsert_related( + &mut self, + parent: &P, + relationship_field: &str, + child: &C, + ) -> Result<&mut Self, ReadModelError> + where + P: RelationalReadModel, + C: RelationalReadModel, + { + self.writes + .upsert_related(parent, relationship_field, child)?; + Ok(self) + } + + pub fn insert_related( + &mut self, + parent: &P, + relationship_field: &str, + child: &C, + ) -> Result<&mut Self, ReadModelError> + where + P: RelationalReadModel, + C: RelationalReadModel, + { + self.writes + .insert_related(parent, relationship_field, child)?; + Ok(self) + } + pub fn patch(&mut self, key: RowKey, patch: RowPatch) -> Result<&mut Self, ReadModelError> where M: RelationalReadModel, @@ -899,6 +922,18 @@ where Ok(self) } + pub fn upsert_patch( + &mut self, + key: RowKey, + patch: RowPatch, + ) -> Result<&mut Self, ReadModelError> + where + M: RelationalReadModel, + { + self.writes.upsert_patch::(key, patch)?; + Ok(self) + } + pub fn delete(&mut self, key: RowKey) -> Result<&mut Self, ReadModelError> where M: RelationalReadModel, @@ -907,6 +942,22 @@ where Ok(self) } + pub fn delete_model(&mut self, model: &M) -> Result<&mut Self, ReadModelError> + where + M: RelationalReadModel, + { + self.writes.delete_model(model)?; + Ok(self) + } + + pub fn document(&mut self, model: &M) -> Result<&mut Self, ReadModelError> + where + M: ReadModel, + { + self.writes.document(model)?; + Ok(self) + } + pub fn mark_processed( &mut self, consumer_name: impl Into, @@ -989,7 +1040,7 @@ where && current_rows.len() > 1 { return Err(ReadModelError::Metadata(format!( - "belongs_to relationship `{}` can save at most one related row", + "belongs_to relationship `{}` can sync at most one related row", baseline.relationship.field_name ))); } @@ -1029,7 +1080,7 @@ where } } - // `save_changes` makes storage match the struct: an owned `has_many` child + // `sync` makes storage match the struct: an owned `has_many` child // dropped from the loaded collection is deleted. `belongs_to` clears never // delete the target, which is the owner that other rows may reference. if matches!(baseline.relationship.kind, RelationshipKind::HasMany) { @@ -1108,19 +1159,19 @@ where } /// Builder for one explicit primary-key read-model load. -pub struct ReadModelLoadBuilder<'session, 'store, S, M> +pub struct ReadModelLoadBuilder<'workspace, 'store, S, M> where - S: ReadModelSessionStore + RelationalReadModelQueryStore, + S: ReadModelWritePlanStore + RelationalReadModelQueryStore, { - unit: &'session mut ReadModelSessionUnitOfWork<'store, S>, + unit: &'workspace mut ReadModelWorkspace<'store, S>, key: RowKey, includes: Vec, _marker: PhantomData, } -impl<'session, 'store, S, M> ReadModelLoadBuilder<'session, 'store, S, M> +impl<'workspace, 'store, S, M> ReadModelLoadBuilder<'workspace, 'store, S, M> where - S: ReadModelSessionStore + RelationalReadModelQueryStore, + S: ReadModelWritePlanStore + RelationalReadModelQueryStore, M: RelationalReadModel + RelationalReadModelIncludes, { pub fn include(mut self, relationship: impl Into) -> Self { @@ -1157,16 +1208,16 @@ where } } -/// Extension trait that starts a friendly tracked read-model session from a store. -pub trait ReadModelUnitOfWorkExt: - ReadModelSessionStore + RelationalReadModelQueryStore + Sized +/// Extension trait that starts a tracked read-model workspace from a store. +pub trait ReadModelWorkspaceExt: + ReadModelWritePlanStore + RelationalReadModelQueryStore + Sized { - fn session(&self) -> ReadModelSessionUnitOfWork<'_, Self> { - ReadModelSessionUnitOfWork::new(self) + fn workspace(&self) -> ReadModelWorkspace<'_, Self> { + ReadModelWorkspace::new(self) } } -impl ReadModelUnitOfWorkExt for S where S: ReadModelSessionStore + RelationalReadModelQueryStore {} +impl ReadModelWorkspaceExt for S where S: ReadModelWritePlanStore + RelationalReadModelQueryStore {} fn diff_rows(before: &RowValues, after: &RowValues) -> RowPatch { let mut patch = RowPatch::new(); diff --git a/src/repository/async_repository.rs b/src/repository/async_repository.rs index 72c93d2..c788cc2 100644 --- a/src/repository/async_repository.rs +++ b/src/repository/async_repository.rs @@ -146,7 +146,7 @@ pub trait AsyncReadModelStore: Send + Sync { } /// Async adapter contract for committing read-model write plans. -pub trait AsyncReadModelSessionStore: Send + Sync { +pub trait AsyncReadModelWritePlanStore: Send + Sync { fn read_model_capabilities_async(&self) -> ReadModelAdapterCapabilities; fn commit_write_plan_async( diff --git a/src/repository/mod.rs b/src/repository/mod.rs index d4f29e8..d2099ed 100644 --- a/src/repository/mod.rs +++ b/src/repository/mod.rs @@ -6,7 +6,7 @@ mod identity; mod repository; pub use async_repository::{ - AsyncCommitBatch, AsyncGetStream, AsyncReadModelSessionStore, AsyncReadModelStore, + AsyncCommitBatch, AsyncGetStream, AsyncReadModelStore, AsyncReadModelWritePlanStore, AsyncRelationalReadModelQueryStore, AsyncRepository, AsyncSnapshotStore, AsyncSnapshotWrite, AsyncStreamWrite, AsyncTransactionalCommit, PreparedEventAppend, }; diff --git a/src/sqlite_repo/mod.rs b/src/sqlite_repo/mod.rs index 178fe16..4e9a6e1 100644 --- a/src/sqlite_repo/mod.rs +++ b/src/sqlite_repo/mod.rs @@ -26,7 +26,7 @@ use crate::read_model::{ ReadModelError, ReadModelMutation, ReadModelWritePlan, Versioned, }; use crate::repository::{ - AsyncCommitBatch, AsyncGetStream, AsyncReadModelSessionStore, AsyncReadModelStore, + AsyncCommitBatch, AsyncGetStream, AsyncReadModelStore, AsyncReadModelWritePlanStore, AsyncSnapshotStore, AsyncSnapshotWrite, AsyncTransactionalCommit, PreparedEventAppend, RepositoryError, StreamIdentity, }; @@ -504,7 +504,7 @@ impl SqliteRepository { } } -impl AsyncReadModelSessionStore for SqliteRepository { +impl AsyncReadModelWritePlanStore for SqliteRepository { fn read_model_capabilities_async(&self) -> ReadModelAdapterCapabilities { document_capabilities() } diff --git a/tests/async_repository/main.rs b/tests/async_repository/main.rs index c3be416..95ccacf 100644 --- a/tests/async_repository/main.rs +++ b/tests/async_repository/main.rs @@ -3,10 +3,10 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use sourced_rust::{ impl_aggregate, Aggregate, AsyncAggregateBuilder, AsyncCommitBatch, AsyncGetStream, - AsyncOutboxStore, AsyncReadModelSessionStore, AsyncReadModelStore, AsyncSnapshotStore, + AsyncOutboxStore, AsyncReadModelStore, AsyncReadModelWritePlanStore, AsyncSnapshotStore, AsyncStreamWrite, AsyncTransactionalCommit, ClaimOutboxMessages, Entity, EventRecord, HashMapRepository, InMemorySnapshotStore, OutboxMessage, ProcessedMessageMark, ReadModel, - ReadModelSession, ReadModelWritePlan, RepositoryError, SnapshotRecord, Snapshottable, + ReadModelWritePlan, ReadModelWritePlanBuilder, RepositoryError, SnapshotRecord, Snapshottable, StreamIdentity, }; @@ -189,7 +189,7 @@ async fn read_model_session_can_commit_against_async_store() { id: "view-1".into(), value: 42, }; - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session .document(&view) .unwrap() diff --git a/tests/distributed_read_model/checkout_saga_service/service.rs b/tests/distributed_read_model/checkout_saga_service/service.rs index d208bb4..337ce10 100644 --- a/tests/distributed_read_model/checkout_saga_service/service.rs +++ b/tests/distributed_read_model/checkout_saga_service/service.rs @@ -5,7 +5,7 @@ use sourced_rust::microsvc::Service; use super::{handlers, CheckoutRepo}; pub fn service(repo: CheckoutRepo) -> Arc> { - let service = sourced_rust::register_handlers!(Service::new(repo), handlers::start); + let service = sourced_rust::register_handlers!(Service::with_repo(repo), handlers::start); Arc::new(service.command_guarded( handlers::record_seat_reserved::EVENT, handlers::record_seat_reserved::guard, diff --git a/tests/distributed_read_model/main.rs b/tests/distributed_read_model/main.rs index a6cdd76..08ea276 100644 --- a/tests/distributed_read_model/main.rs +++ b/tests/distributed_read_model/main.rs @@ -39,12 +39,12 @@ use sourced_rust::bus::Subscribable; use sourced_rust::microsvc::{self, Service, Session}; use sourced_rust::{ AggregateBuilder, HashMapRepository, InMemoryQueue, InMemoryReadModelStore, OutboxWorkerThread, - Queueable, ReadModelSessionStore, + Queueable, ReadModelWritePlanStore, }; -fn dispatch(service: &Service, command: &str, input: C) +fn dispatch(service: &Service, command: &str, input: C) where - R: Send + Sync + 'static, + D: Send + Sync + 'static, C: Serialize, { service diff --git a/tests/distributed_read_model/projection_service/handlers/checkout.rs b/tests/distributed_read_model/projection_service/handlers/checkout.rs index 1a6665c..a017343 100644 --- a/tests/distributed_read_model/projection_service/handlers/checkout.rs +++ b/tests/distributed_read_model/projection_service/handlers/checkout.rs @@ -1,12 +1,12 @@ use serde_json::{json, Value}; use sourced_rust::microsvc::{Context, HandlerError}; -use sourced_rust::{InMemoryReadModelStore, ReadModelUnitOfWorkExt}; +use sourced_rust::ReadModelWorkspaceExt; use crate::checkout::{ checkout_event, CheckoutStarted, SeatReservationCompleted, CHECKOUT_SEAT_RESERVED, CHECKOUT_STARTED, RESERVING_SEAT_MESSAGE, SEAT_RESERVED_MESSAGE, }; -use crate::projection_service::CHECKOUT_SCREEN_CONSUMER; +use crate::projection_service::{ProjectionDependencies, CHECKOUT_SCREEN_CONSUMER}; use crate::read_models::{CheckoutStepView, CheckoutView}; pub const EVENTS: &[&str] = &[ @@ -14,11 +14,11 @@ pub const EVENTS: &[&str] = &[ checkout_event::SEAT_RESERVATION_COMPLETED, ]; -pub fn guard(ctx: &Context) -> bool { +pub fn guard(ctx: &Context) -> bool { ctx.has_fields(&["id", "event_type", "payload"]) } -pub fn handle(ctx: &Context) -> Result { +pub fn handle(ctx: &Context) -> Result { let event = super::event(ctx)?; match event.event_type.as_str() { @@ -37,11 +37,13 @@ pub fn handle(ctx: &Context) -> Result { let msg: SeatReservationCompleted = event.json_decode().map_err(|err| { @@ -62,11 +64,13 @@ pub fn handle(ctx: &Context) -> Result return Err(HandlerError::UnknownCommand(other.to_string())), } diff --git a/tests/distributed_read_model/projection_service/handlers/mod.rs b/tests/distributed_read_model/projection_service/handlers/mod.rs index 6e20dfe..49bef32 100644 --- a/tests/distributed_read_model/projection_service/handlers/mod.rs +++ b/tests/distributed_read_model/projection_service/handlers/mod.rs @@ -7,7 +7,9 @@ pub mod seat; use serde::{Deserialize, Serialize}; use sourced_rust::bus::Event; use sourced_rust::microsvc::{Context, HandlerError}; -use sourced_rust::{InMemoryReadModelStore, ReadModelError}; +use sourced_rust::ReadModelError; + +use crate::projection_service::ProjectionDependencies; #[derive(Debug, Deserialize, Serialize)] pub struct ProjectionMessage { @@ -39,7 +41,7 @@ impl From for Event { } } -pub fn event(ctx: &Context) -> Result { +pub fn event(ctx: &Context) -> Result { Ok(ctx.input::()?.into()) } diff --git a/tests/distributed_read_model/projection_service/handlers/seat.rs b/tests/distributed_read_model/projection_service/handlers/seat.rs index ebe156e..ce496ba 100644 --- a/tests/distributed_read_model/projection_service/handlers/seat.rs +++ b/tests/distributed_read_model/projection_service/handlers/seat.rs @@ -1,18 +1,18 @@ use serde_json::{json, Value}; use sourced_rust::microsvc::{Context, HandlerError}; -use sourced_rust::{InMemoryReadModelStore, ReadModelUnitOfWorkExt}; +use sourced_rust::ReadModelWorkspaceExt; use crate::checkout::{seat_event, SeatAdded, SeatReserved, SEAT_AVAILABLE, SEAT_RESERVED}; -use crate::projection_service::CHECKOUT_SCREEN_CONSUMER; +use crate::projection_service::{ProjectionDependencies, CHECKOUT_SCREEN_CONSUMER}; use crate::read_models::{CheckoutStepView, SeatView}; pub const EVENTS: &[&str] = &[seat_event::ADDED, seat_event::RESERVED]; -pub fn guard(ctx: &Context) -> bool { +pub fn guard(ctx: &Context) -> bool { ctx.has_fields(&["id", "event_type", "payload"]) } -pub fn handle(ctx: &Context) -> Result { +pub fn handle(ctx: &Context) -> Result { let event = super::event(ctx)?; match event.event_type.as_str() { @@ -27,10 +27,10 @@ pub fn handle(ctx: &Context) -> Result { let msg: SeatReserved = event @@ -48,11 +48,11 @@ pub fn handle(ctx: &Context) -> Result return Err(HandlerError::UnknownCommand(other.to_string())), } diff --git a/tests/distributed_read_model/projection_service/mod.rs b/tests/distributed_read_model/projection_service/mod.rs index c10b762..39ddec0 100644 --- a/tests/distributed_read_model/projection_service/mod.rs +++ b/tests/distributed_read_model/projection_service/mod.rs @@ -2,6 +2,6 @@ mod service; pub mod handlers; -pub use service::{projects, service, subscriber}; +pub use service::{projects, service, subscriber, ProjectionDependencies}; pub const CHECKOUT_SCREEN_CONSUMER: &str = "checkout-screen-projection"; diff --git a/tests/distributed_read_model/projection_service/service.rs b/tests/distributed_read_model/projection_service/service.rs index 6419161..2e5b8e2 100644 --- a/tests/distributed_read_model/projection_service/service.rs +++ b/tests/distributed_read_model/projection_service/service.rs @@ -7,8 +7,10 @@ use sourced_rust::InMemoryReadModelStore; use super::handlers::{self, ProjectionMessage}; -type ProjectionGuard = fn(&Context) -> bool; -type ProjectionHandler = fn(&Context) -> Result; +pub type ProjectionDependencies = InMemoryReadModelStore; + +type ProjectionGuard = fn(&Context) -> bool; +type ProjectionHandler = fn(&Context) -> Result; pub struct ProjectionSubscriber { inner: S, @@ -41,8 +43,8 @@ where } } -pub fn service(store: InMemoryReadModelStore) -> Arc> { - let service = Service::new(store); +pub fn service(store: InMemoryReadModelStore) -> Arc> { + let service = Service::with_read_model_store(store); let service = register_handler_events( service, handlers::checkout::EVENTS, @@ -71,11 +73,11 @@ pub fn projects(event_type: &str) -> bool { } fn register_handler_events( - mut service: Service, + mut service: Service, events: &[&str], guard: ProjectionGuard, handle: ProjectionHandler, -) -> Service { +) -> Service { for event in events { service = service.command_guarded(event, guard, handle); } diff --git a/tests/distributed_read_model/query_service/mod.rs b/tests/distributed_read_model/query_service/mod.rs index 3799ed4..5050d9e 100644 --- a/tests/distributed_read_model/query_service/mod.rs +++ b/tests/distributed_read_model/query_service/mod.rs @@ -2,7 +2,7 @@ //! projected relational tables through primary-key loads plus explicit //! relationship includes. -use sourced_rust::{InMemoryReadModelStore, ReadModelError, ReadModelUnitOfWorkExt}; +use sourced_rust::{InMemoryReadModelStore, ReadModelError, ReadModelWorkspaceExt}; use crate::read_models::{checkout_key, seat_key, CheckoutView, SeatView}; @@ -21,7 +21,7 @@ impl CheckoutQueryService { &self, checkout_id: &str, ) -> Result, ReadModelError> { - let mut session = self.store.session(); + let mut session = self.store.workspace(); Ok(session .load::(checkout_key(checkout_id)) .include("steps") @@ -31,7 +31,7 @@ impl CheckoutQueryService { } pub fn seat(&self, seat_id: &str) -> Result, ReadModelError> { - let mut session = self.store.session(); + let mut session = self.store.workspace(); Ok(session .load::(seat_key(seat_id)) .one()? diff --git a/tests/distributed_read_model/seat_inventory_service/service.rs b/tests/distributed_read_model/seat_inventory_service/service.rs index d1cd880..6d88f73 100644 --- a/tests/distributed_read_model/seat_inventory_service/service.rs +++ b/tests/distributed_read_model/seat_inventory_service/service.rs @@ -5,7 +5,7 @@ use sourced_rust::microsvc::Service; use super::{handlers, SeatRepo}; pub fn service(repo: SeatRepo) -> Arc> { - let service = sourced_rust::register_handlers!(Service::new(repo), handlers::add); + let service = sourced_rust::register_handlers!(Service::with_repo(repo), handlers::add); Arc::new(service.command_guarded( handlers::reserve_started_checkout_seat::EVENT, handlers::reserve_started_checkout_seat::guard, diff --git a/tests/distributed_read_model_board/board_service/service.rs b/tests/distributed_read_model_board/board_service/service.rs index f79b699..f5313a7 100644 --- a/tests/distributed_read_model_board/board_service/service.rs +++ b/tests/distributed_read_model_board/board_service/service.rs @@ -6,7 +6,7 @@ use super::{handlers, BoardRepo}; pub fn model_service(repo: BoardRepo) -> Arc> { Arc::new(sourced_rust::register_handlers!( - Service::new(repo), + Service::with_repo(repo), handlers::board_open, handlers::board_add_card, handlers::board_move_card, diff --git a/tests/distributed_read_model_board/main.rs b/tests/distributed_read_model_board/main.rs index c5d4c5b..bb72d12 100644 --- a/tests/distributed_read_model_board/main.rs +++ b/tests/distributed_read_model_board/main.rs @@ -3,7 +3,7 @@ //! //! - the **board service** owns the `Board` aggregate (cards are aggregate //! state) and its outbox; -//! - a **projection service** reconciles the relational rows via `save_changes` +//! - a **projection service** reconciles the relational rows via `sync` //! collection sync, so removing a card deletes its `cards` row, and each card //! carries a JSONB `payload` column; //! - a **query service** reads the board with cards (`has_many`) and a card with @@ -25,12 +25,12 @@ use serde::Serialize; use sourced_rust::microsvc::{Service, Session}; use sourced_rust::{ AggregateBuilder, HashMapRepository, InMemoryQueue, InMemoryReadModelStore, OutboxWorkerThread, - Queueable, ReadModelSessionStore, + Queueable, ReadModelWritePlanStore, }; -fn dispatch(service: &Service, command: &str, input: C) +fn dispatch(service: &Service, command: &str, input: C) where - R: Send + Sync + 'static, + D: Send + Sync + 'static, C: Serialize, { service @@ -182,7 +182,9 @@ fn board_service_feeds_a_normalized_card_read_model() { .expect("write-side board should exist"); assert_eq!(write_side.cards.len(), 1); - projection.stop(); + projection + .stop() + .expect("projection service should stop cleanly"); let stats = worker.stop().expect("worker should stop cleanly"); assert!(stats.messages_published >= 5); } diff --git a/tests/distributed_read_model_board/projections_service/handlers/board.rs b/tests/distributed_read_model_board/projections_service/handlers/board.rs index 4d53d21..c441a22 100644 --- a/tests/distributed_read_model_board/projections_service/handlers/board.rs +++ b/tests/distributed_read_model_board/projections_service/handlers/board.rs @@ -1,12 +1,15 @@ //! Projects board events into `boards` + `cards`. The board snapshot is the -//! desired state, so `save_changes` collection sync reflects added/moved/removed -//! cards as inserts/patches/deletes. A monotonic `source_version` guard ignores -//! stale snapshots under out-of-order delivery. +//! updated view, so workspace sync reflects added/moved/removed cards as +//! inserts/patches/deletes. A monotonic `source_version` guard ignores stale +//! snapshots under out-of-order delivery. +use serde_json::{json, Value}; use sourced_rust::bus::Event; -use sourced_rust::{InMemoryReadModelStore, ReadModelCommitOutcome, ReadModelUnitOfWorkExt}; +use sourced_rust::microsvc::{Context, HandlerError}; +use sourced_rust::ReadModelWorkspaceExt; use crate::board_service::BoardSnapshot; +use crate::projections_service::{read_model_error, ProjectionDependencies}; use crate::read_models::{board_key, BoardView, CardPayload, CardView}; pub const CONSUMER: &str = "board-detail-projection"; @@ -17,40 +20,45 @@ pub const EVENTS: &[&str] = &[ "board.card_removed", ]; -pub fn handle(store: &InMemoryReadModelStore, event: &Event) -> ReadModelCommitOutcome { - let snapshot: BoardSnapshot = event.decode().expect("board snapshot should decode"); - let version = event_version(event); - let desired = desired_board_view(&snapshot, version); +pub fn guard(ctx: &Context) -> bool { + ctx.has_fields(&["id", "event_type", "payload"]) +} + +pub fn handle(ctx: &Context) -> Result { + let event = super::event(ctx)?; + let snapshot: BoardSnapshot = event + .decode() + .map_err(|err| HandlerError::DecodeFailed(format!("board snapshot: {err}")))?; + let version = event_version(&event); + let updated_view = updated_board_view(&snapshot, version); - let mut session = store.session(); - let existing = session - .load::(board_key(&desired.board_id)) + let mut workspace = ctx.read_model_store().workspace(); + let existing = workspace + .load::(board_key(&updated_view.board_id)) .include("cards") .one() - .expect("board load should succeed"); + .map_err(read_model_error)?; match existing { Some(current) if current.data.source_version >= version => {} Some(_) => { - session - .save_changes(desired) - .expect("board save_changes should stage"); + workspace.sync(updated_view).map_err(read_model_error)?; } None => { - session - .save(&desired) - .expect("board root save should stage"); - for card in &desired.cards { - session.save(card).expect("board card save should stage"); + workspace.upsert(&updated_view).map_err(read_model_error)?; + for card in &updated_view.cards { + workspace.upsert(card).map_err(read_model_error)?; } } } - session.mark_processed(CONSUMER, &event.id); - session.commit().expect("board projection should commit") + workspace.mark_processed(CONSUMER, &event.id); + workspace.commit().map_err(read_model_error)?; + + Ok(json!({ "event_id": event.id })) } -fn desired_board_view(snapshot: &BoardSnapshot, version: i64) -> BoardView { +fn updated_board_view(snapshot: &BoardSnapshot, version: i64) -> BoardView { let cards = snapshot .cards .iter() diff --git a/tests/distributed_read_model_board/projections_service/handlers/mod.rs b/tests/distributed_read_model_board/projections_service/handlers/mod.rs index fcf6701..c00caba 100644 --- a/tests/distributed_read_model_board/projections_service/handlers/mod.rs +++ b/tests/distributed_read_model_board/projections_service/handlers/mod.rs @@ -3,19 +3,42 @@ pub mod board; +use serde::{Deserialize, Serialize}; use sourced_rust::bus::Event; -use sourced_rust::{InMemoryReadModelStore, ReadModelCommitOutcome}; +use sourced_rust::microsvc::{Context, HandlerError}; -/// Every event type any projection handler consumes. -pub fn event_types() -> Vec<&'static str> { - board::EVENTS.to_vec() +use crate::projections_service::ProjectionDependencies; + +#[derive(Debug, Deserialize, Serialize)] +pub struct ProjectionMessage { + pub id: String, + pub event_type: String, + pub payload: Vec, + pub metadata: Option>, +} + +impl From<&Event> for ProjectionMessage { + fn from(event: &Event) -> Self { + Self { + id: event.id.clone(), + event_type: event.event_type.clone(), + payload: event.payload.clone(), + metadata: event.metadata.clone(), + } + } } -/// Route one event to the handler that owns its rows. -pub fn project(store: &InMemoryReadModelStore, event: &Event) -> Option { - if board::EVENTS.contains(&event.event_type.as_str()) { - Some(board::handle(store, event)) - } else { - None +impl From for Event { + fn from(message: ProjectionMessage) -> Self { + Self { + id: message.id, + event_type: message.event_type, + payload: message.payload, + metadata: message.metadata, + } } } + +pub fn event(ctx: &Context) -> Result { + Ok(ctx.input::()?.into()) +} diff --git a/tests/distributed_read_model_board/projections_service/mod.rs b/tests/distributed_read_model_board/projections_service/mod.rs index 88b7cad..670f1c0 100644 --- a/tests/distributed_read_model_board/projections_service/mod.rs +++ b/tests/distributed_read_model_board/projections_service/mod.rs @@ -5,68 +5,95 @@ mod handlers; pub use handlers::board::CONSUMER as BOARD_CONSUMER; -use std::sync::mpsc::{self, TryRecvError}; +use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; -use sourced_rust::bus::Bus; -use sourced_rust::{InMemoryQueue, InMemoryReadModelStore, ReadModelUnitOfWorkExt}; +use serde_json::Value; +use sourced_rust::bus::{Event, PublishError, Subscribable, Subscriber}; +use sourced_rust::microsvc::{self, Context, HandlerError, Service}; +use sourced_rust::{InMemoryQueue, InMemoryReadModelStore, ReadModelError, ReadModelWorkspaceExt}; use crate::read_models::{board_key, BoardView}; -pub struct ProjectionServiceHandle { - stop_tx: mpsc::Sender<()>, - handle: thread::JoinHandle<()>, +pub type ProjectionDependencies = InMemoryReadModelStore; + +type ProjectionGuard = fn(&Context) -> bool; +type ProjectionHandler = fn(&Context) -> Result; + +pub struct ProjectionSubscriber { + inner: S, } -impl ProjectionServiceHandle { - pub fn stop(self) { - let _ = self.stop_tx.send(()); - self.handle - .join() - .expect("projection service should stop cleanly"); +impl Subscriber for ProjectionSubscriber +where + S: Subscriber, +{ + fn poll(&self, timeout_ms: u64) -> Result, PublishError> { + self.inner.poll(timeout_ms).and_then(|event| { + event + .map(|event| { + let payload = serde_json::to_vec(&handlers::ProjectionMessage::from(&event)) + .map_err(|err| PublishError::SerializationFailed(err.to_string()))?; + let mut wrapped = Event::new(event.id, event.event_type, payload); + wrapped.metadata = event.metadata; + Ok(wrapped) + }) + .transpose() + }) + } + + fn ack(&self, event_id: &str) -> Result<(), PublishError> { + self.inner.ack(event_id) + } + + fn nack(&self, event_id: &str, reason: &str) -> Result<(), PublishError> { + self.inner.nack(event_id, reason) } } pub fn start_board_projection_service( queue: InMemoryQueue, store: InMemoryReadModelStore, -) -> ProjectionServiceHandle { - let (stop_tx, stop_rx) = mpsc::channel(); - let (ready_tx, ready_rx) = mpsc::channel(); - - let handle = thread::spawn(move || { - let bus = Bus::from_queue(queue); - let event_types = handlers::event_types(); - let events = bus.subscribe(&event_types); - ready_tx - .send(()) - .expect("projection service should signal readiness"); - - loop { - match stop_rx.try_recv() { - Ok(()) | Err(TryRecvError::Disconnected) => break, - Err(TryRecvError::Empty) => {} - } +) -> microsvc::TransportHandle { + microsvc::subscribe( + service(store), + subscriber(queue.new_subscriber()), + Duration::from_millis(10), + ) +} - match events.recv(10) { - Ok(Some(event)) => { - handlers::project(&store, &event); - events - .ack(&event.id) - .expect("projection service should ack projected events"); - } - Ok(None) => {} - Err(err) => panic!("projection service failed to receive event: {err}"), - } - } - }); +pub fn service(store: InMemoryReadModelStore) -> Arc> { + let service = register_handler_events( + Service::with_read_model_store(store), + handlers::board::EVENTS, + handlers::board::guard, + handlers::board::handle, + ); + Arc::new(service) +} - ready_rx - .recv_timeout(Duration::from_secs(3)) - .expect("projection service should subscribe before accepting writes"); +pub fn subscriber(subscriber: S) -> ProjectionSubscriber +where + S: Subscriber, +{ + ProjectionSubscriber { inner: subscriber } +} + +fn register_handler_events( + mut service: Service, + events: &[&str], + guard: ProjectionGuard, + handle: ProjectionHandler, +) -> Service { + for event in events { + service = service.command_guarded(event, guard, handle); + } + service +} - ProjectionServiceHandle { stop_tx, handle } +fn read_model_error(err: ReadModelError) -> HandlerError { + HandlerError::Repository(err.into()) } pub fn wait_for_board( @@ -77,7 +104,7 @@ pub fn wait_for_board( let deadline = Instant::now() + Duration::from_secs(10); loop { - let mut session = store.session(); + let mut session = store.workspace(); if let Some(board) = session .load::(board_key(board_id)) .include("cards") diff --git a/tests/distributed_read_model_board/query_service/mod.rs b/tests/distributed_read_model_board/query_service/mod.rs index 568404d..9472635 100644 --- a/tests/distributed_read_model_board/query_service/mod.rs +++ b/tests/distributed_read_model_board/query_service/mod.rs @@ -1,7 +1,7 @@ //! Read-only query service for the board read model. Primary-key loads plus //! `has_many` / `belongs_to` relationship includes. -use sourced_rust::{InMemoryReadModelStore, ReadModelError, ReadModelUnitOfWorkExt}; +use sourced_rust::{InMemoryReadModelStore, ReadModelError, ReadModelWorkspaceExt}; use crate::read_models::{board_key, card_key, BoardView, CardView}; @@ -17,7 +17,7 @@ impl BoardQueryService { /// Load a board with its cards (`has_many` include). pub fn board_with_cards(&self, board_id: &str) -> Result, ReadModelError> { - let mut session = self.store.session(); + let mut session = self.store.workspace(); Ok(session .load::(board_key(board_id)) .include("cards") @@ -31,7 +31,7 @@ impl BoardQueryService { board_id: &str, card_id: &str, ) -> Result, ReadModelError> { - let mut session = self.store.session(); + let mut session = self.store.workspace(); Ok(session .load::(card_key(board_id, card_id)) .include("board") diff --git a/tests/microsvc/basic.rs b/tests/microsvc/basic.rs index 3a24c45..cd0403b 100644 --- a/tests/microsvc/basic.rs +++ b/tests/microsvc/basic.rs @@ -8,7 +8,7 @@ use crate::models::counter::{Counter, CreateCounter, DecrementCounter, Increment #[test] fn full_lifecycle() { - let service = Service::new(HashMapRepository::new()) + let service = Service::with_repo(HashMapRepository::new()) .command("counter.create", |ctx| { let input = ctx.input::()?; let counter_repo = ctx.repo().clone().aggregate::(); diff --git a/tests/microsvc/convention.rs b/tests/microsvc/convention.rs index 971fa4f..4770bc6 100644 --- a/tests/microsvc/convention.rs +++ b/tests/microsvc/convention.rs @@ -21,7 +21,7 @@ use crate::models::counter::Counter; #[test] fn register_handlers_and_dispatch() { let service = sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::counter_create, handlers::counter_increment, ); @@ -54,7 +54,7 @@ fn register_handlers_and_dispatch() { #[test] fn guard_rejects_bad_input() { let service = sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::counter_create, ); @@ -65,7 +65,7 @@ fn guard_rejects_bad_input() { #[test] fn handler_rejects_duplicate_create() { let service = sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::counter_create, ); @@ -84,7 +84,7 @@ fn handler_rejects_duplicate_create() { #[test] fn create_persists_outbox_message() { let service = sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::counter_create, ); @@ -108,7 +108,7 @@ fn create_persists_outbox_message() { #[test] fn duplicate_create_leaves_single_outbox_message() { let service = sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::counter_create, ); @@ -133,7 +133,7 @@ fn duplicate_create_leaves_single_outbox_message() { #[test] fn increment_persists_outbox_message() { let service = sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::counter_create, handlers::counter_increment, ); diff --git a/tests/microsvc/session.rs b/tests/microsvc/session.rs index 49db403..2ad5912 100644 --- a/tests/microsvc/session.rs +++ b/tests/microsvc/session.rs @@ -2,12 +2,11 @@ use serde_json::json; use sourced_rust::microsvc::{HandlerError, Service, Session}; -use sourced_rust::HashMapRepository; use std::collections::HashMap; #[test] fn handler_accesses_user_id() { - let service = Service::new(HashMapRepository::new()).command("whoami", |ctx| { + let service = Service::new(()).command("whoami", |ctx| { let user_id = ctx.user_id()?; Ok(json!({ "user_id": user_id })) }); @@ -22,7 +21,7 @@ fn handler_accesses_user_id() { #[test] fn missing_user_id_returns_unauthorized() { - let service = Service::new(HashMapRepository::new()).command("whoami", |ctx| { + let service = Service::new(()).command("whoami", |ctx| { let _user_id = ctx.user_id()?; Ok(json!({})) }); diff --git a/tests/microsvc/transport_grpc.rs b/tests/microsvc/transport_grpc.rs index e571b19..59f23de 100644 --- a/tests/microsvc/transport_grpc.rs +++ b/tests/microsvc/transport_grpc.rs @@ -19,7 +19,7 @@ use crate::models::counter::Counter; fn counter_service() -> Arc> { Arc::new(sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::counter_create, handlers::counter_increment, handlers::whoami, diff --git a/tests/microsvc/transport_http.rs b/tests/microsvc/transport_http.rs index d73c26e..d9962e1 100644 --- a/tests/microsvc/transport_http.rs +++ b/tests/microsvc/transport_http.rs @@ -14,7 +14,7 @@ use crate::models::counter::Counter; fn counter_service() -> Arc> { Arc::new(sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::counter_create, handlers::counter_increment, handlers::whoami, diff --git a/tests/microsvc/transport_listen.rs b/tests/microsvc/transport_listen.rs index fac6cf8..45ebddc 100644 --- a/tests/microsvc/transport_listen.rs +++ b/tests/microsvc/transport_listen.rs @@ -18,7 +18,7 @@ use crate::models::counter::Counter; fn counter_service() -> Arc> { Arc::new(sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::counter_create, handlers::counter_increment, handlers::whoami, @@ -154,12 +154,12 @@ fn multiple_services_on_different_queues() { let store = HashMapRepository::new(); let service_a = Arc::new(sourced_rust::register_handlers!( - Service::new(store.clone().queued().aggregate::()), + Service::with_repo(store.clone().queued().aggregate::()), handlers::counter_create, )); let service_b = Arc::new(sourced_rust::register_handlers!( - Service::new(store.queued().aggregate::()), + Service::with_repo(store.queued().aggregate::()), handlers::counter_increment, )); diff --git a/tests/microsvc/transport_subscribe.rs b/tests/microsvc/transport_subscribe.rs index 966194e..cd138cb 100644 --- a/tests/microsvc/transport_subscribe.rs +++ b/tests/microsvc/transport_subscribe.rs @@ -17,7 +17,7 @@ use crate::models::counter::Counter; fn counter_service() -> Arc> { Arc::new(sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::counter_create, handlers::counter_increment, handlers::whoami, diff --git a/tests/postgres_repository/main.rs b/tests/postgres_repository/main.rs index 38eb421..2d34458 100644 --- a/tests/postgres_repository/main.rs +++ b/tests/postgres_repository/main.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use sourced_rust::{ impl_aggregate, Aggregate, AsyncAggregateBuilder, AsyncCommitBatch, AsyncGetStream, AsyncOutboxStore, AsyncSnapshotStore, AsyncStreamWrite, AsyncTransactionalCommit, Entity, - EventRecord, OutboxMessageStatus, PostgresRepository, ReadModel, ReadModelSession, + EventRecord, OutboxMessageStatus, PostgresRepository, ReadModel, ReadModelWritePlanBuilder, RepositoryError, SnapshotRecord, StreamIdentity, }; @@ -267,7 +267,7 @@ async fn read_model_plans_are_rejected_in_first_pass() { let mut entity = Entity::with_id(&id); entity.digest_empty("Touched").unwrap(); let identity = StreamIdentity::new(Counter::aggregate_type(), &id).unwrap(); - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session.document(&CounterView { id, value: 1 }).unwrap(); let err = repo diff --git a/tests/read_model_commit_bridge/main.rs b/tests/read_model_commit_bridge/main.rs index e43d9f3..3b140fb 100644 --- a/tests/read_model_commit_bridge/main.rs +++ b/tests/read_model_commit_bridge/main.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use sourced_rust::{ - impl_aggregate, Entity, EventRecord, HashMapRepository, ReadModel, ReadModelSession, - ReadModelSessionCommitExt, ReadModelStore, + impl_aggregate, Entity, EventRecord, HashMapRepository, ReadModel, ReadModelStore, + ReadModelWritePlanBuilder, ReadModelWritePlanCommitExt, }; #[derive(Default)] @@ -39,7 +39,7 @@ fn repo_first_read_models_session_commit_form_is_available() { id: "view-1".into(), value: 42, }; - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session.document(&view).unwrap(); let mut aggregate = TestAggregate::default(); aggregate.touch(); diff --git a/tests/read_model_distributed_idempotency/main.rs b/tests/read_model_distributed_idempotency/main.rs index d96b1b0..88b2b37 100644 --- a/tests/read_model_distributed_idempotency/main.rs +++ b/tests/read_model_distributed_idempotency/main.rs @@ -1,8 +1,8 @@ use serde::{Deserialize, Serialize}; use sourced_rust::bus::{Event, Publisher, Subscriber}; use sourced_rust::{ - InMemoryQueue, InMemoryReadModelStore, ReadModel, ReadModelError, ReadModelSession, - ReadModelSessionStore, ReadModelStore, RowKey, RowValue, + InMemoryQueue, InMemoryReadModelStore, ReadModel, ReadModelError, ReadModelStore, + ReadModelWritePlanBuilder, ReadModelWritePlanStore, RowKey, RowValue, }; const CONSUMER: &str = "counter-projection"; @@ -23,8 +23,8 @@ struct RelationalCounter { value: i32, } -fn counter_session(view: &CounterView, message_id: &str) -> ReadModelSession { - let mut session = ReadModelSession::new(); +fn counter_session(view: &CounterView, message_id: &str) -> ReadModelWritePlanBuilder { + let mut session = ReadModelWritePlanBuilder::new(); session .document(view) .unwrap() @@ -102,14 +102,14 @@ fn read_model_write_and_processed_mark_are_atomic() { id: "counter-1".into(), value: 1, }; - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session .document(&view) .unwrap() .mark_processed(CONSUMER, "message-1") .expect_version::(relational_counter_key("counter-1"), 99) .unwrap() - .save(&row) + .upsert(&row) .unwrap(); let err = session.commit(&store).unwrap_err(); @@ -139,11 +139,11 @@ fn ack_happens_only_after_successful_standalone_commit() { id: "counter-1".into(), value: 1, }; - let mut failed_session = ReadModelSession::new(); + let mut failed_session = ReadModelWritePlanBuilder::new(); failed_session .expect_version::(relational_counter_key("counter-1"), 99) .unwrap() - .save(&row) + .upsert(&row) .unwrap() .mark_processed(CONSUMER, &failed.id); diff --git a/tests/read_model_document_conformance/main.rs b/tests/read_model_document_conformance/main.rs index 8fe7a92..303fe00 100644 --- a/tests/read_model_document_conformance/main.rs +++ b/tests/read_model_document_conformance/main.rs @@ -1,7 +1,8 @@ use serde::{Deserialize, Serialize}; use sourced_rust::{ HashMapRepository, InMemoryReadModelStore, Lock, LockManager, QueuedReadModelStore, ReadModel, - ReadModelError, ReadModelSession, ReadModelSessionCommitExt, ReadModelStore, ReadOpts, + ReadModelError, ReadModelStore, ReadModelWritePlanBuilder, ReadModelWritePlanCommitExt, + ReadOpts, }; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ReadModel)] @@ -67,8 +68,8 @@ fn document_view(id: &str, value: i32, category: &str) -> DocumentView { } } -fn document_session(view: &DocumentView) -> ReadModelSession { - let mut session = ReadModelSession::new(); +fn document_session(view: &DocumentView) -> ReadModelWritePlanBuilder { + let mut session = ReadModelWritePlanBuilder::new(); session.document(view).unwrap(); session } @@ -101,7 +102,7 @@ fn document_session_keys_distinguish_colons_in_collection_and_id() { id: "b:c".into(), value: 2, }; - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session.document(&left).unwrap(); session.document(&right).unwrap(); @@ -134,7 +135,7 @@ fn unsupported_row_plan_rejection_does_not_apply_prior_document_write() { value: 20, }; let mut session = document_session(&document); - session.save(&relational).unwrap(); + session.upsert(&relational).unwrap(); let err = repo.read_models(session).commit_all().unwrap_err(); @@ -262,8 +263,8 @@ fn queued_session_commit_failure_keeps_lock_until_explicit_abort() { id: "relational".into(), value: 20, }; - let mut session = ReadModelSession::new(); - session.save(&relational).unwrap(); + let mut session = ReadModelWritePlanBuilder::new(); + session.upsert(&relational).unwrap(); let err = store.read_models(session).commit_all().unwrap_err(); diff --git a/tests/read_model_relationship_includes/main.rs b/tests/read_model_relationship_includes/main.rs index 9c35542..a9f4e95 100644 --- a/tests/read_model_relationship_includes/main.rs +++ b/tests/read_model_relationship_includes/main.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use sourced_rust::{ InMemoryReadModelStore, ReadModel, ReadModelAdapterCapabilities, ReadModelCommitOutcome, ReadModelError, ReadModelLoadGraph, ReadModelLoadRequest, ReadModelQueryCapabilities, - ReadModelSessionStore, ReadModelUnitOfWorkExt, ReadModelWritePlan, + ReadModelWorkspaceExt, ReadModelWritePlan, ReadModelWritePlanStore, RelationalReadModelQueryStore, RowKey, RowValue, }; @@ -75,7 +75,7 @@ impl NoIncludeStore { } } -impl ReadModelSessionStore for NoIncludeStore { +impl ReadModelWritePlanStore for NoIncludeStore { fn read_model_capabilities(&self) -> ReadModelAdapterCapabilities { self.inner.read_model_capabilities() } @@ -141,10 +141,10 @@ fn store_with_player_and_weapons( store.register_schema::().unwrap(); store.register_schema::().unwrap(); - let mut session = sourced_rust::ReadModelSession::new(); - session.save(&player("player-1", "Ada")).unwrap(); + let mut session = sourced_rust::ReadModelWritePlanBuilder::new(); + session.upsert(&player("player-1", "Ada")).unwrap(); for weapon in weapons { - session.save(&weapon).unwrap(); + session.upsert(&weapon).unwrap(); } session.commit(&store).unwrap(); store @@ -153,7 +153,7 @@ fn store_with_player_and_weapons( #[test] fn friendly_session_loads_one_root_by_primary_key_without_includes() { let store = store_with_player_and_weapons([]); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let loaded = read_models .load::(player_key("player-1")) @@ -166,7 +166,7 @@ fn friendly_session_loads_one_root_by_primary_key_without_includes() { #[test] fn friendly_session_hydrates_has_many_include() { let store = store_with_player_and_weapons([weapon("player-1", "sword", "2026-05-23")]); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let loaded = read_models .load::(player_key("player-1")) @@ -181,7 +181,7 @@ fn friendly_session_hydrates_has_many_include() { #[test] fn friendly_session_hydrates_belongs_to_include() { let store = store_with_player_and_weapons([weapon("player-1", "sword", "2026-05-23")]); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let loaded = read_models .load::(weapon_key("player-1", "sword")) @@ -194,9 +194,9 @@ fn friendly_session_hydrates_belongs_to_include() { } #[test] -fn save_changes_persists_loaded_scalar_field_without_manual_patch() { +fn sync_persists_loaded_scalar_field_without_manual_patch() { let store = store_with_player_and_weapons([]); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let mut loaded = read_models .load::(player_key("player-1")) .one() @@ -205,10 +205,10 @@ fn save_changes_persists_loaded_scalar_field_without_manual_patch() { .data; loaded.display_name = "Ada Lovelace".into(); - read_models.save_changes(loaded).unwrap(); + read_models.sync(loaded).unwrap(); read_models.commit().unwrap(); - let mut check = store.session(); + let mut check = store.workspace(); let reloaded = check .load::(player_key("player-1")) .one() @@ -218,9 +218,9 @@ fn save_changes_persists_loaded_scalar_field_without_manual_patch() { } #[test] -fn save_changes_persists_added_and_modified_related_rows() { +fn sync_persists_added_and_modified_related_rows() { let store = store_with_player_and_weapons([weapon("player-1", "sword", "2026-05-23")]); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let mut loaded = read_models .load::(player_key("player-1")) .include("weapons") @@ -231,10 +231,10 @@ fn save_changes_persists_added_and_modified_related_rows() { loaded.weapons[0].acquired_at = "2026-05-24".into(); loaded.weapons.push(weapon("", "shield", "2026-05-25")); - read_models.save_changes(loaded).unwrap(); + read_models.sync(loaded).unwrap(); read_models.commit().unwrap(); - let mut check = store.session(); + let mut check = store.workspace(); let mut reloaded = check .load::(player_key("player-1")) .include("weapons") @@ -251,12 +251,12 @@ fn save_changes_persists_added_and_modified_related_rows() { } #[test] -fn save_changes_deletes_removed_related_rows() { +fn sync_deletes_removed_related_rows() { let store = store_with_player_and_weapons([ weapon("player-1", "shield", "2026-05-24"), weapon("player-1", "sword", "2026-05-23"), ]); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let mut loaded = read_models .load::(player_key("player-1")) .include("weapons") @@ -266,10 +266,10 @@ fn save_changes_deletes_removed_related_rows() { .data; loaded.weapons.retain(|weapon| weapon.weapon_id == "sword"); - read_models.save_changes(loaded).unwrap(); + read_models.sync(loaded).unwrap(); read_models.commit().unwrap(); - let mut check = store.session(); + let mut check = store.workspace(); let reloaded = check .load::(player_key("player-1")) .include("weapons") @@ -281,9 +281,9 @@ fn save_changes_deletes_removed_related_rows() { } #[test] -fn save_changes_clearing_belongs_to_does_not_delete_target() { +fn sync_clearing_belongs_to_does_not_delete_target() { let store = store_with_player_and_weapons([weapon("player-1", "sword", "2026-05-23")]); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let mut loaded = read_models .load::(weapon_key("player-1", "sword")) .include("player") @@ -294,10 +294,10 @@ fn save_changes_clearing_belongs_to_does_not_delete_target() { assert!(loaded.player.is_some()); loaded.player = None; - read_models.save_changes(loaded).unwrap(); + read_models.sync(loaded).unwrap(); read_models.commit().unwrap(); - let mut check = store.session(); + let mut check = store.workspace(); let player = check.load::(player_key("player-1")).one().unwrap(); assert_eq!(player.unwrap().data.display_name, "Ada"); } @@ -305,7 +305,7 @@ fn save_changes_clearing_belongs_to_does_not_delete_target() { #[test] fn missing_root_returns_none_without_include_loading() { let store = store_with_player_and_weapons([weapon("player-1", "sword", "2026-05-23")]); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let loaded = read_models .load::(player_key("missing")) @@ -320,10 +320,10 @@ fn missing_root_returns_none_without_include_loading() { fn unregistered_relationship_target_fails_before_loading() { let store = InMemoryReadModelStore::new(); store.register_schema::().unwrap(); - let mut session = sourced_rust::ReadModelSession::new(); - session.save(&player("player-1", "Ada")).unwrap(); + let mut session = sourced_rust::ReadModelWritePlanBuilder::new(); + session.upsert(&player("player-1", "Ada")).unwrap(); session.commit(&store).unwrap(); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let err = read_models .load::(player_key("player-1")) @@ -339,7 +339,7 @@ fn unregistered_relationship_target_fails_before_loading() { #[test] fn unregistered_root_schema_fails_before_loading() { let store = InMemoryReadModelStore::new(); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let err = read_models .load::(player_key("player-1")) @@ -355,7 +355,7 @@ fn unregistered_root_schema_fails_before_loading() { fn adapter_without_include_capability_rejects_includes() { let inner = store_with_player_and_weapons([weapon("player-1", "sword", "2026-05-23")]); let store = NoIncludeStore::new(inner); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let err = read_models .load::(player_key("player-1")) @@ -371,7 +371,7 @@ fn adapter_without_include_capability_rejects_includes() { #[test] fn nested_query_style_include_paths_are_not_a_public_query_dsl() { let store = store_with_player_and_weapons([weapon("player-1", "sword", "2026-05-23")]); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let err = read_models .load::(player_key("player-1")) @@ -388,7 +388,7 @@ fn nested_query_style_include_paths_are_not_a_public_query_dsl() { fn many_to_many_include_fails_until_join_metadata_is_rich_enough() { let store = InMemoryReadModelStore::new(); store.register_schema::().unwrap(); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let err = read_models .load::(player_key("player-1")) @@ -406,22 +406,22 @@ fn belongs_to_include_rejects_composite_target_primary_key() { let store = InMemoryReadModelStore::new(); store.register_schema::().unwrap(); store.register_schema::().unwrap(); - let mut session = sourced_rust::ReadModelSession::new(); + let mut session = sourced_rust::ReadModelWritePlanBuilder::new(); session - .save(&WeaponLabelRef { + .upsert(&WeaponLabelRef { ref_id: "ref-1".into(), player_id: "player-1".into(), label: None, }) .unwrap() - .save(&CompositeWeaponLabel { + .upsert(&CompositeWeaponLabel { player_id: "player-1".into(), weapon_id: "sword".into(), label: "Sword".into(), }) .unwrap(); session.commit(&store).unwrap(); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let err = read_models .load::(RowKey::new([("ref_id", RowValue::String("ref-1".into()))])) diff --git a/tests/read_model_session/main.rs b/tests/read_model_session/main.rs index 1a19685..db3672d 100644 --- a/tests/read_model_session/main.rs +++ b/tests/read_model_session/main.rs @@ -3,8 +3,8 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; use sourced_rust::{ ExpectedVersion, InMemoryReadModelStore, PatchMode, ReadModel, ReadModelAdapterCapabilities, - ReadModelError, ReadModelMutation, ReadModelSession, ReadModelUnitOfWorkExt, RowKey, RowPatch, - RowValue, RowWriteMode, Versioned, + ReadModelError, ReadModelMutation, ReadModelWorkspaceExt, ReadModelWritePlanBuilder, RowKey, + RowPatch, RowValue, RowWriteMode, Versioned, }; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ReadModel)] @@ -61,7 +61,7 @@ fn account_key(account_id: &str) -> RowKey { #[test] fn session_stages_multiple_read_model_types_in_deterministic_plan() { - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); let weapon = PlayerWeapon { player_id: "player-1".into(), weapon_id: "sword".into(), @@ -69,7 +69,7 @@ fn session_stages_multiple_read_model_types_in_deterministic_plan() { }; let account = AccountSummary::new("acct-1"); - session.save(&weapon).unwrap().save(&account).unwrap(); + session.upsert(&weapon).unwrap().upsert(&account).unwrap(); let plan = session.into_write_plan().unwrap(); @@ -80,13 +80,13 @@ fn session_stages_multiple_read_model_types_in_deterministic_plan() { #[test] fn write_plan_contains_document_and_relational_rows_only() { - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); let account = AccountSummary::new("acct-1"); session .document(&account) .unwrap() - .save(&account) + .upsert(&account) .unwrap() .delete::(account_key("acct-2")) .unwrap(); @@ -100,11 +100,11 @@ fn write_plan_contains_document_and_relational_rows_only() { #[test] fn sparse_patches_and_full_replacements_are_distinct() { - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); let account = AccountSummary::new("acct-1"); let patch = RowPatch::new().set("owner", RowValue::Null); - session.save(&account).unwrap(); + session.upsert(&account).unwrap(); session .patch::(account_key("acct-1"), patch) .unwrap(); @@ -128,7 +128,7 @@ fn sparse_patches_and_full_replacements_are_distinct() { #[test] fn insert_and_upsert_patch_carry_explicit_missing_row_behavior() { - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); let account = AccountSummary::new("acct-1"); let patch = RowPatch::new().set("owner", RowValue::String("Grace".into())); @@ -164,14 +164,14 @@ fn insert_missing_patch_builds_full_row_from_key_before_insert() { "counters_by_game", RowValue::Json(serde_json::json!({"deposits": 1})), ); - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session .upsert_patch::(account_key("acct-1"), patch) .unwrap(); session.commit(&store).unwrap(); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let loaded = read_models .load::(account_key("acct-1")) .one() @@ -192,7 +192,7 @@ fn insert_missing_patch_rejects_primary_key_mismatch() { .set("balance_cents", RowValue::I64(250)) .set("deposit_count", RowValue::U64(2)) .set("counters_by_game", RowValue::Json(serde_json::json!({}))); - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session .upsert_patch::(account_key("acct-1"), patch) @@ -209,7 +209,7 @@ fn insert_missing_patch_rejects_partial_new_row() { let store = InMemoryReadModelStore::new(); store.register_schema::().unwrap(); let patch = RowPatch::new().set("owner", RowValue::String("Grace".into())); - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session .upsert_patch::(account_key("acct-1"), patch) @@ -220,7 +220,7 @@ fn insert_missing_patch_rejects_partial_new_row() { matches!(err, ReadModelError::Metadata(message) if message.contains("missing required column `balance_cents`")) ); - let mut read_models = store.session(); + let mut read_models = store.workspace(); let loaded = read_models .load::(account_key("acct-1")) .one() @@ -231,13 +231,13 @@ fn insert_missing_patch_rejects_partial_new_row() { #[test] fn existing_patch_rejects_primary_key_mismatch() { let store = InMemoryReadModelStore::new(); - let mut setup = ReadModelSession::new(); - setup.save(&AccountSummary::new("acct-1")).unwrap(); + let mut setup = ReadModelWritePlanBuilder::new(); + setup.upsert(&AccountSummary::new("acct-1")).unwrap(); setup.commit(&store).unwrap(); let patch = RowPatch::new() .set("account_id", RowValue::String("acct-2".into())) .set("owner", RowValue::String("Grace".into())); - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session .patch::(account_key("acct-1"), patch) @@ -261,9 +261,9 @@ fn relationship_operation_populates_child_foreign_key_in_explicit_row_mutation() weapon_id: "sword".into(), acquired_at: "2026-05-23T00:00:00Z".into(), }; - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); - session.save_related(&player, "weapons", &weapon).unwrap(); + session.upsert_related(&player, "weapons", &weapon).unwrap(); let plan = session.into_write_plan().unwrap(); @@ -289,12 +289,12 @@ fn expected_versions_and_processed_messages_are_carried_into_plan() { version: 7, }; account.balance_cents = 250; - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session .track_loaded(&loaded) .unwrap() - .save(&account) + .upsert(&account) .unwrap() .mark_processed("account-projection", "message-1"); @@ -313,7 +313,7 @@ fn expected_versions_and_processed_messages_are_carried_into_plan() { #[test] fn load_requests_validate_primary_keys_and_explicit_relationship_includes() { - let session = ReadModelSession::new(); + let session = ReadModelWritePlanBuilder::new(); let request = session .load_with::( @@ -336,7 +336,7 @@ fn load_requests_validate_primary_keys_and_explicit_relationship_includes() { #[test] fn validation_failures_happen_before_storage_writes() { - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); let patch = RowPatch::new().set("balance_cents", RowValue::Null); session @@ -350,7 +350,7 @@ fn validation_failures_happen_before_storage_writes() { #[test] fn write_plan_validation_reports_unsupported_adapter_capabilities() { - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); let patch = RowPatch::new().set("owner", RowValue::String("Grace".into())); session .patch::(account_key("acct-1"), patch) diff --git a/tests/read_models/main.rs b/tests/read_models/main.rs index 2ae125f..46f59fe 100644 --- a/tests/read_models/main.rs +++ b/tests/read_models/main.rs @@ -11,7 +11,7 @@ use std::time::Duration; use aggregate::Counter; use sourced_rust::{ AggregateBuilder, CommitBuilderExt, HashMapRepository, OutboxMessage, QueuedReadModelStore, - ReadModelSession, ReadModelStore, ReadModelsExt, ReadOpts, + ReadModelStore, ReadModelWritePlanBuilder, ReadModelsExt, ReadOpts, }; use views::{CounterView, UserCountersIndexView}; @@ -243,7 +243,7 @@ fn commit_all_without_aggregate() { fn standalone_session_commit_and_primary_key_read() { let repo = HashMapRepository::new(); let view = CounterView::new("session-standalone", "Session", "user-session"); - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session.document(&view).unwrap(); let outcome = session.commit(&repo).unwrap(); @@ -271,10 +271,10 @@ fn read_models_session_commits_with_aggregate() { let mut view = CounterView::new("counter-session", "Session Commit", "user-session"); view.set_value(counter.value()); - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session.document(&view).unwrap(); - sourced_rust::ReadModelSessionCommitExt::read_models(&repo, session) + sourced_rust::ReadModelWritePlanCommitExt::read_models(&repo, session) .commit(&mut counter) .unwrap(); diff --git a/tests/sagas/microsvc_saga.rs b/tests/sagas/microsvc_saga.rs index 3e74e5c..cfad256 100644 --- a/tests/sagas/microsvc_saga.rs +++ b/tests/sagas/microsvc_saga.rs @@ -4,7 +4,7 @@ //! organized by service domain under `handlers/`. //! //! Each service is typed to a specific aggregate via -//! `Service::new(repo.queued().aggregate::())`, so handlers access +//! `Service::with_repo(repo.queued().aggregate::())`, so handlers access //! `ctx.repo().get()`, `ctx.repo().commit()`, etc. directly. //! //! Two tests: @@ -45,7 +45,7 @@ use super::order::{Inventory, Order, OrderFulfillmentSaga, OrderStatus, Payment, #[test] fn saga_orchestrated() { let saga_svc = sourced_rust::register_handlers!( - Service::new( + Service::with_repo( HashMapRepository::new() .queued() .aggregate::() @@ -58,19 +58,19 @@ fn saga_orchestrated() { ); let order_svc = sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::orders::create, handlers::orders::complete, ); let inventory_svc = sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::inventory::init, handlers::inventory::reserve, ); let payment_svc = sourced_rust::register_handlers!( - Service::new(HashMapRepository::new().queued().aggregate::()), + Service::with_repo(HashMapRepository::new().queued().aggregate::()), handlers::payments::process, ); @@ -242,7 +242,7 @@ fn saga_distributed() { let saga_worker = OutboxWorkerThread::spawn_routed(saga_repo.outbox_store(), queue.clone(), poll); let saga_svc = Arc::new(sourced_rust::register_handlers!( - Service::new(saga_repo.queued().aggregate::()), + Service::with_repo(saga_repo.queued().aggregate::()), handlers::saga::start, handlers::saga::on_order_created, handlers::saga::on_inventory_reserved, @@ -256,7 +256,7 @@ fn saga_distributed() { let order_worker = OutboxWorkerThread::spawn_routed(order_repo.outbox_store(), queue.clone(), poll); let order_svc = Arc::new(sourced_rust::register_handlers!( - Service::new(order_repo.queued().aggregate::()), + Service::with_repo(order_repo.queued().aggregate::()), handlers::orders::create, handlers::orders::complete, )); @@ -276,7 +276,7 @@ fn saga_distributed() { } let inventory_svc = Arc::new(sourced_rust::register_handlers!( - Service::new(inventory_repo.queued().aggregate::()), + Service::with_repo(inventory_repo.queued().aggregate::()), handlers::inventory::init, handlers::inventory::reserve, )); @@ -288,7 +288,7 @@ fn saga_distributed() { let payment_worker = OutboxWorkerThread::spawn_routed(payment_repo.outbox_store(), queue.clone(), poll); let payment_svc = Arc::new(sourced_rust::register_handlers!( - Service::new(payment_repo.queued().aggregate::()), + Service::with_repo(payment_repo.queued().aggregate::()), handlers::payments::process, )); let payment_listen = microsvc::listen(payment_svc.clone(), "payments", queue.clone(), poll); diff --git a/tests/sqlite_repository/main.rs b/tests/sqlite_repository/main.rs index 93fa262..11ef77a 100644 --- a/tests/sqlite_repository/main.rs +++ b/tests/sqlite_repository/main.rs @@ -3,10 +3,10 @@ use serde::{Deserialize, Serialize}; use sourced_rust::{ impl_aggregate, Aggregate, AsyncAggregateBuilder, AsyncCommitBatch, AsyncGetStream, - AsyncOutboxStore, AsyncReadModelSessionStore, AsyncReadModelStore, AsyncSnapshotStore, + AsyncOutboxStore, AsyncReadModelStore, AsyncReadModelWritePlanStore, AsyncSnapshotStore, AsyncStreamWrite, AsyncTransactionalCommit, Entity, EventRecord, OutboxMessageStatus, - ReadModel, ReadModelSession, RepositoryError, SnapshotRecord, SqliteRepository, StreamIdentity, - TableSchemaRegistry, OUTBOX_MESSAGES_TABLE, + ReadModel, ReadModelWritePlanBuilder, RepositoryError, SnapshotRecord, SqliteRepository, + StreamIdentity, TableSchemaRegistry, OUTBOX_MESSAGES_TABLE, }; #[derive(Default)] @@ -173,7 +173,7 @@ async fn optimistic_conflict_rolls_back_other_stream_and_read_model_plan() { id: "should-not-commit".into(), value: 99, }; - let mut read_models = ReadModelSession::new(); + let mut read_models = ReadModelWritePlanBuilder::new(); read_models.document(&view).unwrap(); let stale_identity = StreamIdentity::new(Counter::aggregate_type(), "conflict-1").unwrap(); @@ -210,7 +210,7 @@ async fn read_model_session_persists_documents_and_processed_marks() { id: "view-1".into(), value: 42, }; - let mut session = ReadModelSession::new(); + let mut session = ReadModelWritePlanBuilder::new(); session .document(&view) .unwrap() @@ -232,7 +232,7 @@ async fn read_model_session_persists_documents_and_processed_marks() { assert_eq!(loaded.data, view); assert!(processed); - let mut duplicate = ReadModelSession::new(); + let mut duplicate = ReadModelWritePlanBuilder::new(); duplicate .document(&CounterView { id: "view-1".into(), From 8528669373d1893252de8462183e35ccc6255228 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Wed, 27 May 2026 12:45:38 -0500 Subject: [PATCH 2/4] chore: run ci for all prs --- .github/workflows/on-pr-quality.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/on-pr-quality.yaml b/.github/workflows/on-pr-quality.yaml index b2fe6c2..3c45578 100644 --- a/.github/workflows/on-pr-quality.yaml +++ b/.github/workflows/on-pr-quality.yaml @@ -2,8 +2,6 @@ name: Rust Quality Pipeline for PRs on: pull_request: - branches: - - main jobs: quality: From bb854d041991cc181c00c159ff233487430c8249 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Wed, 27 May 2026 13:10:37 -0500 Subject: [PATCH 3/4] docs: refresh read model write plan examples --- src/commit_builder/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/commit_builder/mod.rs b/src/commit_builder/mod.rs index 15d0887..b776dd2 100644 --- a/src/commit_builder/mod.rs +++ b/src/commit_builder/mod.rs @@ -19,11 +19,19 @@ //! .commit(&mut game)?; //! //! // Ordering is semantic staging only. +//! let mut read_models = sourced_rust::ReadModelWritePlanBuilder::new(); +//! read_models.upsert(&player)?; +//! read_models.upsert_related(&player, "weapons", &weapon)?; +//! //! repo //! .outbox(message) //! .read_models(read_models) //! .commit(&mut game)?; //! +//! let mut read_models = sourced_rust::ReadModelWritePlanBuilder::new(); +//! read_models.upsert(&player)?; +//! read_models.upsert_related(&player, "weapons", &weapon)?; +//! //! repo //! .aggregate(&mut game) //! .read_models(read_models) From a0d441849b4e6be051ebbfb8848305063d9fd719 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Wed, 27 May 2026 13:13:51 -0500 Subject: [PATCH 4/4] fix: refresh read model sync baselines --- src/read_model/session.rs | 112 ++++++++++++++---- .../read_model_relationship_includes/main.rs | 55 +++++++++ 2 files changed, 143 insertions(+), 24 deletions(-) diff --git a/src/read_model/session.rs b/src/read_model/session.rs index 5dd2658..11538cb 100644 --- a/src/read_model/session.rs +++ b/src/read_model/session.rs @@ -790,6 +790,8 @@ struct TrackedModelBaseline { includes: BTreeMap, } +const INITIAL_TRACKED_ROW_VERSION: u64 = 1; + /// Store-bound read-model workspace for load, mutate, sync, commit workflows. pub struct ReadModelWorkspace<'a, S> { store: &'a S, @@ -836,35 +838,49 @@ where table_name: schema.table_name.clone(), key: key_fingerprint(&key), }; - let baseline = self + let baseline_index = self .baselines .iter() - .find(|baseline| { + .position(|baseline| { baseline.root_schema.table_name == identity.table_name && key_fingerprint(&baseline.root_key) == identity.key }) - .cloned() .ok_or_else(|| { ReadModelError::Metadata(format!( "read model `{}` has no tracked baseline for sync", schema.model_name )) })?; + let baseline = self.baselines[baseline_index].clone(); let current_row = model.to_row()?; - self.stage_row_diff( - schema, - key, - &baseline.root_row, - ¤t_row, - baseline.root_version, - )?; - + let root_version = self + .stage_row_diff( + schema.clone(), + key.clone(), + &baseline.root_row, + ¤t_row, + baseline.root_version, + )? + .unwrap_or(baseline.root_version); + + let mut refreshed_includes = BTreeMap::new(); for (include_name, include) in &baseline.includes { let current_rows = model.include_rows(include_name)?; - self.stage_include_changes(&baseline.root_schema, ¤t_row, include, current_rows)?; + let refreshed_include = + self.stage_include_changes(&schema, ¤t_row, include, current_rows)?; + refreshed_includes.insert(include_name.clone(), refreshed_include); } + self.writes.expected_versions.insert(identity, root_version); + self.baselines[baseline_index] = TrackedModelBaseline { + root_schema: schema, + root_key: key, + root_row: current_row, + root_version, + includes: refreshed_includes, + }; + Ok(self) } @@ -1035,7 +1051,7 @@ where root_row: &RowValues, baseline: &TrackedIncludeBaseline, current_rows: Vec, - ) -> Result<(), ReadModelError> { + ) -> Result { if matches!(baseline.relationship.kind, RelationshipKind::BelongsTo) && current_rows.len() > 1 { @@ -1046,6 +1062,7 @@ where } let mut current_fingerprints = BTreeSet::new(); + let mut refreshed_rows = BTreeMap::new(); for mut current_row in current_rows { match baseline.relationship.kind { RelationshipKind::HasMany => populate_delegated_relationship_values( @@ -1068,15 +1085,37 @@ where let fingerprint = key_fingerprint(&key); current_fingerprints.insert(fingerprint.clone()); if let Some(loaded) = baseline.rows.get(&fingerprint) { - self.stage_row_diff( + let version = self + .stage_row_diff( + baseline.target_schema.clone(), + loaded.key.clone(), + &loaded.row, + ¤t_row, + loaded.version, + )? + .unwrap_or(loaded.version); + refreshed_rows.insert( + fingerprint, + TrackedRowBaseline { + key, + row: current_row, + version, + }, + ); + } else { + self.stage_upsert_row( baseline.target_schema.clone(), - loaded.key.clone(), - &loaded.row, - ¤t_row, - loaded.version, + key.clone(), + current_row.clone(), )?; - } else { - self.stage_upsert_row(baseline.target_schema.clone(), key, current_row)?; + refreshed_rows.insert( + fingerprint, + TrackedRowBaseline { + key, + row: current_row, + version: INITIAL_TRACKED_ROW_VERSION, + }, + ); } } @@ -1093,9 +1132,19 @@ where )?; } } + } else { + for (fingerprint, loaded) in &baseline.rows { + if !current_fingerprints.contains(fingerprint) { + refreshed_rows.insert(fingerprint.clone(), loaded.clone()); + } + } } - Ok(()) + Ok(TrackedIncludeBaseline { + relationship: baseline.relationship.clone(), + target_schema: baseline.target_schema.clone(), + rows: refreshed_rows, + }) } fn stage_row_diff( @@ -1105,11 +1154,12 @@ where before: &RowValues, after: &RowValues, expected_version: u64, - ) -> Result<(), ReadModelError> { + ) -> Result, ReadModelError> { let patch = diff_rows(before, after); if patch.is_empty() { - return Ok(()); + return Ok(None); } + let next_version = next_tracked_version(&schema, &key, expected_version)?; let mutation = PatchRowMutation { schema, @@ -1120,7 +1170,7 @@ where }; validate_patch_mutation(&mutation)?; self.writes.push(ReadModelMutation::PatchRow(mutation)); - Ok(()) + Ok(Some(next_version)) } fn stage_upsert_row( @@ -1229,6 +1279,20 @@ fn diff_rows(before: &RowValues, after: &RowValues) -> RowPatch { patch } +fn next_tracked_version( + schema: &ReadModelSchema, + key: &RowKey, + current_version: u64, +) -> Result { + current_version.checked_add(1).ok_or_else(|| { + ReadModelError::Storage(format!( + "read model version overflow for {}:{}", + schema.table_name, + key_fingerprint(key) + )) + }) +} + fn validated_schema() -> Result where M: RelationalReadModel, diff --git a/tests/read_model_relationship_includes/main.rs b/tests/read_model_relationship_includes/main.rs index a9f4e95..b216a9f 100644 --- a/tests/read_model_relationship_includes/main.rs +++ b/tests/read_model_relationship_includes/main.rs @@ -217,6 +217,32 @@ fn sync_persists_loaded_scalar_field_without_manual_patch() { assert_eq!(reloaded.data.display_name, "Ada Lovelace"); } +#[test] +fn sync_refreshes_loaded_root_baseline_between_calls() { + let store = store_with_player_and_weapons([]); + let mut read_models = store.workspace(); + let mut loaded = read_models + .load::(player_key("player-1")) + .one() + .unwrap() + .unwrap() + .data; + + loaded.display_name = "Ada Lovelace".into(); + read_models.sync(loaded.clone()).unwrap(); + loaded.display_name = "Countess Lovelace".into(); + read_models.sync(loaded).unwrap(); + read_models.commit().unwrap(); + + let mut check = store.workspace(); + let reloaded = check + .load::(player_key("player-1")) + .one() + .unwrap() + .unwrap(); + assert_eq!(reloaded.data.display_name, "Countess Lovelace"); +} + #[test] fn sync_persists_added_and_modified_related_rows() { let store = store_with_player_and_weapons([weapon("player-1", "sword", "2026-05-23")]); @@ -250,6 +276,35 @@ fn sync_persists_added_and_modified_related_rows() { assert_eq!(reloaded.weapons[1].acquired_at, "2026-05-24"); } +#[test] +fn sync_refreshes_loaded_include_baseline_between_calls() { + let store = store_with_player_and_weapons([weapon("player-1", "sword", "2026-05-23")]); + let mut read_models = store.workspace(); + let mut loaded = read_models + .load::(player_key("player-1")) + .include("weapons") + .one() + .unwrap() + .unwrap() + .data; + + loaded.weapons[0].acquired_at = "2026-05-24".into(); + read_models.sync(loaded.clone()).unwrap(); + loaded.weapons[0].acquired_at = "2026-05-25".into(); + read_models.sync(loaded).unwrap(); + read_models.commit().unwrap(); + + let mut check = store.workspace(); + let reloaded = check + .load::(player_key("player-1")) + .include("weapons") + .one() + .unwrap() + .unwrap() + .data; + assert_eq!(reloaded.weapons[0].acquired_at, "2026-05-25"); +} + #[test] fn sync_deletes_removed_related_rows() { let store = store_with_player_and_weapons([