Skip to content

Commit

Permalink
Add hand-off messages to connection and client shard regions.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 6, 2022
1 parent dd63d4c commit 892aa4a
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.ClusterUtil;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionCreator;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.health.DefaultHealthCheckingActorFactory;
Expand All @@ -50,8 +50,6 @@
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.DeciderBuilder;
import scala.PartialFunction;
Expand Down Expand Up @@ -177,25 +175,16 @@ private ActorRef getCommandForwarder(final ClusterConfig clusterConfig, final Ac

private static ActorRef startConnectionShardRegion(final ActorSystem actorSystem,
final Props connectionSupervisorProps, final ClusterConfig clusterConfig) {

final ClusterShardingSettings shardingSettings = ClusterShardingSettings.create(actorSystem)
.withRole(ConnectivityMessagingConstants.CLUSTER_ROLE);

return ClusterSharding.get(actorSystem)
.start(ConnectivityMessagingConstants.SHARD_REGION,
connectionSupervisorProps,
shardingSettings,
ShardRegionExtractor.of(clusterConfig.getNumberOfShards(), actorSystem));
return ShardRegionCreator.start(actorSystem, ConnectivityMessagingConstants.SHARD_REGION,
connectionSupervisorProps, clusterConfig.getNumberOfShards(),
ConnectivityMessagingConstants.CLUSTER_ROLE);
}

private static ActorRef startClientShardRegion(final ActorSystem actorSystem, final ConnectivityConfig config) {
final var numberOfShards = config.getClusterConfig().getNumberOfShards();
final var refreshInterval = config.getClientConfig().getSubscriptionRefreshDelay();
final var props = ClientSupervisor.props(numberOfShards, refreshInterval);
final ClusterShardingSettings shardingSettings = ClusterShardingSettings.create(actorSystem)
.withRole(ConnectivityMessagingConstants.CLUSTER_ROLE);
return ClusterSharding.get(actorSystem)
.start(ConnectivityMessagingConstants.CLIENT_SHARD_REGION, props, shardingSettings,
ShardRegionExtractor.of(numberOfShards, actorSystem));
return ShardRegionCreator.start(actorSystem, ConnectivityMessagingConstants.CLIENT_SHARD_REGION, props,
numberOfShards, ConnectivityMessagingConstants.CLUSTER_ROLE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.internal.utils.cluster.StopShardedActor;

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
Expand Down Expand Up @@ -113,6 +114,7 @@ public Receive createReceive() {
.match(Terminated.class, this::childTerminated)
.match(CloseConnection.class, this::isNoClientActorStarted, this::respondAndStop)
.match(ConsistentHashingRouter.ConsistentHashableEnvelope.class, this::extractFromEnvelope)
.match(StopShardedActor.class, this::restartIfOpen)
.matchAny(this::forwardToClientActor)
.build();
}
Expand Down Expand Up @@ -196,6 +198,18 @@ private void respondAndStop(final CloseConnection command) {
getContext().stop(getSelf());
}

private void restartIfOpen(final StopShardedActor stopShardedActor) {
if (props != null) {
logger.debug("Restarting connected client actor.");
final var envelope =
new ConsistentHashingRouter.ConsistentHashableEnvelope(props, clientActorId.toString());
ClusterSharding.get(getContext().getSystem())
.shardRegion(ConnectivityMessagingConstants.CLIENT_SHARD_REGION)
.tell(envelope, ActorRef.noSender());
}
getContext().stop(getSelf());
}

private enum Control {
STATUS_CHECK
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.cluster.StopShardedActor;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
Expand Down Expand Up @@ -184,6 +185,8 @@ public final class ConnectionPersistenceActor
private final ConnectionPubSub connectionPubSub;
private final ActorRef clientShardRegion;
private int subscriptionCounter = 0;
private int ongoingStagedCommands = 0;
private boolean inCoordinatedShutdown = false;
private Instant connectionClosedAt = Instant.now();
private boolean connectionOpened;
@Nullable private Instant loggingEnabledUntil;
Expand Down Expand Up @@ -552,6 +555,7 @@ private void askSelfForRetrieveConnectionStatus(@Nullable final CharSequence cor
public void onMutation(final Command<?> command, final ConnectivityEvent<?> event,
final WithDittoHeaders response, final boolean becomeCreated, final boolean becomeDeleted) {
if (command instanceof StagedCommand stagedCommand) {
++ongoingStagedCommands;
interpretStagedCommand(stagedCommand.withSenderUnlessDefined(getSender()));
} else {
super.onMutation(command, event, response, becomeCreated, becomeDeleted);
Expand All @@ -578,6 +582,11 @@ protected void checkForActivity(final CheckForActivity trigger) {
private void interpretStagedCommand(final StagedCommand command) {
if (!command.hasNext()) {
// execution complete
--ongoingStagedCommands;
if (inCoordinatedShutdown && ongoingStagedCommands == 0) {
log.debug("Stopping after waiting for ongoing staged commands");
getContext().stop(getSelf());
}
return;
}
switch (command.nextAction()) {
Expand Down Expand Up @@ -662,6 +671,7 @@ protected Receive matchAnyAfterInitialization() {
.matchEquals(Control.CHECK_LOGGING_ACTIVE, this::checkLoggingEnabled)
.matchEquals(Control.TRIGGER_UPDATE_PRIORITY, this::triggerUpdatePriority)
.match(UpdatePriority.class, this::updatePriority)
.match(StopShardedActor.class, this::stopShardedActor)
.build()
.orElse(super.matchAnyAfterInitialization());
}
Expand Down Expand Up @@ -921,7 +931,7 @@ private static Object consistentHashableEnvelope(final Object message, final Obj
private void broadcastToClientActors(final Object cmd, final ActorRef sender) {
for (int i = 0; i < getClientCount(); ++i) {
final var clientActorId = new ClientActorId(entityId, i);
final var envelope = consistentHashableEnvelope(cmd, clientActorId);
final var envelope = consistentHashableEnvelope(cmd, clientActorId.toString());
clientShardRegion.tell(envelope, sender);
}
}
Expand All @@ -933,7 +943,7 @@ private CompletionStage<Object> askAllClientActors(final Signal<?> cmd) {
CompletionStage<Object> askFuture = CompletableFuture.completedStage(null);
for (int i = 0; i < getClientCount(); ++i) {
final var clientActorId = new ClientActorId(entityId, i);
final var envelope = consistentHashableEnvelope(cmd, clientActorId);
final var envelope = consistentHashableEnvelope(cmd, clientActorId.toString());
askFuture = askFuture.thenCombine(
processClientAskResult(Patterns.ask(clientShardRegion, envelope, clientActorAskTimeout)),
(left, right) -> right
Expand Down Expand Up @@ -1117,6 +1127,16 @@ private ConnectivityCommandInterceptor getCommandValidator() {
return new CompoundConnectivityCommandInterceptor(dittoCommandValidator, customCommandValidator);
}

private void stopShardedActor(final StopShardedActor trigger) {
if (ongoingStagedCommands == 0) {
log.debug("Stopping: no ongoing requests.");
getContext().stop(getSelf());
} else {
inCoordinatedShutdown = true;
log.debug("Waiting for <{}> staged commands before stopping", ongoingStagedCommands);
}
}

private static DittoRuntimeException toDittoRuntimeException(final Throwable error, final ConnectionId id,
final DittoHeaders headers) {

Expand Down

0 comments on commit 892aa4a

Please sign in to comment.