From 84f98c1eceb34498d777fd111ec510aae221b243 Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Thu, 13 Aug 2020 17:51:24 +0200 Subject: [PATCH 1/4] feat(core): add core support for Projections --- eventually-core/src/lib.rs | 1 + eventually-core/src/projection.rs | 35 +++++++++++++++++++++++++++++++ eventually/src/lib.rs | 2 ++ 3 files changed, 38 insertions(+) create mode 100644 eventually-core/src/projection.rs diff --git a/eventually-core/src/lib.rs b/eventually-core/src/lib.rs index b30dd47b..d19cb9ed 100644 --- a/eventually-core/src/lib.rs +++ b/eventually-core/src/lib.rs @@ -11,6 +11,7 @@ //! [`eventually`]: https://crates.io/crates/eventually pub mod aggregate; +pub mod projection; pub mod repository; pub mod store; pub mod subscription; diff --git a/eventually-core/src/projection.rs b/eventually-core/src/projection.rs new file mode 100644 index 00000000..d48183e9 --- /dev/null +++ b/eventually-core/src/projection.rs @@ -0,0 +1,35 @@ +//! Contain support for [`Projection`], an optimized read model +//! of an [`Aggregate`] or of a number of `Aggregate`s. +//! +//! More information about projections can be found here: +//! https://eventstore.com/docs/getting-started/projections/index.html +//! +//! [`Projection`]: trait.Projection.html +//! [`Aggregate`]: ../aggregate/trait.Aggregate.html + +use crate::store::Persisted; + +/// A `Projection` is an optimized read model (or materialized view) +/// of an [`Aggregate`] model(s), that can be assembled by left-folding +/// its previous state and a number of ordered, consecutive events. +/// +/// The events passed to a `Projection` have been persisted onto +/// an [`EventStore`] first. +/// +/// [`Aggregate`]: ../aggregate/trait.Aggregate.html +/// [`EventStore`]: ../store/trait.EventStore.html +pub trait Projection: Default { + /// Type of the Source id, typically an [`AggregateId`]. + /// + /// [`AggregateId`]: ../aggregate/type.AggregateId.html + type SourceId: Eq; + + /// Event to be stored in the `EventStore`, typically an [`Aggregate::Event`]. + /// + /// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event + type Event; + + /// Updates the next value of the `Projection` using the provided + /// event value. + fn project(self, event: Persisted) -> Self; +} diff --git a/eventually/src/lib.rs b/eventually/src/lib.rs index 95ee7fce..9938a944 100644 --- a/eventually/src/lib.rs +++ b/eventually/src/lib.rs @@ -1,8 +1,10 @@ pub use eventually_core::aggregate::{ Aggregate, AggregateExt, AggregateId, AggregateRoot, AggregateRootBuilder, }; +pub use eventually_core::projection::Projection; pub use eventually_core::repository::Repository; pub use eventually_core::store::EventStore; +pub use eventually_core::subscription::EventSubscriber; pub use eventually_core::versioning::Versioned; pub mod aggregate { From b04434ee9a6c2928ef9c7afc6a51bd7710bfc56b Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Thu, 13 Aug 2020 17:53:51 +0200 Subject: [PATCH 2/4] feat(inmemory): add Projector and ProjectorBuilder support --- eventually-util/Cargo.toml | 2 +- eventually-util/src/inmemory/mod.rs | 7 + eventually-util/src/inmemory/projector.rs | 129 ++++++++++++++++++ .../src/{inmemory.rs => inmemory/store.rs} | 0 4 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 eventually-util/src/inmemory/mod.rs create mode 100644 eventually-util/src/inmemory/projector.rs rename eventually-util/src/{inmemory.rs => inmemory/store.rs} (100%) diff --git a/eventually-util/Cargo.toml b/eventually-util/Cargo.toml index 9109c2c6..4a1b3d53 100644 --- a/eventually-util/Cargo.toml +++ b/eventually-util/Cargo.toml @@ -20,7 +20,7 @@ parking_lot = "0.11.0" serde = { version = "1.0", features = ["derive"], optional = true } thiserror = "1.0" tokio = { version = "0.2", features = ["sync"] } +anyhow = "1.0" [dev-dependencies] -anyhow = "1.0" tokio = { version = "0.2", features = ["macros"] } diff --git a/eventually-util/src/inmemory/mod.rs b/eventually-util/src/inmemory/mod.rs new file mode 100644 index 00000000..a6f5599f --- /dev/null +++ b/eventually-util/src/inmemory/mod.rs @@ -0,0 +1,7 @@ +//! Contains supporting entities using an in-memory backend. + +mod projector; +mod store; + +pub use projector::*; +pub use store::*; diff --git a/eventually-util/src/inmemory/projector.rs b/eventually-util/src/inmemory/projector.rs new file mode 100644 index 00000000..cf438510 --- /dev/null +++ b/eventually-util/src/inmemory/projector.rs @@ -0,0 +1,129 @@ +use std::fmt::Debug; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + +use eventually_core::projection::Projection; +use eventually_core::store::{EventStore, Select}; +use eventually_core::subscription::EventSubscriber; + +use futures::stream::{Stream, StreamExt, TryStreamExt}; + +use tokio::sync::watch::{channel, Receiver, Sender}; + +pub struct ProjectorBuilder { + store: Arc, + subscriber: Arc, +} + +impl ProjectorBuilder { + pub fn new(store: Arc, subscriber: Arc) -> Self { + Self { store, subscriber } + } + + pub fn build

(&self) -> Projector + where + P: Projection + Debug + Clone, + Store: EventStore, + Subscriber: EventSubscriber, + ::Error: std::error::Error + Send + Sync + 'static, + ::Error: std::error::Error + Send + Sync + 'static, + { + Projector::new(self.store.clone(), self.subscriber.clone()) + } +} + +pub struct Projector +where + P: Projection, +{ + tx: Sender

, + rx: Receiver

, // Keep the receiver to be able to clone it in watch(). + store: Arc, + subscriber: Arc, + state: P, + last_sequence_number: AtomicU32, + projection: std::marker::PhantomData

, +} + +impl Projector +where + P: Projection + Debug + Clone, + Store: EventStore, + Subscriber: EventSubscriber, + // NOTE: this bound is needed to clone the current state for next projection. + // NOTE: these bounds are needed for anyhow::Error conversion. + ::Error: std::error::Error + Send + Sync + 'static, + ::Error: std::error::Error + Send + Sync + 'static, +{ + fn new(store: Arc, subscriber: Arc) -> Self { + let state: P = Default::default(); + let (tx, rx) = channel(state.clone()); + + Self { + tx, + rx, + store, + subscriber, + state, + last_sequence_number: Default::default(), + projection: std::marker::PhantomData, + } + } + + pub fn watch(&self) -> impl Stream { + self.rx.clone() + } + + pub async fn run(&mut self, select: Select) -> anyhow::Result<()> { + // Create the Subscription first, so that once the future has been resolved + // we'll start receiving events right away. + // + // This is to avoid losing events when waiting for the one-off stream + // to resolve its future. + // + // The impact is that we _might_ get duplicated events from the one-off stream + // and the subscription stream. Luckily, we can discard those by + // keeping an internal state of the last processed sequence number, + // and discard all those events that are found. + let subscription = self.subscriber.subscribe_all().await?; + let one_off_stream = self.store.stream_all(select).await?; + + let mut stream = one_off_stream + .map_err(anyhow::Error::from) + .chain(subscription.map_err(anyhow::Error::from)); + + while let Some(event) = stream.next().await { + let event = event?; + let expected_sequence_number = self.last_sequence_number.load(Ordering::SeqCst); + let event_sequence_number = event.sequence_number(); + + // If some bounds are requested when running the projector, + // make sure the subscription is also upholding the Select operation + // by skipping events with a sequence number we're not interested in. + if let Select::From(v) = select { + if event_sequence_number < v { + continue; + } + } + + if event_sequence_number < expected_sequence_number { + continue; // Duplicated event detected, let's skip it. + } + + self.state = P::project(self.state.clone(), event); + + self.last_sequence_number.compare_and_swap( + expected_sequence_number, + event_sequence_number, + Ordering::SeqCst, + ); + + // Notify watchers of the latest projection state. + self.tx.broadcast(self.state.clone()).expect( + "since this struct holds the original receiver, failures should not happen", + ); + } + + Ok(()) + } +} diff --git a/eventually-util/src/inmemory.rs b/eventually-util/src/inmemory/store.rs similarity index 100% rename from eventually-util/src/inmemory.rs rename to eventually-util/src/inmemory/store.rs From 5ff6178eeb7411722bb9b72c1b204eccfad8bdec Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Thu, 13 Aug 2020 17:54:13 +0200 Subject: [PATCH 3/4] feat(test): add example of TotalOrderProjection --- eventually-test/src/api.rs | 10 ++++- eventually-test/src/lib.rs | 50 ++++++++++++++++++++++- eventually-test/src/order.rs | 25 ++++++++++++ eventually-test/src/state.rs | 3 +- eventually-test/tests/acceptance_tests.rs | 3 +- 5 files changed, 85 insertions(+), 6 deletions(-) diff --git a/eventually-test/src/api.rs b/eventually-test/src/api.rs index b19e601b..fdc08797 100644 --- a/eventually-test/src/api.rs +++ b/eventually-test/src/api.rs @@ -44,6 +44,14 @@ pub(crate) async fn full_history(req: Request) -> Result) -> Result { + let state = req.state().total_orders_projection.read().await; + + Ok(Response::builder(StatusCode::Ok) + .body(Body::from_json(&*state)?) + .build()) +} + pub(crate) async fn history(req: Request) -> Result { #[derive(Deserialize)] struct Params { @@ -79,8 +87,6 @@ pub(crate) async fn history(req: Request) -> Result { pub(crate) async fn get_order(req: Request) -> Result { let id: String = req.param("id")?; - println!("ASD"); - let root = req .state() .repository diff --git a/eventually-test/src/lib.rs b/eventually-test/src/lib.rs index 1bbdd389..4ff56570 100644 --- a/eventually-test/src/lib.rs +++ b/eventually-test/src/lib.rs @@ -6,13 +6,16 @@ mod state; use std::sync::Arc; use eventually::aggregate::Optional; -use eventually::inmemory::EventStoreBuilder; +use eventually::inmemory::{EventStoreBuilder, ProjectorBuilder}; +use eventually::store::Select; use eventually::{AggregateRootBuilder, Repository}; +use futures::stream::StreamExt; + use tokio::sync::RwLock; use crate::config::Config; -use crate::order::OrderAggregate; +use crate::order::{OrderAggregate, TotalOrdersProjection}; pub async fn run(config: Config) -> anyhow::Result<()> { femme::with_level(config.log_level); @@ -33,6 +36,47 @@ pub async fn run(config: Config) -> anyhow::Result<()> { store.clone(), ))); + // Put the store behind an Arc to allow for clone-ness of a single instance. + let store = Arc::new(store); + + // Create a new Projector for the desired projection. + let mut total_orders_projector = + ProjectorBuilder::new(store.clone(), store.clone()).build::(); + + // Get a watch channel from the Projector: updates to the projector values + // will be sent here. + let mut total_orders_projector_rx = total_orders_projector.watch(); + + // Keep the projection value in memory. + // We can use it to access it from the context of an endpoint and serialize the read model. + let total_orders_projection = Arc::new(RwLock::new(TotalOrdersProjection::default())); + let total_orders_projection_state = total_orders_projection.clone(); + + // Spawn a dedicated coroutine to run the projector. + // + // The projector will open its own running subscription, on which + // it will receive all oldest and newest events as they come into the EventStore, + // and it will progressively update the projection as events arrive. + tokio::spawn(async move { + total_orders_projector + .run(Select::All) + .await + .expect("should not fail") + }); + + // Spawn a dedicated coroutine to listen to changes to the projection. + // + // In this case we're logging the latest version, but in more advanced + // scenario you might want to do something more with it. + // + // In some cases you might not need to watch the projection changes. + tokio::spawn(async move { + while let Some(total_orders) = total_orders_projector_rx.next().await { + log::info!("Total orders: {:?}", total_orders); + *total_orders_projection_state.write().await = total_orders; + } + }); + // Set up the HTTP router. let mut app = tide::new(); @@ -41,9 +85,11 @@ pub async fn run(config: Config) -> anyhow::Result<()> { store, builder: aggregate_root_builder, repository, + total_orders_projection, }); api.at("/history").get(api::full_history); + api.at("/total").get(api::total_orders); api.at("/:id").get(api::get_order); api.at("/:id/create").post(api::create_order); diff --git a/eventually-test/src/order.rs b/eventually-test/src/order.rs index c7645aaf..38e9a697 100644 --- a/eventually-test/src/order.rs +++ b/eventually-test/src/order.rs @@ -7,6 +7,31 @@ use futures::{future, future::BoxFuture}; use serde::{Deserialize, Serialize}; use eventually::optional::Aggregate; +use eventually::store::Persisted; +use eventually::Projection; + +#[derive(Debug, Default, Clone, Copy, Serialize)] +pub struct TotalOrdersProjection { + created: u64, + completed: u64, + cancelled: u64, +} + +impl Projection for TotalOrdersProjection { + type SourceId = String; + type Event = OrderEvent; + + fn project(mut self, event: Persisted) -> Self { + match event.take() { + OrderEvent::Created { .. } => self.created += 1, + OrderEvent::Completed { .. } => self.completed += 1, + OrderEvent::Cancelled { .. } => self.cancelled += 1, + _ => (), + }; + + self + } +} #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OrderItem { diff --git a/eventually-test/src/state.rs b/eventually-test/src/state.rs index 73f38b09..dad4fb15 100644 --- a/eventually-test/src/state.rs +++ b/eventually-test/src/state.rs @@ -14,7 +14,8 @@ pub(crate) type OrderRepository = Repository; #[derive(Clone)] pub(crate) struct AppState { - pub store: OrderStore, + pub store: Arc, pub builder: AggregateRootBuilder, pub repository: Arc>, + pub total_orders_projection: Arc>, } diff --git a/eventually-test/tests/acceptance_tests.rs b/eventually-test/tests/acceptance_tests.rs index 358ef6db..d3367d8b 100644 --- a/eventually-test/tests/acceptance_tests.rs +++ b/eventually-test/tests/acceptance_tests.rs @@ -23,10 +23,11 @@ fn setup() { let config = Config::init().unwrap(); SERVER_STARTED.store(true, std::sync::atomic::Ordering::SeqCst); - smol::run(eventually_test::run(config)); + smol::run(eventually_test::run(config)).expect("don't fail :("); }); }); + // Busy loading :( while !SERVER_STARTED.load(std::sync::atomic::Ordering::SeqCst) {} } From d5c0b382f28d2c9a046980b19042efb56b9604cb Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Fri, 14 Aug 2020 09:06:33 +0200 Subject: [PATCH 4/4] chore(inmemory): add documentation for inmemory::projector --- eventually-test/src/lib.rs | 8 +-- eventually-util/src/inmemory/projector.rs | 59 +++++++++++++++++------ 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/eventually-test/src/lib.rs b/eventually-test/src/lib.rs index 4ff56570..0bc929be 100644 --- a/eventually-test/src/lib.rs +++ b/eventually-test/src/lib.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use eventually::aggregate::Optional; use eventually::inmemory::{EventStoreBuilder, ProjectorBuilder}; -use eventually::store::Select; use eventually::{AggregateRootBuilder, Repository}; use futures::stream::StreamExt; @@ -57,12 +56,7 @@ pub async fn run(config: Config) -> anyhow::Result<()> { // The projector will open its own running subscription, on which // it will receive all oldest and newest events as they come into the EventStore, // and it will progressively update the projection as events arrive. - tokio::spawn(async move { - total_orders_projector - .run(Select::All) - .await - .expect("should not fail") - }); + tokio::spawn(async move { total_orders_projector.run().await.expect("should not fail") }); // Spawn a dedicated coroutine to listen to changes to the projection. // diff --git a/eventually-util/src/inmemory/projector.rs b/eventually-util/src/inmemory/projector.rs index cf438510..1123fbf1 100644 --- a/eventually-util/src/inmemory/projector.rs +++ b/eventually-util/src/inmemory/projector.rs @@ -1,3 +1,4 @@ +use std::error::Error as StdError; use std::fmt::Debug; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; @@ -10,28 +11,59 @@ use futures::stream::{Stream, StreamExt, TryStreamExt}; use tokio::sync::watch::{channel, Receiver, Sender}; +/// Reusable builder for multiple [`Projector`] instances. +/// +/// [`Projector`]: struct.Projector.html pub struct ProjectorBuilder { store: Arc, subscriber: Arc, } impl ProjectorBuilder { + /// Creates a new builder instance using the provided [`EventStore`] + /// and [`EventSubscriber`]. + /// + /// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html + /// [`EventSubscriber`]: ../../../eventually-core/subscription/trait.EventSubscriber.html pub fn new(store: Arc, subscriber: Arc) -> Self { Self { store, subscriber } } + /// Builds a new [`Projector`] for the [`Projection`] + /// specified in the function type. + /// + /// [`Projector`]: struct.Projector.html + /// [`Projection`]: ../../../eventually-core/projection/trait.Projection.html pub fn build

(&self) -> Projector where + // NOTE: these bounds are required for Projector::run. P: Projection + Debug + Clone, Store: EventStore, Subscriber: EventSubscriber, - ::Error: std::error::Error + Send + Sync + 'static, - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: StdError + Send + Sync + 'static, + ::Error: StdError + Send + Sync + 'static, { Projector::new(self.store.clone(), self.subscriber.clone()) } } +/// A `Projector` manages the state of a single [`Projection`] +/// by opening a long-running stream of all events coming from the [`EventStore`]. +/// +/// New instances of a `Projector` are obtainable through a [`ProjectorBuilder`] +/// instance. +/// +/// The `Projector` will start updating the [`Projection`] state when [`run`] +/// is called. +/// +/// At each update, the `Projector` will broadcast the latest version of the +/// [`Projection`] on a `Stream` obtainable through [`watch`]. +/// +/// [`Projection`]: ../../../eventually-core/projection/trait.Projection.html +/// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html +/// [`ProjectorBuilder`]: struct.ProjectorBuilder.html +/// [`run`]: struct.Projector.html#method.run +/// [`watch`]: struct.Projector.html#method.watch pub struct Projector where P: Projection, @@ -50,10 +82,9 @@ where P: Projection + Debug + Clone, Store: EventStore, Subscriber: EventSubscriber, - // NOTE: this bound is needed to clone the current state for next projection. // NOTE: these bounds are needed for anyhow::Error conversion. - ::Error: std::error::Error + Send + Sync + 'static, - ::Error: std::error::Error + Send + Sync + 'static, + ::Error: StdError + Send + Sync + 'static, + ::Error: StdError + Send + Sync + 'static, { fn new(store: Arc, subscriber: Arc) -> Self { let state: P = Default::default(); @@ -70,11 +101,16 @@ where } } + /// Provides a `Stream` that receives the latest copy of the `Projection` state. pub fn watch(&self) -> impl Stream { self.rx.clone() } - pub async fn run(&mut self, select: Select) -> anyhow::Result<()> { + /// Starts the update of the `Projection` by processing all the events + /// coming from the [`EventStore`]. + /// + /// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html + pub async fn run(&mut self) -> anyhow::Result<()> { // Create the Subscription first, so that once the future has been resolved // we'll start receiving events right away. // @@ -86,7 +122,7 @@ where // keeping an internal state of the last processed sequence number, // and discard all those events that are found. let subscription = self.subscriber.subscribe_all().await?; - let one_off_stream = self.store.stream_all(select).await?; + let one_off_stream = self.store.stream_all(Select::All).await?; let mut stream = one_off_stream .map_err(anyhow::Error::from) @@ -97,15 +133,6 @@ where let expected_sequence_number = self.last_sequence_number.load(Ordering::SeqCst); let event_sequence_number = event.sequence_number(); - // If some bounds are requested when running the projector, - // make sure the subscription is also upholding the Select operation - // by skipping events with a sequence number we're not interested in. - if let Select::From(v) = select { - if event_sequence_number < v { - continue; - } - } - if event_sequence_number < expected_sequence_number { continue; // Duplicated event detected, let's skip it. }