Skip to content

Commit

Permalink
Also terminate twin operations on coordinated shutdown after timeout.
Browse files Browse the repository at this point in the history
Reason: Twin operations can include acknowledgement requests.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Sep 2, 2022
1 parent 45c5884 commit 04cd054
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ protected void becomeCorrupted() {
getContext().cancelReceiveTimeout();
passivate(Control.PASSIVATE);
})
.match(StopShardedActor.class, trigger -> getContext().stop(getSelf()))
.matchAny(message -> replyUnavailableException(message, getSender()))
.build());
}
Expand Down Expand Up @@ -350,6 +351,15 @@ protected <T> CompletionStage<Object> askTargetActor(final T message, final bool
});
}

/**
* Get the number of ongoing ops.
*
* @return The op counter.
*/
protected int getOpCounter() {
return opCounter;
}

/**
* Increment the op counter when receiving a non-sudo signal.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ public final class DefaultThingConfig implements ThingConfig {

private static final String CONFIG_PATH = "thing";

private final Duration liveChannelShutdownTimeout;
private final Duration shutdownTimeout;
private final SupervisorConfig supervisorConfig;
private final ActivityCheckConfig activityCheckConfig;
private final SnapshotConfig snapshotConfig;
private final CleanupConfig cleanupConfig;

private DefaultThingConfig(final ScopedConfig scopedConfig) {
liveChannelShutdownTimeout =
scopedConfig.getDuration(ConfigValue.LIVE_CHANNEL_SHUTDOWN_TIMEOUT.getConfigPath());
shutdownTimeout = scopedConfig.getDuration(ConfigValue.SHUTDOWN_TIMEOUT.getConfigPath());
supervisorConfig = DefaultSupervisorConfig.of(scopedConfig);
activityCheckConfig = DefaultActivityCheckConfig.of(scopedConfig);
snapshotConfig = DefaultSnapshotConfig.of(scopedConfig);
Expand Down Expand Up @@ -84,8 +83,8 @@ public CleanupConfig getCleanupConfig() {
}

@Override
public Duration getLiveChannelShutdownTimeout() {
return liveChannelShutdownTimeout;
public Duration getShutdownTimeout() {
return shutdownTimeout;
}

@Override
Expand All @@ -101,13 +100,12 @@ public boolean equals(final Object o) {
Objects.equals(activityCheckConfig, that.activityCheckConfig) &&
Objects.equals(snapshotConfig, that.snapshotConfig) &&
Objects.equals(cleanupConfig, that.cleanupConfig) &&
Objects.equals(liveChannelShutdownTimeout, that.liveChannelShutdownTimeout);
Objects.equals(shutdownTimeout, that.shutdownTimeout);
}

@Override
public int hashCode() {
return Objects.hash(supervisorConfig, activityCheckConfig, snapshotConfig, cleanupConfig,
liveChannelShutdownTimeout);
return Objects.hash(supervisorConfig, activityCheckConfig, snapshotConfig, cleanupConfig, shutdownTimeout);
}

@Override
Expand All @@ -117,7 +115,7 @@ public String toString() {
", activityCheckConfig=" + activityCheckConfig +
", snapshotConfig=" + snapshotConfig +
", cleanupConfig=" + cleanupConfig +
", liveChannelShutdownTimeout=" + liveChannelShutdownTimeout +
", shutdownTimeout=" + shutdownTimeout +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ public interface ThingConfig extends WithSupervisorConfig, WithActivityCheckConf
WithCleanupConfig {

/**
* Get the timeout waiting for responses and acknowledgements over the live channel during coordinated shutdown.
* Get the timeout waiting for responses and acknowledgements during coordinated shutdown.
*
* @return The timeout.
*/
Duration getLiveChannelShutdownTimeout();
Duration getShutdownTimeout();

/**
* An enumeration of the known config path expressions and their associated default values for {@code ThingConfig}.
*/
enum ConfigValue implements KnownConfigValue {

/**
* Timeout waiting for responses and acknowledgements over the live channel during coordinated shutdown.
* Timeout waiting for responses and acknowledgements during coordinated shutdown.
*/
LIVE_CHANNEL_SHUTDOWN_TIMEOUT("live-channel-shutdown-timeout", Duration.ofSeconds(3));
SHUTDOWN_TIMEOUT("shutdown-timeout", Duration.ofSeconds(3));

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,7 @@ public final class ThingSupervisorActor extends AbstractPersistenceSupervisor<Th
private final PolicyEnforcerProvider policyEnforcerProvider;
private final ActorRef searchShardRegionProxy;

/**
* Counter for ongoing live signals going through this actor.
* The signals are also counted by the normal op counter.
*/
private int liveOpCounter = 0;

private final Duration liveChannelShutdownTimeout;
private final Duration shutdownTimeout;

@SuppressWarnings("unused")
private ThingSupervisorActor(final ActorRef pubSubMediator,
Expand All @@ -120,7 +114,7 @@ private ThingSupervisorActor(final ActorRef pubSubMediator,
final var dittoScoped = DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config());
enforcementConfig = DefaultEnforcementConfig.of(dittoScoped);
final var thingsConfig = DittoThingsConfig.of(dittoScoped);
liveChannelShutdownTimeout = thingsConfig.getThingConfig().getLiveChannelShutdownTimeout();
shutdownTimeout = thingsConfig.getThingConfig().getShutdownTimeout();

materializer = Materializer.createMaterializer(getContext());
responseReceiverCache = ResponseReceiverCache.lookup(getContext().getSystem());
Expand Down Expand Up @@ -370,25 +364,8 @@ protected ExponentialBackOffConfig getExponentialBackOffConfig() {
@Override
protected void stopShardedActor(final StopShardedActor trigger) {
super.stopShardedActor(trigger);
if (liveOpCounter > 0) {
getTimers().startSingleTimer(Control.SHUTDOWN_LIVE_CHANNEL, Control.SHUTDOWN_LIVE_CHANNEL,
liveChannelShutdownTimeout);
}
}

@Override
protected void decrementOpCounter(final Signal<?> signal) {
super.decrementOpCounter(signal);
if (isChannelLiveOrSmart(signal)) {
--liveOpCounter;
}
}

@Override
protected void incrementOpCounter(final Signal<?> signal) {
super.incrementOpCounter(signal);
if (isChannelLiveOrSmart(signal)) {
++liveOpCounter;
if (getOpCounter() > 0) {
getTimers().startSingleTimer(Control.SHUTDOWN_TIMEOUT, Control.SHUTDOWN_TIMEOUT, shutdownTimeout);
}
}

Expand All @@ -398,24 +375,19 @@ protected Receive activeBehaviour(
final FI.UnitApply<Object> matchAnyBehavior) {

return ReceiveBuilder.create()
.matchEquals(Control.SHUTDOWN_LIVE_CHANNEL, this::shutdownLiveChannel)
.matchEquals(Control.SHUTDOWN_TIMEOUT, this::shutdownLiveChannel)
.build()
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));
}

private void shutdownLiveChannel(final Control shutdown) {
log.warning("Live channel timeout <{}> reached; aborting <{}> live channel ops",
liveChannelShutdownTimeout, liveOpCounter);
log.warning("Shutdown timeout <{}> reached; aborting <{}> ops", shutdownTimeout, getOpCounter());
getContext().stop(getSelf());
}

private boolean isChannelLiveOrSmart(final Signal<?> signal) {
return Signal.isChannelLive(signal) || Signal.isChannelSmart(signal);
}

private record CommandResponsePair<C, R>(C command, R response) {}

private enum Control {
SHUTDOWN_LIVE_CHANNEL
SHUTDOWN_TIMEOUT
}
}
4 changes: 2 additions & 2 deletions things/service/src/main/resources/things.conf
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ ditto {

thing {

live-channel-shutdown-timeout = 3s
live-channel-shutdown-timeout = ${?THING_LIVE_CHANNEL_SHUTDOWN_TIMEOUT}
shutdown-timeout = 3s
shutdown-timeout = ${?THING_SHUTDOWN_TIMEOUT}

activity-check {
# the interval of how long to keep an "inactive" Thing in memory:
Expand Down
2 changes: 1 addition & 1 deletion things/service/src/test/resources/thing-test.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
thing {
live-channel-shutdown-timeout = 5s
shutdown-timeout = 5s
activity-check {
inactive-interval = 100d
deleted-interval = 100d
Expand Down

0 comments on commit 04cd054

Please sign in to comment.