From 572541b99e10d2baf5b32f1c136d6f492f837d3a Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Tue, 11 Aug 2020 10:37:05 +0200 Subject: [PATCH 1/3] feat(postgres): add migrations --- .../src/migrations/V1__event_store_tables.sql | 24 +++++++ .../migrations/V2__create_aggregate_type.sql | 9 +++ .../src/migrations/V3__append_to_store.sql | 66 +++++++++++++++++++ 3 files changed, 99 insertions(+) create mode 100644 eventually-postgres/src/migrations/V1__event_store_tables.sql create mode 100644 eventually-postgres/src/migrations/V2__create_aggregate_type.sql create mode 100644 eventually-postgres/src/migrations/V3__append_to_store.sql diff --git a/eventually-postgres/src/migrations/V1__event_store_tables.sql b/eventually-postgres/src/migrations/V1__event_store_tables.sql new file mode 100644 index 00000000..ee5386ed --- /dev/null +++ b/eventually-postgres/src/migrations/V1__event_store_tables.sql @@ -0,0 +1,24 @@ +CREATE TABLE aggregate_types ( + id TEXT PRIMARY KEY, + "offset" BIGINT NOT NULL DEFAULT -1 +); + +CREATE TABLE aggregates ( + id TEXT PRIMARY KEY, + aggregate_type_id TEXT NOT NULL, + "version" INTEGER NOT NULL DEFAULT 0, + + -- Remove all aggregates in case the aggregate type is deleted. + FOREIGN KEY (aggregate_type_id) REFERENCES aggregate_types(id) ON DELETE CASCADE +); + +CREATE TABLE events ( + aggregate_id TEXT NOT NULL, + "version" INTEGER NOT NULL, + sequence_number BIGINT NOT NULL, + "event" JSONB NOT NULL, + + PRIMARY KEY (aggregate_id, "version"), + -- Remove all the events of the aggregate in case of delete. 
+ FOREIGN KEY (aggregate_id) REFERENCES aggregates(id) ON DELETE CASCADE +); diff --git a/eventually-postgres/src/migrations/V2__create_aggregate_type.sql b/eventually-postgres/src/migrations/V2__create_aggregate_type.sql new file mode 100644 index 00000000..518f605c --- /dev/null +++ b/eventually-postgres/src/migrations/V2__create_aggregate_type.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION create_aggregate_type(aggregate_type TEXT) +RETURNS VOID +AS $$ + + INSERT INTO aggregate_types (id) VALUES (aggregate_type) + ON CONFLICT (id) + DO NOTHING; + +$$ LANGUAGE SQL; diff --git a/eventually-postgres/src/migrations/V3__append_to_store.sql b/eventually-postgres/src/migrations/V3__append_to_store.sql new file mode 100644 index 00000000..5b1c3a2d --- /dev/null +++ b/eventually-postgres/src/migrations/V3__append_to_store.sql @@ -0,0 +1,66 @@ +CREATE OR REPLACE FUNCTION append_to_store( + aggregate_type_id TEXT, + aggregate_id TEXT, + current_version INTEGER, + perform_version_check BOOLEAN, + events JSONB[] +) RETURNS TABLE ( + "version" INTEGER, + sequence_number BIGINT +) AS $$ +DECLARE + aggregate_version INTEGER; + sequence_number BIGINT; + "event" JSONB; +BEGIN + + -- Retrieve the global offset value from the aggregate_type. + SELECT "offset" INTO sequence_number FROM aggregate_types WHERE id = aggregate_type_id; + IF NOT FOUND THEN + RAISE EXCEPTION 'invalid aggregate type provided: %', aggregate_type_id; + END IF; + + -- Retrieve the latest aggregate version for the specified aggregate id. + SELECT aggregates."version" INTO aggregate_version FROM aggregates WHERE id = aggregate_id; + IF NOT FOUND THEN + -- Add the initial aggregate representation inside the `aggregates` table. + INSERT INTO aggregates (id, aggregate_type_id) + VALUES (aggregate_id, aggregate_type_id); + + -- Make sure to initialize the aggregate version in case + aggregate_version = 0; + END IF; + + -- Perform optimistic concurrency check. 
+    IF perform_version_check AND aggregate_version <> current_version THEN
+        RAISE EXCEPTION 'invalid aggregate version provided: %, expected: %', current_version, aggregate_version;
+    END IF;
+
+    FOREACH "event" IN ARRAY events
+    LOOP
+        -- Increment the aggregate version prior to inserting the new event.
+        aggregate_version = aggregate_version + 1;
+        -- Increment the new sequence number value.
+        sequence_number = sequence_number + 1;
+
+        -- Insert the event into the events table.
+        -- Version numbers should start from 1; sequence numbers should start from 0.
+        INSERT INTO events (aggregate_id, "version", sequence_number, "event")
+        VALUES (aggregate_id, aggregate_version, sequence_number, "event");
+    END LOOP;
+
+    -- Update the aggregate with the latest version computed.
+    UPDATE aggregates
+    set "version" = aggregate_version
+    WHERE id = aggregate_id;
+
+    -- Update the global offset with the latest sequence number.
+    UPDATE aggregate_types
+    SET "offset" = sequence_number
+    WHERE id = aggregate_type_id;
+
+    RETURN QUERY
+    SELECT aggregate_version, sequence_number;
+
+END;
+$$ LANGUAGE PLPGSQL;

From 19dbd93307f20bd9f195ac8ae3de3a84c02e2828 Mon Sep 17 00:00:00 2001
From: Danilo Cianfrone
Date: Tue, 11 Aug 2020 10:37:56 +0200
Subject: [PATCH 2/3] chore(postgres): add refinery dependency

---
 eventually-postgres/Cargo.toml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/eventually-postgres/Cargo.toml b/eventually-postgres/Cargo.toml
index f771d50e..f572352b 100644
--- a/eventually-postgres/Cargo.toml
+++ b/eventually-postgres/Cargo.toml
@@ -17,6 +17,7 @@ eventually = { version = "0.4", path = "../eventually", features = ["serde"] }
 futures = "0.3"
 serde = "1.0"
 serde_json = "1.0"
-tokio = { version = "0.2", features = ["sync"] }
 tokio-postgres = { version = "0.5", features = ["with-serde_json-1"] }
 thiserror = "1.0"
+refinery = { version = "0.3.0", features = ["tokio-postgres"] }
+anyhow = "1.0.32"

From 
3a77c1d072ce178ab9f5492d34e421d0d554c015 Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Tue, 11 Aug 2020 10:38:21 +0200 Subject: [PATCH 3/3] feat(postgres): overhaul the library --- eventually-postgres/src/lib.rs | 344 +++++++++++++++++++-------------- 1 file changed, 194 insertions(+), 150 deletions(-) diff --git a/eventually-postgres/src/lib.rs b/eventually-postgres/src/lib.rs index 9f3ecfae..93730684 100644 --- a/eventually-postgres/src/lib.rs +++ b/eventually-postgres/src/lib.rs @@ -35,14 +35,17 @@ //! struct SomeEvent; //! //! // Use an EventStoreBuilder to build multiple EventStore instances. -//! let builder = EventStoreBuilder::from(Arc::new(RwLock::new(client))); +//! let builder = EventStoreBuilder::migrate_database(&mut client) +//! .await? +//! .new(Arc::new(client)); //! //! // Event store for the events. -//! let store = { -//! let store = builder.event_stream::("orders"); -//! store.create_stream().await?; -//! store -//! }; +//! // +//! // When building an new EventStore instance, a type name is always needed +//! // to distinguish between different aggregates. +//! // +//! // You can also use std::any::type_name for that. +//! let store = builder::build::("aggregate-name").await? //! //! # Ok(()) //! # } @@ -63,22 +66,39 @@ use futures::stream::{StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; -use tokio::sync::RwLock; - use tokio_postgres::types::ToSql; -use tokio_postgres::{Client, Error}; +use tokio_postgres::Client; -use thiserror::Error; +/// Embedded migrations module. +mod embedded { + use refinery::embed_migrations; + embed_migrations!("src/migrations"); +} + +/// Result returning the crate [`Error`] type. +/// +/// [`Error`]: enum.Error.html +pub type Result = std::result::Result; /// Error type returned by the [`EventStore`] implementation, which is /// a _newtype_ wrapper around `tokio_postgres::Error`. 
/// /// [`EventStore`]: struct.EventStore.html -#[derive(Debug, Error)] -#[error(transparent)] -pub struct EventStoreError(#[from] Error); +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Error when encoding the events in [`append`] to JSON prior + /// to sending them to the database. + /// + /// [`append`]: struct.EventStore.html#method.append + #[error("store failed to encode events in json: ${0}")] + EncodeEvents(#[source] serde_json::Error), -impl AppendError for EventStoreError { + /// Error returned by Postgres when executing queries. + #[error("postgres client returned an error: ${0}")] + Postgres(#[from] tokio_postgres::Error), +} + +impl AppendError for Error { #[inline] fn is_conflict_error(&self) -> bool { // TODO: implement this @@ -86,47 +106,77 @@ impl AppendError for EventStoreError { } } +const APPEND: &'static str = "SELECT * FROM append_to_store($1::text, $2::text, $3, $4, $5)"; + +const CREATE_AGGREGATE_TYPE: &'static str = "SELECT * FROM create_aggregate_type($1::text)"; + +const STREAM: &'static str = "SELECT e.* + FROM events e LEFT JOIN aggregates a ON a.id = e.aggregate_id + WHERE a.aggregate_type_id = $1 AND e.aggregate_id = $2 AND e.version >= $3 + ORDER BY version ASC"; + +const STREAM_ALL: &'static str = "SELECT e.* + FROM events e LEFT JOIN aggregates a ON a.id = e.aggregate_id + WHERE a.aggregate_type_id = $1 AND e.sequence_number >= $2 + ORDER BY e.sequence_number ASC"; + +const REMOVE: &'static str = "DELETE FROM aggregates WHERE aggregate_type_id = $1 AND id = $2"; + /// Builder type for [`EventStore`] instances. /// /// [`EventStore`]: struct.EventStore.html -pub struct EventStoreBuilder(Arc>); +pub struct EventStoreBuilder { + #[allow(dead_code)] + inner: (), +} -impl From>> for EventStoreBuilder { - #[inline] - fn from(client: Arc>) -> Self { - EventStoreBuilder(client.clone()) +impl EventStoreBuilder { + /// Ensure the database is migrated to the latest version. 
+ pub async fn migrate_database(client: &mut Client) -> anyhow::Result { + embedded::migrations::runner().run_async(client).await?; + + Ok(Self { inner: () }) + } + + /// Returns a new builder instance after migrations have been completed. + pub fn new(self, client: Arc) -> EventStoreBuilderMigrated { + EventStoreBuilderMigrated { client } } } -impl EventStoreBuilder { - /// Creates a new [`EventStore`] instance using the specified stream name - /// as the Postgres backend table. +/// Builder step for [`EventStore`] instances, +/// after the database migration executed from [`EventStoreBuilder`] +/// has been completed. +/// +/// [`EventStore`]: struct.EventStore.html +/// [`EventStoreBuilder`]: struct.EventStoreBuilder.html +pub struct EventStoreBuilderMigrated { + client: Arc, +} + +impl EventStoreBuilderMigrated { + /// Creates a new [`EventStore`] instance using the specified name + /// to identify the source/aggregate type. + /// + /// Make sure the name is **unique** in your application. /// /// [`EventStore`]: struct.EventStore.html #[inline] - pub fn event_stream(&self, name: &'static str) -> EventStore { - EventStore { - client: self.0.clone(), - table_name: name, + pub async fn build(&self, type_name: &'static str) -> Result> { + let store = EventStore { + client: self.client.clone(), + type_name, id: std::marker::PhantomData, payload: std::marker::PhantomData, - append_query: format!( - "INSERT INTO {} (aggregate_id, event, version, \"offset\") - VALUES ($1, $2, $3, $4)", - name - ), - stream_query: format!( - "SELECT * FROM {} - WHERE aggregate_id = $1 AND version >= $2 - ORDER BY committed_at", - name - ), - remove_query: format!("DELETE FROM {} WHERE aggregate_id = $1", name), - } + }; + + store.create_aggregate_type().await?; + + Ok(store) } - /// Creates a new [`EventStore`] for an [`Aggregate`] type, - /// backed by a Postgres table using the specified stream name. + /// Creates a new [`EventStore`] for an [`Aggregate`] type. 
+ /// Check out [`build`] for more information. /// /// ## Usage /// @@ -163,16 +213,17 @@ impl EventStoreBuilder { /// /// [`EventStore`]: struct.EventStore.html /// [`Aggregate`]: ../../eventually_core/aggregate/trait.Aggregate.html + /// [`build`]: struct.EventStoreBuilderMigrated.html#method.build #[inline] - pub fn aggregate_stream( - &self, - _: &T, - name: &'static str, - ) -> EventStore, T::Event> + pub async fn for_aggregate<'a, T>( + &'a self, + type_name: &'static str, + _: &'a T, + ) -> Result, T::Event>> where T: Aggregate, { - self.event_stream::, T::Event>(name) + self.build::, T::Event>(type_name).await } } @@ -187,139 +238,132 @@ impl EventStoreBuilder { /// [`EventStoreBuilder`]: ../../eventually_core/store/trait.EventStoreBuilder.html #[derive(Debug, Clone)] pub struct EventStore { - client: Arc>, - table_name: &'static str, + client: Arc, + type_name: &'static str, id: std::marker::PhantomData, payload: std::marker::PhantomData, - - append_query: String, - stream_query: String, - remove_query: String, -} - -impl EventStore -where - Id: TryFrom + Display + Eq + Send + Sync, -{ - /// Creates a new table in the database for the provided Stream name - /// during initialization. - /// - /// Check out [`EventStoreBuilder`] for more information. 
- /// - /// [`EventStoreBuilder`]: ../../eventually_core/store/trait.EventStoreBuilder.html - pub async fn create_stream(&self) -> Result<(), Error> { - let query = format!( - "CREATE TABLE IF NOT EXISTS {table_name} ( - event_id SERIAL PRIMARY KEY, - committed_at TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp, - aggregate_id VARCHAR NOT NULL, - version OID NOT NULL, - \"offset\" OID NOT NULL, - event JSONB NOT NULL, - CONSTRAINT {table_name}_versioned UNIQUE (aggregate_id, version, \"offset\") - )", - table_name = self.table_name - ); - - self.client - .read() - .await - .execute(&*query, &[]) - .await - .map(|_| ()) - } } impl eventually::EventStore for EventStore where - Id: TryFrom + Display + Eq + Send + Sync, + // TODO: remove this Infallible error here and use a proper one. + Id: TryFrom + Display + Eq + Send + Sync, Event: Serialize + Send + Sync, for<'de> Event: Deserialize<'de>, { type SourceId = Id; type Event = Event; - type Error = EventStoreError; + type Error = Error; fn append( &mut self, id: Self::SourceId, version: Expected, events: Vec, - ) -> BoxFuture> { - // FIXME(ar3s3ru): this crate needs a new overhaul. 
- unimplemented!("crate needs a new overhaul, after the changes with the EventStore") - - // let serialized = events - // .into_iter() - // .enumerate() - // .map(|(i, event)| serde_json::to_value(event).map(|value| (i, value))) - // .collect::, _>>() - // .unwrap(); - - // Box::pin(async move { - // let mut tx = self.client.write().await; - // let tx = tx.transaction().await.map_err(EventStoreError::from)?; - - // for (i, event) in serialized { - // tx.execute( - // &*self.append_query, - // &[&id.to_string(), &event, &version, &(i as u32)], - // ) - // .await - // .map_err(EventStoreError::from)?; - // } - - // tx.commit().await.map_err(EventStoreError::from) - // }) + ) -> BoxFuture> { + Box::pin(async move { + let serialized = events + .into_iter() + .map(serde_json::to_value) + .collect::, _>>() + .map_err(Error::EncodeEvents)?; + + let (version, check) = match version { + Expected::Any => (0i32, false), + Expected::Exact(v) => (v as i32, true), + }; + + let params: Params = &[ + &self.type_name, + &id.to_string(), + &version, + &check, + &serialized, + ]; + + let row = self.client.query_one(APPEND, params).await?; + + let id: i32 = row.try_get("version")?; + Ok(id as u32) + }) } - fn stream( - &self, - id: Self::SourceId, - select: Select, - ) -> BoxFuture, Self::Error>> { - let from = match select { - Select::All => 0, - Select::From(v) => v, - }; + fn stream(&self, id: Self::SourceId, select: Select) -> BoxFuture>> { + Box::pin(async move { + let from = match select { + Select::All => 0i32, + Select::From(v) => v as i32, + }; + let id = id.to_string(); + + let params: Params = &[&self.type_name, &id, &from]; + + self.stream_query(STREAM, params).await + }) + } + + fn stream_all(&self, select: Select) -> BoxFuture>> { Box::pin(async move { - let params: Params = &[&id.to_string(), &from]; + let from = match select { + Select::All => 0i64, + Select::From(v) => v as i64, + }; + + let params: Params = &[&self.type_name, &from]; + + self.stream_query(STREAM_ALL, 
params).await + }) + } + fn remove(&mut self, id: Self::SourceId) -> BoxFuture> { + Box::pin(async move { Ok(self .client - .read() - .await - .query_raw(&*self.stream_query, slice_iter(params)) + .execute(REMOVE, &[&self.type_name, &id.to_string()]) .await - .map_err(EventStoreError::from)? - .map_ok(|row| { - let event: Event = serde_json::from_value(row.get("event")).unwrap(); - let id: String = row.get("source_id"); - - PersistedEvent::from(Id::try_from(id).unwrap(), event) - .version(row.get("version")) - .sequence_number(row.get("offset")) - }) - .map_err(EventStoreError::from) - .boxed()) + .map(|_| ())?) }) } +} + +impl EventStore { + async fn create_aggregate_type(&self) -> Result<()> { + let params: Params = &[&self.type_name]; + + self.client.execute(CREATE_AGGREGATE_TYPE, params).await?; - fn stream_all(&self, select: Select) -> BoxFuture, Self::Error>> { - unimplemented!() + Ok(()) } +} - fn remove(&mut self, id: Self::SourceId) -> BoxFuture> { - Box::pin(async move { - self.client - .read() - .await - .execute(&*self.remove_query, &[&id.to_string()]) - .await - .map(|_| ()) - .map_err(EventStoreError::from) - }) +impl EventStore +where + Id: TryFrom + Display + Eq + Send + Sync, + Event: Serialize + Send + Sync, + for<'de> Event: Deserialize<'de>, +{ + async fn stream_query(&self, query: &str, params: Params<'_>) -> Result> { + Ok(self + .client + .query_raw(query, slice_iter(params)) + .await + .map_err(Error::from)? + .and_then(|row| async move { + // FIXME: can we remove this unwrap here? + let event: Event = serde_json::from_value(row.try_get("event")?).unwrap(); + + let id: String = row.try_get("aggregate_id")?; + let version: i32 = row.try_get("version")?; + let sequence_number: i64 = row.try_get("sequence_number")?; + + // FIXME: can we also remove this unwrap here? + Ok(PersistedEvent::from(Id::try_from(id).unwrap(), event) + .version(version as u32) + .sequence_number(sequence_number as u32)) + }) + .map_err(Error::from) + .boxed()) } }