Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shards allocation health indicator services #83513

Merged
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1b73587
shards allocation health indicator services
idegtiarenko Feb 4, 2022
65af584
Update docs/changelog/83513.yaml
idegtiarenko Feb 4, 2022
ef2bfaf
Merge branch 'master' into 83240_shards_health_indicator
idegtiarenko Feb 10, 2022
bde7eb7
Merge branch 'master' into 83240_shards_health_indicator
idegtiarenko Feb 14, 2022
171b819
update counts
idegtiarenko Feb 14, 2022
5f98333
refactor assertion
idegtiarenko Feb 14, 2022
31d698a
add summary string
idegtiarenko Feb 14, 2022
6fddaac
cleanup
idegtiarenko Feb 14, 2022
75a9730
fix forbidden api usage
idegtiarenko Feb 14, 2022
e7512cb
ignore temporary unallocated shards due to node restarting
idegtiarenko Feb 14, 2022
4f11792
take into account initializing status
idegtiarenko Feb 15, 2022
7e7aba5
Merge branch 'master' into 83240_shards_health_indicator
idegtiarenko Feb 15, 2022
4419524
simplify indicator
idegtiarenko Feb 21, 2022
8bfdc86
handle restarting shards
idegtiarenko Feb 22, 2022
805b7e9
Merge branch 'master' into 83240_shards_health_indicator
idegtiarenko Feb 22, 2022
04006fe
add warning note
idegtiarenko Feb 22, 2022
b0c8da5
fix comments
idegtiarenko Feb 23, 2022
0be8545
Merge branch 'master' into 83240_shards_health_indicator
idegtiarenko Feb 23, 2022
fabf72b
Merge branch 'master' into 83240_shards_health_indicator
idegtiarenko Mar 1, 2022
bb08288
ignore when primary is unavailable during restart
idegtiarenko Mar 1, 2022
2a52d81
become red when `AllocationDelay` is reached
idegtiarenko Mar 2, 2022
a9802b5
Merge branch 'master' into 83240_shards_health_indicator
idegtiarenko Mar 2, 2022
cf2f4de
use getUnassignedTimeInMillis
idegtiarenko Mar 3, 2022
07013a7
Merge branch 'master' into 83240_shards_health_indicator
idegtiarenko Mar 3, 2022
1f33992
upd
idegtiarenko Mar 3, 2022
a8dc1a8
upd
idegtiarenko Mar 3, 2022
8ad8ca0
include reallocating shards
idegtiarenko Mar 3, 2022
be0666a
update messages
idegtiarenko Mar 4, 2022
49bcc79
fmt
idegtiarenko Mar 4, 2022
aa0e9c6
Merge branch 'master' into 83240_shards_health_indicator
idegtiarenko Mar 4, 2022
d2cc9d3
use monotonic time
idegtiarenko Mar 4, 2022
86264ea
some renaming
idegtiarenko Mar 4, 2022
39703b4
Merge branch 'master' into 83240_shards_health_indicator
idegtiarenko Mar 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/83513.yaml
@@ -0,0 +1,5 @@
pr: 83513
summary: Shards allocation health indicator services
area: Health
type: enhancement
issues: []
3 changes: 3 additions & 0 deletions docs/reference/index-modules.asciidoc
Expand Up @@ -163,6 +163,9 @@ specific index module:

The number of replicas each primary shard has. Defaults to 1.

WARNING: Configuring it to 0 may lead to temporary availability loss
during node restarts or permanent data loss in case of data corruption.

[[dynamic-index-auto-expand-replicas]]
`index.auto_expand_replicas`::
Auto-expand the number of replicas based on the number of data nodes in the
Expand Down
Expand Up @@ -41,6 +41,7 @@
public class NodesShutdownMetadata implements Metadata.Custom {
public static final String TYPE = "node_shutdown";
public static final Version NODE_SHUTDOWN_VERSION = Version.V_7_13_0;
public static final NodesShutdownMetadata EMPTY = new NodesShutdownMetadata(Map.of());

private static final ParseField NODES_FIELD = new ParseField("nodes");

Expand Down Expand Up @@ -70,17 +71,17 @@ public static NamedDiff<Metadata.Custom> readDiffFrom(StreamInput in) throws IOE

public static Optional<NodesShutdownMetadata> getShutdowns(final ClusterState state) {
assert state != null : "cluster state should never be null";
return Optional.ofNullable(state).map(ClusterState::metadata).map(m -> m.custom(TYPE));
return Optional.of(state).map(ClusterState::metadata).map(m -> m.custom(TYPE));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the line above it is always not null

}

private final Map<String, SingleNodeShutdownMetadata> nodes;

public NodesShutdownMetadata(Map<String, SingleNodeShutdownMetadata> nodes) {
this.nodes = nodes;
this.nodes = Collections.unmodifiableMap(nodes);
}

public NodesShutdownMetadata(StreamInput in) throws IOException {
this.nodes = in.readMap(StreamInput::readString, SingleNodeShutdownMetadata::new);
this(in.readMap(StreamInput::readString, SingleNodeShutdownMetadata::new));
}

@Override
Expand All @@ -92,7 +93,7 @@ public void writeTo(StreamOutput out) throws IOException {
* @return A map of NodeID to shutdown metadata.
*/
public Map<String, SingleNodeShutdownMetadata> getAllNodeMetadataMap() {
return Collections.unmodifiableMap(nodes);
return nodes;
}

/**
Expand Down
@@ -0,0 +1,210 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.health.HealthIndicatorResult;
import org.elasticsearch.health.HealthIndicatorService;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.health.SimpleHealthIndicatorDetails;

import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;

import static java.util.stream.Collectors.joining;
import static org.elasticsearch.cluster.health.ClusterShardHealth.getInactivePrimaryHealth;
import static org.elasticsearch.health.HealthStatus.GREEN;
import static org.elasticsearch.health.HealthStatus.RED;
import static org.elasticsearch.health.HealthStatus.YELLOW;
import static org.elasticsearch.health.ServerHealthComponents.DATA;

/**
* This indicator reports health for shards.
* <p>
* Indicator will report:
* * RED when one or more primary shards are not available
* * YELLOW when one or more replica shards are not available
* * GREEN otherwise
* <p>
* Each shard needs to be available and replicated in order to guarantee high availability and prevent data loses.
* Shards allocated on nodes scheduled for restart (using nodes shutdown API) will not degrade this indicator health.
idegtiarenko marked this conversation as resolved.
Show resolved Hide resolved
*/
public class ShardsAvailabilityHealthIndicatorService implements HealthIndicatorService {

public static final String NAME = "shards_availability";

private final ClusterService clusterService;

public ShardsAvailabilityHealthIndicatorService(ClusterService clusterService) {
this.clusterService = clusterService;
}

@Override
public String name() {
return NAME;
}

@Override
public String component() {
return DATA;
}

@Override
public HealthIndicatorResult calculate() {
var state = clusterService.state();
var shutdown = state.getMetadata().custom(NodesShutdownMetadata.TYPE, NodesShutdownMetadata.EMPTY);
var status = new ShardAllocationStatus();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we construct this with the shutdown metadata instead, such that we only pass the node shutdowns in the constructor? Would look cleaner to me. Also, can we pass just a Function<String, SingleNodeShutdownMetadata like:

Suggested change
var status = new ShardAllocationStatus();
var status = new ShardAllocationStatus(shutdown.getAllNodeMetadataMap()::get);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nodes shutdown is not required for output and only used for calculation. I believe it is better to keep such dependencies as arguments

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not agree, by having the addPrimary/Replica methods accept the shutdowns input as well, a specific implementation is dictated, where the information is resolved immediately rather than on output. But given the localness of the code, I can live with how it is for now.

I would still like to see us just pass around Function<String, SingleNodeShutdownMetadata> rather than the full metadata.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would still like to see us just pass around Function<String, SingleNodeShutdownMetadata> rather than the full metadata.

I agree with Henning on this


for (IndexRoutingTable indexShardRouting : state.routingTable()) {
for (IndexShardRoutingTable shardRouting : indexShardRouting) {
status.addPrimary(shardRouting.primaryShard(), shutdown);
for (ShardRouting replicaShard : shardRouting.replicaShards()) {
status.addReplica(replicaShard, shutdown);
}
}
}

return createIndicator(status.getStatus(), status.getSummary(), status.getDetails());
}

private static class ShardAllocationCounts {
private boolean available = true;
private int unassigned = 0;
private int unassigned_new = 0;
private int unassigned_restarting = 0;
private int initializing = 0;
private int started = 0;
private int reallocating = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename this to relocating?


public void increment(ShardRouting routing, NodesShutdownMetadata metadata) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us rename metadata to shutdowns.

boolean isNew = isUnassignedDueToNewInitialization(routing);
boolean isRestarting = isUnassignedDueToTimelyRestart(routing, metadata);
available &= routing.active() || isRestarting || isNew;

switch (routing.state()) {
case UNASSIGNED -> {
if (isNew) {
unassigned_new++;
} else if (isRestarting) {
unassigned_restarting++;
} else {
unassigned++;
}
}
case INITIALIZING -> initializing++;
case STARTED -> started++;
idegtiarenko marked this conversation as resolved.
Show resolved Hide resolved
case RELOCATING -> reallocating++;
}
}
}

private static boolean isUnassignedDueToTimelyRestart(ShardRouting routing, NodesShutdownMetadata metadata) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us rename metadata to shutdowns.

var info = routing.unassignedInfo();
if (info == null || info.getReason() != UnassignedInfo.Reason.NODE_RESTARTING) {
return false;
}
var shutdown = metadata.getAllNodeMetadataMap().get(info.getLastAllocatedNodeId());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better to retrive the Map<String, SingleNodeShutdownMetadata> once (cf Henning's comment)

if (shutdown == null || shutdown.getType() != SingleNodeShutdownMetadata.Type.RESTART) {
return false;
}
var now = System.currentTimeMillis();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want to pass a LongSupplier to unit test this more easily

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this is used in a static method that is used in a static inner class. Passing the reference to the LongSupplier might make this code more complex. Currently the unit test injects the time of the entity relative to the current and does not rely on injecting time to the class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, no need to hold on this. I did not realize it was used in a static context. I'm just advocating on trying to avoid System.currentTimeMillis() in tests and use monotonic clocks instead, or maybe just some time supplier as it is even more simple.

var restartingAllocationDelayExpiration = info.getUnassignedTimeInMillis() + shutdown.getAllocationDelay().getMillis();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use info.getUnassignedTimeInNanos() as this is the one used to calculate the delay for delayed shard allocation?

return now <= restartingAllocationDelayExpiration;
}

private static boolean isUnassignedDueToNewInitialization(ShardRouting routing) {
return routing.primary() && routing.active() == false && getInactivePrimaryHealth(routing) == ClusterHealthStatus.YELLOW;
}

private static class ShardAllocationStatus {
private final ShardAllocationCounts primaries = new ShardAllocationCounts();
private final ShardAllocationCounts replicas = new ShardAllocationCounts();

public void addPrimary(ShardRouting routing, NodesShutdownMetadata metadata) {
primaries.increment(routing, metadata);
}

public void addReplica(ShardRouting routing, NodesShutdownMetadata metadata) {
replicas.increment(routing, metadata);
}

public HealthStatus getStatus() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should ideally also trigger on initializing shards. This may have some complication to integrate with restart though if the operator/orchestration waits for green or yellow _cluster/health before removing the restart indication it might work out ok.

I think deferring to a follow-up is fine, but perhaps add a comment here to aid the next reader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding to the handover document

if (primaries.available == false) {
return RED;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to special handle new indices similar to ClusterShardHealth. I think we should report green for now for shards of such new indices in this PR, since there will need to be some interaction with allocation to see if this is unassigned due to throttling or because the shard cannot be assigned at all currently.

A separate count of the "new" unassigned/initializing shards seems necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

} else if (replicas.available == false) {
return YELLOW;
} else {
return GREEN;
}
}

public String getSummary() {
var builder = new StringBuilder("This cluster has ");
if (primaries.unassigned > 0
|| primaries.unassigned_new > 0
|| primaries.unassigned_restarting > 0
|| replicas.unassigned > 0
|| replicas.unassigned_restarting > 0) {
builder.append(
Stream.of(
createMessage(primaries.unassigned, "unavailable primary", " unavailable primaries"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: did you consider implement a getSummary() method in ShardAllocationCounts directly, and call the methods here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I did not come up with a good way to merge distinct messages. It would require appending primary/primaries/replica/replicas suffixes as well as properly setting comas.

createMessage(primaries.unassigned_new, "creating primary", " creating primaries"),
createMessage(primaries.unassigned_restarting, "restarting primary", " restarting primaries"),
createMessage(replicas.unassigned, "unavailable replica", "unavailable replicas"),
createMessage(replicas.unassigned_restarting, "restarting replica", "restarting replicas")
).flatMap(Function.identity()).collect(joining(" , "))
).append(".");
} else {
builder.append("all shards available.");
}
return builder.toString();
}

private static Stream<String> createMessage(int count, String singular, String plural) {
return switch (count) {
case 0 -> Stream.empty();
case 1 -> Stream.of("1 " + singular);
default -> Stream.of(count + " " + plural);
};
}

public SimpleHealthIndicatorDetails getDetails() {
return new SimpleHealthIndicatorDetails(
Map.of(
"unassigned_primaries",
primaries.unassigned,
"initializing_primaries",
primaries.initializing,
"creating_primaries",
primaries.unassigned_new,
"restarting_primaries",
primaries.unassigned_restarting,
"started_primaries",
primaries.started + primaries.reallocating,
"unassigned_replicas",
replicas.unassigned,
"initializing_replicas",
replicas.initializing,
"restarting_replicas",
replicas.unassigned_restarting,
"started_replicas",
replicas.started + replicas.reallocating
)
);
}
}
}
4 changes: 3 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Expand Up @@ -57,6 +57,7 @@
import org.elasticsearch.cluster.routing.BatchedRerouteService;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.breaker.CircuitBreaker;
Expand Down Expand Up @@ -1043,7 +1044,8 @@ protected Node(
private HealthService createHealthService(ClusterService clusterService) {
var serverHealthIndicatorServices = List.of(
new InstanceHasMasterHealthIndicatorService(clusterService),
new RepositoryIntegrityHealthIndicatorService(clusterService)
new RepositoryIntegrityHealthIndicatorService(clusterService),
new ShardsAvailabilityHealthIndicatorService(clusterService)
);
var pluginHealthIndicatorServices = pluginsService.filterPlugins(HealthPlugin.class)
.stream()
Expand Down