Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 23 additions & 28 deletions src/commit_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,8 @@ impl<R: AsyncTransactionalCommit> AsyncReadModelWritePlanCommitExt for R {}
mod tests {
use super::*;
use crate::{
impl_aggregate, AsyncTransactionalCommit, Entity, EventRecord, Get, HashMapRepository,
ReadModelWorkspaceExt, RowKey, RowValue,
sourced, AsyncTransactionalCommit, Entity, Get, HashMapRepository, ReadModelWorkspaceExt,
RowKey, RowValue,
};
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
Expand All @@ -590,25 +590,20 @@ mod tests {
entity: Entity,
}

#[sourced(entity)]
impl TestAggregate {
#[event("Touched")]
fn touch(&mut self) {
if self.entity.id().is_empty() {
self.entity.set_id("agg-1");
}
self.entity.digest_empty("Touched").unwrap();
}

fn replay(&mut self, _event: &EventRecord) -> Result<(), String> {
Ok(())
}
}

impl_aggregate!(TestAggregate, entity, replay);

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, crate::ReadModel)]
#[readmodel(table = "commit_builder_views")]
#[table("commit_builder_views")]
struct RelationalView {
#[readmodel(id)]
#[id]
id: String,
counter: i32,
}
Expand Down Expand Up @@ -773,7 +768,7 @@ mod tests {
};

let mut agg = TestAggregate::default();
agg.touch();
agg.touch().unwrap();

SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view))
.commit_sync(&mut agg)
Expand All @@ -798,7 +793,7 @@ mod tests {
};

let mut agg = TestAggregate::default();
agg.touch();
agg.touch().unwrap();

let mut read_models = crate::read_model::ReadModelWritePlanBuilder::new();
read_models.upsert(&view1).unwrap().upsert(&view2).unwrap();
Expand All @@ -823,7 +818,7 @@ mod tests {
let outbox = OutboxMessage::create("msg-1", "TestEvent", b"{}".to_vec()).unwrap();

let mut agg = TestAggregate::default();
agg.touch();
agg.touch().unwrap();

SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view))
.outbox_sync(outbox)
Expand All @@ -845,7 +840,7 @@ mod tests {
let outbox = OutboxMessage::create("msg-2", "TestEvent", b"{}".to_vec()).unwrap();

let mut agg = TestAggregate::default();
agg.touch();
agg.touch().unwrap();

SyncCommitBuilderExt::outbox_sync(&repo, outbox)
.read_models_sync(read_models(&view))
Expand Down Expand Up @@ -895,11 +890,11 @@ mod tests {
};

let mut agg1 = TestAggregate::default();
agg1.touch();
agg1.touch().unwrap();
agg1.entity.set_id("agg-1");

let mut agg2 = TestAggregate::default();
agg2.touch();
agg2.touch().unwrap();
agg2.entity.set_id("agg-2");

SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view))
Expand All @@ -924,7 +919,7 @@ mod tests {
};
let outbox = OutboxMessage::create("ordered-msg", "TestEvent", b"{}".to_vec()).unwrap();
let mut agg = TestAggregate::default();
agg.touch();
agg.touch().unwrap();

match order {
0 => SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view))
Expand Down Expand Up @@ -961,7 +956,7 @@ mod tests {
fn staged_commit_sets_outbox_source_from_single_aggregate() {
let repo = RecordingBatchRepo::default();
let mut agg = TestAggregate::default();
agg.touch();
agg.touch().unwrap();
let outbox = OutboxMessage::create("sourced-msg", "TestEvent", b"{}".to_vec()).unwrap();

SyncReadModelWritePlanCommitExt::aggregate_sync(&repo, &mut agg)
Expand All @@ -988,10 +983,10 @@ mod tests {
counter: 77,
};
let mut agg1 = TestAggregate::default();
agg1.touch();
agg1.touch().unwrap();
agg1.entity.set_id("agg-1");
let mut agg2 = TestAggregate::default();
agg2.touch();
agg2.touch().unwrap();
agg2.entity.set_id("agg-2");

SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view))
Expand All @@ -1016,7 +1011,7 @@ mod tests {
let mut session = crate::read_model::ReadModelWritePlanBuilder::new();
session.mark_processed("", "message-1");
let mut agg = TestAggregate::default();
agg.touch();
agg.touch().unwrap();

let err = SyncReadModelWritePlanCommitExt::read_models_sync(&repo, session)
.commit_sync(&mut agg)
Expand All @@ -1042,7 +1037,7 @@ mod tests {
};
let outbox = OutboxMessage::create("msg-rollback", "TestEvent", b"{}".to_vec()).unwrap();
let mut agg = TestAggregate::default();
agg.touch();
agg.touch().unwrap();

let err = SyncReadModelWritePlanCommitExt::read_models_sync(&repo, read_models(&view))
.outbox_sync(outbox)
Expand Down Expand Up @@ -1082,7 +1077,7 @@ mod tests {
counter: 42,
};
let mut agg = TestAggregate::default();
agg.touch();
agg.touch().unwrap();

repo.read_models(read_models(&view))
.commit(&mut agg)
Expand Down Expand Up @@ -1111,7 +1106,7 @@ mod tests {
counter: 7,
};
let mut agg = TestAggregate::default();
agg.touch();
agg.touch().unwrap();
let outbox = OutboxMessage::create("async-msg", "TestEvent", b"{}".to_vec()).unwrap();

repo.read_models(read_models(&view))
Expand Down Expand Up @@ -1140,10 +1135,10 @@ mod tests {
async fn async_commit_many_supports_same_type_aggregates() {
let repo = RecordingAsyncBatchRepo::default();
let mut agg1 = TestAggregate::default();
agg1.touch();
agg1.touch().unwrap();
agg1.entity.set_id("async-agg-1");
let mut agg2 = TestAggregate::default();
agg2.touch();
agg2.touch().unwrap();
agg2.entity.set_id("async-agg-2");

AsyncCommitBuilder::new(&repo)
Expand Down Expand Up @@ -1172,7 +1167,7 @@ mod tests {
let mut read_models = crate::read_model::ReadModelWritePlanBuilder::new();
read_models.mark_processed("", "message-1");
let mut agg = TestAggregate::default();
agg.touch();
agg.touch().unwrap();

let err = repo
.read_models(read_models)
Expand Down
17 changes: 6 additions & 11 deletions src/outbox/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ impl<R, A> AsyncAggregateRepository<R, A> {
mod tests {
use super::*;
use crate::{
impl_aggregate, AggregateBuilder, CommitBatch, Entity, EventRecord, HashMapRepository,
OutboxStore, TransactionalCommit,
sourced, AggregateBuilder, CommitBatch, Entity, HashMapRepository, OutboxStore,
TransactionalCommit,
};
use std::cell::RefCell;

Expand All @@ -96,21 +96,16 @@ mod tests {
entity: Entity,
}

#[sourced(entity)]
impl Dummy {
#[event("Touched")]
fn touch(&mut self) {
if self.entity.id().is_empty() {
self.entity.set_id("dummy-1");
}
self.entity.digest_empty("Touched").unwrap();
}

fn replay(&mut self, _event: &EventRecord) -> Result<(), String> {
Ok(())
}
}

impl_aggregate!(Dummy, entity, replay);

#[derive(Default)]
struct FailingOutboxRepo {
seen_ids: RefCell<Vec<String>>,
Expand Down Expand Up @@ -139,7 +134,7 @@ mod tests {
let repo = HashMapRepository::new().aggregate::<Dummy>();

let mut aggregate = Dummy::default();
aggregate.touch();
aggregate.touch().unwrap();

let event = OutboxMessage::create("msg-1", "DummyTouched", b"{}".to_vec()).unwrap();

Expand All @@ -155,7 +150,7 @@ mod tests {
let repo = AggregateRepository::<_, Dummy>::new(FailingOutboxRepo::default());

let mut aggregate = Dummy::default();
aggregate.touch();
aggregate.touch().unwrap();

let event = OutboxMessage::create("msg-fail", "DummyTouched", b"{}".to_vec()).unwrap();

Expand Down
13 changes: 4 additions & 9 deletions src/snapshot/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::{impl_aggregate, Aggregate, AggregateRepository, Entity, EventRecord};
use crate::{sourced, Aggregate, AggregateRepository, Entity, EventRecord};
use std::cell::RefCell;

#[derive(Default)]
Expand All @@ -551,22 +551,17 @@ mod tests {
value: u32,
}

#[sourced(entity)]
impl TestAggregate {
#[event("Touched")]
fn touch(&mut self) {
if self.entity.id().is_empty() {
self.entity.set_id("snap-1");
}
self.value += 1;
self.entity.digest_empty("Touched").unwrap();
}

fn replay(&mut self, _event: &EventRecord) -> Result<(), String> {
Ok(())
}
}

impl_aggregate!(TestAggregate, entity, replay);

impl Snapshottable for TestAggregate {
type Snapshot = u32;

Expand Down Expand Up @@ -605,7 +600,7 @@ mod tests {
let snapshot_repo = SnapshotAggregateRepository::new(aggregate_repo, 1);

let mut aggregate = TestAggregate::default();
aggregate.touch();
aggregate.touch().unwrap();

let err = snapshot_repo.commit(&mut aggregate).unwrap_err();

Expand Down
Loading
Loading