From 352b0badba675446bca84cacfb13a042f91d35b2 Mon Sep 17 00:00:00 2001 From: Stefan Maute Date: Thu, 22 Sep 2022 07:47:02 +0200 Subject: [PATCH] add shutdown behaviour to SnapshotStreamingActor and terminate stream in PhaseServiceRequestsDone; add test for shutdown behaviour to SnapshotStreamingActorTest; change PubSub mechanism from send to publish for SudoStreamSnapshots commands in ThingsMetadataSource and SnapshotStreamingActor; Signed-off-by: Stefan Maute --- .../akka/streaming/TimestampPersistence.java | 1 + ...eamingActorWithConfigWithStatusReport.java | 5 + internal/utils/persistence/pom.xml | 4 + .../mongo/SnapshotStreamingActor.java | 96 ++++++++++++++++++- .../mongo/SnapshotStreamingActorTest.java | 48 ++++++++-- .../persistence/src/test/resources/test.conf | 4 + .../src/test/resources/test.conf | 1 + policies/service/src/test/resources/test.conf | 5 +- .../service/starter/ThingsRootActor.java | 1 - .../updater/actors/BackgroundSyncActor.java | 1 + .../updater/actors/ThingsMetadataSource.java | 4 +- 11 files changed, 155 insertions(+), 15 deletions(-) diff --git a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/streaming/TimestampPersistence.java b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/streaming/TimestampPersistence.java index b3a83f4935..6f7b6a8dd1 100644 --- a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/streaming/TimestampPersistence.java +++ b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/streaming/TimestampPersistence.java @@ -57,4 +57,5 @@ public interface TimestampPersistence { * @return the tagged timestamp. */ Source>, NotUsed> getTaggedTimestamp(); + } diff --git a/internal/utils/health/src/main/java/org/eclipse/ditto/internal/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport.java b/internal/utils/health/src/main/java/org/eclipse/ditto/internal/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport.java index 833fb96b78..241d3490ff 100644 --- a/internal/utils/health/src/main/java/org/eclipse/ditto/internal/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport.java +++ b/internal/utils/health/src/main/java/org/eclipse/ditto/internal/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport.java @@ -176,12 +176,14 @@ public Config setConfig(final Config config) { if (!previousConfig.isEnabled() && this.config.isEnabled()) { scheduleWakeUp(); } + return this.config.getConfig(); } private Receive sleeping() { final var sleepingReceiveBuilder = ReceiveBuilder.create(); preEnhanceSleepingBehavior(sleepingReceiveBuilder); + return sleepingReceiveBuilder.match(WokeUp.class, this::wokeUp) .match(Event.class, this::addCustomEventToLog) .match(RetrieveHealth.class, this::retrieveHealth) @@ -195,6 +197,7 @@ private Receive sleeping() { private Receive streaming() { final var streamingReceiveBuilder = ReceiveBuilder.create(); preEnhanceStreamingBehavior(streamingReceiveBuilder); + return streamingReceiveBuilder .match(StreamTerminated.class, this::streamTerminated) .match(Event.class, this::addCustomEventToLog) @@ -291,6 +294,7 @@ private void restartStream() { log.info(description); getSelf().tell(StreamTerminated.normally(description), getSelf()); } + return null; }); } @@ -321,6 +325,7 @@ private JsonObject render() { .set(JsonFields.ENABLED, config.isEnabled()) .set(JsonFields.EVENTS, renderEvents(events)); postEnhanceStatusReport(statusReportBuilder); + return statusReportBuilder.build(); } diff --git a/internal/utils/persistence/pom.xml b/internal/utils/persistence/pom.xml index eb2ace121a..7a5c39cf00 100755 --- a/internal/utils/persistence/pom.xml +++ b/internal/utils/persistence/pom.xml @@ -25,6 +25,10 @@ Eclipse Ditto :: Internal :: Utils :: Persistence + + org.eclipse.ditto + ditto-things-api + org.eclipse.ditto ditto-internal-utils-akka diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/SnapshotStreamingActor.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/SnapshotStreamingActor.java index bf68fff850..09fa69d592 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/SnapshotStreamingActor.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/SnapshotStreamingActor.java @@ -12,9 +12,12 @@ */ package org.eclipse.ditto.internal.utils.persistence.mongo; +import static org.eclipse.ditto.things.api.ThingsMessagingConstants.THINGS_PERSISTENCE_STREAMING_ACTOR_NAME; + import java.time.Duration; import java.util.Collection; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; @@ -26,6 +29,7 @@ import org.eclipse.ditto.internal.models.streaming.SudoStreamSnapshots; import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter; import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; +import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess; import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig; @@ -35,38 +39,60 @@ import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault; +import akka.Done; import akka.NotUsed; import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.CoordinatedShutdown; import akka.actor.Props; +import akka.cluster.pubsub.DistributedPubSub; +import akka.cluster.pubsub.DistributedPubSubMediator; import akka.japi.pf.ReceiveBuilder; +import akka.pattern.Patterns; +import akka.stream.KillSwitches; import akka.stream.Materializer; +import akka.stream.SharedKillSwitch; import akka.stream.SourceRef; import akka.stream.javadsl.Source; import akka.stream.javadsl.StreamRefs; + /** * An actor that streams from the snapshot store of a service with Mongo persistence plugin on request. */ @AllValuesAreNonnullByDefault public final class SnapshotStreamingActor extends AbstractActor { + /** + * Ask-timeout in shutdown tasks. Its duration should be long enough but ultimately does not + * matter because each shutdown phase has its own timeout. + */ + private static final Duration SHUTDOWN_ASK_TIMEOUT = Duration.ofMinutes(2L); + + private static final String ACTOR_NAME = THINGS_PERSISTENCE_STREAMING_ACTOR_NAME; + private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this); private final Materializer materializer = Materializer.createMaterializer(this::getContext); + private final SharedKillSwitch killSwitch = KillSwitches.shared(ACTOR_NAME); private final Function pid2EntityId; private final Function entityId2Pid; private final DittoMongoClient mongoClient; private final MongoReadJournal readJournal; + private final ActorRef pubSubMediator; + @SuppressWarnings("unused") // called by reflection private SnapshotStreamingActor(final Function pid2EntityId, final Function entityId2Pid, final DittoMongoClient mongoClient, - final MongoReadJournal readJournal) { + final MongoReadJournal readJournal, + final ActorRef pubSubMediator) { this.pid2EntityId = pid2EntityId; this.entityId2Pid = entityId2Pid; this.mongoClient = mongoClient; this.readJournal = readJournal; + this.pubSubMediator = pubSubMediator; } @SuppressWarnings("unused") // called by reflection @@ -80,6 +106,7 @@ private SnapshotStreamingActor(final Function pid2EntityId, DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config)); mongoClient = MongoClientWrapper.newInstance(mongoDbConfig); readJournal = MongoReadJournal.newInstance(config, mongoClient, getContext().getSystem()); + pubSubMediator = DistributedPubSub.get(getContext().getSystem()).mediator(); } /** @@ -103,14 +130,38 @@ public static Props props(final Function pid2EntityId, * @param entityId2Pid function mapping entity ID to PID. * @param mongoClient MongoDB client. * @param readJournal the read journal. + * @param pubSubMediator the pubSubMediator. * @return Props for this actor. */ public static Props propsForTest(final Function pid2EntityId, final Function entityId2Pid, final DittoMongoClient mongoClient, - final MongoReadJournal readJournal) { + final MongoReadJournal readJournal, + final ActorRef pubSubMediator) { - return Props.create(SnapshotStreamingActor.class, pid2EntityId, entityId2Pid, mongoClient, readJournal); + return Props.create(SnapshotStreamingActor.class, pid2EntityId, entityId2Pid, mongoClient, readJournal, + pubSubMediator); + } + + @Override + public void preStart() throws Exception { + super.preStart(); + + final var self = getSelf(); + pubSubMediator.tell(DistPubSubAccess.subscribeViaGroup(SudoStreamSnapshots.TYPE, ACTOR_NAME, self), self); + + final var coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem()); + final var serviceUnbindTask = "service-unbind-" + ACTOR_NAME; + coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind(), serviceUnbindTask, + () -> Patterns.ask(self, Control.SERVICE_UNBIND, SHUTDOWN_ASK_TIMEOUT) + .thenApply(reply -> Done.done()) + ); + + final var serviceRequestsDoneTask = "service-requests-done-" + ACTOR_NAME; + coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone(), serviceRequestsDoneTask, + () -> Patterns.ask(self, Control.SERVICE_REQUESTS_DONE, SHUTDOWN_ASK_TIMEOUT) + .thenApply(reply -> Done.done()) + ); } @Override @@ -123,6 +174,9 @@ public void postStop() throws Exception { public Receive createReceive() { return ReceiveBuilder.create() .match(SudoStreamSnapshots.class, this::startStreaming) + .matchEquals(Control.SERVICE_UNBIND, this::serviceUnbind) + .matchEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone) + .match(DistributedPubSubMediator.SubscribeAck.class, this::handleSubscribeAck) .matchAny(message -> log.warning("Unexpected message: <{}>", message)) .build(); } @@ -136,12 +190,14 @@ private Source createSource(final SudoStreamSnapshots materializer, command.getSnapshotFields().stream().map(JsonValue::asString).toArray(String[]::new) ); + return snapshotSource.map(this::mapSnapshot).log("snapshot-streaming", log); } private SnapshotFilter getSnapshotFilterFromCommand(final SudoStreamSnapshots command) { final String start = command.hasNonEmptyLowerBound() ? entityId2Pid.apply(command.getLowerBound()) : ""; final String pidFilter = FilteredNamespacedEntityId.toPidFilter(command, entityId2Pid); + return SnapshotFilter.of(start, pidFilter); } @@ -178,15 +234,49 @@ private StreamedSnapshot mapSnapshot(final Document snapshot) { final EntityId entityId = pid2EntityId.apply(snapshot.getString(MongoReadJournal.S_ID)); snapshot.remove(MongoReadJournal.S_ID); final JsonObject snapshotJson = JsonObject.of(snapshot.toJson()); + return StreamedSnapshot.of(entityId, snapshotJson); } private void startStreaming(final SudoStreamSnapshots command) { final Duration timeout = Duration.ofMillis(command.getTimeoutMillis()); final SourceRef sourceRef = createSource(command) + .via(killSwitch.flow()) .initialTimeout(timeout) .idleTimeout(timeout) .runWith(StreamRefs.sourceRef(), materializer); getSender().tell(sourceRef, getSelf()); } + + private void serviceUnbind(final Control serviceUnbind) { + log.info("{}: unsubscribing from pubsub for {} actor", serviceUnbind, ACTOR_NAME); + + final CompletableFuture unsubscribeTask = Patterns.ask(pubSubMediator, + DistPubSubAccess.unsubscribeViaGroup(SudoStreamSnapshots.TYPE, ACTOR_NAME, + getSelf()), SHUTDOWN_ASK_TIMEOUT) + .toCompletableFuture() + .thenApply(ack -> { + log.info("{} unsubscribed successfully from pubsub for {} actor", ACTOR_NAME); + return Done.getInstance(); + }); + + Patterns.pipe(unsubscribeTask, getContext().getDispatcher()).to(getSender()); + } + + private void serviceRequestsDone(final Control serviceRequestsDone) { + log.info("Abort streaming of snapshots because of graceful shutdown."); + killSwitch.abort(new IllegalStateException("")); + getSender().tell(Done.getInstance(), getSelf()); + } + + private void handleSubscribeAck(final DistributedPubSubMediator.SubscribeAck subscribeAck) { + log.info("Successfully subscribed to distributed pub/sub on topic <{}> for group <{}>.", + subscribeAck.subscribe().topic(), subscribeAck.subscribe().group()); + } + + enum Control { + SERVICE_UNBIND, + SERVICE_REQUESTS_DONE + } + } diff --git a/internal/utils/persistence/src/test/java/org/eclipse/ditto/internal/utils/persistence/mongo/SnapshotStreamingActorTest.java b/internal/utils/persistence/src/test/java/org/eclipse/ditto/internal/utils/persistence/mongo/SnapshotStreamingActorTest.java index 09e6ccf4ec..2b4da6e8e9 100644 --- a/internal/utils/persistence/src/test/java/org/eclipse/ditto/internal/utils/persistence/mongo/SnapshotStreamingActorTest.java +++ b/internal/utils/persistence/src/test/java/org/eclipse/ditto/internal/utils/persistence/mongo/SnapshotStreamingActorTest.java @@ -36,13 +36,16 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import akka.Done; import akka.NotUsed; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.cluster.pubsub.DistributedPubSubMediator; import akka.stream.SourceRef; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; +import akka.testkit.TestProbe; import akka.testkit.javadsl.TestKit; /** @@ -54,6 +57,7 @@ public final class SnapshotStreamingActorTest { private ActorSystem actorSystem; private DittoMongoClient mockClient; private MongoReadJournal mockReadJournal; + private TestProbe pubSubMediatorTestProbe; @Before public void initActorSystem() { @@ -61,7 +65,7 @@ public void initActorSystem() { actorSystem = ActorSystem.create("AkkaTestSystem", config); mockClient = Mockito.mock(DittoMongoClient.class); mockReadJournal = Mockito.mock(MongoReadJournal.class); - + pubSubMediatorTestProbe = TestProbe.apply("pubSubMediator", actorSystem); } @After @@ -93,21 +97,48 @@ public void streamEmptySnapshotCollection() { @Test public void streamNonemptySnapshotCollection() { - streamNonemptySnapshotCollection(SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE), SnapshotFilter.of("","")); + streamNonemptySnapshotCollection( + SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE), + SnapshotFilter.of("", "")); } @Test public void streamNonemptyFilteredSnapshotCollection() { - streamNonemptySnapshotCollection(SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE).withNamespacesFilter(List.of("eclipse", "ditto")), - SnapshotFilter.of("","^thing:(eclipse|ditto):.*")); + streamNonemptySnapshotCollection( + SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE) + .withNamespacesFilter(List.of("eclipse", "ditto")), + SnapshotFilter.of("", "^thing:(eclipse|ditto):.*")); } + @Test public void streamNonemptySnapshotCollectionFromLowerBound() { - streamNonemptySnapshotCollection(SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE).withLowerBound(EntityId.of(THING_TYPE, "snap:1")), + streamNonemptySnapshotCollection( + SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE) + .withLowerBound(EntityId.of(THING_TYPE, "snap:1")), SnapshotFilter.of("thing:snap:1", "")); } - private void streamNonemptySnapshotCollection(final SudoStreamSnapshots sudoStreamSnapshots, final SnapshotFilter expectedFilter) { + @Test + public void testServiceUnbindAndServiceRequestsDone() { + new TestKit(actorSystem) {{ + final ActorRef underTest = createSnapshotStreamingActor(); + pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Subscribe.class); + + underTest.tell(SnapshotStreamingActor.Control.SERVICE_UNBIND, getRef()); + + final var unsub1 = + pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Unsubscribe.class); + pubSubMediatorTestProbe.reply(new DistributedPubSubMediator.UnsubscribeAck(unsub1)); + expectMsg(Done.getInstance()); + + underTest.tell(SnapshotStreamingActor.Control.SERVICE_REQUESTS_DONE, getRef()); + + expectMsg(Done.getInstance()); + }}; + } + + private void streamNonemptySnapshotCollection(final SudoStreamSnapshots sudoStreamSnapshots, + final SnapshotFilter expectedFilter) { new TestKit(actorSystem) {{ final ActorRef underTest = createSnapshotStreamingActor(); @@ -145,6 +176,7 @@ private void streamNonemptySnapshotCollection(final SudoStreamSnapshots sudoStre } + private void setSnapshotStore(final Source mockSource) { Mockito.when(mockReadJournal.getNewestSnapshotsAbove(any(SnapshotFilter.class), anyInt(), any(), any())) .thenReturn(mockSource); @@ -161,8 +193,10 @@ private ActorRef createSnapshotStreamingActor() { pid.substring(pid.indexOf(':') + 1)), entityId -> THING_TYPE + ":" + entityId.toString(), mockClient, - mockReadJournal + mockReadJournal, + pubSubMediatorTestProbe.ref() ); + return actorSystem.actorOf(props); } } diff --git a/internal/utils/persistence/src/test/resources/test.conf b/internal/utils/persistence/src/test/resources/test.conf index 28b2d1982d..a975197c7f 100644 --- a/internal/utils/persistence/src/test/resources/test.conf +++ b/internal/utils/persistence/src/test/resources/test.conf @@ -30,6 +30,10 @@ akka { log-config-on-start = off + # disable coordinated shutdown for tests + coordinated-shutdown.terminate-actor-system = off + coordinated-shutdown.run-by-actor-system-terminate = off + actor { provider = "akka.cluster.ClusterActorRefProvider" enable-additional-serialization-bindings = on diff --git a/internal/utils/persistent-actors/src/test/resources/test.conf b/internal/utils/persistent-actors/src/test/resources/test.conf index e6e2866966..b20f018872 100644 --- a/internal/utils/persistent-actors/src/test/resources/test.conf +++ b/internal/utils/persistent-actors/src/test/resources/test.conf @@ -7,6 +7,7 @@ akka { # for log messages during the actor system is starting up and shutting down: stdout-loglevel = "INFO" + # disable coordinated shutdown for tests coordinated-shutdown.terminate-actor-system = off coordinated-shutdown.run-by-actor-system-terminate = off } diff --git a/policies/service/src/test/resources/test.conf b/policies/service/src/test/resources/test.conf index 7bb2d0272b..61cfef9f55 100755 --- a/policies/service/src/test/resources/test.conf +++ b/policies/service/src/test/resources/test.conf @@ -36,8 +36,9 @@ akka { log-config-on-start = off - akka.coordinated-shutdown.terminate-actor-system = off - akka.coordinated-shutdown.run-by-actor-system-terminate = off + # disable coordinated shutdown for tests + coordinated-shutdown.terminate-actor-system = off + coordinated-shutdown.run-by-actor-system-terminate = off actor.provider = cluster actor { diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/starter/ThingsRootActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/starter/ThingsRootActor.java index bcca85d824..5536c158a2 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/starter/ThingsRootActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/starter/ThingsRootActor.java @@ -135,7 +135,6 @@ private ThingsRootActor(final ThingsConfig thingsConfig, startChildActor(PersistenceCleanupActor.ACTOR_NAME, cleanupActorProps); pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf()); - pubSubMediator.tell(DistPubSubAccess.put(snapshotStreamingActor), getSelf()); bindHttpStatusRoute(thingsConfig.getHttpConfig(), healthCheckingActor); diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActor.java index 141be15d97..68c768041f 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActor.java @@ -347,4 +347,5 @@ private record ProgressReport(ThingId thingId, boolean persisted) {} private enum Control { BOOKMARK_THING_ID } + } diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingsMetadataSource.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingsMetadataSource.java index 6c4295fbb2..e993942682 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingsMetadataSource.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingsMetadataSource.java @@ -25,7 +25,6 @@ import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.policies.model.PolicyId; import org.eclipse.ditto.policies.model.PolicyIdInvalidException; -import org.eclipse.ditto.things.api.ThingsMessagingConstants; import org.eclipse.ditto.things.model.Thing; import org.eclipse.ditto.things.model.ThingConstants; import org.eclipse.ditto.things.model.ThingId; @@ -86,7 +85,8 @@ private Object getStartStreamCommand(final ThingId lowerBound, final List, NotUsed> requestStream(final ThingId lowerBound, final List namespaceFilter) {