Skip to content

Commit ae0f1a6

Browse files
authored
Remove INDEX_REFRESH_BLOCK after index becomes searchable (#120807)
This commit enhances the ShardStartedClusterStateTaskExecutor by introducing functionality to automatically remove the INDEX_REFRESH_BLOCK once an index becomes searchable. The change ensures search availability by checking that at least one copy of each searchable shard is available whenever an unpromotable shard is started. Once this condition is met, the INDEX_REFRESH_BLOCK is removed. Closes ES-10278
1 parent d3f20e5 commit ae0f1a6

File tree

4 files changed

+217
-8
lines changed

4 files changed

+217
-8
lines changed

docs/changelog/120807.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120807
2+
summary: Remove INDEX_REFRESH_BLOCK after index becomes searchable
3+
area: CRUD
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2424
import org.elasticsearch.cluster.ClusterStateTaskListener;
2525
import org.elasticsearch.cluster.NotMasterException;
26+
import org.elasticsearch.cluster.block.ClusterBlock;
27+
import org.elasticsearch.cluster.block.ClusterBlocks;
2628
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
2729
import org.elasticsearch.cluster.metadata.IndexMetadata;
2830
import org.elasticsearch.cluster.metadata.Metadata;
@@ -70,6 +72,7 @@
7072

7173
import static org.apache.logging.log4j.Level.DEBUG;
7274
import static org.apache.logging.log4j.Level.ERROR;
75+
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_REFRESH_BLOCK;
7376
import static org.elasticsearch.cluster.service.MasterService.isPublishFailureException;
7477
import static org.elasticsearch.core.Strings.format;
7578

@@ -619,6 +622,7 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
619622
List<TaskContext<StartedShardUpdateTask>> tasksToBeApplied = new ArrayList<>();
620623
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(batchExecutionContext.taskContexts().size());
621624
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
625+
Set<Index> indicesWithUnpromotableShardsStarted = null;
622626
final Map<Index, ClusterStateTimeRanges> updatedTimestampRanges = new HashMap<>();
623627
final ClusterState initialState = batchExecutionContext.initialState();
624628
for (var taskContext : batchExecutionContext.taskContexts()) {
@@ -737,6 +741,14 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
737741
new ClusterStateTimeRanges(newTimestampMillisRange, newEventIngestedMillisRange)
738742
);
739743
}
744+
745+
if (matched.isPromotableToPrimary() == false
746+
&& initialState.blocks().hasIndexBlock(index.getName(), INDEX_REFRESH_BLOCK)) {
747+
if (indicesWithUnpromotableShardsStarted == null) {
748+
indicesWithUnpromotableShardsStarted = new HashSet<>();
749+
}
750+
indicesWithUnpromotableShardsStarted.add(index);
751+
}
740752
}
741753
}
742754
}
@@ -760,7 +772,10 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
760772
maybeUpdatedState = ClusterState.builder(maybeUpdatedState).metadata(metadataBuilder).build();
761773
}
762774

775+
maybeUpdatedState = maybeRemoveIndexRefreshBlocks(maybeUpdatedState, indicesWithUnpromotableShardsStarted);
776+
763777
assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState);
778+
assert assertRefreshBlockIsNotPresentWhenTheIndexIsSearchable(maybeUpdatedState);
764779

765780
for (final var taskContext : tasksToBeApplied) {
766781
final var task = taskContext.getTask();
@@ -776,6 +791,36 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
776791
return maybeUpdatedState;
777792
}
778793

794+
private static ClusterState maybeRemoveIndexRefreshBlocks(
795+
ClusterState clusterState,
796+
@Nullable Set<Index> indicesWithUnpromotableShardsStarted
797+
) {
798+
// The provided cluster state must include the newly STARTED unpromotable shards
799+
if (indicesWithUnpromotableShardsStarted == null) {
800+
return clusterState;
801+
}
802+
803+
ClusterBlocks.Builder clusterBlocksBuilder = null;
804+
for (Index indexWithUnpromotableShardsStarted : indicesWithUnpromotableShardsStarted) {
805+
String indexName = indexWithUnpromotableShardsStarted.getName();
806+
assert clusterState.blocks().hasIndexBlock(indexName, INDEX_REFRESH_BLOCK) : indexWithUnpromotableShardsStarted;
807+
808+
var indexRoutingTable = clusterState.routingTable().index(indexWithUnpromotableShardsStarted);
809+
if (indexRoutingTable.readyForSearch()) {
810+
if (clusterBlocksBuilder == null) {
811+
clusterBlocksBuilder = ClusterBlocks.builder(clusterState.blocks());
812+
}
813+
clusterBlocksBuilder.removeIndexBlock(indexName, INDEX_REFRESH_BLOCK);
814+
}
815+
}
816+
817+
if (clusterBlocksBuilder == null) {
818+
return clusterState;
819+
}
820+
821+
return ClusterState.builder(clusterState).blocks(clusterBlocksBuilder).build();
822+
}
823+
779824
private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterState clusterState) {
780825
for (Map.Entry<String, IndexRoutingTable> cursor : clusterState.getRoutingTable().getIndicesRouting().entrySet()) {
781826
assert cursor.getValue().allPrimaryShardsActive() == false
@@ -799,6 +844,16 @@ private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterSt
799844
return true;
800845
}
801846

847+
private static boolean assertRefreshBlockIsNotPresentWhenTheIndexIsSearchable(ClusterState clusterState) {
848+
for (Map.Entry<String, Set<ClusterBlock>> indexBlock : clusterState.blocks().indices().entrySet()) {
849+
if (indexBlock.getValue().contains(INDEX_REFRESH_BLOCK)) {
850+
assert clusterState.routingTable().index(indexBlock.getKey()).readyForSearch() == false
851+
: "Index [" + indexBlock.getKey() + "] is searchable but has an INDEX_REFRESH_BLOCK";
852+
}
853+
}
854+
return true;
855+
}
856+
802857
@Override
803858
public void clusterStatePublished(ClusterState newClusterState) {
804859
rerouteService.reroute(

server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,34 +12,43 @@
1212
import org.elasticsearch.TransportVersions;
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.support.ActionTestUtils;
15+
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
1516
import org.elasticsearch.cluster.ClusterState;
1617
import org.elasticsearch.cluster.ESAllocationTestCase;
1718
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
1819
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardUpdateTask;
20+
import org.elasticsearch.cluster.block.ClusterBlocks;
1921
import org.elasticsearch.cluster.metadata.IndexMetadata;
2022
import org.elasticsearch.cluster.metadata.Metadata;
23+
import org.elasticsearch.cluster.routing.AllocationId;
24+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2125
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
26+
import org.elasticsearch.cluster.routing.RoutingTable;
2227
import org.elasticsearch.cluster.routing.ShardRouting;
2328
import org.elasticsearch.cluster.routing.ShardRoutingState;
2429
import org.elasticsearch.cluster.routing.allocation.AllocationService;
2530
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
2631
import org.elasticsearch.common.Priority;
2732
import org.elasticsearch.common.settings.Settings;
33+
import org.elasticsearch.core.Tuple;
2834
import org.elasticsearch.index.shard.IndexLongFieldRange;
2935
import org.elasticsearch.index.shard.ShardId;
3036
import org.elasticsearch.index.shard.ShardLongFieldRange;
3137

3238
import java.util.List;
39+
import java.util.stream.Collectors;
3340
import java.util.stream.IntStream;
3441
import java.util.stream.Stream;
3542

3643
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
3744
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
3845
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas;
3946
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithNoShard;
47+
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_REFRESH_BLOCK;
4048
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
4149
import static org.hamcrest.Matchers.equalTo;
4250
import static org.hamcrest.Matchers.is;
51+
import static org.hamcrest.Matchers.notNullValue;
4352
import static org.hamcrest.Matchers.sameInstance;
4453

4554
public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase {
@@ -479,6 +488,114 @@ public void testExpandsTimestampRangeForReplica() throws Exception {
479488
assertThat(latestIndexMetadata.getEventIngestedRange(), sameInstance(IndexLongFieldRange.UNKNOWN));
480489
}
481490

491+
public void testIndexRefreshBlockIsClearedOnceTheIndexIsReadyToBeSearched() throws Exception {
492+
final var indexName = "test";
493+
final var numberOfShards = randomIntBetween(1, 4);
494+
final var numberOfReplicas = randomIntBetween(1, 4);
495+
var clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicasWithState(
496+
new String[] { indexName },
497+
numberOfShards,
498+
ShardRouting.Role.INDEX_ONLY,
499+
IntStream.range(0, numberOfReplicas)
500+
.mapToObj(unused -> Tuple.tuple(ShardRoutingState.UNASSIGNED, ShardRouting.Role.SEARCH_ONLY))
501+
.toList()
502+
);
503+
504+
clusterState = ClusterState.builder(clusterState)
505+
.metadata(Metadata.builder(clusterState.metadata()).put(withActiveShardsInSyncAllocationIds(clusterState, indexName)))
506+
.blocks(ClusterBlocks.builder(clusterState.blocks()).addIndexBlock(indexName, INDEX_REFRESH_BLOCK))
507+
.build();
508+
509+
while (clusterState.blocks().hasIndexBlock(indexName, INDEX_REFRESH_BLOCK)) {
510+
clusterState = maybeInitializeUnassignedReplicaShard(clusterState);
511+
512+
final IndexMetadata indexMetadata = clusterState.metadata().index(indexName);
513+
514+
final var initializingReplicaShardOpt = clusterState.routingTable()
515+
.allShards()
516+
.filter(shardRouting -> shardRouting.isPromotableToPrimary() == false)
517+
.filter(shardRouting -> shardRouting.state().equals(ShardRoutingState.INITIALIZING))
518+
.findFirst();
519+
520+
assertThat(clusterState.routingTable().allShards().toList().toString(), initializingReplicaShardOpt.isPresent(), is(true));
521+
522+
var initializingReplicaShard = initializingReplicaShardOpt.get();
523+
524+
final var shardId = initializingReplicaShard.shardId();
525+
final var primaryTerm = indexMetadata.primaryTerm(shardId.id());
526+
final var replicaAllocationId = initializingReplicaShard.allocationId().getId();
527+
final var task = new StartedShardUpdateTask(
528+
new StartedShardEntry(
529+
shardId,
530+
replicaAllocationId,
531+
primaryTerm,
532+
"test",
533+
ShardLongFieldRange.UNKNOWN,
534+
ShardLongFieldRange.UNKNOWN
535+
),
536+
createTestListener()
537+
);
538+
539+
final var resultingState = executeTasks(clusterState, List.of(task));
540+
assertNotSame(clusterState, resultingState);
541+
542+
clusterState = resultingState;
543+
}
544+
545+
var indexRoutingTable = clusterState.routingTable().index(indexName);
546+
assertThat(indexRoutingTable.readyForSearch(), is(true));
547+
for (int i = 0; i < numberOfShards; i++) {
548+
var shardRoutingTable = indexRoutingTable.shard(i);
549+
assertThat(shardRoutingTable, is(notNullValue()));
550+
// Ensure that at least one unpromotable shard is either STARTED or RELOCATING
551+
assertThat(shardRoutingTable.unpromotableShards().isEmpty(), is(false));
552+
}
553+
assertThat(clusterState.blocks().hasIndexBlock(indexName, INDEX_REFRESH_BLOCK), is(false));
554+
}
555+
556+
private static ClusterState maybeInitializeUnassignedReplicaShard(ClusterState clusterState) {
557+
var unassignedShardRoutingOpt = clusterState.routingTable()
558+
.allShards()
559+
.filter(shardRouting -> shardRouting.state().equals(ShardRoutingState.UNASSIGNED))
560+
.findFirst();
561+
562+
if (unassignedShardRoutingOpt.isEmpty()) {
563+
return clusterState;
564+
}
565+
566+
var unassignedShardRouting = unassignedShardRoutingOpt.get();
567+
var initializedShard = unassignedShardRouting.initialize(randomUUID(), null, 1);
568+
569+
RoutingTable routingTable = clusterState.routingTable();
570+
IndexRoutingTable indexRoutingTable = routingTable.index(unassignedShardRouting.getIndexName());
571+
IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
572+
for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
573+
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId);
574+
for (int copy = 0; copy < shardRoutingTable.size(); copy++) {
575+
ShardRouting shardRouting = shardRoutingTable.shard(copy);
576+
newIndexRoutingTable.addShard(shardRouting == unassignedShardRouting ? initializedShard : shardRouting);
577+
}
578+
}
579+
routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build();
580+
return ClusterState.builder(clusterState).routingTable(routingTable).build();
581+
}
582+
583+
private static IndexMetadata.Builder withActiveShardsInSyncAllocationIds(ClusterState clusterState, String indexName) {
584+
IndexMetadata.Builder indexMetadataBuilder = new IndexMetadata.Builder(clusterState.metadata().index(indexName));
585+
var indexRoutingTable = clusterState.routingTable().index(indexName);
586+
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable.allShards().toList()) {
587+
indexMetadataBuilder.putInSyncAllocationIds(
588+
indexShardRoutingTable.shardId().getId(),
589+
indexShardRoutingTable.activeShards()
590+
.stream()
591+
.map(ShardRouting::allocationId)
592+
.map(AllocationId::getId)
593+
.collect(Collectors.toSet())
594+
);
595+
}
596+
return indexMetadataBuilder;
597+
}
598+
482599
private ClusterState executeTasks(final ClusterState state, final List<StartedShardUpdateTask> tasks) throws Exception {
483600
return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(state, executor, tasks);
484601
}

test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,34 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(
363363
int numberOfShards,
364364
List<ShardRouting.Role> replicaRoles
365365
) {
366-
int numberOfDataNodes = replicaRoles.size() + 1;
366+
return stateWithAssignedPrimariesAndReplicasWithState(
367+
indices,
368+
numberOfShards,
369+
replicaRoles.stream().map(role -> Tuple.tuple(ShardRoutingState.STARTED, role)).toList()
370+
);
371+
}
372+
373+
/**
374+
* Creates cluster state with several indexes, shards and replicas (with given roles and state) and all primary shards STARTED.
375+
*/
376+
public static ClusterState stateWithAssignedPrimariesAndReplicasWithState(
377+
String[] indices,
378+
int numberOfShards,
379+
List<Tuple<ShardRoutingState, ShardRouting.Role>> replicaRoleAndStates
380+
) {
381+
return stateWithAssignedPrimariesAndReplicasWithState(indices, numberOfShards, ShardRouting.Role.DEFAULT, replicaRoleAndStates);
382+
}
383+
384+
/**
385+
* Creates cluster state with several indexes, shards and replicas (with given roles and state) and all primary shards STARTED.
386+
*/
387+
public static ClusterState stateWithAssignedPrimariesAndReplicasWithState(
388+
String[] indices,
389+
int numberOfShards,
390+
ShardRouting.Role primaryRole,
391+
List<Tuple<ShardRoutingState, ShardRouting.Role>> replicasStateAndRoles
392+
) {
393+
int numberOfDataNodes = replicasStateAndRoles.size() + 1;
367394
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
368395
for (int i = 0; i < numberOfDataNodes + 1; i++) {
369396
final DiscoveryNode node = newNode(i);
@@ -383,7 +410,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(
383410
for (String index : indices) {
384411
IndexMetadata indexMetadata = IndexMetadata.builder(index)
385412
.settings(
386-
indexSettings(IndexVersion.current(), numberOfShards, replicaRoles.size()).put(
413+
indexSettings(IndexVersion.current(), numberOfShards, replicasStateAndRoles.size()).put(
387414
SETTING_CREATION_DATE,
388415
System.currentTimeMillis()
389416
)
@@ -397,14 +424,19 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(
397424
final ShardId shardId = new ShardId(index, "_na_", i);
398425
IndexShardRoutingTable.Builder indexShardRoutingBuilder = IndexShardRoutingTable.builder(shardId);
399426
indexShardRoutingBuilder.addShard(
400-
TestShardRouting.newShardRouting(index, i, newNode(0).getId(), null, true, ShardRoutingState.STARTED)
427+
shardRoutingBuilder(index, i, newNode(0).getId(), true, ShardRoutingState.STARTED).withRole(primaryRole).build()
401428
);
402-
for (int replica = 0; replica < replicaRoles.size(); replica++) {
403-
indexShardRoutingBuilder.addShard(
404-
shardRoutingBuilder(index, i, newNode(replica + 1).getId(), false, ShardRoutingState.STARTED).withRole(
405-
replicaRoles.get(replica)
406-
).build()
429+
for (int replica = 0; replica < replicasStateAndRoles.size(); replica++) {
430+
var replicaStateAndRole = replicasStateAndRoles.get(replica);
431+
ShardRoutingState shardRoutingState = replicaStateAndRole.v1();
432+
String currentNodeId = shardRoutingState.equals(ShardRoutingState.UNASSIGNED) ? null : newNode(replica + 1).getId();
433+
var shardRoutingBuilder = shardRoutingBuilder(index, i, currentNodeId, false, shardRoutingState).withRole(
434+
replicaStateAndRole.v2()
407435
);
436+
if (shardRoutingState.equals(ShardRoutingState.RELOCATING)) {
437+
shardRoutingBuilder.withRelocatingNodeId(DiscoveryNodeUtils.create("relocating_" + replica).getId());
438+
}
439+
indexShardRoutingBuilder.addShard(shardRoutingBuilder.build());
408440
}
409441
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder);
410442
}

0 commit comments

Comments
 (0)