Add to HealthMetadata information about ShardLimits (#94116)
Create all the machinery needed to publish the shard limits information from the master node. Later, these values will be used by the new shard limits health indicator.

issue #91119
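
For orientation, a minimal sketch of how a consumer (for example, the future shard limits health indicator mentioned above) might read the new metadata off the cluster state. The wrapper class and null handling are illustrative assumptions; only getFromClusterState and getShardLimitsMetadata come from this commit.

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.health.metadata.HealthMetadata;

class ShardLimitsConsumerSketch {
    // Returns the shard caps published by the elected master, or null when the
    // health metadata (or its optional ShardLimits field, on pre-8.8 masters) is absent.
    static HealthMetadata.ShardLimits currentShardLimits(ClusterState state) {
        HealthMetadata healthMetadata = HealthMetadata.getFromClusterState(state);
        return healthMetadata == null ? null : healthMetadata.getShardLimitsMetadata();
    }
}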
HiDAl committed Mar 2, 2023
1 parent 14c0147 commit d10a976
Showing 7 changed files with 284 additions and 98 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/94116.yaml
@@ -0,0 +1,5 @@
pr: 94116
summary: Add to `HealthMetadata` information about `ShardLimits`
area: Health
type: feature
issues: []
@@ -22,6 +22,14 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_MAX_HEADROOM_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_MAX_HEADROOM_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN;
import static org.elasticsearch.test.NodeRoles.onlyRoles;
import static org.hamcrest.Matchers.equalTo;

@@ -40,33 +48,42 @@ public void testEachMasterPublishesTheirThresholds() throws Exception {
int numberOfNodes = 3;
Map<String, String> watermarkByNode = new HashMap<>();
Map<String, ByteSizeValue> maxHeadroomByNode = new HashMap<>();
Map<String, HealthMetadata.ShardLimits> shardLimitsPerNode = new HashMap<>();
for (int i = 0; i < numberOfNodes; i++) {
ByteSizeValue randomBytes = ByteSizeValue.ofBytes(randomLongBetween(6, 19));
String customWatermark = percentageMode ? randomIntBetween(86, 94) + "%" : randomBytes.toString();
ByteSizeValue customMaxHeadroom = percentageMode ? randomBytes : ByteSizeValue.MINUS_ONE;
- String nodeName = startNode(internalCluster, customWatermark, customMaxHeadroom.toString());
+ var customShardLimits = new HealthMetadata.ShardLimits(randomIntBetween(0, 1000), randomIntBetween(1001, 2000));
+ String nodeName = startNode(internalCluster, customWatermark, customMaxHeadroom.toString(), customShardLimits);
watermarkByNode.put(nodeName, customWatermark);
maxHeadroomByNode.put(nodeName, customMaxHeadroom);
shardLimitsPerNode.put(nodeName, customShardLimits);
}
ensureStableCluster(numberOfNodes);

String electedMaster = internalCluster.getMasterName();
{
- HealthMetadata.Disk diskMetadata = HealthMetadata.getFromClusterState(internalCluster.clusterService().state())
-     .getDiskMetadata();
+ var healthMetadata = HealthMetadata.getFromClusterState(internalCluster.clusterService().state());
+ var diskMetadata = healthMetadata.getDiskMetadata();
assertThat(diskMetadata.describeHighWatermark(), equalTo(watermarkByNode.get(electedMaster)));
assertThat(diskMetadata.highMaxHeadroom(), equalTo(maxHeadroomByNode.get(electedMaster)));

var shardLimitsMetadata = healthMetadata.getShardLimitsMetadata();
assertEquals(shardLimitsMetadata, shardLimitsPerNode.get(electedMaster));
}

// Stop the master to ensure another node will become master with a different watermark
internalCluster.stopNode(electedMaster);
ensureStableCluster(numberOfNodes - 1);
electedMaster = internalCluster.getMasterName();
{
- HealthMetadata.Disk diskMetadata = HealthMetadata.getFromClusterState(internalCluster.clusterService().state())
-     .getDiskMetadata();
+ var healthMetadata = HealthMetadata.getFromClusterState(internalCluster.clusterService().state());
+ var diskMetadata = healthMetadata.getDiskMetadata();
assertThat(diskMetadata.describeHighWatermark(), equalTo(watermarkByNode.get(electedMaster)));
assertThat(diskMetadata.highMaxHeadroom(), equalTo(maxHeadroomByNode.get(electedMaster)));

var shardLimitsMetadata = healthMetadata.getShardLimitsMetadata();
assertEquals(shardLimitsMetadata, shardLimitsPerNode.get(electedMaster));
}
}
}
@@ -77,8 +94,12 @@ public void testWatermarkSettingUpdate() throws Exception {
ByteSizeValue randomBytes = ByteSizeValue.ofBytes(randomLongBetween(6, 19));
String initialWatermark = percentageMode ? randomIntBetween(86, 94) + "%" : randomBytes.toString();
ByteSizeValue initialMaxHeadroom = percentageMode ? randomBytes : ByteSizeValue.MINUS_ONE;
HealthMetadata.ShardLimits initialShardLimits = new HealthMetadata.ShardLimits(
randomIntBetween(0, 1000),
randomIntBetween(1001, 2000)
);
for (int i = 0; i < numberOfNodes; i++) {
- startNode(internalCluster, initialWatermark, initialMaxHeadroom.toString());
+ startNode(internalCluster, initialWatermark, initialMaxHeadroom.toString(), initialShardLimits);
}

randomBytes = ByteSizeValue.ofBytes(randomLongBetween(101, 200));
@@ -90,79 +111,84 @@
randomBytes = ByteSizeValue.ofBytes(randomLongBetween(5, 10));
String updatedFloodStageWatermark = percentageMode ? randomIntBetween(91, 95) + "%" : randomBytes.toString();
ByteSizeValue updatedFloodStageMaxHeadroom = percentageMode ? randomBytes : ByteSizeValue.MINUS_ONE;
HealthMetadata.ShardLimits updatedShardLimits = new HealthMetadata.ShardLimits(
randomIntBetween(3000, 4000),
randomIntBetween(4001, 5000)
);

ensureStableCluster(numberOfNodes);
{
- HealthMetadata.Disk diskMetadata = HealthMetadata.getFromClusterState(internalCluster.clusterService().state())
-     .getDiskMetadata();
+ var healthMetadata = HealthMetadata.getFromClusterState(internalCluster.clusterService().state());
+ var diskMetadata = healthMetadata.getDiskMetadata();
assertThat(diskMetadata.describeHighWatermark(), equalTo(initialWatermark));
assertThat(diskMetadata.highMaxHeadroom(), equalTo(initialMaxHeadroom));

var shardLimitsMetadata = healthMetadata.getShardLimitsMetadata();
assertEquals(shardLimitsMetadata, initialShardLimits);
}
Settings.Builder builder = Settings.builder()
- .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), updatedLowWatermark)
- .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), updatedHighWatermark)
- .put(
-     DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(),
-     updatedFloodStageWatermark
- );
+ .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), updatedLowWatermark)
+ .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), updatedHighWatermark)
+ .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), updatedFloodStageWatermark)
+ .put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), updatedShardLimits.maxShardsPerNode())
+ .put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN.getKey(), updatedShardLimits.maxShardsPerNodeFrozen());
if (percentageMode) {
- builder = builder.put(
-     DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_MAX_HEADROOM_SETTING.getKey(),
-     updatedLowMaxHeadroom
- )
-     .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_MAX_HEADROOM_SETTING.getKey(), updatedHighMaxHeadroom)
-     .put(
-         DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING.getKey(),
-         updatedFloodStageMaxHeadroom
-     );
+ builder = builder.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_MAX_HEADROOM_SETTING.getKey(), updatedLowMaxHeadroom)
+     .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_MAX_HEADROOM_SETTING.getKey(), updatedHighMaxHeadroom)
+     .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING.getKey(), updatedFloodStageMaxHeadroom);
}
internalCluster.client()
.admin()
.cluster()
.updateSettings(new ClusterUpdateSettingsRequest().persistentSettings(builder))
.actionGet();

assertBusy(() -> {
- HealthMetadata.Disk diskMetadata = HealthMetadata.getFromClusterState(internalCluster.clusterService().state())
-     .getDiskMetadata();
+ var healthMetadata = HealthMetadata.getFromClusterState(internalCluster.clusterService().state());
+ var diskMetadata = healthMetadata.getDiskMetadata();
assertThat(diskMetadata.describeHighWatermark(), equalTo(updatedHighWatermark));
assertThat(diskMetadata.highMaxHeadroom(), equalTo(updatedHighMaxHeadroom));
assertThat(diskMetadata.describeFloodStageWatermark(), equalTo(updatedFloodStageWatermark));
assertThat(diskMetadata.floodStageMaxHeadroom(), equalTo(updatedFloodStageMaxHeadroom));

var shardLimitsMetadata = healthMetadata.getShardLimitsMetadata();
assertEquals(shardLimitsMetadata, updatedShardLimits);
});
}
}

- private String startNode(InternalTestCluster internalCluster, String customWatermark, String customMaxHeadroom) {
+ private String startNode(
+     InternalTestCluster internalCluster,
+     String customWatermark,
+     String customMaxHeadroom,
+     HealthMetadata.ShardLimits customShardLimits
+ ) {
return internalCluster.startNode(
Settings.builder()
.put(onlyRoles(Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE)))
.put(createWatermarkSettings(customWatermark, customMaxHeadroom))
.put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), customShardLimits.maxShardsPerNode())
.put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN.getKey(), customShardLimits.maxShardsPerNodeFrozen())
.build()
);
}

private Settings createWatermarkSettings(String highWatermark, String highMaxHeadroom) {
// We define both thresholds to avoid inconsistencies over the type of the thresholds
Settings.Builder settings = Settings.builder()
- .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), percentageMode ? "85%" : "20b")
- .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), highWatermark)
- .put(
-     DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(),
-     percentageMode ? "95%" : "1b"
- )
+ .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), percentageMode ? "85%" : "20b")
+ .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), highWatermark)
+ .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), percentageMode ? "95%" : "1b")
.put(
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_WATERMARK_SETTING.getKey(),
percentageMode ? "95%" : "5b"
);
if (percentageMode) {
- settings = settings.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_MAX_HEADROOM_SETTING.getKey(), "20b")
-     .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING.getKey(), "1b")
+ settings = settings.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_MAX_HEADROOM_SETTING.getKey(), "20b")
+     .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING.getKey(), "1b")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING.getKey(), "5b");
if (highMaxHeadroom.equals("-1") == false) {
- settings = settings.put(
-     DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_MAX_HEADROOM_SETTING.getKey(),
-     highMaxHeadroom
- );
+ settings = settings.put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_MAX_HEADROOM_SETTING.getKey(), highMaxHeadroom);
}
}
return settings.build();
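
The updated test drives the new metadata through the same two cluster settings that ShardLimitValidator already owns. As a rough sketch of that mapping — the service that reacts to setting updates and republishes HealthMetadata is not part of this diff, so treat the wiring as assumed; only the builder and the two setting constants come from this commit:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.health.metadata.HealthMetadata;

import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN;

class ShardLimitsFromSettingsSketch {
    // Illustrative only: how the two validator settings map onto the new record.
    static HealthMetadata.ShardLimits fromSettings(Settings settings) {
        return HealthMetadata.ShardLimits.newBuilder()
            .maxShardsPerNode(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(settings))
            .maxShardsPerNodeFrozen(SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN.get(settings))
            .build();
    }
}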
@@ -19,6 +19,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.RelativeByteSizeValue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
@@ -36,15 +37,22 @@ public final class HealthMetadata extends AbstractNamedDiffable<ClusterState.Cus
public static final String TYPE = "health";

private static final ParseField DISK_METADATA = new ParseField(Disk.TYPE);
private static final ParseField SHARD_LIMITS_METADATA = new ParseField(ShardLimits.TYPE);

private final Disk diskMetadata;
@Nullable
private final ShardLimits shardLimitsMetadata;

- public HealthMetadata(Disk diskMetadata) {
+ public HealthMetadata(Disk diskMetadata, ShardLimits shardLimitsMetadata) {
this.diskMetadata = diskMetadata;
this.shardLimitsMetadata = shardLimitsMetadata;
}

public HealthMetadata(StreamInput in) throws IOException {
this.diskMetadata = Disk.readFrom(in);
this.shardLimitsMetadata = in.getTransportVersion().onOrAfter(ShardLimits.VERSION_SUPPORTING_SHARD_LIMIT_FIELDS)
? in.readOptionalWriteable(ShardLimits::readFrom)
: null;
}

@Override
@@ -60,6 +68,9 @@ public TransportVersion getMinimalSupportedVersion() {
@Override
public void writeTo(StreamOutput out) throws IOException {
diskMetadata.writeTo(out);
if (out.getTransportVersion().onOrAfter(ShardLimits.VERSION_SUPPORTING_SHARD_LIMIT_FIELDS)) {
out.writeOptionalWriteable(shardLimitsMetadata);
}
}

public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws IOException {
@@ -72,6 +83,11 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignore
builder.startObject(DISK_METADATA.getPreferredName());
diskMetadata.toXContent(builder, params);
builder.endObject();
if (shardLimitsMetadata != null) {
builder.startObject(SHARD_LIMITS_METADATA.getPreferredName());
shardLimitsMetadata.toXContent(builder, params);
builder.endObject();
}
return builder;
});
}
@@ -84,22 +100,90 @@ public Disk getDiskMetadata() {
return diskMetadata;
}

public ShardLimits getShardLimitsMetadata() {
return shardLimitsMetadata;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HealthMetadata that = (HealthMetadata) o;
- return Objects.equals(diskMetadata, that.diskMetadata);
+ return Objects.equals(diskMetadata, that.diskMetadata) && Objects.equals(shardLimitsMetadata, that.shardLimitsMetadata);
}

@Override
public int hashCode() {
- return Objects.hash(diskMetadata);
+ return Objects.hash(diskMetadata, shardLimitsMetadata);
}

@Override
public String toString() {
return "HealthMetadata{diskMetadata=" + Strings.toString(diskMetadata) + '}';
return "HealthMetadata{diskMetadata=" + Strings.toString(diskMetadata) + ", shardLimitsMetadata=" + shardLimitsMetadata + "}";
}

/**
* Contains the thresholds needed to determine the health of a cluster when it comes to the amount of room available to create new
* shards. These values are determined by the elected master.
*/
public record ShardLimits(int maxShardsPerNode, int maxShardsPerNodeFrozen) implements ToXContentFragment, Writeable {

private static final String TYPE = "shard_limits";
private static final ParseField MAX_SHARDS_PER_NODE = new ParseField("max_shards_per_node");
private static final ParseField MAX_SHARDS_PER_NODE_FROZEN = new ParseField("max_shards_per_node_frozen");
static final TransportVersion VERSION_SUPPORTING_SHARD_LIMIT_FIELDS = TransportVersion.V_8_8_0;

static ShardLimits readFrom(StreamInput in) throws IOException {
return new ShardLimits(in.readInt(), in.readInt());
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(MAX_SHARDS_PER_NODE.getPreferredName(), maxShardsPerNode);
builder.field(MAX_SHARDS_PER_NODE_FROZEN.getPreferredName(), maxShardsPerNodeFrozen);
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(maxShardsPerNode);
out.writeInt(maxShardsPerNodeFrozen);
}

public static Builder newBuilder() {
return new Builder();
}

public static Builder newBuilder(ShardLimits shardLimits) {
return new Builder(shardLimits);
}

public static class Builder {

private int maxShardsPerNode;
private int maxShardsPerNodeFrozen;

private Builder() {}

private Builder(ShardLimits shardLimits) {
this.maxShardsPerNode = shardLimits.maxShardsPerNode;
this.maxShardsPerNodeFrozen = shardLimits.maxShardsPerNodeFrozen;
}

public Builder maxShardsPerNode(int maxShardsPerNode) {
this.maxShardsPerNode = maxShardsPerNode;
return this;
}

public Builder maxShardsPerNodeFrozen(int maxShardsPerNodeFrozen) {
this.maxShardsPerNodeFrozen = maxShardsPerNodeFrozen;
return this;
}

public ShardLimits build() {
return new ShardLimits(maxShardsPerNode, maxShardsPerNodeFrozen);
}
}
}

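To see why the TransportVersion gate in writeTo and in the StreamInput constructor keeps mixed-version clusters safe, here is a test-style round-trip sketch. It is not part of this commit: the helper class is hypothetical, and only HealthMetadata's constructors, writeTo, and VERSION_SUPPORTING_SHARD_LIMIT_FIELDS come from the code above.

import java.io.IOException;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.health.metadata.HealthMetadata;

class HealthMetadataWireSketch {
    // Serializes and deserializes HealthMetadata at a pretended transport version.
    // For any version before ShardLimits.VERSION_SUPPORTING_SHARD_LIMIT_FIELDS the
    // optional field is neither written nor read, so shardLimitsMetadata ends up null
    // on the receiving side instead of corrupting the stream.
    static HealthMetadata roundTrip(HealthMetadata original, TransportVersion version) throws IOException {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.setTransportVersion(version);   // pretend the receiver is on this version
            original.writeTo(out);
            StreamInput in = out.bytes().streamInput();
            in.setTransportVersion(version);    // reader must agree on the version
            return new HealthMetadata(in);
        }
    }
}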
