Add new Shards Capacity Health Indicator (#94552)
Introduces a new Health Indicator to check the cluster's health from the shards' capacity perspective.

It calculates how much room is left to add new shards in the data and frozen node groups, according to the following rules (see the worked example after the rules):

```
if the data or frozen node group has room for fewer than 5 new shards -> RED
if the data or frozen node group has room for fewer than 10 new shards -> YELLOW
otherwise -> GREEN
```
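
Concretely, the indicator asks the `ShardLimitValidator` whether 5 (RED threshold) or 10 (YELLOW threshold) more shards, each with one replica, could still be created; see `calculateFrom` in the new service below. A rough worked example with the numbers used by the integration tests in this commit (`cluster.max_shards_per_node: 30`, a single data node), assuming the validator counts the requested shards plus their replicas:

```
max shards for the data group = 30 per node * 1 data node = 30

 8 shards open:  8 + 5*2 = 18 <= 30 and  8 + 10*2 = 28 <= 30  -> GREEN
20 shards open: 20 + 5*2 = 30 <= 30 but 20 + 10*2 = 40 >  30  -> YELLOW
26 shards open: 26 + 5*2 = 36 > 30                            -> RED
```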
HiDAl committed Mar 24, 2023
1 parent 14e1a9d commit 5c353b0
Showing 8 changed files with 884 additions and 36 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/94552.yaml
@@ -0,0 +1,5 @@
pr: 94552
summary: Add new `ShardsCapacity` Health Indicator Service
area: Health
type: feature
issues: []
@@ -0,0 +1,151 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.health.node;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.health.HealthIndicatorResult;
import org.elasticsearch.health.HealthService;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.health.metadata.HealthMetadata;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeRoles;
import org.junit.After;
import org.junit.Before;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE;
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ShardsCapacityHealthIndicatorServiceIT extends ESIntegTestCase {

private static final String INDEX_NAME = "index-name";
private InternalTestCluster internalCluster;

@Before
public void setUp() throws Exception {
super.setUp();
internalCluster = internalCluster();
// the cluster scope starts with zero nodes (numDataNodes = 0), so bring up the master, data and frozen nodes first
setUpCluster(internalCluster);
updateClusterSettings(Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 30));
}

@After
public void tearDown() throws Exception {
super.tearDown();
updateClusterSettings(
Settings.builder()
.put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY))
);
}

public void testGreen() throws Exception {
// index: 4 shards + 1 replica = 8 shards used (30 - 8 = 22 > 10 available shards)
createIndex(4, 1);

var result = fetchShardsCapacityIndicatorResult(internalCluster);
assertEquals(result.status(), HealthStatus.GREEN);
assertEquals(result.symptom(), "The cluster has enough room to add new shards.");
assertThat(result.diagnosisList(), empty());
assertThat(result.impacts(), empty());
}

public void testYellow() throws Exception {
// index: 10 shards + 1 replica = 20 shards used (30 - 20 = 10, not enough room for 10 new shards plus their replicas)
createIndex(10, 1);

var result = fetchShardsCapacityIndicatorResult(internalCluster);
assertEquals(result.status(), HealthStatus.YELLOW);
assertEquals(result.symptom(), "Cluster is close to reaching the configured maximum number of shards for data nodes.");
assertThat(result.diagnosisList(), hasSize(1));
assertThat(result.impacts(), hasSize(2));
}

public void testRed() throws Exception {
// index: 13 shards + 1 replica = 26 shards used (30 - 26 < 5 available shards)
createIndex(13, 1);

var result = fetchShardsCapacityIndicatorResult(internalCluster);
assertEquals(result.status(), HealthStatus.RED);
assertEquals(result.symptom(), "Cluster is close to reaching the configured maximum number of shards for data nodes.");
assertThat(result.diagnosisList(), hasSize(1));
assertThat(result.impacts(), hasSize(2));
}

private void createIndex(int shards, int replicas) {
createIndex(INDEX_NAME, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, shards).put(SETTING_NUMBER_OF_REPLICAS, replicas).build());
}

private HealthIndicatorResult fetchShardsCapacityIndicatorResult(InternalTestCluster internalCluster) throws Exception {
var healthNode = findHealthNode().getName();
var healthService = internalCluster.getInstance(HealthService.class, healthNode);
var healthIndicatorResults = getHealthServiceResults(healthService, healthNode);
assertThat(healthIndicatorResults, hasSize(1));
return healthIndicatorResults.get(0);
}

private void setUpCluster(InternalTestCluster internalCluster) throws Exception {
internalCluster.startMasterOnlyNode();
internalCluster.startDataOnlyNode();
internalCluster.startNode(NodeRoles.onlyRole(DATA_FROZEN_NODE_ROLE));
ensureStableCluster(internalCluster.getNodeNames().length);
waitForHealthMetadata();
}

private List<HealthIndicatorResult> getHealthServiceResults(HealthService healthService, String node) throws Exception {
AtomicReference<List<HealthIndicatorResult>> resultListReference = new AtomicReference<>();
ActionListener<List<HealthIndicatorResult>> listener = new ActionListener<>() {
@Override
public void onResponse(List<HealthIndicatorResult> healthIndicatorResults) {
resultListReference.set(healthIndicatorResults);
}

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
};
healthService.getHealth(internalCluster().client(node), ShardsCapacityHealthIndicatorService.NAME, true, 1000, listener);
assertBusy(() -> assertNotNull(resultListReference.get()));
return resultListReference.get();
}

private void waitForHealthMetadata() throws Exception {
assertBusy(() -> {
var healthMetadata = HealthMetadata.getFromClusterState(internalCluster().clusterService().state());

assertNotNull(healthMetadata);
assertNotNull(healthMetadata.getShardLimitsMetadata());
assertTrue(
"max_shards_per_node setting must be greater than 0",
healthMetadata.getShardLimitsMetadata().maxShardsPerNode() > 0
);
assertTrue(
"max_shards_per_node.frozen setting must be greater than 0",
healthMetadata.getShardLimitsMetadata().maxShardsPerNodeFrozen() > 0
);
});
}

private static DiscoveryNode findHealthNode() {
var state = internalCluster().clusterService().state();
DiscoveryNode healthNode = HealthNode.findHealthNode(state);
assertNotNull(healthNode);
return healthNode;
}
}
@@ -0,0 +1,216 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.health.node;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.health.Diagnosis;
import org.elasticsearch.health.HealthIndicatorDetails;
import org.elasticsearch.health.HealthIndicatorImpact;
import org.elasticsearch.health.HealthIndicatorResult;
import org.elasticsearch.health.HealthIndicatorService;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.health.ImpactArea;
import org.elasticsearch.health.metadata.HealthMetadata;
import org.elasticsearch.indices.ShardLimitValidator;

import java.util.List;
import java.util.stream.Stream;

/**
* This indicator reports health data about the shard capacity across the cluster.
*
* <p>
* The indicator will report:
* * RED when there's room for less than 5 shards (either data or frozen nodes)
* * YELLOW when there's room for less than 10 shards (either data or frozen nodes)
* * GREEN otherwise
* </p>
*
* Although the `max_shards_per_node(.frozen)?` setting is scoped per node, we use the master node's view of it, because that is
* where the available room for new shards is checked before new indices are created.
*/
public class ShardsCapacityHealthIndicatorService implements HealthIndicatorService {

static final String NAME = "shards_capacity";

static final String DATA_NODE_NAME = "data";
static final String FROZEN_NODE_NAME = "frozen";
private static final String UPGRADE_BLOCKED = "The cluster has too many used shards to be able to upgrade.";
private static final String UPGRADE_AT_RISK =
"The cluster is running low on room to add new shards. Upgrading to a new version is at risk.";
private static final String INDEX_CREATION_BLOCKED =
"The cluster is running low on room to add new shards. Adding data to new indices is at risk";
private static final String INDEX_CREATION_RISK =
"The cluster is running low on room to add new shards. Adding data to new indices might soon fail.";
private static final String HELP_GUIDE = "https://ela.st/fix-shards-capacity";
private static final TriFunction<String, Setting<?>, String, Diagnosis> SHARD_MAX_CAPACITY_REACHED_FN = (
id,
setting,
indexType) -> new Diagnosis(
new Diagnosis.Definition(
NAME,
id,
"Elasticsearch is about to reach the maximum number of shards it can host, based on your current settings.",
"Increase the value of ["
+ setting.getKey()
+ "] cluster setting or remove "
+ indexType
+ " indices to clear up resources.",
HELP_GUIDE
),
null
);

static final List<HealthIndicatorImpact> RED_INDICATOR_IMPACTS = List.of(
new HealthIndicatorImpact(NAME, "upgrade_blocked", 1, UPGRADE_BLOCKED, List.of(ImpactArea.DEPLOYMENT_MANAGEMENT)),
new HealthIndicatorImpact(NAME, "creation_of_new_indices_blocked", 1, INDEX_CREATION_BLOCKED, List.of(ImpactArea.INGEST))
);
static final List<HealthIndicatorImpact> YELLOW_INDICATOR_IMPACTS = List.of(
new HealthIndicatorImpact(NAME, "upgrade_at_risk", 2, UPGRADE_AT_RISK, List.of(ImpactArea.DEPLOYMENT_MANAGEMENT)),
new HealthIndicatorImpact(NAME, "creation_of_new_indices_at_risk", 2, INDEX_CREATION_RISK, List.of(ImpactArea.INGEST))
);
static final Diagnosis SHARDS_MAX_CAPACITY_REACHED_DATA_NODES = SHARD_MAX_CAPACITY_REACHED_FN.apply(
"increase_max_shards_per_node",
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
"data"
);
static final Diagnosis SHARDS_MAX_CAPACITY_REACHED_FROZEN_NODES = SHARD_MAX_CAPACITY_REACHED_FN.apply(
"increase_max_shards_per_node_frozen",
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN,
"frozen"
);

private final ClusterService clusterService;

public ShardsCapacityHealthIndicatorService(ClusterService clusterService) {
this.clusterService = clusterService;
}

@Override
public String name() {
return NAME;
}

@Override
public HealthIndicatorResult calculate(boolean verbose, int maxAffectedResourcesCount, HealthInfo healthInfo) {
var state = clusterService.state();
var healthMetadata = HealthMetadata.getFromClusterState(state);
if (healthMetadata == null || healthMetadata.getShardLimitsMetadata() == null) {
return unknownIndicator();
}

var shardLimitsMetadata = healthMetadata.getShardLimitsMetadata();
return mergeIndicators(
calculateFrom(shardLimitsMetadata.maxShardsPerNode(), state, ShardLimitValidator::checkShardLimitForNormalNodes),
calculateFrom(shardLimitsMetadata.maxShardsPerNodeFrozen(), state, ShardLimitValidator::checkShardLimitForFrozenNodes)
);
}

private HealthIndicatorResult mergeIndicators(StatusResult dataNodes, StatusResult frozenNodes) {
var finalStatus = HealthStatus.merge(Stream.of(dataNodes.status, frozenNodes.status));
var diagnoses = List.<Diagnosis>of();
var symptomBuilder = new StringBuilder();

if (finalStatus == HealthStatus.GREEN) {
symptomBuilder.append("The cluster has enough room to add new shards.");
}

// RED and YELLOW statuses indicate that the cluster might have issues. finalStatus holds the worst status between the *data
// (non-frozen) and frozen* groups, so we have to check each group in order to provide the right message.
if (finalStatus.indicatesHealthProblem()) {
symptomBuilder.append("Cluster is close to reaching the configured maximum number of shards for ");
if (dataNodes.status == frozenNodes.status) {
symptomBuilder.append(DATA_NODE_NAME).append(" and ").append(FROZEN_NODE_NAME);
diagnoses = List.of(SHARDS_MAX_CAPACITY_REACHED_DATA_NODES, SHARDS_MAX_CAPACITY_REACHED_FROZEN_NODES);

} else if (dataNodes.status.indicatesHealthProblem()) {
symptomBuilder.append(DATA_NODE_NAME);
diagnoses = List.of(SHARDS_MAX_CAPACITY_REACHED_DATA_NODES);

} else if (frozenNodes.status.indicatesHealthProblem()) {
symptomBuilder.append(FROZEN_NODE_NAME);
diagnoses = List.of(SHARDS_MAX_CAPACITY_REACHED_FROZEN_NODES);
}

symptomBuilder.append(" nodes.");
}

var indicatorImpacts = switch (finalStatus) {
case RED -> RED_INDICATOR_IMPACTS;
case YELLOW -> YELLOW_INDICATOR_IMPACTS;
default -> List.<HealthIndicatorImpact>of();
};

return createIndicator(
finalStatus,
symptomBuilder.toString(),
buildDetails(dataNodes.result, frozenNodes.result),
indicatorImpacts,
diagnoses
);
}

static StatusResult calculateFrom(int maxShardsPerNodeSetting, ClusterState state, ShardsCapacityChecker checker) {
var result = checker.check(maxShardsPerNodeSetting, 5, 1, state);
if (result.canAddShards() == false) {
return new StatusResult(HealthStatus.RED, result);
}

result = checker.check(maxShardsPerNodeSetting, 10, 1, state);
if (result.canAddShards() == false) {
return new StatusResult(HealthStatus.YELLOW, result);
}

return new StatusResult(HealthStatus.GREEN, result);
}

static HealthIndicatorDetails buildDetails(ShardLimitValidator.Result dataNodes, ShardLimitValidator.Result frozenNodes) {
return (builder, params) -> {
builder.startObject();
{
builder.startObject(DATA_NODE_NAME);
builder.field("max_shards_in_cluster", dataNodes.maxShardsInCluster());
if (dataNodes.currentUsedShards().isPresent()) {
builder.field("current_used_shards", dataNodes.currentUsedShards().get());
}
builder.endObject();
}
{
builder.startObject("frozen");
builder.field("max_shards_in_cluster", frozenNodes.maxShardsInCluster());
if (frozenNodes.currentUsedShards().isPresent()) {
builder.field("current_used_shards", frozenNodes.currentUsedShards().get());
}
builder.endObject();
}
builder.endObject();
return builder;
};
}

private HealthIndicatorResult unknownIndicator() {
return createIndicator(
HealthStatus.UNKNOWN,
"Unable to determine shard capacity status.",
HealthIndicatorDetails.EMPTY,
List.of(),
List.of()
);
}

record StatusResult(HealthStatus status, ShardLimitValidator.Result result) {}

@FunctionalInterface
interface ShardsCapacityChecker {
ShardLimitValidator.Result check(int maxConfiguredShardsPerNode, int numberOfNewShards, int replicas, ClusterState state);
}
}
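
For reference, a sketch of how the new indicator could surface through the health report API once the service is wired up (registration happens in the other files of this change set). The shape of `details` follows `buildDetails` above and the symptom matches the service's messages, but the response is abridged and the values are illustrative only:

```
GET _health_report/shards_capacity

{
  "indicators": {
    "shards_capacity": {
      "status": "yellow",
      "symptom": "Cluster is close to reaching the configured maximum number of shards for data nodes.",
      "details": {
        "data":   { "max_shards_in_cluster": 1000, "current_used_shards": 988 },
        "frozen": { "max_shards_in_cluster": 3000 }
      },
      "impacts": [ ... ],
      "diagnosis": [ ... ]
    }
  }
}
```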
