Skip to content

Commit

Permalink
Keep track of desired nodes status in cluster state (#87474)
Browse files Browse the repository at this point in the history
This commit adds desired nodes status tracking to the cluster state. Previously status was tracked
in-memory by DesiredNodesMembershipService this approach had certain limitations, and made
the consumer code more complex. This takes a simpler approach to keep the status updated when
the desired nodes are updated or when a new node joins, storing the status in the cluster state,
this allows to consume that information easily where it is necessary.
Additionally, this commit moves test code from depending directly of DesiredNodes which can be
seen as an internal data structure to rely more on UpdateDesiredNodesRequest.

Relates #84165
  • Loading branch information
fcofdez committed Jun 16, 2022
1 parent 43b415f commit eb8c4ba
Show file tree
Hide file tree
Showing 25 changed files with 1,311 additions and 731 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/87474.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 87474
summary: Keep track of desired nodes status in cluster state
area: Autoscaling
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -977,13 +977,17 @@ public XContentBuilder stringListField(String name, Collection<String> values) t
}

public XContentBuilder xContentList(String name, Collection<? extends ToXContent> values) throws IOException {
return xContentList(name, values, ToXContent.EMPTY_PARAMS);
}

public XContentBuilder xContentList(String name, Collection<? extends ToXContent> values, ToXContent.Params params) throws IOException {
field(name);
if (values == null) {
return nullValue();
}
startArray();
for (ToXContent value : values) {
value(value);
value(value, params);
}
endArray();
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,25 @@
package org.elasticsearch.upgrades;

import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.desirednodes.UpdateDesiredNodesRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.DesiredNode;
import org.elasticsearch.cluster.metadata.DesiredNodeWithStatus;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class DesiredNodesUpgradeIT extends AbstractRollingTestCase {
Expand All @@ -25,93 +38,119 @@ public void testUpgradeDesiredNodes() throws Exception {
}

switch (CLUSTER_TYPE) {
case OLD -> {
var response = updateDesiredNodes(1, desiredNodesWithIntegerProcessor());
var statusCode = response.getStatusLine().getStatusCode();
assertThat(statusCode, equalTo(200));
}
case OLD -> addClusterNodesToDesiredNodesWithIntegerProcessors(1);
case MIXED -> {
final var historyVersion = FIRST_MIXED_ROUND ? 2 : 3;
int version = FIRST_MIXED_ROUND ? 2 : 3;
if (UPGRADE_FROM_VERSION.onOrAfter(DesiredNode.RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) {
var response = updateDesiredNodes(historyVersion, desiredNodesWithRangeOrFloatProcessors());
var statusCode = response.getStatusLine().getStatusCode();
assertThat(statusCode, equalTo(200));
addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(version);
} else {
// Processor ranges or float processors are forbidden during upgrades: 8.2 -> 8.3 clusters
final var responseException = expectThrows(
ResponseException.class,
() -> updateDesiredNodes(historyVersion, desiredNodesWithRangeOrFloatProcessors())
() -> addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(version)
);
var statusCode = responseException.getResponse().getStatusLine().getStatusCode();
final var statusCode = responseException.getResponse().getStatusLine().getStatusCode();
assertThat(statusCode, is(equalTo(400)));
}
}
case UPGRADED -> {
var response = updateDesiredNodes(4, desiredNodesWithRangeOrFloatProcessors());
var statusCode = response.getStatusLine().getStatusCode();
assertThat(statusCode, equalTo(200));
assertAllDesiredNodesAreActualized();
addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(4);
}
}

final var getDesiredNodesRequest = new Request("GET", "/_internal/desired_nodes/_latest");
Response response = client().performRequest(getDesiredNodesRequest);
final var response = client().performRequest(getDesiredNodesRequest);
assertThat(response.getStatusLine().getStatusCode(), is(equalTo(200)));
}

private Response updateDesiredNodes(int version, String body) throws Exception {
final var updateDesiredNodesRequest = new Request("PUT", "/_internal/desired_nodes/history/" + version);
updateDesiredNodesRequest.setJsonEntity(body);
return client().performRequest(updateDesiredNodesRequest);
private void assertAllDesiredNodesAreActualized() throws Exception {
final var request = new Request("GET", "_cluster/state/metadata");
final var response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(equalTo(200)));
Map<String, Object> responseMap = responseAsMap(response);
List<Map<String, Object>> nodes = extractValue(responseMap, "metadata.desired_nodes.latest.nodes");
assertThat(nodes.size(), is(greaterThan(0)));
for (Map<String, Object> desiredNode : nodes) {
final int status = extractValue(desiredNode, "status");
assertThat((short) status, is(equalTo(DesiredNodeWithStatus.Status.ACTUALIZED.getValue())));
}
}

private String desiredNodesWithRangeOrFloatProcessors() {
private void addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(int version) throws Exception {
final List<DesiredNode> nodes;
if (randomBoolean()) {
return """
{
"nodes" : [
{
"settings" : {
"node.name" : "instance-000187"
},
"processors_range" : {"min": 9.0, "max": 10.0},
"memory" : "58gb",
"storage" : "1tb",
"node_version" : "99.1.0"
}
]
}""";
nodes = getNodeNames().stream()
.map(
nodeName -> new DesiredNode(
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
0.5f,
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
Version.CURRENT
)
)
.toList();
} else {
return """
{
"nodes" : [
{
"settings" : {
"node.name" : "instance-000187"
},
"processors" : 9.5,
"memory" : "58gb",
"storage" : "1tb",
"node_version" : "99.1.0"
}
]
}""";
nodes = getNodeNames().stream()
.map(
nodeName -> new DesiredNode(
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
new DesiredNode.ProcessorsRange(randomIntBetween(1, 10), (float) randomIntBetween(20, 30)),
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
Version.CURRENT
)
)
.toList();
}
updateDesiredNodes(nodes, version);
}

private void addClusterNodesToDesiredNodesWithIntegerProcessors(int version) throws Exception {
final var nodes = getNodeNames().stream()
.map(
nodeName -> new DesiredNode(
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
randomIntBetween(1, 24),
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
Version.CURRENT
)
)
.toList();
updateDesiredNodes(nodes, version);
}

private void updateDesiredNodes(List<DesiredNode> nodes, int version) throws IOException {
final var request = new Request("PUT", "/_internal/desired_nodes/history/" + version);
try (var builder = JsonXContent.contentBuilder()) {
builder.startObject();
builder.xContentList(UpdateDesiredNodesRequest.NODES_FIELD.getPreferredName(), nodes);
builder.endObject();
request.setJsonEntity(Strings.toString(builder));
final var response = client().performRequest(request);
final var statusCode = response.getStatusLine().getStatusCode();
assertThat(statusCode, equalTo(200));
}
}

private List<String> getNodeNames() throws Exception {
final var request = new Request("GET", "/_nodes");
final var response = client().performRequest(request);
Map<String, Object> responseMap = responseAsMap(response);
Map<String, Map<String, Object>> nodes = extractValue(responseMap, "nodes");
final List<String> nodeNames = new ArrayList<>();
for (Map.Entry<String, Map<String, Object>> nodeInfoEntry : nodes.entrySet()) {
final String nodeName = extractValue(nodeInfoEntry.getValue(), "name");
nodeNames.add(nodeName);
}

return nodeNames;
}

private String desiredNodesWithIntegerProcessor() {
return """
{
"nodes" : [
{
"settings" : {
"node.name" : "instance-000187"
},
"processors" : 9,
"memory" : "58gb",
"storage" : "1tb",
"node_version" : "99.1.0"
}
]
}""";
@SuppressWarnings("unchecked")
private static <T> T extractValue(Map<String, Object> map, String path) {
return (T) XContentMapValues.extractValue(path, map);
}
}

0 comments on commit eb8c4ba

Please sign in to comment.