Skip to content

Commit

Permalink
fix TestEvtLog/SnapShotStore
Browse files Browse the repository at this point in the history
  • Loading branch information
hseeberger committed Mar 13, 2024
1 parent bd82bf7 commit 920e6f5
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
12 changes: 11 additions & 1 deletion eventsourced/src/evt_log/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use error_ext::BoxError;
use futures::{stream, Stream};
use std::{
collections::HashMap, error::Error as StdError, fmt::Debug, hash::Hash, iter, num::NonZeroU64,
sync::Arc,
};
use thiserror::Error;
use tokio::sync::RwLock;

type Evts<I> = HashMap<&'static str, HashMap<I, Vec<(NonZeroU64, Bytes)>>>;
type Evts<I> = Arc<RwLock<HashMap<&'static str, HashMap<I, Vec<(NonZeroU64, Bytes)>>>>>;

/// An in-memory implementation of [EvtLog] for testing purposes.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -48,6 +50,8 @@ where
let bytes = to_bytes(evt).map_err(|error| Error(error.into()))?;

self.evts
.write()
.await
.entry(type_name)
.and_modify(|evts| {
evts.entry(id.to_owned())
Expand All @@ -74,6 +78,8 @@ where
) -> Result<Option<NonZeroU64>, Self::Error> {
let seq_no = self
.evts
.read()
.await
.get(type_name)
.and_then(|evts| evts.get(id))
.and_then(|evts| evts.last())
Expand All @@ -97,6 +103,8 @@ where
{
let evts = self
.evts
.read()
.await
.get(type_name)
.and_then(|evts| evts.get(id).cloned())
.unwrap_or_default()
Expand Down Expand Up @@ -124,6 +132,8 @@ where
{
let evts = self
.evts
.read()
.await
.get(type_name)
.cloned()
.unwrap_or_default()
Expand Down
11 changes: 9 additions & 2 deletions eventsourced/src/snapshot_store/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ use bytes::Bytes;
use error_ext::BoxError;
use std::{
collections::HashMap, error::Error as StdError, fmt::Debug, hash::Hash, num::NonZeroU64,
sync::Arc,
};
use thiserror::Error;
use tokio::sync::RwLock;

/// An in-memory implementation of [SnapshotStore] for testing purposes.
#[derive(Debug, Default, Clone)]
pub struct TestSnapshotStore<I> {
snapshots: HashMap<I, (NonZeroU64, Bytes)>,
snapshots: Arc<RwLock<HashMap<I, (NonZeroU64, Bytes)>>>,
}

impl<I> SnapshotStore for TestSnapshotStore<I>
Expand All @@ -33,7 +35,10 @@ where
ToBytesError: StdError + Send + Sync + 'static,
{
let bytes = to_bytes(state).map_err(|error| Error(error.into()))?;
self.snapshots.insert(id.to_owned(), (seq_no, bytes));
self.snapshots
.write()
.await
.insert(id.to_owned(), (seq_no, bytes));
Ok(())
}

Expand All @@ -47,6 +52,8 @@ where
FromBytesError: StdError + Send + Sync + 'static,
{
self.snapshots
.read()
.await
.get(id)
.map(|(seq_no, bytes)| {
from_bytes(bytes.to_owned())
Expand Down

0 comments on commit 920e6f5

Please sign in to comment.