Skip to content
Permalink
Browse files

Add a second shutdown reason that is added to a Shutdown emitted on P…

…urgeEntities

* Shutdown is no published in AbstractPersistenceOperationsActor in order to stop
  persistence actors of entities that are going to be purged. This is done
  analogously to namespace purging in DeleteNamespaceOrchestrator.

Signed-off-by: Klem Yannic (INST/ECS1) <yannic.klem@bosch-si.com>
  • Loading branch information
Yannic92 committed May 24, 2019
1 parent 2403923 commit 5b6fe7396ef1033b63486fe09e02291799936e05
Showing with 604 additions and 421 deletions.
  1. +2 −0 pom.xml
  2. +92 −0 services/base/src/main/java/org/eclipse/ditto/services/base/actors/ShutdownBehaviour.java
  3. +0 −89 services/base/src/main/java/org/eclipse/ditto/services/base/actors/ShutdownNamespaceBehavior.java
  4. +19 −4 ...lipse/ditto/services/connectivity/messaging/persistence/ConnectionPersistenceOperationsActor.java
  5. +4 −0 services/connectivity/starter/src/main/resources/connectivity.conf
  6. +24 −5 ...g/eclipse/ditto/services/policies/persistence/actors/policy/PolicyPersistenceOperationsActor.java
  7. +5 −5 ...ain/java/org/eclipse/ditto/services/policies/persistence/actors/policy/PolicySupervisorActor.java
  8. +3 −0 services/policies/starter/src/main/resources/policies.conf
  9. +19 −4 ...in/java/org/eclipse/ditto/services/things/persistence/actors/ThingPersistenceOperationsActor.java
  10. +4 −4 ...ence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingSupervisorActor.java
  11. +3 −0 services/things/starter/src/main/resources/things.conf
  12. +3 −0 services/thingsearch/starter/src/main/resources/things-search.conf
  13. +1 −1 ...s/src/main/java/org/eclipse/ditto/services/thingsearch/updater/actors/SearchUpdaterRootActor.java
  14. +5 −6 ...ater-actors/src/main/java/org/eclipse/ditto/services/thingsearch/updater/actors/ThingUpdater.java
  15. +28 −4 ...org/eclipse/ditto/services/thingsearch/updater/actors/ThingsSearchPersistenceOperationsActor.java
  16. +47 −5 ...va/org/eclipse/ditto/services/utils/persistence/mongo/ops/AbstractPersistenceOperationsActor.java
  17. +45 −0 ...va/org/eclipse/ditto/services/utils/persistence/mongo/ops/PersistenceOperationsConfiguration.java
  18. +1 −1 .../eclipse/ditto/services/utils/persistence/mongo/ops/eventsource/MongoEventSourceITAssertions.java
  19. +46 −49 ...in/java/org/eclipse/ditto/signals/commands/common/{GenericReason.java → PurgeEntitiesReason.java}
  20. +30 −33 ...commands/common/src/main/java/org/eclipse/ditto/signals/commands/common/PurgeNamespaceReason.java
  21. +9 −16 signals/commands/common/src/main/java/org/eclipse/ditto/signals/commands/common/ShutdownReason.java
  22. +20 −31 ...ommands/common/src/main/java/org/eclipse/ditto/signals/commands/common/ShutdownReasonFactory.java
  23. +10 −5 ...s/commands/common/src/main/java/org/eclipse/ditto/signals/commands/common/ShutdownReasonType.java
  24. +0 −118 ...ls/commands/common/src/test/java/org/eclipse/ditto/signals/commands/common/GenericReasonTest.java
  25. +130 −0 ...mands/common/src/test/java/org/eclipse/ditto/signals/commands/common/PurgeEntitiesReasonTest.java
  26. +31 −12 ...ands/common/src/test/java/org/eclipse/ditto/signals/commands/common/PurgeNamespaceReasonTest.java
  27. +23 −29 ...nds/common/src/test/java/org/eclipse/ditto/signals/commands/common/ShutdownReasonFactoryTest.java
@@ -399,6 +399,8 @@
<exclude>org.eclipse.ditto.signals.base.DittoJsonRegistry</exclude>
<exclude>org.eclipse.ditto.signals.base.GlobalErrorRegistry</exclude>
<exclude>org.eclipse.ditto.signals.base.AbstractErrorRegistry</exclude>
<exclude>org.eclipse.ditto.signals.commands.common.ShutdownReason</exclude>
<exclude>org.eclipse.ditto.signals.commands.common.ShutdownReasonFactory</exclude>
</excludes>
</parameter>
</configuration>
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.base.actors;

import static org.eclipse.ditto.model.base.common.ConditionChecker.argumentNotEmpty;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.model.namespaces.NamespaceReader;
import org.eclipse.ditto.signals.commands.common.Shutdown;
import org.eclipse.ditto.signals.commands.common.ShutdownReason;

import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;

/**
* Responsible for shutting down the given actor in case a shutdown command contains a reason that is applicable for
* the information hold by this behaviour.
*/
public final class ShutdownBehaviour {

private final String namespace;
private final String entityId;

private final ActorRef self;

private ShutdownBehaviour(final String namespace, final String entityId, final ActorRef self) {
this.namespace = namespace;
this.entityId = entityId;
this.self = self;
}

/**
* Create the actor behavior from its entity ID and reference.
*
* @param entityId entity ID to react to.
* @param pubSubMediator Akka pub-sub mediator.
* @param self reference of the actor itself.
* @return the actor behavior.
*/
public static ShutdownBehaviour fromId(final String entityId, final ActorRef pubSubMediator,
final ActorRef self) {

argumentNotEmpty(entityId, "Entity ID");
checkNotNull(self, "Self");

final String namespace = NamespaceReader.fromEntityId(entityId).orElse("");

final ShutdownBehaviour purgeEntitiesBehaviour = new ShutdownBehaviour(namespace, entityId, self);

purgeEntitiesBehaviour.subscribePubSub(checkNotNull(pubSubMediator, "Pub-Sub-Mediator"));
return purgeEntitiesBehaviour;
}

private void subscribePubSub(final ActorRef pubSubMediator) {
pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(Shutdown.TYPE, self), self);
}

/**
* Create a new receive builder matching on messages handled by this actor.
*
* @return new receive builder.
*/
public ReceiveBuilder createReceive() {
return ReceiveBuilder.create()
.match(Shutdown.class, this::shutdown)
.match(DistributedPubSubMediator.SubscribeAck.class, this::subscribeAck);
}

private void shutdown(final Shutdown shutdown) {
final ShutdownReason shutdownReason = shutdown.getReason();

if(shutdownReason.isRelevantFor(namespace) || shutdownReason.isRelevantFor(entityId)) {
self.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
}

private void subscribeAck(final DistributedPubSubMediator.SubscribeAck ack) {
// do nothing
}
}

This file was deleted.

@@ -18,6 +18,7 @@
import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.AbstractPersistenceOperationsActor;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.EntityPersistenceOperations;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.PersistenceOperationsConfiguration;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.eventsource.MongoEntitiesPersistenceOperations;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.eventsource.MongoEventSourceSettings;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
@@ -37,10 +38,17 @@

private ConnectionPersistenceOperationsActor(final ActorRef pubSubMediator,
final EntityPersistenceOperations entitiesOps,
final MongoClientWrapper mongoClientWrapper) {
final MongoClientWrapper mongoClientWrapper,
final PersistenceOperationsConfiguration persistenceOperationsConfiguration) {

super(pubSubMediator, ConnectivityCommand.RESOURCE_TYPE, null, entitiesOps,
Collections.singleton(mongoClientWrapper));
super(
pubSubMediator,
ConnectivityCommand.RESOURCE_TYPE,
null,
entitiesOps,
Collections.singleton(mongoClientWrapper),
persistenceOperationsConfiguration
);
}

/**
@@ -60,8 +68,15 @@ public static Props props(final ActorRef pubSubMediator, final Config config) {
final MongoDatabase db = mongoClient.getDefaultDatabase();

final EntityPersistenceOperations entitiesOps = MongoEntitiesPersistenceOperations.of(db, eventSourceSettings);
final PersistenceOperationsConfiguration persistenceOperationsConfiguration =
PersistenceOperationsConfiguration.fromConfig(config);

return new ConnectionPersistenceOperationsActor(pubSubMediator, entitiesOps, mongoClient);
return new ConnectionPersistenceOperationsActor(
pubSubMediator,
entitiesOps,
mongoClient,
persistenceOperationsConfiguration
);
});
}

@@ -1,4 +1,8 @@
ditto {

persistence.operations.delay-after-persistence-actor-shutdown = 5s
persistence.operations.delay-after-persistence-actor-shutdown = ${?DELAY_AFTER_PERSISTENCE_ACTOR_SHUTDOWN}

connectivity {
http {
# InetAddress.getLocalHost.getHostAddress is used if empty
@@ -18,6 +18,7 @@
import org.eclipse.ditto.services.utils.persistence.mongo.ops.AbstractPersistenceOperationsActor;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.EntityPersistenceOperations;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.NamespacePersistenceOperations;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.PersistenceOperationsConfiguration;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.eventsource.MongoEntitiesPersistenceOperations;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.eventsource.MongoEventSourceSettings;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.eventsource.MongoNamespacePersistenceOperations;
@@ -38,10 +39,20 @@

public static final String ACTOR_NAME = "policyOps";

private PolicyPersistenceOperationsActor(final ActorRef pubSubMediator, final NamespacePersistenceOperations namespaceOps,
final EntityPersistenceOperations entitiesOps, final MongoClientWrapper mongoClient) {
super(pubSubMediator, PolicyCommand.RESOURCE_TYPE, namespaceOps, entitiesOps,
Collections.singleton(mongoClient));
private PolicyPersistenceOperationsActor(final ActorRef pubSubMediator,
final NamespacePersistenceOperations namespaceOps,
final EntityPersistenceOperations entitiesOps,
final MongoClientWrapper mongoClient,
final PersistenceOperationsConfiguration persistenceOperationsConfiguration) {

super(
pubSubMediator,
PolicyCommand.RESOURCE_TYPE,
namespaceOps,
entitiesOps,
Collections.singleton(mongoClient),
persistenceOperationsConfiguration
);
}

/**
@@ -62,8 +73,16 @@ public static Props props(final ActorRef pubSubMediator, final Config config) {

final NamespacePersistenceOperations namespaceOps = MongoNamespacePersistenceOperations.of(db, eventSourceSettings);
final EntityPersistenceOperations entitiesOps = MongoEntitiesPersistenceOperations.of(db, eventSourceSettings);
final PersistenceOperationsConfiguration persistenceOperationsConfiguration =
PersistenceOperationsConfiguration.fromConfig(config);

return new PolicyPersistenceOperationsActor(pubSubMediator, namespaceOps, entitiesOps, mongoClient);
return new PolicyPersistenceOperationsActor(
pubSubMediator,
namespaceOps,
entitiesOps,
mongoClient,
persistenceOperationsConfiguration
);
});
}

@@ -26,7 +26,7 @@

import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.policies.Policy;
import org.eclipse.ditto.services.base.actors.ShutdownNamespaceBehavior;
import org.eclipse.ditto.services.base.actors.ShutdownBehaviour;
import org.eclipse.ditto.services.policies.persistence.actors.AbstractReceiveStrategy;
import org.eclipse.ditto.services.policies.persistence.actors.ReceiveStrategy;
import org.eclipse.ditto.services.policies.persistence.actors.StrategyAwareReceiveBuilder;
@@ -54,7 +54,7 @@
* Between the termination of the child and the restart, this actor answers to all requests with a {@link
* PolicyUnavailableException} as fail fast strategy.
*/
public class PolicySupervisorActor extends AbstractActor {
public final class PolicySupervisorActor extends AbstractActor {

private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);

@@ -64,7 +64,7 @@
private final Duration maxBackoff;
private final double randomFactor;
private final SupervisorStrategy supervisorStrategy;
private final ShutdownNamespaceBehavior shutdownNamespaceBehavior;
private final ShutdownBehaviour shutdownBehaviour;

private ActorRef child;
private long restartCount;
@@ -87,7 +87,7 @@ private PolicySupervisorActor(final ActorRef pubSubMediator,
this.randomFactor = randomFactor;
this.supervisorStrategy = supervisorStrategy;

shutdownNamespaceBehavior = ShutdownNamespaceBehavior.fromId(policyId, pubSubMediator, getSelf());
shutdownBehaviour = ShutdownBehaviour.fromId(policyId, pubSubMediator, getSelf());
}

/**
@@ -154,7 +154,7 @@ public Receive createReceive() {
receiveStrategies.forEach(strategyAwareReceiveBuilder::match);
strategyAwareReceiveBuilder.matchAny(new MatchAnyStrategy());

return shutdownNamespaceBehavior.createReceive().build().orElse(strategyAwareReceiveBuilder.build());
return shutdownBehaviour.createReceive().build().orElse(strategyAwareReceiveBuilder.build());
}

private Optional<ActorRef> getChild() {
@@ -2,6 +2,9 @@ ditto {
mapping-strategy.implementation = "org.eclipse.ditto.services.models.policies.PoliciesMappingStrategies"
cluster-downing.role = "policies"

persistence.operations.delay-after-persistence-actor-shutdown = 5s
persistence.operations.delay-after-persistence-actor-shutdown = ${?DELAY_AFTER_PERSISTENCE_ACTOR_SHUTDOWN}

policies {

tags {
@@ -17,6 +17,7 @@
import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.AbstractPersistenceOperationsActor;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.NamespacePersistenceOperations;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.PersistenceOperationsConfiguration;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.eventsource.MongoEventSourceSettings;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.eventsource.MongoNamespacePersistenceOperations;
import org.eclipse.ditto.signals.commands.things.ThingCommand;
@@ -38,10 +39,17 @@

private ThingPersistenceOperationsActor(final ActorRef pubSubMediator,
final NamespacePersistenceOperations namespaceOps,
final MongoClientWrapper mongoClientWrapper) {
final MongoClientWrapper mongoClientWrapper,
final PersistenceOperationsConfiguration persistenceOperationsConfiguration) {

super(pubSubMediator, ThingCommand.RESOURCE_TYPE, namespaceOps, null,
Collections.singleton(mongoClientWrapper));
super(
pubSubMediator,
ThingCommand.RESOURCE_TYPE,
namespaceOps,
null,
Collections.singleton(mongoClientWrapper),
persistenceOperationsConfiguration
);
}

/**
@@ -61,8 +69,15 @@ public static Props props(final ActorRef pubSubMediator, final Config config) {
final MongoDatabase db = mongoClient.getDefaultDatabase();

final NamespacePersistenceOperations namespaceOps = MongoNamespacePersistenceOperations.of(db, eventSourceSettings);
final PersistenceOperationsConfiguration persistenceOperationsConfiguration =
PersistenceOperationsConfiguration.fromConfig(config);

return new ThingPersistenceOperationsActor(pubSubMediator, namespaceOps, mongoClient);
return new ThingPersistenceOperationsActor(
pubSubMediator,
namespaceOps,
mongoClient,
persistenceOperationsConfiguration
);
});
}

0 comments on commit 5b6fe73

Please sign in to comment.
You can’t perform that action at this time.