Skip to content

Commit

Permalink
eclipse-ditto#890: schedule expiry cleanup also after snapshot recovery
Browse files Browse the repository at this point in the history
* added unit test for correct expiration after actor restart/recovery

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Dec 10, 2020
1 parent 9fb9007 commit 82950f0
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import akka.actor.Props;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.RecoveryCompleted;

/**
* PersistentActor which "knows" the state of a single {@link Policy}.
Expand Down Expand Up @@ -187,7 +188,15 @@ protected Receive matchAnyAfterInitialization() {
}

@Override
protected void onEntityModified(final PolicyEvent event) {
protected void recoveryCompleted(final RecoveryCompleted event) {
if (entity != null) {
onEntityModified();
becomeCreatedOrDeletedHandler();
}
}

@Override
protected void onEntityModified() {
findEarliestSubjectExpiryTimestamp(entity).ifPresent(earliestSubjectExpiryTimestamp -> {
if (timers().isTimerActive(NEXT_SUBJECT_EXPIRY_TIMER)) {
timers().cancel(NEXT_SUBJECT_EXPIRY_TIMER);
Expand All @@ -198,8 +207,7 @@ protected void onEntityModified(final PolicyEvent event) {
// there are currently expired subjects, so delete the oldest right away:
getSelf().tell(DeleteOldestExpiredSubject.INSTANCE, getSelf());
} else {
log.withCorrelationId(event).info(
"Scheduling message for deleting next expired subject in: <{}> - at: <{}>",
log.info("Scheduling message for deleting next expired subject in: <{}> - at: <{}>",
durationBetweenNowAndEarliestExpiry, earliestExpiry);
timers().startSingleTimer(NEXT_SUBJECT_EXPIRY_TIMER, DeleteOldestExpiredSubject.INSTANCE,
durationBetweenNowAndEarliestExpiry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,86 @@ public void recoverPolicyEntryDeleted() {
};
}

@Test
public void ensureSubjectExpiryIsCleanedUpAfterRecovery() {
new TestKit(actorSystem) {
{
final Policy policy = createPolicyWithRandomId();
final PolicyId policyId = policy.getEntityId().orElseThrow();
final ActorRef underTest = createPersistenceActorFor(this, policy);

final Instant expiryInstant = LocalDateTime.now().truncatedTo(ChronoUnit.SECONDS)
.plus(2, ChronoUnit.SECONDS)
.atZone(ZoneId.systemDefault()).toInstant();
final SubjectExpiry subjectExpiry = SubjectExpiry.newInstance(expiryInstant);
final Subject expiringSubject =
Subject.newInstance(SubjectId.newInstance(SubjectIssuer.GOOGLE, "about-to-expire"),
SubjectType.GENERATED, subjectExpiry);
final Policy policyWithExpiringSubject = policy.toBuilder()
.setSubjectFor(TestConstants.Policy.SUPPORT_LABEL, expiringSubject)
.build();

final long secondsToAdd = 10 - (expiryInstant.getEpochSecond() % 10);
final Instant expectedRoundedExpiryInstant = expiryInstant.plusSeconds(secondsToAdd); // to next 10s rounded up
final SubjectExpiry expectedSubjectExpiry = SubjectExpiry.newInstance(expectedRoundedExpiryInstant);
final Subject expectedAdjustedSubjectToAdd = Subject.newInstance(expiringSubject.getId(),
expiringSubject.getType(), expectedSubjectExpiry);

final Policy adjustedPolicyWithExpiringSubject = policyWithExpiringSubject.toBuilder()
.setSubjectFor(TestConstants.Policy.SUPPORT_LABEL, expectedAdjustedSubjectToAdd)
.build();

// WHEN: a new Policy is created containing an expiring subject
final CreatePolicy createPolicyCommand = CreatePolicy.of(policyWithExpiringSubject, dittoHeadersV2);
underTest.tell(createPolicyCommand, getRef());

// THEN: the response should contain the adjusted (rounded up) expiry time
final CreatePolicyResponse createPolicy1Response = expectMsgClass(CreatePolicyResponse.class);
DittoPolicyAssertions.assertThat(createPolicy1Response.getPolicyCreated().get())
.isEqualEqualToButModified(adjustedPolicyWithExpiringSubject);

// THEN: the created event should be emitted
final DistributedPubSubMediator.Publish policyCreatedPublishSecond =
pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
assertThat(policyCreatedPublishSecond.msg()).isInstanceOf(PolicyCreated.class);

// WHEN: now the persistence actor is restarted
terminate(this, underTest);
final ActorRef underTestRecovered = createPersistenceActorFor(this, policy);

// AND WHEN: the policy is retrieved (and restored as a consequence)
final RetrievePolicy retrievePolicy = RetrievePolicy.of(policyId, dittoHeadersV2);
final RetrievePolicyResponse expectedResponse =
retrievePolicyResponse(incrementRevision(adjustedPolicyWithExpiringSubject, 1), dittoHeadersV2);

Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
underTestRecovered.tell(retrievePolicy, getRef());
expectMsgEquals(expectedResponse);
});
assertThat(getLastSender()).isEqualTo(underTestRecovered);

// THEN: waiting until the expiry interval should emit a SubjectDeleted event
final Duration between = Duration.between(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant(),
expectedRoundedExpiryInstant);
final long secondsToWaitForSubjectDeletedEvent = between.getSeconds() + 2;
final DistributedPubSubMediator.Publish policySubjectDeleted =
pubSubMediatorTestProbe.expectMsgClass(
FiniteDuration.apply(secondsToWaitForSubjectDeletedEvent, TimeUnit.SECONDS),
DistributedPubSubMediator.Publish.class);
final Object subjectDeletedMsg = policySubjectDeleted.msg();
assertThat(subjectDeletedMsg).isInstanceOf(SubjectDeleted.class);
assertThat(((SubjectDeleted) subjectDeletedMsg).getSubjectId())
.isEqualTo(expectedAdjustedSubjectToAdd.getId());

// THEN: retrieving the expired subject should fail
final RetrieveSubject retrieveSubject =
RetrieveSubject.of(policyId, POLICY_LABEL, expiringSubject.getId(), dittoHeadersV2);
underTestRecovered.tell(retrieveSubject, getRef());
expectMsgClass(SubjectNotAccessibleException.class);
}
};
}

@Test
public void ensureSequenceNumberCorrectness() {
new TestKit(actorSystem) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected AbstractShardedPersistenceActor(final I entityId, final SnapshotAdapte
handleEvents = ReceiveBuilder.create()
.match(getEventClass(), event -> {
entity = getEventStrategy().handle(event, entity, getRevisionNumber());
onEntityModified(event);
onEntityModified();
})
.build();

Expand All @@ -107,10 +107,8 @@ protected AbstractShardedPersistenceActor(final I entityId, final SnapshotAdapte

/**
* Invoked whenever the locally cached entity by this PersistenceActor was modified.
*
* @param event the Event which caused the modification.
*/
protected void onEntityModified(final E event) {
protected void onEntityModified() {
// default: no-op
}

Expand Down

0 comments on commit 82950f0

Please sign in to comment.