From 840b91ae7a4d460fc7a14fd32e76cd398456d971 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Tue, 21 Oct 2025 10:21:34 -0400 Subject: [PATCH 01/13] commit --- .../admin/indices/flush/ShardFlushRequest.java | 7 +++++++ .../admin/indices/flush/TransportFlushAction.java | 12 ++++++++++-- .../indices/refresh/TransportRefreshAction.java | 3 ++- .../TransportBroadcastReplicationAction.java | 12 +++++++++--- .../replication/BroadcastReplicationTests.java | 3 ++- 5 files changed, 30 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index 62f01561fc644..aa6e0f4b785dc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ReplicationRequest; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -27,6 +28,12 @@ public ShardFlushRequest(FlushRequest request, ShardId shardId) { this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default } + public ShardFlushRequest(FlushRequest request, ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) { + super(shardId, reshardSplitShardCountSummary); + this.request = request; + this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default + } + public ShardFlushRequest(StreamInput in) throws IOException { super(in); request = new FlushRequest(in); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index 6771dc445fc15..df9bb1d91012d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -17,7 +17,9 @@ import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.injection.guice.Inject; @@ -59,8 +61,14 @@ public TransportFlushAction( } @Override - protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId) { - return new ShardFlushRequest(request, shardId); + protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId, ProjectMetadata project) { + // Get effective shardCount for shardId and pass it on as parameter to new ShardFlushRequest + var indexMetadata = project.index(shardId.getIndexName()); + SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; + if (indexMetadata != null) { + 
reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId()); + } + return new ShardFlushRequest(request, shardId, reshardSplitShardCountSummary); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java index f804dc9ffe907..6a5f8b3a6e2a6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.shard.ShardId; @@ -60,7 +61,7 @@ public TransportRefreshAction( } @Override - protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) { + protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId, ProjectMetadata project) { BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId); replicationRequest.waitForActiveShards(ActiveShardCount.NONE); return replicationRequest; diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java index aeb44696e5134..49fd56a91529c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java @@ -112,6 +112,7 @@ public void accept(ActionListener listener) { task, request, shardId, + project, // needed to calculate the correct SplitShardCountSummary down the stack ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire()) ); } @@ -178,9 +179,14 @@ public void onFailure(Exception e) { }; } - protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener shardActionListener) { + protected void shardExecute( + Task task, + Request request, + ShardId shardId, + ProjectMetadata project, + ActionListener shardActionListener) { assert Transports.assertNotTransportThread("may hit all the shards"); - ShardRequest shardRequest = newShardRequest(request, shardId); + ShardRequest shardRequest = newShardRequest(request, shardId, project); shardRequest.setParentTask(clusterService.localNode().getId(), task.getId()); client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener); } @@ -204,7 +210,7 @@ protected List shards(Request request, ProjectState projectState) { return shardIds; } - protected abstract ShardRequest newShardRequest(Request request, ShardId shardId); + protected abstract ShardRequest newShardRequest(Request request, ShardId shardId, ProjectMetadata project); protected abstract Response newResponse( int successfulShards, diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java 
b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index 15b66ed32dbad..4d11d0b456714 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -285,7 +285,7 @@ private class TestBroadcastReplicationAction extends TransportBroadcastReplicati } @Override - protected BasicReplicationRequest newShardRequest(DummyBroadcastRequest request, ShardId shardId) { + protected BasicReplicationRequest newShardRequest(DummyBroadcastRequest request, ShardId shardId, ProjectMetadata project) { return new BasicReplicationRequest(shardId); } @@ -304,6 +304,7 @@ protected void shardExecute( Task task, DummyBroadcastRequest request, ShardId shardId, + ProjectMetadata project, ActionListener shardActionListener ) { capturedShardRequests.add(new Tuple<>(shardId, shardActionListener)); From dfd2a308e1b6b46d967347fbed55ab51c45b2874 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 21 Oct 2025 14:30:25 +0000 Subject: [PATCH 02/13] [CI] Auto commit changes from spotless --- .../replication/TransportBroadcastReplicationAction.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java index 49fd56a91529c..4864bdcb86ec9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java @@ -184,7 +184,8 @@ protected void shardExecute( Request request, ShardId shardId, ProjectMetadata project, - ActionListener shardActionListener) { + ActionListener shardActionListener + ) { assert Transports.assertNotTransportThread("may hit all the shards"); ShardRequest shardRequest = newShardRequest(request, shardId, project); shardRequest.setParentTask(clusterService.localNode().getId(), task.getId()); From 3048e2010657844d18311085c463b7106f2c5386 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Thu, 23 Oct 2025 11:42:59 -0400 Subject: [PATCH 03/13] commit --- server/src/main/java/module-info.java | 1 + .../indices/flush/TransportFlushAction.java | 7 ++-- .../flush/TransportShardFlushAction.java | 33 ++++++++++++++++++- .../action/bulk/BulkOperation.java | 6 +++- .../action/bulk/ShardBulkSplitHelper.java | 17 ++++++++-- .../metadata/IndexReshardingState.java | 14 ++++++++ 6 files changed, 69 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index f50f130366f9a..c940e2c16a594 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -55,6 +55,7 @@ requires org.apache.lucene.queryparser; requires org.apache.lucene.sandbox; requires org.apache.lucene.suggest; + requires org.elasticsearch.server; exports org.elasticsearch; exports org.elasticsearch.action; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index df9bb1d91012d..bb17a88fa8f42 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ 
b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -63,11 +63,8 @@ public TransportFlushAction( @Override protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId, ProjectMetadata project) { // Get effective shardCount for shardId and pass it on as parameter to new ShardFlushRequest - var indexMetadata = project.index(shardId.getIndexName()); - SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; - if (indexMetadata != null) { - reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId()); - } + var indexMetadata = project.getIndexSafe(shardId.getIndex()); + SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId()); return new ShardFlushRequest(request, shardId, reshardSplitShardCountSummary); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index c0a3e568ffeeb..80cdbd4c4f915 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -12,10 +12,15 @@ import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -32,12 +37,17 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class TransportShardFlushAction extends TransportReplicationAction { public static final String NAME = FlushAction.NAME + "[s]"; public static final ActionType TYPE = new ActionType<>(NAME); + protected final ProjectResolver projectResolver; + @Inject public TransportShardFlushAction( Settings settings, @@ -46,7 +56,8 @@ public TransportShardFlushAction( IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( settings, @@ -64,6 +75,7 @@ public TransportShardFlushAction( PrimaryActionExecution.RejectOnOverload, ReplicaActionExecution.SubjectToCircuitBreaker ); + this.projectResolver = projectResolver; transportService.registerRequestHandler( PRE_SYNCED_FLUSH_ACTION_NAME, threadPool.executor(ThreadPool.Names.FLUSH), @@ -89,6 +101,25 @@ protected void shardOperationOnPrimary( })); } + // We are here because there was mismatch between the SplitShardCountSummary in the flush request + // and that on the primary shard node. 
We assume that the request is exactly 1 reshard split behind + // the current state. + @Override + protected Map splitRequestOnPrimary(ShardFlushRequest request) { + ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state()); + final ShardId sourceShard = request.shardId(); + IndexMetadata indexMetadata = project.getIndexSafe(request.shardId().getIndex()); + SplitShardCountSummary shardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, sourceShard.getId()); + Map requestsByShard = new HashMap<>(); + requestsByShard.put(sourceShard, request); + // Create a request for original source shard and for each target shard. + // New requests that are to be handled by target shards should contain the + // latest ShardCountSummary. + int targetShardId = indexMetadata.getReshardingMetadata().getSplit().targetShard(sourceShard.id()); + ShardId targetShard = new ShardId(request.shardId().getIndex(), targetShardId); + requestsByShard.put(targetShard, new ShardFlushRequest(request.getRequest(), targetShard, shardCountSummary)); + } + @Override protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener listener) { replica.flush(request.getRequest(), listener.map(flushed -> { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index 69492c67f6ba1..ca095ccdcc001 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -403,11 +403,15 @@ private void executeBulkRequestsByShard( final List requests = entry.getValue(); // Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest + var indexMetadata = project.getIndexSafe(shardId.getIndex()); + SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId()); + /* var indexMetadata = project.index(shardId.getIndexName()); SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; if (indexMetadata != null) { reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId()); } + */ BulkShardRequest bulkShardRequest = new BulkShardRequest( shardId, reshardSplitShardCountSummary, @@ -416,7 +420,7 @@ private void executeBulkRequestsByShard( bulkRequest.isSimulated() ); - if (indexMetadata != null && indexMetadata.getInferenceFields().isEmpty() == false) { + if (indexMetadata.getInferenceFields().isEmpty() == false) { bulkShardRequest.setInferenceFieldMap(indexMetadata.getInferenceFields()); } bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards()); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/ShardBulkSplitHelper.java b/server/src/main/java/org/elasticsearch/action/bulk/ShardBulkSplitHelper.java index fa8c32057d34e..3762943e85d4c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/ShardBulkSplitHelper.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/ShardBulkSplitHelper.java @@ -10,8 +10,10 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.routing.IndexRouting; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.core.Tuple; import 
org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -33,7 +35,9 @@ private ShardBulkSplitHelper() {} public static Map splitRequests(BulkShardRequest request, ProjectMetadata project) { final ShardId sourceShardId = request.shardId(); final Index index = sourceShardId.getIndex(); - IndexRouting indexRouting = IndexRouting.fromIndexMetadata(project.getIndexSafe(index)); + IndexMetadata indexMetadata = project.getIndexSafe(index); + IndexRouting indexRouting = IndexRouting.fromIndexMetadata(indexMetadata); + SplitShardCountSummary shardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, request.shardId().getId()); Map> requestsByShard = new HashMap<>(); Map bulkRequestsPerShard = new HashMap<>(); @@ -57,17 +61,26 @@ public static Map splitRequests(BulkShardRequest requ // All items belong to either the source shard or target shard. if (requestsByShard.size() == 1) { - // Return the original request if no items were split to target. + // Return the original request if no items were split to target. Note that + // this original request still contains the stale SplitShardCountSummary. + // This is alright because we hold primary indexing permits while calling this split + // method and we execute this request on the primary without letting go of the indexing permits. + // This means that a second split cannot occur in the meantime. if (requestsByShard.containsKey(sourceShardId)) { return Map.of(sourceShardId, request); } } + // Create a new BulkShardRequest(s) with the updated SplitShardCountSummary. This is because + // we do not hold primary permits on the target shard, and hence it can proceed with + // a second split operation while this request is still pending. We must verify the + // SplitShardCountSummary again on the target. for (Map.Entry> entry : requestsByShard.entrySet()) { final ShardId shardId = entry.getKey(); final List requests = entry.getValue(); BulkShardRequest bulkShardRequest = new BulkShardRequest( shardId, + shardCountSummary, request.getRefreshPolicy(), requests.toArray(new BulkItemRequest[0]), request.isSimulated() diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java index 2a9ea56b17c47..9b9faa9b574f0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java @@ -268,10 +268,24 @@ TargetShardState[] targetShards() { return targetShards.clone(); } + /** Return the source shard from which this target shard was split + * @param targetShard target shard id + * @return source shard id + */ public int sourceShard(int targetShard) { return targetShard % shardCountBefore(); } + /** Return the new target shard that is split from the given source shard + * This calculation assumes we only always double the number of shards in + * a reshard split operation, so that only one target shard is created per source shard. 
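+ * For example, when a 2-shard index is split to 4 shards, shardCountBefore() is 2,
+ * so source shard 0 maps to target shard 2 and source shard 1 to target shard 3;
+ * sourceShard() inverts this mapping (2 % 2 == 0, 3 % 2 == 1).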
+ * @param sourceShard source shard id + * @return target shard id + */ + public int targetShard(int sourceShard) { + return (sourceShard + shardCountBefore()); + } + /** * Create resharding metadata representing a new split operation * Split only supports updating an index to a multiple of its current shard count From 7dfe6fbf1049042050154c0402aecd33892b9ffa Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Thu, 23 Oct 2025 11:54:40 -0400 Subject: [PATCH 04/13] commit --- server/src/main/java/module-info.java | 1 - .../action/admin/indices/flush/TransportShardFlushAction.java | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 434f57a6a52da..fd50628539ebd 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -55,7 +55,6 @@ requires org.apache.lucene.queryparser; requires org.apache.lucene.sandbox; requires org.apache.lucene.suggest; - requires org.elasticsearch.server; exports org.elasticsearch; exports org.elasticsearch.action; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 80cdbd4c4f915..2ddc80682ec59 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -118,6 +118,7 @@ protected Map splitRequestOnPrimary(ShardFlushReques int targetShardId = indexMetadata.getReshardingMetadata().getSplit().targetShard(sourceShard.id()); ShardId targetShard = new ShardId(request.shardId().getIndex(), targetShardId); requestsByShard.put(targetShard, new ShardFlushRequest(request.getRequest(), targetShard, shardCountSummary)); + return requestsByShard; } @Override From 4bf8d9a262e67d8bf75d2ae60abea4d577ba2ea0 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Thu, 23 Oct 2025 15:17:50 -0400 Subject: [PATCH 05/13] commit --- .../flush/TransportShardFlushAction.java | 39 ++++++++++++++++++- .../TransportBroadcastReplicationAction.java | 3 +- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 2ddc80682ec59..62bfb5810f050 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; @@ -25,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -37,6 +37,8 @@ import 
org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -121,6 +123,41 @@ protected Map splitRequestOnPrimary(ShardFlushReques return requestsByShard; } + protected Tuple combineSplitResponses( + ShardFlushRequest originalRequest, + Map splitRequests, + Map> responses + ) { + int failed = 0; + int successful = 0; + int total = 0; + List failures = new ArrayList<>(); + + // Case 1: Both source and target shards return a response: Add up total, successful, failures + // Case 2: Both source and target shards return an exception : return exception + // Case 3: One shards returns a response, the other returns an exception : return exception + for (Map.Entry> entry : responses.entrySet()) { + ShardId shardId = entry.getKey(); + Tuple value = entry.getValue(); + Exception exception = value.v2(); + if (exception != null) { + return new Tuple<>(null, exception); + } else { + ReplicationResponse response = value.v1(); + failed += response.getShardInfo().getFailed(); + successful += response.getShardInfo().getSuccessful(); + total += response.getShardInfo().getTotal(); + Collections.addAll(failures, response.getShardInfo().getFailures()); + } + } + ReplicationResponse.ShardInfo.Failure[] failureArray = failures.toArray(new ReplicationResponse.ShardInfo.Failure[0]); + assert failureArray.length == failed; + ReplicationResponse.ShardInfo shardInfo = ReplicationResponse.ShardInfo.of(total, successful, failureArray); + ReplicationResponse response = new ReplicationResponse(); + response.setShardInfo(shardInfo); + return new Tuple<>(response, null); + } + @Override protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener listener) { replica.flush(request.getRequest(), listener.map(flushed -> { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java index 49fd56a91529c..4864bdcb86ec9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java @@ -184,7 +184,8 @@ protected void shardExecute( Request request, ShardId shardId, ProjectMetadata project, - ActionListener shardActionListener) { + ActionListener shardActionListener + ) { assert Transports.assertNotTransportThread("may hit all the shards"); ShardRequest shardRequest = newShardRequest(request, shardId, project); shardRequest.setParentTask(clusterService.localNode().getId(), task.getId()); From e5c65b7e0c5a90b1ce778dcc4e7015bb7ad5aec9 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Thu, 30 Oct 2025 13:53:53 -0400 Subject: [PATCH 06/13] commit --- .../indices/flush/ShardFlushRequest.java | 5 ++ .../flush/TransportShardFlushAction.java | 4 +- .../refresh/TransportRefreshAction.java | 6 +- .../refresh/TransportShardRefreshAction.java | 74 ++++++++++++++++++- .../replication/BasicReplicationRequest.java | 10 +++ 5 files changed, 96 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index aa6e0f4b785dc..f0fc4eda5720c 100644 --- 
a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -28,6 +28,11 @@ public ShardFlushRequest(FlushRequest request, ShardId shardId) { this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default } + /** + * Creates a request for a resolved shard id and SplitShardCountSummary (used + * to determine if the request needs to be executed on a split shard not yet seen by the + * coordinator that sent the request) + */ public ShardFlushRequest(FlushRequest request, ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) { super(shardId, reshardSplitShardCountSummary); this.request = request; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 62bfb5810f050..1e8924acbb751 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -103,7 +103,7 @@ protected void shardOperationOnPrimary( })); } - // We are here because there was mismatch between the SplitShardCountSummary in the flush request + // We are here because there was a mismatch between the SplitShardCountSummary in the request // and that on the primary shard node. We assume that the request is exactly 1 reshard split behind // the current state. @Override @@ -123,6 +123,7 @@ protected Map splitRequestOnPrimary(ShardFlushReques return requestsByShard; } + @Override protected Tuple combineSplitResponses( ShardFlushRequest originalRequest, Map splitRequests, @@ -133,6 +134,7 @@ protected Tuple combineSplitResponses( int total = 0; List failures = new ArrayList<>(); + // If the action fails on either one of the shards, we return an exception. 
// Case 1: Both source and target shards return a response: Add up total, successful, failures // Case 2: Both source and target shards return an exception : return exception // Case 3: One shards returns a response, the other returns an exception : return exception diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java index 6a5f8b3a6e2a6..f46ff106ba130 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.injection.guice.Inject; @@ -62,7 +63,10 @@ public TransportRefreshAction( @Override protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId, ProjectMetadata project) { - BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId); + // Get effective shardCount for shardId and pass it on as parameter to new shard request + var indexMetadata = project.getIndexSafe(shardId.getIndex()); + SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId()); + BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId, reshardSplitShardCountSummary); replicationRequest.waitForActiveShards(ActiveShardCount.NONE); return replicationRequest; } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 3dc3e19dcb979..ef6f0e94b2f30 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -13,17 +13,24 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.indices.flush.ShardFlushRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.BasicReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Tuple; import 
org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.logging.LogManager; @@ -32,6 +39,11 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.Executor; public class TransportShardRefreshAction extends TransportReplicationAction< @@ -46,6 +58,7 @@ public class TransportShardRefreshAction extends TransportReplicationAction< public static final String SOURCE_API = "api"; private final Executor refreshExecutor; + protected final ProjectResolver projectResolver; @Inject public TransportShardRefreshAction( @@ -55,7 +68,8 @@ public TransportShardRefreshAction( IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( settings, @@ -73,6 +87,7 @@ public TransportShardRefreshAction( PrimaryActionExecution.RejectOnOverload, ReplicaActionExecution.SubjectToCircuitBreaker ); + this.projectResolver = projectResolver; // registers the unpromotable version of shard refresh action new TransportUnpromotableShardRefreshAction( clusterService, @@ -104,6 +119,63 @@ protected void shardOperationOnPrimary( })); } + // We are here because there was mismatch between the SplitShardCountSummary in the request + // and that on the primary shard node. We assume that the request is exactly 1 reshard split behind + // the current state. + @Override + protected Map splitRequestOnPrimary(BasicReplicationRequest request) { + ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state()); + final ShardId sourceShard = request.shardId(); + IndexMetadata indexMetadata = project.getIndexSafe(request.shardId().getIndex()); + SplitShardCountSummary shardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, sourceShard.getId()); + Map requestsByShard = new HashMap<>(); + requestsByShard.put(sourceShard, request); + // Create a request for original source shard and for each target shard. + // New requests that are to be handled by target shards should contain the + // latest ShardCountSummary. + int targetShardId = indexMetadata.getReshardingMetadata().getSplit().targetShard(sourceShard.id()); + ShardId targetShard = new ShardId(request.shardId().getIndex(), targetShardId); + requestsByShard.put(targetShard, new BasicReplicationRequest(targetShard, shardCountSummary)); + return requestsByShard; + } + + @Override + protected Tuple combineSplitResponses( + BasicReplicationRequest originalRequest, + Map splitRequests, + Map> responses + ) { + int failed = 0; + int successful = 0; + int total = 0; + List failures = new ArrayList<>(); + + // If the action fails on either one of the shards, we return an exception. 
+ // Case 1: Both source and target shards return a response: Add up total, successful, failures + // Case 2: Both source and target shards return an exception : return exception + // Case 3: One shards returns a response, the other returns an exception : return exception + for (Map.Entry> entry : responses.entrySet()) { + ShardId shardId = entry.getKey(); + Tuple value = entry.getValue(); + Exception exception = value.v2(); + if (exception != null) { + return new Tuple<>(null, exception); + } else { + ReplicationResponse response = value.v1(); + failed += response.getShardInfo().getFailed(); + successful += response.getShardInfo().getSuccessful(); + total += response.getShardInfo().getTotal(); + Collections.addAll(failures, response.getShardInfo().getFailures()); + } + } + ReplicationResponse.ShardInfo.Failure[] failureArray = failures.toArray(new ReplicationResponse.ShardInfo.Failure[0]); + assert failureArray.length == failed; + ReplicationResponse.ShardInfo shardInfo = ReplicationResponse.ShardInfo.of(total, successful, failureArray); + ReplicationResponse response = new ReplicationResponse(); + response.setShardInfo(shardInfo); + return new Tuple<>(response, null); + } + @Override protected void shardOperationOnReplica(ShardRefreshReplicaRequest request, IndexShard replica, ActionListener listener) { replica.externalRefresh(SOURCE_API, listener.safeMap(refreshResult -> { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java index 8d04a31101d0c..3243dbb5d3c77 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.support.replication; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.index.shard.ShardId; @@ -28,6 +29,15 @@ public BasicReplicationRequest(ShardId shardId) { super(shardId); } + /** + * Creates a new request with resolved shard id and SplitShardCountSummary (used + * to determine if the request needs to be executed on a split shard not yet seen by the + * coordinator that sent the request) + */ + public BasicReplicationRequest(ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) { + super(shardId, reshardSplitShardCountSummary); + } + public BasicReplicationRequest(StreamInput in) throws IOException { super(in); } From ea070830bfafb0247b415dd4a0378c706886f472 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Mon, 3 Nov 2025 10:17:56 -0500 Subject: [PATCH 07/13] commit --- .../admin/indices/refresh/TransportShardRefreshAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index ef6f0e94b2f30..9a7f668f46c2b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; -import 
org.elasticsearch.action.admin.indices.flush.ShardFlushRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.BasicReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationOperation; From 952e6707225d793e921359bfe89847d26f91c37c Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Tue, 11 Nov 2025 14:23:09 -0500 Subject: [PATCH 08/13] refactor splitRequest --- .../flush/TransportShardFlushAction.java | 12 ++++- .../refresh/TransportShardRefreshAction.java | 10 ++++ .../ReplicationRequestSplitHelper.java | 48 +++++++++++++++++++ 3 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 1e8924acbb751..26154bbd24e89 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationRequestSplitHelper; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -48,7 +49,7 @@ public class TransportShardFlushAction extends TransportReplicationAction TYPE = new ActionType<>(NAME); - protected final ProjectResolver projectResolver; + private final ProjectResolver projectResolver; @Inject public TransportShardFlushAction( @@ -108,6 +109,14 @@ protected void shardOperationOnPrimary( // the current state. 
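// Illustration (assuming a 2-shard index mid-split to 4): a stale flush request addressed
// to source shard 1 is fanned out as {shard 1 -> the original request, shard 3 -> a new
// ShardFlushRequest carrying the fresh SplitShardCountSummary}.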
@Override protected Map splitRequestOnPrimary(ShardFlushRequest request) { + return ReplicationRequestSplitHelper.splitRequestCommon( + request, + projectResolver.getProjectMetadata(clusterService.state()), + (targetShard, shardCountSummary) -> + new ShardFlushRequest(request.getRequest(), targetShard, shardCountSummary) + ); + + /* ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state()); final ShardId sourceShard = request.shardId(); IndexMetadata indexMetadata = project.getIndexSafe(request.shardId().getIndex()); @@ -121,6 +130,7 @@ protected Map splitRequestOnPrimary(ShardFlushReques ShardId targetShard = new ShardId(request.shardId().getIndex(), targetShardId); requestsByShard.put(targetShard, new ShardFlushRequest(request.getRequest(), targetShard, shardCountSummary)); return requestsByShard; + */ } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 9a7f668f46c2b..4e68fd47a6ca5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.BasicReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.replication.ReplicationRequestSplitHelper; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -123,6 +124,14 @@ protected void shardOperationOnPrimary( // the current state. @Override protected Map splitRequestOnPrimary(BasicReplicationRequest request) { + return ReplicationRequestSplitHelper.splitRequestCommon( + request, + projectResolver.getProjectMetadata(clusterService.state()), + (targetShard, shardCountSummary) -> + new BasicReplicationRequest(targetShard, shardCountSummary) + ); + + /* ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state()); final ShardId sourceShard = request.shardId(); IndexMetadata indexMetadata = project.getIndexSafe(request.shardId().getIndex()); @@ -136,6 +145,7 @@ protected Map splitRequestOnPrimary(BasicRepli ShardId targetShard = new ShardId(request.shardId().getIndex(), targetShardId); requestsByShard.put(targetShard, new BasicReplicationRequest(targetShard, shardCountSummary)); return requestsByShard; + */ } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java new file mode 100644 index 0000000000000..83bb87fa8af73 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.support.replication; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; +import org.elasticsearch.index.shard.ShardId; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiFunction; + +public final class ReplicationRequestSplitHelper { + private ReplicationRequestSplitHelper() {} + + // We are here because there was a mismatch between the SplitShardCountSummary in the request + // and that on the primary shard node. We assume that the request is exactly 1 reshard split behind + // the current state. + public static Map splitRequestCommon( + T request, + ProjectMetadata project, + BiFunction targetRequestFactory + ) { + final ShardId sourceShard = request.shardId(); + IndexMetadata indexMetadata = project.getIndexSafe(sourceShard.getIndex()); + SplitShardCountSummary shardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, sourceShard.getId()); + + Map requestsByShard = new HashMap<>(); + requestsByShard.put(sourceShard, request); + + // Create a request for original source shard and for each target shard. + // New requests that are to be handled by target shards should contain the + // latest ShardCountSummary. + int targetShardId = indexMetadata.getReshardingMetadata().getSplit().targetShard(sourceShard.id()); + ShardId targetShard = new ShardId(sourceShard.getIndex(), targetShardId); + + requestsByShard.put(targetShard, targetRequestFactory.apply(targetShard, shardCountSummary)); + return requestsByShard; + } +} From adb8644525f9911837c78b39e2f75d820fe69efa Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Tue, 11 Nov 2025 23:04:42 -0500 Subject: [PATCH 09/13] refactor --- server/src/main/java/module-info.java | 1 + .../flush/TransportShardFlushAction.java | 58 +----------------- .../refresh/TransportShardRefreshAction.java | 58 +----------------- .../ReplicationRequestSplitHelper.java | 60 +++++++++++++++++-- .../TransportBroadcastReplicationAction.java | 9 ++- 5 files changed, 70 insertions(+), 116 deletions(-) diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 6a4d447c8067d..89cab4302dff0 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -55,6 +55,7 @@ requires org.apache.lucene.queryparser; requires org.apache.lucene.sandbox; requires org.apache.lucene.suggest; + requires org.elasticsearch.server; exports org.elasticsearch; exports org.elasticsearch.action; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 26154bbd24e89..1312f32c1918e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -17,10 +17,7 @@ import org.elasticsearch.action.support.replication.ReplicationResponse; import 
org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; -import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -38,10 +35,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; public class TransportShardFlushAction extends TransportReplicationAction { @@ -109,28 +102,11 @@ protected void shardOperationOnPrimary( // the current state. @Override protected Map splitRequestOnPrimary(ShardFlushRequest request) { - return ReplicationRequestSplitHelper.splitRequestCommon( + return ReplicationRequestSplitHelper.splitRequest( request, projectResolver.getProjectMetadata(clusterService.state()), - (targetShard, shardCountSummary) -> - new ShardFlushRequest(request.getRequest(), targetShard, shardCountSummary) + (targetShard, shardCountSummary) -> new ShardFlushRequest(request.getRequest(), targetShard, shardCountSummary) ); - - /* - ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state()); - final ShardId sourceShard = request.shardId(); - IndexMetadata indexMetadata = project.getIndexSafe(request.shardId().getIndex()); - SplitShardCountSummary shardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, sourceShard.getId()); - Map requestsByShard = new HashMap<>(); - requestsByShard.put(sourceShard, request); - // Create a request for original source shard and for each target shard. - // New requests that are to be handled by target shards should contain the - // latest ShardCountSummary. - int targetShardId = indexMetadata.getReshardingMetadata().getSplit().targetShard(sourceShard.id()); - ShardId targetShard = new ShardId(request.shardId().getIndex(), targetShardId); - requestsByShard.put(targetShard, new ShardFlushRequest(request.getRequest(), targetShard, shardCountSummary)); - return requestsByShard; - */ } @Override @@ -139,35 +115,7 @@ protected Tuple combineSplitResponses( Map splitRequests, Map> responses ) { - int failed = 0; - int successful = 0; - int total = 0; - List failures = new ArrayList<>(); - - // If the action fails on either one of the shards, we return an exception. 
- // Case 1: Both source and target shards return a response: Add up total, successful, failures - // Case 2: Both source and target shards return an exception : return exception - // Case 3: One shards returns a response, the other returns an exception : return exception - for (Map.Entry> entry : responses.entrySet()) { - ShardId shardId = entry.getKey(); - Tuple value = entry.getValue(); - Exception exception = value.v2(); - if (exception != null) { - return new Tuple<>(null, exception); - } else { - ReplicationResponse response = value.v1(); - failed += response.getShardInfo().getFailed(); - successful += response.getShardInfo().getSuccessful(); - total += response.getShardInfo().getTotal(); - Collections.addAll(failures, response.getShardInfo().getFailures()); - } - } - ReplicationResponse.ShardInfo.Failure[] failureArray = failures.toArray(new ReplicationResponse.ShardInfo.Failure[0]); - assert failureArray.length == failed; - ReplicationResponse.ShardInfo shardInfo = ReplicationResponse.ShardInfo.of(total, successful, failureArray); - ReplicationResponse response = new ReplicationResponse(); - response.setShardInfo(shardInfo); - return new Tuple<>(response, null); + return ReplicationRequestSplitHelper.combineSplitResponses(originalRequest, splitRequests, responses); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 4e68fd47a6ca5..53d25f11c9ccd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -20,11 +20,8 @@ import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; @@ -39,10 +36,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -124,28 +117,11 @@ protected void shardOperationOnPrimary( // the current state. 
@Override protected Map splitRequestOnPrimary(BasicReplicationRequest request) { - return ReplicationRequestSplitHelper.splitRequestCommon( + return ReplicationRequestSplitHelper.splitRequest( request, projectResolver.getProjectMetadata(clusterService.state()), - (targetShard, shardCountSummary) -> - new BasicReplicationRequest(targetShard, shardCountSummary) + (targetShard, shardCountSummary) -> new BasicReplicationRequest(targetShard, shardCountSummary) ); - - /* - ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state()); - final ShardId sourceShard = request.shardId(); - IndexMetadata indexMetadata = project.getIndexSafe(request.shardId().getIndex()); - SplitShardCountSummary shardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, sourceShard.getId()); - Map requestsByShard = new HashMap<>(); - requestsByShard.put(sourceShard, request); - // Create a request for original source shard and for each target shard. - // New requests that are to be handled by target shards should contain the - // latest ShardCountSummary. - int targetShardId = indexMetadata.getReshardingMetadata().getSplit().targetShard(sourceShard.id()); - ShardId targetShard = new ShardId(request.shardId().getIndex(), targetShardId); - requestsByShard.put(targetShard, new BasicReplicationRequest(targetShard, shardCountSummary)); - return requestsByShard; - */ } @Override @@ -154,35 +130,7 @@ protected Tuple combineSplitResponses( Map splitRequests, Map> responses ) { - int failed = 0; - int successful = 0; - int total = 0; - List failures = new ArrayList<>(); - - // If the action fails on either one of the shards, we return an exception. - // Case 1: Both source and target shards return a response: Add up total, successful, failures - // Case 2: Both source and target shards return an exception : return exception - // Case 3: One shards returns a response, the other returns an exception : return exception - for (Map.Entry> entry : responses.entrySet()) { - ShardId shardId = entry.getKey(); - Tuple value = entry.getValue(); - Exception exception = value.v2(); - if (exception != null) { - return new Tuple<>(null, exception); - } else { - ReplicationResponse response = value.v1(); - failed += response.getShardInfo().getFailed(); - successful += response.getShardInfo().getSuccessful(); - total += response.getShardInfo().getTotal(); - Collections.addAll(failures, response.getShardInfo().getFailures()); - } - } - ReplicationResponse.ShardInfo.Failure[] failureArray = failures.toArray(new ReplicationResponse.ShardInfo.Failure[0]); - assert failureArray.length == failed; - ReplicationResponse.ShardInfo shardInfo = ReplicationResponse.ShardInfo.of(total, successful, failureArray); - ReplicationResponse response = new ReplicationResponse(); - response.setShardInfo(shardInfo); - return new Tuple<>(response, null); + return ReplicationRequestSplitHelper.combineSplitResponses(originalRequest, splitRequests, responses); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java index 83bb87fa8af73..db663264346ec 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java @@ -12,19 +12,32 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import 
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java
index 83bb87fa8af73..db663264346ec 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java
@@ -12,19 +12,32 @@
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.routing.SplitShardCountSummary;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.shard.ShardId;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
 
 public final class ReplicationRequestSplitHelper {
 
     private ReplicationRequestSplitHelper() {}
 
-    // We are here because there was a mismatch between the SplitShardCountSummary in the request
-    // and that on the primary shard node. We assume that the request is exactly 1 reshard split behind
-    // the current state.
-    public static <T extends ReplicationRequest<T>> Map<ShardId, T> splitRequestCommon(
+    /**
+     * Given a stale replication request, like flush or refresh, split it into multiple requests,
+     * one for the source shard and one for the target shard.
+     * See {@link org.elasticsearch.action.bulk.ShardBulkSplitHelper} for how we split
+     * {@link org.elasticsearch.action.bulk.BulkShardRequest}.
+     * We are here because there was a mismatch between the SplitShardCountSummary in the request
+     * and that on the primary shard node.
+     * TODO:
+     * We assume here that the request is exactly 1 reshard split behind
+     * the current state. We might either revise this assumption or enforce it
+     * in a follow-up.
+     */
+    public static <T extends ReplicationRequest<T>> Map<ShardId, T> splitRequest(
         T request,
         ProjectMetadata project,
         BiFunction<ShardId, SplitShardCountSummary, T> targetRequestFactory
@@ -45,4 +58,43 @@ public static <T extends ReplicationRequest<T>> Map<ShardId, T> splitRequestCommon(
         requestsByShard.put(targetShard, targetRequestFactory.apply(targetShard, shardCountSummary));
         return requestsByShard;
     }
+
+    public static <T extends ReplicationRequest<T>> Tuple<ReplicationResponse, Exception> combineSplitResponses(
+        T originalRequest,
+        Map<ShardId, T> splitRequests,
+        Map<ShardId, Tuple<ReplicationResponse, Exception>> responses
+    ) {
+        int failed = 0;
+        int successful = 0;
+        int total = 0;
+        List<ReplicationResponse.ShardInfo.Failure> failures = new ArrayList<>();
+
+        // If the action fails on either one of the shards, we return an exception.
+        // Case 1: Both source and target shards return a response: Add up total, successful, failures
+        // Case 2: Both source and target shards return an exception : return exception
+        // Case 3: One shard returns a response, the other returns an exception : return exception
+        for (Map.Entry<ShardId, Tuple<ReplicationResponse, Exception>> entry : responses.entrySet()) {
+            Tuple<ReplicationResponse, Exception> value = entry.getValue();
+            Exception exception = value.v2();
+
+            if (exception != null) {
+                return new Tuple<>(null, exception);
+            }
+
+            ReplicationResponse response = value.v1();
+            failed += response.getShardInfo().getFailed();
+            successful += response.getShardInfo().getSuccessful();
+            total += response.getShardInfo().getTotal();
+            Collections.addAll(failures, response.getShardInfo().getFailures());
+        }
+
+        ReplicationResponse.ShardInfo.Failure[] failureArray = failures.toArray(new ReplicationResponse.ShardInfo.Failure[0]);
+        assert failureArray.length == failed;
+
+        ReplicationResponse.ShardInfo shardInfo = ReplicationResponse.ShardInfo.of(total, successful, failureArray);
+
+        ReplicationResponse response = new ReplicationResponse();
+        response.setShardInfo(shardInfo);
+        return new Tuple<>(response, null);
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
index 4864bdcb86ec9..48fe4e3bf49d9 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
@@ -29,6 +29,7 @@
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.OperationRouting;
+import org.elasticsearch.cluster.routing.SplitShardCountSummary;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -201,11 +202,15 @@ protected List<ShardId> shards(Request request, ProjectState projectState) {
 
         OperationRouting operationRouting = clusterService.operationRouting();
 
-        String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), request);
+        ProjectMetadata project = projectState.metadata();
+        String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(project, request);
         for (String index : concreteIndices) {
             Iterator<IndexShardRoutingTable> iterator = operationRouting.allWritableShards(projectState, index);
+            var indexMetadata = project.index(index);
             while (iterator.hasNext()) {
-                shardIds.add(iterator.next().shardId());
+                ShardId shardId = iterator.next().shardId();
+                SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
+                shardIds.add(shardId);
             }
         }
         return shardIds;
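Note that the hunk above computes reshardSplitShardCountSummary but still collects only the bare ShardId, so the summary is not consumed anywhere yet; the next commit threads it through by pairing the two values. A sketch of that pairing, using the Tuple utility that a later commit replaces with a dedicated ShardRecord (the wrapper class here is illustrative):

    import java.util.ArrayList;
    import java.util.List;

    import org.elasticsearch.cluster.metadata.IndexMetadata;
    import org.elasticsearch.cluster.routing.SplitShardCountSummary;
    import org.elasticsearch.core.Tuple;
    import org.elasticsearch.index.shard.ShardId;

    final class ShardSummaryPairing {
        // Pair each shard with the split summary derived from its index metadata,
        // so the broadcast fan-out can hand the summary to newShardRequest.
        static List<Tuple<ShardId, SplitShardCountSummary>> pair(List<ShardId> shardIds, IndexMetadata indexMetadata) {
            List<Tuple<ShardId, SplitShardCountSummary>> paired = new ArrayList<>(shardIds.size());
            for (ShardId shardId : shardIds) {
                paired.add(new Tuple<>(shardId, SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId())));
            }
            return paired;
        }
    }
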
From ae72d0080d7d64f13074bb60db8b12893cedc310 Mon Sep 17 00:00:00 2001
From: Ankita Kumar
Date: Tue, 11 Nov 2025 23:58:50 -0500
Subject: [PATCH 10/13] review comments

---
 server/src/main/java/module-info.java             |  1 -
 .../indices/flush/TransportFlushAction.java       |  8 +--
 .../refresh/TransportRefreshAction.java           |  8 +--
 .../get/TransportShardMultiGetAction.java         |  1 +
 .../replication/BasicReplicationRequest.java      |  1 +
 .../TransportBroadcastReplicationAction.java      | 49 +++++++++++++++----
 .../BroadcastReplicationTests.java                | 13 +++--
 7 files changed, 54 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java
index f7433ffde1323..9c5d11e1cf9e1 100644
--- a/server/src/main/java/module-info.java
+++ b/server/src/main/java/module-info.java
@@ -55,7 +55,6 @@
     requires org.apache.lucene.queryparser;
     requires org.apache.lucene.sandbox;
     requires org.apache.lucene.suggest;
-    requires org.elasticsearch.server;
 
     exports org.elasticsearch;
     exports org.elasticsearch.action;
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java
index bb17a88fa8f42..8d82b7945d90b 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java
@@ -17,7 +17,6 @@
 import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.routing.SplitShardCountSummary;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -61,11 +60,8 @@ public TransportFlushAction(
     }
 
     @Override
-    protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId, ProjectMetadata project) {
-        // Get effective shardCount for shardId and pass it on as parameter to new ShardFlushRequest
-        var indexMetadata = project.getIndexSafe(shardId.getIndex());
-        SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
-        return new ShardFlushRequest(request, shardId, reshardSplitShardCountSummary);
+    protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId, SplitShardCountSummary shardCountSummary) {
+        return new ShardFlushRequest(request, shardId, shardCountSummary);
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java
index f46ff106ba130..2856112c05177 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java
@@ -18,7 +18,6 @@
 import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.routing.SplitShardCountSummary;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -62,11 +61,8 @@ public TransportRefreshAction(
     }
 
     @Override
-    protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId, ProjectMetadata project) {
-        // Get effective shardCount for shardId and pass it on as parameter to new shard request
-        var indexMetadata = project.getIndexSafe(shardId.getIndex());
-        SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
-        BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId, reshardSplitShardCountSummary);
+    protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId, SplitShardCountSummary shardCountSummary) {
+        BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId, shardCountSummary);
         replicationRequest.waitForActiveShards(ActiveShardCount.NONE);
         return replicationRequest;
     }
diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java
index 806b55e6ad7c9..65a0a8751cb5a 100644
--- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java
+++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java
@@ -174,6 +174,7 @@ private void handleMultiGetOnUnpromotableShard(
         ShardId shardId = indexShard.shardId();
         if (request.refresh()) {
             logger.trace("send refresh action for shard {}", shardId);
+            // TODO: Do we need to pass in shardCountSummary here?
             var refreshRequest = new BasicReplicationRequest(shardId);
             refreshRequest.setParentTask(request.getParentTask());
             client.executeLocally(
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java
index 3243dbb5d3c77..fa45bfc7554cf 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java
@@ -25,6 +25,7 @@ public class BasicReplicationRequest extends ReplicationRequest<BasicReplicationRequest> {
[...]
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
[...]
@@ ... @@ public void accept(ActionListener<Response> listener) {
         final ClusterState clusterState = clusterService.state();
         final ProjectState projectState = projectResolver.getProjectState(clusterState);
         final ProjectMetadata project = projectState.metadata();
-        final List<ShardId> shards = shards(request, projectState);
+        final List<Tuple<ShardId, SplitShardCountSummary>> shards = shards(request, projectState);
         final Map<String, IndexMetadata> indexMetadataByName = project.indices();
 
         try (var refs = new RefCountingRunnable(() -> finish(listener))) {
-            for (final ShardId shardId : shards) {
+            // for (final ShardId shardId : shards) {
+            shards.forEach(tuple -> {
+                ShardId shardId = tuple.v1();
+                SplitShardCountSummary shardCountSummary = tuple.v2();
                 // NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?
                 shardExecute(
                     task,
                     request,
                     shardId,
-                    project, // needed to calculate the correct SplitShardCountSummary down the stack
+                    shardCountSummary,
                     ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire())
                 );
-            }
+            });
         }
     }
 
@@ -184,18 +188,16 @@ protected void shardExecute(
         Task task,
         Request request,
         ShardId shardId,
-        ProjectMetadata project,
+        SplitShardCountSummary shardCountSummary,
         ActionListener<ShardResponse> shardActionListener
     ) {
         assert Transports.assertNotTransportThread("may hit all the shards");
-        ShardRequest shardRequest = newShardRequest(request, shardId, project);
+        ShardRequest shardRequest = newShardRequest(request, shardId, shardCountSummary);
         shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
         client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);
     }
 
-    /**
-     * @return all shard ids the request should run on
-     */
+    /*
     protected List<ShardId> shards(Request request, ProjectState projectState) {
         assert Transports.assertNotTransportThread("may hit all the shards");
         List<ShardId> shardIds = new ArrayList<>();
@@ -215,8 +217,35 @@ protected List<ShardId> shards(Request request, ProjectState projectState) {
         }
         return shardIds;
     }
+    */
+
+    /**
+     * @return all shard ids the request should run on
+     */
+    protected List<Tuple<ShardId, SplitShardCountSummary>> shards(Request request, ProjectState projectState) {
+        assert Transports.assertNotTransportThread("may hit all the shards");
+
+        List<Tuple<ShardId, SplitShardCountSummary>> shards = new ArrayList<>();
+
+        OperationRouting operationRouting = clusterService.operationRouting();
+        ProjectMetadata project = projectState.metadata();
+
+        String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(project, request);
+        for (String index : concreteIndices) {
+            Iterator<IndexShardRoutingTable> iterator = operationRouting.allWritableShards(projectState, index);
+            IndexMetadata indexMetadata = project.index(index);
+
+            while (iterator.hasNext()) {
+                ShardId shardId = iterator.next().shardId();
+                SplitShardCountSummary splitSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
+                shards.add(new Tuple<>(shardId, splitSummary));
+            }
+        }
+
+        return shards;
+    }
 
-    protected abstract ShardRequest newShardRequest(Request request, ShardId shardId, ProjectMetadata project);
+    protected abstract ShardRequest newShardRequest(Request request, ShardId shardId, SplitShardCountSummary shardCountSummary);
 
     protected abstract Response newResponse(
         int successfulShards,
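On the NB/TODO about the tight O(#shards) loop: one possible shape for throttling, sketched as a plain counting semaphore (an assumption, not part of the patch; the limit and where back-pressure should live are open questions):

    import java.util.concurrent.Semaphore;

    // Illustrative only: cap the number of in-flight shard-level requests. The
    // permit is taken before dispatch and released by the per-shard listener.
    final class FanOutLimiter {
        private final Semaphore inFlight;

        FanOutLimiter(int maxInFlight) {
            this.inFlight = new Semaphore(maxInFlight);
        }

        void beforeDispatch() throws InterruptedException {
            inFlight.acquire(); // blocks once maxInFlight requests are outstanding
        }

        void onShardResponse() {
            inFlight.release();
        }
    }
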
diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java
index 4d11d0b456714..95ad4ad1a0a68 100644
--- a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java
@@ -30,6 +30,7 @@
 import org.elasticsearch.cluster.project.TestProjectResolvers;
 import org.elasticsearch.cluster.routing.GlobalRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.SplitShardCountSummary;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -246,12 +247,12 @@ public void testShardsList() throws InterruptedException, ExecutionException {
             index
         );
         logger.debug("--> using initial state:\n{}", clusterService.state());
-        List<ShardId> shards = broadcastReplicationAction.shards(
+        List<Tuple<ShardId, SplitShardCountSummary>> shards = broadcastReplicationAction.shards(
             new DummyBroadcastRequest().indices(shardId.getIndexName()),
             clusterState.projectState(projectId)
         );
         assertThat(shards.size(), equalTo(1));
-        assertThat(shards.get(0), equalTo(shardId));
+        assertThat(shards.get(0).v1(), equalTo(shardId));
     }
 
     private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction<
@@ -285,7 +286,11 @@ private class TestBroadcastReplicationAction extends TransportBroadcastReplicati
     }
 
     @Override
-    protected BasicReplicationRequest newShardRequest(DummyBroadcastRequest request, ShardId shardId, ProjectMetadata project) {
+    protected BasicReplicationRequest newShardRequest(
+        DummyBroadcastRequest request,
+        ShardId shardId,
+        SplitShardCountSummary shardCountSummary
+    ) {
         return new BasicReplicationRequest(shardId);
     }
 
@@ -304,7 +309,7 @@ protected void shardExecute(
         Task task,
         DummyBroadcastRequest request,
         ShardId shardId,
-        ProjectMetadata project,
+        SplitShardCountSummary shardCountSummary,
         ActionListener<ReplicationResponse> shardActionListener
     ) {
         capturedShardRequests.add(new Tuple<>(shardId, shardActionListener));
From 7ff45aacdacb0ebfabfabd81353090d5c2273c02 Mon Sep 17 00:00:00 2001
From: Ankita Kumar
Date: Wed, 12 Nov 2025 18:57:19 -0500
Subject: [PATCH 11/13] review comments

---
 .../indices/flush/ShardFlushRequest.java          |  5 ++-
 .../refresh/TransportShardRefreshAction.java      |  2 +-
 .../action/bulk/BulkOperation.java                |  8 +---
 .../ReplicationRequestSplitHelper.java            |  1 +
 .../TransportBroadcastReplicationAction.java      | 45 +++++--------------
 .../TransportReplicationAction.java               |  3 ++
 .../BroadcastReplicationTests.java                |  4 +-
 7 files changed, 23 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java
index f0fc4eda5720c..5da1ad816172e 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java
@@ -21,11 +21,12 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
 
     private final FlushRequest request;
+    private static final ActiveShardCount DEFAULT_ACTIVE_SHARD_COUNT = ActiveShardCount.NONE;
 
     public ShardFlushRequest(FlushRequest request, ShardId shardId) {
         super(shardId);
         this.request = request;
-        this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
+        this.waitForActiveShards = DEFAULT_ACTIVE_SHARD_COUNT; // don't wait for any active shards before proceeding, by default
     }
 
     /**
@@ -36,7 +37,7 @@ public ShardFlushRequest(FlushRequest request, ShardId shardId) {
     public ShardFlushRequest(FlushRequest request, ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
         super(shardId, reshardSplitShardCountSummary);
         this.request = request;
-        this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
+        this.waitForActiveShards = DEFAULT_ACTIVE_SHARD_COUNT; // don't wait for any active shards before proceeding, by default
     }
 
     public ShardFlushRequest(StreamInput in) throws IOException {
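The two constructors above now differ only in their super call; a possible further tightening (not done in this series) would be delegation, assuming SplitShardCountSummary.UNSET is the appropriate "no reshard in flight" value as used elsewhere in these patches:

    public ShardFlushRequest(FlushRequest request, ShardId shardId) {
        // delegate to the summary-aware constructor with the UNSET sentinel (assumption)
        this(request, shardId, SplitShardCountSummary.UNSET);
    }
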
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java
index 53d25f11c9ccd..7aee16376f9a1 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java
@@ -51,7 +51,7 @@ public class TransportShardRefreshAction extends TransportReplicationAction<
     public static final String SOURCE_API = "api";
 
     private final Executor refreshExecutor;
-    protected final ProjectResolver projectResolver;
+    private final ProjectResolver projectResolver;
 
     @Inject
     public TransportShardRefreshAction(
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
index ca095ccdcc001..3f3e3a9127526 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
@@ -405,13 +405,7 @@ private void executeBulkRequestsByShard(
             // Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest
             var indexMetadata = project.getIndexSafe(shardId.getIndex());
             SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
-            /*
-            var indexMetadata = project.index(shardId.getIndexName());
-            SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
-            if (indexMetadata != null) {
-                reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
-            }
-            */
+
             BulkShardRequest bulkShardRequest = new BulkShardRequest(
                 shardId,
                 reshardSplitShardCountSummary,
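The BulkOperation cleanup settles on getIndexSafe, which throws when the index no longer exists, and deletes the null-tolerant variant that fell back to UNSET. The two behaviors side by side (the helper class is illustrative; both lookups appear verbatim in this series):

    import org.elasticsearch.cluster.metadata.IndexMetadata;
    import org.elasticsearch.cluster.metadata.ProjectMetadata;
    import org.elasticsearch.cluster.routing.SplitShardCountSummary;
    import org.elasticsearch.index.shard.ShardId;

    final class SummaryLookup {
        // The variant the patch keeps: fail fast if the index vanished.
        static SplitShardCountSummary strict(ProjectMetadata project, ShardId shardId) {
            IndexMetadata indexMetadata = project.getIndexSafe(shardId.getIndex());
            return SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
        }

        // The deleted variant: tolerate a missing index and fall back to UNSET.
        static SplitShardCountSummary lenient(ProjectMetadata project, ShardId shardId) {
            IndexMetadata indexMetadata = project.index(shardId.getIndexName());
            return indexMetadata != null
                ? SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId())
                : SplitShardCountSummary.UNSET;
        }
    }
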
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java
index db663264346ec..822b593225d75 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java
@@ -52,6 +52,7 @@ public static <T extends ReplicationRequest<T>> Map<ShardId, T> splitRequest(
         // Create a request for original source shard and for each target shard.
         // New requests that are to be handled by target shards should contain the
         // latest ShardCountSummary.
+        // TODO: This will not work if the reshard metadata is gone
         int targetShardId = indexMetadata.getReshardingMetadata().getSplit().targetShard(sourceShard.id());
         ShardId targetShard = new ShardId(sourceShard.getIndex(), targetShardId);
 
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
index 11be1c8aa1974..4f7d2e07f986e 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
@@ -34,7 +34,6 @@
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.CheckedConsumer;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
@@ -64,6 +63,8 @@ public abstract class TransportBroadcastReplicationAction<
     private final Executor executor;
     private final ProjectResolver projectResolver;
 
+    protected record ShardRecord(ShardId shardId, SplitShardCountSummary splitSummary) {}
+
     public TransportBroadcastReplicationAction(
         String name,
         Writeable.Reader<Request> requestReader,
@@ -104,14 +105,14 @@ public void accept(ActionListener<Response> listener) {
         final ClusterState clusterState = clusterService.state();
         final ProjectState projectState = projectResolver.getProjectState(clusterState);
         final ProjectMetadata project = projectState.metadata();
-        final List<Tuple<ShardId, SplitShardCountSummary>> shards = shards(request, projectState);
+        final List<ShardRecord> shards = shards(request, projectState);
+        //final List<Tuple<ShardId, SplitShardCountSummary>> shards = shards(request, projectState);
         final Map<String, IndexMetadata> indexMetadataByName = project.indices();
 
         try (var refs = new RefCountingRunnable(() -> finish(listener))) {
-            shards.forEach(tuple -> {
-                ShardId shardId = tuple.v1();
-                SplitShardCountSummary shardCountSummary = tuple.v2();
+            shards.forEach(shardRecord -> {
+                ShardId shardId = shardRecord.shardId();
+                SplitShardCountSummary shardCountSummary = shardRecord.splitSummary();
                 // NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?
                 shardExecute(
                     task,
@@ -197,35 +198,13 @@ protected void shardExecute(
         client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);
     }
 
-    /*
-    protected List<ShardId> shards(Request request, ProjectState projectState) {
-        assert Transports.assertNotTransportThread("may hit all the shards");
-        List<ShardId> shardIds = new ArrayList<>();
-
-        OperationRouting operationRouting = clusterService.operationRouting();
-
-        ProjectMetadata project = projectState.metadata();
-        String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(project, request);
-        for (String index : concreteIndices) {
-            Iterator<IndexShardRoutingTable> iterator = operationRouting.allWritableShards(projectState, index);
-            var indexMetadata = project.index(index);
-            while (iterator.hasNext()) {
-                ShardId shardId = iterator.next().shardId();
-                SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
-                shardIds.add(shardId);
-            }
-        }
-        return shardIds;
-    }
-    */
-
     /**
      * @return all shard ids the request should run on
      */
-    protected List<Tuple<ShardId, SplitShardCountSummary>> shards(Request request, ProjectState projectState) {
+    protected List<ShardRecord> shards(Request request, ProjectState projectState) {
         assert Transports.assertNotTransportThread("may hit all the shards");
 
-        List<Tuple<ShardId, SplitShardCountSummary>> shards = new ArrayList<>();
+        List<ShardRecord> shards = new ArrayList<>();
 
         OperationRouting operationRouting = clusterService.operationRouting();
         ProjectMetadata project = projectState.metadata();
@@ -237,11 +216,11 @@ protected List<Tuple<ShardId, SplitShardCountSummary>> shards(Request request, ProjectState projectState) {
 
         while (iterator.hasNext()) {
             ShardId shardId = iterator.next().shardId();
-            SplitShardCountSummary splitSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
-            shards.add(new Tuple<>(shardId, splitSummary));
+            SplitShardCountSummary splitSummary =
+                SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
+            shards.add(new ShardRecord(shardId, splitSummary));
         }
     }
-
     return shards;
 }
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 2b134b43f339d..3eed9724997b5 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -338,6 +338,9 @@ protected abstract void shardOperationOnReplica(
 
     /**
      * During Resharding, we might need to split the primary request.
+     * We are here because there was a mismatch between the SplitShardCountSummary in the request
+     * and that on the primary shard node. We assume that the request is exactly 1 reshard split behind
+     * the current state.
     */
     protected Map<ShardId, Request> splitRequestOnPrimary(Request request) {
         return Map.of(request.shardId(), request);
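The default keeps non-resharding actions on the single-shard path; only a primary that detects a stale summary fans the request out. A contrast sketch, assuming a 1 -> 2 split in which shard 0's target is shard 1 (class and method names are illustrative):

    import java.util.Map;

    import org.elasticsearch.action.support.replication.BasicReplicationRequest;
    import org.elasticsearch.cluster.routing.SplitShardCountSummary;
    import org.elasticsearch.index.shard.ShardId;

    final class SplitFanOutSketch {
        // No split in flight: the request maps onto its own shard (the default above).
        static Map<ShardId, BasicReplicationRequest> unsplit(BasicReplicationRequest request) {
            return Map.of(request.shardId(), request);
        }

        // Split in flight: the source shard keeps the original request, and the
        // target shard gets a fresh request stamped with the post-split summary.
        static Map<ShardId, BasicReplicationRequest> split(BasicReplicationRequest request, ShardId targetShard, SplitShardCountSummary summary) {
            return Map.of(request.shardId(), request, targetShard, new BasicReplicationRequest(targetShard, summary));
        }
    }
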
diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java
index 95ad4ad1a0a68..98c83ba796220 100644
--- a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java
@@ -247,12 +247,12 @@ public void testShardsList() throws InterruptedException, ExecutionException {
             index
         );
         logger.debug("--> using initial state:\n{}", clusterService.state());
-        List<Tuple<ShardId, SplitShardCountSummary>> shards = broadcastReplicationAction.shards(
+        List<ShardRecord> shards = broadcastReplicationAction.shards(
             new DummyBroadcastRequest().indices(shardId.getIndexName()),
             clusterState.projectState(projectId)
         );
         assertThat(shards.size(), equalTo(1));
-        assertThat(shards.get(0).v1(), equalTo(shardId));
+        assertThat(shards.get(0).shardId(), equalTo(shardId));
     }
 
     private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction<
From da76468a792b9c5476c2d61f5009950e7a5fafdf Mon Sep 17 00:00:00 2001
From: Ankita Kumar
Date: Wed, 12 Nov 2025 19:09:37 -0500
Subject: [PATCH 12/13] commit

---
 .../action/support/replication/BasicReplicationRequest.java  | 2 +-
 .../replication/TransportBroadcastReplicationAction.java     | 4 +---
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java
index fa45bfc7554cf..c1a023ff5419c 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java
@@ -25,7 +25,7 @@ public class BasicReplicationRequest extends ReplicationRequest<BasicReplicationRequest> {
[...]
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java
[...]
@@ ... @@ public void accept(ActionListener<Response> listener) {
         final ProjectState projectState = projectResolver.getProjectState(clusterState);
         final ProjectMetadata project = projectState.metadata();
         final List<ShardRecord> shards = shards(request, projectState);
-        //final List<Tuple<ShardId, SplitShardCountSummary>> shards = shards(request, projectState);
         final Map<String, IndexMetadata> indexMetadataByName = project.indices();
 
         try (var refs = new RefCountingRunnable(() -> finish(listener))) {
@@ -216,8 +215,7 @@ protected List<ShardRecord> shards(Request request, ProjectState projectState) {
         while (iterator.hasNext()) {
             ShardId shardId = iterator.next().shardId();
-            SplitShardCountSummary splitSummary =
-                SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
+            SplitShardCountSummary splitSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
             shards.add(new ShardRecord(shardId, splitSummary));
         }
     }
From 3efd785adb9a632eea9feddef3d90775bb16110e Mon Sep 17 00:00:00 2001
From: Ankita Kumar
Date: Thu, 13 Nov 2025 14:23:09 -0500
Subject: [PATCH 13/13] minor change

---
 .../action/admin/indices/flush/ShardFlushRequest.java | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java
index 5da1ad816172e..8d520544dd8a4 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java
@@ -21,13 +21,6 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
 
     private final FlushRequest request;
-    private static final ActiveShardCount DEFAULT_ACTIVE_SHARD_COUNT = ActiveShardCount.NONE;
-
-    public ShardFlushRequest(FlushRequest request, ShardId shardId) {
-        super(shardId);
-        this.request = request;
-        this.waitForActiveShards = DEFAULT_ACTIVE_SHARD_COUNT; // don't wait for any active shards before proceeding, by default
-    }
 
     /**
      * Creates a request for a resolved shard id and SplitShardCountSummary (used
@@ -30,7 +30,7 @@ public ShardFlushRequest(FlushRequest request, ShardId shardId) {
     public ShardFlushRequest(FlushRequest request, ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
         super(shardId, reshardSplitShardCountSummary);
         this.request = request;
-        this.waitForActiveShards = DEFAULT_ACTIVE_SHARD_COUNT; // don't wait for any active shards before proceeding, by default
+        this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
     }
 
     public ShardFlushRequest(StreamInput in) throws IOException {