Skip to content

Commit

Permalink
Gather unassigned replicas corresponding to newly-created primary uni…
Browse files Browse the repository at this point in the history
…quely

Related to the work in elastic#101638 this changes the way we calculate whether all replicas are unassigned
when corresponding to newly created primaries. While this doesn't affect anything in Stateful ES on
its own, it's a building-block used for object-store-based ES (Serverless).

Semi-related to elastic#99951, though it does not solve (and does not strive to solve) that issue.
  • Loading branch information
dakrone committed Apr 22, 2024
1 parent 13dd169 commit 8b995eb
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 3 deletions.
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
Expand Down Expand Up @@ -440,7 +441,8 @@ public class ShardAllocationCounts {
public void increment(ShardRouting routing, ClusterState state, NodesShutdownMetadata shutdowns, boolean verbose) {
boolean isNew = isUnassignedDueToNewInitialization(routing, state);
boolean isRestarting = isUnassignedDueToTimelyRestart(routing, shutdowns);
boolean allUnavailable = areAllShardsOfThisTypeUnavailable(routing, state);
boolean allUnavailable = areAllShardsOfThisTypeUnavailable(routing, state)
&& isNewlyCreatedAndInitializingReplica(routing, state) == false;
if (allUnavailable) {
indicesWithAllShardsUnavailable.add(routing.getIndexName());
}
Expand Down Expand Up @@ -498,7 +500,7 @@ private void addDefinition(Diagnosis.Definition diagnosisDefinition, String inde
* example: if a replica is passed then this will return true if ALL replicas are unassigned,
* but if at least one is assigned, it will return false.
*/
private boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterState state) {
boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterState state) {
return StreamSupport.stream(
state.routingTable().allActiveShardsGrouped(new String[] { routing.getIndexName() }, true).spliterator(),
false
Expand All @@ -509,6 +511,29 @@ private boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterS
.allMatch(ShardRouting::unassigned);
}

/**
* Returns true if the given shard is a replica that is only unassigned due to its primary being
* newly created. See {@link ClusterShardHealth#getInactivePrimaryHealth(ShardRouting)} for more
* information.
*
* We use this information when considering whether a cluster should turn red. For some cases
* (a newly created index having unassigned replicas for example), we don't want the cluster
* to turn "unhealthy" for the tiny amount of time before the shards are allocated.
*/
static boolean isNewlyCreatedAndInitializingReplica(ShardRouting routing, ClusterState state) {
if (routing.active()) {
return false;
}
if (routing.primary()) {
return false;
}
ShardRouting primary = state.routingTable().shardRoutingTable(routing.shardId()).primaryShard();
if (primary.active()) {
return false;
}
return ClusterShardHealth.getInactivePrimaryHealth(primary) == ClusterHealthStatus.YELLOW;
}

private static boolean isUnassignedDueToTimelyRestart(ShardRouting routing, NodesShutdownMetadata shutdowns) {
var info = routing.unassignedInfo();
if (info == null || info.getReason() != UnassignedInfo.Reason.NODE_RESTARTING) {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -407,6 +408,95 @@ public void testAllReplicasUnassigned() {
);
assertTrue(status.replicas.doAnyIndicesHaveAllUnavailable());
}
{
ClusterState clusterState = createClusterStateWith(
List.of(
indexNewlyCreated(
"myindex",
new ShardAllocation(
randomNodeId(),
CREATING,
new UnassignedInfo(
UnassignedInfo.Reason.NODE_LEFT,
"message",
null,
0,
0,
0,
false,
UnassignedInfo.AllocationStatus.NO_ATTEMPT,
Set.of(),
null
)
), // Primary 1
new ShardAllocation(randomNodeId(), UNAVAILABLE) // Replica 1
)
),
List.of()
);
var service = createShardsAvailabilityIndicatorService(clusterState);
ShardAllocationStatus status = service.createNewStatus(clusterState.metadata());
ShardsAvailabilityHealthIndicatorService.updateShardAllocationStatus(
status,
clusterState,
NodesShutdownMetadata.EMPTY,
randomBoolean()
);
// Here because the replica is unassigned due to the primary being created, it's treated as though the replica can be ignored.
assertFalse(
"an unassigned replica from a newly created and initializing primary "
+ "should not be treated as an index with all replicas unavailable",
status.replicas.doAnyIndicesHaveAllUnavailable()
);
}

/*
A couple of tests for
{@link ShardsAvailabilityHealthIndicatorService#areAllShardsOfThisTypeUnavailable(ShardRouting, ClusterState)}
*/
{
IndexRoutingTable routingTable = indexWithTwoPrimaryOneReplicaShard(
"myindex",
new ShardAllocation(randomNodeId(), AVAILABLE), // Primary 1
new ShardAllocation(randomNodeId(), AVAILABLE), // Replica 1
new ShardAllocation(randomNodeId(), AVAILABLE), // Primary 2
new ShardAllocation(randomNodeId(), UNAVAILABLE) // Replica 2
);
ClusterState clusterState = createClusterStateWith(List.of(routingTable), List.of());
var service = createShardsAvailabilityIndicatorService(clusterState);
ShardAllocationStatus status = service.createNewStatus(clusterState.metadata());
ShardsAvailabilityHealthIndicatorService.updateShardAllocationStatus(
status,
clusterState,
NodesShutdownMetadata.EMPTY,
randomBoolean()
);
var shardRouting = routingTable.shardsWithState(ShardRoutingState.UNASSIGNED).get(0);
assertTrue(service.areAllShardsOfThisTypeUnavailable(shardRouting, clusterState));
}
{
ClusterState clusterState = createClusterStateWith(
List.of(
index(
"myindex",
new ShardAllocation(randomNodeId(), AVAILABLE),
new ShardAllocation(randomNodeId(), AVAILABLE),
new ShardAllocation(randomNodeId(), UNAVAILABLE)
)
),
List.of()
);
var service = createShardsAvailabilityIndicatorService(clusterState);
ShardAllocationStatus status = service.createNewStatus(clusterState.metadata());
ShardsAvailabilityHealthIndicatorService.updateShardAllocationStatus(
status,
clusterState,
NodesShutdownMetadata.EMPTY,
randomBoolean()
);
var shardRouting = clusterState.routingTable().index("myindex").shardsWithState(ShardRoutingState.UNASSIGNED).get(0);
assertFalse(service.areAllShardsOfThisTypeUnavailable(shardRouting, clusterState));
}
}

public void testShouldBeRedWhenThereAreUnassignedPrimariesAndNoReplicas() {
Expand Down Expand Up @@ -1913,6 +2003,88 @@ public void testMappedFieldsForTelemetry() {
}
}

public void testIsNewlyCreatedAndInitializingReplica() {
ShardId id = new ShardId("index", "uuid", 0);
IndexMetadata idxMeta = IndexMetadata.builder("index")
.numberOfShards(1)
.numberOfReplicas(1)
.settings(
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put("index.version.created", IndexVersion.current())
.put("index.uuid", "uuid")
.build()
)
.build();
ShardRouting primary = createShardRouting(id, true, new ShardAllocation("node", AVAILABLE));
var state = createClusterStateWith(List.of(index("index", new ShardAllocation("node", AVAILABLE))), List.of());
assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(primary, state));

ShardRouting replica = createShardRouting(id, false, new ShardAllocation("node", AVAILABLE));
state = createClusterStateWith(List.of(index("index", new ShardAllocation("node", AVAILABLE))), List.of());
assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(replica, state));

ShardRouting unassignedReplica = createShardRouting(id, false, new ShardAllocation("node", UNAVAILABLE));
state = createClusterStateWith(
List.of(idxMeta),
List.of(index("index", "uuid", new ShardAllocation("node", UNAVAILABLE))),
List.of(),
List.of()
);
assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(unassignedReplica, state));

ShardAllocation allocation = new ShardAllocation(
"node",
UNAVAILABLE,
new UnassignedInfo(
UnassignedInfo.Reason.NODE_LEFT,
"message",
null,
0,
0,
0,
randomBoolean(),
UnassignedInfo.AllocationStatus.DECIDERS_NO,
Set.of(),
null
)
);
ShardRouting decidersNoReplica = createShardRouting(id, false, allocation);
state = createClusterStateWith(
List.of(idxMeta),
List.of(index(idxMeta, new ShardAllocation("node", UNAVAILABLE), allocation)),
List.of(),
List.of()
);
assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(decidersNoReplica, state));

allocation = new ShardAllocation(
"node",
UNAVAILABLE,
new UnassignedInfo(
UnassignedInfo.Reason.NODE_LEFT,
"message",
null,
0,
0,
0,
randomBoolean(),
UnassignedInfo.AllocationStatus.NO_ATTEMPT,
Set.of(),
null
)
);
ShardRouting newlyInitializingReplica = createShardRouting(id, false, allocation);
state = createClusterStateWith(
List.of(idxMeta),
List.of(index(idxMeta, new ShardAllocation("node", CREATING), allocation)),
List.of(),
List.of()
);
assertTrue(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(newlyInitializingReplica, state));
}

private HealthIndicatorResult createExpectedResult(
HealthStatus status,
String symptom,
Expand Down Expand Up @@ -2038,9 +2210,18 @@ private static Map<String, Object> addDefaults(Map<String, Object> override) {
}

private static IndexRoutingTable index(String name, ShardAllocation primaryState, ShardAllocation... replicaStates) {
return index(name, "_na_", primaryState, replicaStates);
}

private static IndexRoutingTable index(String name, String uuid, ShardAllocation primaryState, ShardAllocation... replicaStates) {
return index(
IndexMetadata.builder(name)
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build())
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
.put(IndexMetadata.SETTING_INDEX_UUID, uuid)
.build()
)
.numberOfShards(1)
.numberOfReplicas(replicaStates.length)
.build(),
Expand All @@ -2049,6 +2230,21 @@ private static IndexRoutingTable index(String name, ShardAllocation primaryState
);
}

private static IndexRoutingTable indexNewlyCreated(String name, ShardAllocation primary1State, ShardAllocation replica1State) {
var indexMetadata = IndexMetadata.builder(name)
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build())
.numberOfShards(1)
.numberOfReplicas(1)
.build();
var index = indexMetadata.getIndex();
var shard1Id = new ShardId(index, 0);

var builder = IndexRoutingTable.builder(index);
builder.addShard(createShardRouting(shard1Id, true, primary1State));
builder.addShard(createShardRouting(shard1Id, false, replica1State));
return builder.build();
}

private static IndexRoutingTable indexWithTwoPrimaryOneReplicaShard(
String name,
ShardAllocation primary1State,
Expand Down

0 comments on commit 8b995eb

Please sign in to comment.