Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eventually-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
//! [`eventually`]: https://crates.io/crates/eventually

pub mod aggregate;
pub mod projection;
pub mod repository;
pub mod store;
pub mod subscription;
Expand Down
35 changes: 35 additions & 0 deletions eventually-core/src/projection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//! Contain support for [`Projection`], an optimized read model
//! of an [`Aggregate`] or of a number of `Aggregate`s.
//!
//! More information about projections can be found here:
//! https://eventstore.com/docs/getting-started/projections/index.html
//!
//! [`Projection`]: trait.Projection.html
//! [`Aggregate`]: ../aggregate/trait.Aggregate.html

use crate::store::Persisted;

/// A `Projection` is an optimized read model (or materialized view)
/// of an [`Aggregate`] model(s), that can be assembled by left-folding
/// its previous state and a number of ordered, consecutive events.
///
/// The events passed to a `Projection` have been persisted onto
/// an [`EventStore`] first.
///
/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
/// [`EventStore`]: ../store/trait.EventStore.html
pub trait Projection: Default {
/// Type of the Source id, typically an [`AggregateId`].
///
/// [`AggregateId`]: ../aggregate/type.AggregateId.html
type SourceId: Eq;

/// Event to be stored in the `EventStore`, typically an [`Aggregate::Event`].
///
/// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
type Event;

/// Updates the next value of the `Projection` using the provided
/// event value.
fn project(self, event: Persisted<Self::SourceId, Self::Event>) -> Self;
}
10 changes: 8 additions & 2 deletions eventually-test/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ pub(crate) async fn full_history(req: Request<AppState>) -> Result<Response, Err
.build())
}

pub(crate) async fn total_orders(req: Request<AppState>) -> Result<Response, Error> {
let state = req.state().total_orders_projection.read().await;

Ok(Response::builder(StatusCode::Ok)
.body(Body::from_json(&*state)?)
.build())
}

pub(crate) async fn history(req: Request<AppState>) -> Result<Response, Error> {
#[derive(Deserialize)]
struct Params {
Expand Down Expand Up @@ -79,8 +87,6 @@ pub(crate) async fn history(req: Request<AppState>) -> Result<Response, Error> {
pub(crate) async fn get_order(req: Request<AppState>) -> Result<Response, Error> {
let id: String = req.param("id")?;

println!("ASD");

let root = req
.state()
.repository
Expand Down
44 changes: 42 additions & 2 deletions eventually-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ mod state;
use std::sync::Arc;

use eventually::aggregate::Optional;
use eventually::inmemory::EventStoreBuilder;
use eventually::inmemory::{EventStoreBuilder, ProjectorBuilder};
use eventually::{AggregateRootBuilder, Repository};

use futures::stream::StreamExt;

use tokio::sync::RwLock;

use crate::config::Config;
use crate::order::OrderAggregate;
use crate::order::{OrderAggregate, TotalOrdersProjection};

pub async fn run(config: Config) -> anyhow::Result<()> {
femme::with_level(config.log_level);
Expand All @@ -33,6 +35,42 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
store.clone(),
)));

// Put the store behind an Arc to allow for clone-ness of a single instance.
let store = Arc::new(store);

// Create a new Projector for the desired projection.
let mut total_orders_projector =
ProjectorBuilder::new(store.clone(), store.clone()).build::<TotalOrdersProjection>();

// Get a watch channel from the Projector: updates to the projector values
// will be sent here.
let mut total_orders_projector_rx = total_orders_projector.watch();

// Keep the projection value in memory.
// We can use it to access it from the context of an endpoint and serialize the read model.
let total_orders_projection = Arc::new(RwLock::new(TotalOrdersProjection::default()));
let total_orders_projection_state = total_orders_projection.clone();

// Spawn a dedicated coroutine to run the projector.
//
// The projector will open its own running subscription, on which
// it will receive all oldest and newest events as they come into the EventStore,
// and it will progressively update the projection as events arrive.
tokio::spawn(async move { total_orders_projector.run().await.expect("should not fail") });

// Spawn a dedicated coroutine to listen to changes to the projection.
//
// In this case we're logging the latest version, but in more advanced
// scenario you might want to do something more with it.
//
// In some cases you might not need to watch the projection changes.
tokio::spawn(async move {
while let Some(total_orders) = total_orders_projector_rx.next().await {
log::info!("Total orders: {:?}", total_orders);
*total_orders_projection_state.write().await = total_orders;
}
});

// Set up the HTTP router.
let mut app = tide::new();

Expand All @@ -41,9 +79,11 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
store,
builder: aggregate_root_builder,
repository,
total_orders_projection,
});

api.at("/history").get(api::full_history);
api.at("/total").get(api::total_orders);

api.at("/:id").get(api::get_order);
api.at("/:id/create").post(api::create_order);
Expand Down
25 changes: 25 additions & 0 deletions eventually-test/src/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,31 @@ use futures::{future, future::BoxFuture};
use serde::{Deserialize, Serialize};

use eventually::optional::Aggregate;
use eventually::store::Persisted;
use eventually::Projection;

#[derive(Debug, Default, Clone, Copy, Serialize)]
pub struct TotalOrdersProjection {
created: u64,
completed: u64,
cancelled: u64,
}

impl Projection for TotalOrdersProjection {
type SourceId = String;
type Event = OrderEvent;

fn project(mut self, event: Persisted<Self::SourceId, Self::Event>) -> Self {
match event.take() {
OrderEvent::Created { .. } => self.created += 1,
OrderEvent::Completed { .. } => self.completed += 1,
OrderEvent::Cancelled { .. } => self.cancelled += 1,
_ => (),
};

self
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderItem {
Expand Down
3 changes: 2 additions & 1 deletion eventually-test/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ pub(crate) type OrderRepository = Repository<OrderAggregate, OrderStore>;

#[derive(Clone)]
pub(crate) struct AppState {
pub store: OrderStore,
pub store: Arc<OrderStore>,
pub builder: AggregateRootBuilder<OrderAggregate>,
pub repository: Arc<RwLock<OrderRepository>>,
pub total_orders_projection: Arc<RwLock<order::TotalOrdersProjection>>,
}
3 changes: 2 additions & 1 deletion eventually-test/tests/acceptance_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ fn setup() {
let config = Config::init().unwrap();
SERVER_STARTED.store(true, std::sync::atomic::Ordering::SeqCst);

smol::run(eventually_test::run(config));
smol::run(eventually_test::run(config)).expect("don't fail :(");
});
});

// Busy loading :(
while !SERVER_STARTED.load(std::sync::atomic::Ordering::SeqCst) {}
}

Expand Down
2 changes: 1 addition & 1 deletion eventually-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ parking_lot = "0.11.0"
serde = { version = "1.0", features = ["derive"], optional = true }
thiserror = "1.0"
tokio = { version = "0.2", features = ["sync"] }
anyhow = "1.0"

[dev-dependencies]
anyhow = "1.0"
tokio = { version = "0.2", features = ["macros"] }
7 changes: 7 additions & 0 deletions eventually-util/src/inmemory/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//! Contains supporting entities using an in-memory backend.

mod projector;
mod store;

pub use projector::*;
pub use store::*;
156 changes: 156 additions & 0 deletions eventually-util/src/inmemory/projector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use std::error::Error as StdError;
use std::fmt::Debug;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use eventually_core::projection::Projection;
use eventually_core::store::{EventStore, Select};
use eventually_core::subscription::EventSubscriber;

use futures::stream::{Stream, StreamExt, TryStreamExt};

use tokio::sync::watch::{channel, Receiver, Sender};

/// Reusable builder for multiple [`Projector`] instances.
///
/// [`Projector`]: struct.Projector.html
pub struct ProjectorBuilder<Store, Subscriber> {
store: Arc<Store>,
subscriber: Arc<Subscriber>,
}

impl<Store, Subscriber> ProjectorBuilder<Store, Subscriber> {
/// Creates a new builder instance using the provided [`EventStore`]
/// and [`EventSubscriber`].
///
/// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html
/// [`EventSubscriber`]: ../../../eventually-core/subscription/trait.EventSubscriber.html
pub fn new(store: Arc<Store>, subscriber: Arc<Subscriber>) -> Self {
Self { store, subscriber }
}

/// Builds a new [`Projector`] for the [`Projection`]
/// specified in the function type.
///
/// [`Projector`]: struct.Projector.html
/// [`Projection`]: ../../../eventually-core/projection/trait.Projection.html
pub fn build<P>(&self) -> Projector<P, Store, Subscriber>
where
// NOTE: these bounds are required for Projector::run.
P: Projection + Debug + Clone,
Store: EventStore<SourceId = P::SourceId, Event = P::Event>,
Subscriber: EventSubscriber<SourceId = P::SourceId, Event = P::Event>,
<Store as EventStore>::Error: StdError + Send + Sync + 'static,
<Subscriber as EventSubscriber>::Error: StdError + Send + Sync + 'static,
{
Projector::new(self.store.clone(), self.subscriber.clone())
}
}

/// A `Projector` manages the state of a single [`Projection`]
/// by opening a long-running stream of all events coming from the [`EventStore`].
///
/// New instances of a `Projector` are obtainable through a [`ProjectorBuilder`]
/// instance.
///
/// The `Projector` will start updating the [`Projection`] state when [`run`]
/// is called.
///
/// At each update, the `Projector` will broadcast the latest version of the
/// [`Projection`] on a `Stream` obtainable through [`watch`].
///
/// [`Projection`]: ../../../eventually-core/projection/trait.Projection.html
/// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html
/// [`ProjectorBuilder`]: struct.ProjectorBuilder.html
/// [`run`]: struct.Projector.html#method.run
/// [`watch`]: struct.Projector.html#method.watch
pub struct Projector<P, Store, Subscriber>
where
P: Projection,
{
tx: Sender<P>,
rx: Receiver<P>, // Keep the receiver to be able to clone it in watch().
store: Arc<Store>,
subscriber: Arc<Subscriber>,
state: P,
last_sequence_number: AtomicU32,
projection: std::marker::PhantomData<P>,
}

impl<P, Store, Subscriber> Projector<P, Store, Subscriber>
where
P: Projection + Debug + Clone,
Store: EventStore<SourceId = P::SourceId, Event = P::Event>,
Subscriber: EventSubscriber<SourceId = P::SourceId, Event = P::Event>,
// NOTE: these bounds are needed for anyhow::Error conversion.
<Store as EventStore>::Error: StdError + Send + Sync + 'static,
<Subscriber as EventSubscriber>::Error: StdError + Send + Sync + 'static,
{
fn new(store: Arc<Store>, subscriber: Arc<Subscriber>) -> Self {
let state: P = Default::default();
let (tx, rx) = channel(state.clone());

Self {
tx,
rx,
store,
subscriber,
state,
last_sequence_number: Default::default(),
projection: std::marker::PhantomData,
}
}

/// Provides a `Stream` that receives the latest copy of the `Projection` state.
pub fn watch(&self) -> impl Stream<Item = P> {
self.rx.clone()
}

/// Starts the update of the `Projection` by processing all the events
/// coming from the [`EventStore`].
///
/// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html
pub async fn run(&mut self) -> anyhow::Result<()> {
// Create the Subscription first, so that once the future has been resolved
// we'll start receiving events right away.
//
// This is to avoid losing events when waiting for the one-off stream
// to resolve its future.
//
// The impact is that we _might_ get duplicated events from the one-off stream
// and the subscription stream. Luckily, we can discard those by
// keeping an internal state of the last processed sequence number,
// and discard all those events that are found.
let subscription = self.subscriber.subscribe_all().await?;
let one_off_stream = self.store.stream_all(Select::All).await?;

let mut stream = one_off_stream
.map_err(anyhow::Error::from)
.chain(subscription.map_err(anyhow::Error::from));

while let Some(event) = stream.next().await {
let event = event?;
let expected_sequence_number = self.last_sequence_number.load(Ordering::SeqCst);
let event_sequence_number = event.sequence_number();

if event_sequence_number < expected_sequence_number {
continue; // Duplicated event detected, let's skip it.
}

self.state = P::project(self.state.clone(), event);

self.last_sequence_number.compare_and_swap(
expected_sequence_number,
event_sequence_number,
Ordering::SeqCst,
);

// Notify watchers of the latest projection state.
self.tx.broadcast(self.state.clone()).expect(
"since this struct holds the original receiver, failures should not happen",
);
}

Ok(())
}
}
File renamed without changes.
2 changes: 2 additions & 0 deletions eventually/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub use eventually_core::aggregate::{
Aggregate, AggregateExt, AggregateId, AggregateRoot, AggregateRootBuilder,
};
pub use eventually_core::projection::Projection;
pub use eventually_core::repository::Repository;
pub use eventually_core::store::EventStore;
pub use eventually_core::subscription::EventSubscriber;
pub use eventually_core::versioning::Versioned;

pub mod aggregate {
Expand Down