Skip to content

Commit

Permalink
use Control.SERVICE_REQUESTS_DONE of PersistenceCleanupActor;
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Sep 26, 2022
1 parent 53178f2 commit 1f9cb25
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistence.operations.AbstractPersistenceOperationsActor;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;

Expand Down Expand Up @@ -123,8 +122,7 @@ private PersistenceCleanupActor(final CleanupConfig config,
* @param myRole the cluster role of this node among which the background cleanup responsibility is divided.
* @return the Props object.
*/
public static Props props(final CleanupConfig config,
final MongoReadJournal mongoReadJournal,
public static Props props(final CleanupConfig config, final MongoReadJournal mongoReadJournal,
final String myRole) {

return Props.create(PersistenceCleanupActor.class, config, mongoReadJournal, myRole);
Expand All @@ -137,8 +135,7 @@ public void preStart() throws Exception {
final var coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem());
final var serviceRequestsDoneTask = "service-requests-done-" + ACTOR_NAME;
coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone(), serviceRequestsDoneTask,
() -> Patterns.ask(getSelf(), AbstractPersistenceOperationsActor.Control.SERVICE_REQUESTS_DONE,
SHUTDOWN_ASK_TIMEOUT)
() -> Patterns.ask(getSelf(), Control.SERVICE_REQUESTS_DONE, SHUTDOWN_ASK_TIMEOUT)
.thenApply(reply -> Done.done())
);

Expand Down Expand Up @@ -171,6 +168,7 @@ private FSMStateFunctionBuilder<State, String> inAnyState() {
return matchEvent(RetrieveHealth.class, this::retrieveHealth)
.event(RetrieveConfig.class, (retrieveConfig, lastPid) -> {
retrieveConfigBehavior().onMessage().apply(retrieveConfig);

return stay();
})
.event(ModifyConfig.class, (modifyConfig, lastPid) -> {
Expand All @@ -180,12 +178,14 @@ private FSMStateFunctionBuilder<State, String> inAnyState() {
.filter(JsonValue::isString)
.map(JsonValue::asString);
final var stay = stay();

return setLastPid.map(stay::using).orElse(stay);
})
.eventEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone)
.anyEvent((message, lastPid) -> {
logger.warning("Got unhandled message <{}> when state=<{}> lastPid=<{}>",
message, stateName().name(), lastPid);

return stay();
});
}
Expand Down Expand Up @@ -230,9 +230,11 @@ private FSM.State<State, String> streamComplete(final Control streamComplete, fi
if (config.isEnabled()) {
final var nextQuietPeriod = randomizeQuietPeriod();
logger.info("Stream complete. Next stream in <{}> from start", nextQuietPeriod);

return result.forMax(nextQuietPeriod);
} else {
logger.info("Stream complete and disabled.");

return result;
}
}
Expand All @@ -241,10 +243,13 @@ private FSM.State<State, String> streamFailed(final Control streamComplete, fina
final var result = goTo(State.IN_QUIET_PERIOD).using(lastPid);
if (config.isEnabled()) {
final var nextQuietPeriod = randomizeQuietPeriod();
logger.info("Stream failed or shutdown. Next stream in <{}> starting from <{}>", nextQuietPeriod, lastPid);
logger.info("Stream failed or shutdown. Next stream in <{}> starting from <{}>", nextQuietPeriod,
lastPid);

return result.forMax(nextQuietPeriod);
} else {
logger.info("Stream failed or shutdown and disabled. Last PID=<{}>", lastPid);

return result;
}
}
Expand All @@ -262,9 +267,11 @@ private FSM.State<State, String> shutdownRunningStream(final Control shutdown, f
private FSM.State<State, String> shutdownInQuietPeriod(final Control shutdown, final String lastPid) {
if (config.isEnabled()) {
logger.info("Starting stream from <{}> in <{}> on request", lastPid, config.getQuietPeriod());

return goTo(State.IN_QUIET_PERIOD).forMax(config.getQuietPeriod());
} else {
logger.info("Stream disabled. lastPid=<{}>", lastPid);

return goTo(State.IN_QUIET_PERIOD);
}
}
Expand Down Expand Up @@ -304,6 +311,7 @@ private Done streamCompletedOrFailed(@Nullable final Done done, @Nullable final
}
getSelf().tell(Control.STREAM_FAILED, ActorRef.noSender());
}
killSwitch = null;

return Done.getInstance();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,36 @@ public void shutdownWhenRunning() {
}};
}

@Test
public void shutdownWhenStreamCompleted() throws InterruptedException {
new TestKit(actorSystem) {{
final ActorRef underTest = childActorOf(testProps());
final var retrieveHealth = RetrieveHealth.newInstance();
final var probeSource =
TestSource.<Source<CleanupResult, NotUsed>>probe(actorSystem);
final var probeSourcePair = probeSource.preMaterialize(actorSystem);
final var probe = probeSourcePair.first();
sourceBox.set(probeSourcePair.second());
underTest.tell(FSM.StateTimeout$.MODULE$, ActorRef.noSender());
probe.expectRequest();

probe.sendNext(Source.single(new CleanupResult(
CleanupResult.Type.SNAPSHOTS,
new SnapshotRevision("thing:p:id", 1234, true),
DeleteResult.acknowledged(4)
)));
waitForResponse(this, underTest, retrieveHealthResponse("RUNNING", "thing:p:id"), probe::expectNoMsg);

// WHEN stream completes successfully and graceful shutdown is initiated
probe.sendComplete();
waitForResponse(this, underTest, retrieveHealthResponse("IN_QUIET_PERIOD", ""), probe::expectNoMsg);
underTest.tell(PersistenceCleanupActor.Control.SERVICE_REQUESTS_DONE, getRef());

// THEN expect Done
expectMsg(Done.getInstance());
}};
}

private void waitForResponse(final TestKit testKit,
final ActorRef underTest,
final RetrieveHealthResponse expectedResponse,
Expand Down

0 comments on commit 1f9cb25

Please sign in to comment.