Skip to content

Commit

Permalink
chore: rewrite api
Browse files Browse the repository at this point in the history
  • Loading branch information
hseeberger committed Mar 11, 2024
1 parent a9f41d4 commit 06fe8b8
Show file tree
Hide file tree
Showing 15 changed files with 809 additions and 1,058 deletions.
628 changes: 35 additions & 593 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = [
"eventsourced",
"eventsourced-nats",
"eventsourced-postgres",
"eventsourced-projection",
# "eventsourced-projection",
"examples/counter",
"examples/counter-nats",
"examples/counter-postgres",
Expand All @@ -19,6 +19,7 @@ repository = "https://github.com/hseeberger/eventsourced"

[workspace.dependencies]
anyhow = { version = "1.0" }
assert_matches = { version = "1.5" }
async-nats = { version = "0.34" }
async-stream = { version = "0.3" }
bb8-postgres = { version = "0.8" }
Expand All @@ -27,6 +28,7 @@ configured = { version = "0.7" }
error-ext = { version = "0.1" }
futures = { version = "0.3" }
humantime-serde = { version = "1.1" }
pin-project = { version = "1.1" }
prost = { version = "0.12" }
prost-build = { version = "0.12" }
serde = { version = "1.0", features = [ "derive" ] }
Expand Down
127 changes: 66 additions & 61 deletions eventsourced-nats/src/evt_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use async_nats::{
ConnectOptions,
};
use bytes::Bytes;
use eventsourced::{EventSourced, EvtLog};
use eventsourced::EvtLog;
use futures::{future::ready, Stream, StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -133,14 +133,14 @@ where
#[instrument(skip(self, evt, to_bytes))]
async fn persist<E, ToBytes, ToBytesError>(
&mut self,
evt: &E::Evt,
type_name: &'static str,
id: &Self::Id,
last_seq_no: Option<NonZeroU64>,
evt: &E,
to_bytes: &ToBytes,
) -> Result<NonZeroU64, Self::Error>
where
E: EventSourced,
ToBytes: Fn(&E::Evt) -> Result<Bytes, ToBytesError> + Sync,
ToBytes: Fn(&E) -> Result<Bytes, ToBytesError> + Sync,
ToBytesError: StdError + Send + Sync + 'static,
{
let bytes = to_bytes(evt).map_err(|error| Error::IntoBytes(error.into()))?;
Expand All @@ -149,7 +149,7 @@ where
p.expected_last_subject_sequence(last_seq_no.get())
});

let subject = format!("{}.{}.{id}", self.evt_stream_name, E::TYPE_NAME);
let subject = format!("{}.{type_name}.{id}", self.evt_stream_name);
self.jetstream
.send_publish(subject, publish)
.await
Expand All @@ -164,11 +164,12 @@ where
}

#[instrument(skip(self))]
async fn last_seq_no<E>(&self, id: &Self::Id) -> Result<Option<NonZeroU64>, Self::Error>
where
E: EventSourced,
{
let subject = format!("{}.{}.{id}", self.evt_stream_name, E::TYPE_NAME);
async fn last_seq_no(
&self,
type_name: &'static str,
id: &Self::Id,
) -> Result<Option<NonZeroU64>, Self::Error> {
let subject = format!("{}.{type_name}.{id}", self.evt_stream_name);
stream(&self.jetstream, &self.evt_stream_name)
.await?
.get_last_raw_message_by_subject(&subject)
Expand Down Expand Up @@ -202,41 +203,40 @@ where
#[instrument(skip(self, from_bytes))]
async fn evts_by_id<E, FromBytes, FromBytesError>(
&self,
type_name: &'static str,
id: &Self::Id,
seq_no: NonZeroU64,
from_bytes: FromBytes,
) -> Result<impl Stream<Item = Result<(NonZeroU64, E::Evt), Self::Error>> + Send, Self::Error>
) -> Result<impl Stream<Item = Result<(NonZeroU64, E), Self::Error>> + Send, Self::Error>
where
E: EventSourced,
FromBytes: Fn(Bytes) -> Result<E::Evt, FromBytesError> + Copy + Send + Sync + 'static,
E: Send,
FromBytes: Fn(Bytes) -> Result<E, FromBytesError> + Copy + Send + Sync + 'static,
FromBytesError: StdError + Send + Sync + 'static,
{
debug!(
type_name = E::TYPE_NAME,
type_name,
%id,
seq_no,
"building events by ID stream"
);
let subject = format!("{}.{}.{id}", self.evt_stream_name, E::TYPE_NAME);
let subject = format!("{}.{}.{id}", self.evt_stream_name, type_name);
self.evts(subject, seq_no, |_| true, from_bytes).await
}

#[instrument(skip(self, from_bytes))]
async fn evts_by_type<E, FromBytes, FromBytesError>(
&self,
type_name: &'static str,
seq_no: NonZeroU64,
from_bytes: FromBytes,
) -> Result<impl Stream<Item = Result<(NonZeroU64, E::Evt), Self::Error>> + Send, Self::Error>
) -> Result<impl Stream<Item = Result<(NonZeroU64, E), Self::Error>> + Send, Self::Error>
where
E: EventSourced,
FromBytes: Fn(Bytes) -> Result<E::Evt, FromBytesError> + Copy + Send + Sync + 'static,
E: Send,
FromBytes: Fn(Bytes) -> Result<E, FromBytesError> + Copy + Send + Sync + 'static,
FromBytesError: StdError + Send + Sync + 'static,
{
debug!(
type_name = E::TYPE_NAME,
seq_no, "building events by type stream"
);
let subject = format!("{}.{}.*", self.evt_stream_name, E::TYPE_NAME);
debug!(type_name, seq_no, "building events by type stream");
let subject = format!("{}.{}.*", self.evt_stream_name, type_name);
self.evts(subject, seq_no, |_| true, from_bytes).await
}
}
Expand Down Expand Up @@ -378,36 +378,11 @@ mod tests {
use error_ext::BoxError;
use eventsourced::binarize;
use futures::TryStreamExt;
use std::{convert::Infallible, future};
use std::future;
use testcontainers::{clients::Cli, core::WaitFor};
use testcontainers_modules::testcontainers::GenericImage;
use uuid::Uuid;

#[derive(Debug)]
struct Dummy;

impl EventSourced for Dummy {
type Id = Uuid;
type Cmd = ();
type Evt = u32;
type State = u64;
type Error = Infallible;

const TYPE_NAME: &'static str = "simple";

fn handle_cmd(
_id: &Self::Id,
_state: &Self::State,
_cmd: Self::Cmd,
) -> Result<Self::Evt, Self::Error> {
todo!()
}

fn handle_evt(_state: Self::State, _evt: Self::Evt) -> Self::State {
todo!()
}
}

#[tokio::test]
async fn test_evt_log() -> Result<(), BoxError> {
let client = Cli::default();
Expand All @@ -427,37 +402,55 @@ mod tests {

// Start testing.

let last_seq_no = evt_log.last_seq_no::<Dummy>(&id).await?;
let last_seq_no = evt_log.last_seq_no("counter", &id).await?;
assert_eq!(last_seq_no, None);

let last_seq_no = evt_log
.persist::<Dummy, _, _>(&1, &id, None, &binarize::serde_json::to_bytes)
.persist("counter", &id, None, &1, &binarize::serde_json::to_bytes)
.await?;
assert!(last_seq_no.get() == 1);

evt_log
.persist::<Dummy, _, _>(&2, &id, Some(last_seq_no), &binarize::serde_json::to_bytes)
.persist(
"counter",
&id,
Some(last_seq_no),
&2,
&binarize::serde_json::to_bytes,
)
.await?;

let result = evt_log
.persist::<Dummy, _, _>(&3, &id, Some(last_seq_no), &binarize::serde_json::to_bytes)
.persist(
"counter",
&id,
Some(last_seq_no),
&3,
&binarize::serde_json::to_bytes,
)
.await;
assert!(result.is_err());

evt_log
.persist::<Dummy, _, _>(
&3,
.persist(
"counter",
&id,
Some(last_seq_no.checked_add(1).expect("overflow")),
&3,
&binarize::serde_json::to_bytes,
)
.await?;

let last_seq_no = evt_log.last_seq_no::<Dummy>(&id).await?;
let last_seq_no = evt_log.last_seq_no("counter", &id).await?;
assert_eq!(last_seq_no, Some(3.try_into()?));

let evts = evt_log
.evts_by_id::<Dummy, _, _>(&id, 2.try_into()?, binarize::serde_json::from_bytes)
.evts_by_id::<u32, _, _>(
"counter",
&id,
2.try_into()?,
binarize::serde_json::from_bytes,
)
.await?;
let sum = evts
.take(2)
Expand All @@ -466,18 +459,30 @@ mod tests {
assert_eq!(sum, 5);

let evts = evt_log
.evts_by_type::<Dummy, _, _>(NonZeroU64::MIN, binarize::serde_json::from_bytes)
.evts_by_type::<u32, _, _>("counter", NonZeroU64::MIN, binarize::serde_json::from_bytes)
.await?;

let last_seq_no = evt_log
.clone()
.persist::<Dummy, _, _>(&4, &id, last_seq_no, &binarize::serde_json::to_bytes)
.persist(
"counter",
&id,
last_seq_no,
&4,
&binarize::serde_json::to_bytes,
)
.await?;
evt_log
.clone()
.persist::<Dummy, _, _>(&5, &id, Some(last_seq_no), &binarize::serde_json::to_bytes)
.persist(
"counter",
&id,
Some(last_seq_no),
&5,
&binarize::serde_json::to_bytes,
)
.await?;
let last_seq_no = evt_log.last_seq_no::<Dummy>(&id).await?;
let last_seq_no = evt_log.last_seq_no("counter", &id).await?;
assert_eq!(last_seq_no, Some(5.try_into()?));

let sum = evts
Expand Down
Loading

0 comments on commit 06fe8b8

Please sign in to comment.