Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,19 @@ public void testShardWriteLoadsArePresent() {
}
}

// Verifies that a ClusterInfo refresh populates the max-heap-size map with one
// positive entry per cluster node, keyed by node ID.
public void testMaxHeapPerNodeIsPresent() {
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
// Force a stats collection so the map below reflects current node info.
ClusterInfoServiceUtils.refresh(clusterInfoService);
Map<String, ByteSizeValue> maxHeapSizePerNode = clusterInfoService.getClusterInfo().getMaxHeapSizePerNode();
assertNotNull(maxHeapSizePerNode);
ClusterState state = getInstanceFromNode(ClusterService.class).state();
// Every node in the cluster state must have a corresponding entry.
assertEquals(state.nodes().size(), maxHeapSizePerNode.size());
for (DiscoveryNode node : state.nodes()) {
assertTrue(maxHeapSizePerNode.containsKey(node.getId()));
// A real JVM heap is always strictly greater than zero bytes.
assertThat(maxHeapSizePerNode.get(node.getId()), greaterThan(ByteSizeValue.ZERO));
}
}

public void testIndexCanChangeCustomDataPath() throws Exception {
final String index = "test-custom-data-path";
final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_API_EIS_DIAGNOSTICS = def(9_156_0_00);
public static final TransportVersion ML_INFERENCE_ENDPOINT_CACHE = def(9_157_0_00);
public static final TransportVersion INDEX_SOURCE = def(9_158_0_00);
public static final TransportVersion MAX_HEAP_SIZE_PER_NODE_IN_CLUSTER_INFO = def(9_159_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
40 changes: 33 additions & 7 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools;
final Map<ShardId, Double> shardWriteLoads;
// max heap size per node ID
final Map<String, ByteSizeValue> maxHeapSizePerNode;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not really a problem introduced here but it'd be nice to indicate what the opaque String keys are -- presumably persistent node IDs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment in 44dcc78


// Creates an empty ClusterInfo: delegates to the full constructor with an
// empty map for every statistic (disk usage, shard sizes, heap usage, etc.).
protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
Expand All @@ -80,6 +82,7 @@ protected ClusterInfo() {
* @param estimatedHeapUsages estimated heap usage broken down by node
* @param nodeUsageStatsForThreadPools node-level usage stats (operational load) broken down by node
* @see #shardIdentifierFromRouting
* @param maxHeapSizePerNode node id to max heap size
*/
public ClusterInfo(
Map<String, DiskUsage> leastAvailableSpaceUsage,
Expand All @@ -90,7 +93,8 @@ public ClusterInfo(
Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, EstimatedHeapUsage> estimatedHeapUsages,
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
Map<ShardId, Double> shardWriteLoads
Map<ShardId, Double> shardWriteLoads,
Map<String, ByteSizeValue> maxHeapSizePerNode
) {
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
Expand All @@ -101,6 +105,7 @@ public ClusterInfo(
this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
this.shardWriteLoads = Map.copyOf(shardWriteLoads);
this.maxHeapSizePerNode = Map.copyOf(maxHeapSizePerNode);
}

public ClusterInfo(StreamInput in) throws IOException {
Expand All @@ -125,6 +130,11 @@ public ClusterInfo(StreamInput in) throws IOException {
} else {
this.shardWriteLoads = Map.of();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.MAX_HEAP_SIZE_PER_NODE_IN_CLUSTER_INFO)) {
this.maxHeapSizePerNode = in.readImmutableMap(ByteSizeValue::readFrom);
} else {
this.maxHeapSizePerNode = Map.of();
}
}

@Override
Expand All @@ -144,6 +154,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().supports(SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
out.writeMap(this.shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.MAX_HEAP_SIZE_PER_NODE_IN_CLUSTER_INFO)) {
out.writeMap(this.maxHeapSizePerNode, StreamOutput::writeWriteable);
}
}

/**
Expand Down Expand Up @@ -224,8 +237,8 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
return builder.endObject(); // NodeAndPath
}),
endArray() // end "reserved_sizes"
// NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads at this stage, to avoid
// committing to API payloads until the features are settled
// NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads/maxHeapSizePerNode at this stage,
// to avoid committing to API payloads until the features are settled
);
}

Expand Down Expand Up @@ -326,6 +339,10 @@ public ReservedSpace getReservedSpace(String nodeId, String dataPath) {
return result == null ? ReservedSpace.EMPTY : result;
}

public Map<String, ByteSizeValue> getMaxHeapSizePerNode() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test similar to IndexShardIT.testHeapUsageEstimateIsPresent? We can postpone to follow-up work if that matches timing better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added test in 44dcc78

return this.maxHeapSizePerNode;
}

/**
* Method that incorporates the ShardId for the shard into a string that
* includes a 'p' or 'r' depending on whether the shard is a primary.
Expand All @@ -351,7 +368,8 @@ public boolean equals(Object o) {
&& reservedSpace.equals(that.reservedSpace)
&& estimatedHeapUsages.equals(that.estimatedHeapUsages)
&& nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools)
&& shardWriteLoads.equals(that.shardWriteLoads);
&& shardWriteLoads.equals(that.shardWriteLoads)
&& maxHeapSizePerNode.equals(that.maxHeapSizePerNode);
}

@Override
Expand All @@ -365,7 +383,8 @@ public int hashCode() {
reservedSpace,
estimatedHeapUsages,
nodeUsageStatsForThreadPools,
shardWriteLoads
shardWriteLoads,
maxHeapSizePerNode
);
}

Expand Down Expand Up @@ -489,6 +508,7 @@ public static class Builder {
private Map<String, EstimatedHeapUsage> estimatedHeapUsages = Map.of();
private Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = Map.of();
private Map<ShardId, Double> shardWriteLoads = Map.of();
private Map<String, ByteSizeValue> maxHeapSizePerNode = Map.of();

public ClusterInfo build() {
return new ClusterInfo(
Expand All @@ -500,7 +520,8 @@ public ClusterInfo build() {
reservedSpace,
estimatedHeapUsages,
nodeUsageStatsForThreadPools,
shardWriteLoads
shardWriteLoads,
maxHeapSizePerNode
);
}

Expand Down Expand Up @@ -548,5 +569,10 @@ public Builder shardWriteLoads(Map<ShardId, Double> shardWriteLoads) {
this.shardWriteLoads = shardWriteLoads;
return this;
}

/**
 * Sets the maximum heap size per node, keyed by node ID.
 *
 * @param maxHeapSizePerNode map of node ID to that node's maximum heap size
 * @return this builder, for chaining
 */
public Builder maxHeapSizePerNode(Map<String, ByteSizeValue> maxHeapSizePerNode) {
this.maxHeapSizePerNode = maxHeapSizePerNode;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CopyOnFirstWriteMap;
import org.elasticsearch.index.shard.ShardId;

Expand All @@ -35,6 +36,7 @@ public class ClusterInfoSimulator {
private final Map<ShardId, Long> shardDataSetSizes;
private final Map<NodeAndShard, String> dataPath;
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
private final Map<String, ByteSizeValue> maxHeapSizePerNode;
private final ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator;

public ClusterInfoSimulator(RoutingAllocation allocation) {
Expand All @@ -45,6 +47,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
this.maxHeapSizePerNode = Map.copyOf(allocation.clusterInfo().maxHeapSizePerNode);
this.shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
}

Expand Down Expand Up @@ -162,7 +165,8 @@ public ClusterInfo getClusterInfo() {
Map.of(),
estimatedHeapUsages,
shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(),
allocation.clusterInfo().getShardWriteLoads()
allocation.clusterInfo().getShardWriteLoads(),
maxHeapSizePerNode
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,8 @@ private boolean shouldRefresh() {
public ClusterInfo getClusterInfo() {
final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read
final Map<String, EstimatedHeapUsage> estimatedHeapUsages = new HashMap<>();
maxHeapPerNode.forEach((nodeId, maxHeapSize) -> {
final var currentMaxHeapPerNode = this.maxHeapPerNode; // Make sure we use a consistent view
currentMaxHeapPerNode.forEach((nodeId, maxHeapSize) -> {
final Long estimatedHeapUsage = estimatedHeapUsagePerNode.get(nodeId);
if (estimatedHeapUsage != null) {
estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
Expand All @@ -554,7 +555,8 @@ public ClusterInfo getClusterInfo() {
indicesStatsSummary.reservedSpace,
estimatedHeapUsages,
nodeThreadPoolUsageStatsPerNode,
indicesStatsSummary.shardWriteLoads()
indicesStatsSummary.shardWriteLoads(),
currentMaxHeapPerNode
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
*
* This method returns the corresponding initializing shard that would be allocated to this node.
*/
private static ShardRouting initializingShard(ShardRouting shardRouting, String currentNodeId) {
public static ShardRouting initializingShard(ShardRouting shardRouting, String currentNodeId) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is needed to do the same assertion that is done in this decider, but in the stateless decider.

final ShardRouting initializingShard;
if (shardRouting.unassigned()) {
initializingShard = shardRouting.initialize(currentNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.cluster;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
Expand Down Expand Up @@ -45,7 +46,8 @@ public static ClusterInfo randomClusterInfo() {
randomReservedSpace(),
randomNodeHeapUsage(),
randomNodeUsageStatsForThreadPools(),
randomShardWriteLoad()
randomShardWriteLoad(),
randomMaxHeapSizes()
);
}

Expand All @@ -58,6 +60,15 @@ private static Map<ShardId, Double> randomShardWriteLoad() {
return builder;
}

// Builds a random node-ID -> max heap size map with up to 128 entries for test fixtures.
private static Map<String, ByteSizeValue> randomMaxHeapSizes() {
final int entryCount = randomIntBetween(0, 128);
final Map<String, ByteSizeValue> maxHeapSizes = new HashMap<>(entryCount);
int added = 0;
while (added < entryCount) {
maxHeapSizes.put(randomAlphaOfLength(32), randomByteSizeValue());
added++;
}
return maxHeapSizes;
}

private static Map<String, EstimatedHeapUsage> randomNodeHeapUsage() {
int numEntries = randomIntBetween(0, 128);
Map<String, EstimatedHeapUsage> nodeHeapUsage = new HashMap<>(numEntries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ public void testUnassignedAllocationPredictsDiskUsage() {
ImmutableOpenMap.of(),
ImmutableOpenMap.of(),
ImmutableOpenMap.of(),
ImmutableOpenMap.of(),
ImmutableOpenMap.of()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,7 @@ static class DevNullClusterInfo extends ClusterInfo {
reservedSpace,
Map.of(),
Map.of(),
Map.of(),
Map.of()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void testCanAllocateUsesMaxAvailableSpace() {
Map.of(),
Map.of(),
Map.of(),
Map.of(),
Map.of()
);
RoutingAllocation allocation = new RoutingAllocation(
Expand Down Expand Up @@ -185,6 +186,7 @@ private void doTestCannotAllocateDueToLackOfDiskResources(boolean testMaxHeadroo
Map.of(),
Map.of(),
Map.of(),
Map.of(),
Map.of()
);
RoutingAllocation allocation = new RoutingAllocation(
Expand Down Expand Up @@ -333,6 +335,7 @@ private void doTestCanRemainUsesLeastAvailableSpace(boolean testMaxHeadroom) {
Map.of(),
Map.of(),
Map.of(),
Map.of(),
Map.of()
);
RoutingAllocation allocation = new RoutingAllocation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,7 @@ private ExtendedClusterInfo(Map<String, Long> extraShardSizes, ClusterInfo info)
Map.of(),
Map.of(),
Map.of(),
Map.of(),
Map.of()
);
this.delegate = info;
Expand Down