diff --git a/eventually-core/src/store.rs b/eventually-core/src/store.rs index afe1999d..9ecd068f 100644 --- a/eventually-core/src/store.rs +++ b/eventually-core/src/store.rs @@ -11,171 +11,6 @@ use serde::{Deserialize, Serialize}; use crate::versioning::Versioned; -/// Contains a type-state builder for [`PersistentEvent`] type. -/// -/// [`PersistentEvent`]: struct.PersistedEvent.html -pub mod persistent { - /// Creates a new [`PersistedEvent`] by wrapping an Event value. - /// - /// [`PersistentEvent`]: ../struct.PersistedEvent.html - pub struct EventBuilder { - pub(super) event: T, - pub(super) source_id: SourceId, - } - - impl From<(SourceId, T)> for EventBuilder { - #[inline] - fn from(value: (SourceId, T)) -> Self { - let (source_id, event) = value; - Self { source_id, event } - } - } - - impl EventBuilder { - /// Specifies the [`PersistentEvent`] version and moves to the next - /// builder state. - /// - /// [`PersistentEvent`]: ../struct.PersistedEvent.html - #[inline] - pub fn version(self, value: u32) -> EventBuilderWithVersion { - EventBuilderWithVersion { - version: value, - event: self.event, - source_id: self.source_id, - } - } - - /// Specifies the [`PersistentEvent`] sequence number and moves to the next - /// builder state. - /// - /// [`PersistentEvent`]: ../struct.PersistedEvent.html - #[inline] - pub fn sequence_number(self, value: u32) -> EventBuilderWithSequenceNumber { - EventBuilderWithSequenceNumber { - sequence_number: value, - event: self.event, - source_id: self.source_id, - } - } - } - - /// Next step in creating a new [`PersistedEvent`] carrying an Event value - /// and its version. - /// - /// [`PersistentEvent`]: ../struct.PersistedEvent.html - pub struct EventBuilderWithVersion { - version: u32, - event: T, - source_id: SourceId, - } - - impl EventBuilderWithVersion { - /// Specifies the [`PersistentEvent`] sequence number and moves to the next - /// builder state. - /// - /// [`PersistentEvent`]: ../struct.PersistedEvent.html - #[inline] - pub fn sequence_number(self, value: u32) -> super::PersistedEvent { - super::PersistedEvent { - version: self.version, - event: self.event, - source_id: self.source_id, - sequence_number: value, - } - } - } - - /// Next step in creating a new [`PersistedEvent`] carrying an Event value - /// and its sequence number. - /// - /// [`PersistentEvent`]: ../struct.PersistedEvent.html - pub struct EventBuilderWithSequenceNumber { - sequence_number: u32, - event: T, - source_id: SourceId, - } - - impl EventBuilderWithSequenceNumber { - /// Specifies the [`PersistentEvent`] version and moves to the next - /// builder state. - /// - /// [`PersistentEvent`]: ../struct.PersistedEvent.html - #[inline] - pub fn version(self, value: u32) -> super::PersistedEvent { - super::PersistedEvent { - version: value, - event: self.event, - source_id: self.source_id, - sequence_number: self.sequence_number, - } - } - } -} - -/// An [`Event`] wrapper for events that have been -/// successfully committed to the [`EventStore`]. -/// -/// [`EventStream`]s are composed of these events. -/// -/// [`Event`]: trait.EventStore.html#associatedtype.Event -/// [`EventStream`]: type.EventStream.html -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct PersistedEvent { - source_id: SourceId, - version: u32, - sequence_number: u32, - #[serde(flatten)] - event: T, -} - -impl Versioned for PersistedEvent { - #[inline] - fn version(&self) -> u32 { - self.version - } -} - -impl Deref for PersistedEvent { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.event - } -} - -impl PersistedEvent { - /// Creates a new [`EventBuilder`] from the provided Event value. - /// - /// [`EventBuilder`]: persistent/struct.EventBuilder.html - #[inline] - pub fn from(source_id: SourceId, event: T) -> persistent::EventBuilder { - persistent::EventBuilder { source_id, event } - } - - /// Returns the event sequence number. - #[inline] - pub fn sequence_number(&self) -> u32 { - self.sequence_number - } - - /// Returns the [`SourceId`] of the persisted event. - /// - /// [`SourceId`]: trait.EventStore.html#associatedType.SourceId - #[inline] - pub fn source_id(&self) -> &SourceId { - &self.source_id - } - - /// Unwraps the inner [`Event`] from the `PersistedEvent` wrapper. - /// - /// [`Event`]: trait.EventStore.html#associatedtype.Event - #[inline] - pub fn take(self) -> T { - self.event - } -} - /// Selection operation for the events to capture in an [`EventStream`]. /// /// [`EventStream`]: type.EventStream.html @@ -223,7 +58,7 @@ pub enum Expected { pub type EventStream<'a, S> = BoxStream< 'a, Result< - PersistedEvent<::SourceId, ::Event>, + Persisted<::SourceId, ::Event>, ::Error, >, >; @@ -338,3 +173,168 @@ pub trait EventStore { /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId fn remove(&mut self, source_id: Self::SourceId) -> BoxFuture>; } + +/// An [`Event`] wrapper for events that have been +/// successfully committed to the [`EventStore`]. +/// +/// [`EventStream`]s are composed of these events. +/// +/// [`Event`]: trait.EventStore.html#associatedtype.Event +/// [`EventStream`]: type.EventStream.html +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct Persisted { + source_id: SourceId, + version: u32, + sequence_number: u32, + #[serde(flatten)] + event: T, +} + +impl Versioned for Persisted { + #[inline] + fn version(&self) -> u32 { + self.version + } +} + +impl Deref for Persisted { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.event + } +} + +impl Persisted { + /// Creates a new [`EventBuilder`] from the provided Event value. + /// + /// [`EventBuilder`]: persistent/struct.EventBuilder.html + #[inline] + pub fn from(source_id: SourceId, event: T) -> persistent::EventBuilder { + persistent::EventBuilder { source_id, event } + } + + /// Returns the event sequence number. + #[inline] + pub fn sequence_number(&self) -> u32 { + self.sequence_number + } + + /// Returns the [`SourceId`] of the persisted event. + /// + /// [`SourceId`]: trait.EventStore.html#associatedType.SourceId + #[inline] + pub fn source_id(&self) -> &SourceId { + &self.source_id + } + + /// Unwraps the inner [`Event`] from the `Persisted` wrapper. + /// + /// [`Event`]: trait.EventStore.html#associatedtype.Event + #[inline] + pub fn take(self) -> T { + self.event + } +} + +/// Contains a type-state builder for [`PersistentEvent`] type. +/// +/// [`PersistentEvent`]: struct.Persisted.html +pub mod persistent { + /// Creates a new [`Persisted`] by wrapping an Event value. + /// + /// [`PersistentEvent`]: ../struct.Persisted.html + pub struct EventBuilder { + pub(super) event: T, + pub(super) source_id: SourceId, + } + + impl From<(SourceId, T)> for EventBuilder { + #[inline] + fn from(value: (SourceId, T)) -> Self { + let (source_id, event) = value; + Self { source_id, event } + } + } + + impl EventBuilder { + /// Specifies the [`PersistentEvent`] version and moves to the next + /// builder state. + /// + /// [`PersistentEvent`]: ../struct.Persisted.html + #[inline] + pub fn version(self, value: u32) -> EventBuilderWithVersion { + EventBuilderWithVersion { + version: value, + event: self.event, + source_id: self.source_id, + } + } + + /// Specifies the [`PersistentEvent`] sequence number and moves to the next + /// builder state. + /// + /// [`PersistentEvent`]: ../struct.Persisted.html + #[inline] + pub fn sequence_number(self, value: u32) -> EventBuilderWithSequenceNumber { + EventBuilderWithSequenceNumber { + sequence_number: value, + event: self.event, + source_id: self.source_id, + } + } + } + + /// Next step in creating a new [`Persisted`] carrying an Event value + /// and its version. + /// + /// [`PersistentEvent`]: ../struct.Persisted.html + pub struct EventBuilderWithVersion { + version: u32, + event: T, + source_id: SourceId, + } + + impl EventBuilderWithVersion { + /// Specifies the [`PersistentEvent`] sequence number and moves to the next + /// builder state. + /// + /// [`PersistentEvent`]: ../struct.Persisted.html + #[inline] + pub fn sequence_number(self, value: u32) -> super::Persisted { + super::Persisted { + version: self.version, + event: self.event, + source_id: self.source_id, + sequence_number: value, + } + } + } + + /// Next step in creating a new [`Persisted`] carrying an Event value + /// and its sequence number. + /// + /// [`PersistentEvent`]: ../struct.Persisted.html + pub struct EventBuilderWithSequenceNumber { + sequence_number: u32, + event: T, + source_id: SourceId, + } + + impl EventBuilderWithSequenceNumber { + /// Specifies the [`PersistentEvent`] version and moves to the next + /// builder state. + /// + /// [`PersistentEvent`]: ../struct.Persisted.html + #[inline] + pub fn version(self, value: u32) -> super::Persisted { + super::Persisted { + version: value, + event: self.event, + source_id: self.source_id, + sequence_number: self.sequence_number, + } + } + } +} diff --git a/eventually-postgres/src/lib.rs b/eventually-postgres/src/lib.rs index 93730684..4f409f83 100644 --- a/eventually-postgres/src/lib.rs +++ b/eventually-postgres/src/lib.rs @@ -37,7 +37,7 @@ //! // Use an EventStoreBuilder to build multiple EventStore instances. //! let builder = EventStoreBuilder::migrate_database(&mut client) //! .await? -//! .new(Arc::new(client)); +//! .builder(Arc::new(client)); //! //! // Event store for the events. //! // @@ -58,7 +58,7 @@ use std::convert::TryFrom; use std::fmt::Display; use std::sync::Arc; -use eventually::store::{AppendError, EventStream, Expected, PersistedEvent, Select}; +use eventually::store::{AppendError, EventStream, Expected, Persisted, Select}; use eventually::{Aggregate, AggregateId}; use futures::future::BoxFuture; @@ -86,6 +86,15 @@ pub type Result = std::result::Result; /// [`EventStore`]: struct.EventStore.html #[derive(Debug, thiserror::Error)] pub enum Error { + /// Error when decoding persistent events from the database + /// back into the application during a [`stream`] + /// or [`stream_all`] operation. + /// + /// [`stream`]: struct.EventStore.html#method.stream + /// [`stream_all`]: struct.EventStore.html#method.stream_all + #[error("store failed to decode event from the database: {0}")] + DecodeEvent(#[source] anyhow::Error), + /// Error when encoding the events in [`append`] to JSON prior /// to sending them to the database. /// @@ -106,21 +115,21 @@ impl AppendError for Error { } } -const APPEND: &'static str = "SELECT * FROM append_to_store($1::text, $2::text, $3, $4, $5)"; +const APPEND: &str = "SELECT * FROM append_to_store($1::text, $2::text, $3, $4, $5)"; -const CREATE_AGGREGATE_TYPE: &'static str = "SELECT * FROM create_aggregate_type($1::text)"; +const CREATE_AGGREGATE_TYPE: &str = "SELECT * FROM create_aggregate_type($1::text)"; -const STREAM: &'static str = "SELECT e.* +const STREAM: &str = "SELECT e.* FROM events e LEFT JOIN aggregates a ON a.id = e.aggregate_id WHERE a.aggregate_type_id = $1 AND e.aggregate_id = $2 AND e.version >= $3 ORDER BY version ASC"; -const STREAM_ALL: &'static str = "SELECT e.* +const STREAM_ALL: &str = "SELECT e.* FROM events e LEFT JOIN aggregates a ON a.id = e.aggregate_id WHERE a.aggregate_type_id = $1 AND e.sequence_number >= $2 ORDER BY e.sequence_number ASC"; -const REMOVE: &'static str = "DELETE FROM aggregates WHERE aggregate_type_id = $1 AND id = $2"; +const REMOVE: &str = "DELETE FROM aggregates WHERE aggregate_type_id = $1 AND id = $2"; /// Builder type for [`EventStore`] instances. /// @@ -139,7 +148,7 @@ impl EventStoreBuilder { } /// Returns a new builder instance after migrations have been completed. - pub fn new(self, client: Arc) -> EventStoreBuilderMigrated { + pub fn builder(self, client: Arc) -> EventStoreBuilderMigrated { EventStoreBuilderMigrated { client } } } @@ -176,40 +185,8 @@ impl EventStoreBuilderMigrated { } /// Creates a new [`EventStore`] for an [`Aggregate`] type. - /// Check out [`build`] for more information. - /// - /// ## Usage - /// - /// ```text - /// // Open a connection with Postgres. - /// let (client, connection) = - /// tokio_postgres::connect("postgres://user@pass:localhost:5432/db", tokio_postgres::NoTls) - /// .await - /// .map_err(|err| { - /// eprintln!("failed to connect to Postgres: {}", err); - /// err - /// })?; - /// - /// // The connection, responsible for the actual IO, must be handled by a different - /// // execution context. - /// tokio::spawn(async move { - /// if let Err(e) = connection.await { - /// eprintln!("connection error: {}", e); - /// } - /// }); - /// - /// // Use an EventStoreBuilder to build multiple EventStore instances. - /// let builder = EventStoreBuilder::from(Arc::new(RwLock::new(client))); - /// - /// let aggregate = SomeAggregate; /// - /// // Event store for the events. - /// let store = { - /// let store = builder.aggregate_stream(&aggregate, "orders"); - /// store.create_stream().await?; - /// store - /// }; - /// ``` + /// Check out [`build`] for more information. /// /// [`EventStore`]: struct.EventStore.html /// [`Aggregate`]: ../../eventually_core/aggregate/trait.Aggregate.html @@ -234,6 +211,15 @@ impl EventStoreBuilderMigrated { /// Check out [`EventStoreBuilder`] for examples to how initialize new /// instances of this type. /// +/// ### Considerations on the `Id` type +/// +/// The `Id` type supplied to the `EventStore` has to be +/// able to `to_string()` (so implement the `std::fmt::Display` trait) +/// and to be parsed from a `String` (so to implement the `std::convert::TryFrom` trait). +/// +/// The error for the `TryFrom` conversion has to be a `std::error::Error`, +/// so as to be able to map it into an `anyhow::Error` generic error value. +/// /// [`EventStore`]: ../../eventually_core/store/trait.EventStore.html /// [`EventStoreBuilder`]: ../../eventually_core/store/trait.EventStoreBuilder.html #[derive(Debug, Clone)] @@ -246,8 +232,10 @@ pub struct EventStore { impl eventually::EventStore for EventStore where - // TODO: remove this Infallible error here and use a proper one. - Id: TryFrom + Display + Eq + Send + Sync, + Id: TryFrom + Display + Eq + Send + Sync, + // This bound is for the translation into an anyhow::Error. + >::Error: std::error::Error + Send + Sync + 'static, + >::Error: Into, Event: Serialize + Send + Sync, for<'de> Event: Deserialize<'de>, { @@ -339,7 +327,9 @@ impl EventStore { impl EventStore where - Id: TryFrom + Display + Eq + Send + Sync, + Id: TryFrom + Display + Eq + Send + Sync, + // This bound is for the translation into an anyhow::Error. + >::Error: std::error::Error + Send + Sync + 'static, Event: Serialize + Send + Sync, for<'de> Event: Deserialize<'de>, { @@ -349,20 +339,39 @@ where .query_raw(query, slice_iter(params)) .await .map_err(Error::from)? + .map_err(Error::from) .and_then(|row| async move { - // FIXME: can we remove this unwrap here? - let event: Event = serde_json::from_value(row.try_get("event")?).unwrap(); - - let id: String = row.try_get("aggregate_id")?; - let version: i32 = row.try_get("version")?; - let sequence_number: i64 = row.try_get("sequence_number")?; - - // FIXME: can we also remove this unwrap here? - Ok(PersistedEvent::from(Id::try_from(id).unwrap(), event) + let event: Event = serde_json::from_value( + row.try_get("event") + .map_err(anyhow::Error::from) + .map_err(Error::DecodeEvent)?, + ) + .map_err(anyhow::Error::from) + .map_err(Error::DecodeEvent)?; + + let id: String = row + .try_get("aggregate_id") + .map_err(anyhow::Error::from) + .map_err(Error::DecodeEvent)?; + + let version: i32 = row + .try_get("version") + .map_err(anyhow::Error::from) + .map_err(Error::DecodeEvent)?; + + let sequence_number: i64 = row + .try_get("sequence_number") + .map_err(anyhow::Error::from) + .map_err(Error::DecodeEvent)?; + + let id = Id::try_from(id) + .map_err(anyhow::Error::from) + .map_err(Error::DecodeEvent)?; + + Ok(Persisted::from(id, event) .version(version as u32) .sequence_number(sequence_number as u32)) }) - .map_err(Error::from) .boxed()) } } diff --git a/eventually-test/src/api.rs b/eventually-test/src/api.rs index 9e36736e..b19e601b 100644 --- a/eventually-test/src/api.rs +++ b/eventually-test/src/api.rs @@ -1,8 +1,9 @@ use chrono::{DateTime, Utc}; -use eventually::store::{PersistedEvent, Select}; +use eventually::store::{Persisted, Select}; use eventually::EventStore; +use futures::future::ready; use futures::TryStreamExt; use serde::Deserialize; @@ -21,16 +22,16 @@ pub(crate) async fn full_history(req: Request) -> Result> = req + let mut stream: Vec> = req .state() .store .stream_all(Select::All) .await .map_err(Error::from)? .try_filter(|event| { - futures::future::ready(match from { + ready(match from { None => true, - Some(from) => event.happened_at() >= &from, + Some(ref from) => event.happened_at() >= from, }) }) .try_collect() @@ -53,7 +54,7 @@ pub(crate) async fn history(req: Request) -> Result { let params: Params = req.query()?; let from = params.from; - let mut stream: Vec> = req + let mut stream: Vec> = req .state() .store .stream(id, Select::All) diff --git a/eventually-util/src/inmemory.rs b/eventually-util/src/inmemory.rs index 5430a61a..587bbca6 100644 --- a/eventually-util/src/inmemory.rs +++ b/eventually-util/src/inmemory.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use eventually_core::aggregate::Aggregate; use eventually_core::store::persistent::EventBuilderWithVersion; -use eventually_core::store::{AppendError, EventStream, Expected, PersistedEvent, Select}; +use eventually_core::store::{AppendError, EventStream, Expected, Persisted, Select}; use eventually_core::versioning::Versioned; use futures::future::BoxFuture; @@ -67,7 +67,7 @@ where Id: Hash + Eq, { global_offset: Arc, - backend: Arc>>>>, + backend: Arc>>>>, } impl Default for EventStore @@ -113,7 +113,7 @@ where } } - let mut persisted_events: Vec> = + let mut persisted_events: Vec> = into_persisted_events(expected, id.clone(), events) .into_iter() .map(|event| { @@ -124,7 +124,7 @@ where let last_version = persisted_events .last() - .map(PersistedEvent::version) + .map(Persisted::version) .unwrap_or(expected); self.backend @@ -163,7 +163,7 @@ where } fn stream_all(&self, select: Select) -> BoxFuture, Self::Error>> { - let mut events: Vec> = self + let mut events: Vec> = self .backend .read() .values() @@ -201,16 +201,14 @@ where events .into_iter() .enumerate() - .map(|(i, event)| { - PersistedEvent::from(id.clone(), event).version(last_version + (i as u32) + 1) - }) + .map(|(i, event)| Persisted::from(id.clone(), event).version(last_version + (i as u32) + 1)) .collect() } #[cfg(test)] mod tests { use super::{Error, EventStore as InMemoryStore}; - use eventually_core::store::{EventStore, Expected, PersistedEvent, Select}; + use eventually_core::store::{EventStore, Expected, Persisted, Select}; use futures::TryStreamExt; @@ -318,21 +316,11 @@ mod tests { assert_eq!( block_on(stream_to_vec(&store, id, Select::All)).unwrap(), vec![ - PersistedEvent::from(id, Event::A) - .version(1) - .sequence_number(0), - PersistedEvent::from(id, Event::B) - .version(2) - .sequence_number(1), - PersistedEvent::from(id, Event::C) - .version(3) - .sequence_number(2), - PersistedEvent::from(id, Event::B) - .version(4) - .sequence_number(3), - PersistedEvent::from(id, Event::A) - .version(5) - .sequence_number(4) + Persisted::from(id, Event::A).version(1).sequence_number(0), + Persisted::from(id, Event::B).version(2).sequence_number(1), + Persisted::from(id, Event::C).version(3).sequence_number(2), + Persisted::from(id, Event::B).version(4).sequence_number(3), + Persisted::from(id, Event::A).version(5).sequence_number(4) ] ); @@ -340,12 +328,8 @@ mod tests { assert_eq!( block_on(stream_to_vec(&store, id, Select::From(4))).unwrap(), vec![ - PersistedEvent::from(id, Event::B) - .version(4) - .sequence_number(3), - PersistedEvent::from(id, Event::A) - .version(5) - .sequence_number(4) + Persisted::from(id, Event::B).version(4).sequence_number(3), + Persisted::from(id, Event::A).version(5).sequence_number(4) ] ); @@ -375,7 +359,7 @@ mod tests { assert!(result.is_ok()); // Stream from the start. - let result: anyhow::Result>> = block_on(async { + let result: anyhow::Result>> = block_on(async { store .stream_all(Select::All) .await @@ -390,16 +374,16 @@ mod tests { assert_eq!( vec![ - PersistedEvent::from(id_1, Event::A) + Persisted::from(id_1, Event::A) .version(1) .sequence_number(0), - PersistedEvent::from(id_2, Event::B) + Persisted::from(id_2, Event::B) .version(1) .sequence_number(1), - PersistedEvent::from(id_1, Event::C) + Persisted::from(id_1, Event::C) .version(2) .sequence_number(2), - PersistedEvent::from(id_2, Event::A) + Persisted::from(id_2, Event::A) .version(2) .sequence_number(3) ], @@ -407,7 +391,7 @@ mod tests { ); // Stream from a specified sequence number. - let result: anyhow::Result>> = block_on(async { + let result: anyhow::Result>> = block_on(async { store .stream_all(Select::From(2)) .await @@ -422,10 +406,10 @@ mod tests { assert_eq!( vec![ - PersistedEvent::from(id_1, Event::C) + Persisted::from(id_1, Event::C) .version(2) .sequence_number(2), - PersistedEvent::from(id_2, Event::A) + Persisted::from(id_2, Event::A) .version(2) .sequence_number(3) ], @@ -437,7 +421,7 @@ mod tests { store: &InMemoryStore<&'static str, Event>, id: &'static str, select: Select, - ) -> anyhow::Result>> { + ) -> anyhow::Result>> { store .stream(id, select) .await