Skip to content

Commit

Permalink
Replace Ditto pubsub by direct shard messaging for search update.
Browse files Browse the repository at this point in the history
Reason: Ditto pubsub reacts too slowly for coordinated shutdown.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 21, 2022
1 parent 87ca5dc commit 909ebea
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 228 deletions.
4 changes: 4 additions & 0 deletions things/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-things-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-thingsearch-api</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.eclipse.ditto.things.service.common.config.DittoThingsConfig;
import org.eclipse.ditto.things.service.enforcement.ThingEnforcement;
import org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor;
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;

import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
Expand Down Expand Up @@ -137,9 +138,9 @@ private ThingSupervisorActor(final ActorRef pubSubMediator,
ThingsMessagingConstants.CLUSTER_ROLE,
ThingsMessagingConstants.SHARD_REGION
);
// TODO
searchShardRegionProxy =
shardRegionProxyActorFactory.getShardRegionProxyActor("search", "search-wildcard-updater");
shardRegionProxyActorFactory.getShardRegionProxyActor(ThingsSearchConstants.CLUSTER_ROLE,
ThingsSearchConstants.SHARD_REGION);

try {
inlinePolicyEnrichment = new SupervisorInlinePolicyEnrichment(getContext().getSystem(), log, getEntityId(),
Expand Down
4 changes: 0 additions & 4 deletions thingsearch/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-cache-loaders</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-pubsub-things</artifactId>
</dependency>

<!-- logstash appender logging -->
<dependency>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
import org.eclipse.ditto.thingsearch.service.persistence.write.ThingsSearchUpdaterPersistence;

import akka.Done;
import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.CoordinatedShutdown;
import akka.actor.Props;
import akka.actor.Status;
import akka.event.DiagnosticLoggingAdapter;
Expand All @@ -58,6 +60,7 @@ final class PolicyModificationForwarder extends AbstractActor {

private final DiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

private final ActorRef pubSubMediator;
private final ActorRef thingsUpdater;
private final ThingsSearchUpdaterPersistence persistence;
private final BlockNamespaceBehavior blockNamespaceBehavior;
Expand All @@ -72,6 +75,7 @@ private PolicyModificationForwarder(final ActorRef pubSubMediator,
final BlockedNamespaces blockedNamespaces,
final ThingsSearchUpdaterPersistence persistence) {

this.pubSubMediator = pubSubMediator;
this.thingsUpdater = thingsUpdater;
this.persistence = persistence;
blockNamespaceBehavior = BlockNamespaceBehavior.of(blockedNamespaces);
Expand All @@ -97,13 +101,24 @@ public static Props props(final ActorRef pubSubMediator,
final BlockedNamespaces blockedNamespaces,
final ThingsSearchUpdaterPersistence persistence) {

return Props.create(PolicyModificationForwarder.class, pubSubMediator, thingsUpdater, blockedNamespaces, persistence);
return Props.create(PolicyModificationForwarder.class, pubSubMediator, thingsUpdater, blockedNamespaces,
persistence);
}

@Override
public void postStop() throws Exception {
public void preStart() {
CoordinatedShutdown.get(getContext().getSystem())
.addTask(CoordinatedShutdown.PhaseServiceUnbind(), "service-unbind-" + ACTOR_NAME, () -> {
final var unsub = DistPubSubAccess.unsubscribeViaGroup(PolicyTag.PUB_SUB_TOPIC_MODIFIED,
ACTOR_NAME, getSelf());
final var shutdownAskTimeout = Duration.ofMinutes(1); // does not matter as phase will timeout
return Patterns.ask(pubSubMediator, unsub, shutdownAskTimeout).thenApply(reply -> Done.done());
});
}

@Override
public void postStop() {
terminateStream();
super.postStop();
}

@Override
Expand Down Expand Up @@ -163,7 +178,8 @@ private void dumpPolicyRevisions(final Control trigger) {

private void streamTerminated(final Object streamTerminated) {
if (streamTerminated instanceof Status.Failure failure) {
final String errorMessage = "PolicyModificationForwarder stream terminated (should NEVER happen!), restarting";
final String errorMessage =
"PolicyModificationForwarder stream terminated (should NEVER happen!), restarting";
log.error(failure.cause(), errorMessage);
} else {
log.info("PolicyModificationForwarder stream completed; restarting");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.eclipse.ditto.internal.utils.health.RetrieveHealth;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.internal.utils.pubsubthings.ThingEventPubSubFactory;
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;
import org.eclipse.ditto.thingsearch.service.common.config.SearchConfig;
import org.eclipse.ditto.thingsearch.service.common.util.RootSupervisorStrategyFactory;
Expand Down Expand Up @@ -110,16 +108,10 @@ private SearchUpdaterRootActor(final SearchConfig searchConfig,

pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());

final var thingEventSub =
ThingEventPubSubFactory.shardIdOnly(getContext(), numberOfShards, DistributedAcks.empty(actorSystem))
.startDistributedSub();
final var thingsUpdaterProps =
ThingsUpdater.props(thingEventSub, updaterShard, updaterConfig, blockedNamespaces,
pubSubMediator);
ThingsUpdater.props(updaterShard, updaterConfig, blockedNamespaces, pubSubMediator);

thingsUpdaterActor = startChildActor(ThingsUpdater.ACTOR_NAME, thingsUpdaterProps);
startClusterSingletonActor(NewEventForwarder.ACTOR_NAME,
NewEventForwarder.props(thingEventSub, updaterShard, blockedNamespaces));

// start policy modification forwarder
startChildActor(PolicyModificationForwarder.ACTOR_NAME, PolicyModificationForwarder.props(
Expand Down
Loading

0 comments on commit 909ebea

Please sign in to comment.