Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: rewrite api #192

Merged
merged 18 commits into from
Mar 15, 2024
Merged
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"rust-analyzer.cargo.features": "all"
"rust-analyzer.cargo.features": "all",
"rust-analyzer.showUnlinkedFileNotification": false
}
69 changes: 65 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ repository = "https://github.com/hseeberger/eventsourced"

[workspace.dependencies]
anyhow = { version = "1.0" }
assert_matches = { version = "1.5" }
async-nats = { version = "0.34" }
async-stream = { version = "0.3" }
bb8-postgres = { version = "0.8" }
bytes = { version = "1.5" }
configured = { version = "0.7" }
error-ext = { version = "0.1" }
frunk = { version = "0.4" }
futures = { version = "0.3" }
humantime-serde = { version = "1.1" }
pin-project = { version = "1.1" }
Expand Down
129 changes: 67 additions & 62 deletions eventsourced-nats/src/evt_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use async_nats::{
ConnectOptions,
};
use bytes::Bytes;
use eventsourced::{EventSourced, EvtLog};
use eventsourced::evt_log::EvtLog;
use futures::{future::ready, Stream, StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -133,23 +133,23 @@ where
#[instrument(skip(self, evt, to_bytes))]
async fn persist<E, ToBytes, ToBytesError>(
&mut self,
evt: &E::Evt,
type_name: &'static str,
id: &Self::Id,
last_seq_no: Option<NonZeroU64>,
evt: &E,
to_bytes: &ToBytes,
) -> Result<NonZeroU64, Self::Error>
where
E: EventSourced,
ToBytes: Fn(&E::Evt) -> Result<Bytes, ToBytesError> + Sync,
ToBytes: Fn(&E) -> Result<Bytes, ToBytesError> + Sync,
ToBytesError: StdError + Send + Sync + 'static,
{
let bytes = to_bytes(evt).map_err(|error| Error::IntoBytes(error.into()))?;
let bytes = to_bytes(evt).map_err(|error| Error::ToBytes(error.into()))?;
let publish = Publish::build().payload(bytes);
let publish = last_seq_no.into_iter().fold(publish, |p, last_seq_no| {
p.expected_last_subject_sequence(last_seq_no.get())
});

let subject = format!("{}.{}.{id}", self.evt_stream_name, E::TYPE_NAME);
let subject = format!("{}.{type_name}.{id}", self.evt_stream_name);
self.jetstream
.send_publish(subject, publish)
.await
Expand All @@ -164,11 +164,12 @@ where
}

#[instrument(skip(self))]
async fn last_seq_no<E>(&self, id: &Self::Id) -> Result<Option<NonZeroU64>, Self::Error>
where
E: EventSourced,
{
let subject = format!("{}.{}.{id}", self.evt_stream_name, E::TYPE_NAME);
async fn last_seq_no(
&self,
type_name: &'static str,
id: &Self::Id,
) -> Result<Option<NonZeroU64>, Self::Error> {
let subject = format!("{}.{type_name}.{id}", self.evt_stream_name);
stream(&self.jetstream, &self.evt_stream_name)
.await?
.get_last_raw_message_by_subject(&subject)
Expand Down Expand Up @@ -202,41 +203,40 @@ where
#[instrument(skip(self, from_bytes))]
async fn evts_by_id<E, FromBytes, FromBytesError>(
&self,
type_name: &'static str,
id: &Self::Id,
seq_no: NonZeroU64,
from_bytes: FromBytes,
) -> Result<impl Stream<Item = Result<(NonZeroU64, E::Evt), Self::Error>> + Send, Self::Error>
) -> Result<impl Stream<Item = Result<(NonZeroU64, E), Self::Error>> + Send, Self::Error>
where
E: EventSourced,
FromBytes: Fn(Bytes) -> Result<E::Evt, FromBytesError> + Copy + Send + Sync + 'static,
E: Send,
FromBytes: Fn(Bytes) -> Result<E, FromBytesError> + Copy + Send + Sync + 'static,
FromBytesError: StdError + Send + Sync + 'static,
{
debug!(
type_name = E::TYPE_NAME,
type_name,
%id,
seq_no,
"building events by ID stream"
);
let subject = format!("{}.{}.{id}", self.evt_stream_name, E::TYPE_NAME);
let subject = format!("{}.{}.{id}", self.evt_stream_name, type_name);
self.evts(subject, seq_no, |_| true, from_bytes).await
}

#[instrument(skip(self, from_bytes))]
async fn evts_by_type<E, FromBytes, FromBytesError>(
&self,
type_name: &'static str,
seq_no: NonZeroU64,
from_bytes: FromBytes,
) -> Result<impl Stream<Item = Result<(NonZeroU64, E::Evt), Self::Error>> + Send, Self::Error>
) -> Result<impl Stream<Item = Result<(NonZeroU64, E), Self::Error>> + Send, Self::Error>
where
E: EventSourced,
FromBytes: Fn(Bytes) -> Result<E::Evt, FromBytesError> + Copy + Send + Sync + 'static,
E: Send,
FromBytes: Fn(Bytes) -> Result<E, FromBytesError> + Copy + Send + Sync + 'static,
FromBytesError: StdError + Send + Sync + 'static,
{
debug!(
type_name = E::TYPE_NAME,
seq_no, "building events by type stream"
);
let subject = format!("{}.{}.*", self.evt_stream_name, E::TYPE_NAME);
debug!(type_name, seq_no, "building events by type stream");
let subject = format!("{}.{}.*", self.evt_stream_name, type_name);
self.evts(subject, seq_no, |_| true, from_bytes).await
}
}
Expand Down Expand Up @@ -378,36 +378,11 @@ mod tests {
use error_ext::BoxError;
use eventsourced::binarize;
use futures::TryStreamExt;
use std::{convert::Infallible, future};
use std::future;
use testcontainers::{clients::Cli, core::WaitFor};
use testcontainers_modules::testcontainers::GenericImage;
use uuid::Uuid;

#[derive(Debug)]
struct Dummy;

impl EventSourced for Dummy {
type Id = Uuid;
type Cmd = ();
type Evt = u32;
type State = u64;
type Error = Infallible;

const TYPE_NAME: &'static str = "simple";

fn handle_cmd(
_id: &Self::Id,
_state: &Self::State,
_cmd: Self::Cmd,
) -> Result<Self::Evt, Self::Error> {
todo!()
}

fn handle_evt(_state: Self::State, _evt: Self::Evt) -> Self::State {
todo!()
}
}

#[tokio::test]
async fn test_evt_log() -> Result<(), BoxError> {
let client = Cli::default();
Expand All @@ -427,37 +402,55 @@ mod tests {

// Start testing.

let last_seq_no = evt_log.last_seq_no::<Dummy>(&id).await?;
let last_seq_no = evt_log.last_seq_no("counter", &id).await?;
assert_eq!(last_seq_no, None);

let last_seq_no = evt_log
.persist::<Dummy, _, _>(&1, &id, None, &binarize::serde_json::to_bytes)
.persist("counter", &id, None, &1, &binarize::serde_json::to_bytes)
.await?;
assert!(last_seq_no.get() == 1);

evt_log
.persist::<Dummy, _, _>(&2, &id, Some(last_seq_no), &binarize::serde_json::to_bytes)
.persist(
"counter",
&id,
Some(last_seq_no),
&2,
&binarize::serde_json::to_bytes,
)
.await?;

let result = evt_log
.persist::<Dummy, _, _>(&3, &id, Some(last_seq_no), &binarize::serde_json::to_bytes)
.persist(
"counter",
&id,
Some(last_seq_no),
&3,
&binarize::serde_json::to_bytes,
)
.await;
assert!(result.is_err());

evt_log
.persist::<Dummy, _, _>(
&3,
.persist(
"counter",
&id,
Some(last_seq_no.checked_add(1).expect("overflow")),
&3,
&binarize::serde_json::to_bytes,
)
.await?;

let last_seq_no = evt_log.last_seq_no::<Dummy>(&id).await?;
let last_seq_no = evt_log.last_seq_no("counter", &id).await?;
assert_eq!(last_seq_no, Some(3.try_into()?));

let evts = evt_log
.evts_by_id::<Dummy, _, _>(&id, 2.try_into()?, binarize::serde_json::from_bytes)
.evts_by_id::<u32, _, _>(
"counter",
&id,
2.try_into()?,
binarize::serde_json::from_bytes,
)
.await?;
let sum = evts
.take(2)
Expand All @@ -466,18 +459,30 @@ mod tests {
assert_eq!(sum, 5);

let evts = evt_log
.evts_by_type::<Dummy, _, _>(NonZeroU64::MIN, binarize::serde_json::from_bytes)
.evts_by_type::<u32, _, _>("counter", NonZeroU64::MIN, binarize::serde_json::from_bytes)
.await?;

let last_seq_no = evt_log
.clone()
.persist::<Dummy, _, _>(&4, &id, last_seq_no, &binarize::serde_json::to_bytes)
.persist(
"counter",
&id,
last_seq_no,
&4,
&binarize::serde_json::to_bytes,
)
.await?;
evt_log
.clone()
.persist::<Dummy, _, _>(&5, &id, Some(last_seq_no), &binarize::serde_json::to_bytes)
.persist(
"counter",
&id,
Some(last_seq_no),
&5,
&binarize::serde_json::to_bytes,
)
.await?;
let last_seq_no = evt_log.last_seq_no::<Dummy>(&id).await?;
let last_seq_no = evt_log.last_seq_no("counter", &id).await?;
assert_eq!(last_seq_no, Some(5.try_into()?));

let sum = evts
Expand Down
Loading