Skip to content

Commit

Permalink
Integrate Things into Akka coordinated shutdown.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Sep 1, 2022
1 parent 2293086 commit 45c5884
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ public Receive createReceive() {
}

@Override
protected Receive activeBehaviour(
final FI.UnitApply<AbstractPersistenceSupervisor.Control> matchProcessNextTwinMessageBehavior,
protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBehavior,
final FI.UnitApply<Object> matchAnyBehavior) {
return ReceiveBuilder.create()
.match(Config.class, this::onConnectivityConfigModified)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,17 @@ protected boolean shouldStartChildImmediately() {
return true;
}

protected Receive activeBehaviour(final FI.UnitApply<Control> matchProcessNextTwinMessageBehavior,
protected Receive activeBehaviour(
final Runnable matchProcessNextTwinMessageBehavior,
final FI.UnitApply<Object> matchAnyBehavior) {

return ReceiveBuilder.create()
.match(Terminated.class, this::childTerminated)
.matchEquals(Control.START_CHILDREN, this::startChildren)
.matchEquals(Control.PASSIVATE, this::passivate)
.matchEquals(Control.PROCESS_NEXT_TWIN_MESSAGE, decrementOpCounter(matchProcessNextTwinMessageBehavior))
.matchEquals(Control.SUDO_COMMAND_DONE, this::decrementSudoOpCounter)
.match(StopShardedActor.class, this::stopAfterOngoingOps)
.match(ProcessNextTwinMessage.class, decrementOpCounter(matchProcessNextTwinMessageBehavior))
.match(StopShardedActor.class, this::stopShardedActor)
.match(SudoCommand.class, this::forwardSudoCommandToChildIfAvailable)
.match(WithDittoHeaders.class, w -> w.getDittoHeaders().isSudo(),
this::forwardDittoSudoToChildIfAvailable)
Expand Down Expand Up @@ -349,7 +350,30 @@ protected <T> CompletionStage<Object> askTargetActor(final T message, final bool
});
}

private void stopAfterOngoingOps(final StopShardedActor trigger) {
/**
 * Decrement the op counter after processing of a non-sudo signal has completed.
 * The signal itself is not inspected here; it is passed along so that overriding
 * subclasses can maintain additional counters based on it.
 *
 * @param signal the signal whose processing has completed.
 */
protected void decrementOpCounter(final Signal<?> signal) {
    opCounter -= 1;
}

/**
 * Increment the op counter when starting to process a non-sudo signal.
 * The signal itself is not inspected here; it is passed along so that overriding
 * subclasses can maintain additional counters based on it.
 *
 * @param signal the signal whose processing is starting.
 */
protected void incrementOpCounter(final Signal<?> signal) {
    opCounter += 1;
}

/**
* Start terminating because the shard region is shutting down.
*
* @param trigger the hand-off message.
*/
protected void stopShardedActor(final StopShardedActor trigger) {
if (opCounter == 0 && sudoOpCounter == 0) {
log.debug("Stopping: no ongoing ops.");
getContext().stop(getSelf());
Expand All @@ -359,10 +383,11 @@ private void stopAfterOngoingOps(final StopShardedActor trigger) {
}
}

private FI.UnitApply<Control> decrementOpCounter(final FI.UnitApply<Control> matchProcessNextTwinMessageBehavior) {
return control -> {
--opCounter;
matchProcessNextTwinMessageBehavior.apply(control);
private FI.UnitApply<ProcessNextTwinMessage> decrementOpCounter(
final Runnable matchProcessNextTwinMessageBehavior) {
return processNextTwinMessage -> {
decrementOpCounter(processNextTwinMessage.signal());
matchProcessNextTwinMessageBehavior.run();
if (inCoordinatedShutdown && opCounter == 0 && sudoOpCounter == 0) {
log.debug("Stopping after waiting for ongoing ops.");
getContext().stop(getSelf());
Expand Down Expand Up @@ -437,9 +462,7 @@ private void becomeActive(final ShutdownBehaviour shutdownBehaviour) {
getContext().become(
shutdownBehaviour.createReceive().build().orElse(
activeBehaviour(
processNextTwinMessage -> {
// ingore
},
() -> {}, // ignore
this::enforceAndForwardToTargetActor
)
)
Expand All @@ -449,7 +472,7 @@ private void becomeActive(final ShutdownBehaviour shutdownBehaviour) {
protected void becomeTwinSignalProcessingAwaiting() {
getContext().become(
activeBehaviour(
processNextTwinMsg -> {
() -> {
unstashAll();
becomeActive(getShutdownBehaviour(entityId));
},
Expand Down Expand Up @@ -641,14 +664,14 @@ private void enforceAndForwardToTargetActor(final Object message) {
if (shouldBecomeTwinSignalProcessingAwaiting(signal)) {
becomeTwinSignalProcessingAwaiting();
}
final CompletionStage<Control> syncCs = signalTransformer.apply(signal)
final var syncCs = signalTransformer.apply(signal)
.whenComplete((result, error) -> handleOptionalTransformationException(signal, error, sender))
.thenCompose(transformed -> enforceSignalAndForwardToTargetActor((S) transformed, sender)
.whenComplete((response, throwable) -> {
handleSignalEnforcementResponse(response, throwable, transformed, sender);
}))
.handle((response, throwable) -> Control.PROCESS_NEXT_TWIN_MESSAGE);
++opCounter; // decremented by PROCESS_NEXT_TWIN_MESSAGE
.handle((response, throwable) -> new ProcessNextTwinMessage(signal));
incrementOpCounter(signal); // decremented by ProcessNextTwinMessage
Patterns.pipe(syncCs, getContext().getDispatcher()).pipeTo(getSelf(), getSelf());
}
} else if (null != persistenceActorChild) {
Expand Down Expand Up @@ -929,17 +952,19 @@ public enum Control {
*/
INIT_DONE,

/**
* Signals the actor to process the next message.
*/
PROCESS_NEXT_TWIN_MESSAGE,

/**
* Signals completion of a sudo command.
*/
SUDO_COMMAND_DONE
}

/**
* Signals the actor to process the next message.
* Piped to self once handling of a signal has completed, successfully or not;
* on receipt the op counter is decremented for the carried signal.
*
* @param signal the previous message, i.e. the signal whose handling has completed.
*/
private record ProcessNextTwinMessage(Signal<?> signal) {}

/**
* Pair of a signal and a response; either component may be {@code null}.
* NOTE(review): presumably the enforced signal together with the target actor's reply to it -
* usages are outside this excerpt, confirm.
*
* @param enforcedSignal the enforced signal, or {@code null}.
* @param response the response, or {@code null}.
*/
private record EnforcedSignalAndTargetActorResponse(@Nullable Signal<?> enforcedSignal,
@Nullable Object response) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
*/
package org.eclipse.ditto.things.service.common.config;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.supervision.DefaultSupervisorConfig;
import org.eclipse.ditto.base.service.config.supervision.SupervisorConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultActivityCheckConfig;
Expand All @@ -36,12 +37,15 @@ public final class DefaultThingConfig implements ThingConfig {

private static final String CONFIG_PATH = "thing";

private final Duration liveChannelShutdownTimeout;
private final SupervisorConfig supervisorConfig;
private final ActivityCheckConfig activityCheckConfig;
private final SnapshotConfig snapshotConfig;
private final CleanupConfig cleanupConfig;

private DefaultThingConfig(final ScopedConfig scopedConfig) {
liveChannelShutdownTimeout =
scopedConfig.getDuration(ConfigValue.LIVE_CHANNEL_SHUTDOWN_TIMEOUT.getConfigPath());
supervisorConfig = DefaultSupervisorConfig.of(scopedConfig);
activityCheckConfig = DefaultActivityCheckConfig.of(scopedConfig);
snapshotConfig = DefaultSnapshotConfig.of(scopedConfig);
Expand All @@ -56,7 +60,7 @@ private DefaultThingConfig(final ScopedConfig scopedConfig) {
* @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid.
*/
public static DefaultThingConfig of(final Config config) {
return new DefaultThingConfig(DefaultScopedConfig.newInstance(config, CONFIG_PATH));
return new DefaultThingConfig(ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values()));
}

@Override
Expand All @@ -74,6 +78,16 @@ public SnapshotConfig getSnapshotConfig() {
return snapshotConfig;
}

@Override
public CleanupConfig getCleanupConfig() {
    // Plain accessor for the final cleanupConfig field.
    return this.cleanupConfig;
}

@Override
public Duration getLiveChannelShutdownTimeout() {
    // Value was read once in the constructor from the LIVE_CHANNEL_SHUTDOWN_TIMEOUT config path.
    return this.liveChannelShutdownTimeout;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -86,12 +100,14 @@ public boolean equals(final Object o) {
return Objects.equals(supervisorConfig, that.supervisorConfig) &&
Objects.equals(activityCheckConfig, that.activityCheckConfig) &&
Objects.equals(snapshotConfig, that.snapshotConfig) &&
Objects.equals(cleanupConfig, that.cleanupConfig);
Objects.equals(cleanupConfig, that.cleanupConfig) &&
Objects.equals(liveChannelShutdownTimeout, that.liveChannelShutdownTimeout);
}

@Override
public int hashCode() {
return Objects.hash(supervisorConfig, activityCheckConfig, snapshotConfig, cleanupConfig);
return Objects.hash(supervisorConfig, activityCheckConfig, snapshotConfig, cleanupConfig,
liveChannelShutdownTimeout);
}

@Override
Expand All @@ -101,11 +117,7 @@ public String toString() {
", activityCheckConfig=" + activityCheckConfig +
", snapshotConfig=" + snapshotConfig +
", cleanupConfig=" + cleanupConfig +
", liveChannelShutdownTimeout=" + liveChannelShutdownTimeout +
"]";
}

@Override
public CleanupConfig getCleanupConfig() {
return cleanupConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
*/
package org.eclipse.ditto.things.service.common.config;

import java.time.Duration;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.supervision.WithSupervisorConfig;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.WithActivityCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.WithSnapshotConfig;
import org.eclipse.ditto.internal.utils.persistentactors.cleanup.WithCleanupConfig;
Expand All @@ -25,4 +28,41 @@
@Immutable
public interface ThingConfig extends WithSupervisorConfig, WithActivityCheckConfig, WithSnapshotConfig,
        WithCleanupConfig {

    /**
     * Returns how long to wait for responses and acknowledgements over the live channel
     * during coordinated shutdown.
     *
     * @return the timeout.
     */
    Duration getLiveChannelShutdownTimeout();

    /**
     * The known config path expressions of {@code ThingConfig} together with their default values.
     */
    enum ConfigValue implements KnownConfigValue {

        /**
         * Timeout waiting for responses and acknowledgements over the live channel during
         * coordinated shutdown. Defaults to 3 seconds.
         */
        LIVE_CHANNEL_SHUTDOWN_TIMEOUT("live-channel-shutdown-timeout", Duration.ofSeconds(3));

        private final String path;
        private final Object defaultValue;

        ConfigValue(final String configPath, final Object fallbackValue) {
            this.path = configPath;
            this.defaultValue = fallbackValue;
        }

        @Override
        public String getConfigPath() {
            return this.path;
        }

        @Override
        public Object getDefaultValue() {
            return this.defaultValue;
        }

    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletionStage;

Expand All @@ -30,6 +31,7 @@
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.cluster.StopShardedActor;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.internal.utils.persistentactors.AbstractPersistenceSupervisor;
Expand Down Expand Up @@ -57,6 +59,8 @@
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.Materializer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
Expand Down Expand Up @@ -88,6 +92,14 @@ public final class ThingSupervisorActor extends AbstractPersistenceSupervisor<Th
private final PolicyEnforcerProvider policyEnforcerProvider;
private final ActorRef searchShardRegionProxy;

/**
* Counter for ongoing live signals going through this actor.
* The signals are also counted by the normal op counter.
* Adjusted in the op counter hooks for signals on the live or smart channel.
*/
private int liveOpCounter = 0;

/**
* Delay of the single timer started on shard hand-off while live ops are still ongoing.
* Taken from the thing config's live channel shutdown timeout in the constructor.
*/
private final Duration liveChannelShutdownTimeout;

@SuppressWarnings("unused")
private ThingSupervisorActor(final ActorRef pubSubMediator,
@Nullable final ActorRef policiesShardRegion,
Expand All @@ -105,10 +117,10 @@ private ThingSupervisorActor(final ActorRef pubSubMediator,
this.distributedPubThingEventsForTwin = distributedPubThingEventsForTwin;
this.thingPersistenceActorPropsFactory = thingPersistenceActorPropsFactory;
persistenceActorChild = thingPersistenceActorRef;
final DefaultScopedConfig dittoScoped =
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config());
final var dittoScoped = DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config());
enforcementConfig = DefaultEnforcementConfig.of(dittoScoped);
final DittoThingsConfig thingsConfig = DittoThingsConfig.of(dittoScoped);
final var thingsConfig = DittoThingsConfig.of(dittoScoped);
liveChannelShutdownTimeout = thingsConfig.getThingConfig().getLiveChannelShutdownTimeout();

materializer = Materializer.createMaterializer(getContext());
responseReceiverCache = ResponseReceiverCache.lookup(getContext().getSystem());
Expand Down Expand Up @@ -355,5 +367,55 @@ protected ExponentialBackOffConfig getExponentialBackOffConfig() {
.getExponentialBackOffConfig();
}

/**
 * Delegates the hand-off to the supervisor base class and, if live channel ops are still
 * ongoing afterwards, starts a single timer that delivers {@code SHUTDOWN_LIVE_CHANNEL}
 * to this actor after the configured live channel shutdown timeout.
 *
 * @param trigger the hand-off message.
 */
@Override
protected void stopShardedActor(final StopShardedActor trigger) {
    super.stopShardedActor(trigger);
    final boolean noLiveOpsOngoing = liveOpCounter <= 0;
    if (noLiveOpsOngoing) {
        return;
    }
    // The same enum constant serves as timer key and as the message delivered on expiry.
    final var timerKeyAndMessage = Control.SHUTDOWN_LIVE_CHANNEL;
    getTimers().startSingleTimer(timerKeyAndMessage, timerKeyAndMessage, liveChannelShutdownTimeout);
}

/**
 * Decrements the general op counter and, for signals on the live or smart channel,
 * the live op counter as well.
 *
 * @param signal the signal whose processing has completed.
 */
@Override
protected void decrementOpCounter(final Signal<?> signal) {
    super.decrementOpCounter(signal);
    final boolean countsAsLiveOp = isChannelLiveOrSmart(signal);
    if (countsAsLiveOp) {
        liveOpCounter -= 1;
    }
}

/**
 * Increments the general op counter and, for signals on the live or smart channel,
 * the live op counter as well.
 *
 * @param signal the signal whose processing is starting.
 */
@Override
protected void incrementOpCounter(final Signal<?> signal) {
    super.incrementOpCounter(signal);
    final boolean countsAsLiveOp = isChannelLiveOrSmart(signal);
    if (countsAsLiveOp) {
        liveOpCounter += 1;
    }
}

/**
 * Builds the active behaviour: the {@code SHUTDOWN_LIVE_CHANNEL} control message is handled
 * first; everything else falls through to the behaviour of the supervisor base class.
 *
 * @param matchProcessNextTwinMessageBehavior passed through unchanged to the base class behaviour.
 * @param matchAnyBehavior passed through unchanged to the base class behaviour.
 * @return the combined behaviour.
 */
@Override
protected Receive activeBehaviour(
        final Runnable matchProcessNextTwinMessageBehavior,
        final FI.UnitApply<Object> matchAnyBehavior) {

    final Receive liveChannelShutdownBehaviour = ReceiveBuilder.create()
            .matchEquals(Control.SHUTDOWN_LIVE_CHANNEL, this::shutdownLiveChannel)
            .build();
    final Receive supervisorBehaviour =
            super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior);

    return liveChannelShutdownBehaviour.orElse(supervisorBehaviour);
}

/**
 * Logs a warning with the timeout and the number of still ongoing live channel ops,
 * then stops this actor.
 *
 * @param shutdown the control message that triggered the shutdown; not inspected.
 */
private void shutdownLiveChannel(final Control shutdown) {
    final String template = "Live channel timeout <{}> reached; aborting <{}> live channel ops";
    log.warning(template, liveChannelShutdownTimeout, liveOpCounter);
    getContext().stop(getSelf());
}

/**
 * Tells whether a signal counts as a live op for this actor.
 *
 * @param signal the signal to check.
 * @return {@code true} if {@code Signal.isChannelLive} or {@code Signal.isChannelSmart} holds for the signal.
 */
private boolean isChannelLiveOrSmart(final Signal<?> signal) {
    if (Signal.isChannelLive(signal)) {
        return true;
    }
    return Signal.isChannelSmart(signal);
}

/**
* Generic pair of a command and a response.
* NOTE(review): usages are outside this excerpt - confirm the intended pairing.
*
* @param command the command.
* @param response the response.
* @param <C> type of the command.
* @param <R> type of the response.
*/
private record CommandResponsePair<C, R>(C command, R response) {}

/**
* Internal control messages of this actor.
*/
private enum Control {
/**
* Used both as timer key and as the message delivered when the live channel shutdown timeout expires.
*/
SHUTDOWN_LIVE_CHANNEL
}
}
Loading

0 comments on commit 45c5884

Please sign in to comment.