diff --git a/src/commit_builder/mod.rs b/src/commit_builder/mod.rs index e916e7e..25b9c46 100644 --- a/src/commit_builder/mod.rs +++ b/src/commit_builder/mod.rs @@ -576,8 +576,8 @@ impl AsyncReadModelWritePlanCommitExt for R {} mod tests { use super::*; use crate::{ - impl_aggregate, AsyncTransactionalCommit, Entity, EventRecord, Get, HashMapRepository, - ReadModelWorkspaceExt, RowKey, RowValue, + sourced, AsyncTransactionalCommit, Entity, Get, HashMapRepository, ReadModelWorkspaceExt, + RowKey, RowValue, }; use serde::{Deserialize, Serialize}; use std::cell::RefCell; @@ -590,25 +590,20 @@ mod tests { entity: Entity, } + #[sourced(entity)] impl TestAggregate { + #[event("Touched")] fn touch(&mut self) { if self.entity.id().is_empty() { self.entity.set_id("agg-1"); } - self.entity.digest_empty("Touched").unwrap(); - } - - fn replay(&mut self, _event: &EventRecord) -> Result<(), String> { - Ok(()) } } - impl_aggregate!(TestAggregate, entity, replay); - #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, crate::ReadModel)] - #[readmodel(table = "commit_builder_views")] + #[table("commit_builder_views")] struct RelationalView { - #[readmodel(id)] + #[id] id: String, counter: i32, } @@ -773,7 +768,7 @@ mod tests { }; let mut agg = TestAggregate::default(); - agg.touch(); + agg.touch().unwrap(); SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view)) .commit_sync(&mut agg) @@ -798,7 +793,7 @@ mod tests { }; let mut agg = TestAggregate::default(); - agg.touch(); + agg.touch().unwrap(); let mut read_models = crate::read_model::ReadModelWritePlanBuilder::new(); read_models.upsert(&view1).unwrap().upsert(&view2).unwrap(); @@ -823,7 +818,7 @@ mod tests { let outbox = OutboxMessage::create("msg-1", "TestEvent", b"{}".to_vec()).unwrap(); let mut agg = TestAggregate::default(); - agg.touch(); + agg.touch().unwrap(); SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view)) .outbox_sync(outbox) @@ -845,7 +840,7 @@ mod tests { let outbox = OutboxMessage::create("msg-2", "TestEvent", b"{}".to_vec()).unwrap(); let mut agg = TestAggregate::default(); - agg.touch(); + agg.touch().unwrap(); SyncCommitBuilderExt::outbox_sync(&repo, outbox) .read_models_sync(read_models(&view)) @@ -895,11 +890,11 @@ mod tests { }; let mut agg1 = TestAggregate::default(); - agg1.touch(); + agg1.touch().unwrap(); agg1.entity.set_id("agg-1"); let mut agg2 = TestAggregate::default(); - agg2.touch(); + agg2.touch().unwrap(); agg2.entity.set_id("agg-2"); SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view)) @@ -924,7 +919,7 @@ mod tests { }; let outbox = OutboxMessage::create("ordered-msg", "TestEvent", b"{}".to_vec()).unwrap(); let mut agg = TestAggregate::default(); - agg.touch(); + agg.touch().unwrap(); match order { 0 => SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view)) @@ -961,7 +956,7 @@ mod tests { fn staged_commit_sets_outbox_source_from_single_aggregate() { let repo = RecordingBatchRepo::default(); let mut agg = TestAggregate::default(); - agg.touch(); + agg.touch().unwrap(); let outbox = OutboxMessage::create("sourced-msg", "TestEvent", b"{}".to_vec()).unwrap(); SyncReadModelWritePlanCommitExt::aggregate_sync(&repo, &mut agg) @@ -988,10 +983,10 @@ mod tests { counter: 77, }; let mut agg1 = TestAggregate::default(); - agg1.touch(); + agg1.touch().unwrap(); agg1.entity.set_id("agg-1"); let mut agg2 = TestAggregate::default(); - agg2.touch(); + agg2.touch().unwrap(); agg2.entity.set_id("agg-2"); SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view)) @@ -1016,7 +1011,7 @@ mod tests { let mut session = crate::read_model::ReadModelWritePlanBuilder::new(); session.mark_processed("", "message-1"); let mut agg = TestAggregate::default(); - agg.touch(); + agg.touch().unwrap(); let err = SyncReadModelWritePlanCommitExt::read_models_sync(&repo, session) .commit_sync(&mut agg) @@ -1042,7 +1037,7 @@ mod tests { }; let outbox = OutboxMessage::create("msg-rollback", "TestEvent", b"{}".to_vec()).unwrap(); let mut agg = TestAggregate::default(); - agg.touch(); + agg.touch().unwrap(); let err = SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view)) .outbox_sync(outbox) @@ -1082,7 +1077,7 @@ mod tests { counter: 42, }; let mut agg = TestAggregate::default(); - agg.touch(); + agg.touch().unwrap(); repo.read_models(read_models(&view)) .commit(&mut agg) @@ -1111,7 +1106,7 @@ mod tests { counter: 7, }; let mut agg = TestAggregate::default(); - agg.touch(); + agg.touch().unwrap(); let outbox = OutboxMessage::create("async-msg", "TestEvent", b"{}".to_vec()).unwrap(); repo.read_models(read_models(&view)) @@ -1140,10 +1135,10 @@ mod tests { async fn async_commit_many_supports_same_type_aggregates() { let repo = RecordingAsyncBatchRepo::default(); let mut agg1 = TestAggregate::default(); - agg1.touch(); + agg1.touch().unwrap(); agg1.entity.set_id("async-agg-1"); let mut agg2 = TestAggregate::default(); - agg2.touch(); + agg2.touch().unwrap(); agg2.entity.set_id("async-agg-2"); AsyncCommitBuilder::new(&repo) @@ -1172,7 +1167,7 @@ mod tests { let mut read_models = crate::read_model::ReadModelWritePlanBuilder::new(); read_models.mark_processed("", "message-1"); let mut agg = TestAggregate::default(); - agg.touch(); + agg.touch().unwrap(); let err = repo .read_models(read_models) diff --git a/src/outbox/commit.rs b/src/outbox/commit.rs index e140b03..158e052 100644 --- a/src/outbox/commit.rs +++ b/src/outbox/commit.rs @@ -86,8 +86,8 @@ impl AsyncAggregateRepository { mod tests { use super::*; use crate::{ - impl_aggregate, AggregateBuilder, CommitBatch, Entity, EventRecord, HashMapRepository, - OutboxStore, TransactionalCommit, + sourced, AggregateBuilder, CommitBatch, Entity, HashMapRepository, OutboxStore, + TransactionalCommit, }; use std::cell::RefCell; @@ -96,21 +96,16 @@ mod tests { entity: Entity, } + #[sourced(entity)] impl Dummy { + #[event("Touched")] fn touch(&mut self) { if self.entity.id().is_empty() { self.entity.set_id("dummy-1"); } - self.entity.digest_empty("Touched").unwrap(); - } - - fn replay(&mut self, _event: &EventRecord) -> Result<(), String> { - Ok(()) } } - impl_aggregate!(Dummy, entity, replay); - #[derive(Default)] struct FailingOutboxRepo { seen_ids: RefCell>, @@ -139,7 +134,7 @@ mod tests { let repo = HashMapRepository::new().aggregate::(); let mut aggregate = Dummy::default(); - aggregate.touch(); + aggregate.touch().unwrap(); let event = OutboxMessage::create("msg-1", "DummyTouched", b"{}".to_vec()).unwrap(); @@ -155,7 +150,7 @@ mod tests { let repo = AggregateRepository::<_, Dummy>::new(FailingOutboxRepo::default()); let mut aggregate = Dummy::default(); - aggregate.touch(); + aggregate.touch().unwrap(); let event = OutboxMessage::create("msg-fail", "DummyTouched", b"{}".to_vec()).unwrap(); diff --git a/src/snapshot/repository.rs b/src/snapshot/repository.rs index 5931c97..ef4ba2f 100644 --- a/src/snapshot/repository.rs +++ b/src/snapshot/repository.rs @@ -542,7 +542,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::{impl_aggregate, Aggregate, AggregateRepository, Entity, EventRecord}; + use crate::{sourced, Aggregate, AggregateRepository, Entity, EventRecord}; use std::cell::RefCell; #[derive(Default)] @@ -551,22 +551,17 @@ mod tests { value: u32, } + #[sourced(entity)] impl TestAggregate { + #[event("Touched")] fn touch(&mut self) { if self.entity.id().is_empty() { self.entity.set_id("snap-1"); } self.value += 1; - self.entity.digest_empty("Touched").unwrap(); - } - - fn replay(&mut self, _event: &EventRecord) -> Result<(), String> { - Ok(()) } } - impl_aggregate!(TestAggregate, entity, replay); - impl Snapshottable for TestAggregate { type Snapshot = u32; @@ -605,7 +600,7 @@ mod tests { let snapshot_repo = SnapshotAggregateRepository::new(aggregate_repo, 1); let mut aggregate = TestAggregate::default(); - aggregate.touch(); + aggregate.touch().unwrap(); let err = snapshot_repo.commit(&mut aggregate).unwrap_err(); diff --git a/tests/async_repository/main.rs b/tests/async_repository/main.rs index 1b5fc76..09467a8 100644 --- a/tests/async_repository/main.rs +++ b/tests/async_repository/main.rs @@ -2,12 +2,12 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use sourced_rust::{ - impl_aggregate, Aggregate, AsyncAggregateBuilder, AsyncCommitBatch, AsyncGetStream, - AsyncOutboxStore, AsyncReadModelWritePlanStore, AsyncRelationalReadModelQueryStore, - AsyncSnapshotStore, AsyncStreamWrite, AsyncTransactionalCommit, ClaimOutboxMessages, Entity, - EventRecord, HashMapRepository, InMemorySnapshotStore, OutboxMessage, ProcessedMessageMark, - ReadModel, ReadModelWritePlan, ReadModelWritePlanBuilder, RelationalReadModel, RepositoryError, - RowKey, RowValue, SnapshotRecord, Snapshottable, StreamIdentity, Versioned, + sourced, Aggregate, AsyncAggregateBuilder, AsyncCommitBatch, AsyncGetStream, AsyncOutboxStore, + AsyncReadModelWritePlanStore, AsyncRelationalReadModelQueryStore, AsyncSnapshotStore, + AsyncStreamWrite, AsyncTransactionalCommit, ClaimOutboxMessages, Entity, HashMapRepository, + InMemorySnapshotStore, OutboxMessage, ProcessedMessageMark, ReadModel, ReadModelWritePlan, + ReadModelWritePlanBuilder, RelationalReadModel, RepositoryError, RowKey, RowValue, + SnapshotRecord, Snapshottable, StreamIdentity, Versioned, }; #[derive(Default)] @@ -15,70 +15,42 @@ struct AlphaAggregate { entity: Entity, } +#[sourced(entity, aggregate_type = "async.alpha")] impl AlphaAggregate { - fn touch(&mut self, id: &str) { - self.entity.set_id(id); - self.entity.digest_empty("Touched").unwrap(); - } - - fn replay(&mut self, _event: &EventRecord) -> Result<(), String> { - Ok(()) + #[event("Touched")] + fn touch(&mut self, id: String) { + self.entity.set_id(&id); } } -impl_aggregate!( - AlphaAggregate, - entity, - replay, - aggregate_type = "async.alpha" -); - #[derive(Default)] struct BetaAggregate { entity: Entity, } +#[sourced(entity, aggregate_type = "async.beta")] impl BetaAggregate { - fn touch(&mut self, id: &str) { - self.entity.set_id(id); - self.entity.digest_empty("Touched").unwrap(); - } - - fn replay(&mut self, _event: &EventRecord) -> Result<(), String> { - Ok(()) + #[event("Touched")] + fn touch(&mut self, id: String) { + self.entity.set_id(&id); } } -impl_aggregate!(BetaAggregate, entity, replay, aggregate_type = "async.beta"); - #[derive(Default)] struct SnapshotCounter { entity: Entity, value: i32, } +#[sourced(entity, aggregate_type = "async.snapshot_counter")] impl SnapshotCounter { - fn increment(&mut self, id: &str, by: i32) { - self.entity.set_id(id); - self.entity.digest("Incremented", &by).unwrap(); + #[event("Incremented")] + fn increment(&mut self, id: String, by: i32) { + self.entity.set_id(&id); self.value += by; } - - fn replay(&mut self, event: &EventRecord) -> Result<(), String> { - if event.event_name == "Incremented" { - self.value += event.decode::().map_err(|err| err.to_string())?; - } - Ok(()) - } } -impl_aggregate!( - SnapshotCounter, - entity, - replay, - aggregate_type = "async.snapshot_counter" -); - impl Snapshottable for SnapshotCounter { type Snapshot = i32; @@ -92,9 +64,9 @@ impl Snapshottable for SnapshotCounter { } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "async_test_views")] +#[table("async_test_views")] struct TestView { - #[readmodel(id)] + #[id] id: String, value: i32, } @@ -121,9 +93,9 @@ async fn async_aggregate_repository_separates_streams_by_aggregate_type() { let beta_repo = repo.clone().async_aggregate::(); let mut alpha = AlphaAggregate::default(); - alpha.touch("shared-id"); + alpha.touch("shared-id".into()).unwrap(); let mut beta = BetaAggregate::default(); - beta.touch("shared-id"); + beta.touch("shared-id".into()).unwrap(); alpha_repo.commit(&mut alpha).await.unwrap(); beta_repo.commit(&mut beta).await.unwrap(); @@ -256,9 +228,9 @@ async fn async_snapshot_repository_writes_cache_without_event_record() { let id = "snapshot-counter-1"; let mut counter = SnapshotCounter::default(); - counter.increment(id, 2); + counter.increment(id.into(), 2).unwrap(); snapshot_repo.commit(&mut counter).await.unwrap(); - counter.increment(id, 3); + counter.increment(id.into(), 3).unwrap(); snapshot_repo.commit(&mut counter).await.unwrap(); let identity = StreamIdentity::new(SnapshotCounter::aggregate_type(), id).unwrap(); @@ -284,8 +256,8 @@ async fn async_snapshot_repository_ignores_invalid_cache_and_replays_events() { let id = "snapshot-counter-invalid"; let mut counter = SnapshotCounter::default(); - counter.increment(id, 4); - counter.increment(id, 6); + counter.increment(id.into(), 4).unwrap(); + counter.increment(id.into(), 6).unwrap(); aggregate_repo.commit(&mut counter).await.unwrap(); let identity = StreamIdentity::new(SnapshotCounter::aggregate_type(), id).unwrap(); @@ -316,7 +288,7 @@ async fn async_snapshot_repository_ignores_cache_past_stream_version_and_replays let id = "snapshot-counter-ahead"; let mut counter = SnapshotCounter::default(); - counter.increment(id, 4); + counter.increment(id.into(), 4).unwrap(); aggregate_repo.commit(&mut counter).await.unwrap(); let identity = StreamIdentity::new(SnapshotCounter::aggregate_type(), id).unwrap(); @@ -342,7 +314,7 @@ async fn async_outbox_repository_delegates_worker_operations() { let outbox = repo.outbox_store(); let message = OutboxMessage::create("msg-1", "Event", b"{}".to_vec()).unwrap(); let mut aggregate = AlphaAggregate::default(); - aggregate.touch("outbox-aggregate-1"); + aggregate.touch("outbox-aggregate-1".into()).unwrap(); repo.clone() .async_aggregate::() .outbox(message) diff --git a/tests/blob_game/aggregate.rs b/tests/blob_game/aggregate.rs index e225050..6d2190d 100644 --- a/tests/blob_game/aggregate.rs +++ b/tests/blob_game/aggregate.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use sourced_rust::{digest, Entity}; +use sourced_rust::{sourced, Entity}; /// Tile states for the game grid #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -64,6 +64,7 @@ const FIVE_SECONDS_MS: u64 = 5 * 1000; const TIME_TO_END: u64 = FIVE_MINUTES_MS + FIVE_SECONDS_MS; #[allow(dead_code)] +#[sourced(entity)] impl BlobGame { pub fn new() -> Self { Self::default() @@ -191,8 +192,7 @@ impl BlobGame { } } - // Commands (digest events) - #[digest("Initialized")] + #[event("Initialized")] pub fn initialize( &mut self, id: String, @@ -210,7 +210,7 @@ impl BlobGame { } } - #[digest("NextLevelStarted", when = self.current_level_completed && !self.player_dead)] + #[event("NextLevelStarted", when = self.current_level_completed && !self.player_dead)] pub fn start_next_level(&mut self, map: Vec>) { self.current_level += 1; let level = Level { @@ -222,7 +222,7 @@ impl BlobGame { self.current_level_completed = false; } - #[digest("MovedUp", when = self.can_move_up())] + #[event("MovedUp", when = self.can_move_up())] pub fn up(&mut self, time: Option) { if let Some(pos) = self.player_position() { let new_pos = Coordinate { @@ -233,7 +233,7 @@ impl BlobGame { } } - #[digest("MovedDown", when = self.can_move_down())] + #[event("MovedDown", when = self.can_move_down())] pub fn down(&mut self, time: Option) { if let Some(pos) = self.player_position() { let new_pos = Coordinate { @@ -244,7 +244,7 @@ impl BlobGame { } } - #[digest("MovedLeft", when = self.can_move_left())] + #[event("MovedLeft", when = self.can_move_left())] pub fn left(&mut self, time: Option) { if let Some(pos) = self.player_position() { let new_pos = Coordinate { @@ -255,7 +255,7 @@ impl BlobGame { } } - #[digest("MovedRight", when = self.can_move_right())] + #[event("MovedRight", when = self.can_move_right())] pub fn right(&mut self, time: Option) { if let Some(pos) = self.player_position() { let new_pos = Coordinate { @@ -324,16 +324,6 @@ impl BlobGame { } } -// For replay, we need to handle the map serialization -sourced_rust::aggregate!(BlobGame, entity { - "Initialized"(id, address, minigame_id, timed, started_at) => initialize, - "NextLevelStarted"(map) => start_next_level, - "MovedUp"(time) => up, - "MovedDown"(time) => down, - "MovedLeft"(time) => left, - "MovedRight"(time) => right, -}); - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BlobGameSnapshot { pub id: String, diff --git a/tests/bomberman/views.rs b/tests/bomberman/views.rs index df93cce..81564c7 100644 --- a/tests/bomberman/views.rs +++ b/tests/bomberman/views.rs @@ -30,9 +30,9 @@ pub struct ExplosionState { /// Whole-board view stored in the `boards` read-model table. #[derive(Clone, Debug, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "boards")] +#[table("boards")] pub struct BoardView { - #[readmodel(id)] + #[id] pub game_id: String, pub width: usize, pub height: usize, diff --git a/tests/distributed_read_model/checkout_saga_service/models/saga.rs b/tests/distributed_read_model/checkout_saga_service/models/saga.rs index 8ce1776..af94b0a 100644 --- a/tests/distributed_read_model/checkout_saga_service/models/saga.rs +++ b/tests/distributed_read_model/checkout_saga_service/models/saga.rs @@ -14,7 +14,7 @@ pub struct CheckoutSaga { pub reserved_seat_id: String, } -#[sourced(entity)] +#[sourced(entity, aggregate_type = "checkout_saga")] impl CheckoutSaga { #[event("CheckoutSagaStarted", when = !checkout_id.is_empty() && !seat_id.is_empty())] pub fn start(&mut self, checkout_id: String, seat_id: String, seat_category: String) { diff --git a/tests/distributed_read_model/main.rs b/tests/distributed_read_model/main.rs index ac34288..7504597 100644 --- a/tests/distributed_read_model/main.rs +++ b/tests/distributed_read_model/main.rs @@ -52,18 +52,17 @@ use sourced_rust::bus::Subscribable; use sourced_rust::microsvc::{self, Service, Session}; #[cfg(feature = "sqlite")] use sourced_rust::SqliteRepository; -#[cfg(any(feature = "sqlite", feature = "postgres"))] -use sourced_rust::{ - impl_aggregate, Aggregate, AsyncAggregateBuilder, AsyncCommitBuilderExt, AsyncGetStream, - AsyncOutboxStore, AsyncReadModelWritePlanCommitExt, AsyncReadModelWritePlanStore, - AsyncRelationalReadModelQueryStore, AsyncTransactionalCommit, Entity, EventRecord, - OutboxMessage, ReadModelError, ReadModelWritePlanBuilder, RelationalReadModel, - RelationalReadModelIncludes, StreamIdentity, -}; use sourced_rust::{ AggregateBuilder, HashMapRepository, InMemoryQueue, InMemoryReadModelStore, OutboxWorkerThread, Queueable, ReadModelWritePlanStore, }; +#[cfg(any(feature = "sqlite", feature = "postgres"))] +use sourced_rust::{ + AsyncAggregateBuilder, AsyncCommitBuilderExt, AsyncGetStream, AsyncOutboxStore, + AsyncReadModelWritePlanStore, AsyncRelationalReadModelQueryStore, AsyncTransactionalCommit, + OutboxMessage, ReadModelError, ReadModelWritePlanBuilder, RelationalReadModel, + RelationalReadModelIncludes, +}; fn dispatch(service: &Service, command: &str, input: C) where @@ -107,41 +106,6 @@ fn wait_for_checkout_state( #[cfg(any(feature = "sqlite", feature = "postgres"))] static NEXT_ASYNC_FLOW_ID: AtomicU64 = AtomicU64::new(1); -#[cfg(any(feature = "sqlite", feature = "postgres"))] -#[derive(Default)] -struct ProjectionCheckpoint { - entity: Entity, - last_message_id: String, -} - -#[cfg(any(feature = "sqlite", feature = "postgres"))] -impl ProjectionCheckpoint { - fn mark_projected(&mut self, message_id: &str) { - if self.entity.id().is_empty() { - self.entity.set_id(CHECKOUT_SCREEN_CONSUMER); - } - self.last_message_id = message_id.to_string(); - self.entity - .digest("MessageProjected", &self.last_message_id) - .expect("projection checkpoint event should record"); - } - - fn replay(&mut self, event: &EventRecord) -> Result<(), String> { - if event.event_name == "MessageProjected" { - self.last_message_id = event.decode::().map_err(|err| err.to_string())?; - } - Ok(()) - } -} - -#[cfg(any(feature = "sqlite", feature = "postgres"))] -impl_aggregate!( - ProjectionCheckpoint, - entity, - replay, - aggregate_type = "distributed.checkout_projection_checkpoint" -); - #[cfg(any(feature = "sqlite", feature = "postgres"))] struct AsyncFlowIds { checkout_id: String, @@ -271,18 +235,6 @@ async fn run_async_persistent_checkout_flow( message.id() ); } - - let checkpoint_identity = StreamIdentity::new( - ProjectionCheckpoint::aggregate_type(), - CHECKOUT_SCREEN_CONSUMER, - ) - .expect("checkpoint identity should build"); - let checkpoint = read_repo - .get_stream(&checkpoint_identity) - .await - .expect("projection checkpoint should reload") - .expect("projection checkpoint should exist"); - assert_eq!(checkpoint.version(), 4); } #[cfg(any(feature = "sqlite", feature = "postgres"))] @@ -416,13 +368,7 @@ where #[cfg(any(feature = "sqlite", feature = "postgres"))] async fn project_message_async(repo: &R, message: &OutboxMessage) where - R: Clone - + AsyncGetStream - + AsyncReadModelWritePlanStore - + AsyncTransactionalCommit - + Send - + Sync - + 'static, + R: AsyncReadModelWritePlanStore + Send + Sync, { let mut read_models = ReadModelWritePlanBuilder::new(); @@ -510,19 +456,10 @@ where } read_models.mark_processed(CHECKOUT_SCREEN_CONSUMER, message.id()); - let mut checkpoint = repo - .clone() - .async_aggregate::() - .get(CHECKOUT_SCREEN_CONSUMER) - .await - .expect("projection checkpoint should load") - .unwrap_or_default(); - checkpoint.mark_projected(message.id()); - - repo.read_models(read_models) - .commit(&mut checkpoint) + read_models + .commit_async(repo) .await - .expect("projection read models should commit with checkpoint"); + .expect("projection read models should commit"); } #[cfg(any(feature = "sqlite", feature = "postgres"))] diff --git a/tests/distributed_read_model/read_models/checkout_step_view.rs b/tests/distributed_read_model/read_models/checkout_step_view.rs index c93b995..7d8250e 100644 --- a/tests/distributed_read_model/read_models/checkout_step_view.rs +++ b/tests/distributed_read_model/read_models/checkout_step_view.rs @@ -2,7 +2,8 @@ use serde::{Deserialize, Serialize}; use sourced_rust::ReadModel; #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "checkout_steps", primary_key = ["checkout_id", "step"])] +#[table("checkout_steps")] +#[readmodel(primary_key = ["checkout_id", "step"])] pub struct CheckoutStepView { #[readmodel( foreign_key = "checkouts.checkout_id", diff --git a/tests/distributed_read_model/read_models/checkout_view.rs b/tests/distributed_read_model/read_models/checkout_view.rs index f4ca71d..71cff23 100644 --- a/tests/distributed_read_model/read_models/checkout_view.rs +++ b/tests/distributed_read_model/read_models/checkout_view.rs @@ -4,9 +4,9 @@ use sourced_rust::ReadModel; use super::{CheckoutStepView, SeatView}; #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "checkouts")] +#[table("checkouts")] pub struct CheckoutView { - #[readmodel(id, column = "checkout_id")] + #[id("checkout_id")] pub checkout_id: String, pub seat_id: String, pub seat_category: String, diff --git a/tests/distributed_read_model/read_models/seat_view.rs b/tests/distributed_read_model/read_models/seat_view.rs index 8419020..23644b1 100644 --- a/tests/distributed_read_model/read_models/seat_view.rs +++ b/tests/distributed_read_model/read_models/seat_view.rs @@ -2,9 +2,9 @@ use serde::{Deserialize, Serialize}; use sourced_rust::ReadModel; #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "seats")] +#[table("seats")] pub struct SeatView { - #[readmodel(id, column = "seat_id")] + #[id("seat_id")] pub seat_id: String, pub category: String, pub status: String, diff --git a/tests/distributed_read_model/seat_inventory_service/models/seat.rs b/tests/distributed_read_model/seat_inventory_service/models/seat.rs index a1e2ac8..e975f5d 100644 --- a/tests/distributed_read_model/seat_inventory_service/models/seat.rs +++ b/tests/distributed_read_model/seat_inventory_service/models/seat.rs @@ -13,7 +13,7 @@ pub struct Seat { pub checkout_id: String, } -#[sourced(entity)] +#[sourced(entity, aggregate_type = "seat")] impl Seat { #[event("SeatAdded", when = !seat_id.is_empty() && !category.is_empty())] pub fn add(&mut self, seat_id: String, category: String) { diff --git a/tests/distributed_read_model_board/read_models/board_view.rs b/tests/distributed_read_model_board/read_models/board_view.rs index 071cb29..d32e06f 100644 --- a/tests/distributed_read_model_board/read_models/board_view.rs +++ b/tests/distributed_read_model_board/read_models/board_view.rs @@ -6,9 +6,9 @@ use super::CardView; /// Board header row. `has_many` cards and a `source_version` guard for /// out-of-order delivery. #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "boards")] +#[table("boards")] pub struct BoardView { - #[readmodel(id, column = "board_id")] + #[id("board_id")] pub board_id: String, pub name: String, pub source_version: i64, diff --git a/tests/distributed_read_model_board/read_models/card_view.rs b/tests/distributed_read_model_board/read_models/card_view.rs index a712d9d..830e45e 100644 --- a/tests/distributed_read_model_board/read_models/card_view.rs +++ b/tests/distributed_read_model_board/read_models/card_view.rs @@ -14,7 +14,8 @@ pub struct CardPayload { /// delegated foreign key from the board; `payload` is a JSONB column; `board` /// is a `belongs_to` include. #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "cards", primary_key = ["board_id", "card_id"])] +#[table("cards")] +#[readmodel(primary_key = ["board_id", "card_id"])] pub struct CardView { #[readmodel(foreign_key = "boards.board_id", delegated_from = "BoardView.board_id")] pub board_id: String, diff --git a/tests/persistent_repository_conformance/read_models.rs b/tests/persistent_repository_conformance/read_models.rs index df076ee..e64c69c 100644 --- a/tests/persistent_repository_conformance/read_models.rs +++ b/tests/persistent_repository_conformance/read_models.rs @@ -13,9 +13,9 @@ use super::scenario::unique_id; use super::seat::Seat; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "conformance_seat_views")] +#[table("conformance_seat_views")] struct SeatView { - #[readmodel(id)] + #[id] id: String, status: String, } diff --git a/tests/postgres_repository/main.rs b/tests/postgres_repository/main.rs index 5bf3377..9be61f8 100644 --- a/tests/postgres_repository/main.rs +++ b/tests/postgres_repository/main.rs @@ -9,11 +9,11 @@ use std::time::{SystemTime, UNIX_EPOCH}; use serde::{Deserialize, Serialize}; use sourced_rust::{ - impl_aggregate, Aggregate, AsyncAggregateBuilder, AsyncCommitBatch, AsyncGetStream, - AsyncOutboxStore, AsyncReadModelWritePlanCommitExt, AsyncReadModelWritePlanStore, - AsyncSnapshotStore, AsyncStreamWrite, AsyncTransactionalCommit, Entity, EventRecord, - OutboxMessageStatus, PostgresRepository, ReadModel, ReadModelWritePlanBuilder, RepositoryError, - RowKey, RowPatch, RowValue, SnapshotRecord, StreamIdentity, TableSchemaRegistry, + sourced, Aggregate, AsyncAggregateBuilder, AsyncCommitBatch, AsyncGetStream, AsyncOutboxStore, + AsyncReadModelWritePlanCommitExt, AsyncReadModelWritePlanStore, AsyncSnapshotStore, + AsyncStreamWrite, AsyncTransactionalCommit, Entity, OutboxMessageStatus, PostgresRepository, + ReadModel, ReadModelWritePlanBuilder, RepositoryError, RowKey, RowPatch, RowValue, + SnapshotRecord, StreamIdentity, TableSchemaRegistry, }; static NEXT_ID: AtomicU64 = AtomicU64::new(1); @@ -24,51 +24,32 @@ struct Counter { value: i32, } +#[sourced(entity, aggregate_type = "postgres.counter")] impl Counter { - fn increment(&mut self, id: &str, by: i32) { - self.entity.set_id(id); - self.entity.digest("Incremented", &by).unwrap(); + #[event("Incremented")] + fn increment(&mut self, id: String, by: i32) { + self.entity.set_id(&id); self.value += by; } - - fn replay(&mut self, event: &EventRecord) -> Result<(), String> { - if event.event_name == "Incremented" { - let by = event.decode::().map_err(|err| err.to_string())?; - self.value += by; - } - Ok(()) - } } -impl_aggregate!(Counter, entity, replay, aggregate_type = "postgres.counter"); - #[derive(Default)] struct CounterProjection { entity: Entity, } +#[sourced(entity, aggregate_type = "postgres.counter_projection")] impl CounterProjection { - fn touch(&mut self, id: &str) { - self.entity.set_id(id); - self.entity.digest_empty("Touched").unwrap(); - } - - fn replay(&mut self, _event: &EventRecord) -> Result<(), String> { - Ok(()) + #[event("Touched")] + fn touch(&mut self, id: String) { + self.entity.set_id(&id); } } -impl_aggregate!( - CounterProjection, - entity, - replay, - aggregate_type = "postgres.counter_projection" -); - #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "postgres_relational_counter_views")] +#[table("postgres_relational_counter_views")] struct RelationalCounterView { - #[readmodel(id)] + #[id] id: String, value: i64, #[readmodel(jsonb)] @@ -189,8 +170,8 @@ async fn aggregate_stream_round_trips_with_metadata() { let mut counter = Counter::default(); counter.entity.set_correlation_id("corr-postgres"); - counter.increment(&id, 2); - counter.increment(&id, 3); + counter.increment(id.clone(), 2).unwrap(); + counter.increment(id.clone(), 3).unwrap(); counter_repo.commit(&mut counter).await.unwrap(); @@ -215,17 +196,17 @@ async fn optimistic_conflict_rolls_back_other_stream_and_snapshot() { let other_id = unique_id("rollback"); let mut original = Counter::default(); - original.increment(&counter_id, 1); + original.increment(counter_id.clone(), 1).unwrap(); counter_repo.commit(&mut original).await.unwrap(); let mut stale = counter_repo.get(&counter_id).await.unwrap().unwrap(); let mut winner = counter_repo.get(&counter_id).await.unwrap().unwrap(); - stale.increment(&counter_id, 10); - winner.increment(&counter_id, 20); + stale.increment(counter_id.clone(), 10).unwrap(); + winner.increment(counter_id.clone(), 20).unwrap(); counter_repo.commit(&mut winner).await.unwrap(); let mut other = CounterProjection::default(); - other.touch(&other_id); + other.touch(other_id.clone()).unwrap(); let stale_identity = StreamIdentity::new(Counter::aggregate_type(), &counter_id).unwrap(); let other_identity = @@ -310,7 +291,7 @@ async fn commit_batch_lowers_relational_read_model_plan_into_registered_table() let mut session = ReadModelWritePlanBuilder::new(); session.upsert(&view).unwrap(); let mut projection = CounterProjection::default(); - projection.touch(&id); + projection.touch(id.clone()).unwrap(); let identity = StreamIdentity::new(CounterProjection::aggregate_type(), &id).unwrap(); repo.read_models(session) diff --git a/tests/read_model_commit_bridge/main.rs b/tests/read_model_commit_bridge/main.rs index ed504ef..6c9dd4e 100644 --- a/tests/read_model_commit_bridge/main.rs +++ b/tests/read_model_commit_bridge/main.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use sourced_rust::{ - impl_aggregate, Entity, EventRecord, HashMapRepository, ReadModel, ReadModelWorkspaceExt, + sourced, Entity, HashMapRepository, ReadModel, ReadModelWorkspaceExt, ReadModelWritePlanBuilder, RowKey, RowValue, SyncReadModelWritePlanCommitExt, }; @@ -9,25 +9,20 @@ struct TestAggregate { entity: Entity, } +#[sourced(entity)] impl TestAggregate { + #[event("Touched")] fn touch(&mut self) { if self.entity.id().is_empty() { self.entity.set_id("agg-1"); } - self.entity.digest_empty("Touched").unwrap(); - } - - fn replay(&mut self, _event: &EventRecord) -> Result<(), String> { - Ok(()) } } -impl_aggregate!(TestAggregate, entity, replay); - #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "bridge_views")] +#[table("bridge_views")] struct BridgeView { - #[readmodel(id)] + #[id] id: String, value: i32, } @@ -42,7 +37,7 @@ fn repo_first_read_models_session_commit_form_is_available() { let mut session = ReadModelWritePlanBuilder::new(); session.upsert(&view).unwrap(); let mut aggregate = TestAggregate::default(); - aggregate.touch(); + aggregate.touch().unwrap(); repo.read_models_sync(session) .commit_sync(&mut aggregate) diff --git a/tests/read_model_metadata/main.rs b/tests/read_model_metadata/main.rs index 5b4dd84..2d9fea1 100644 --- a/tests/read_model_metadata/main.rs +++ b/tests/read_model_metadata/main.rs @@ -7,9 +7,9 @@ use sourced_rust::{ }; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "account_summaries")] +#[table("account_summaries")] struct AccountSummary { - #[readmodel(id, column = "account_id")] + #[id("account_id")] account_id: String, #[index] owner: Option, @@ -23,9 +23,9 @@ struct AccountSummary { } #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "players")] +#[table("players")] struct Player { - #[readmodel(id, column = "player_id")] + #[id("player_id")] player_id: String, display_name: String, #[readmodel(has_many = "PlayerWeapon", foreign_key = "player_id")] @@ -33,7 +33,8 @@ struct Player { } #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "player_weapons", primary_key = ["player_id", "weapon_id"])] +#[table("player_weapons")] +#[readmodel(primary_key = ["player_id", "weapon_id"])] struct PlayerWeapon { #[readmodel(foreign_key = "players.player_id", delegated_from = "Player.player_id")] player_id: String, @@ -219,7 +220,7 @@ fn derive_represents_relationships_composite_keys_and_delegated_foreign_keys() { #[test] fn metadata_validation_reports_missing_keys_before_storage_writes() { #[derive(Clone, Debug, Serialize, Deserialize, ReadModel)] - #[readmodel(table = "missing_key_models")] + #[table("missing_key_models")] struct MissingKeyModel { value: String, } diff --git a/tests/read_model_relationship_includes/main.rs b/tests/read_model_relationship_includes/main.rs index 70193ea..edb5476 100644 --- a/tests/read_model_relationship_includes/main.rs +++ b/tests/read_model_relationship_includes/main.rs @@ -7,9 +7,9 @@ use sourced_rust::{ }; #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "players")] +#[table("players")] struct Player { - #[readmodel(id, column = "player_id")] + #[id("player_id")] player_id: String, display_name: String, #[readmodel(has_many = "PlayerWeapon", foreign_key = "player_id")] @@ -17,7 +17,8 @@ struct Player { } #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "player_weapons", primary_key = ["player_id", "weapon_id"])] +#[table("player_weapons")] +#[readmodel(primary_key = ["player_id", "weapon_id"])] struct PlayerWeapon { #[readmodel(foreign_key = "players.player_id", delegated_from = "Player.player_id")] player_id: String, @@ -28,9 +29,9 @@ struct PlayerWeapon { } #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "players_with_many")] +#[table("players_with_many")] struct PlayerWithMany { - #[readmodel(id, column = "player_id")] + #[id("player_id")] player_id: String, #[readmodel( many_to_many = "Weapon", @@ -41,16 +42,16 @@ struct PlayerWithMany { } #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "weapons")] +#[table("weapons")] struct Weapon { - #[readmodel(id, column = "weapon_id")] + #[id("weapon_id")] weapon_id: String, } #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "weapon_label_refs")] +#[table("weapon_label_refs")] struct WeaponLabelRef { - #[readmodel(id, column = "ref_id")] + #[id("ref_id")] ref_id: String, player_id: String, #[readmodel(belongs_to = "CompositeWeaponLabel", foreign_key = "player_id")] @@ -58,7 +59,8 @@ struct WeaponLabelRef { } #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "weapon_labels", primary_key = ["player_id", "weapon_id"])] +#[table("weapon_labels")] +#[readmodel(primary_key = ["player_id", "weapon_id"])] struct CompositeWeaponLabel { player_id: String, weapon_id: String, diff --git a/tests/read_model_schema_bootstrap/main.rs b/tests/read_model_schema_bootstrap/main.rs index 9d04a39..0eb55bc 100644 --- a/tests/read_model_schema_bootstrap/main.rs +++ b/tests/read_model_schema_bootstrap/main.rs @@ -9,9 +9,9 @@ use sourced_rust::{ }; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "account_summaries")] +#[table("account_summaries")] struct AccountSummary { - #[readmodel(id, column = "account_id")] + #[id("account_id")] account_id: String, #[unique] owner_slug: String, @@ -23,9 +23,9 @@ struct AccountSummary { } #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "players")] +#[table("players")] struct Player { - #[readmodel(id, column = "player_id")] + #[id("player_id")] player_id: String, display_name: String, #[readmodel(has_many = "PlayerWeapon", foreign_key = "player_id")] @@ -33,7 +33,8 @@ struct Player { } #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "player_weapons", primary_key = ["player_id", "weapon_id"])] +#[table("player_weapons")] +#[readmodel(primary_key = ["player_id", "weapon_id"])] struct PlayerWeapon { #[readmodel(foreign_key = "players.player_id", delegated_from = "Player.player_id")] player_id: String, diff --git a/tests/read_model_session/main.rs b/tests/read_model_session/main.rs index 901ea8b..805b83d 100644 --- a/tests/read_model_session/main.rs +++ b/tests/read_model_session/main.rs @@ -8,9 +8,9 @@ use sourced_rust::{ }; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "account_summaries")] +#[table("account_summaries")] struct AccountSummary { - #[readmodel(id, column = "account_id")] + #[id("account_id")] account_id: String, #[index] owner: Option, @@ -37,9 +37,9 @@ impl AccountSummary { } #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "players")] +#[table("players")] struct Player { - #[readmodel(id, column = "player_id")] + #[id("player_id")] player_id: String, display_name: String, #[readmodel(has_many = "PlayerWeapon", foreign_key = "player_id")] @@ -47,7 +47,8 @@ struct Player { } #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "player_weapons", primary_key = ["player_id", "weapon_id"])] +#[table("player_weapons")] +#[readmodel(primary_key = ["player_id", "weapon_id"])] struct PlayerWeapon { #[readmodel(foreign_key = "players.player_id", delegated_from = "Player.player_id")] player_id: String, diff --git a/tests/read_models/aggregate.rs b/tests/read_models/aggregate.rs index 09f28de..61ddc90 100644 --- a/tests/read_models/aggregate.rs +++ b/tests/read_models/aggregate.rs @@ -1,6 +1,6 @@ //! Simple Counter aggregate for testing read models. -use sourced_rust::{digest, Entity}; +use sourced_rust::{sourced, Entity}; #[derive(Default)] pub struct Counter { @@ -10,12 +10,13 @@ pub struct Counter { value: i32, } +#[sourced(entity)] impl Counter { pub fn new() -> Self { Self::default() } - #[digest("CounterCreated")] + #[event("CounterCreated")] pub fn create(&mut self, id: String, name: String, user_id: String) { self.entity.set_id(&id); self.name = name; @@ -23,12 +24,12 @@ impl Counter { self.value = 0; } - #[digest("CounterIncremented")] + #[event("CounterIncremented")] pub fn increment(&mut self, amount: i32) { self.value += amount; } - #[digest("CounterDecremented")] + #[event("CounterDecremented")] pub fn decrement(&mut self, amount: i32) { self.value -= amount; } @@ -37,9 +38,3 @@ impl Counter { self.value } } - -sourced_rust::aggregate!(Counter, entity { - "CounterCreated"(id, name, user_id) => create, - "CounterIncremented"(amount) => increment, - "CounterDecremented"(amount) => decrement, -}); diff --git a/tests/sagas/order/inventory.rs b/tests/sagas/order/inventory.rs index 378dbd6..8524dc4 100644 --- a/tests/sagas/order/inventory.rs +++ b/tests/sagas/order/inventory.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use sourced_rust::{digest, Entity}; +use sourced_rust::{sourced, Entity}; use std::collections::HashMap; /// Inventory aggregate - tracks stock levels and reservations @@ -13,6 +13,7 @@ pub struct Inventory { } #[allow(dead_code)] +#[sourced(entity)] impl Inventory { pub fn new() -> Self { Self::default() @@ -38,21 +39,21 @@ impl Inventory { self.reservations.get(order_id).copied() } - #[digest("InventoryInitialized")] + #[event("InventoryInitialized")] pub fn initialize(&mut self, sku: String, initial_stock: u32) { self.entity.set_id(&sku); self.sku = sku; self.available = initial_stock; } - #[digest("StockReserved", when = self.can_reserve(quantity))] + #[event("StockReserved", when = self.can_reserve(quantity))] pub fn reserve(&mut self, order_id: String, quantity: u32) { self.available -= quantity; self.reserved += quantity; self.reservations.insert(order_id, quantity); } - #[digest("ReservationReleased", when = self.reservations.contains_key(&order_id))] + #[event("ReservationReleased", when = self.reservations.contains_key(&order_id))] pub fn release_reservation(&mut self, order_id: String) { if let Some(quantity) = self.reservations.remove(&order_id) { self.available += quantity; @@ -60,7 +61,7 @@ impl Inventory { } } - #[digest("ReservationCommitted", when = self.reservations.contains_key(&order_id))] + #[event("ReservationCommitted", when = self.reservations.contains_key(&order_id))] pub fn commit_reservation(&mut self, order_id: String) { if let Some(quantity) = self.reservations.remove(&order_id) { self.reserved -= quantity; @@ -78,13 +79,6 @@ impl Inventory { } } -sourced_rust::aggregate!(Inventory, entity { - "InventoryInitialized"(sku, initial_stock) => initialize, - "StockReserved"(order_id, quantity) => reserve, - "ReservationReleased"(order_id) => release_reservation, - "ReservationCommitted"(order_id) => commit_reservation, -}); - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct InventorySnapshot { pub sku: String, diff --git a/tests/sagas/order/order_aggregate.rs b/tests/sagas/order/order_aggregate.rs index 8207409..869e6cd 100644 --- a/tests/sagas/order/order_aggregate.rs +++ b/tests/sagas/order/order_aggregate.rs @@ -1,6 +1,6 @@ use bitcode::{Decode, Encode}; use serde::{Deserialize, Serialize}; -use sourced_rust::{digest, Entity}; +use sourced_rust::{sourced, Entity}; /// Represents a line item in an order #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Encode, Decode)] @@ -33,6 +33,7 @@ pub struct Order { } #[allow(dead_code)] +#[sourced(entity)] impl Order { pub fn new() -> Self { Self::default() @@ -58,7 +59,7 @@ impl Order { self.failure_reason.as_deref() } - #[digest("OrderCreated")] + #[event("OrderCreated")] pub fn create(&mut self, id: String, customer_id: String, items: Vec) { self.entity.set_id(&id); self.customer_id = customer_id; @@ -67,22 +68,22 @@ impl Order { self.status = OrderStatus::Pending; } - #[digest("InventoryReserved", when = self.status == OrderStatus::Pending)] + #[event("InventoryReserved", when = self.status == OrderStatus::Pending)] pub fn mark_inventory_reserved(&mut self) { self.status = OrderStatus::InventoryReserved; } - #[digest("PaymentProcessed", when = self.status == OrderStatus::InventoryReserved)] + #[event("PaymentProcessed", when = self.status == OrderStatus::InventoryReserved)] pub fn mark_payment_processed(&mut self) { self.status = OrderStatus::PaymentProcessed; } - #[digest("OrderCompleted", when = self.status == OrderStatus::PaymentProcessed)] + #[event("OrderCompleted", when = self.status == OrderStatus::PaymentProcessed)] pub fn complete(&mut self) { self.status = OrderStatus::Completed; } - #[digest("OrderCancelled", when = self.status != OrderStatus::Completed && self.status != OrderStatus::Cancelled)] + #[event("OrderCancelled", when = self.status != OrderStatus::Completed && self.status != OrderStatus::Cancelled)] pub fn cancel(&mut self, reason: String) { self.status = OrderStatus::Cancelled; self.failure_reason = Some(reason); @@ -100,14 +101,6 @@ impl Order { } } -sourced_rust::aggregate!(Order, entity { - "OrderCreated"(id, customer_id, items) => create, - "InventoryReserved"() => mark_inventory_reserved(), - "PaymentProcessed"() => mark_payment_processed(), - "OrderCompleted"() => complete(), - "OrderCancelled"(reason) => cancel, -}); - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct OrderSnapshot { pub id: String, diff --git a/tests/sagas/order/payment.rs b/tests/sagas/order/payment.rs index 8263fa0..a62f716 100644 --- a/tests/sagas/order/payment.rs +++ b/tests/sagas/order/payment.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use sourced_rust::{digest, Entity}; +use sourced_rust::{sourced, Entity}; /// Payment status #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] @@ -24,6 +24,7 @@ pub struct Payment { } #[allow(dead_code)] +#[sourced(entity)] impl Payment { pub fn new() -> Self { Self::default() @@ -52,7 +53,7 @@ impl Payment { self.failure_reason.as_deref() } - #[digest("PaymentInitiated")] + #[event("PaymentInitiated")] pub fn initiate(&mut self, id: String, order_id: String, amount_cents: u32) { self.entity.set_id(&id); self.order_id = order_id; @@ -60,24 +61,24 @@ impl Payment { self.status = PaymentStatus::Pending; } - #[digest("PaymentAuthorized", when = self.status == PaymentStatus::Pending)] + #[event("PaymentAuthorized", when = self.status == PaymentStatus::Pending)] pub fn authorize(&mut self, transaction_id: String) { self.status = PaymentStatus::Authorized; self.transaction_id = Some(transaction_id); } - #[digest("PaymentCaptured", when = self.status == PaymentStatus::Authorized)] + #[event("PaymentCaptured", when = self.status == PaymentStatus::Authorized)] pub fn capture(&mut self) { self.status = PaymentStatus::Captured; } - #[digest("PaymentFailed", when = self.status == PaymentStatus::Pending || self.status == PaymentStatus::Authorized)] + #[event("PaymentFailed", when = self.status == PaymentStatus::Pending || self.status == PaymentStatus::Authorized)] pub fn fail(&mut self, reason: String) { self.status = PaymentStatus::Failed; self.failure_reason = Some(reason); } - #[digest("PaymentRefunded", when = self.status == PaymentStatus::Captured)] + #[event("PaymentRefunded", when = self.status == PaymentStatus::Captured)] pub fn refund(&mut self) { self.status = PaymentStatus::Refunded; } @@ -94,14 +95,6 @@ impl Payment { } } -sourced_rust::aggregate!(Payment, entity { - "PaymentInitiated"(id, order_id, amount_cents) => initiate, - "PaymentAuthorized"(transaction_id) => authorize, - "PaymentCaptured"() => capture(), - "PaymentFailed"(reason) => fail, - "PaymentRefunded"() => refund(), -}); - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct PaymentSnapshot { pub id: String, diff --git a/tests/sagas/order/saga.rs b/tests/sagas/order/saga.rs index cd0d043..342060b 100644 --- a/tests/sagas/order/saga.rs +++ b/tests/sagas/order/saga.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use sourced_rust::{digest, Entity}; +use sourced_rust::{sourced, Entity}; use super::OrderItem; @@ -48,6 +48,7 @@ pub struct OrderFulfillmentSaga { } #[allow(dead_code)] +#[sourced(entity)] impl OrderFulfillmentSaga { pub fn new() -> Self { Self::default() @@ -95,7 +96,7 @@ impl OrderFulfillmentSaga { // === Saga Commands === - #[digest("SagaStarted")] + #[event("SagaStarted")] pub fn start( &mut self, saga_id: String, @@ -112,42 +113,42 @@ impl OrderFulfillmentSaga { self.status = SagaStatus::Started; } - #[digest("InventoryReservationSucceeded", when = self.status == SagaStatus::Started)] + #[event("InventoryReservationSucceeded", when = self.status == SagaStatus::Started)] pub fn inventory_reserved(&mut self) { self.status = SagaStatus::InventoryReserved; self.compensation.inventory_reserved = true; } - #[digest("PaymentSucceeded", when = self.status == SagaStatus::InventoryReserved)] + #[event("PaymentSucceeded", when = self.status == SagaStatus::InventoryReserved)] pub fn payment_succeeded(&mut self) { self.status = SagaStatus::PaymentProcessed; self.compensation.payment_processed = true; } - #[digest("SagaCompleted", when = self.status == SagaStatus::PaymentProcessed)] + #[event("SagaCompleted", when = self.status == SagaStatus::PaymentProcessed)] pub fn complete(&mut self) { self.status = SagaStatus::Completed; } // === Failure and Compensation === - #[digest("StepFailed", when = !self.is_complete())] + #[event("StepFailed", when = !self.is_complete())] pub fn step_failed(&mut self, step: String, reason: String) { self.status = SagaStatus::Compensating; self.failure_reason = Some(format!("{}: {}", step, reason)); } - #[digest("InventoryCompensated", when = self.needs_inventory_compensation())] + #[event("InventoryCompensated", when = self.needs_inventory_compensation())] pub fn inventory_compensated(&mut self) { self.compensation.inventory_reserved = false; } - #[digest("PaymentCompensated", when = self.needs_payment_compensation())] + #[event("PaymentCompensated", when = self.needs_payment_compensation())] pub fn payment_compensated(&mut self) { self.compensation.payment_processed = false; } - #[digest("SagaFailed", when = self.status == SagaStatus::Compensating && !self.compensation.inventory_reserved && !self.compensation.payment_processed)] + #[event("SagaFailed", when = self.status == SagaStatus::Compensating && !self.compensation.inventory_reserved && !self.compensation.payment_processed)] pub fn mark_failed(&mut self) { self.status = SagaStatus::Failed; } @@ -166,17 +167,6 @@ impl OrderFulfillmentSaga { } } -sourced_rust::aggregate!(OrderFulfillmentSaga, entity { - "SagaStarted"(saga_id, order_id, customer_id, items, total_cents) => start, - "InventoryReservationSucceeded"() => inventory_reserved(), - "PaymentSucceeded"() => payment_succeeded(), - "SagaCompleted"() => complete(), - "StepFailed"(step, reason) => step_failed, - "InventoryCompensated"() => inventory_compensated(), - "PaymentCompensated"() => payment_compensated(), - "SagaFailed"() => mark_failed(), -}); - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct OrderFulfillmentSagaSnapshot { pub id: String, diff --git a/tests/snapshots/aggregate.rs b/tests/snapshots/aggregate.rs index b231f46..79dde14 100644 --- a/tests/snapshots/aggregate.rs +++ b/tests/snapshots/aggregate.rs @@ -1,4 +1,4 @@ -use sourced_rust::{digest, Entity, Snapshot}; +use sourced_rust::{sourced, Entity, Snapshot}; #[derive(Default, Snapshot)] pub struct Todo { @@ -8,25 +8,21 @@ pub struct Todo { pub completed: bool, } +#[sourced(entity)] impl Todo { pub fn new() -> Self { Self::default() } - #[digest("Initialized")] + #[event("Initialized")] pub fn initialize(&mut self, id: String, user_id: String, task: String) { self.entity.set_id(&id); self.user_id = user_id; self.task = task; } - #[digest("Completed", when = !self.completed)] + #[event("Completed", when = !self.completed)] pub fn complete(&mut self) { self.completed = true; } } - -sourced_rust::aggregate!(Todo, entity { - "Initialized"(id, user_id, task) => initialize, - "Completed"() => complete(), -}); diff --git a/tests/snapshots/main.rs b/tests/snapshots/main.rs index a069717..057e9f4 100644 --- a/tests/snapshots/main.rs +++ b/tests/snapshots/main.rs @@ -3,8 +3,8 @@ mod aggregate; use aggregate::Todo; use serde::{Deserialize, Serialize}; use sourced_rust::{ - impl_aggregate, Aggregate, AggregateBuilder, Entity, EventRecord, HashMapRepository, Queueable, - SnapshotRecord, SnapshotStore, Snapshottable, + sourced, Aggregate, AggregateBuilder, Entity, HashMapRepository, Queueable, SnapshotRecord, + SnapshotStore, Snapshottable, }; #[derive(Default)] @@ -13,30 +13,17 @@ struct ReplayCounter { total: i32, } +#[sourced(entity, aggregate_type = "snapshot.replay_counter")] impl ReplayCounter { - fn add(&mut self, id: &str, amount: i32) { + #[event("Added")] + fn add(&mut self, id: String, amount: i32) { if self.entity.id().is_empty() { - self.entity.set_id(id); + self.entity.set_id(&id); } - self.entity.digest("Added", &amount).unwrap(); self.total += amount; } - - fn replay(&mut self, event: &EventRecord) -> Result<(), String> { - if event.event_name == "Added" { - self.total += event.decode::().map_err(|err| err.to_string())?; - } - Ok(()) - } } -impl_aggregate!( - ReplayCounter, - entity, - replay, - aggregate_type = "snapshot.replay_counter" -); - #[derive(Serialize, Deserialize)] struct ReplayCounterSnapshot { id: String, @@ -176,7 +163,7 @@ fn snapshot_hydration_replays_every_event_after_snapshot_version() { .with_snapshots(100); let mut counter = ReplayCounter::default(); - counter.add("counter-1", 10); + counter.add("counter-1".into(), 10).unwrap(); full_replay_repo.commit(&mut counter).unwrap(); let payload = bitcode::serialize(&ReplayCounterSnapshot { @@ -196,11 +183,11 @@ fn snapshot_hydration_replays_every_event_after_snapshot_version() { .unwrap(); let mut counter = snapshot_repo.get("counter-1").unwrap().unwrap(); - counter.add("counter-1", 5); + counter.add("counter-1".into(), 5).unwrap(); snapshot_repo.commit(&mut counter).unwrap(); let mut counter = snapshot_repo.get("counter-1").unwrap().unwrap(); - counter.add("counter-1", 7); + counter.add("counter-1".into(), 7).unwrap(); snapshot_repo.commit(&mut counter).unwrap(); let loaded = snapshot_repo.get("counter-1").unwrap().unwrap(); diff --git a/tests/sourced_snapshot/aggregates.rs b/tests/sourced_snapshot/aggregates.rs index 3c8b68c..a76d88b 100644 --- a/tests/sourced_snapshot/aggregates.rs +++ b/tests/sourced_snapshot/aggregates.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use sourced_rust::{digest, Entity, Snapshot}; +use sourced_rust::{sourced, Entity, Snapshot}; // ============================================================================ // Default case: id + all fields @@ -13,29 +13,25 @@ pub struct Todo { pub completed: bool, } +#[sourced(entity)] impl Todo { pub fn new() -> Self { Self::default() } - #[digest("Initialized")] + #[event("Initialized")] pub fn initialize(&mut self, id: String, user_id: String, task: String) { self.entity.set_id(&id); self.user_id = user_id; self.task = task; } - #[digest("Completed", when = !self.completed)] + #[event("Completed", when = !self.completed)] pub fn complete(&mut self) { self.completed = true; } } -sourced_rust::aggregate!(Todo, entity { - "Initialized"(id, user_id, task) => initialize, - "Completed"() => complete(), -}); - // ============================================================================ // Custom ID key: snapshot(id = "sku") // ============================================================================ @@ -48,29 +44,25 @@ pub struct Inventory { pub available: u32, } +#[sourced(entity)] impl Inventory { pub fn new() -> Self { Self::default() } - #[digest("Created")] + #[event("Created")] pub fn create(&mut self, id: String, sku: String, available: u32) { self.entity.set_id(&id); self.sku = sku; self.available = available; } - #[digest("Restocked")] + #[event("Restocked")] pub fn restock(&mut self, qty: u32) { self.available += qty; } } -sourced_rust::aggregate!(Inventory, entity { - "Created"(id, sku, available) => create, - "Restocked"(qty) => restock, -}); - // ============================================================================ // serde(skip) field exclusion // ============================================================================ @@ -84,12 +76,13 @@ pub struct Order { pub cached_label: String, } +#[sourced(entity)] impl Order { pub fn new() -> Self { Self::default() } - #[digest("Placed")] + #[event("Placed")] pub fn place(&mut self, id: String, customer: String, total: u64) { self.entity.set_id(&id); self.customer = customer; @@ -98,10 +91,6 @@ impl Order { } } -sourced_rust::aggregate!(Order, entity { - "Placed"(id, customer, total) => place, -}); - // ============================================================================ // Works with #[sourced(entity)] on impl // ============================================================================ @@ -141,12 +130,13 @@ pub struct Widget { pub weight: f64, } +#[sourced(my_entity)] impl Widget { pub fn new() -> Self { Self::default() } - #[digest(my_entity, "Created")] + #[event("Created")] pub fn create(&mut self, id: String, name: String, weight: f64) { self.my_entity.set_id(&id); self.name = name; @@ -154,10 +144,6 @@ impl Widget { } } -sourced_rust::aggregate!(Widget, my_entity { - "Created"(id, name, weight) => create, -}); - // ============================================================================ // serde(skip, default) field exclusion (EntityEmitter pattern) // ============================================================================ @@ -174,18 +160,15 @@ pub struct Notifier { pub emitter: DummyEmitter, } +#[sourced(entity)] impl Notifier { pub fn new() -> Self { Self::default() } - #[digest("Sent")] + #[event("Sent")] pub fn send(&mut self, id: String, message: String) { self.entity.set_id(&id); self.message = message; } } - -sourced_rust::aggregate!(Notifier, entity { - "Sent"(id, message) => send, -}); diff --git a/tests/sqlite_repository/main.rs b/tests/sqlite_repository/main.rs index ea1e83b..25b7046 100644 --- a/tests/sqlite_repository/main.rs +++ b/tests/sqlite_repository/main.rs @@ -4,12 +4,11 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; use sourced_rust::{ - impl_aggregate, Aggregate, AsyncAggregateBuilder, AsyncCommitBatch, AsyncGetStream, - AsyncOutboxStore, AsyncReadModelWritePlanCommitExt, AsyncReadModelWritePlanStore, - AsyncSnapshotStore, AsyncStreamWrite, AsyncTransactionalCommit, Entity, EventRecord, - OutboxMessageStatus, ReadModel, ReadModelWritePlanBuilder, RepositoryError, RowKey, RowPatch, - RowValue, SnapshotRecord, SqliteRepository, StreamIdentity, TableSchemaRegistry, - OUTBOX_MESSAGES_TABLE, + sourced, Aggregate, AsyncAggregateBuilder, AsyncCommitBatch, AsyncGetStream, AsyncOutboxStore, + AsyncReadModelWritePlanCommitExt, AsyncReadModelWritePlanStore, AsyncSnapshotStore, + AsyncStreamWrite, AsyncTransactionalCommit, Entity, OutboxMessageStatus, ReadModel, + ReadModelWritePlanBuilder, RepositoryError, RowKey, RowPatch, RowValue, SnapshotRecord, + SqliteRepository, StreamIdentity, TableSchemaRegistry, OUTBOX_MESSAGES_TABLE, }; #[derive(Default)] @@ -18,51 +17,32 @@ struct Counter { value: i32, } +#[sourced(entity, aggregate_type = "sqlite.counter")] impl Counter { - fn increment(&mut self, id: &str, by: i32) { - self.entity.set_id(id); - self.entity.digest("Incremented", &by).unwrap(); + #[event("Incremented")] + fn increment(&mut self, id: String, by: i32) { + self.entity.set_id(&id); self.value += by; } - - fn replay(&mut self, event: &EventRecord) -> Result<(), String> { - if event.event_name == "Incremented" { - let by = event.decode::().map_err(|err| err.to_string())?; - self.value += by; - } - Ok(()) - } } -impl_aggregate!(Counter, entity, replay, aggregate_type = "sqlite.counter"); - #[derive(Default)] struct CounterProjection { entity: Entity, } +#[sourced(entity, aggregate_type = "sqlite.counter_projection")] impl CounterProjection { - fn touch(&mut self, id: &str) { - self.entity.set_id(id); - self.entity.digest_empty("Touched").unwrap(); - } - - fn replay(&mut self, _event: &EventRecord) -> Result<(), String> { - Ok(()) + #[event("Touched")] + fn touch(&mut self, id: String) { + self.entity.set_id(&id); } } -impl_aggregate!( - CounterProjection, - entity, - replay, - aggregate_type = "sqlite.counter_projection" -); - #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ReadModel)] -#[readmodel(table = "local_relational_counter_views")] +#[table("local_relational_counter_views")] struct RelationalCounterView { - #[readmodel(id)] + #[id] id: String, value: i64, #[readmodel(jsonb)] @@ -95,8 +75,8 @@ async fn migration_is_idempotent_and_aggregate_stream_round_trips() { let mut counter = Counter::default(); counter.entity.set_correlation_id("corr-1"); - counter.increment("counter-1", 2); - counter.increment("counter-1", 3); + counter.increment("counter-1".into(), 2).unwrap(); + counter.increment("counter-1".into(), 3).unwrap(); counter_repo.commit(&mut counter).await.unwrap(); @@ -147,9 +127,9 @@ async fn aggregate_stream_identity_separates_same_id_across_types() { let projection_repo = repo.clone().async_aggregate::(); let mut counter = Counter::default(); - counter.increment("shared-id", 7); + counter.increment("shared-id".into(), 7).unwrap(); let mut projection = CounterProjection::default(); - projection.touch("shared-id"); + projection.touch("shared-id".into()).unwrap(); counter_repo.commit(&mut counter).await.unwrap(); projection_repo.commit(&mut projection).await.unwrap(); @@ -169,17 +149,17 @@ async fn optimistic_conflict_rolls_back_other_stream_and_read_model_plan() { let counter_repo = repo.clone().async_aggregate::(); let mut original = Counter::default(); - original.increment("conflict-1", 1); + original.increment("conflict-1".into(), 1).unwrap(); counter_repo.commit(&mut original).await.unwrap(); let mut stale = counter_repo.get("conflict-1").await.unwrap().unwrap(); let mut winner = counter_repo.get("conflict-1").await.unwrap().unwrap(); - stale.increment("conflict-1", 10); - winner.increment("conflict-1", 20); + stale.increment("conflict-1".into(), 10).unwrap(); + winner.increment("conflict-1".into(), 20).unwrap(); counter_repo.commit(&mut winner).await.unwrap(); let mut other = CounterProjection::default(); - other.touch("should-not-commit"); + other.touch("should-not-commit".into()).unwrap(); let view = RelationalCounterView { id: "should-not-commit".into(), @@ -237,7 +217,7 @@ async fn commit_batch_lowers_relational_read_model_plan_into_registered_table() let mut session = ReadModelWritePlanBuilder::new(); session.upsert(&view).unwrap(); let mut projection = CounterProjection::default(); - projection.touch("relational-batch-1"); + projection.touch("relational-batch-1".into()).unwrap(); let identity = StreamIdentity::new(CounterProjection::aggregate_type(), "relational-batch-1").unwrap(); diff --git a/tests/todos/aggregate.rs b/tests/todos/aggregate.rs index edbe46d..bf5edd3 100644 --- a/tests/todos/aggregate.rs +++ b/tests/todos/aggregate.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use sourced_rust::{digest, Entity}; +use sourced_rust::{sourced, Entity}; #[derive(Default)] pub struct Todo { @@ -9,19 +9,20 @@ pub struct Todo { completed: bool, } +#[sourced(entity)] impl Todo { pub fn new() -> Self { Self::default() } - #[digest("Initialized")] + #[event("Initialized")] pub fn initialize(&mut self, id: String, user_id: String, task: String) { self.entity.set_id(&id); self.user_id = user_id; self.task = task; } - #[digest("Completed", when = !self.completed)] + #[event("Completed", when = !self.completed)] pub fn complete(&mut self) { self.completed = true; } @@ -36,11 +37,6 @@ impl Todo { } } -sourced_rust::aggregate!(Todo, entity { - "Initialized"(id, user_id, task) => initialize, - "Completed"() => complete(), -}); - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct TodoSnapshot { pub id: String,