Skip to content

Commit

Permalink
add shutdown behaviour to SnapshotStreamingActor and terminate stream…
Browse files Browse the repository at this point in the history
… in PhaseServiceRequestsDone;

add test for shutdown behaviour to SnapshotStreamingActorTest;
change PubSub mechanism from send to publish for SudoStreamSnapshots commands in ThingsMetadataSource and SnapshotStreamingActor;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Sep 22, 2022
1 parent 83fb8cb commit 352b0ba
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,5 @@ public interface TimestampPersistence {
* @return the tagged timestamp.
*/
Source<Optional<Pair<Instant, String>>, NotUsed> getTaggedTimestamp();

}
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,14 @@ public Config setConfig(final Config config) {
if (!previousConfig.isEnabled() && this.config.isEnabled()) {
scheduleWakeUp();
}

return this.config.getConfig();
}

private Receive sleeping() {
final var sleepingReceiveBuilder = ReceiveBuilder.create();
preEnhanceSleepingBehavior(sleepingReceiveBuilder);

return sleepingReceiveBuilder.match(WokeUp.class, this::wokeUp)
.match(Event.class, this::addCustomEventToLog)
.match(RetrieveHealth.class, this::retrieveHealth)
Expand All @@ -195,6 +197,7 @@ private Receive sleeping() {
private Receive streaming() {
final var streamingReceiveBuilder = ReceiveBuilder.create();
preEnhanceStreamingBehavior(streamingReceiveBuilder);

return streamingReceiveBuilder
.match(StreamTerminated.class, this::streamTerminated)
.match(Event.class, this::addCustomEventToLog)
Expand Down Expand Up @@ -291,6 +294,7 @@ private void restartStream() {
log.info(description);
getSelf().tell(StreamTerminated.normally(description), getSelf());
}

return null;
});
}
Expand Down Expand Up @@ -321,6 +325,7 @@ private JsonObject render() {
.set(JsonFields.ENABLED, config.isEnabled())
.set(JsonFields.EVENTS, renderEvents(events));
postEnhanceStatusReport(statusReportBuilder);

return statusReportBuilder.build();
}

Expand Down
4 changes: 4 additions & 0 deletions internal/utils/persistence/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
<name>Eclipse Ditto :: Internal :: Utils :: Persistence</name>

<dependencies>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-things-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-akka</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
*/
package org.eclipse.ditto.internal.utils.persistence.mongo;

import static org.eclipse.ditto.things.api.ThingsMessagingConstants.THINGS_PERSISTENCE_STREAMING_ACTOR_NAME;

import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -26,6 +29,7 @@
import org.eclipse.ditto.internal.models.streaming.SudoStreamSnapshots;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
Expand All @@ -35,38 +39,60 @@
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

import akka.Done;
import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.CoordinatedShutdown;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.SharedKillSwitch;
import akka.stream.SourceRef;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamRefs;


/**
* An actor that streams from the snapshot store of a service with Mongo persistence plugin on request.
*/
@AllValuesAreNonnullByDefault
public final class SnapshotStreamingActor extends AbstractActor {

/**
* Ask-timeout in shutdown tasks. Its duration should be long enough but ultimately does not
* matter because each shutdown phase has its own timeout.
*/
private static final Duration SHUTDOWN_ASK_TIMEOUT = Duration.ofMinutes(2L);

private static final String ACTOR_NAME = THINGS_PERSISTENCE_STREAMING_ACTOR_NAME;

private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
private final Materializer materializer = Materializer.createMaterializer(this::getContext);
private final SharedKillSwitch killSwitch = KillSwitches.shared(ACTOR_NAME);

private final Function<String, EntityId> pid2EntityId;
private final Function<EntityId, String> entityId2Pid;
private final DittoMongoClient mongoClient;
private final MongoReadJournal readJournal;
private final ActorRef pubSubMediator;


@SuppressWarnings("unused") // called by reflection
private SnapshotStreamingActor(final Function<String, EntityId> pid2EntityId,
final Function<EntityId, String> entityId2Pid,
final DittoMongoClient mongoClient,
final MongoReadJournal readJournal) {
final MongoReadJournal readJournal,
final ActorRef pubSubMediator) {
this.pid2EntityId = pid2EntityId;
this.entityId2Pid = entityId2Pid;
this.mongoClient = mongoClient;
this.readJournal = readJournal;
this.pubSubMediator = pubSubMediator;
}

@SuppressWarnings("unused") // called by reflection
Expand All @@ -80,6 +106,7 @@ private SnapshotStreamingActor(final Function<String, EntityId> pid2EntityId,
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
readJournal = MongoReadJournal.newInstance(config, mongoClient, getContext().getSystem());
pubSubMediator = DistributedPubSub.get(getContext().getSystem()).mediator();
}

/**
Expand All @@ -103,14 +130,38 @@ public static Props props(final Function<String, EntityId> pid2EntityId,
* @param entityId2Pid function mapping entity ID to PID.
* @param mongoClient MongoDB client.
* @param readJournal the read journal.
* @param pubSubMediator the pubSubMediator.
* @return Props for this actor.
*/
public static Props propsForTest(final Function<String, EntityId> pid2EntityId,
final Function<EntityId, String> entityId2Pid,
final DittoMongoClient mongoClient,
final MongoReadJournal readJournal) {
final MongoReadJournal readJournal,
final ActorRef pubSubMediator) {

return Props.create(SnapshotStreamingActor.class, pid2EntityId, entityId2Pid, mongoClient, readJournal);
return Props.create(SnapshotStreamingActor.class, pid2EntityId, entityId2Pid, mongoClient, readJournal,
pubSubMediator);
}

@Override
public void preStart() throws Exception {
super.preStart();

final var self = getSelf();
pubSubMediator.tell(DistPubSubAccess.subscribeViaGroup(SudoStreamSnapshots.TYPE, ACTOR_NAME, self), self);

final var coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem());
final var serviceUnbindTask = "service-unbind-" + ACTOR_NAME;
coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind(), serviceUnbindTask,
() -> Patterns.ask(self, Control.SERVICE_UNBIND, SHUTDOWN_ASK_TIMEOUT)
.thenApply(reply -> Done.done())
);

final var serviceRequestsDoneTask = "service-requests-done-" + ACTOR_NAME;
coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone(), serviceRequestsDoneTask,
() -> Patterns.ask(self, Control.SERVICE_REQUESTS_DONE, SHUTDOWN_ASK_TIMEOUT)
.thenApply(reply -> Done.done())
);
}

@Override
Expand All @@ -123,6 +174,9 @@ public void postStop() throws Exception {
public Receive createReceive() {
return ReceiveBuilder.create()
.match(SudoStreamSnapshots.class, this::startStreaming)
.matchEquals(Control.SERVICE_UNBIND, this::serviceUnbind)
.matchEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone)
.match(DistributedPubSubMediator.SubscribeAck.class, this::handleSubscribeAck)
.matchAny(message -> log.warning("Unexpected message: <{}>", message))
.build();
}
Expand All @@ -136,12 +190,14 @@ private Source<StreamedSnapshot, NotUsed> createSource(final SudoStreamSnapshots
materializer,
command.getSnapshotFields().stream().map(JsonValue::asString).toArray(String[]::new)
);

return snapshotSource.map(this::mapSnapshot).log("snapshot-streaming", log);
}

private SnapshotFilter getSnapshotFilterFromCommand(final SudoStreamSnapshots command) {
final String start = command.hasNonEmptyLowerBound() ? entityId2Pid.apply(command.getLowerBound()) : "";
final String pidFilter = FilteredNamespacedEntityId.toPidFilter(command, entityId2Pid);

return SnapshotFilter.of(start, pidFilter);
}

Expand Down Expand Up @@ -178,15 +234,49 @@ private StreamedSnapshot mapSnapshot(final Document snapshot) {
final EntityId entityId = pid2EntityId.apply(snapshot.getString(MongoReadJournal.S_ID));
snapshot.remove(MongoReadJournal.S_ID);
final JsonObject snapshotJson = JsonObject.of(snapshot.toJson());

return StreamedSnapshot.of(entityId, snapshotJson);
}

private void startStreaming(final SudoStreamSnapshots command) {
final Duration timeout = Duration.ofMillis(command.getTimeoutMillis());
final SourceRef<StreamedSnapshot> sourceRef = createSource(command)
.via(killSwitch.flow())
.initialTimeout(timeout)
.idleTimeout(timeout)
.runWith(StreamRefs.sourceRef(), materializer);
getSender().tell(sourceRef, getSelf());
}

private void serviceUnbind(final Control serviceUnbind) {
log.info("{}: unsubscribing from pubsub for {} actor", serviceUnbind, ACTOR_NAME);

final CompletableFuture<Done> unsubscribeTask = Patterns.ask(pubSubMediator,
DistPubSubAccess.unsubscribeViaGroup(SudoStreamSnapshots.TYPE, ACTOR_NAME,
getSelf()), SHUTDOWN_ASK_TIMEOUT)
.toCompletableFuture()
.thenApply(ack -> {
log.info("{} unsubscribed successfully from pubsub for {} actor", ACTOR_NAME);
return Done.getInstance();
});

Patterns.pipe(unsubscribeTask, getContext().getDispatcher()).to(getSender());
}

private void serviceRequestsDone(final Control serviceRequestsDone) {
log.info("Abort streaming of snapshots because of graceful shutdown.");
killSwitch.abort(new IllegalStateException(""));
getSender().tell(Done.getInstance(), getSelf());
}

private void handleSubscribeAck(final DistributedPubSubMediator.SubscribeAck subscribeAck) {
log.info("Successfully subscribed to distributed pub/sub on topic <{}> for group <{}>.",
subscribeAck.subscribe().topic(), subscribeAck.subscribe().group());
}

enum Control {
SERVICE_UNBIND,
SERVICE_REQUESTS_DONE
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.stream.SourceRef;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;

/**
Expand All @@ -54,14 +57,15 @@ public final class SnapshotStreamingActorTest {
private ActorSystem actorSystem;
private DittoMongoClient mockClient;
private MongoReadJournal mockReadJournal;
private TestProbe pubSubMediatorTestProbe;

@Before
public void initActorSystem() {
final Config config = ConfigFactory.load("test");
actorSystem = ActorSystem.create("AkkaTestSystem", config);
mockClient = Mockito.mock(DittoMongoClient.class);
mockReadJournal = Mockito.mock(MongoReadJournal.class);

pubSubMediatorTestProbe = TestProbe.apply("pubSubMediator", actorSystem);
}

@After
Expand Down Expand Up @@ -93,21 +97,48 @@ public void streamEmptySnapshotCollection() {

@Test
public void streamNonemptySnapshotCollection() {
streamNonemptySnapshotCollection(SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE), SnapshotFilter.of("",""));
streamNonemptySnapshotCollection(
SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE),
SnapshotFilter.of("", ""));
}

@Test
public void streamNonemptyFilteredSnapshotCollection() {
streamNonemptySnapshotCollection(SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE).withNamespacesFilter(List.of("eclipse", "ditto")),
SnapshotFilter.of("","^thing:(eclipse|ditto):.*"));
streamNonemptySnapshotCollection(
SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE)
.withNamespacesFilter(List.of("eclipse", "ditto")),
SnapshotFilter.of("", "^thing:(eclipse|ditto):.*"));
}

@Test
public void streamNonemptySnapshotCollectionFromLowerBound() {
streamNonemptySnapshotCollection(SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE).withLowerBound(EntityId.of(THING_TYPE, "snap:1")),
streamNonemptySnapshotCollection(
SudoStreamSnapshots.of(100, 10_000L, List.of(), DittoHeaders.empty(), THING_TYPE)
.withLowerBound(EntityId.of(THING_TYPE, "snap:1")),
SnapshotFilter.of("thing:snap:1", ""));
}

private void streamNonemptySnapshotCollection(final SudoStreamSnapshots sudoStreamSnapshots, final SnapshotFilter expectedFilter) {
@Test
public void testServiceUnbindAndServiceRequestsDone() {
new TestKit(actorSystem) {{
final ActorRef underTest = createSnapshotStreamingActor();
pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Subscribe.class);

underTest.tell(SnapshotStreamingActor.Control.SERVICE_UNBIND, getRef());

final var unsub1 =
pubSubMediatorTestProbe.expectMsgClass(DistributedPubSubMediator.Unsubscribe.class);
pubSubMediatorTestProbe.reply(new DistributedPubSubMediator.UnsubscribeAck(unsub1));
expectMsg(Done.getInstance());

underTest.tell(SnapshotStreamingActor.Control.SERVICE_REQUESTS_DONE, getRef());

expectMsg(Done.getInstance());
}};
}

private void streamNonemptySnapshotCollection(final SudoStreamSnapshots sudoStreamSnapshots,
final SnapshotFilter expectedFilter) {
new TestKit(actorSystem) {{
final ActorRef underTest = createSnapshotStreamingActor();

Expand Down Expand Up @@ -145,6 +176,7 @@ private void streamNonemptySnapshotCollection(final SudoStreamSnapshots sudoStre

}


private void setSnapshotStore(final Source<Document, NotUsed> mockSource) {
Mockito.when(mockReadJournal.getNewestSnapshotsAbove(any(SnapshotFilter.class), anyInt(), any(), any()))
.thenReturn(mockSource);
Expand All @@ -161,8 +193,10 @@ private ActorRef createSnapshotStreamingActor() {
pid.substring(pid.indexOf(':') + 1)),
entityId -> THING_TYPE + ":" + entityId.toString(),
mockClient,
mockReadJournal
mockReadJournal,
pubSubMediatorTestProbe.ref()
);

return actorSystem.actorOf(props);
}
}
4 changes: 4 additions & 0 deletions internal/utils/persistence/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ akka {

log-config-on-start = off

# disable coordinated shutdown for tests
coordinated-shutdown.terminate-actor-system = off
coordinated-shutdown.run-by-actor-system-terminate = off

actor {
provider = "akka.cluster.ClusterActorRefProvider"
enable-additional-serialization-bindings = on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ akka {
# for log messages during the actor system is starting up and shutting down:
stdout-loglevel = "INFO"

# disable coordinated shutdown for tests
coordinated-shutdown.terminate-actor-system = off
coordinated-shutdown.run-by-actor-system-terminate = off
}
Expand Down
Loading

0 comments on commit 352b0ba

Please sign in to comment.