diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterScalerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterScalerActor.java index f901f72ac..52a338438 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterScalerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterScalerActor.java @@ -51,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; +import java.util.stream.Collectors; import lombok.Builder; import lombok.Value; import lombok.extern.slf4j.Slf4j; @@ -271,16 +272,16 @@ private void fetchRuleSet() { this.storageProvider.getResourceClusterScaleRules(this.clusterId) .thenApply(rules -> { Set removedKeys = new HashSet<>(this.skuToRuleMap.keySet()); - removedKeys.removeAll(rules.getScaleRules().keySet()); + final Set preservedKeys = rules.getScaleRules().keySet().stream().map(ContainerSkuID::of).collect(Collectors.toSet()); + removedKeys.removeAll(preservedKeys); removedKeys.forEach(this.skuToRuleMap::remove); rules .getScaleRules().values() - .forEach(rule -> { - log.info("Cluster [{}]: Adding scaleRule: {}", this.clusterId, rule); - this.skuToRuleMap.put( - rule.getSkuId(), - new ClusterAvailabilityRule(rule, this.clock)); + .forEach(scaleRule -> { + log.info("Cluster [{}]: Adding scaleRule: {}", this.clusterId, scaleRule); + final ClusterAvailabilityRule clusterAvailabilityRule = createClusterAvailabilityRule(scaleRule, this.skuToRuleMap.get(scaleRule.getSkuId())); + this.skuToRuleMap.put(scaleRule.getSkuId(), clusterAvailabilityRule); }); return GetRuleSetResponse.builder() .rules(ImmutableMap.copyOf(this.skuToRuleMap)) @@ -291,6 +292,14 @@ private void fetchRuleSet() { pipe(fetchFut, getContext().getDispatcher()).to(getSelf()); } + private ClusterAvailabilityRule createClusterAvailabilityRule(ResourceClusterScaleSpec scaleSpec, ClusterAvailabilityRule existingRule) { + if (existingRule == null) { + return new ClusterAvailabilityRule(scaleSpec, this.clock, Instant.MIN, true); + } + // If rule exists already, port over lastActionInstant and enabled from existing rule + return new ClusterAvailabilityRule(scaleSpec, this.clock, existingRule.lastActionInstant, existingRule.enabled); + } + private void onSetScalerStatus(SetResourceClusterScalerStatusRequest req) { if (skuToRuleMap.containsKey(req.getSkuId())) { skuToRuleMap.get(req.getSkuId()).setEnabled(req.getEnabled()); @@ -357,20 +366,24 @@ static class ClusterAvailabilityRule { private Instant lastActionInstant; private boolean enabled; - public ClusterAvailabilityRule(ResourceClusterScaleSpec scaleSpec, Clock clock) { + public ClusterAvailabilityRule(ResourceClusterScaleSpec scaleSpec, Clock clock, Instant lastActionInstant, Boolean enabled) { this.scaleSpec = scaleSpec; this.clock = clock; // TODO: probably we should use current time - this.lastActionInstant = Instant.MIN; - this.enabled = true; + this.lastActionInstant = lastActionInstant; + this.enabled = enabled; } private void resetLastActionInstant() { + log.debug("resetLastActionInstant: {}, {}", this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId()); + lastActionInstant = clock.instant(); } public void setEnabled(boolean enabled) { + log.debug("setEnabled: {}, {}, {}", enabled, this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId()); + this.enabled = enabled; resetLastActionInstant(); } @@ -378,6 +391,11 @@ public void setEnabled(boolean enabled) { public boolean isEnabled() { return enabled; } public boolean isLastActionOlderThan(long secondsSinceLastAction) { + log.debug("[isLastActionOlderThan] secondsSinceLastAction: {}, {}, {}", secondsSinceLastAction, this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId()); + log.debug("[isLastActionOlderThan] lastActionInstant: {}, {}, {}", lastActionInstant, this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId()); + log.debug("[isLastActionOlderThan] lastActionInstant + secondsSinceLastAction: {}, {}, {}", lastActionInstant.plusSeconds(secondsSinceLastAction), this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId()); + log.debug("[isLastActionOlderThan] comp: {}, {}, {}", lastActionInstant.plusSeconds(secondsSinceLastAction).compareTo(clock.instant()) > 0, this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId()); + return lastActionInstant.plusSeconds(secondsSinceLastAction).compareTo(clock.instant()) > 0; } @@ -430,7 +448,10 @@ else if (usage.getIdleCount() < scaleSpec.getMinIdleToKeep()) { this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId(), decision); // reset last action only if we decided to scale up or down - resetLastActionInstant(); + if (decision.isPresent() && (decision.get().type.equals(ScaleType.ScaleDown) || decision.get().type.equals(ScaleType.ScaleUp))) { + log.debug("Ongoing scale operation. Resetting last action timer: {}, {}", this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId()); + resetLastActionInstant(); + } return decision; } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterScalerActorTests.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterScalerActorTests.java index e37570262..93c9884ff 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterScalerActorTests.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterScalerActorTests.java @@ -252,7 +252,9 @@ public void testRuleCoolDown() { .minSize(11) .maxSize(15) .build(), - Clock.fixed(Clock.systemUTC().instant(), ZoneId.systemDefault())); + Clock.fixed(Clock.systemUTC().instant(), ZoneId.systemDefault()), + Instant.MIN, + true); // Test scale up UsageByGroupKey usage = UsageByGroupKey.builder() @@ -302,7 +304,9 @@ public void testRuleFinishCoolDown() throws InterruptedException { .minSize(11) .maxSize(15) .build(), - Clock.systemUTC()); + Clock.systemUTC(), + Instant.MIN, + true); // Test scale up UsageByGroupKey usage = @@ -353,7 +357,9 @@ public void testRule() { .minSize(11) .maxSize(15) .build(), - Clock.fixed(Instant.MIN, ZoneId.systemDefault())); + Clock.fixed(Instant.MIN, ZoneId.systemDefault()), + Instant.MIN, + true); // Test scale up UsageByGroupKey usage = UsageByGroupKey.builder()