Skip to content

Commit

Permalink
[#964] Move 'onEntityModified' after all other event persistence call…
Browse files Browse the repository at this point in the history
…backs; fix persistence actor test config.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Mar 4, 2021
1 parent 84062ad commit bacbe91
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 6 deletions.
9 changes: 9 additions & 0 deletions services/connectivity/messaging/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,15 @@ akka {
}
}

akka.persistence {
journal.auto-start-journals = [
"akka-contrib-mongodb-persistence-connection-journal"
]
snapshot-store.auto-start-snapshot-stores = [
"akka-contrib-mongodb-persistence-connection-snapshots"
]
}

akka-contrib-mongodb-persistence-connection-journal {
class = "akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal"
plugin-dispatcher = "connection-persistence-dispatcher"
Expand Down
2 changes: 1 addition & 1 deletion services/policies/config/src/main/resources/policies.conf
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ akka-contrib-mongodb-persistence-policies-journal {
}

event-adapter-bindings {
"org.eclipse.ditto.signals.events.policies.PolicyEvent" = mongodbobject
"org.eclipse.ditto.signals.events.base.Event" = mongodbobject
"org.bson.BsonValue" = mongodbobject
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ public void onMutation(final Command<?> command, final PolicyEvent<?> event, fin

final Policy previousEntity = entity;
persistAndApplyEvent(event, (persistedEvent, resultingEntity) -> {
announceSubjectDeletion(previousEntity, entity, persistedEvent.getDittoHeaders());
sendPastDueAnnouncementsOfNewSubjects(previousEntity, entity);
announceSubjectDeletion(previousEntity, entity, persistedEvent.getDittoHeaders());
if (shouldSendResponse(command.getDittoHeaders())) {
notifySender(getSender(), response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.eclipse.ditto.services.policies.persistence.testhelper.ETagTestUtils.retrieveSubjectResponse;
import static org.eclipse.ditto.services.utils.persistentactors.AbstractShardedPersistenceActor.JOURNAL_TAG_ALWAYS_ALIVE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand Down Expand Up @@ -935,7 +936,7 @@ public void sendAnnouncementBeforeExpiry() {

// THEN: announcements are published
final var captor = ArgumentCaptor.forClass(SubjectDeletionAnnouncement.class);
verify(policyAnnouncementPub, times(2)).publish(captor.capture(), any());
verify(policyAnnouncementPub, timeout(5000).times(2)).publish(captor.capture(), any());
final SubjectDeletionAnnouncement announcement1 = captor.getAllValues().get(0);
final SubjectDeletionAnnouncement announcement2 = captor.getAllValues().get(1);
Assertions.assertThat(announcement1.getSubjectIds()).containsExactly(subject1.getId());
Expand Down
11 changes: 10 additions & 1 deletion services/policies/persistence/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ akka.contrib.persistence.mongodb.mongo {
driver = "akka.contrib.persistence.mongodb.ScalaDriverPersistenceExtension"
}

akka.persistence {
journal.auto-start-journals = [
"akka-contrib-mongodb-persistence-policies-journal"
]
snapshot-store.auto-start-snapshot-stores = [
"akka-contrib-mongodb-persistence-policies-snapshots"
]
}

akka-contrib-mongodb-persistence-policies-journal {
class = "akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal"
plugin-dispatcher = "policy-persistence-dispatcher"
Expand All @@ -74,7 +83,7 @@ akka-contrib-mongodb-persistence-policies-journal {
}

event-adapter-bindings {
"org.eclipse.ditto.signals.events.policies.PolicyEvent" = mongodbobject
"org.eclipse.ditto.signals.events.base.Event" = mongodbobject
"org.bson.BsonValue" = mongodbobject
}
}
Expand Down
11 changes: 10 additions & 1 deletion services/things/persistence/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ akka.contrib.persistence.mongodb.mongo {
driver = "akka.contrib.persistence.mongodb.ScalaDriverPersistenceExtension"
}

akka.persistence {
journal.auto-start-journals = [
"akka-contrib-mongodb-persistence-things-journal"
]
snapshot-store.auto-start-snapshot-stores = [
"akka-contrib-mongodb-persistence-things-snapshots"
]
}

akka-contrib-mongodb-persistence-things-journal {
class = "akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal"
plugin-dispatcher = "thing-persistence-dispatcher"
Expand All @@ -60,7 +69,7 @@ akka-contrib-mongodb-persistence-things-journal {
}

event-adapter-bindings {
"org.eclipse.ditto.signals.events.things.ThingEvent" = mongodbobject
"org.eclipse.ditto.signals.events.base.Event" = mongodbobject
"org.bson.BsonValue" = mongodbobject
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ protected AbstractShardedPersistenceActor(final I entityId, final SnapshotAdapte
handleEvents = ReceiveBuilder.create()
.match(getEventClass(), event -> {
entity = getEventStrategy().handle((E) event, entity, getRevisionNumber());
onEntityModified();
})
.match(EmptyEvent.class,
event -> log.withCorrelationId(event).debug("Recovered EmptyEvent: <{}>", event))
Expand Down Expand Up @@ -525,6 +524,7 @@ sequence no (e.g. 2), but old entity revision no (e.g. 1) will be created -> can
aftereffects.
*/
handler.accept(persistedEvent);
onEntityModified();

// save a snapshot if there were too many changes since the last snapshot
if (snapshotThresholdPassed()) {
Expand Down

0 comments on commit bacbe91

Please sign in to comment.