Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create replies directly out of handle_cmd using closure #219

Merged
merged 9 commits into from Apr 10, 2024
93 changes: 60 additions & 33 deletions eventsourced/src/lib.rs
Expand Up @@ -94,11 +94,36 @@ where
/// The command handler, taking this command, and references to the ID and the state of
/// the event sourced entity, either rejecting this command via [Self::Error] or returning an
/// event.
jrudolph marked this conversation as resolved.
Show resolved Hide resolved
fn handle_cmd(&self, id: &E::Id, state: &E) -> Result<E::Evt, Self::Error>;
fn handle_cmd(&self, id: &E::Id, state: &E) -> CmdEffect<E, Self::Reply, Self::Error>;
}

/// The reply factory, which is applied if the command handler has returned an event (as
/// opposed to a rejection) and after that has been persisted successfully.
fn make_reply(&self, id: &E::Id, state: &E, evt: E::Evt) -> Self::Reply;
/// The result of handling a command, either emitting an event and replying or rejecting the
/// command.
pub enum CmdEffect<E, Reply, Error>
where
E: EventSourced,
{
EmitAndReply(E::Evt, Box<dyn FnOnce(&E) -> Reply + Send + Sync>),
Reject(Error),
}

impl<E, Reply, Error> CmdEffect<E, Reply, Error>
where
E: EventSourced,
{
/// Emit the given event, persist it, and after applying it to the state, use the given function
/// to create a reply. The new state is passed to the function after applying the event.
pub fn emit_and_reply(
evt: E::Evt,
make_reply: impl FnOnce(&E) -> Reply + Send + Sync + 'static,
) -> Self {
Self::EmitAndReply(evt, Box::new(make_reply))
}

/// Reject this command with the given error.
pub fn reject(error: Error) -> Self {
Self::Reject(error)
}
}

/// A handle representing a spawned [EventSourcedEntity], which can be used to pass it commands.
Expand All @@ -114,7 +139,7 @@ where

impl<E> EntityRef<E>
where
E: EventSourced,
E: EventSourced + 'static,
{
/// The ID of the represented [EventSourcedEntity].
pub fn id(&self) -> &E::Id {
Expand Down Expand Up @@ -248,7 +273,7 @@ where

let result = cmd.handle_cmd(&id, &state);
match result {
Ok(evt) => {
Ok((evt, make_reply)) => {
debug!(?id, ?evt, "persisting event");

match evt_log
Expand Down Expand Up @@ -288,7 +313,7 @@ where
};
}

let reply = cmd.make_reply(&id, &state, evt);
let reply = make_reply(&state);
if result_sender.send(Ok(reply)).is_err() {
error!(?id, "cannot send cmd reply");
};
Expand Down Expand Up @@ -352,25 +377,31 @@ where
Self: Debug,
E: EventSourced,
{
fn handle_cmd(&self, id: &E::Id, state: &E) -> Result<E::Evt, BoxedAny>;

fn make_reply(&self, id: &E::Id, state: &E, evt: E::Evt) -> BoxedAny;
fn handle_cmd(
&self,
id: &E::Id,
state: &E,
) -> Result<(E::Evt, Box<dyn FnOnce(&E) -> BoxedAny + Send + Sync>), BoxedAny>;
}

impl<C, E, Reply, Error> ErasedCmd<E> for C
where
C: Cmd<E, Reply = Reply, Error = Error>,
E: EventSourced,
E: EventSourced + 'static,
Reply: Send + 'static,
Error: Send + 'static,
{
fn handle_cmd(&self, id: &E::Id, state: &E) -> Result<E::Evt, BoxedAny> {
let result = self.handle_cmd(id, state);
result.map_err(|error| Box::new(error) as BoxedAny)
}

fn make_reply(&self, id: &E::Id, state: &E, evt: E::Evt) -> BoxedAny {
Box::new(self.make_reply(id, state, evt))
fn handle_cmd(
&self,
id: &E::Id,
state: &E,
) -> Result<(E::Evt, Box<dyn FnOnce(&E) -> BoxedAny + Send + Sync>), BoxedAny> {
jrudolph marked this conversation as resolved.
Show resolved Hide resolved
match self.handle_cmd(id, state) {
CmdEffect::EmitAndReply(evt, make_reply) => {
Ok((evt, Box::new(|s| Box::new(make_reply(s)))))
}
CmdEffect::Reject(error) => Err(Box::new(error) as BoxedAny),
}
}
}

Expand All @@ -380,7 +411,7 @@ mod tests {
binarize::serde_json::*,
evt_log::{test::TestEvtLog, EvtLog},
snapshot_store::{test::TestSnapshotStore, SnapshotStore},
Cmd, EntityRef, EventSourced, EventSourcedExt,
Cmd, CmdEffect, EntityRef, EventSourced, EventSourcedExt,
};
use assert_matches::assert_matches;
use error_ext::BoxError;
Expand Down Expand Up @@ -418,17 +449,15 @@ mod tests {
type Error = Overflow;
type Reply = u64;

fn handle_cmd(&self, id: &Uuid, state: &Counter) -> Result<CounterEvt, Self::Error> {
fn handle_cmd(&self, id: &Uuid, state: &Counter) -> CmdEffect<Counter, u64, Overflow> {
if u64::MAX - state.0 < self.0 {
Err(Overflow)
CmdEffect::reject(Overflow)
} else {
Ok(CounterEvt::Increased(*id, self.0))
CmdEffect::emit_and_reply(CounterEvt::Increased(*id, self.0), |state: &Counter| {
state.0
})
}
}

fn make_reply(&self, _id: &Uuid, state: &Counter, _evt: CounterEvt) -> Self::Reply {
state.0
}
}

#[derive(Debug)]
Expand All @@ -441,17 +470,15 @@ mod tests {
type Error = Underflow;
type Reply = u64;

fn handle_cmd(&self, id: &Uuid, state: &Counter) -> Result<CounterEvt, Self::Error> {
fn handle_cmd(&self, id: &Uuid, state: &Counter) -> CmdEffect<Counter, u64, Underflow> {
if state.0 < self.0 {
Err(Underflow)
CmdEffect::reject(Underflow)
} else {
Ok(CounterEvt::Decreased(*id, self.0))
CmdEffect::emit_and_reply(CounterEvt::Decreased(*id, self.0), |state: &Counter| {
state.0
})
}
}

fn make_reply(&self, _id: &Uuid, state: &Counter, _evt: CounterEvt) -> Self::Reply {
state.0
}
}

#[derive(Debug, PartialEq, Eq)]
Expand Down
26 changes: 11 additions & 15 deletions examples/counter/src/counter.rs
@@ -1,4 +1,4 @@
use eventsourced::{Cmd, EventSourced};
use eventsourced::{Cmd, CmdEffect, EventSourced};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

Expand Down Expand Up @@ -32,17 +32,15 @@ impl Cmd<Counter> for IncreaseCounter {
type Error = Overflow;
type Reply = u64;

fn handle_cmd(&self, id: &Uuid, state: &Counter) -> Result<CounterEvt, Self::Error> {
fn handle_cmd(&self, id: &Uuid, state: &Counter) -> CmdEffect<Counter, u64, Overflow> {
if u64::MAX - state.0 < self.0 {
Err(Overflow)
CmdEffect::reject(Overflow)
} else {
Ok(CounterEvt::Increased(*id, self.0))
CmdEffect::emit_and_reply(CounterEvt::Increased(*id, self.0), |state: &Counter| {
state.0
})
}
}

fn make_reply(&self, _id: &Uuid, state: &Counter, _evt: CounterEvt) -> Self::Reply {
state.0
}
}

#[derive(Debug)]
Expand All @@ -55,17 +53,15 @@ impl Cmd<Counter> for DecreaseCounter {
type Error = Underflow;
type Reply = u64;

fn handle_cmd(&self, id: &Uuid, state: &Counter) -> Result<CounterEvt, Self::Error> {
fn handle_cmd(&self, id: &Uuid, state: &Counter) -> CmdEffect<Counter, u64, Underflow> {
if state.0 < self.0 {
Err(Underflow)
CmdEffect::reject(Underflow)
} else {
Ok(CounterEvt::Decreased(*id, self.0))
CmdEffect::emit_and_reply(CounterEvt::Decreased(*id, self.0), |state: &Counter| {
state.0
})
}
}

fn make_reply(&self, _id: &Uuid, state: &Counter, _evt: CounterEvt) -> Self::Reply {
state.0
}
}

#[derive(Debug, PartialEq, Eq)]
Expand Down