
Conversation

ankikuma (Contributor) commented on Oct 21, 2025:

Address ES-13295 and ES-13170.

Implement shard split logic for Flush and Refresh. Also forward the most recent shardCountRequestSummary to the target shards when splitting the request at the source.

elasticsearchmachine added the v9.3.0 and serverless-linked labels on Oct 21, 2025.
ankikuma marked this pull request as ready for review on Nov 3, 2025.
elasticsearchmachine added the needs:triage label on Nov 3, 2025.
ankikuma added the :Distributed Indexing/Distributed and Team:Distributed Indexing labels on Nov 3, 2025.
elasticsearchmachine removed the needs:triage label on Nov 3, 2025.
elasticsearchmachine (Collaborator) commented:
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)

ankikuma added the >non-issue and needs:triage labels on Nov 3, 2025.
elasticsearchmachine removed the needs:triage label on Nov 3, 2025.

// We are here because there was a mismatch between the SplitShardCountSummary in the request
// and that on the primary shard node. We assume that the request is exactly 1 reshard split behind
// the current state.
ankikuma (author) commented:

This assumption will need to be revised in a follow-up PR. I created ES-13413 for this.
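
(For context, a minimal sketch of the kind of guard the follow-up could add. The shardCount() accessor and the rule that a split exactly doubles the shard count are both assumptions, not confirmed by this PR.)

// Hypothetical sketch, not the PR's code: fail fast if the request is more than
// one reshard split behind, instead of silently assuming exactly one split.
static void assertOneSplitBehind(SplitShardCountSummary requestSummary, SplitShardCountSummary currentSummary) {
    int requestCount = requestSummary.shardCount();  // summary stamped on the request (assumed accessor)
    int currentCount = currentSummary.shardCount();  // summary on the primary shard node
    if (currentCount != requestCount * 2) {
        throw new IllegalStateException(
            "request summary [" + requestCount + "] is not exactly one reshard split behind current [" + currentCount + "]"
        );
    }
}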

// If the action fails on either one of the shards, we return an exception.
// Case 1: Both source and target shards return a response: add up total, successful, failures.
// Case 2: Both source and target shards return an exception: return the exception.
// Case 3: One shard returns a response, the other returns an exception: return the exception.
ankikuma (author) commented:

We fail the entire request with an exception if the action fails on either of the two shards. Please comment if you think this behavior is incorrect.

assert failureArray.length == failed;
ReplicationResponse.ShardInfo shardInfo = ReplicationResponse.ShardInfo.of(total, successful, failureArray);
ReplicationResponse response = new ReplicationResponse();
response.setShardInfo(shardInfo);
ankikuma (author) commented:

ShardInfo adds up the successes and failures from the two responses.

Map<ShardId, ShardFlushRequest> requestsByShard = new HashMap<>();
requestsByShard.put(sourceShard, request);
// Create a request for the original source shard and for each target shard.
// New requests that are to be handled by target shards should contain the
A reviewer (Contributor) commented:

We should be careful with using cluster state on the source shard node. During handoff, the target shard node applies a non-acked cluster state update to transition to HANDOFF and then acks the handoff request from the source shard. At this point there is no guarantee that this update to HANDOFF was observed and that SplitShardCountSummary.forIndexing returns a correct result. In practice it shouldn't really matter though, because a particular request can only be split once?

ankikuma (author) replied:

Created ES-13488

// latest ShardCountSummary.
int targetShardId = indexMetadata.getReshardingMetadata().getSplit().targetShard(sourceShard.id());
ShardId targetShard = new ShardId(request.shardId().getIndex(), targetShardId);
requestsByShard.put(targetShard, new ShardFlushRequest(request.getRequest(), targetShard, shardCountSummary));
A reviewer (Contributor) commented:

Do we want to skip this logic if shardCountSummary is up to date (the caveat above applies)?

The reviewer followed up:

Disregard, it can't be up to date because it is checked in the caller.

public static final String NAME = FlushAction.NAME + "[s]";
public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME);

protected final ProjectResolver projectResolver;
A reviewer (Contributor) commented:

Can it be private?

ankikuma (author) replied:

Yes

final List<BulkItemRequest> requests = entry.getValue();

// Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest
var indexMetadata = project.getIndexSafe(shardId.getIndex());
A reviewer (Contributor) commented:

Why do we need this?

ankikuma (author) replied:

This is to replace the commented code below. The indexMetadata cannot really be null at this point and we must have a valid SplitShardCountSummary to pass to the BulkShardRequest.

A reviewer (Contributor) commented:

can we remove the commented code before pushing?

Another reviewer (Contributor) commented:

Does this change the behavior in any way? I can't really imagine it does but I want to check.

ankikuma (author) replied:

This is just to make sure we trip an assertion if we do see a null indexMetadata, which was not the case earlier. We never expected indexMetadata to be null in the first place, so the behavior has not changed.

I will remove the commented code.

Comment on lines 64 to 68
protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId, ProjectMetadata project) {
    // Get effective shardCount for shardId and pass it on as parameter to new ShardFlushRequest
    var indexMetadata = project.getIndexSafe(shardId.getIndex());
    SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
    return new ShardFlushRequest(request, shardId, reshardSplitShardCountSummary);
A reviewer (Contributor) commented:

I might have been tempted to modify TransportBroadcastReplicationAction.shards to return a list of (ShardId, summary) pairs, to keep the binding between the shards we've chosen for routing and the summary that is calculated based on that routing table explicit. It doesn't address any kind of bug here; it just makes the API a little clearer and harder to use incorrectly.

ankikuma (author) replied:

Yes, that makes sense. Made the changes in the latest upload.

Comment on lines 121 to 176
// We are here because there was a mismatch between the SplitShardCountSummary in the request
// and that on the primary shard node. We assume that the request is exactly 1 reshard split behind
// the current state.
@Override
protected Map<ShardId, BasicReplicationRequest> splitRequestOnPrimary(BasicReplicationRequest request) {
    ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state());
    final ShardId sourceShard = request.shardId();
    IndexMetadata indexMetadata = project.getIndexSafe(request.shardId().getIndex());
    SplitShardCountSummary shardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, sourceShard.getId());
    Map<ShardId, BasicReplicationRequest> requestsByShard = new HashMap<>();
    requestsByShard.put(sourceShard, request);
    // Create a request for the original source shard and for each target shard.
    // New requests that are to be handled by target shards should contain the
    // latest ShardCountSummary.
    int targetShardId = indexMetadata.getReshardingMetadata().getSplit().targetShard(sourceShard.id());
    ShardId targetShard = new ShardId(request.shardId().getIndex(), targetShardId);
    requestsByShard.put(targetShard, new BasicReplicationRequest(targetShard, shardCountSummary));
    return requestsByShard;
}

@Override
protected Tuple<ReplicationResponse, Exception> combineSplitResponses(
    BasicReplicationRequest originalRequest,
    Map<ShardId, BasicReplicationRequest> splitRequests,
    Map<ShardId, Tuple<ReplicationResponse, Exception>> responses
) {
    int failed = 0;
    int successful = 0;
    int total = 0;
    List<ReplicationResponse.ShardInfo.Failure> failures = new ArrayList<>();

    // If the action fails on either one of the shards, we return an exception.
    // Case 1: Both source and target shards return a response: add up total, successful, failures.
    // Case 2: Both source and target shards return an exception: return the exception.
    // Case 3: One shard returns a response, the other returns an exception: return the exception.
    for (Map.Entry<ShardId, Tuple<ReplicationResponse, Exception>> entry : responses.entrySet()) {
        ShardId shardId = entry.getKey();
        Tuple<ReplicationResponse, Exception> value = entry.getValue();
        Exception exception = value.v2();
        if (exception != null) {
            return new Tuple<>(null, exception);
        } else {
            ReplicationResponse response = value.v1();
            failed += response.getShardInfo().getFailed();
            successful += response.getShardInfo().getSuccessful();
            total += response.getShardInfo().getTotal();
            Collections.addAll(failures, response.getShardInfo().getFailures());
        }
    }
    ReplicationResponse.ShardInfo.Failure[] failureArray = failures.toArray(new ReplicationResponse.ShardInfo.Failure[0]);
    assert failureArray.length == failed;
    ReplicationResponse.ShardInfo shardInfo = ReplicationResponse.ShardInfo.of(total, successful, failureArray);
    ReplicationResponse response = new ReplicationResponse();
    response.setShardInfo(shardInfo);
    return new Tuple<>(response, null);
}
A reviewer (Contributor) commented:

Can we share this logic with the flush action instead of duplicating it? That way, e.g., fixing the possible stale cluster state read happens in one place.

ankikuma (author) replied on Nov 12, 2025:

Done. Created ReplicationRequestSplitHelper for this.
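
(The helper's final shape isn't shown in this thread; based on the duplicated logic above, a hedged sketch of what the extraction might look like. The method placement and signature are assumptions; the actual ReplicationRequestSplitHelper in the PR may differ.)

// Hedged sketch of a shared split helper. Combines per-shard results: any
// exception fails the whole request; otherwise totals, successes, and
// failures are summed across the source and target shards.
final class ReplicationRequestSplitHelper {

    static Tuple<ReplicationResponse, Exception> combineSplitResponses(
        Map<ShardId, Tuple<ReplicationResponse, Exception>> responses
    ) {
        int failed = 0;
        int successful = 0;
        int total = 0;
        List<ReplicationResponse.ShardInfo.Failure> failures = new ArrayList<>();
        for (Tuple<ReplicationResponse, Exception> value : responses.values()) {
            if (value.v2() != null) {
                return new Tuple<>(null, value.v2());
            }
            ReplicationResponse.ShardInfo info = value.v1().getShardInfo();
            failed += info.getFailed();
            successful += info.getSuccessful();
            total += info.getTotal();
            Collections.addAll(failures, info.getFailures());
        }
        ReplicationResponse response = new ReplicationResponse();
        response.setShardInfo(
            ReplicationResponse.ShardInfo.of(total, successful, failures.toArray(new ReplicationResponse.ShardInfo.Failure[0]))
        );
        return new Tuple<>(response, null);
    }
}

Both the flush and refresh transport actions could then delegate to this one method, so a fix such as the stale cluster state read lands in a single place.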

bcully (Contributor) left a review:

Looks pretty good. How are we tracking the TODOs though?


try (var refs = new RefCountingRunnable(() -> finish(listener))) {
    for (final ShardId shardId : shards) {
        // for (final ShardId shardId : shards) {
A reviewer (Contributor) commented:

can remove this

Comment on lines 200 to 219
/*
protected List<ShardId> shards(Request request, ProjectState projectState) {
    assert Transports.assertNotTransportThread("may hit all the shards");
    List<ShardId> shardIds = new ArrayList<>();
    OperationRouting operationRouting = clusterService.operationRouting();
    String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), request);
    ProjectMetadata project = projectState.metadata();
    String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(project, request);
    for (String index : concreteIndices) {
        Iterator<IndexShardRoutingTable> iterator = operationRouting.allWritableShards(projectState, index);
        var indexMetadata = project.index(index);
        while (iterator.hasNext()) {
            shardIds.add(iterator.next().shardId());
            ShardId shardId = iterator.next().shardId();
            SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
            shardIds.add(shardId);
        }
    }
    return shardIds;
}
A reviewer (Contributor) commented:

probably better not to commit all this commented-out code

ankikuma (author) replied:

Yes sorry, I will remove it.

/**
 * @return all shard ids the request should run on
 */
protected List<Tuple<ShardId, SplitShardCountSummary>> shards(Request request, ProjectState projectState) {
A reviewer (Contributor) commented:

This is fine, but I personally prefer records to tuples, to make it easier to read the code where the result is consumed.

ankikuma (author) replied:

Sure, I can switch to using a record.
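
(A small sketch of what the record-based signature might look like; the record name is an assumption, the PR may have chosen a different one.)

// Hypothetical record name: binds the routed shard to the summary computed
// from the same routing table, as suggested above.
public record ShardIdAndSummary(ShardId shardId, SplitShardCountSummary summary) {}

protected List<ShardIdAndSummary> shards(Request request, ProjectState projectState) {
    List<ShardIdAndSummary> result = new ArrayList<>();
    // ... for each shard chosen by routing:
    //     result.add(new ShardIdAndSummary(shardId, SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId())));
    return result;
}

Call sites then read shardId() and summary() by name instead of the tuple's v1()/v2().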

lkts (Contributor) left a review:

Overall LGTM, but I agree with the feedback from @bcully - let's remove commented code and make sure we track all TODOs.

public ShardFlushRequest(FlushRequest request, ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
    super(shardId, reshardSplitShardCountSummary);
    this.request = request;
    this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
A reviewer (Contributor) commented:

nit: extract this into a constant and share it between the two constructors
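
(A minimal sketch of the suggested extraction; the constant name is an assumption.)

// Hypothetical constant name: a shared default so both constructors agree.
private static final ActiveShardCount DEFAULT_WAIT_FOR_ACTIVE_SHARDS = ActiveShardCount.NONE;

public ShardFlushRequest(FlushRequest request, ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
    super(shardId, reshardSplitShardCountSummary);
    this.request = request;
    // don't wait for any active shards before proceeding, by default
    this.waitForActiveShards = DEFAULT_WAIT_FOR_ACTIVE_SHARDS;
}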

public static final String SOURCE_API = "api";

private final Executor refreshExecutor;
protected final ProjectResolver projectResolver;
A reviewer (Contributor) commented:

Suggested change:
-    protected final ProjectResolver projectResolver;
+    private final ProjectResolver projectResolver;

    }));
}

// We are here because there was a mismatch between the SplitShardCountSummary in the request
A reviewer (Contributor) commented:

Should this comment be on the method in the superclass?

ankikuma (author) replied:

Sure, that should make it clearer.

/**
 * Creates a new request with resolved shard id
 */
// TODO: Check if callers of this need to be modified to pass in shardCountSummary
A reviewer (Contributor) commented:

I think I would prefer for this constructor not to exist eventually, because it's a "trap". If a particular call site does not need to pass a summary, it can (with explanation).

ankikuma (author) replied:

I agree.

ankikuma (author) followed up:

Created a ticket ES-13508 for this.
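
(One way ES-13508 could remove the trap, as a hedged sketch: the factory name and the UNSET sentinel are assumptions, not the PR's API, and the real constructor in question may live on a different request class.)

// Hypothetical sketch: replace the summary-less convenience constructor with a
// named factory, so skipping the summary becomes an explicit, documented choice.
public static ShardFlushRequest withoutSplitSummary(FlushRequest request, ShardId shardId) {
    // Call sites using this factory should explain why no summary applies.
    return new ShardFlushRequest(request, shardId, SplitShardCountSummary.UNSET);
}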

ShardId shardId = indexShard.shardId();
if (request.refresh()) {
    logger.trace("send refresh action for shard {}", shardId);
    // TODO: Do we need to pass in shardCountSummary here?
A reviewer (Contributor) commented:

I guess we'd look at this while working on get? I think it's a stray TODO in this PR, though.

ankikuma (author) replied:

This is covered by ES-13508. Sorry, I should have mentioned it here. I think I have tickets for all the TODOs.

ankikuma merged commit f9ae4c3 into elastic:main on Nov 14, 2025.
34 checks passed