Skip to content

Commit

Permalink
extend AbstractPersistenceOperationsActor with getActorName method;
Browse files Browse the repository at this point in the history
add graceful shutdown behaviour to AbstractPersistenceOperationsActor;
add test for shutdown behaviour to PolicyPersistenceOperationsActorIT;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Sep 20, 2022
1 parent 717412c commit 88f44cf
Show file tree
Hide file tree
Showing 12 changed files with 295 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ public static Props props(final ActorRef pubSubMediator,

return Props.create(ConnectionPersistenceOperationsActor.class, () -> {
final MongoEventSourceSettings eventSourceSettings =
MongoEventSourceSettings.fromConfig(config, ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX, false,
ConnectionPersistenceActor.JOURNAL_PLUGIN_ID, ConnectionPersistenceActor.SNAPSHOT_PLUGIN_ID);
MongoEventSourceSettings.fromConfig(config, ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX,
false, ConnectionPersistenceActor.JOURNAL_PLUGIN_ID,
ConnectionPersistenceActor.SNAPSHOT_PLUGIN_ID);

final MongoClientWrapper mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
final MongoDatabase db = mongoClient.getDefaultDatabase();
Expand All @@ -77,4 +78,9 @@ public static Props props(final ActorRef pubSubMediator,
});
}

@Override
public String getActorName() {
return ACTOR_NAME;
}

}
6 changes: 6 additions & 0 deletions internal/utils/persistence/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@
<artifactId>ditto-things-model</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-akka</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,30 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.base.api.common.Shutdown;
import org.eclipse.ditto.base.api.common.ShutdownReason;
import org.eclipse.ditto.base.api.common.ShutdownReasonFactory;
import org.eclipse.ditto.base.api.common.purge.PurgeEntities;
import org.eclipse.ditto.base.api.common.purge.PurgeEntitiesResponse;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespace;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespaceResponse;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;

import akka.Done;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.CoordinatedShutdown;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;

Expand All @@ -57,15 +61,23 @@ public abstract class AbstractPersistenceOperationsActor extends AbstractActor {
*/
protected final ThreadSafeDittoLoggingAdapter logger;

/**
* Ask-timeout in shutdown tasks. Its duration should be long enough but ultimately does not
* matter because each shutdown phase has its own timeout.
*/
private static final Duration SHUTDOWN_ASK_TIMEOUT = Duration.ofMinutes(2L);

private final ActorRef pubSubMediator;
private final EntityType entityType;
@Nullable private final NamespacePersistenceOperations namespaceOps;
@Nullable private final EntityPersistenceOperations entitiesOps;
private final Materializer materializer;
private final Collection<Closeable> toCloseWhenStopped;

private final Duration delayAfterPersistenceActorShutdown;

private int ongoingRequests = 0;
@Nullable private ActorRef shutdownReceiver = null;

private AbstractPersistenceOperationsActor(final ActorRef pubSubMediator,
final EntityType entityType,
@Nullable final NamespacePersistenceOperations namespaceOps,
Expand Down Expand Up @@ -122,6 +134,7 @@ private static List<Closeable> toList(final Closeable toCloseWhenStopped,
final List<Closeable> closeables = new ArrayList<>(1 + optionalToCloseWhenStopped.length);
closeables.add(toCloseWhenStopped);
Collections.addAll(closeables, optionalToCloseWhenStopped);

return closeables;
}

Expand Down Expand Up @@ -152,10 +165,25 @@ protected AbstractPersistenceOperationsActor(final ActorRef pubSubMediator,
Collections.emptyList());
}

protected abstract String getActorName();

@Override
public void preStart() {
subscribeForNamespaceCommands();
subscribeForEntitiesCommands();

final var self = getSelf();
final var coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem());
final var serviceUnbindTask = "service-unbind-" + getActorName() ;
coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind(), serviceUnbindTask,
() -> Patterns.ask(getSelf(), Control.SERVICE_UNBIND, SHUTDOWN_ASK_TIMEOUT)
.thenApply(reply -> Done.done())
);
final var serviceRequestsDoneTask = "service-requests-done-" + getActorName();
coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone(), serviceRequestsDoneTask,
() -> Patterns.ask(self, Control.SERVICE_REQUESTS_DONE, SHUTDOWN_ASK_TIMEOUT)
.thenApply(reply -> Done.done())
);
}

@Override
Expand Down Expand Up @@ -201,8 +229,11 @@ public Receive createReceive() {
return ReceiveBuilder.create()
.match(PurgeNamespace.class, this::purgeNamespace)
.match(PurgeEntities.class, this::purgeEntities)
.matchEquals(Control.OP_COMPLETE, this::opComplete)
.matchEquals(Control.SERVICE_UNBIND, this::serviceUnbind)
.matchEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone)
.match(DistributedPubSubMediator.SubscribeAck.class, this::handleSubscribeAck)
.matchAny(message -> logger.warning("unhandled: <{}>", message))
.matchAny(message -> logger.warning("Unhandled message: <{}>", message))
.build();
}

Expand All @@ -213,6 +244,7 @@ private void purgeNamespace(final PurgeNamespace purgeNamespace) {
return;
}

incrementOpCounter();
l.info("Running <{}>.", purgeNamespace);
final String namespace = purgeNamespace.getNamespace();
final ActorRef sender = getSender();
Expand All @@ -222,6 +254,7 @@ private void purgeNamespace(final PurgeNamespace purgeNamespace) {
.thenAccept(errors -> {
final PurgeNamespaceResponse response;
if (errors.isEmpty()) {
l.info("Successfully purged namespace <{}>.", namespace);
response = PurgeNamespaceResponse.successful(namespace, entityType,
purgeNamespace.getDittoHeaders());
} else {
Expand All @@ -230,12 +263,13 @@ private void purgeNamespace(final PurgeNamespace purgeNamespace) {
purgeNamespace.getDittoHeaders());
}
sender.tell(response, getSelf());
l.info("Successfully purged namespace <{}>.", namespace);
getSelf().tell(Control.OP_COMPLETE, ActorRef.noSender());
})
.exceptionally(error -> {
// Reply nothing - Error should not occur (DB errors were converted to stream elements and handled)
l.error(error, "Unexpected error when purging namespace <{}>!",
purgeNamespace.getNamespace());
purgeNamespace.getNamespace());
getSelf().tell(Control.OP_COMPLETE, ActorRef.noSender());
return null;
});
}
Expand All @@ -251,6 +285,7 @@ private void purgeEntities(final PurgeEntities purgeEntities) {
return;
}

incrementOpCounter();
shutDownPersistenceActorsOfEntitiesToPurge(purgeEntities);
schedulePurgingEntitiesIn(delayAfterPersistenceActorShutdown, purgeEntities);
}
Expand Down Expand Up @@ -285,24 +320,81 @@ private void doPurgeEntities(final PurgeEntities purgeEntities, final ActorRef i
.thenAccept(errors -> {
final PurgeEntitiesResponse response;
if (errors.isEmpty()) {
l.info("Successfully purged entities of type <{}>: <{}>", purgeEntityType, entityIds);
response = PurgeEntitiesResponse.successful(purgeEntityType, purgeEntities.getDittoHeaders());
} else {
errors.forEach(error -> l.error(error, "Error purging entities of type <{}>: <{}>",
purgeEntityType, entityIds));
response = PurgeEntitiesResponse.failed(purgeEntityType, purgeEntities.getDittoHeaders());
}
initiator.tell(response, getSelf());
l.info("Successfully purged entities of type <{}>: <{}>", purgeEntityType, entityIds);
getSelf().tell(Control.OP_COMPLETE, ActorRef.noSender());
})
.exceptionally(error -> {
// Reply nothing - Error should not occur (DB errors were converted to stream elements and handled)
l.error(error, "Unexpected error when purging entities <{}>!", purgeEntities.getEntityIds());
getSelf().tell(Control.OP_COMPLETE, ActorRef.noSender());
return null;
});
}

private void serviceUnbind(final Control serviceUnbind) {
logger.info("{}: unsubscribing from pubsub for {} actor", serviceUnbind, getActorName());

final ActorRef self = getSelf();
final CompletableFuture<Done> unsubscribeTask = CompletableFuture.allOf(
Patterns.ask(pubSubMediator,
DistPubSubAccess.unsubscribeViaGroup(PurgeEntities.getTopic(entityType),
getSubscribeGroup(), self),
SHUTDOWN_ASK_TIMEOUT)
.toCompletableFuture(),
Patterns.ask(pubSubMediator,
DistPubSubAccess.unsubscribeViaGroup(PurgeNamespace.TYPE, getSubscribeGroup(),
self), SHUTDOWN_ASK_TIMEOUT)
.toCompletableFuture())
.thenApply(ack -> {
logger.info("{} unsubscribed successfully from pubsub for {} actor", getActorName());
return Done.getInstance();
});

Patterns.pipe(unsubscribeTask, getContext().getDispatcher()).to(getSender());
}


private void incrementOpCounter() {
++ongoingRequests;
}

private void decrementOpCounter() {
--ongoingRequests;
}

private void opComplete(final Control opComplete) {
decrementOpCounter();
if (shutdownReceiver != null && ongoingRequests == 0) {
logger.info("{}: finished waiting for requests", Control.SERVICE_REQUESTS_DONE);
shutdownReceiver.tell(Done.getInstance(), getSelf());
}
}

private void serviceRequestsDone(final Control serviceRequestsDone) {
if (ongoingRequests == 0) {
logger.info("{}: no ongoing requests", serviceRequestsDone);
getSender().tell(Done.getInstance(), getSelf());
} else {
logger.info("{}: waiting for {} ongoing requests", serviceRequestsDone, ongoingRequests);
shutdownReceiver = getSender();
}
}

private void handleSubscribeAck(final DistributedPubSubMediator.SubscribeAck subscribeAck) {
logger.debug("Got subscribeAck <{}>.", subscribeAck);
}

public enum Control {
SERVICE_UNBIND,
OP_COMPLETE,
SERVICE_REQUESTS_DONE
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ default Source<List<Throwable>, NotUsed> purgeEntities(final Collection<EntityId

return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,22 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

import org.eclipse.ditto.base.api.common.purge.PurgeEntities;
import org.eclipse.ditto.base.api.common.purge.PurgeEntitiesResponse;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespace;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespaceResponse;
import org.eclipse.ditto.internal.utils.akka.ActorSystemResource;
import org.eclipse.ditto.internal.utils.config.raw.RawConfigSupplier;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.operations.PersistenceOperationsConfig;
import org.eclipse.ditto.internal.utils.test.mongo.MongoDbResource;
import org.eclipse.ditto.base.api.common.purge.PurgeEntities;
import org.eclipse.ditto.base.api.common.purge.PurgeEntitiesResponse;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespace;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespaceResponse;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.mockito.Mockito;

import com.typesafe.config.Config;
Expand All @@ -55,18 +56,21 @@
*/
public abstract class MongoEventSourceITAssertions<I extends EntityId> {

@ClassRule
public static final MongoDbResource MONGO_RESOURCE = new MongoDbResource();

@Rule
public final ActorSystemResource actorSystemResource =
ActorSystemResource.newInstance(getEventSourcingConfiguration());

private static final Duration EXPECT_MESSAGE_TIMEOUT = Duration.ofSeconds(30);
private static final Random RANDOM = new Random();

protected static MongoDbConfig mongoDbConfig;

@ClassRule
public static final MongoDbResource MONGO_RESOURCE = new MongoDbResource();
protected static String mongoDbUri;
protected static PersistenceOperationsConfig persistenceOperationsConfig;

private ActorSystem actorSystem;

@BeforeClass
public static void startMongoDb() {
mongoDbUri = String.format("mongodb://%s:%s/test", MONGO_RESOURCE.getBindIp(), MONGO_RESOURCE.getPort());
Expand All @@ -83,13 +87,6 @@ private static Config getConfig() {
return mongoDbTestConfig;
}

@After
public void shutDownActorSystem() {
if (actorSystem != null) {
TestKit.shutdownActorSystem(actorSystem);
}
}

protected void assertPurgeNamespace() {
purgeNamespace(getEventSourcingConfiguration());
}
Expand Down Expand Up @@ -175,14 +172,16 @@ protected final List<String> getSupportedPrefixes() {
*
* @return config to feed the actor system and its actors.
*/
private Config getEventSourcingConfiguration() {
// - do not log dead letters (i. e., events for which there is no subscriber)
protected Config getEventSourcingConfiguration() {
// - do not log dead letters (i.e., events for which there is no subscriber)
// - bind to random available port
// - do not attempt to join an Akka cluster
// - do not shutdown jvm on exit (breaks unit tests)
// - make Mongo URI known to the persistence plugin and to the NamespaceOps actor
final String testConfig = "akka.log-dead-letters=0\n" +
"akka.persistence.journal-plugin-fallback.circuit-breaker.call-timeout=30s\n" +
"akka.coordinated-shutdown.terminate-actor-system=off\n" +
"akka.coordinated-shutdown.run-by-actor-system-terminate=off\n" +
"akka-contrib-mongodb-persistence-policies-journal.circuit-breaker.call-timeout=30s\n" +
"akka-contrib-mongodb-persistence-things-journal.circuit-breaker.call-timeout=30s\n" +
"akka.remote.artery.bind.port=0\n" +
Expand All @@ -196,7 +195,7 @@ private Config getEventSourcingConfiguration() {
}

private void purgeNamespace(final Config config) {
actorSystem = startActorSystem(config);
final var actorSystem = actorSystemResource.getActorSystem();
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
.correlationId(String.valueOf(UUID.randomUUID()))
.build();
Expand Down Expand Up @@ -242,7 +241,7 @@ private void purgeNamespace(final Config config) {
}

private void purgeEntities(final Config config, final boolean prependNamespace) {
actorSystem = startActorSystem(config);
final var actorSystem = actorSystemResource.getActorSystem();
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
.correlationId(String.valueOf(UUID.randomUUID()))
.build();
Expand Down Expand Up @@ -311,11 +310,6 @@ private void expectCreateEntityResponse(final TestKit testKit) {
testKit.expectMsgClass(EXPECT_MESSAGE_TIMEOUT, getCreateEntityResponseClass());
}

private ActorSystem startActorSystem(final Config config) {
final String name = getClass().getSimpleName() + '-' + UUID.randomUUID().toString();
return ActorSystem.create(name, config);
}

private static String prependNamespace(final String id, final String ns, final boolean prepend) {
if (prepend) {
return ns + ':' + id;
Expand All @@ -324,7 +318,7 @@ private static String prependNamespace(final String id, final String ns, final b
}
}

private static void sleep(final Duration duration) {
protected static void sleep(final Duration duration) {
try {
TimeUnit.MILLISECONDS.sleep(duration.toMillis());
} catch (final Exception e) {
Expand Down
Loading

0 comments on commit 88f44cf

Please sign in to comment.