Skip to content

Commit

Permalink
re-added publishing PolicyEvent in PolicyPersistenceActor + adjusted …
Browse files Browse the repository at this point in the history
…test to fish for event

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Mar 29, 2022
1 parent eaf04da commit 13531e6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ protected DittoRuntimeExceptionBuilder<?> newNotAccessibleExceptionBuilder() {

@Override
protected void publishEvent(final PolicyEvent<?> event) {
pubSubMediator.tell(DistPubSubAccess.publishViaGroup(PolicyEvent.TYPE_PREFIX, event), getSender());

final PolicyTag policyTag = PolicyTag.of(entityId, event.getRevision());
pubSubMediator.tell(DistPubSubAccess.publishViaGroup(PolicyTag.PUB_SUB_TOPIC_MODIFIED, policyTag), getSender());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
import scala.concurrent.duration.FiniteDuration;
import scala.PartialFunction;

/**
* Unit test for the {@link PolicyPersistenceActor}.
Expand Down Expand Up @@ -784,7 +784,10 @@ public void createSubjectWithExpiry() {

// THEN: a SubjectCreated event should be emitted
final DistributedPubSubMediator.Publish subjectCreatedPublish =
pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
(DistributedPubSubMediator.Publish) pubSubMediatorTestProbe.fishForMessage(
scala.concurrent.duration.Duration.create(2, "s"),
"publish event",
publishedPolicyEvent());
assertThat(subjectCreatedPublish.msg()).isInstanceOf(SubjectCreated.class);

final long secondsToAdd = 10 - (expiryInstant.getEpochSecond() % 10);
Expand Down Expand Up @@ -813,9 +816,11 @@ public void createSubjectWithExpiry() {
expectedRoundedExpiryInstant);
final long secondsToWaitForSubjectDeletedEvent = between.getSeconds() + 2;
final DistributedPubSubMediator.Publish policySubjectDeleted =
pubSubMediatorTestProbe.expectMsgClass(
FiniteDuration.apply(secondsToWaitForSubjectDeletedEvent, TimeUnit.SECONDS),
DistributedPubSubMediator.Publish.class);
(DistributedPubSubMediator.Publish) pubSubMediatorTestProbe.fishForMessage(
scala.concurrent.duration.Duration.create(secondsToWaitForSubjectDeletedEvent, "s"),
"publish event",
publishedPolicyEvent()
);
final Object subjectDeletedMsg = policySubjectDeleted.msg();
assertThat(subjectDeletedMsg).isInstanceOf(SubjectDeleted.class);
assertThat(((SubjectDeleted) subjectDeletedMsg).getSubjectId())
Expand Down Expand Up @@ -1004,9 +1009,10 @@ public void createPolicyWith2SubjectsWithExpiry() {
expectedRoundedExpiryInstant);
final long secondsToWaitForSubjectDeletedEvent = between.getSeconds() + 2;
final DistributedPubSubMediator.Publish subject1Deleted =
pubSubMediatorTestProbe.expectMsgClass(
FiniteDuration.apply(secondsToWaitForSubjectDeletedEvent, TimeUnit.SECONDS),
DistributedPubSubMediator.Publish.class);
(DistributedPubSubMediator.Publish) pubSubMediatorTestProbe.fishForMessage(
scala.concurrent.duration.Duration.create(secondsToWaitForSubjectDeletedEvent, "s"),
"publish event",
publishedPolicyEvent());
assertThat(subject1Deleted.msg()).isInstanceOf(SubjectDeleted.class);
assertThat(((SubjectDeleted) subject1Deleted.msg()).getSubjectId())
.isIn(subject1.getId(), subject2.getId());
Expand All @@ -1016,7 +1022,10 @@ public void createPolicyWith2SubjectsWithExpiry() {

// THEN: subject2 is deleted immediately
final DistributedPubSubMediator.Publish subject2Deleted =
pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
(DistributedPubSubMediator.Publish) pubSubMediatorTestProbe.fishForMessage(
scala.concurrent.duration.Duration.create(secondsToWaitForSubjectDeletedEvent, "s"),
"publish event",
publishedPolicyEvent());
assertThat(subject2Deleted.msg()).isInstanceOf(SubjectDeleted.class);
assertThat(((SubjectDeleted) subject2Deleted.msg()).getSubjectId())
.isIn(subject1.getId(), subject2.getId());
Expand All @@ -1030,6 +1039,12 @@ public void createPolicyWith2SubjectsWithExpiry() {
}};
}

private PartialFunction<Object, Object> publishedPolicyEvent() {
return PartialFunction.fromFunction(msg ->
msg instanceof DistributedPubSubMediator.Publish publish &&
publish.topic().startsWith(PolicyEvent.TYPE_PREFIX));
}

@Test
public void impossibleToMakePolicyInvalidByExpiringSubjects() {
new TestKit(actorSystem) {{
Expand Down Expand Up @@ -1197,7 +1212,11 @@ public void recoverPolicyEntryModified() {

// event published with group:
final DistributedPubSubMediator.Publish policyEntryModifiedPublishSecond =
pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
(DistributedPubSubMediator.Publish) pubSubMediatorTestProbe.fishForMessage(
scala.concurrent.duration.Duration.create(2, "s"),
"publish event",
publishedPolicyEvent()
);
assertThat(policyEntryModifiedPublishSecond.msg()).isInstanceOf(PolicyEntryCreated.class);

// restart
Expand Down Expand Up @@ -1325,9 +1344,11 @@ public void ensureSubjectExpiryIsCleanedUpAfterRecovery() {
expectedRoundedExpiryInstant);
final long secondsToWaitForSubjectDeletedEvent = between.getSeconds() + 2;
final DistributedPubSubMediator.Publish policySubjectDeleted =
pubSubMediatorTestProbe.expectMsgClass(
FiniteDuration.apply(secondsToWaitForSubjectDeletedEvent, TimeUnit.SECONDS),
DistributedPubSubMediator.Publish.class);
(DistributedPubSubMediator.Publish) pubSubMediatorTestProbe.fishForMessage(
scala.concurrent.duration.Duration.create(secondsToWaitForSubjectDeletedEvent, "s"),
"publish event",
publishedPolicyEvent()
);
final Object subjectDeletedMsg = policySubjectDeleted.msg();
assertThat(subjectDeletedMsg).isInstanceOf(SubjectDeleted.class);
assertThat(((SubjectDeleted) subjectDeletedMsg).getSubjectId())
Expand Down Expand Up @@ -1406,7 +1427,11 @@ public void ensureExpiredSubjectIsRemovedDuringRecovery() throws InterruptedExce

// THEN: the created event should be emitted
final DistributedPubSubMediator.Publish policyCreatedPublishSecond =
pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
(DistributedPubSubMediator.Publish) pubSubMediatorTestProbe.fishForMessage(
scala.concurrent.duration.Duration.create(2, "s"),
"publish event",
publishedPolicyEvent()
);
assertThat(policyCreatedPublishSecond.msg()).isInstanceOf(PolicyCreated.class);

// WHEN: now the persistence actor is terminated
Expand Down Expand Up @@ -1450,7 +1475,10 @@ public void ensureExpiredSubjectIsRemovedDuringRecovery() throws InterruptedExce

// THEN: waiting until the expiry interval should emit a SubjectDeleted event
final DistributedPubSubMediator.Publish policySubjectDeleted =
pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
(DistributedPubSubMediator.Publish) pubSubMediatorTestProbe.fishForMessage(
scala.concurrent.duration.Duration.create(2, "s"),
"publish event",
publishedPolicyEvent());
final Object subjectDeletedMsg = policySubjectDeleted.msg();
assertThat(subjectDeletedMsg).isInstanceOf(SubjectDeleted.class);
assertThat(((SubjectDeleted) subjectDeletedMsg).getSubjectId())
Expand Down Expand Up @@ -1496,9 +1524,9 @@ public void ensureSequenceNumberCorrectness() {

// retrieve the policy's sequence number
final long versionExpected = 2;
final Policy policyExpected = PoliciesModelFactory.newPolicyBuilder(policy) //
.remove(ANOTHER_POLICY_LABEL) //
.setRevision(versionExpected) //
final Policy policyExpected = PoliciesModelFactory.newPolicyBuilder(policy)
.remove(ANOTHER_POLICY_LABEL)
.setRevision(versionExpected)
.build();
final RetrievePolicy retrievePolicy =
RetrievePolicy.of(policy.getEntityId().orElse(null), dittoHeadersV2);
Expand Down Expand Up @@ -1531,9 +1559,9 @@ public void ensureSequenceNumberCorrectnessAfterRecovery() {

// retrieve the policy's sequence number from recovered actor
final long versionExpected = 2;
final Policy policyExpected = PoliciesModelFactory.newPolicyBuilder(policy) //
.remove(ANOTHER_POLICY_LABEL) //
.setRevision(versionExpected) //
final Policy policyExpected = PoliciesModelFactory.newPolicyBuilder(policy)
.remove(ANOTHER_POLICY_LABEL)
.setRevision(versionExpected)
.build();

// restart
Expand Down Expand Up @@ -1751,7 +1779,11 @@ private void waitPastTimeBorder() {
}

private PolicyEvent<?> expectPolicyEvent() {
final var publish = pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
final var publish = (DistributedPubSubMediator.Publish) pubSubMediatorTestProbe.fishForMessage(
scala.concurrent.duration.Duration.create(2, "s"),
"publish event",
publishedPolicyEvent()
);
assertThat(publish.message()).isInstanceOf(PolicyEvent.class);
return (PolicyEvent<?>) publish.message();
}
Expand Down

0 comments on commit 13531e6

Please sign in to comment.