Skip to content

Commit

Permalink
[eclipse-ditto#890] Review: PolicyPersistenceActorTest.ensureSubjectE…
Browse files Browse the repository at this point in the history
…xpiryIsCleanedUpAfterRecovery

- Kill the persistence actor right away, so that it has no chance
  to delete the expired subject before termination

- Test the persistence actor in a shard region in order for
  SudoRetrievePolicy to be the first message the actor processes
  when restored.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Dec 20, 2020
1 parent d3b92a6 commit 5f43b25
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@
import org.eclipse.ditto.model.policies.SubjectType;
import org.eclipse.ditto.model.policies.Subjects;
import org.eclipse.ditto.model.policies.assertions.DittoPolicyAssertions;
import org.eclipse.ditto.services.models.policies.PoliciesMessagingConstants;
import org.eclipse.ditto.services.models.policies.PolicyTag;
import org.eclipse.ditto.services.models.policies.commands.sudo.SudoRetrievePolicy;
import org.eclipse.ditto.services.models.policies.commands.sudo.SudoRetrievePolicyResponse;
import org.eclipse.ditto.services.policies.persistence.TestConstants;
import org.eclipse.ditto.services.policies.persistence.serializer.PolicyMongoSnapshotAdapter;
import org.eclipse.ditto.services.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.services.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.services.utils.persistentactors.AbstractShardedPersistenceActor;
import org.eclipse.ditto.signals.commands.cleanup.CleanupPersistence;
Expand Down Expand Up @@ -111,7 +113,10 @@
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.cluster.Cluster;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.TestProbe;
Expand Down Expand Up @@ -1187,7 +1192,15 @@ public void ensureExpiredSubjectIsRemovedDuringRecovery() throws InterruptedExce
{
final Policy policy = createPolicyWithRandomId();
final PolicyId policyId = policy.getEntityId().orElseThrow();
final ActorRef underTest = createPersistenceActorFor(this, policy);
final ClusterShardingSettings shardingSettings =
ClusterShardingSettings.apply(actorSystem).withRole("policies");
final Props props =
PolicyPersistenceActor.props(policyId, new PolicyMongoSnapshotAdapter(), pubSubMediator);
final Cluster cluster = Cluster.get(actorSystem);
cluster.join(cluster.selfAddress());
final ActorRef underTest = ClusterSharding.get(actorSystem)
.start(PoliciesMessagingConstants.SHARD_REGION, props, shardingSettings,
ShardRegionExtractor.of(30, actorSystem));

final Instant expiryInstant = LocalDateTime.now().truncatedTo(ChronoUnit.SECONDS)
.plus(2, ChronoUnit.SECONDS)
Expand Down Expand Up @@ -1217,6 +1230,7 @@ public void ensureExpiredSubjectIsRemovedDuringRecovery() throws InterruptedExce

// THEN: the response should contain the adjusted (rounded up) expiry time
final CreatePolicyResponse createPolicy1Response = expectMsgClass(CreatePolicyResponse.class);
final ActorRef firstPersistenceActor = getLastSender();
DittoPolicyAssertions.assertThat(createPolicy1Response.getPolicyCreated().get())
.isEqualEqualToButModified(adjustedPolicyWithExpiringSubject);

Expand All @@ -1225,22 +1239,25 @@ public void ensureExpiredSubjectIsRemovedDuringRecovery() throws InterruptedExce
pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
assertThat(policyCreatedPublishSecond.msg()).isInstanceOf(PolicyCreated.class);

// WHEN: now the persistence actor is terminated
firstPersistenceActor.tell(PoisonPill.getInstance(), ActorRef.noSender());

// WHEN: it is waited until the subject expired
final Duration between =
Duration.between(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant(),
expectedRoundedExpiryInstant);
TimeUnit.MILLISECONDS.sleep(between.toMillis() + 200L);

// WHEN: now the persistence actor is restarted
terminate(this, underTest);
final ActorRef underTestRecovered = createPersistenceActorFor(this, policy);

// AND WHEN: the policy is retrieved via concierge (and restored as a consequence)
final SudoRetrievePolicy sudoRetrievePolicy = SudoRetrievePolicy.of(policyId, dittoHeadersV2);
underTestRecovered.tell(sudoRetrievePolicy, getRef());
underTest.tell(sudoRetrievePolicy, getRef());
final SudoRetrievePolicyResponse sudoRetrievePolicyResponse =
expectMsgClass(SudoRetrievePolicyResponse.class);
assertThat(getLastSender()).isEqualTo(underTestRecovered);
final ActorRef secondPersistenceActor = getLastSender();

// THEN: the restored policy persistence actor has a different reference and the same actor path
assertThat(secondPersistenceActor).isNotEqualTo(firstPersistenceActor);
assertThat(secondPersistenceActor.path()).isEqualTo(firstPersistenceActor.path());

// THEN: returned policy via SudoRetrievePolicyResponse does no longer contain the already expired subject:
final Policy expectedPolicyWithoutExpiredSubject = incrementRevision(policy.toBuilder()
Expand Down Expand Up @@ -1270,7 +1287,7 @@ public void ensureExpiredSubjectIsRemovedDuringRecovery() throws InterruptedExce
// THEN: retrieving the expired subject should fail
final RetrieveSubject retrieveSubject =
RetrieveSubject.of(policyId, POLICY_LABEL, expiringSubject.getId(), dittoHeadersV2);
underTestRecovered.tell(retrieveSubject, getRef());
underTest.tell(retrieveSubject, getRef());
expectMsgClass(SubjectNotAccessibleException.class);
}
};
Expand Down
7 changes: 7 additions & 0 deletions services/policies/persistence/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ ditto {
policies {
include "policy-test"
}

mapping-strategy.implementation = "org.eclipse.ditto.services.models.policies.PoliciesMappingStrategies"
}

akka {
Expand All @@ -19,6 +21,7 @@ akka {
log-config-on-start = off

actor.allow-java-serialization = on
actor.provider = cluster

test {
# factor by which to scale timeouts during tests, e.g. to account for shared
Expand All @@ -40,6 +43,10 @@ akka {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
}

cluster {
roles = ["policies"]
}
}

akka.contrib.persistence.mongodb.mongo {
Expand Down

0 comments on commit 5f43b25

Please sign in to comment.