Skip to content

Commit

Permalink
Autoscaling: add nodes to capacity response (#64915)
Browse files Browse the repository at this point in the history
When an autoscaling capacity is calculated, we base this on information
about the current nodes in the cluster. In some cases, we may return
too small or too large a capacity if nodes are missing in the cluster.
The current_nodes returned in the capacity response allows the client
of the autoscaling capacity API to detect this by comparing to the
expected set of nodes. This in turn allows the client to disregard the
capacity for a policy as long as there is a discrepancy.
  • Loading branch information
henningandersen committed Nov 24, 2020
1 parent 6854974 commit 6800f3b
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
autoscaling.put_autoscaling_policy:
name: my_autoscaling_policy
body:
roles : []
# voting_only requires master to start so we are sure no nodes match
roles: ["voting_only"]
deciders:
fixed:
storage: 1337b
Expand All @@ -32,6 +33,8 @@
- match: { policies.my_autoscaling_policy.deciders.fixed.required_capacity.node.storage: 1337b }
- match: { policies.my_autoscaling_policy.deciders.fixed.required_capacity.node.memory: 7331b }
- match: { policies.my_autoscaling_policy.deciders.fixed.reason_summary: "fixed storage [1.3kb] memory [7.1kb] nodes [10]" }
- length: { policies.my_autoscaling_policy.current_nodes: 0 }


# test cleanup
- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import org.elasticsearch.xpack.autoscaling.action.PolicyValidator;
import org.elasticsearch.xpack.autoscaling.policy.AutoscalingPolicy;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -107,6 +109,7 @@ private AutoscalingDeciderResults calculateForPolicy(AutoscalingPolicy policy, C
if (hasUnknownRoles(policy)) {
return new AutoscalingDeciderResults(
AutoscalingCapacity.ZERO,
Collections.emptySortedSet(),
new TreeMap<>(Map.of("_unknown_role", new AutoscalingDeciderResult(null, null)))
);
}
Expand All @@ -116,7 +119,7 @@ private AutoscalingDeciderResults calculateForPolicy(AutoscalingPolicy policy, C
.stream()
.map(entry -> Tuple.tuple(entry.getKey(), calculateForDecider(entry.getKey(), entry.getValue(), context)))
.collect(Collectors.toMap(Tuple::v1, Tuple::v2, (a, b) -> { throw new UnsupportedOperationException(); }, TreeMap::new));
return new AutoscalingDeciderResults(context.currentCapacity, results);
return new AutoscalingDeciderResults(context.currentCapacity, context.currentNodes, results);
}

/**
Expand All @@ -138,6 +141,7 @@ static class DefaultAutoscalingDeciderContext implements AutoscalingDeciderConte
private final SortedSet<DiscoveryNodeRole> roles;
private final ClusterState state;
private final ClusterInfo clusterInfo;
private final SortedSet<DiscoveryNode> currentNodes;
private final AutoscalingCapacity currentCapacity;
private final boolean currentCapacityAccurate;

Expand All @@ -147,6 +151,9 @@ static class DefaultAutoscalingDeciderContext implements AutoscalingDeciderConte
Objects.requireNonNull(clusterInfo);
this.state = state;
this.clusterInfo = clusterInfo;
this.currentNodes = StreamSupport.stream(state.nodes().spliterator(), false)
.filter(this::rolesFilter)
.collect(Collectors.toCollection(() -> new TreeSet<>(AutoscalingDeciderResults.DISCOVERY_NODE_COMPARATOR)));
this.currentCapacity = calculateCurrentCapacity();
this.currentCapacityAccurate = calculateCurrentCapacityAccurate();
}
Expand All @@ -167,13 +174,11 @@ public AutoscalingCapacity currentCapacity() {

@Override
public Set<DiscoveryNode> nodes() {
return StreamSupport.stream(state.nodes().spliterator(), false).filter(this::rolesFilter).collect(Collectors.toSet());
return currentNodes;
}

private boolean calculateCurrentCapacityAccurate() {
return StreamSupport.stream(state.nodes().spliterator(), false)
.filter(this::rolesFilter)
.allMatch(this::nodeHasAccurateCapacity);
return currentNodes.stream().allMatch(this::nodeHasAccurateCapacity);
}

private boolean nodeHasAccurateCapacity(DiscoveryNode node) {
Expand All @@ -182,8 +187,7 @@ private boolean nodeHasAccurateCapacity(DiscoveryNode node) {
}

private AutoscalingCapacity calculateCurrentCapacity() {
return StreamSupport.stream(state.nodes().spliterator(), false)
.filter(this::rolesFilter)
return currentNodes.stream()
.map(this::resourcesFor)
.map(c -> new AutoscalingCapacity(c, c))
.reduce(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,34 @@

package org.elasticsearch.xpack.autoscaling.capacity;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
* Represents a collection of individual autoscaling decider results that can be aggregated into a single autoscaling capacity for a
* policy
*/
public class AutoscalingDeciderResults implements ToXContent, Writeable {

public static final Comparator<DiscoveryNode> DISCOVERY_NODE_COMPARATOR = Comparator.comparing(DiscoveryNode::getName)
.thenComparing(DiscoveryNode::getId);
private final AutoscalingCapacity currentCapacity;
private final SortedSet<DiscoveryNode> currentNodes;
private final SortedMap<String, AutoscalingDeciderResult> results;

/**
Expand All @@ -35,9 +43,14 @@ public Map<String, AutoscalingDeciderResult> results() {
return results;
}

public AutoscalingDeciderResults(final AutoscalingCapacity currentCapacity, final SortedMap<String, AutoscalingDeciderResult> results) {
public AutoscalingDeciderResults(
final AutoscalingCapacity currentCapacity,
final SortedSet<DiscoveryNode> currentNodes,
final SortedMap<String, AutoscalingDeciderResult> results
) {
Objects.requireNonNull(currentCapacity);
this.currentCapacity = currentCapacity;
this.currentNodes = Objects.requireNonNull(currentNodes);
Objects.requireNonNull(results);
if (results.isEmpty()) {
throw new IllegalArgumentException("results can not be empty");
Expand All @@ -47,15 +60,24 @@ public AutoscalingDeciderResults(final AutoscalingCapacity currentCapacity, fina

/**
 * Deserialization constructor. Reads the current capacity, the set of current
 * nodes, and the per-decider results from the wire in the same order that
 * {@link #writeTo} writes them.
 *
 * @param in stream positioned at a serialized {@code AutoscalingDeciderResults}
 * @throws IOException if reading from the stream fails
 */
public AutoscalingDeciderResults(final StreamInput in) throws IOException {
this.currentCapacity = new AutoscalingCapacity(in);
// re-sort the received nodes locally so iteration order is deterministic
// (by name, then id) regardless of the order they arrived in on the wire
this.currentNodes = in.readSet(DiscoveryNode::new)
.stream()
.collect(Collectors.toCollection(() -> new TreeSet<>(DISCOVERY_NODE_COMPARATOR)));
this.results = new TreeMap<>(in.readMap(StreamInput::readString, AutoscalingDeciderResult::new));
}

/**
 * Serializes capacity, current nodes, and decider results. Field order must
 * stay in sync with the {@code StreamInput} constructor.
 */
@Override
public void writeTo(final StreamOutput out) throws IOException {
currentCapacity.writeTo(out);
out.writeCollection(currentNodes);
out.writeMap(results, StreamOutput::writeString, (output, result) -> result.writeTo(output));
}

/**
 * This object renders a complete, self-contained JSON object in
 * {@code toXContent} (it opens and closes its own object), so it is not an
 * XContent fragment.
 */
@Override
public boolean isFragment() {
return false;
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
Expand All @@ -64,6 +86,15 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.field("required_capacity", requiredCapacity);
}
builder.field("current_capacity", currentCapacity);
builder.startArray("current_nodes");
{
for (DiscoveryNode node : currentNodes) {
builder.startObject();
builder.field("name", node.getName());
builder.endObject();
}
}
builder.endArray();
builder.startObject("deciders");
for (Map.Entry<String, AutoscalingDeciderResult> entry : results.entrySet()) {
builder.startObject(entry.getKey());
Expand Down Expand Up @@ -92,6 +123,10 @@ public AutoscalingCapacity currentCapacity() {
return currentCapacity;
}

/**
 * @return the nodes this capacity calculation was based on, sorted by
 *         {@code DISCOVERY_NODE_COMPARATOR} (node name, then node id)
 */
public SortedSet<DiscoveryNode> currentNodes() {
return currentNodes;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.autoscaling;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand All @@ -27,6 +28,7 @@
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -50,7 +52,7 @@ public static AutoscalingDeciderResults randomAutoscalingDeciderResults() {
.mapToObj(i -> Tuple.tuple(Integer.toString(i), randomAutoscalingDeciderResult()))
.collect(Collectors.toMap(Tuple::v1, Tuple::v2, (a, b) -> { throw new IllegalStateException(); }, TreeMap::new));
AutoscalingCapacity capacity = new AutoscalingCapacity(randomAutoscalingResources(), randomAutoscalingResources());
return new AutoscalingDeciderResults(capacity, results);
return new AutoscalingDeciderResults(capacity, randomNodes(), results);
}

public static AutoscalingCapacity randomAutoscalingCapacity() {
Expand Down Expand Up @@ -83,6 +85,21 @@ public static AutoscalingCapacity.AutoscalingResources randomNullValueAutoscalin
);
}

/**
 * Builds a random, non-empty sorted set of 1-10 discovery nodes for tests,
 * ordered by {@code AutoscalingDeciderResults.DISCOVERY_NODE_COMPARATOR}.
 */
public static SortedSet<DiscoveryNode> randomNodes() {
// shared random prefix plus index keeps node names unique within one call
String prefix = randomAlphaOfLength(5);
return IntStream.range(0, randomIntBetween(1, 10))
.mapToObj(
i -> new DiscoveryNode(
prefix + i,
buildNewFakeTransportAddress(),
Map.of(),
randomRoles().stream().map(DiscoveryNode::getRoleFromRoleName).collect(Collectors.toSet()),
Version.CURRENT
)
)
.collect(Collectors.toCollection(() -> new TreeSet<>(AutoscalingDeciderResults.DISCOVERY_NODE_COMPARATOR)));
}

public static ByteSizeValue randomByteSizeValue() {
// do not want to test any overflow.
return new ByteSizeValue(randomLongBetween(0, Long.MAX_VALUE >> 16));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.xpack.autoscaling.policy.AutoscalingPolicy;
import org.elasticsearch.xpack.autoscaling.policy.AutoscalingPolicyMetadata;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -77,6 +78,7 @@ public void testMultiplePoliciesFixedCapacity() {

// there are no nodes in any tier.
assertThat(results.currentCapacity(), equalTo(AutoscalingCapacity.ZERO));
assertThat(results.currentNodes(), equalTo(Collections.emptySortedSet()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,21 @@
package org.elasticsearch.xpack.autoscaling.capacity;

import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.autoscaling.AutoscalingTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -20,6 +30,7 @@
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class AutoscalingDeciderResultsTests extends AutoscalingTestCase {

Expand All @@ -28,6 +39,7 @@ public void testAutoscalingDeciderResultsRejectsEmptyResults() {
IllegalArgumentException.class,
() -> new AutoscalingDeciderResults(
new AutoscalingCapacity(randomAutoscalingResources(), randomAutoscalingResources()),
randomNodes(),
new TreeMap<>()
)
);
Expand Down Expand Up @@ -61,6 +73,48 @@ public void testRequiredCapacity() {
verifySingleMetricLarger(node, large, largerMemory, autoscalingCapacities, largerMemory);
}

/**
 * Verifies the XContent rendering of {@code AutoscalingDeciderResults}: the
 * top-level keys, the {@code current_nodes} array of {"name": ...} objects,
 * and the nested capacity/decider objects.
 */
public void testToXContent() {
AutoscalingDeciderResults results = randomAutoscalingDeciderResults();
Map<String, Object> map = toMap(results);
// required_capacity is omitted from the output when it is null
boolean hasRequiredCapacity = results.requiredCapacity() != null;
Set<String> roots = hasRequiredCapacity
? Set.of("current_capacity", "current_nodes", "deciders", "required_capacity")
: Set.of("current_capacity", "current_nodes", "deciders");
assertThat(map.keySet(), equalTo(roots));
assertThat(map.get("current_nodes"), instanceOf(List.class));
// each node is rendered as an object containing only its name
List<?> expectedNodes = results.currentNodes().stream().map(dn -> Map.of("name", dn.getName())).collect(Collectors.toList());
assertThat(map.get("current_nodes"), equalTo(expectedNodes));
if (hasRequiredCapacity) {
assertThat(map.get("required_capacity"), equalTo(toMap(results.requiredCapacity())));
}

assertThat(map.get("current_capacity"), equalTo(toMap(results.currentCapacity())));
assertThat(
map.get("deciders"),
equalTo(results.results().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> toMap(e.getValue()))))
);
}

/**
 * Renders a {@link ToXContent} to JSON and parses it back into a generic map,
 * wrapping fragments in an enclosing object so the JSON is always parseable.
 * Any {@link IOException} is surfaced as an {@link AssertionError} since this
 * is test-only plumbing.
 */
public Map<String, Object> toMap(ToXContent tox) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
// fragments do not open their own object, so add the wrapper here
if (tox.isFragment()) {
builder.startObject();
}
tox.toXContent(builder, null);
if (tox.isFragment()) {
builder.endObject();
}
try (
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, null, BytesReference.bytes(builder).streamInput())
) {
return parser.map();
}
} catch (IOException e) {
throw new AssertionError(e);
}
}

private void verifySingleMetricLarger(
boolean node,
AutoscalingCapacity expectedStorage,
Expand Down Expand Up @@ -91,7 +145,10 @@ private void verifyRequiredCapacity(AutoscalingCapacity expected, AutoscalingCap
TreeMap::new
)
);
assertThat(new AutoscalingDeciderResults(randomAutoscalingCapacity(), results).requiredCapacity(), equalTo(expected));
assertThat(
new AutoscalingDeciderResults(randomAutoscalingCapacity(), randomNodes(), results).requiredCapacity(),
equalTo(expected)
);
}

private AutoscalingCapacity randomCapacity(boolean node, boolean storage, boolean memory, int lower, int upper) {
Expand Down

0 comments on commit 6800f3b

Please sign in to comment.