Skip to content

Commit

Permalink
[Health API] Abstract data tier diagnoses as node roles (#102466)
Browse files Browse the repository at this point in the history
We generalise the code that is diagnosing the shard availability when it comes to data tier issues. We make it more extensible, so in serverless we can introduce new roles.

For this reason, we consider a tier as a more specific kind of a role. Then we expose some methods and some diagnosis definitions in the ShardsAvailabilityHealthIndicatorService so they can be extended.
  • Loading branch information
gmarouli committed Nov 22, 2023
1 parent 5f4fb50 commit 9e3d0db
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 89 deletions.
6 changes: 5 additions & 1 deletion server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -422,5 +422,9 @@
org.elasticsearch.index.codec.bloomfilter.ES87BloomFilterPostingsFormat;
provides org.apache.lucene.codecs.DocValuesFormat with ES87TSDBDocValuesFormat;

exports org.elasticsearch.cluster.routing.allocation.shards to org.elasticsearch.shardhealth, org.elasticsearch.serverless.shardhealth;
exports org.elasticsearch.cluster.routing.allocation.shards
to
org.elasticsearch.shardhealth,
org.elasticsearch.serverless.shardhealth,
org.elasticsearch.serverless.apifiltering;
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ static void updateShardAllocationStatus(
);

public static final String ENABLE_TIER_ACTION_GUIDE = "https://ela.st/enable-tier";
public static final Map<String, Diagnosis.Definition> ACTION_ENABLE_TIERS_LOOKUP = DataTier.ALL_DATA_TIERS.stream()
private static final Map<String, Diagnosis.Definition> ACTION_ENABLE_TIERS_LOOKUP = DataTier.ALL_DATA_TIERS.stream()
.collect(
Collectors.toUnmodifiableMap(
tier -> tier,
Expand All @@ -276,7 +276,7 @@ static void updateShardAllocationStatus(
INCREASE_SHARD_LIMIT_ACTION_GUIDE
);

public static final Map<String, Diagnosis.Definition> ACTION_INCREASE_SHARD_LIMIT_INDEX_SETTING_LOOKUP = DataTier.ALL_DATA_TIERS
private static final Map<String, Diagnosis.Definition> ACTION_INCREASE_SHARD_LIMIT_INDEX_SETTING_LOOKUP = DataTier.ALL_DATA_TIERS
.stream()
.collect(
Collectors.toUnmodifiableMap(
Expand Down Expand Up @@ -307,7 +307,7 @@ static void updateShardAllocationStatus(
INCREASE_CLUSTER_SHARD_LIMIT_ACTION_GUIDE
);

public static final Map<String, Diagnosis.Definition> ACTION_INCREASE_SHARD_LIMIT_CLUSTER_SETTING_LOOKUP = DataTier.ALL_DATA_TIERS
private static final Map<String, Diagnosis.Definition> ACTION_INCREASE_SHARD_LIMIT_CLUSTER_SETTING_LOOKUP = DataTier.ALL_DATA_TIERS
.stream()
.collect(
Collectors.toUnmodifiableMap(
Expand Down Expand Up @@ -405,6 +405,7 @@ static void updateShardAllocationStatus(
TIER_CAPACITY_ACTION_GUIDE
);

// Visible for testing
public static final Map<String, Diagnosis.Definition> ACTION_INCREASE_TIER_CAPACITY_LOOKUP = DataTier.ALL_DATA_TIERS.stream()
.collect(
Collectors.toUnmodifiableMap(
Expand Down Expand Up @@ -622,11 +623,11 @@ List<Diagnosis.Definition> diagnoseAllocationResults(
ClusterState state,
List<NodeAllocationResult> nodeAllocationResults
) {
IndexMetadata index = state.metadata().index(shardRouting.index());
IndexMetadata indexMetadata = state.metadata().index(shardRouting.index());
List<Diagnosis.Definition> diagnosisDefs = new ArrayList<>();
if (index != null) {
diagnosisDefs.addAll(checkIsAllocationDisabled(index, nodeAllocationResults));
diagnosisDefs.addAll(checkDataTierRelatedIssues(index, nodeAllocationResults, state));
if (indexMetadata != null) {
diagnosisDefs.addAll(checkIsAllocationDisabled(indexMetadata, nodeAllocationResults));
diagnosisDefs.addAll(checkNodeRoleRelatedIssues(indexMetadata, nodeAllocationResults, state, shardRouting));
}
if (diagnosisDefs.isEmpty()) {
diagnosisDefs.add(ACTION_CHECK_ALLOCATION_EXPLAIN_API);
Expand All @@ -640,7 +641,7 @@ List<Diagnosis.Definition> diagnoseAllocationResults(
* @param outcome The outcome expected
* @return A predicate that returns true if the decision exists and matches the expected outcome, false otherwise.
*/
private static Predicate<NodeAllocationResult> hasDeciderResult(String deciderName, Decision.Type outcome) {
protected static Predicate<NodeAllocationResult> hasDeciderResult(String deciderName, Decision.Type outcome) {
return (nodeResult) -> {
Decision decision = nodeResult.getCanAllocateDecision();
return decision != null && decision.getDecisions().stream().anyMatch(d -> deciderName.equals(d.label()) && outcome == d.type());
Expand Down Expand Up @@ -676,26 +677,29 @@ List<Diagnosis.Definition> checkIsAllocationDisabled(IndexMetadata indexMetadata
}

/**
* Generates a list of diagnoses for common problems that keep a shard from allocating to nodes in a data tier.
* Generates a list of diagnoses for common problems that keep a shard from allocating to nodes depending on their role;
* a very common example of such a case are data tiers.
* @param indexMetadata Index metadata for the shard being diagnosed.
* @param nodeAllocationResults allocation decision results for all nodes in the cluster.
* @param clusterState the current cluster state.
* @param shardRouting the shard the nodeAllocationResults refer to
* @return A list of diagnoses for the provided unassigned shard
*/
public List<Diagnosis.Definition> checkDataTierRelatedIssues(
protected List<Diagnosis.Definition> checkNodeRoleRelatedIssues(
IndexMetadata indexMetadata,
List<NodeAllocationResult> nodeAllocationResults,
ClusterState clusterState
ClusterState clusterState,
ShardRouting shardRouting
) {
List<Diagnosis.Definition> diagnosisDefs = new ArrayList<>();
if (indexMetadata.getTierPreference().size() > 0) {
if (indexMetadata.getTierPreference().isEmpty() == false) {
List<NodeAllocationResult> dataTierAllocationResults = nodeAllocationResults.stream()
.filter(hasDeciderResult(DATA_TIER_ALLOCATION_DECIDER_NAME, Decision.Type.YES))
.toList();
if (dataTierAllocationResults.isEmpty()) {
// Shard must be allocated on specific tiers but no nodes were enabled for those tiers.
for (String tier : indexMetadata.getTierPreference()) {
Optional.ofNullable(ACTION_ENABLE_TIERS_LOOKUP.get(tier)).ifPresent(diagnosisDefs::add);
Optional.ofNullable(getAddNodesWithRoleAction(tier)).ifPresent(diagnosisDefs::add);
}
} else {
// Collect the nodes from the tiers this index is allowed on
Expand All @@ -719,29 +723,29 @@ public List<Diagnosis.Definition> checkDataTierRelatedIssues(

// Run checks for data tier specific problems
diagnosisDefs.addAll(
checkDataTierAtShardLimit(indexMetadata, clusterState, dataTierAllocationResults, dataTierNodes, preferredTier)
checkNodesWithRoleAtShardLimit(indexMetadata, clusterState, dataTierAllocationResults, dataTierNodes, preferredTier)
);
diagnosisDefs.addAll(checkDataTierShouldMigrate(indexMetadata, dataTierAllocationResults, preferredTier, dataTierNodes));
checkNotEnoughNodesInDataTier(dataTierAllocationResults, preferredTier).ifPresent(diagnosisDefs::add);
checkNotEnoughNodesWithRole(dataTierAllocationResults, preferredTier).ifPresent(diagnosisDefs::add);
}
}
return diagnosisDefs;
}

private List<Diagnosis.Definition> checkDataTierAtShardLimit(
protected List<Diagnosis.Definition> checkNodesWithRoleAtShardLimit(
IndexMetadata indexMetadata,
ClusterState clusterState,
List<NodeAllocationResult> dataTierAllocationResults,
Set<DiscoveryNode> dataTierNodes,
@Nullable String preferredTier
List<NodeAllocationResult> nodeRoleAllocationResults,
Set<DiscoveryNode> nodesWithRoles,
@Nullable String role
) {
// All tier nodes at shards limit?
if (dataTierAllocationResults.stream().allMatch(hasDeciderResult(ShardsLimitAllocationDecider.NAME, Decision.Type.NO))) {
// All applicable nodes at shards limit?
if (nodeRoleAllocationResults.stream().allMatch(hasDeciderResult(ShardsLimitAllocationDecider.NAME, Decision.Type.NO))) {
List<Diagnosis.Definition> diagnosisDefs = new ArrayList<>();
// We need the routing nodes for the tiers this index is allowed on to determine the offending shard limits
List<RoutingNode> dataTierRoutingNodes = clusterState.getRoutingNodes()
// We need the routing nodes for the role this index is allowed on to determine the offending shard limits
List<RoutingNode> candidateNodes = clusterState.getRoutingNodes()
.stream()
.filter(routingNode -> dataTierNodes.contains(routingNode.node()))
.filter(routingNode -> nodesWithRoles.contains(routingNode.node()))
.toList();

// Determine which total_shards_per_node settings are present
Expand All @@ -752,34 +756,29 @@ private List<Diagnosis.Definition> checkDataTierAtShardLimit(
// Determine which total_shards_per_node settings are keeping things from allocating
boolean clusterShardsPerNodeShouldChange = false;
if (clusterShardsPerNode > 0) {
int minShardCountInTier = dataTierRoutingNodes.stream()
.map(RoutingNode::numberOfOwningShards)
.min(Integer::compareTo)
.orElse(-1);
clusterShardsPerNodeShouldChange = minShardCountInTier >= clusterShardsPerNode;
int minShardCount = candidateNodes.stream().map(RoutingNode::numberOfOwningShards).min(Integer::compareTo).orElse(-1);
clusterShardsPerNodeShouldChange = minShardCount >= clusterShardsPerNode;
}
boolean indexShardsPerNodeShouldChange = false;
if (indexShardsPerNode > 0) {
int minShardCountInTier = dataTierRoutingNodes.stream()
int minShardCount = candidateNodes.stream()
.map(routingNode -> routingNode.numberOfOwningShardsForIndex(indexMetadata.getIndex()))
.min(Integer::compareTo)
.orElse(-1);
indexShardsPerNodeShouldChange = minShardCountInTier >= indexShardsPerNode;
indexShardsPerNodeShouldChange = minShardCount >= indexShardsPerNode;
}

// Add appropriate diagnosis
if (preferredTier != null) {
// We cannot allocate the shard to the most preferred tier because a shard limit is reached.
if (role != null) {
// We cannot allocate the shard to the most preferred role because a shard limit is reached.
if (clusterShardsPerNodeShouldChange) {
Optional.ofNullable(ACTION_INCREASE_SHARD_LIMIT_CLUSTER_SETTING_LOOKUP.get(preferredTier))
.ifPresent(diagnosisDefs::add);
Optional.ofNullable(getIncreaseShardLimitClusterSettingAction(role)).ifPresent(diagnosisDefs::add);
}
if (indexShardsPerNodeShouldChange) {
Optional.ofNullable(ACTION_INCREASE_SHARD_LIMIT_INDEX_SETTING_LOOKUP.get(preferredTier)).ifPresent(diagnosisDefs::add);
Optional.ofNullable(getIncreaseShardLimitIndexSettingAction(role)).ifPresent(diagnosisDefs::add);
}
} else {
// We couldn't determine a desired tier. This is likely because there are no tiers in the cluster,
// only `data` nodes. Give a generic ask for increasing the shard limit.
// We couldn't determine a desired role. Give a generic ask for increasing the shard limit.
if (clusterShardsPerNodeShouldChange) {
diagnosisDefs.add(ACTION_INCREASE_SHARD_LIMIT_CLUSTER_SETTING);
}
Expand Down Expand Up @@ -838,16 +837,16 @@ private static List<Diagnosis.Definition> checkDataTierShouldMigrate(
}
}

private static Optional<Diagnosis.Definition> checkNotEnoughNodesInDataTier(
List<NodeAllocationResult> dataTierAllocationResults,
@Nullable String preferredTier
protected Optional<Diagnosis.Definition> checkNotEnoughNodesWithRole(
List<NodeAllocationResult> nodeAllocationResults,
@Nullable String role
) {
// Not enough tier nodes to hold shards on different nodes?
if (dataTierAllocationResults.stream().allMatch(hasDeciderResult(SameShardAllocationDecider.NAME, Decision.Type.NO))) {
// We couldn't determine a desired tier. This is likely because there are no tiers in the cluster,
// only `data` nodes. Give a generic ask for increasing the shard limit.
if (preferredTier != null) {
return Optional.ofNullable(ACTION_INCREASE_TIER_CAPACITY_LOOKUP.get(preferredTier));
// Not enough nodes to hold shards on different nodes?
if (nodeAllocationResults.stream().allMatch(hasDeciderResult(SameShardAllocationDecider.NAME, Decision.Type.NO))) {
// We couldn't determine a desired role. This is likely because there are no nodes with the relevant role in the cluster.
// Give a generic ask for increasing the shard limit.
if (role != null) {
return Optional.ofNullable(getIncreaseNodeWithRoleCapacityAction(role));
} else {
return Optional.of(ACTION_INCREASE_NODE_CAPACITY);
}
Expand All @@ -856,6 +855,26 @@ private static Optional<Diagnosis.Definition> checkNotEnoughNodesInDataTier(
}
}

@Nullable
public Diagnosis.Definition getAddNodesWithRoleAction(String role) {
return ACTION_ENABLE_TIERS_LOOKUP.get(role);
}

@Nullable
public Diagnosis.Definition getIncreaseShardLimitIndexSettingAction(String role) {
return ACTION_INCREASE_SHARD_LIMIT_INDEX_SETTING_LOOKUP.get(role);
}

@Nullable
public Diagnosis.Definition getIncreaseShardLimitClusterSettingAction(String role) {
return ACTION_INCREASE_SHARD_LIMIT_CLUSTER_SETTING_LOOKUP.get(role);
}

@Nullable
public Diagnosis.Definition getIncreaseNodeWithRoleCapacityAction(String role) {
return ACTION_INCREASE_TIER_CAPACITY_LOOKUP.get(role);
}

public class ShardAllocationStatus {
protected final ShardAllocationCounts primaries = new ShardAllocationCounts();
protected final ShardAllocationCounts replicas = new ShardAllocationCounts();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ public Response(final ClusterName clusterName, final List<HealthIndicatorResult>
}
}

public Response(final ClusterName clusterName, final List<HealthIndicatorResult> indicators, HealthStatus topLevelStatus) {
this.indicators = indicators;
this.clusterName = clusterName;
this.status = topLevelStatus;
}

public ClusterName getClusterName() {
return clusterName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.test.ESTestCase;

import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_CHECK_ALLOCATION_EXPLAIN_API;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_ENABLE_CLUSTER_ROUTING_ALLOCATION;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_ENABLE_INDEX_ROUTING_ALLOCATION;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_ENABLE_TIERS_LOOKUP;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_INCREASE_NODE_CAPACITY;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_INCREASE_SHARD_LIMIT_CLUSTER_SETTING;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_INCREASE_SHARD_LIMIT_CLUSTER_SETTING_LOOKUP;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_INCREASE_SHARD_LIMIT_INDEX_SETTING;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_INCREASE_SHARD_LIMIT_INDEX_SETTING_LOOKUP;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_MIGRATE_TIERS_AWAY_FROM_INCLUDE_DATA;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_MIGRATE_TIERS_AWAY_FROM_REQUIRE_DATA;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.ACTION_RESTORE_FROM_SNAPSHOT;
Expand All @@ -32,9 +32,16 @@
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.RESTORE_FROM_SNAPSHOT_ACTION_GUIDE;
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.TIER_CAPACITY_ACTION_GUIDE;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;

public class ShardsAvailabilityActionGuideTests extends ESTestCase {

private final ShardsAvailabilityHealthIndicatorService service = new ShardsAvailabilityHealthIndicatorService(
mock(ClusterService.class),
mock(AllocationService.class),
mock(SystemIndices.class)
);

public void testRestoreFromSnapshotAction() {
assertThat(ACTION_RESTORE_FROM_SNAPSHOT.helpURL(), is(RESTORE_FROM_SNAPSHOT_ACTION_GUIDE));
}
Expand All @@ -60,20 +67,17 @@ public void testEnableClusterRoutingAllocation() {
}

public void testEnableDataTiers() {
assertThat(ACTION_ENABLE_TIERS_LOOKUP.get(DataTier.DATA_HOT).helpURL(), is(ENABLE_TIER_ACTION_GUIDE));
assertThat(service.getAddNodesWithRoleAction(DataTier.DATA_HOT).helpURL(), is(ENABLE_TIER_ACTION_GUIDE));
}

public void testIncreaseShardLimitIndexSettingInTier() {
assertThat(
ACTION_INCREASE_SHARD_LIMIT_INDEX_SETTING_LOOKUP.get(DataTier.DATA_HOT).helpURL(),
is(INCREASE_SHARD_LIMIT_ACTION_GUIDE)
);
assertThat(service.getIncreaseShardLimitIndexSettingAction(DataTier.DATA_HOT).helpURL(), is(INCREASE_SHARD_LIMIT_ACTION_GUIDE));
assertThat(ACTION_INCREASE_SHARD_LIMIT_INDEX_SETTING.helpURL(), is(INCREASE_SHARD_LIMIT_ACTION_GUIDE));
}

public void testIncreaseShardLimitClusterSettingInTier() {
assertThat(
ACTION_INCREASE_SHARD_LIMIT_CLUSTER_SETTING_LOOKUP.get(DataTier.DATA_HOT).helpURL(),
service.getIncreaseShardLimitClusterSettingAction(DataTier.DATA_HOT).helpURL(),
is(INCREASE_CLUSTER_SHARD_LIMIT_ACTION_GUIDE)
);
assertThat(ACTION_INCREASE_SHARD_LIMIT_CLUSTER_SETTING.helpURL(), is(INCREASE_CLUSTER_SHARD_LIMIT_ACTION_GUIDE));
Expand Down

0 comments on commit 9e3d0db

Please sign in to comment.