Skip to content

Commit

Permalink
Centralize the concept of processors configuration (#89662)
Browse files Browse the repository at this point in the history
This commit centralize the processor count concept into the Processors class.
With this change now all the places using a processor count rely on this new class,
such as desired nodes, `node.processors` setting and autoscaling deciders.

- Processor counts are rounded to up to 5 decimal places
- Processors can be represented as doubles

Desired nodes processors were stored as floats, this poses some challenges during
upgrades as once the value is casted to a double, the precision increases and therefore
the number is not the same. In order to allow idempotent desired nodes updates after
upgrades, this commit introduces `DesiredNode#equalsWithProcessorsCloseTo(DesiredNode that)`
which allows comparing two desired nodes that differ up to a max delta in their processor
specification as floats.
  • Loading branch information
fcofdez committed Sep 6, 2022
1 parent 33ff7b2 commit 284dce6
Show file tree
Hide file tree
Showing 39 changed files with 796 additions and 241 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/89662.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 89662
summary: Centralize the concept of processors configuration
area: Autoscaling
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.monitor.os;

import org.apache.lucene.util.Constants;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.test.ESTestCase;

Expand All @@ -24,7 +25,7 @@
public class EvilOsProbeTests extends ESTestCase {

public void testOsPrettyName() throws IOException {
final OsInfo osInfo = OsProbe.getInstance().osInfo(randomLongBetween(1, 100), randomIntBetween(1, 8));
final OsInfo osInfo = OsProbe.getInstance().osInfo(randomLongBetween(1, 100), Processors.of((double) randomIntBetween(1, 8)));
if (Constants.LINUX) {
final List<String> lines;
if (Files.exists(PathUtils.get("/etc/os-release"))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xcontent.json.JsonXContent;

Expand All @@ -31,37 +32,109 @@
import static org.hamcrest.Matchers.is;

public class DesiredNodesUpgradeIT extends AbstractRollingTestCase {
private enum ProcessorsPrecision {
DOUBLE,
FLOAT
}

public void testUpgradeDesiredNodes() throws Exception {
// Desired nodes was introduced in 8.1
if (UPGRADE_FROM_VERSION.before(Version.V_8_1_0)) {
return;
}

if (UPGRADE_FROM_VERSION.onOrAfter(Processors.DOUBLE_PROCESSORS_SUPPORT_VERSION)) {
assertUpgradedNodesCanReadDesiredNodes();
} else if (UPGRADE_FROM_VERSION.onOrAfter(DesiredNode.RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) {
assertDesiredNodesUpdatedWithRoundedUpFloatsAreIdempotent();
} else {
assertDesiredNodesWithFloatProcessorsAreRejectedInOlderVersions();
}
}

private void assertUpgradedNodesCanReadDesiredNodes() throws Exception {
final int desiredNodesVersion = switch (CLUSTER_TYPE) {
case OLD -> 1;
case MIXED -> FIRST_MIXED_ROUND ? 2 : 3;
case UPGRADED -> 4;
};

if (CLUSTER_TYPE != ClusterType.OLD) {
final Map<String, Object> desiredNodes = getLatestDesiredNodes();
final String historyId = extractValue(desiredNodes, "history_id");
final int version = extractValue(desiredNodes, "version");
assertThat(historyId, is(equalTo("upgrade_test")));
assertThat(version, is(equalTo(desiredNodesVersion - 1)));
}

addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(desiredNodesVersion, ProcessorsPrecision.DOUBLE);
assertAllDesiredNodesAreActualized();
}

private void assertDesiredNodesUpdatedWithRoundedUpFloatsAreIdempotent() throws Exception {
// We define the same set of desired nodes to ensure that they are equal across all
// the test runs, otherwise we cannot guarantee an idempotent update in this test
final var desiredNodes = getNodeNames().stream()
.map(
nodeName -> new DesiredNode(
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
1238.49922909,
ByteSizeValue.ofGb(32),
ByteSizeValue.ofGb(128),
Version.CURRENT
)
)
.toList();

final int desiredNodesVersion = switch (CLUSTER_TYPE) {
case OLD -> 1;
case MIXED -> FIRST_MIXED_ROUND ? 2 : 3;
case UPGRADED -> 4;
};

if (CLUSTER_TYPE != ClusterType.OLD) {
updateDesiredNodes(desiredNodes, desiredNodesVersion - 1);
}
for (int i = 0; i < 2; i++) {
updateDesiredNodes(desiredNodes, desiredNodesVersion);
}

final Map<String, Object> latestDesiredNodes = getLatestDesiredNodes();
final int latestDesiredNodesVersion = extractValue(latestDesiredNodes, "version");
assertThat(latestDesiredNodesVersion, is(equalTo(desiredNodesVersion)));

if (CLUSTER_TYPE == ClusterType.UPGRADED) {
assertAllDesiredNodesAreActualized();
}
}

private void assertDesiredNodesWithFloatProcessorsAreRejectedInOlderVersions() throws Exception {
switch (CLUSTER_TYPE) {
case OLD -> addClusterNodesToDesiredNodesWithIntegerProcessors(1);
case MIXED -> {
int version = FIRST_MIXED_ROUND ? 2 : 3;
if (UPGRADE_FROM_VERSION.onOrAfter(DesiredNode.RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) {
addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(version);
} else {
// Processor ranges or float processors are forbidden during upgrades: 8.2 -> 8.3 clusters
final var responseException = expectThrows(
ResponseException.class,
() -> addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(version)
);
final var statusCode = responseException.getResponse().getStatusLine().getStatusCode();
assertThat(statusCode, is(equalTo(400)));
}
// Processor ranges or float processors are forbidden during upgrades: 8.2 -> 8.3 clusters
final var responseException = expectThrows(
ResponseException.class,
() -> addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(version, ProcessorsPrecision.FLOAT)
);
final var statusCode = responseException.getResponse().getStatusLine().getStatusCode();
assertThat(statusCode, is(equalTo(400)));
}
case UPGRADED -> {
assertAllDesiredNodesAreActualized();
addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(4);
addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(4, ProcessorsPrecision.FLOAT);
}
}

getLatestDesiredNodes();
}

private Map<String, Object> getLatestDesiredNodes() throws IOException {
final var getDesiredNodesRequest = new Request("GET", "/_internal/desired_nodes/_latest");
final var response = client().performRequest(getDesiredNodesRequest);
assertThat(response.getStatusLine().getStatusCode(), is(equalTo(200)));
return responseAsMap(response);
}

private void assertAllDesiredNodesAreActualized() throws Exception {
Expand All @@ -77,32 +150,34 @@ private void assertAllDesiredNodesAreActualized() throws Exception {
}
}

private void addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(int version) throws Exception {
private void addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(int version, ProcessorsPrecision processorsPrecision)
throws Exception {
final List<DesiredNode> nodes;
if (randomBoolean()) {
nodes = getNodeNames().stream()
.map(
nodeName -> new DesiredNode(
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
0.5f,
processorsPrecision == ProcessorsPrecision.DOUBLE ? randomDoubleProcessorCount() : randomFloatProcessorCount(),
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
Version.CURRENT
)
)
.toList();
} else {
nodes = getNodeNames().stream()
.map(
nodeName -> new DesiredNode(
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
new DesiredNode.ProcessorsRange(randomIntBetween(1, 10), (float) randomIntBetween(20, 30)),
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
Version.CURRENT
)
)
.toList();
nodes = getNodeNames().stream().map(nodeName -> {
double minProcessors = processorsPrecision == ProcessorsPrecision.DOUBLE
? randomDoubleProcessorCount()
: randomFloatProcessorCount();
return new DesiredNode(
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
new DesiredNode.ProcessorsRange(minProcessors, minProcessors + randomIntBetween(10, 20)),
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
Version.CURRENT
);
}).toList();
}
updateDesiredNodes(nodes, version);
}
Expand All @@ -123,7 +198,7 @@ private void addClusterNodesToDesiredNodesWithIntegerProcessors(int version) thr
}

private void updateDesiredNodes(List<DesiredNode> nodes, int version) throws IOException {
final var request = new Request("PUT", "/_internal/desired_nodes/history/" + version);
final var request = new Request("PUT", "/_internal/desired_nodes/upgrade_test/" + version);
try (var builder = JsonXContent.contentBuilder()) {
builder.startObject();
builder.xContentList(UpdateDesiredNodesRequest.NODES_FIELD.getPreferredName(), nodes);
Expand All @@ -149,6 +224,14 @@ private List<String> getNodeNames() throws Exception {
return nodeNames;
}

private double randomDoubleProcessorCount() {
return randomDoubleBetween(0.5, 512.1234, true);
}

private float randomFloatProcessorCount() {
return randomIntBetween(1, 512) + randomFloat();
}

@SuppressWarnings("unchecked")
private static <T> T extractValue(Map<String, Object> map, String path) {
return (T) XContentMapValues.extractValue(path, map);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public void testNodesInfosTotalIndexingBuffer() throws Exception {

public void testAllocatedProcessors() throws Exception {
List<String> nodesIds = internalCluster().startNodes(
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 3).build(),
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 6).build()
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 2.9).build(),
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 5.9).build()
);

final String node_1 = nodesIds.get(0);
Expand Down Expand Up @@ -134,6 +134,8 @@ public void testAllocatedProcessors() throws Exception {
);

assertThat(response.getNodesMap().get(server1NodeId).getInfo(OsInfo.class).getAllocatedProcessors(), equalTo(3));
assertThat(response.getNodesMap().get(server1NodeId).getInfo(OsInfo.class).getFractionalAllocatedProcessors(), equalTo(2.9));
assertThat(response.getNodesMap().get(server2NodeId).getInfo(OsInfo.class).getAllocatedProcessors(), equalTo(6));
assertThat(response.getNodesMap().get(server2NodeId).getInfo(OsInfo.class).getFractionalAllocatedProcessors(), equalTo(5.9));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ protected void doExecute(Task task, UpdateDesiredNodesRequest request, ActionLis
if (request.isCompatibleWithVersion(minNodeVersion) == false) {
listener.onFailure(
new IllegalArgumentException(
"Unable to use processor ranges or floating-point processors in mixed-clusters with nodes in version: " + minNodeVersion
"Unable to use processor ranges, floating-point (with greater precision) processors "
+ "in mixed-clusters with nodes in version: "
+ minNodeVersion
)
);
return;
Expand All @@ -124,7 +126,7 @@ static DesiredNodes updateDesiredNodes(DesiredNodes latestDesiredNodes, UpdateDe
);

if (latestDesiredNodes != null) {
if (latestDesiredNodes.equals(proposedDesiredNodes)) {
if (latestDesiredNodes.equalsWithProcessorsCloseTo(proposedDesiredNodes)) {
return latestDesiredNodes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public UpdateDesiredNodesRequest(StreamInput in) throws IOException {
this.historyID = in.readString();
this.version = in.readLong();
this.nodes = in.readList(DesiredNode::readFrom);
dryRun = in.getVersion().onOrAfter(DRY_RUN_VERSION) ? in.readBoolean() : false;
if (in.getVersion().onOrAfter(DRY_RUN_VERSION)) {
this.dryRun = in.readBoolean();
} else {
this.dryRun = false;
}
}

@Override
Expand Down Expand Up @@ -98,6 +102,7 @@ public boolean isCompatibleWithVersion(Version version) {
if (version.onOrAfter(DesiredNode.RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) {
return true;
}

return nodes.stream().allMatch(desiredNode -> desiredNode.isCompatibleWithVersion(version));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,13 @@ private void validate(DesiredNode node) {
int minProcessors = node.roundedDownMinProcessors();
Integer roundedUpMaxProcessors = node.roundedUpMaxProcessors();
int maxProcessors = roundedUpMaxProcessors == null ? minProcessors : roundedUpMaxProcessors;
Setting.intSetting(NODE_PROCESSORS_SETTING.getKey(), minProcessors, 1, maxProcessors, Setting.Property.NodeScope).get(settings);
Setting.doubleSetting(
NODE_PROCESSORS_SETTING.getKey(),
minProcessors,
Double.MIN_VALUE,
maxProcessors,
Setting.Property.NodeScope
).get(settings);
final Settings.Builder updatedSettings = Settings.builder().put(settings);
updatedSettings.remove(NODE_PROCESSORS_SETTING.getKey());
settings = updatedSettings.build();
Expand Down

0 comments on commit 284dce6

Please sign in to comment.