Skip to content

Commit

Permalink
terminate persistence cleanup stream when graceful shutdown is trigge…
Browse files Browse the repository at this point in the history
…red;

add test to PersistenceCleanupActor;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Sep 21, 2022
1 parent 4a8c312 commit 83fb8cb
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,

final var cleanupConfig = connectivityConfig.getConnectionConfig().getCleanupConfig();
final var cleanupActorProps = PersistenceCleanupActor.props(cleanupConfig, mongoReadJournal, CLUSTER_ROLE);
startChildActor(PersistenceCleanupActor.NAME, cleanupActorProps);
startChildActor(PersistenceCleanupActor.ACTOR_NAME, cleanupActorProps);

final ActorRef healthCheckingActor = getHealthCheckingActor(connectivityConfig);
bindHttpStatusRoute(connectivityConfig.getHttpConfig(), healthCheckingActor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistence.operations.AbstractPersistenceOperationsActor;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;

Expand All @@ -40,11 +41,13 @@
import akka.Done;
import akka.actor.AbstractFSM;
import akka.actor.ActorRef;
import akka.actor.CoordinatedShutdown;
import akka.actor.FSM;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.japi.Pair;
import akka.japi.pf.FSMStateFunctionBuilder;
import akka.pattern.Patterns;
import akka.stream.Attributes;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
Expand All @@ -61,13 +64,19 @@ public final class PersistenceCleanupActor extends AbstractFSM<PersistenceCleanu
/**
* Name of this actor.
*/
public static final String NAME = "persistenceCleanup";
public static final String ACTOR_NAME = "persistenceCleanup";

/**
* JSON field of ModifyConfig commands that
*/
private static final String SET_LAST_PID = "last-pid";

/**
* Ask-timeout in shutdown tasks. Its duration should be long enough but ultimately does not
* matter because each shutdown phase has its own timeout.
*/
private static final Duration SHUTDOWN_ASK_TIMEOUT = Duration.ofMinutes(2L);

private static final Throwable KILL_SWITCH_EXCEPTION = new IllegalStateException();

private final ThreadSafeDittoLoggingAdapter logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
Expand Down Expand Up @@ -123,6 +132,15 @@ public static Props props(final CleanupConfig config,
@Override
public void preStart() throws Exception {
super.preStart();

final var coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem());
final var serviceRequestsDoneTask = "service-requests-done-" + ACTOR_NAME;
coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone(), serviceRequestsDoneTask,
() -> Patterns.ask(getSelf(), AbstractPersistenceOperationsActor.Control.SERVICE_REQUESTS_DONE,
SHUTDOWN_ASK_TIMEOUT)
.thenApply(reply -> Done.done())
);

if (config.isEnabled()) {
startWith(State.IN_QUIET_PERIOD, "", randomizeQuietPeriod());
} else {
Expand All @@ -136,14 +154,16 @@ public void preStart() throws Exception {

private FSMStateFunctionBuilder<State, String> inQuietPeriod() {
return matchEventEquals(StateTimeout(), this::startStream)
.eventEquals(Control.SHUTDOWN, this::shutdownInQuietPeriod);
.eventEquals(Control.SHUTDOWN, this::shutdownInQuietPeriod)
.eventEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone);
}

private FSMStateFunctionBuilder<State, String> running() {
return matchEvent(CleanupResult.class, this::logCleanupResult)
.eventEquals(Control.STREAM_COMPLETE, this::streamComplete)
.eventEquals(Control.STREAM_FAILED, this::streamFailed)
.eventEquals(Control.SHUTDOWN, this::shutdownRunningStream);
.eventEquals(Control.SHUTDOWN, this::shutdownRunningStream)
.eventEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone);
}

private FSMStateFunctionBuilder<State, String> inAnyState() {
Expand Down Expand Up @@ -180,6 +200,7 @@ private FSM.State<State, String> startStream(final StateTimeout$ stateTimeout, f

killSwitch = materializedValues.first();
materializedValues.second().handle(this::streamCompletedOrFailed);

return goTo(State.RUNNING);
}

Expand All @@ -198,6 +219,7 @@ private FSM.State<State, String> logCleanupResult(final CleanupResult result, fi
deleteEventsCounter.increment(result.result.getDeletedCount());
break;
}

return stay().using(nextPid);
}

Expand Down Expand Up @@ -226,11 +248,12 @@ private FSM.State<State, String> streamFailed(final Control streamComplete, fina
}

private FSM.State<State, String> shutdownRunningStream(final Control shutdown, final String lastPid) {
logger.info("Activating kill-switch by demand: <{}>", killSwitch);
logger.info("Activating kill-switch on demand: <{}>", killSwitch);
if (killSwitch != null) {
// using ABORT to preserve lastPid
killSwitch.abort(KILL_SWITCH_EXCEPTION);
}

return stay();
}

Expand All @@ -254,13 +277,15 @@ private FSM.State<State, String> retrieveHealth(final RetrieveHealth retrieveHea
DittoHeaders.empty()
);
getSender().tell(response, getSelf());

return stay();
}

private Duration randomizeQuietPeriod() {
final long divisor = 1024;
final long multiplier = (long) (Math.random() * divisor);
final var quietPeriod = config.getQuietPeriod();

return quietPeriod.plus(quietPeriod.multipliedBy(multiplier).dividedBy(divisor));
}

Expand All @@ -277,6 +302,7 @@ private Done streamCompletedOrFailed(@Nullable final Done done, @Nullable final
}
getSelf().tell(Control.STREAM_FAILED, ActorRef.noSender());
}

return Done.getInstance();
}

Expand All @@ -291,13 +317,25 @@ public Config setConfig(final Config config) {
cleanup = Cleanup.of(this.config, mongoReadJournal, materializer, responsibilitySupplier);
credits = Credits.of(this.config);
getSelf().tell(Control.SHUTDOWN, ActorRef.noSender());

return this.config.render();
}

private enum Control {
private FSM.State<State, String> serviceRequestsDone(final Control serviceRequestsDone, final String lastPid) {
if (killSwitch != null) {
logger.info("Aborting stream because of graceful shutdown.");
killSwitch.abort(KILL_SWITCH_EXCEPTION);
}
getSender().tell(Done.getInstance(), getSelf());

return stay();
}

public enum Control {
STREAM_COMPLETE,
STREAM_FAILED,
SHUTDOWN
SHUTDOWN,
SERVICE_REQUESTS_DONE
}

/**
Expand All @@ -307,4 +345,5 @@ public enum State {
IN_QUIET_PERIOD,
RUNNING
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand All @@ -64,7 +65,8 @@
*/
public final class PersistenceCleanupActorTest {

private final ActorSystem actorSystem = ActorSystem.create();
private final ActorSystem actorSystem = ActorSystem.create("test",
ConfigFactory.load("test.conf"));
private final AtomicReference<Source<Source<CleanupResult, NotUsed>, NotUsed>> sourceBox =
new AtomicReference<>(Source.empty());
private Cleanup cleanup;
Expand Down Expand Up @@ -244,6 +246,41 @@ public void modifyConfigWhenRunning() {
}};
}

@Test
public void shutdownInQuietPeriod() {
new TestKit(actorSystem) {{
final ActorRef underTest = childActorOf(testProps());

underTest.tell(PersistenceCleanupActor.Control.SERVICE_REQUESTS_DONE, getRef());
expectMsg(Done.getInstance());
}};
}

@Test
public void shutdownWhenRunning() {
new TestKit(actorSystem) {{
final ActorRef underTest = childActorOf(testProps());

// GIVEN a cleanup stream is running
final var retrieveHealth = RetrieveHealth.newInstance();
final var probeSource =
TestSource.<Source<CleanupResult, NotUsed>>probe(actorSystem);
final var probeSourcePair = probeSource.preMaterialize(actorSystem);
final var probe = probeSourcePair.first();
sourceBox.set(probeSourcePair.second());
underTest.tell(FSM.StateTimeout$.MODULE$, ActorRef.noSender());
underTest.tell(retrieveHealth, getRef());
expectMsg(retrieveHealthResponse("RUNNING", ""));

// WHEN graceful shutdown is initiated
underTest.tell(PersistenceCleanupActor.Control.SERVICE_REQUESTS_DONE, getRef());

// THEN expect cancellation and Done
probe.expectCancellation();
expectMsg(Done.getInstance());
}};
}

private void waitForResponse(final TestKit testKit,
final ActorRef underTest,
final RetrieveHealthResponse expectedResponse,
Expand Down
3 changes: 3 additions & 0 deletions internal/utils/persistent-actors/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ akka {

# for log messages during the actor system is starting up and shutting down:
stdout-loglevel = "INFO"

coordinated-shutdown.terminate-actor-system = off
coordinated-shutdown.run-by-actor-system-terminate = off
}

akka.persistence.journal.plugin = "mock-journal"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class PolicyPersistenceOperationsActor extends AbstractPersistenceO

public static final String ACTOR_NAME = "policyOps";

private PolicyPersistenceOperationsActor(final ActorRef pubSubMediator,
PolicyPersistenceOperationsActor(final ActorRef pubSubMediator,
final NamespacePersistenceOperations namespaceOps,
final EntityPersistenceOperations entitiesOps,
final MongoClientWrapper mongoClient,
Expand Down Expand Up @@ -85,20 +85,6 @@ public static Props props(final ActorRef pubSubMediator,
});
}

static Props propsForTest(final ActorRef pubSubMediator,
final MongoDbConfig mongoDbConfig,
final PersistenceOperationsConfig persistenceOperationsConfig,
final NamespacePersistenceOperations namespaceOps,
final EntityPersistenceOperations entitiesOps) {

return Props.create(PolicyPersistenceOperationsActor.class, () -> {
final MongoClientWrapper mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);

return new PolicyPersistenceOperationsActor(pubSubMediator, namespaceOps, entitiesOps, mongoClient,
persistenceOperationsConfig);
});
}

@Override
public String getActorName() {
return ACTOR_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private PoliciesRootActor(final PoliciesConfig policiesConfig, final ActorRef pu

final var cleanupConfig = policiesConfig.getPolicyConfig().getCleanupConfig();
final var cleanupActorProps = PersistenceCleanupActor.props(cleanupConfig, mongoReadJournal, CLUSTER_ROLE);
startChildActor(PersistenceCleanupActor.NAME, cleanupActorProps);
startChildActor(PersistenceCleanupActor.ACTOR_NAME, cleanupActorProps);

final var healthCheckConfig = policiesConfig.getHealthCheckConfig();
final var hcBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespace;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.persistence.mongo.ops.eventsource.MongoEventSourceITAssertions;
import org.eclipse.ditto.internal.utils.persistence.operations.EntityPersistenceOperations;
import org.eclipse.ditto.internal.utils.persistence.operations.NamespacePersistenceOperations;
Expand Down Expand Up @@ -219,8 +220,7 @@ protected ActorRef startActorUnderTest(final ActorSystem actorSystem, final Acto
}

private ActorRef startActorUnderTest(final ActorSystem actorSystem, final ActorRef pubSubMediator) {
final Props opsActorProps = PolicyPersistenceOperationsActor.propsForTest(pubSubMediator, mongoDbConfig,
persistenceOperationsConfig, namespaceOpsMock, entitiesOpsMock);
final Props opsActorProps = testProps(pubSubMediator);

return actorSystem.actorOf(opsActorProps, PolicyPersistenceOperationsActor.ACTOR_NAME);
}
Expand All @@ -232,4 +232,12 @@ protected ActorRef startEntityActor(final ActorSystem system, final ActorRef pub
return system.actorOf(props, id.toString());
}

private Props testProps(final ActorRef pubSubMediator) {
final MongoClientWrapper mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);

return Props.create(PolicyPersistenceOperationsActor.class,
() -> new PolicyPersistenceOperationsActor(pubSubMediator, namespaceOpsMock, entitiesOpsMock,
mongoClient, persistenceOperationsConfig));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private ThingsRootActor(final ThingsConfig thingsConfig,
final var cleanupConfig = thingsConfig.getThingConfig().getCleanupConfig();
final var mongoReadJournal = newMongoReadJournal(thingsConfig.getMongoDbConfig(), actorSystem);
final Props cleanupActorProps = PersistenceCleanupActor.props(cleanupConfig, mongoReadJournal, CLUSTER_ROLE);
startChildActor(PersistenceCleanupActor.NAME, cleanupActorProps);
startChildActor(PersistenceCleanupActor.ACTOR_NAME, cleanupActorProps);

pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());
pubSubMediator.tell(DistPubSubAccess.put(snapshotStreamingActor), getSelf());
Expand Down

0 comments on commit 83fb8cb

Please sign in to comment.