Skip to content

Commit

Permalink
Fix ClusterAvailabilityRule
Browse files Browse the repository at this point in the history
  • Loading branch information
fdc-ntflx committed Apr 19, 2023
1 parent da9d74d commit e1fa84b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -271,16 +272,16 @@ private void fetchRuleSet() {
this.storageProvider.getResourceClusterScaleRules(this.clusterId)
.thenApply(rules -> {
Set<ContainerSkuID> removedKeys = new HashSet<>(this.skuToRuleMap.keySet());
removedKeys.removeAll(rules.getScaleRules().keySet());
final Set<ContainerSkuID> preservedKeys = rules.getScaleRules().keySet().stream().map(ContainerSkuID::of).collect(Collectors.toSet());
removedKeys.removeAll(preservedKeys);
removedKeys.forEach(this.skuToRuleMap::remove);

rules
.getScaleRules().values()
.forEach(rule -> {
log.info("Cluster [{}]: Adding scaleRule: {}", this.clusterId, rule);
this.skuToRuleMap.put(
rule.getSkuId(),
new ClusterAvailabilityRule(rule, this.clock));
.forEach(scaleRule -> {
log.info("Cluster [{}]: Adding scaleRule: {}", this.clusterId, scaleRule);
final ClusterAvailabilityRule clusterAvailabilityRule = createClusterAvailabilityRule(scaleRule, this.skuToRuleMap.get(scaleRule.getSkuId()));
this.skuToRuleMap.put(scaleRule.getSkuId(), clusterAvailabilityRule);
});
return GetRuleSetResponse.builder()
.rules(ImmutableMap.copyOf(this.skuToRuleMap))
Expand All @@ -291,6 +292,14 @@ private void fetchRuleSet() {
pipe(fetchFut, getContext().getDispatcher()).to(getSelf());
}

private ClusterAvailabilityRule createClusterAvailabilityRule(ResourceClusterScaleSpec scaleSpec, ClusterAvailabilityRule existingRule) {
if (existingRule == null) {
return new ClusterAvailabilityRule(scaleSpec, this.clock, Instant.MIN, true);
}
// If rule exists already, port over lastActionInstant and enabled from existing rule
return new ClusterAvailabilityRule(scaleSpec, this.clock, existingRule.lastActionInstant, existingRule.enabled);
}

private void onSetScalerStatus(SetResourceClusterScalerStatusRequest req) {
if (skuToRuleMap.containsKey(req.getSkuId())) {
skuToRuleMap.get(req.getSkuId()).setEnabled(req.getEnabled());
Expand Down Expand Up @@ -357,27 +366,36 @@ static class ClusterAvailabilityRule {
private Instant lastActionInstant;
private boolean enabled;

public ClusterAvailabilityRule(ResourceClusterScaleSpec scaleSpec, Clock clock) {
public ClusterAvailabilityRule(ResourceClusterScaleSpec scaleSpec, Clock clock, Instant lastActionInstant, Boolean enabled) {
this.scaleSpec = scaleSpec;
this.clock = clock;

// TODO: probably we should use current time
this.lastActionInstant = Instant.MIN;
this.enabled = true;
this.lastActionInstant = lastActionInstant;
this.enabled = enabled;
}

private void resetLastActionInstant() {
log.debug("resetLastActionInstant: {}, {}", this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId());

lastActionInstant = clock.instant();
}

public void setEnabled(boolean enabled) {
log.debug("setEnabled: {}, {}, {}", enabled, this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId());

this.enabled = enabled;
resetLastActionInstant();
}

public boolean isEnabled() { return enabled; }

public boolean isLastActionOlderThan(long secondsSinceLastAction) {
log.debug("[isLastActionOlderThan] secondsSinceLastAction: {}, {}, {}", secondsSinceLastAction, this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId());
log.debug("[isLastActionOlderThan] lastActionInstant: {}, {}, {}", lastActionInstant, this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId());
log.debug("[isLastActionOlderThan] lastActionInstant + secondsSinceLastAction: {}, {}, {}", lastActionInstant.plusSeconds(secondsSinceLastAction), this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId());
log.debug("[isLastActionOlderThan] comp: {}, {}, {}", lastActionInstant.plusSeconds(secondsSinceLastAction).compareTo(clock.instant()) > 0, this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId());

return lastActionInstant.plusSeconds(secondsSinceLastAction).compareTo(clock.instant()) > 0;
}

Expand Down Expand Up @@ -430,7 +448,10 @@ else if (usage.getIdleCount() < scaleSpec.getMinIdleToKeep()) {
this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId(), decision);

// reset last action only if we decided to scale up or down
resetLastActionInstant();
if (decision.isPresent() && (decision.get().type.equals(ScaleType.ScaleDown) || decision.get().type.equals(ScaleType.ScaleUp))) {
log.debug("Ongoing scale operation. Resetting last action timer: {}, {}", this.scaleSpec.getClusterId(), this.scaleSpec.getSkuId());
resetLastActionInstant();
}
return decision;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ public void testRuleCoolDown() {
.minSize(11)
.maxSize(15)
.build(),
Clock.fixed(Clock.systemUTC().instant(), ZoneId.systemDefault()));
Clock.fixed(Clock.systemUTC().instant(), ZoneId.systemDefault()),
Instant.MIN,
true);

// Test scale up
UsageByGroupKey usage = UsageByGroupKey.builder()
Expand Down Expand Up @@ -302,7 +304,9 @@ public void testRuleFinishCoolDown() throws InterruptedException {
.minSize(11)
.maxSize(15)
.build(),
Clock.systemUTC());
Clock.systemUTC(),
Instant.MIN,
true);

// Test scale up
UsageByGroupKey usage =
Expand Down Expand Up @@ -353,7 +357,9 @@ public void testRule() {
.minSize(11)
.maxSize(15)
.build(),
Clock.fixed(Instant.MIN, ZoneId.systemDefault()));
Clock.fixed(Instant.MIN, ZoneId.systemDefault()),
Instant.MIN,
true);

// Test scale up
UsageByGroupKey usage = UsageByGroupKey.builder()
Expand Down

0 comments on commit e1fa84b

Please sign in to comment.