Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ jobs:
run: cargo build --verbose

- name: Run tests
run: cargo test --verbose
run: cargo test --features not-send-futures --verbose
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ pretty_assertions = "1.4.1"
derive_more = { version = "2", features = ["display"] }

tokio = { version = "1.43.1", features = ["rt", "rt-multi-thread", "macros"] }

[features]
default = [] # default = Send futures
not-send-futures = [] # opt into non-Send futures
148 changes: 69 additions & 79 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,65 +430,31 @@ Pushing these decisions from the core domain model is very valuable. Being able

## Fearless Concurrency

### `Send` bound futures/Async (multi-threaded executors)

Splitting the computation in your program into multiple threads to run multiple tasks at the same time can improve performance.
However, programming with threads has a reputation for being difficult. Rust’s type system and ownership model guarantee thread safety.

Example of the concurrent execution of the aggregate:
Example of the concurrent execution of the aggregate in multi-threaded environment (**default** - `Send`-bound futures):

```rust
async fn es_test() {
let repository = InMemoryOrderEventRepository::new();
let aggregate = Arc::new(EventSourcedAggregate::new(repository, decider()));
// Makes a clone of the Arc pointer. This creates another pointer to the same allocation, increasing the strong reference count.
let aggregate = Arc::new(EventSourcedAggregate::new(
repository,
decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())),
));
let aggregate1 = Arc::clone(&aggregate);
let aggregate2 = Arc::clone(&aggregate);

// Lets spawn two threads to simulate two concurrent requests
let handle1 = thread::spawn(|| async move {
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
});

let result = aggregate.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Created(OrderCreatedEvent {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}),
0
)]
);
let command = OrderCommand::Update(UpdateOrderCommand {
order_id: 1,
new_items: vec!["Item 3".to_string(), "Item 4".to_string()],
});
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Updated(OrderUpdatedEvent {
order_id: 1,
updated_items: vec!["Item 3".to_string(), "Item 4".to_string()],
}),
1
)]
);
let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 1 });
let result = aggregate.handle(&command).await;
let result = aggregate1.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Cancelled(OrderCancelledEvent { order_id: 1 }),
2
)]
);
});

let handle2 = thread::spawn(|| async move {
Expand All @@ -499,47 +465,71 @@ async fn es_test() {
});
let result = aggregate2.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Created(OrderCreatedEvent {
order_id: 2,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}),
0
)]
);
let command = OrderCommand::Update(UpdateOrderCommand {
order_id: 2,
new_items: vec!["Item 3".to_string(), "Item 4".to_string()],
});

handle1.join().unwrap().await;
handle2.join().unwrap().await;
}
```

### `Send` free futures/Async (single-threaded executors)

Concurrency and async programming do not require a multi-threaded environment. You can run async tasks on a single-threaded executor, which allows you to write async code without the Send bound.

This approach has several benefits:

- Simpler code: No need for Arc, Mutex(RwLock), or other thread synchronization primitives for shared state.

- Ergonomic references: You can freely use references within your async code without worrying about moving data across threads. 🤯

- Efficient design: This model aligns with the “Thread-per-Core” pattern, letting you safely run multiple async tasks concurrently on a single thread.

In short: you get all the power of async/await without the complexity of multi-threaded synchronization all the time.

Just switching to a [LocalExecutor](https://docs.rs/async-executor/latest/async_executor/struct.LocalExecutor.html) or something like Tokio [LocalSet](https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html) should be enough.

If you want to enable single-threaded, Send-free async support, you can enable the optional feature `not-send-futures` when adding fmodel-rust to your project:

```toml
[dependencies]
fmodel-rust = { version = "0.8.2", features = ["not-send-futures"] }
```

Example of the concurrent execution of the aggregate in single-threaded environment (**behind feature** - `Send` free `Futures`):

```rust
async fn es_test_not_send() {
let repository = InMemoryOrderEventRepository::new();

let aggregate = Rc::new(EventSourcedAggregate::new(
repository,
decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())),
));
let aggregate2 = Rc::clone(&aggregate);

// Notice how we `move` here, which requires Rc (not ARc). If you do not move, Rc is not needed.
let task1 = async move {
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 1,
customer_name: "Alice".to_string(),
items: vec!["Item1".to_string()],
});
let result = aggregate2.handle(&command).await;
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Updated(OrderUpdatedEvent {
order_id: 2,
updated_items: vec!["Item 3".to_string(), "Item 4".to_string()],
}),
1
)]
);
let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 2 });
};

let task2 = async move {
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
});
let result = aggregate2.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Cancelled(OrderCancelledEvent { order_id: 2 }),
2
)]
);
});
};

handle1.join().unwrap().await;
handle2.join().unwrap().await;
// Run both tasks concurrently on the same thread.
tokio::join!(task1, task2);
}
```

Expand Down
Loading
Loading