diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java index 36827df164..8eb14b2f14 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java @@ -305,6 +305,7 @@ protected void becomeCorrupted() { getContext().cancelReceiveTimeout(); passivate(Control.PASSIVATE); }) + .match(StopShardedActor.class, trigger -> getContext().stop(getSelf())) .matchAny(message -> replyUnavailableException(message, getSender())) .build()); } @@ -350,6 +351,15 @@ protected CompletionStage askTargetActor(final T message, final bool }); } + /** + * Get the number of ongoing ops. + * + * @return The op counter. + */ + protected int getOpCounter() { + return opCounter; + } + /** * Increment the op counter when receiving a non-sudo signal. * diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java index 4c605858b4..00c0774e8f 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java @@ -37,15 +37,14 @@ public final class DefaultThingConfig implements ThingConfig { private static final String CONFIG_PATH = "thing"; - private final Duration liveChannelShutdownTimeout; + private final Duration shutdownTimeout; private final SupervisorConfig supervisorConfig; private final ActivityCheckConfig activityCheckConfig; private final SnapshotConfig snapshotConfig; private final CleanupConfig cleanupConfig; private DefaultThingConfig(final ScopedConfig scopedConfig) { - liveChannelShutdownTimeout = - scopedConfig.getDuration(ConfigValue.LIVE_CHANNEL_SHUTDOWN_TIMEOUT.getConfigPath()); + shutdownTimeout = scopedConfig.getDuration(ConfigValue.SHUTDOWN_TIMEOUT.getConfigPath()); supervisorConfig = DefaultSupervisorConfig.of(scopedConfig); activityCheckConfig = DefaultActivityCheckConfig.of(scopedConfig); snapshotConfig = DefaultSnapshotConfig.of(scopedConfig); @@ -84,8 +83,8 @@ public CleanupConfig getCleanupConfig() { } @Override - public Duration getLiveChannelShutdownTimeout() { - return liveChannelShutdownTimeout; + public Duration getShutdownTimeout() { + return shutdownTimeout; } @Override @@ -101,13 +100,12 @@ public boolean equals(final Object o) { Objects.equals(activityCheckConfig, that.activityCheckConfig) && Objects.equals(snapshotConfig, that.snapshotConfig) && Objects.equals(cleanupConfig, that.cleanupConfig) && - Objects.equals(liveChannelShutdownTimeout, that.liveChannelShutdownTimeout); + Objects.equals(shutdownTimeout, that.shutdownTimeout); } @Override public int hashCode() { - return Objects.hash(supervisorConfig, activityCheckConfig, snapshotConfig, cleanupConfig, - liveChannelShutdownTimeout); + return Objects.hash(supervisorConfig, activityCheckConfig, snapshotConfig, cleanupConfig, shutdownTimeout); } @Override @@ -117,7 +115,7 @@ public String toString() { ", activityCheckConfig=" + activityCheckConfig + ", snapshotConfig=" + snapshotConfig + ", cleanupConfig=" + cleanupConfig + - ", liveChannelShutdownTimeout=" + liveChannelShutdownTimeout + + ", shutdownTimeout=" + shutdownTimeout + "]"; } } diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java index f18b60198c..544dd82a3c 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java @@ -30,11 +30,11 @@ public interface ThingConfig extends WithSupervisorConfig, WithActivityCheckConf WithCleanupConfig { /** - * Get the timeout waiting for responses and acknowledgements over the live channel during coordinated shutdown. + * Get the timeout waiting for responses and acknowledgements during coordinated shutdown. * * @return The timeout. */ - Duration getLiveChannelShutdownTimeout(); + Duration getShutdownTimeout(); /** * An enumeration of the known config path expressions and their associated default values for {@code ThingConfig}. @@ -42,9 +42,9 @@ public interface ThingConfig extends WithSupervisorConfig, WithActivityCheckConf enum ConfigValue implements KnownConfigValue { /** - * Timeout waiting for responses and acknowledgements over the live channel during coordinated shutdown. + * Timeout waiting for responses and acknowledgements during coordinated shutdown. */ - LIVE_CHANNEL_SHUTDOWN_TIMEOUT("live-channel-shutdown-timeout", Duration.ofSeconds(3)); + SHUTDOWN_TIMEOUT("shutdown-timeout", Duration.ofSeconds(3)); private final String path; private final Object defaultValue; diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java index 0b387302dc..ded7ee39e2 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java @@ -92,13 +92,7 @@ public final class ThingSupervisorActor extends AbstractPersistenceSupervisor 0) { - getTimers().startSingleTimer(Control.SHUTDOWN_LIVE_CHANNEL, Control.SHUTDOWN_LIVE_CHANNEL, - liveChannelShutdownTimeout); - } - } - - @Override - protected void decrementOpCounter(final Signal signal) { - super.decrementOpCounter(signal); - if (isChannelLiveOrSmart(signal)) { - --liveOpCounter; - } - } - - @Override - protected void incrementOpCounter(final Signal signal) { - super.incrementOpCounter(signal); - if (isChannelLiveOrSmart(signal)) { - ++liveOpCounter; + if (getOpCounter() > 0) { + getTimers().startSingleTimer(Control.SHUTDOWN_TIMEOUT, Control.SHUTDOWN_TIMEOUT, shutdownTimeout); } } @@ -398,24 +375,19 @@ protected Receive activeBehaviour( final FI.UnitApply matchAnyBehavior) { return ReceiveBuilder.create() - .matchEquals(Control.SHUTDOWN_LIVE_CHANNEL, this::shutdownLiveChannel) + .matchEquals(Control.SHUTDOWN_TIMEOUT, this::shutdownLiveChannel) .build() .orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior)); } private void shutdownLiveChannel(final Control shutdown) { - log.warning("Live channel timeout <{}> reached; aborting <{}> live channel ops", - liveChannelShutdownTimeout, liveOpCounter); + log.warning("Shutdown timeout <{}> reached; aborting <{}> ops", shutdownTimeout, getOpCounter()); getContext().stop(getSelf()); } - private boolean isChannelLiveOrSmart(final Signal signal) { - return Signal.isChannelLive(signal) || Signal.isChannelSmart(signal); - } - private record CommandResponsePair(C command, R response) {} private enum Control { - SHUTDOWN_LIVE_CHANNEL + SHUTDOWN_TIMEOUT } } diff --git a/things/service/src/main/resources/things.conf b/things/service/src/main/resources/things.conf index c05b531692..bfb6692b0b 100755 --- a/things/service/src/main/resources/things.conf +++ b/things/service/src/main/resources/things.conf @@ -31,8 +31,8 @@ ditto { thing { - live-channel-shutdown-timeout = 3s - live-channel-shutdown-timeout = ${?THING_LIVE_CHANNEL_SHUTDOWN_TIMEOUT} + shutdown-timeout = 3s + shutdown-timeout = ${?THING_SHUTDOWN_TIMEOUT} activity-check { # the interval of how long to keep an "inactive" Thing in memory: diff --git a/things/service/src/test/resources/thing-test.conf b/things/service/src/test/resources/thing-test.conf index 6b1bb7a35b..fd91ca6156 100644 --- a/things/service/src/test/resources/thing-test.conf +++ b/things/service/src/test/resources/thing-test.conf @@ -1,5 +1,5 @@ thing { - live-channel-shutdown-timeout = 5s + shutdown-timeout = 5s activity-check { inactive-interval = 100d deleted-interval = 100d