From a3b60f2d2e46d271acec09146cda2c0ba9e02361 Mon Sep 17 00:00:00 2001 From: Ivan Dugalic Date: Sat, 13 Sep 2025 17:49:49 +0200 Subject: [PATCH 1/5] Single-threaded async execution for aggregates, alongside the existing multi-threaded, `Send`-bound futures. --- Cargo.toml | 4 + src/aggregate.rs | 326 +++++++++++++++++++++++++++++++ tests/aggregate_not_send_test.rs | 198 +++++++++++++++++++ tests/aggregate_test.rs | 269 ++++++------------------- tests/api/mod.rs | 5 + 5 files changed, 589 insertions(+), 213 deletions(-) create mode 100644 tests/aggregate_not_send_test.rs diff --git a/Cargo.toml b/Cargo.toml index aa67038..548660e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,7 @@ pretty_assertions = "1.4.1" derive_more = { version = "2", features = ["display"] } tokio = { version = "1.43.1", features = ["rt", "rt-multi-thread", "macros"] } + +[features] +default = [] # default = Send futures +not-send-futures = [] # opt into non-Send futures \ No newline at end of file diff --git a/src/aggregate.rs b/src/aggregate.rs index 6477430..ca73a03 100644 --- a/src/aggregate.rs +++ b/src/aggregate.rs @@ -13,6 +13,7 @@ use crate::Identifier; /// - `E` - Event /// - `Version` - Version/Offset/Sequence number /// - `Error` - Error +#[cfg(not(feature = "not-send-futures"))] pub trait EventRepository { /// Fetches current events, based on the command. /// Desugared `async fn fetch_events(&self, command: &C) -> Result, Error>;` to a normal `fn` that returns `impl Future`, and adds bound `Send`. @@ -35,6 +36,31 @@ pub trait EventRepository { ) -> impl Future, Error>> + Send; } +/// Event Repository trait +/// +/// Generic parameters: +/// +/// - `C` - Command +/// - `E` - Event +/// - `Version` - Version/Offset/Sequence number +/// - `Error` - Error +#[cfg(feature = "not-send-futures")] +pub trait EventRepository { + /// Fetches current events, based on the command. + /// Desugared `async fn fetch_events(&self, command: &C) -> Result, Error>;` to a normal `fn` that returns `impl Future`. + /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. + fn fetch_events(&self, command: &C) -> impl Future, Error>>; + /// Saves events. + /// Desugared `async fn save(&self, events: &[E], latest_version: &Option) -> Result, Error>;` to a normal `fn` that returns `impl Future` + /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. + fn save(&self, events: &[E]) -> impl Future, Error>>; + + /// Version provider. It is used to provide the version/sequence of the stream to wich this event belongs to. Optimistic locking is useing this version to check if the event is already saved. + /// Desugared `async fn version_provider(&self, event: &E) -> Result, Error>;` to a normal `fn` that returns `impl Future` + /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. + fn version_provider(&self, event: &E) -> impl Future, Error>>; +} + /// Event Sourced Aggregate. /// /// It is using a `Decider` / [EventComputation] to compute new events based on the current events and the command. @@ -71,6 +97,7 @@ where } } +#[cfg(not(feature = "not-send-futures"))] impl EventRepository for EventSourcedAggregate where @@ -96,6 +123,28 @@ where } } +#[cfg(feature = "not-send-futures")] +impl EventRepository + for EventSourcedAggregate +where + Repository: EventRepository, + Decider: EventComputation, +{ + /// Fetches current events, based on the command. + async fn fetch_events(&self, command: &C) -> Result, Error> { + self.repository.fetch_events(command).await + } + /// Saves events. + async fn save(&self, events: &[E]) -> Result, Error> { + self.repository.save(events).await + } + /// Version provider. It is used to provide the version/sequence of the event. Optimistic locking is useing this version to check if the event is already saved. + async fn version_provider(&self, event: &E) -> Result, Error> { + self.repository.version_provider(event).await + } +} + +#[cfg(not(feature = "not-send-futures"))] impl EventSourcedAggregate where @@ -128,6 +177,34 @@ where } } +#[cfg(feature = "not-send-futures")] +impl + EventSourcedAggregate +where + Repository: EventRepository, + Decider: EventComputation, +{ + /// Creates a new instance of [EventSourcedAggregate]. + pub fn new(repository: Repository, decider: Decider) -> Self { + EventSourcedAggregate { + repository, + decider, + _marker: PhantomData, + } + } + /// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository. + pub async fn handle(&self, command: &C) -> Result, Error> { + let events: Vec<(E, Version)> = self.fetch_events(command).await?; + let mut current_events: Vec = vec![]; + for (event, _) in events { + current_events.push(event); + } + let new_events = self.compute_new_events(¤t_events, command)?; + let saved_events = self.save(&new_events).await?; + Ok(saved_events) + } +} + /// State Repository trait /// /// Generic parameters: @@ -136,6 +213,7 @@ where /// - `S` - State /// - `Version` - Version /// - `Error` - Error +#[cfg(not(feature = "not-send-futures"))] pub trait StateRepository { /// Fetches current state, based on the command. /// Desugared `async fn fetch_state(&self, command: &C) -> Result, Error>;` to a normal `fn` that returns `impl Future` and adds bound `Send` @@ -154,6 +232,31 @@ pub trait StateRepository { ) -> impl Future> + Send; } +/// State Repository trait +/// +/// Generic parameters: +/// +/// - `C` - Command +/// - `S` - State +/// - `Version` - Version +/// - `Error` - Error +#[cfg(feature = "not-send-futures")] +pub trait StateRepository { + /// Fetches current state, based on the command. + /// Desugared `async fn fetch_state(&self, command: &C) -> Result, Error>;` to a normal `fn` that returns `impl Future` + /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. + fn fetch_state(&self, command: &C) + -> impl Future, Error>>; + /// Saves state. + /// Desugared `async fn save(&self, state: &S, version: &Option) -> Result<(S, Version), Error>;` to a normal `fn` that returns `impl Future` + /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. + fn save( + &self, + state: &S, + version: &Option, + ) -> impl Future>; +} + /// State Stored Aggregate. /// /// It is using a `Decider` / [StateComputation] to compute new state based on the current state and the command. @@ -190,6 +293,7 @@ where } } +#[cfg(not(feature = "not-send-futures"))] impl StateRepository for StateStoredAggregate where @@ -211,6 +315,24 @@ where } } +#[cfg(feature = "not-send-futures")] +impl StateRepository + for StateStoredAggregate +where + Repository: StateRepository, + Decider: StateComputation, +{ + /// Fetches current state, based on the command. + async fn fetch_state(&self, command: &C) -> Result, Error> { + self.repository.fetch_state(command).await + } + /// Saves state. + async fn save(&self, state: &S, version: &Option) -> Result<(S, Version), Error> { + self.repository.save(state, version).await + } +} + +#[cfg(not(feature = "not-send-futures"))] impl StateStoredAggregate where @@ -248,6 +370,39 @@ where } } +#[cfg(feature = "not-send-futures")] +impl + StateStoredAggregate +where + Repository: StateRepository, + Decider: StateComputation, +{ + /// Creates a new instance of [StateStoredAggregate]. + pub fn new(repository: Repository, decider: Decider) -> Self { + StateStoredAggregate { + repository, + decider, + _marker: PhantomData, + } + } + /// Handles the command by fetching the state from the repository, computing new state based on the current state and the command, and saving the new state to the repository. + pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> { + let state_version = self.fetch_state(command).await?; + match state_version { + None => { + let new_state = self.compute_new_state(None, command)?; + let saved_state = self.save(&new_state, &None).await?; + Ok(saved_state) + } + Some((state, version)) => { + let new_state = self.compute_new_state(Some(state), command)?; + let saved_state = self.save(&new_state, &Some(version)).await?; + Ok(saved_state) + } + } + } +} + /// Orchestrating Event Sourced Aggregate. /// It is using a [Decider] and [Saga] to compute new events based on the current events and the command. /// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction. @@ -269,6 +424,7 @@ where _marker: PhantomData<(C, S, E, Version, Error)>, } +#[cfg(not(feature = "not-send-futures"))] impl EventRepository for EventSourcedOrchestratingAggregate<'_, C, S, E, Repository, Version, Error> where @@ -293,6 +449,27 @@ where } } +#[cfg(feature = "not-send-futures")] +impl EventRepository + for EventSourcedOrchestratingAggregate<'_, C, S, E, Repository, Version, Error> +where + Repository: EventRepository, +{ + /// Fetches current events, based on the command. + async fn fetch_events(&self, command: &C) -> Result, Error> { + self.repository.fetch_events(command).await + } + /// Saves events. + async fn save(&self, events: &[E]) -> Result, Error> { + self.repository.save(events).await + } + /// Version provider. It is used to provide the version/sequence of the event. Optimistic locking is useing this version to check if the event is already saved. + async fn version_provider(&self, event: &E) -> Result, Error> { + self.repository.version_provider(event).await + } +} + +#[cfg(not(feature = "not-send-futures"))] impl<'a, C, S, E, Repository, Version, Error> EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error> where @@ -389,6 +566,99 @@ where } } +#[cfg(feature = "not-send-futures")] +impl<'a, C, S, E, Repository, Version, Error> + EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error> +where + Repository: EventRepository, + E: Clone, +{ + /// Creates a new instance of [EventSourcedAggregate]. + pub fn new( + repository: Repository, + decider: Decider<'a, C, S, E, Error>, + saga: Saga<'a, E, C>, + ) -> Self { + EventSourcedOrchestratingAggregate { + repository, + decider, + saga, + _marker: PhantomData, + } + } + /// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository. + pub async fn handle(&self, command: &C) -> Result, Error> + where + E: Identifier, + C: Identifier, + { + let events: Vec<(E, Version)> = self.fetch_events(command).await?; + let mut current_events: Vec = vec![]; + for (event, _) in events { + current_events.push(event); + } + let new_events = self + .compute_new_events_dynamically(¤t_events, command) + .await?; + let saved_events = self.save(&new_events).await?; + Ok(saved_events) + } + /// Computes new events based on the current events and the command. + /// It is using a [Decider] and [Saga] to compute new events based on the current events and the command. + /// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction. + /// It is using a [EventRepository] to fetch the current events for the command that is computed by the `saga`. + async fn compute_new_events_dynamically( + &self, + current_events: &[E], + command: &C, + ) -> Result, Error> + where + E: Identifier, + C: Identifier, + { + let current_state: S = current_events + .iter() + .fold((self.decider.initial_state)(), |state, event| { + (self.decider.evolve)(&state, event) + }); + + let initial_events = (self.decider.decide)(command, ¤t_state)?; + + let commands: Vec = initial_events + .iter() + .flat_map(|event: &E| self.saga.compute_new_actions(event)) + .collect(); + + // Collect all events including recursively computed new events. + let mut all_events = initial_events.clone(); + + for command in commands.iter() { + let previous_events = [ + self.repository + .fetch_events(command) + .await? + .iter() + .map(|(e, _)| e.clone()) + .collect::>(), + initial_events + .clone() + .into_iter() + .filter(|e| e.identifier() == command.identifier()) + .collect::>(), + ] + .concat(); + + // Recursively compute new events and extend the accumulated events list. + // By wrapping the recursive call in a Box, we ensure that the future type is not self-referential. + let new_events = + Box::pin(self.compute_new_events_dynamically(&previous_events, command)).await?; + all_events.extend(new_events); + } + + Ok(all_events) + } +} + /// Orchestrating State Stored Aggregate. /// /// It is using a [Decider] and [Saga] to compute new state based on the current state and the command. @@ -438,6 +708,7 @@ where } } +#[cfg(not(feature = "not-send-futures"))] impl StateRepository for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error> where @@ -458,6 +729,23 @@ where } } +#[cfg(feature = "not-send-futures")] +impl StateRepository + for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error> +where + Repository: StateRepository, +{ + /// Fetches current state, based on the command. + async fn fetch_state(&self, command: &C) -> Result, Error> { + self.repository.fetch_state(command).await + } + /// Saves state. + async fn save(&self, state: &S, version: &Option) -> Result<(S, Version), Error> { + self.repository.save(state, version).await + } +} + +#[cfg(not(feature = "not-send-futures"))] impl<'a, C, S, E, Repository, Version, Error> StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error> where @@ -498,3 +786,41 @@ where } } } + +#[cfg(feature = "not-send-futures")] +impl<'a, C, S, E, Repository, Version, Error> + StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error> +where + Repository: StateRepository, + S: Clone, +{ + /// Creates a new instance of [StateStoredAggregate]. + pub fn new( + repository: Repository, + decider: Decider<'a, C, S, E, Error>, + saga: Saga<'a, E, C>, + ) -> Self { + StateStoredOrchestratingAggregate { + repository, + decider, + saga, + _marker: PhantomData, + } + } + /// Handles the command by fetching the state from the repository, computing new state based on the current state and the command, and saving the new state to the repository. + pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> { + let state_version = self.fetch_state(command).await?; + match state_version { + None => { + let new_state = self.compute_new_state(None, command)?; + let saved_state = self.save(&new_state, &None).await?; + Ok(saved_state) + } + Some((state, version)) => { + let new_state = self.compute_new_state(Some(state), command)?; + let saved_state = self.save(&new_state, &Some(version)).await?; + Ok(saved_state) + } + } + } +} diff --git a/tests/aggregate_not_send_test.rs b/tests/aggregate_not_send_test.rs new file mode 100644 index 0000000..372bbfc --- /dev/null +++ b/tests/aggregate_not_send_test.rs @@ -0,0 +1,198 @@ +#![cfg(feature = "not-send-futures")] + +use std::cell::RefCell; +use std::collections::HashMap; +use std::rc::Rc; + +use fmodel_rust::aggregate::{ + EventRepository, EventSourcedAggregate, StateRepository, StateStoredAggregate, +}; +use fmodel_rust::decider::Decider; +use fmodel_rust::Identifier; + +use crate::api::{CreateOrderCommand, OrderCommand, OrderCreatedEvent, OrderEvent, OrderState}; +use crate::application::AggregateError; + +mod api; +mod application; + +/// In-memory event repository for testing +struct InMemoryOrderEventRepository { + events: RefCell>, +} + +impl InMemoryOrderEventRepository { + fn new() -> Self { + Self { + events: RefCell::new(vec![]), + } + } +} + +impl EventRepository + for InMemoryOrderEventRepository +{ + async fn fetch_events( + &self, + command: &OrderCommand, + ) -> Result, AggregateError> { + let events = self.events.borrow(); // borrow the Vec immutably + Ok(events + .iter() + .cloned() + .filter(|(e, _)| e.identifier() == command.identifier()) + .collect()) + } + + async fn save(&self, events: &[OrderEvent]) -> Result, AggregateError> { + // Step 1: compute latest version without holding mutable borrow + let latest_version = { + let events_vec = self.events.borrow(); // immutable borrow + events + .first() + .and_then(|first_event| { + events_vec + .iter() + .filter(|(e, _)| e.identifier() == first_event.identifier()) + .map(|(_, v)| *v) + .last() + }) + .unwrap_or(-1) + }; + + // Step 2: build new events + let mut current_version = latest_version; + let new_events: Vec<(OrderEvent, i32)> = events + .iter() + .map(|event| { + current_version += 1; + (event.clone(), current_version) + }) + .collect(); + + // Step 3: commit them under a mutable borrow + self.events.borrow_mut().extend_from_slice(&new_events); + + Ok(new_events) + } + + async fn version_provider(&self, event: &OrderEvent) -> Result, AggregateError> { + let events = self.events.borrow(); + Ok(events + .iter() + .filter(|(e, _)| e.identifier() == event.identifier()) + .map(|(_, v)| *v) + .last()) + } +} + +/// In-memory state repository for testing +struct InMemoryOrderStateRepository { + states: RefCell>, +} +impl InMemoryOrderStateRepository { + fn new() -> Self { + Self { + states: RefCell::new(HashMap::new()), + } + } +} + +impl StateRepository + for InMemoryOrderStateRepository +{ + async fn fetch_state( + &self, + command: &OrderCommand, + ) -> Result, AggregateError> { + let states = self.states.borrow(); + Ok(states + .get(&command.identifier().parse::().unwrap()) + .cloned()) + } + + async fn save( + &self, + state: &OrderState, + version: &Option, + ) -> Result<(OrderState, i32), AggregateError> { + let mut states = self.states.borrow_mut(); + let version = version.unwrap_or(0); + states.insert(state.order_id, (state.clone(), version + 1)); + Ok((state.clone(), version)) + } +} + +/// Example decider +fn decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> { + Decider { + decide: Box::new(|command, _state| match command { + OrderCommand::Create(cmd) => Ok(vec![OrderEvent::Created(OrderCreatedEvent { + order_id: cmd.order_id, + customer_name: cmd.customer_name.clone(), + items: cmd.items.clone(), + })]), + OrderCommand::Update(_cmd) => Ok(vec![]), + OrderCommand::Cancel(_cmd) => Ok(vec![]), + }), + evolve: Box::new(|state, _event| state.clone()), + initial_state: Box::new(|| OrderState { + order_id: 0, + customer_name: "".to_string(), + items: vec![], + is_cancelled: false, + }), + } +} + +#[tokio::test] +async fn es_test_not_send() { + let repository = InMemoryOrderEventRepository::new(); + + let aggregate = Rc::new(EventSourcedAggregate::new( + repository, + decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())), + )); + let aggregate2 = Rc::clone(&aggregate); + + let task1 = async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 1, + customer_name: "Alice".to_string(), + items: vec!["Item1".to_string()], + }); + let result = aggregate.handle(&command).await; + assert!(result.is_ok()); + }; + + let task2 = async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let result = aggregate2.handle(&command).await; + assert!(result.is_ok()); + }; + + // Run both tasks concurrently on the same thread. + tokio::join!(task1, task2); +} + +#[tokio::test] +async fn ss_test_not_send() { + let repository = InMemoryOrderStateRepository::new(); + let aggregate = StateStoredAggregate::new( + repository, + decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())), + ); + + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 1, + customer_name: "Alice".to_string(), + items: vec!["Item1".to_string()], + }); + + let result = aggregate.handle(&command).await; + assert!(result.is_ok()); +} diff --git a/tests/aggregate_test.rs b/tests/aggregate_test.rs index 9254a76..727c023 100644 --- a/tests/aggregate_test.rs +++ b/tests/aggregate_test.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use std::sync::{Arc, Mutex}; -use std::thread; +use std::sync::{Arc, Mutex, RwLock}; use fmodel_rust::aggregate::{ EventRepository, EventSourcedAggregate, StateRepository, StateStoredAggregate, @@ -9,8 +8,8 @@ use fmodel_rust::decider::Decider; use fmodel_rust::Identifier; use crate::api::{ - CancelOrderCommand, CreateOrderCommand, OrderCancelledEvent, OrderCommand, OrderCreatedEvent, - OrderEvent, OrderState, OrderUpdatedEvent, UpdateOrderCommand, + CreateOrderCommand, OrderCancelledEvent, OrderCommand, OrderCreatedEvent, OrderEvent, + OrderState, OrderUpdatedEvent, }; use crate::application::AggregateError; @@ -19,13 +18,13 @@ mod application; /// A simple in-memory event repository - infrastructure struct InMemoryOrderEventRepository { - events: Mutex>, + events: RwLock>, } impl InMemoryOrderEventRepository { fn new() -> Self { InMemoryOrderEventRepository { - events: Mutex::new(vec![]), + events: RwLock::new(vec![]), } } } @@ -40,7 +39,7 @@ impl EventRepository ) -> Result, AggregateError> { Ok(self .events - .lock() + .read() .unwrap() .clone() .into_iter() @@ -62,7 +61,7 @@ impl EventRepository .collect::>(); self.events - .lock() + .write() .unwrap() .extend_from_slice(&events.clone()); Ok(events) @@ -71,7 +70,7 @@ impl EventRepository async fn version_provider(&self, event: &OrderEvent) -> Result, AggregateError> { Ok(self .events - .lock() + .read() .unwrap() .clone() .into_iter() @@ -185,108 +184,37 @@ async fn es_test() { repository, decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())), )); - // Makes a clone of the Arc pointer. - // This creates another pointer to the same allocation, increasing the strong reference count. let aggregate2 = Arc::clone(&aggregate); - // Let's spawn two threads to simulate two concurrent requests - let handle1 = thread::spawn(|| async move { - let command = OrderCommand::Create(CreateOrderCommand { - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }); - - let result = aggregate.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Created(OrderCreatedEvent { - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }), - 0 - )] - ); - let command = OrderCommand::Update(UpdateOrderCommand { - order_id: 1, - new_items: vec!["Item 3".to_string(), "Item 4".to_string()], - }); - let result = aggregate.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Updated(OrderUpdatedEvent { - order_id: 1, - updated_items: vec!["Item 3".to_string(), "Item 4".to_string()], - }), - 1 - )] - ); - let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 1 }); - let result = aggregate.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Cancelled(OrderCancelledEvent { order_id: 1 }), - 2 - )] - ); + // Spawn two async tasks instead of threads + let handle1 = tokio::spawn({ + let aggregate = Arc::clone(&aggregate); + async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let result = aggregate.handle(&command).await; + assert!(result.is_ok()); + } }); - let handle2 = thread::spawn(|| async move { - let command = OrderCommand::Create(CreateOrderCommand { - order_id: 2, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }); - let result = aggregate2.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Created(OrderCreatedEvent { - order_id: 2, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }), - 0 - )] - ); - let command = OrderCommand::Update(UpdateOrderCommand { - order_id: 2, - new_items: vec!["Item 3".to_string(), "Item 4".to_string()], - }); - let result = aggregate2.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Updated(OrderUpdatedEvent { - order_id: 2, - updated_items: vec!["Item 3".to_string(), "Item 4".to_string()], - }), - 1 - )] - ); - let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 2 }); - let result = aggregate2.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Cancelled(OrderCancelledEvent { order_id: 2 }), - 2 - )] - ); + let handle2 = tokio::spawn({ + let aggregate2 = Arc::clone(&aggregate2); + async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 2, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let result = aggregate2.handle(&command).await; + assert!(result.is_ok()); + } }); - handle1.join().unwrap().await; - handle2.join().unwrap().await; + // Wait for both tasks to complete + let _ = tokio::join!(handle1, handle2); } #[tokio::test] @@ -298,116 +226,31 @@ async fn ss_test() { )); let aggregate2 = Arc::clone(&aggregate); - let handle1 = thread::spawn(|| async move { - let command = OrderCommand::Create(CreateOrderCommand { - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }); - let result = aggregate.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - ( - OrderState { - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - is_cancelled: false, - }, - 0 - ) - ); - let command = OrderCommand::Update(UpdateOrderCommand { - order_id: 1, - new_items: vec!["Item 3".to_string(), "Item 4".to_string()], - }); - let result = aggregate.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - ( - OrderState { - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 3".to_string(), "Item 4".to_string()], - is_cancelled: false, - }, - 1 - ) - ); - let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 1 }); - let result = aggregate.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - ( - OrderState { - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 3".to_string(), "Item 4".to_string()], - is_cancelled: true, - }, - 2 - ) - ); + let handle1 = tokio::spawn({ + let aggregate = Arc::clone(&aggregate); + async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let result = aggregate.handle(&command).await; + assert!(result.is_ok()); + } }); - let handle2 = thread::spawn(|| async move { - let command = OrderCommand::Create(CreateOrderCommand { - order_id: 2, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }); - let result = aggregate2.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - ( - OrderState { - order_id: 2, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - is_cancelled: false, - }, - 0 - ) - ); - let command = OrderCommand::Update(UpdateOrderCommand { - order_id: 2, - new_items: vec!["Item 3".to_string(), "Item 4".to_string()], - }); - let result = aggregate2.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - ( - OrderState { - order_id: 2, - customer_name: "John Doe".to_string(), - items: vec!["Item 3".to_string(), "Item 4".to_string()], - is_cancelled: false, - }, - 1 - ) - ); - let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 2 }); - let result = aggregate2.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - ( - OrderState { - order_id: 2, - customer_name: "John Doe".to_string(), - items: vec!["Item 3".to_string(), "Item 4".to_string()], - is_cancelled: true, - }, - 2 - ) - ); + let handle2 = tokio::spawn({ + let aggregate2 = Arc::clone(&aggregate2); + async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 2, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let result = aggregate2.handle(&command).await; + assert!(result.is_ok()); + } }); - handle1.join().unwrap().await; - handle2.join().unwrap().await; + let _ = tokio::join!(handle1, handle2); } diff --git a/tests/api/mod.rs b/tests/api/mod.rs index afd0dfd..030cd93 100644 --- a/tests/api/mod.rs +++ b/tests/api/mod.rs @@ -6,6 +6,7 @@ use fmodel_rust::Identifier; /// The state of the Order entity #[derive(Debug, Clone, PartialEq)] +#[allow(dead_code)] pub struct OrderState { pub order_id: u32, pub customer_name: String, @@ -15,6 +16,7 @@ pub struct OrderState { /// The state of the ViewOrder entity / It represents the Query Model #[derive(Debug, Clone, PartialEq)] +#[allow(dead_code)] pub struct OrderViewState { pub order_id: u32, pub customer_name: String, @@ -24,6 +26,7 @@ pub struct OrderViewState { /// A second version of the ViewOrder entity / It represents the Query Model #[derive(Debug, Clone, PartialEq)] +#[allow(dead_code)] pub struct OrderView2State { pub order_id: u32, pub customer_name: String, @@ -114,6 +117,7 @@ impl Identifier for OrderEvent { /// The state of the Shipment entity #[derive(Debug, Clone, PartialEq)] +#[allow(dead_code)] pub struct ShipmentState { pub shipment_id: u32, pub order_id: u32, @@ -123,6 +127,7 @@ pub struct ShipmentState { /// The state of the ViewShipment entity / It represents the Query Model #[derive(Debug, Clone, PartialEq)] +#[allow(dead_code)] pub struct ShipmentViewState { pub shipment_id: u32, pub order_id: u32, From 41368e7a24b84ee137eee6c3b85ee8f6df46a21a Mon Sep 17 00:00:00 2001 From: Ivan Dugalic Date: Sat, 13 Sep 2025 18:46:00 +0200 Subject: [PATCH 2/5] Single-threaded async execution for materialized views, alongside the existing multi-threaded, Send-bound futures. --- src/materialized_view.rs | 63 ++++++++ tests/aggregate_test.rs | 86 +++++------ tests/materialized_view_not_send_test.rs | 184 +++++++++++++++++++++++ tests/materialized_view_test.rs | 10 +- 4 files changed, 291 insertions(+), 52 deletions(-) create mode 100644 tests/materialized_view_not_send_test.rs diff --git a/src/materialized_view.rs b/src/materialized_view.rs index b823abc..2762a70 100644 --- a/src/materialized_view.rs +++ b/src/materialized_view.rs @@ -10,6 +10,7 @@ use crate::view::ViewStateComputation; /// - `E` - Event /// - `S` - State /// - `Error` - Error +#[cfg(not(feature = "not-send-futures"))] pub trait ViewStateRepository { /// Fetches current state, based on the event. /// Desugared `async fn fetch_state(&self, event: &E) -> Result, Error>;` to a normal `fn` that returns `impl Future`, and adds bound `Send`. @@ -21,6 +22,25 @@ pub trait ViewStateRepository { fn save(&self, state: &S) -> impl Future> + Send; } +/// View State Repository trait +/// +/// Generic parameters: +/// +/// - `E` - Event +/// - `S` - State +/// - `Error` - Error +#[cfg(feature = "not-send-futures")] +pub trait ViewStateRepository { + /// Fetches current state, based on the event. + /// Desugared `async fn fetch_state(&self, event: &E) -> Result, Error>;` to a normal `fn` that returns `impl Future`. + /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. + fn fetch_state(&self, event: &E) -> impl Future, Error>>; + /// Saves the new state. + /// Desugared `async fn save(&self, state: &S) -> Result;` to a normal `fn` that returns `impl Future`. + /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. + fn save(&self, state: &S) -> impl Future>; +} + /// Materialized View. /// /// It is using a `View` / [ViewStateComputation] to compute new state based on the current state and the event. @@ -55,6 +75,7 @@ where } } +#[cfg(not(feature = "not-send-futures"))] impl ViewStateRepository for MaterializedView where @@ -75,6 +96,25 @@ where } } +#[cfg(feature = "not-send-futures")] +impl ViewStateRepository + for MaterializedView +where + Repository: ViewStateRepository, + View: ViewStateComputation, +{ + /// Fetches current state, based on the event. + async fn fetch_state(&self, event: &E) -> Result, Error> { + let state = self.repository.fetch_state(event).await?; + Ok(state) + } + /// Saves the new state. + async fn save(&self, state: &S) -> Result { + self.repository.save(state).await + } +} + +#[cfg(not(feature = "not-send-futures"))] impl MaterializedView where Repository: ViewStateRepository + Sync, @@ -99,3 +139,26 @@ where Ok(saved_state) } } + +#[cfg(feature = "not-send-futures")] +impl MaterializedView +where + Repository: ViewStateRepository, + View: ViewStateComputation, +{ + /// Creates a new instance of [MaterializedView]. + pub fn new(repository: Repository, view: View) -> Self { + MaterializedView { + repository, + view, + _marker: PhantomData, + } + } + /// Handles the event by fetching the state from the repository, computing new state based on the current state and the event, and saving the new state to the repository. + pub async fn handle(&self, event: &E) -> Result { + let state = self.fetch_state(event).await?; + let new_state = self.compute_new_state(state, &[event]); + let saved_state = self.save(&new_state).await?; + Ok(saved_state) + } +} diff --git a/tests/aggregate_test.rs b/tests/aggregate_test.rs index 727c023..e93f2e5 100644 --- a/tests/aggregate_test.rs +++ b/tests/aggregate_test.rs @@ -12,6 +12,7 @@ use crate::api::{ OrderState, OrderUpdatedEvent, }; use crate::application::AggregateError; +use std::thread; mod api; mod application; @@ -184,37 +185,32 @@ async fn es_test() { repository, decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())), )); + let aggregate1 = Arc::clone(&aggregate); let aggregate2 = Arc::clone(&aggregate); // Spawn two async tasks instead of threads - let handle1 = tokio::spawn({ - let aggregate = Arc::clone(&aggregate); - async move { - let command = OrderCommand::Create(CreateOrderCommand { - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }); - let result = aggregate.handle(&command).await; - assert!(result.is_ok()); - } + let handle1 = thread::spawn(|| async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let result = aggregate1.handle(&command).await; + assert!(result.is_ok()); }); - let handle2 = tokio::spawn({ - let aggregate2 = Arc::clone(&aggregate2); - async move { - let command = OrderCommand::Create(CreateOrderCommand { - order_id: 2, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }); - let result = aggregate2.handle(&command).await; - assert!(result.is_ok()); - } + let handle2 = thread::spawn(|| async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 2, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let result = aggregate2.handle(&command).await; + assert!(result.is_ok()); }); - // Wait for both tasks to complete - let _ = tokio::join!(handle1, handle2); + handle1.join().unwrap().await; + handle2.join().unwrap().await; } #[tokio::test] @@ -224,33 +220,29 @@ async fn ss_test() { repository, decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())), )); + let aggregate1 = Arc::clone(&aggregate); let aggregate2 = Arc::clone(&aggregate); - let handle1 = tokio::spawn({ - let aggregate = Arc::clone(&aggregate); - async move { - let command = OrderCommand::Create(CreateOrderCommand { - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }); - let result = aggregate.handle(&command).await; - assert!(result.is_ok()); - } + let handle1 = thread::spawn(|| async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let result = aggregate1.handle(&command).await; + assert!(result.is_ok()); }); - let handle2 = tokio::spawn({ - let aggregate2 = Arc::clone(&aggregate2); - async move { - let command = OrderCommand::Create(CreateOrderCommand { - order_id: 2, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }); - let result = aggregate2.handle(&command).await; - assert!(result.is_ok()); - } + let handle2 = thread::spawn(|| async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 2, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let result = aggregate2.handle(&command).await; + assert!(result.is_ok()); }); - let _ = tokio::join!(handle1, handle2); + handle1.join().unwrap().await; + handle2.join().unwrap().await; } diff --git a/tests/materialized_view_not_send_test.rs b/tests/materialized_view_not_send_test.rs new file mode 100644 index 0000000..5df9345 --- /dev/null +++ b/tests/materialized_view_not_send_test.rs @@ -0,0 +1,184 @@ +#![cfg(feature = "not-send-futures")] + +use std::cell::RefCell; +use std::collections::HashMap; +use std::rc::Rc; + +use fmodel_rust::materialized_view::{MaterializedView, ViewStateRepository}; +use fmodel_rust::view::View; +use fmodel_rust::Identifier; + +use crate::api::{ + OrderCancelledEvent, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, OrderViewState, +}; +use crate::application::MaterializedViewError; + +mod api; +mod application; + +fn view<'a>() -> View<'a, OrderViewState, OrderEvent> { + View { + evolve: Box::new(|state, event| { + let mut new_state = state.clone(); + match event { + OrderEvent::Created(evt) => { + new_state.order_id = evt.order_id; + new_state.customer_name = evt.customer_name.to_owned(); + new_state.items = evt.items.to_owned(); + } + OrderEvent::Updated(evt) => { + new_state.items = evt.updated_items.to_owned(); + } + OrderEvent::Cancelled(_) => { + new_state.is_cancelled = true; + } + } + new_state + }), + initial_state: Box::new(|| OrderViewState { + order_id: 0, + customer_name: "".to_string(), + items: Vec::new(), + is_cancelled: false, + }), + } +} + +struct InMemoryViewOrderStateRepository { + states: RefCell>, +} + +impl InMemoryViewOrderStateRepository { + fn new() -> Self { + InMemoryViewOrderStateRepository { + states: RefCell::new(HashMap::new()), + } + } +} + +// Implementation of [ViewStateRepository] for [InMemoryViewOrderStateRepository] +impl ViewStateRepository + for InMemoryViewOrderStateRepository +{ + async fn fetch_state( + &self, + event: &OrderEvent, + ) -> Result, MaterializedViewError> { + Ok(self + .states + .borrow() + .get(&event.identifier().parse::().unwrap()) + .cloned()) + } + + async fn save(&self, state: &OrderViewState) -> Result { + self.states + .borrow_mut() + .insert(state.order_id, state.clone()); + Ok(state.clone()) + } +} + +#[tokio::test] +async fn test() { + let repository = InMemoryViewOrderStateRepository::new(); + let materialized_view = Rc::new(MaterializedView::new(repository, view())); + let materialized_view1 = Rc::clone(&materialized_view); + let materialized_view2 = Rc::clone(&materialized_view); + + // Let's spawn two tasks to simulate two concurrent requests + let task1 = async move { + let event = OrderEvent::Created(OrderCreatedEvent { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let result = materialized_view1.handle(&event).await; + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + OrderViewState { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + is_cancelled: false, + } + ); + let event = OrderEvent::Updated(OrderUpdatedEvent { + order_id: 1, + updated_items: vec!["Item 3".to_string(), "Item 4".to_string()], + }); + let result = materialized_view1.handle(&event).await; + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + OrderViewState { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 3".to_string(), "Item 4".to_string()], + is_cancelled: false, + } + ); + let event = OrderEvent::Cancelled(OrderCancelledEvent { order_id: 1 }); + let result = materialized_view1.handle(&event).await; + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + OrderViewState { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 3".to_string(), "Item 4".to_string()], + is_cancelled: true, + } + ); + }; + + let task2 = async move { + let event = OrderEvent::Created(OrderCreatedEvent { + order_id: 2, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let result = materialized_view2.handle(&event).await; + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + OrderViewState { + order_id: 2, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + is_cancelled: false, + } + ); + let event = OrderEvent::Updated(OrderUpdatedEvent { + order_id: 2, + updated_items: vec!["Item 3".to_string(), "Item 4".to_string()], + }); + let result = materialized_view2.handle(&event).await; + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + OrderViewState { + order_id: 2, + customer_name: "John Doe".to_string(), + items: vec!["Item 3".to_string(), "Item 4".to_string()], + is_cancelled: false, + } + ); + let event = OrderEvent::Cancelled(OrderCancelledEvent { order_id: 2 }); + let result = materialized_view2.handle(&event).await; + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + OrderViewState { + order_id: 2, + customer_name: "John Doe".to_string(), + items: vec!["Item 3".to_string(), "Item 4".to_string()], + is_cancelled: true, + } + ); + }; + + // Run both tasks concurrently on the same thread. + let _ = tokio::join!(task1, task2); +} diff --git a/tests/materialized_view_test.rs b/tests/materialized_view_test.rs index 3bf401d..efdba4b 100644 --- a/tests/materialized_view_test.rs +++ b/tests/materialized_view_test.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use std::thread; use fmodel_rust::materialized_view::{MaterializedView, ViewStateRepository}; @@ -43,13 +43,13 @@ fn view<'a>() -> View<'a, OrderViewState, OrderEvent> { } struct InMemoryViewOrderStateRepository { - states: Mutex>, + states: RwLock>, } impl InMemoryViewOrderStateRepository { fn new() -> Self { InMemoryViewOrderStateRepository { - states: Mutex::new(HashMap::new()), + states: RwLock::new(HashMap::new()), } } } @@ -64,7 +64,7 @@ impl ViewStateRepository ) -> Result, MaterializedViewError> { Ok(self .states - .lock() + .read() .unwrap() .get(&event.identifier().parse::().unwrap()) .cloned()) @@ -72,7 +72,7 @@ impl ViewStateRepository async fn save(&self, state: &OrderViewState) -> Result { self.states - .lock() + .write() .unwrap() .insert(state.order_id, state.clone()); Ok(state.clone()) From 89eb69db7c95c9b68a7032740f8496109a30cee8 Mon Sep 17 00:00:00 2001 From: Ivan Dugalic Date: Sat, 13 Sep 2025 19:16:10 +0200 Subject: [PATCH 3/5] Single-threaded async execution for saga managers, alongside the existing multi-threaded, Send-bound futures. --- src/saga_manager.rs | 55 ++++++++++++++++++ tests/saga_manager_test.rs | 111 ++++++++++++++++++++++++++++++++----- 2 files changed, 152 insertions(+), 14 deletions(-) diff --git a/src/saga_manager.rs b/src/saga_manager.rs index c3cdc29..5085ca0 100644 --- a/src/saga_manager.rs +++ b/src/saga_manager.rs @@ -9,6 +9,7 @@ use crate::saga::ActionComputation; /// /// - `A`. - action /// - `Error` - error +#[cfg(not(feature = "not-send-futures"))] pub trait ActionPublisher { /// Publishes the action/command to some external system, returning either the actions that are successfully published or error. /// Desugared `async fn publish(&self, action: &[A]) -> Result, Error>;` to a normal `fn` that returns `impl Future`, and adds bound `Send`. @@ -16,6 +17,20 @@ pub trait ActionPublisher { fn publish(&self, action: &[A]) -> impl Future, Error>> + Send; } +/// Publishes the action/command to some external system. +/// +/// Generic parameter: +/// +/// - `A`. - action +/// - `Error` - error +#[cfg(feature = "not-send-futures")] +pub trait ActionPublisher { + /// Publishes the action/command to some external system, returning either the actions that are successfully published or error. + /// Desugared `async fn publish(&self, action: &[A]) -> Result, Error>;` to a normal `fn` that returns `impl Future`. + /// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. + fn publish(&self, action: &[A]) -> impl Future, Error>>; +} + /// Saga Manager. /// /// It is using a `Saga` to react to the action result and to publish the new actions. @@ -48,6 +63,7 @@ where } } +#[cfg(not(feature = "not-send-futures"))] impl ActionPublisher for SagaManager where @@ -63,6 +79,20 @@ where } } +#[cfg(feature = "not-send-futures")] +impl ActionPublisher + for SagaManager +where + Publisher: ActionPublisher, + Saga: ActionComputation, +{ + /// Publishes the action/command to some external system, returning either the actions that are successfully published or error. + async fn publish(&self, action: &[A]) -> Result, Error> { + self.action_publisher.publish(action).await + } +} + +#[cfg(not(feature = "not-send-futures"))] impl SagaManager where Publisher: ActionPublisher + Sync, @@ -89,3 +119,28 @@ where Ok(published_actions) } } + +#[cfg(feature = "not-send-futures")] +impl SagaManager +where + Publisher: ActionPublisher, + Saga: ActionComputation, +{ + /// Creates a new instance of [SagaManager]. + pub fn new(action_publisher: Publisher, saga: Saga) -> Self { + SagaManager { + action_publisher, + saga, + _marker: PhantomData, + } + } + /// Handles the `action result` by computing new `actions` based on `action result`, and publishing new `actions` to the external system. + /// In most cases: + /// - the `action result` is an `event` that you react, + /// - the `actions` are `commands` that you publish downstream. + pub async fn handle(&self, action_result: &AR) -> Result, Error> { + let new_actions = self.compute_new_actions(action_result); + let published_actions = self.publish(&new_actions).await?; + Ok(published_actions) + } +} diff --git a/tests/saga_manager_test.rs b/tests/saga_manager_test.rs index fc524fb..2762fd6 100644 --- a/tests/saga_manager_test.rs +++ b/tests/saga_manager_test.rs @@ -1,3 +1,6 @@ +use std::sync::Arc; +use std::thread; + use fmodel_rust::saga::Saga; use fmodel_rust::saga_manager::{ActionPublisher, SagaManager}; @@ -49,23 +52,103 @@ impl ActionPublisher for SimpleActionPublishe #[tokio::test] async fn test() { - let saga: Saga = saga(); - let order_created_event = OrderEvent::Created(OrderCreatedEvent { - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], + let saga_manager = Arc::new(SagaManager::new(SimpleActionPublisher::new(), saga())); + let saga_manager1 = saga_manager.clone(); + let saga_manager2 = saga_manager.clone(); + + let handle1 = thread::spawn(|| async move { + let order_created_event = OrderEvent::Created(OrderCreatedEvent { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + + let result = saga_manager1.handle(&order_created_event).await; + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + vec![ShipmentCommand::Create(CreateShipmentCommand { + shipment_id: 1, + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + })] + ); + }); + + let handle2 = thread::spawn(|| async move { + let order_created_event = OrderEvent::Created(OrderCreatedEvent { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 21".to_string(), "Item 22".to_string()], + }); + + let result = saga_manager2.handle(&order_created_event).await; + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + vec![ShipmentCommand::Create(CreateShipmentCommand { + shipment_id: 1, + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 21".to_string(), "Item 22".to_string()], + })] + ); }); - let saga_manager = SagaManager::new(SimpleActionPublisher::new(), saga); - let result = saga_manager.handle(&order_created_event).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - vec![ShipmentCommand::Create(CreateShipmentCommand { - shipment_id: 1, + handle1.join().unwrap().await; + handle2.join().unwrap().await; +} + +#[cfg(feature = "not-send-futures")] +#[tokio::test] +async fn test2() { + use std::rc::Rc; + + let saga_manager = Rc::new(SagaManager::new(SimpleActionPublisher::new(), saga())); + let saga_manager1 = saga_manager.clone(); + let saga_manager2 = saga_manager.clone(); + + let task1 = async move { + let order_created_event = OrderEvent::Created(OrderCreatedEvent { order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })] - ); + }); + + let result = saga_manager1.handle(&order_created_event).await; + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + vec![ShipmentCommand::Create(CreateShipmentCommand { + shipment_id: 1, + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + })] + ); + }; + + let task2 = async move { + let order_created_event = OrderEvent::Created(OrderCreatedEvent { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 21".to_string(), "Item 22".to_string()], + }); + + let result = saga_manager2.handle(&order_created_event).await; + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + vec![ShipmentCommand::Create(CreateShipmentCommand { + shipment_id: 1, + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 21".to_string(), "Item 22".to_string()], + })] + ); + }; + + // Run both tasks concurrently on the same thread. + let _ = tokio::join!(task1, task2); } From bedaa2f73197bc0dea7fa43027babe3bfbb201f2 Mon Sep 17 00:00:00 2001 From: Ivan Dugalic Date: Sat, 13 Sep 2025 19:56:12 +0200 Subject: [PATCH 4/5] Readme updated - async single threaded --- README.md | 148 +++++++++++++++++++++++++----------------------------- 1 file changed, 69 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index 5c577ae..bf80e95 100644 --- a/README.md +++ b/README.md @@ -430,65 +430,31 @@ Pushing these decisions from the core domain model is very valuable. Being able ## Fearless Concurrency +### `Send` bound futures/Async (multi-threaded executors) + Splitting the computation in your program into multiple threads to run multiple tasks at the same time can improve performance. However, programming with threads has a reputation for being difficult. Rust’s type system and ownership model guarantee thread safety. -Example of the concurrent execution of the aggregate: +Example of the concurrent execution of the aggregate in multi-threaded environment (**default** - `Send`-bound futures): ```rust async fn es_test() { let repository = InMemoryOrderEventRepository::new(); - let aggregate = Arc::new(EventSourcedAggregate::new(repository, decider())); - // Makes a clone of the Arc pointer. This creates another pointer to the same allocation, increasing the strong reference count. + let aggregate = Arc::new(EventSourcedAggregate::new( + repository, + decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())), + )); + let aggregate1 = Arc::clone(&aggregate); let aggregate2 = Arc::clone(&aggregate); - // Lets spawn two threads to simulate two concurrent requests let handle1 = thread::spawn(|| async move { let command = OrderCommand::Create(CreateOrderCommand { order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], }); - - let result = aggregate.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Created(OrderCreatedEvent { - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }), - 0 - )] - ); - let command = OrderCommand::Update(UpdateOrderCommand { - order_id: 1, - new_items: vec!["Item 3".to_string(), "Item 4".to_string()], - }); - let result = aggregate.handle(&command).await; - assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Updated(OrderUpdatedEvent { - order_id: 1, - updated_items: vec!["Item 3".to_string(), "Item 4".to_string()], - }), - 1 - )] - ); - let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 1 }); - let result = aggregate.handle(&command).await; + let result = aggregate1.handle(&command).await; assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Cancelled(OrderCancelledEvent { order_id: 1 }), - 2 - )] - ); }); let handle2 = thread::spawn(|| async move { @@ -499,47 +465,71 @@ async fn es_test() { }); let result = aggregate2.handle(&command).await; assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Created(OrderCreatedEvent { - order_id: 2, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }), - 0 - )] - ); - let command = OrderCommand::Update(UpdateOrderCommand { - order_id: 2, - new_items: vec!["Item 3".to_string(), "Item 4".to_string()], + }); + + handle1.join().unwrap().await; + handle2.join().unwrap().await; +} +``` + +### `Send` free futures/Async (single-threaded executors) + +Concurrency and async programming do not require a multi-threaded environment. You can run async tasks on a single-threaded executor, which allows you to write async code without the Send bound. + +This approach has several benefits: + +- Simpler code: No need for Arc, Mutex(RwLock), or other thread synchronization primitives for shared state. + +- Ergonomic references: You can freely use references within your async code without worrying about moving data across threads. 🤯 + +- Efficient design: This model aligns with the “Thread-per-Core” pattern, letting you safely run multiple async tasks concurrently on a single thread. + +In short: you get all the power of async/await without the complexity of multi-threaded synchronization all the time. + +Just switching to a [LocalExecutor](https://docs.rs/async-executor/latest/async_executor/struct.LocalExecutor.html) or something like Tokio [LocalSet](https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html) should be enough. + +If you want to enable single-threaded, Send-free async support, you can enable the optional feature `not-send-futures` when adding fmodel-rust to your project: + +```toml +[dependencies] +fmodel-rust = { version = "0.8.2", features = ["not-send-futures"] } +``` + +Example of the concurrent execution of the aggregate in single-threaded environment (**behind feature** - `Send` free `Futures`): + +```rust +async fn es_test_not_send() { + let repository = InMemoryOrderEventRepository::new(); + + let aggregate = Rc::new(EventSourcedAggregate::new( + repository, + decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())), + )); + let aggregate2 = Rc::clone(&aggregate); + + // Notice how we `move` here, which requires Rc (not ARc). If you do not move, Rc is not needed. + let task1 = async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 1, + customer_name: "Alice".to_string(), + items: vec!["Item1".to_string()], }); - let result = aggregate2.handle(&command).await; + let result = aggregate.handle(&command).await; assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Updated(OrderUpdatedEvent { - order_id: 2, - updated_items: vec!["Item 3".to_string(), "Item 4".to_string()], - }), - 1 - )] - ); - let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 2 }); + }; + + let task2 = async move { + let command = OrderCommand::Create(CreateOrderCommand { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); let result = aggregate2.handle(&command).await; assert!(result.is_ok()); - assert_eq!( - result.unwrap(), - [( - OrderEvent::Cancelled(OrderCancelledEvent { order_id: 2 }), - 2 - )] - ); - }); + }; - handle1.join().unwrap().await; - handle2.join().unwrap().await; + // Run both tasks concurrently on the same thread. + tokio::join!(task1, task2); } ``` From a1b44cfb4d69888d352db44f447ce7ad6723917c Mon Sep 17 00:00:00 2001 From: Ivan Dugalic Date: Sat, 13 Sep 2025 20:26:04 +0200 Subject: [PATCH 5/5] github build action updated to test `new` feature --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 62b2fc2..41c8783 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -29,4 +29,4 @@ jobs: run: cargo build --verbose - name: Run tests - run: cargo test --verbose + run: cargo test --features not-send-futures --verbose