Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions eventually-core/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::fmt::Debug;
use futures::stream::TryStreamExt;

use crate::aggregate::{Aggregate, AggregateRoot, AggregateRootBuilder};
use crate::store::{EventStore, Select};
use crate::store::{EventStore, Expected, Select};
use crate::versioning::Versioned;

/// Error type returned by the [`Repository`].
Expand Down Expand Up @@ -145,11 +145,10 @@ where

if let Some(events) = events_to_commit {
if !events.is_empty() {
// Version is incremented at each events flush.
version += 1;

self.store
.append(root.id().clone(), version, events)
// Version is incremented at each events flush by the EventStore.
version = self
.store
.append(root.id().clone(), Expected::Exact(version), events)
.await
.map_err(Error::Store)?;
}
Expand Down
163 changes: 139 additions & 24 deletions eventually-core/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,108 @@
//!
//! [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event

use std::ops::Deref;

use futures::future::BoxFuture;
use futures::stream::BoxStream;

use serde::{Deserialize, Serialize};

use crate::versioning::Versioned;

/// Contains a type-state builder for [`PersistentEvent`] type.
///
/// [`PersistentEvent`]: struct.PersistedEvent.html
pub mod persistent {
/// Creates a new [`PersistedEvent`] by wrapping an Event value.
///
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
pub struct EventBuilder<T> {
pub(super) event: T,
}

impl<T> From<T> for EventBuilder<T> {
#[inline]
fn from(event: T) -> Self {
Self { event }
}
}

impl<T> EventBuilder<T> {
/// Specifies the [`PersistentEvent`] version and moves to the next
/// builder state.
///
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
#[inline]
pub fn version(self, value: u32) -> EventBuilderWithVersion<T> {
EventBuilderWithVersion {
version: value,
event: self.event,
}
}

/// Specifies the [`PersistentEvent`] sequence number and moves to the next
/// builder state.
///
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
#[inline]
pub fn sequence_number(self, value: u32) -> EventBuilderWithSequenceNumber<T> {
EventBuilderWithSequenceNumber {
sequence_number: value,
event: self.event,
}
}
}

/// Next step in creating a new [`PersistedEvent`] carrying an Event value
/// and its version.
///
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
pub struct EventBuilderWithVersion<T> {
version: u32,
event: T,
}

impl<T> EventBuilderWithVersion<T> {
/// Specifies the [`PersistentEvent`] sequence number and moves to the next
/// builder state.
///
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
#[inline]
pub fn sequence_number(self, value: u32) -> super::PersistedEvent<T> {
super::PersistedEvent {
version: self.version,
event: self.event,
sequence_number: value,
}
}
}

/// Next step in creating a new [`PersistedEvent`] carrying an Event value
/// and its sequence number.
///
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
pub struct EventBuilderWithSequenceNumber<T> {
sequence_number: u32,
event: T,
}

impl<T> EventBuilderWithSequenceNumber<T> {
/// Specifies the [`PersistentEvent`] version and moves to the next
/// builder state.
///
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
#[inline]
pub fn version(self, value: u32) -> super::PersistedEvent<T> {
super::PersistedEvent {
version: value,
event: self.event,
sequence_number: self.sequence_number,
}
}
}
}

/// An [`Event`] wrapper for events that have been
/// successfully committed to the [`EventStore`].
///
Expand All @@ -25,37 +120,28 @@ pub struct PersistedEvent<T> {
event: T,
}

impl<T> From<T> for PersistedEvent<T> {
#[inline]
fn from(event: T) -> Self {
Self {
event,
version: 0,
sequence_number: 0,
}
}
}

impl<T> Versioned for PersistedEvent<T> {
#[inline]
fn version(&self) -> u32 {
self.version
}
}

impl<T> PersistedEvent<T> {
/// Updates the event version to the one specified.
#[inline]
pub fn with_version(mut self, version: u32) -> Self {
self.version = version;
self
impl<T> Deref for PersistedEvent<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.event
}
}

/// Updates the sequence number version to the one specified.
impl<T> PersistedEvent<T> {
/// Creates a new [`EventBuilder`] from the provided Event value.
///
/// [`EventBuilder`]: persistent/struct.EventBuilder.html
#[inline]
pub fn with_sequence_number(mut self, sequence_number: u32) -> Self {
self.sequence_number = sequence_number;
self
pub fn from(event: T) -> persistent::EventBuilder<T> {
persistent::EventBuilder { event }
}

/// Returns the event sequence number.
Expand Down Expand Up @@ -93,6 +179,27 @@ pub enum Select {
From(u32),
}

/// Specifies the optimistic locking level when performing [`append`] from
/// an [`EventStore`].
///
/// Check out [`append`] documentation for more info.
///
/// [`append`]: trait.EventStore.html#method.append
/// [`EventStore`]: trait.EventStore.html
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Expected {
/// Append events disregarding the current [`Aggregate`] version.
///
/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
Any,

/// Append events only if the current version of the [`Aggregate`]
/// is the one specified by the value provided here.
///
/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
Exact(u32),
}

/// Stream type returned by the [`EventStore::stream`] method.
///
/// [`EventStore::stream`]: trait.EventStore.html#method.stream
Expand Down Expand Up @@ -142,7 +249,15 @@ pub trait EventStore {
/// `append` is a transactional operation: it either appends all the events,
/// or none at all and returns an [`AppendError`].
///
/// The desired version for the new [`Event`]s to append must be specified.
/// The desired version for the new [`Event`]s to append must be specified
/// through an [`Expected`] element.
///
/// When using `Expected::Any`, no checks on the current [`Aggregate`]
/// values will be performed, disregarding optimistic locking.
///
/// When using `Expected::Exact`, the Store will check that the current
/// version of the [`Aggregate`] is _exactly_ the one specified.
///
/// If the version is not the one expected from the Store, implementations
/// should raise an [`AppendError::Conflict`] error.
///
Expand All @@ -156,9 +271,9 @@ pub trait EventStore {
fn append(
&mut self,
id: Self::SourceId,
version: u32,
version: Expected,
events: Vec<Self::Event>,
) -> BoxFuture<Result<(), Self::Error>>;
) -> BoxFuture<Result<u32, Self::Error>>;

/// Streams a list of [`Event`]s from the `EventStore` back to the application,
/// by specifying the desired [`SourceId`] and [`Offset`].
Expand Down
51 changes: 27 additions & 24 deletions eventually-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

use std::sync::Arc;

use eventually::store::{AppendError, EventStream, PersistedEvent, Select};
use eventually::store::{AppendError, EventStream, Expected, PersistedEvent, Select};
use eventually::{Aggregate, AggregateId};

use futures::future::BoxFuture;
Expand Down Expand Up @@ -241,31 +241,34 @@ where
fn append(
&mut self,
id: Self::SourceId,
version: u32,
version: Expected,
events: Vec<Self::Event>,
) -> BoxFuture<Result<(), Self::Error>> {
let serialized = events
.into_iter()
.enumerate()
.map(|(i, event)| serde_json::to_value(event).map(|value| (i, value)))
.collect::<Result<Vec<_>, _>>()
.unwrap();
) -> BoxFuture<Result<u32, Self::Error>> {
// FIXME(ar3s3ru): this crate needs a new overhaul.
unimplemented!("crate needs a new overhaul, after the changes with the EventStore")

Box::pin(async move {
let mut tx = self.client.write().await;
let tx = tx.transaction().await.map_err(EventStoreError::from)?;
// let serialized = events
// .into_iter()
// .enumerate()
// .map(|(i, event)| serde_json::to_value(event).map(|value| (i, value)))
// .collect::<Result<Vec<_>, _>>()
// .unwrap();

for (i, event) in serialized {
tx.execute(
&*self.append_query,
&[&id.to_string(), &event, &version, &(i as u32)],
)
.await
.map_err(EventStoreError::from)?;
}
// Box::pin(async move {
// let mut tx = self.client.write().await;
// let tx = tx.transaction().await.map_err(EventStoreError::from)?;

tx.commit().await.map_err(EventStoreError::from)
})
// for (i, event) in serialized {
// tx.execute(
// &*self.append_query,
// &[&id.to_string(), &event, &version, &(i as u32)],
// )
// .await
// .map_err(EventStoreError::from)?;
// }

// tx.commit().await.map_err(EventStoreError::from)
// })
}

fn stream(
Expand All @@ -292,8 +295,8 @@ where
let event: Event = serde_json::from_value(row.get("event")).unwrap();

PersistedEvent::from(event)
.with_version(row.get("version"))
.with_sequence_number(row.get("offset"))
.version(row.get("version"))
.sequence_number(row.get("offset"))
})
.map_err(EventStoreError::from)
.boxed())
Expand Down
Loading