Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create replies directly out of handle_cmd using closure #219

Merged
merged 9 commits into from Apr 10, 2024
105 changes: 70 additions & 35 deletions eventsourced/src/lib.rs
Expand Up @@ -61,6 +61,11 @@ use tokio::{
use tracing::{debug, error, instrument};

type BoxedCmd<E> = Box<dyn ErasedCmd<E> + Send>;
#[allow(type_alias_bounds)]
type BoxedCmdEffect<E>
where
E: EventSourced,
= Result<(E::Evt, Box<dyn FnOnce(&E) -> BoxedAny + Send + Sync>), BoxedAny>;
type BoxedAny = Box<dyn Any + Send>;
type BoxedMsg<E> = (BoxedCmd<E>, oneshot::Sender<Result<BoxedAny, BoxedAny>>);

Expand Down Expand Up @@ -92,13 +97,48 @@ where
type Error: Send + 'static;

/// The command handler, taking this command, and references to the ID and the state of
/// the event sourced entity, either rejecting this command via [Self::Error] or returning an
/// event.
fn handle_cmd(&self, id: &E::Id, state: &E) -> Result<E::Evt, Self::Error>;
/// the event sourced entity, either rejecting this command via [CmdEffect::reject] or returning an
/// event using [CmdEffect::emit_and_reply] (or [CmdEffect::emit] in case Reply = ())).
fn handle_cmd(self, id: &E::Id, state: &E) -> CmdEffect<E, Self::Reply, Self::Error>;
}

/// The reply factory, which is applied if the command handler has returned an event (as
/// opposed to a rejection) and after that has been persisted successfully.
fn make_reply(&self, id: &E::Id, state: &E, evt: E::Evt) -> Self::Reply;
/// The result of handling a command, either emitting an event and replying or rejecting the
/// command.
pub enum CmdEffect<E, Reply, Error>
where
E: EventSourced,
{
EmitAndReply(E::Evt, Box<dyn FnOnce(&E) -> Reply + Send + Sync>),
Reject(Error),
}

impl<E, Reply, Error> CmdEffect<E, Reply, Error>
where
E: EventSourced,
{
/// Emit the given event, persist it, and after applying it to the state, use the given function
/// to create a reply. The new state is passed to the function after applying the event.
pub fn emit_and_reply(
evt: E::Evt,
make_reply: impl FnOnce(&E) -> Reply + Send + Sync + 'static,
) -> Self {
Self::EmitAndReply(evt, Box::new(make_reply))
}

/// Reject this command with the given error.
pub fn reject(error: Error) -> Self {
Self::Reject(error)
}
}

impl<E, Error> CmdEffect<E, (), Error>
where
E: EventSourced,
{
/// Persist the given event (and don't give a reply for Cmds with Reply = ()).
pub fn emit(evt: E::Evt) -> Self {
hseeberger marked this conversation as resolved.
Show resolved Hide resolved
Self::emit_and_reply(evt, |_| ())
}
}

/// A handle representing a spawned [EventSourcedEntity], which can be used to pass it commands.
Expand All @@ -114,7 +154,7 @@ where

impl<E> EntityRef<E>
where
E: EventSourced,
E: EventSourced + 'static,
{
/// The ID of the represented [EventSourcedEntity].
pub fn id(&self) -> &E::Id {
Expand Down Expand Up @@ -248,7 +288,7 @@ where

let result = cmd.handle_cmd(&id, &state);
match result {
Ok(evt) => {
Ok((evt, make_reply)) => {
debug!(?id, ?evt, "persisting event");

match evt_log
Expand Down Expand Up @@ -288,7 +328,7 @@ where
};
}

let reply = cmd.make_reply(&id, &state, evt);
let reply = make_reply(&state);
if result_sender.send(Ok(reply)).is_err() {
error!(?id, "cannot send cmd reply");
};
Expand Down Expand Up @@ -352,25 +392,23 @@ where
Self: Debug,
E: EventSourced,
{
fn handle_cmd(&self, id: &E::Id, state: &E) -> Result<E::Evt, BoxedAny>;

fn make_reply(&self, id: &E::Id, state: &E, evt: E::Evt) -> BoxedAny;
fn handle_cmd(self: Box<Self>, id: &E::Id, state: &E) -> BoxedCmdEffect<E>;
}

impl<C, E, Reply, Error> ErasedCmd<E> for C
where
C: Cmd<E, Reply = Reply, Error = Error>,
E: EventSourced,
E: EventSourced + 'static,
Reply: Send + 'static,
Error: Send + 'static,
{
fn handle_cmd(&self, id: &E::Id, state: &E) -> Result<E::Evt, BoxedAny> {
let result = self.handle_cmd(id, state);
result.map_err(|error| Box::new(error) as BoxedAny)
}

fn make_reply(&self, id: &E::Id, state: &E, evt: E::Evt) -> BoxedAny {
Box::new(self.make_reply(id, state, evt))
fn handle_cmd(self: Box<Self>, id: &E::Id, state: &E) -> BoxedCmdEffect<E> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL self: Box<Self> to allow owning Box<dyn ...> values in trait functions.

match <C as Cmd<E>>::handle_cmd(*self, id, state) {
CmdEffect::EmitAndReply(evt, make_reply) => {
Ok((evt, Box::new(|s| Box::new(make_reply(s)))))
}
CmdEffect::Reject(error) => Err(Box::new(error) as BoxedAny),
}
}
}

Expand All @@ -380,7 +418,7 @@ mod tests {
binarize::serde_json::*,
evt_log::{test::TestEvtLog, EvtLog},
snapshot_store::{test::TestSnapshotStore, SnapshotStore},
Cmd, EntityRef, EventSourced, EventSourcedExt,
Cmd, CmdEffect, EntityRef, EventSourced, EventSourcedExt,
};
use assert_matches::assert_matches;
use error_ext::BoxError;
Expand Down Expand Up @@ -418,17 +456,15 @@ mod tests {
type Error = Overflow;
type Reply = u64;

fn handle_cmd(&self, id: &Uuid, state: &Counter) -> Result<CounterEvt, Self::Error> {
fn handle_cmd(self, id: &Uuid, state: &Counter) -> CmdEffect<Counter, u64, Overflow> {
if u64::MAX - state.0 < self.0 {
Err(Overflow)
CmdEffect::reject(Overflow)
} else {
Ok(CounterEvt::Increased(*id, self.0))
CmdEffect::emit_and_reply(CounterEvt::Increased(*id, self.0), |state: &Counter| {
state.0
})
}
}

fn make_reply(&self, _id: &Uuid, state: &Counter, _evt: CounterEvt) -> Self::Reply {
state.0
}
}

#[derive(Debug)]
Expand All @@ -441,17 +477,16 @@ mod tests {
type Error = Underflow;
type Reply = u64;

fn handle_cmd(&self, id: &Uuid, state: &Counter) -> Result<CounterEvt, Self::Error> {
fn handle_cmd(self, id: &Uuid, state: &Counter) -> CmdEffect<Counter, u64, Underflow> {
if state.0 < self.0 {
Err(Underflow)
CmdEffect::reject(Underflow)
} else {
Ok(CounterEvt::Decreased(*id, self.0))
CmdEffect::emit_and_reply(
CounterEvt::Decreased(*id, self.0),
move |state: &Counter| state.0 + self.0 - self.0, /* Simple no-op test to verify that closing over this cmd is possible */
)
}
}

fn make_reply(&self, _id: &Uuid, state: &Counter, _evt: CounterEvt) -> Self::Reply {
state.0
}
}

#[derive(Debug, PartialEq, Eq)]
Expand Down
26 changes: 11 additions & 15 deletions examples/counter/src/counter.rs
@@ -1,4 +1,4 @@
use eventsourced::{Cmd, EventSourced};
use eventsourced::{Cmd, CmdEffect, EventSourced};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

Expand Down Expand Up @@ -32,17 +32,15 @@ impl Cmd<Counter> for IncreaseCounter {
type Error = Overflow;
type Reply = u64;

fn handle_cmd(&self, id: &Uuid, state: &Counter) -> Result<CounterEvt, Self::Error> {
fn handle_cmd(self, id: &Uuid, state: &Counter) -> CmdEffect<Counter, u64, Overflow> {
if u64::MAX - state.0 < self.0 {
Err(Overflow)
CmdEffect::reject(Overflow)
} else {
Ok(CounterEvt::Increased(*id, self.0))
CmdEffect::emit_and_reply(CounterEvt::Increased(*id, self.0), |state: &Counter| {
state.0
})
}
}

fn make_reply(&self, _id: &Uuid, state: &Counter, _evt: CounterEvt) -> Self::Reply {
state.0
}
}

#[derive(Debug)]
Expand All @@ -55,17 +53,15 @@ impl Cmd<Counter> for DecreaseCounter {
type Error = Underflow;
type Reply = u64;

fn handle_cmd(&self, id: &Uuid, state: &Counter) -> Result<CounterEvt, Self::Error> {
fn handle_cmd(self, id: &Uuid, state: &Counter) -> CmdEffect<Counter, u64, Underflow> {
if state.0 < self.0 {
Err(Underflow)
CmdEffect::reject(Underflow)
} else {
Ok(CounterEvt::Decreased(*id, self.0))
CmdEffect::emit_and_reply(CounterEvt::Decreased(*id, self.0), |state: &Counter| {
state.0
})
}
}

fn make_reply(&self, _id: &Uuid, state: &Counter, _evt: CounterEvt) -> Self::Reply {
state.0
}
}

#[derive(Debug, PartialEq, Eq)]
Expand Down