Skip to content

Commit

Permalink
Fix how the health API is reporting initializing shards (#93502)
Browse files Browse the repository at this point in the history
When a shard is initializing is not available for usage but it's also not unassigned. In this PR, we update the symptom and we add a new diagnosis to guide the user through this situation.
  • Loading branch information
gmarouli committed Feb 8, 2023
1 parent e228103 commit db17d38
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 25 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/93502.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 93502
summary: Health API fix the reporting of initializing shards
area: Health
type: bug
issues: [90327]
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,18 @@ public HealthIndicatorResult calculate(boolean verbose, int maxAffectedResources
FIX_DELAYED_SHARDS_GUIDE
);

public static final String WAIT_FOR_INITIALIZATION_GUIDE = "https://ela.st/wait-for-shard-initialization";
public static final Diagnosis.Definition DIAGNOSIS_WAIT_FOR_INITIALIZATION = new Diagnosis.Definition(
NAME,
"initializing_shards",
"Elasticsearch is currently initializing the unavailable shards. Please wait for the initialization to finish.",
"The shards will become available as long as the initialization completes. No action is required by the user, you can"
+ " monitor the progress of the initializing shards at "
+ WAIT_FOR_INITIALIZATION_GUIDE
+ ".",
WAIT_FOR_INITIALIZATION_GUIDE
);

public static final String ENABLE_INDEX_ALLOCATION_GUIDE = "https://ela.st/fix-index-allocation";
public static final Diagnosis.Definition ACTION_ENABLE_INDEX_ROUTING_ALLOCATION = new Diagnosis.Definition(
NAME,
Expand Down Expand Up @@ -412,7 +424,12 @@ public void increment(ShardRouting routing, ClusterState state, NodesShutdownMet
}
}
}
case INITIALIZING -> initializing++;
case INITIALIZING -> {
initializing++;
if (verbose) {
addDefinition(DIAGNOSIS_WAIT_FOR_INITIALIZATION, routing.getIndexName());
}
}
case STARTED -> started++;
case RELOCATING -> relocating++;
}
Expand Down Expand Up @@ -451,22 +468,16 @@ List<Diagnosis.Definition> diagnoseUnassignedShardRouting(ShardRouting shardRout
List<Diagnosis.Definition> diagnosisDefs = new ArrayList<>();
LOGGER.trace("Diagnosing unassigned shard [{}] due to reason [{}]", shardRouting.shardId(), shardRouting.unassignedInfo());
switch (shardRouting.unassignedInfo().getLastAllocationStatus()) {
case NO_VALID_SHARD_COPY:
diagnosisDefs.add(ACTION_RESTORE_FROM_SNAPSHOT);
break;
case NO_ATTEMPT:
case NO_VALID_SHARD_COPY -> diagnosisDefs.add(ACTION_RESTORE_FROM_SNAPSHOT);
case NO_ATTEMPT -> {
if (shardRouting.unassignedInfo().isDelayed()) {
diagnosisDefs.add(DIAGNOSIS_WAIT_FOR_OR_FIX_DELAYED_SHARDS);
} else {
diagnosisDefs.addAll(explainAllocationsAndDiagnoseDeciders(shardRouting, state));
}
break;
case DECIDERS_NO:
diagnosisDefs.addAll(explainAllocationsAndDiagnoseDeciders(shardRouting, state));
break;
case DELAYED_ALLOCATION:
diagnosisDefs.add(DIAGNOSIS_WAIT_FOR_OR_FIX_DELAYED_SHARDS);
break;
}
case DECIDERS_NO -> diagnosisDefs.addAll(explainAllocationsAndDiagnoseDeciders(shardRouting, state));
case DELAYED_ALLOCATION -> diagnosisDefs.add(DIAGNOSIS_WAIT_FOR_OR_FIX_DELAYED_SHARDS);
}
if (diagnosisDefs.isEmpty()) {
diagnosisDefs.add(ACTION_CHECK_ALLOCATION_EXPLAIN_API);
Expand Down Expand Up @@ -556,10 +567,10 @@ List<Diagnosis.Definition> diagnoseAllocationResults(
* @return A predicate that returns true if the decision exists and matches the expected outcome, false otherwise.
*/
private static Predicate<NodeAllocationResult> hasDeciderResult(String deciderName, Decision.Type outcome) {
return (nodeResult) -> nodeResult.getCanAllocateDecision()
.getDecisions()
.stream()
.anyMatch(decision -> deciderName.equals(decision.label()) && outcome == decision.type());
return (nodeResult) -> {
Decision decision = nodeResult.getCanAllocateDecision();
return decision != null && decision.getDecisions().stream().anyMatch(d -> deciderName.equals(d.label()) && outcome == d.type());
};
}

/**
Expand Down Expand Up @@ -804,13 +815,17 @@ public String getSymptom() {
|| primaries.unassigned_new > 0
|| primaries.unassigned_restarting > 0
|| replicas.unassigned > 0
|| replicas.unassigned_restarting > 0) {
|| replicas.unassigned_restarting > 0
|| primaries.initializing > 0
|| replicas.initializing > 0) {
builder.append(
Stream.of(
createMessage(primaries.unassigned, "unavailable primary shard", "unavailable primary shards"),
createMessage(primaries.unassigned_new, "creating primary shard", "creating primary shards"),
createMessage(primaries.unassigned_restarting, "restarting primary shard", "restarting primary shards"),
createMessage(replicas.unassigned, "unavailable replica shard", "unavailable replica shards"),
createMessage(primaries.initializing, "initializing primary shard", "initializing primary shards"),
createMessage(replicas.initializing, "initializing replica shard", "initializing replica shards"),
createMessage(replicas.unassigned_restarting, "restarting replica shard", "restarting replica shards")
).flatMap(Function.identity()).collect(joining(", "))
).append(".");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.ACTION_MIGRATE_TIERS_AWAY_FROM_REQUIRE_DATA;
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.ACTION_MIGRATE_TIERS_AWAY_FROM_REQUIRE_DATA_LOOKUP;
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.ACTION_RESTORE_FROM_SNAPSHOT;
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.DIAGNOSIS_WAIT_FOR_INITIALIZATION;
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.DIAGNOSIS_WAIT_FOR_OR_FIX_DELAYED_SHARDS;
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.NAME;
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorServiceTests.ShardState.AVAILABLE;
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorServiceTests.ShardState.CREATING;
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorServiceTests.ShardState.INITIALIZING;
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorServiceTests.ShardState.RESTARTING;
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorServiceTests.ShardState.UNAVAILABLE;
Expand Down Expand Up @@ -137,6 +139,98 @@ public void testShouldBeGreenWhenAllPrimariesAndReplicasAreStarted() {
);
}

public void testShouldBeYellowWhenReplicaIsInitializing() {
var clusterState = createClusterStateWith(
List.of(
index("replicated-index", new ShardAllocation(randomNodeId(), AVAILABLE), new ShardAllocation(randomNodeId(), INITIALIZING))
),
List.of()
);
var service = createShardsAvailabilityIndicatorService(clusterState);

assertThat(
service.calculate(true, HealthInfo.EMPTY_HEALTH_INFO),
equalTo(
createExpectedResult(
YELLOW,
"This cluster has 1 initializing replica shard.",
Map.of("started_primaries", 1, "initializing_replicas", 1),
List.of(
new HealthIndicatorImpact(
NAME,
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_IMPACT_ID,
2,
"Searches might be slower than usual. Fewer redundant copies of the data exist on 1 index [replicated-index].",
List.of(ImpactArea.SEARCH)
)
),
List.of(
new Diagnosis(
DIAGNOSIS_WAIT_FOR_INITIALIZATION,
List.of(new Diagnosis.Resource(INDEX, List.of("replicated-index")))
)
)
)
)
);
}

public void testShouldBeRedWhenPrimaryIsInitializing() {
var clusterState = createClusterStateWith(
List.of(index("unreplicated-index", new ShardAllocation(randomNodeId(), INITIALIZING))),
List.of()
);
var service = createShardsAvailabilityIndicatorService(clusterState);

HealthIndicatorResult calculate = service.calculate(true, HealthInfo.EMPTY_HEALTH_INFO);
assertThat(
calculate,
equalTo(
createExpectedResult(
RED,
"This cluster has 1 initializing primary shard.",
Map.of("initializing_primaries", 1),
List.of(
new HealthIndicatorImpact(
NAME,
ShardsAvailabilityHealthIndicatorService.PRIMARY_UNASSIGNED_IMPACT_ID,
1,
"Cannot add data to 1 index [unreplicated-index]. Searches might return incomplete results.",
List.of(ImpactArea.INGEST, ImpactArea.SEARCH)
)
),
List.of(
new Diagnosis(
DIAGNOSIS_WAIT_FOR_INITIALIZATION,
List.of(new Diagnosis.Resource(INDEX, List.of("unreplicated-index")))
)
)
)
)
);
}

public void testShouldBeGreenWhenAllPrimariesAreCreating() {
var clusterState = createClusterStateWith(
List.of(index("unreplicated-index", new ShardAllocation(randomNodeId(), CREATING))),
List.of()
);
var service = createShardsAvailabilityIndicatorService(clusterState);

assertThat(
service.calculate(true, HealthInfo.EMPTY_HEALTH_INFO),
equalTo(
createExpectedResult(
GREEN,
"This cluster has 1 creating primary shard.",
Map.of("creating_primaries", 1),
emptyList(),
emptyList()
)
)
);
}

public void testShouldBeYellowWhenThereAreUnassignedReplicas() {
var availableReplicas = randomList(0, 5, () -> new ShardAllocation(randomNodeId(), AVAILABLE));
var unavailableReplicas = randomList(1, 5, () -> new ShardAllocation(randomNodeId(), UNAVAILABLE));
Expand Down Expand Up @@ -439,10 +533,7 @@ public void testShouldBeYellowWhenRestartingReplicasReachedAllocationDelay() {
}

public void testShouldBeGreenWhenThereAreInitializingPrimaries() {
var clusterState = createClusterStateWith(
List.of(index("restarting-index", new ShardAllocation("node-0", INITIALIZING))),
List.of()
);
var clusterState = createClusterStateWith(List.of(index("restarting-index", new ShardAllocation("node-0", CREATING))), List.of());
var service = createShardsAvailabilityIndicatorService(clusterState);

assertThat(
Expand Down Expand Up @@ -802,6 +893,37 @@ public void testDiagnoseEnableIndexAllocation() {
assertThat(actions, contains(ACTION_ENABLE_INDEX_ROUTING_ALLOCATION));
}

public void testNodeAllocationResultWithNullDecision() {
// Index definition, 1 primary no replicas, allocation is not allowed
IndexMetadata indexMetadata = IndexMetadata.builder("red-index")
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
.build()
)
.numberOfShards(1)
.numberOfReplicas(0)
.build();

var service = createShardsAvailabilityIndicatorService();

// Get the list of user actions that are generated for this unassigned index shard
List<Diagnosis.Definition> actions = service.checkIsAllocationDisabled(
indexMetadata,
List.of(
new NodeAllocationResult(
// Shard allocation is disabled on index
new DiscoveryNode(randomNodeId(), buildNewFakeTransportAddress(), Version.CURRENT),
new NodeAllocationResult.ShardStoreInfo(10),
null
)
)
);

assertThat(actions, hasSize(0));
}

public void testDiagnoseEnableClusterAllocation() {
// Index definition, 1 primary no replicas
IndexMetadata indexMetadata = IndexMetadata.builder("red-index")
Expand Down Expand Up @@ -1503,6 +1625,10 @@ public void testMappedFieldsForTelemetry() {
ACTION_INCREASE_NODE_CAPACITY.getUniqueId(),
equalTo("elasticsearch:health:shards_availability:diagnosis:increase_node_capacity_for_allocations")
);
assertThat(
DIAGNOSIS_WAIT_FOR_INITIALIZATION.getUniqueId(),
equalTo("elasticsearch:health:shards_availability:diagnosis:initializing_shards")
);
for (String tier : List.of("data_content", "data_hot", "data_warm", "data_cold", "data_frozen")) {
assertThat(
ACTION_ENABLE_TIERS_LOOKUP.get(tier).getUniqueId(),
Expand Down Expand Up @@ -1685,10 +1811,13 @@ private static ShardRouting createShardRouting(ShardId shardId, boolean primary,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null),
ShardRouting.Role.DEFAULT
);
if (allocation.state == INITIALIZING) {
if (allocation.state == CREATING) {
return routing;
}
routing = routing.initialize(allocation.nodeId, null, 0);
if (allocation.state == INITIALIZING) {
return routing;
}
routing = routing.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
if (allocation.state == AVAILABLE) {
return routing;
Expand Down Expand Up @@ -1718,7 +1847,7 @@ private static ShardRouting createShardRouting(ShardId shardId, boolean primary,

private static RecoverySource getSource(boolean primary, ShardState state) {
if (primary) {
return state == INITIALIZING
return state == CREATING
? RecoverySource.EmptyStoreRecoverySource.INSTANCE
: RecoverySource.ExistingStoreRecoverySource.INSTANCE;
} else {
Expand All @@ -1728,9 +1857,10 @@ private static RecoverySource getSource(boolean primary, ShardState state) {

public enum ShardState {
UNAVAILABLE,
INITIALIZING,
CREATING,
AVAILABLE,
RESTARTING
RESTARTING,
INITIALIZING,
}

private record ShardAllocation(String nodeId, ShardState state, Long unassignedTimeNanos, @Nullable UnassignedInfo unassignedInfo) {
Expand Down

0 comments on commit db17d38

Please sign in to comment.