Merged
25 commits
840b91a  commit (ankikuma, Oct 21, 2025)
dfd2a30  [CI] Auto commit changes from spotless (Oct 21, 2025)
3048e20  commit (ankikuma, Oct 23, 2025)
0093f1c  Merge remote-tracking branch 'upstream/main' into 10202025/ReshardFlu… (ankikuma, Oct 23, 2025)
292c5ed  Merge remote-tracking branch 'upstream/main' into 10202025/ReshardFlu… (ankikuma, Oct 23, 2025)
7dfe6fb  commit (ankikuma, Oct 23, 2025)
4bf8d9a  commit (ankikuma, Oct 23, 2025)
21e55aa  Merge remote-tracking branch 'upstream/main' into 10202025/ReshardFlu… (ankikuma, Oct 23, 2025)
d4b186f  Merge branch '10202025/ReshardFlushSplit' of github.com:ankikuma/elas… (ankikuma, Oct 23, 2025)
e5c65b7  commit (ankikuma, Oct 30, 2025)
ef01d6d  Merge remote-tracking branch 'upstream/main' into 10202025/ReshardFlu… (ankikuma, Nov 3, 2025)
ea07083  commit (ankikuma, Nov 3, 2025)
fbcfa99  Merge remote-tracking branch 'upstream/main' into 10202025/ReshardFlu… (ankikuma, Nov 3, 2025)
dd09b3b  Merge remote-tracking branch 'upstream/main' into 10202025/ReshardFlu… (ankikuma, Nov 3, 2025)
952e670  refactor splitRequest (ankikuma, Nov 11, 2025)
adb8644  refactor (ankikuma, Nov 12, 2025)
6cd99c7  Merge remote-tracking branch 'upstream/main' into 10202025/ReshardFlu… (ankikuma, Nov 12, 2025)
ae72d00  review comments (ankikuma, Nov 12, 2025)
9725853  Merge remote-tracking branch 'upstream/main' into 10202025/ReshardFlu… (ankikuma, Nov 12, 2025)
3f1b722  Merge remote-tracking branch 'upstream/main' into 10202025/ReshardFlu… (ankikuma, Nov 12, 2025)
7ff45aa  review comments (ankikuma, Nov 12, 2025)
da76468  commit (ankikuma, Nov 13, 2025)
99ec802  Merge remote-tracking branch 'upstream/main' into 10202025/ReshardFlu… (ankikuma, Nov 13, 2025)
3efd785  minor change (ankikuma, Nov 13, 2025)
bc3d547  Merge remote-tracking branch 'upstream/main' into 10202025/ReshardFlu… (ankikuma, Nov 14, 2025)
@@ -11,6 +11,7 @@

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
@@ -21,8 +22,13 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {

private final FlushRequest request;

public ShardFlushRequest(FlushRequest request, ShardId shardId) {
super(shardId);
/**
* Creates a request for a resolved shard id and SplitShardCountSummary (used
* to determine if the request needs to be executed on a split shard not yet seen by the
* coordinator that sent the request)
*/
public ShardFlushRequest(FlushRequest request, ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
super(shardId, reshardSplitShardCountSummary);
this.request = request;
this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
}
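
For context, the SplitShardCountSummary threaded through this constructor lets the coordinating node record the shard-count view it routed against, so the primary can detect that a reshard split completed after routing. The sketch below is a minimal, hypothetical illustration of that handshake; ShardCountStamp, StampedRequest and PrimaryShard are stand-in types invented for this example, not Elasticsearch classes.

// Hypothetical sketch: how a stale shard-count stamp is detected on the primary.
import java.util.Objects;

record ShardCountStamp(int shardCount) {}

record StampedRequest(int shardId, ShardCountStamp stamp) {}

class PrimaryShard {
    private final int shardId;
    private ShardCountStamp currentStamp; // updated once a reshard split completes on this shard

    PrimaryShard(int shardId, ShardCountStamp currentStamp) {
        this.shardId = shardId;
        this.currentStamp = currentStamp;
    }

    /** Execute directly when the coordinator's view is current, otherwise ask for a split. */
    String handle(StampedRequest request) {
        if (Objects.equals(request.stamp(), currentStamp)) {
            return "execute on shard " + shardId;
        }
        return "split the request before executing";
    }
}

public class StaleStampExample {
    public static void main(String[] args) {
        PrimaryShard primary = new PrimaryShard(0, new ShardCountStamp(2));    // primary already saw a 1 -> 2 split
        StampedRequest stale = new StampedRequest(0, new ShardCountStamp(1));  // coordinator routed against the old view
        System.out.println(primary.handle(stale)); // prints: split the request before executing
    }
}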
@@ -18,6 +18,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.injection.guice.Inject;
@@ -59,8 +60,8 @@ public TransportFlushAction(
}

@Override
protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId) {
return new ShardFlushRequest(request, shardId);
protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId, SplitShardCountSummary shardCountSummary) {
return new ShardFlushRequest(request, shardId, shardCountSummary);
}

@Override
@@ -13,13 +13,16 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationRequestSplitHelper;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
@@ -32,12 +35,15 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;

public class TransportShardFlushAction extends TransportReplicationAction<ShardFlushRequest, ShardFlushRequest, ReplicationResponse> {

public static final String NAME = FlushAction.NAME + "[s]";
public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME);

private final ProjectResolver projectResolver;

@Inject
public TransportShardFlushAction(
Settings settings,
@@ -46,7 +52,8 @@ public TransportShardFlushAction(
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
settings,
@@ -64,6 +71,7 @@ public TransportShardFlushAction(
PrimaryActionExecution.RejectOnOverload,
ReplicaActionExecution.SubjectToCircuitBreaker
);
this.projectResolver = projectResolver;
transportService.registerRequestHandler(
PRE_SYNCED_FLUSH_ACTION_NAME,
threadPool.executor(ThreadPool.Names.FLUSH),
@@ -89,6 +97,27 @@ protected void shardOperationOnPrimary(
}));
}

// We are here because there was a mismatch between the SplitShardCountSummary in the request
// and that on the primary shard node. We assume that the request is exactly 1 reshard split behind
// the current state.
Review comment (Contributor Author): This assumption will need to be revised in a follow-up PR. I created ES-13413 for this.

@Override
protected Map<ShardId, ShardFlushRequest> splitRequestOnPrimary(ShardFlushRequest request) {
return ReplicationRequestSplitHelper.splitRequest(
request,
projectResolver.getProjectMetadata(clusterService.state()),
(targetShard, shardCountSummary) -> new ShardFlushRequest(request.getRequest(), targetShard, shardCountSummary)
);
}

@Override
protected Tuple<ReplicationResponse, Exception> combineSplitResponses(
ShardFlushRequest originalRequest,
Map<ShardId, ShardFlushRequest> splitRequests,
Map<ShardId, Tuple<ReplicationResponse, Exception>> responses
) {
return ReplicationRequestSplitHelper.combineSplitResponses(originalRequest, splitRequests, responses);
}
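
As a rough mental model of the two overrides above: when the primary sees a stale summary it assumes the request is exactly one split behind, fans the request out to the post-split shards, and then folds the per-shard results back into a single response. The sketch below is only a hypothetical illustration of that fan-out/fan-in shape, not the real ReplicationRequestSplitHelper; in particular the source-to-target shard mapping (target id = source id + old shard count) and the doubling of the shard count are simplifying assumptions for the example.

// Hypothetical fan-out/fan-in sketch of splitRequestOnPrimary / combineSplitResponses.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

record ShardRef(int id) {}

record SplitAwareRequest(ShardRef shard, int shardCountSummary) {}

record ShardOutcome(boolean acknowledged, Exception failure) {}

final class SplitSketch {
    private SplitSketch() {}

    /** Fan a one-split-behind request out to the source shard and its assumed split target. */
    static Map<ShardRef, SplitAwareRequest> split(
        SplitAwareRequest request,
        int oldShardCount,
        BiFunction<ShardRef, Integer, SplitAwareRequest> factory
    ) {
        int newShardCount = oldShardCount * 2;                        // assumption: a split doubles the count
        ShardRef source = request.shard();
        ShardRef target = new ShardRef(source.id() + oldShardCount);  // assumption: target = source id + old count
        Map<ShardRef, SplitAwareRequest> perShard = new LinkedHashMap<>();
        perShard.put(source, factory.apply(source, newShardCount));
        perShard.put(target, factory.apply(target, newShardCount));
        return perShard;
    }

    /** Merge per-shard outcomes: any failure fails the combined response, extras are suppressed. */
    static ShardOutcome combine(Map<ShardRef, ShardOutcome> outcomes) {
        List<Exception> failures = new ArrayList<>();
        for (ShardOutcome outcome : outcomes.values()) {
            if (outcome.failure() != null) {
                failures.add(outcome.failure());
            }
        }
        if (failures.isEmpty()) {
            return new ShardOutcome(true, null);
        }
        Exception first = failures.get(0);
        failures.subList(1, failures.size()).forEach(first::addSuppressed);
        return new ShardOutcome(false, first);
    }
}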

@Override
protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
replica.flush(request.getRequest(), listener.map(flushed -> {
@@ -19,6 +19,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.injection.guice.Inject;
@@ -60,8 +61,8 @@ public TransportRefreshAction(
}

@Override
protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId);
protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId, SplitShardCountSummary shardCountSummary) {
BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId, shardCountSummary);
replicationRequest.waitForActiveShards(ActiveShardCount.NONE);
return replicationRequest;
}
@@ -16,14 +16,18 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequestSplitHelper;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.logging.LogManager;
@@ -32,6 +36,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;

public class TransportShardRefreshAction extends TransportReplicationAction<
@@ -46,6 +51,7 @@ public class TransportShardRefreshAction extends TransportReplicationAction<
public static final String SOURCE_API = "api";

private final Executor refreshExecutor;
private final ProjectResolver projectResolver;

@Inject
public TransportShardRefreshAction(
@@ -55,7 +61,8 @@ public TransportShardRefreshAction(
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
settings,
@@ -73,6 +80,7 @@ public TransportShardRefreshAction(
PrimaryActionExecution.RejectOnOverload,
ReplicaActionExecution.SubjectToCircuitBreaker
);
this.projectResolver = projectResolver;
// registers the unpromotable version of shard refresh action
new TransportUnpromotableShardRefreshAction(
clusterService,
@@ -104,6 +112,27 @@ protected void shardOperationOnPrimary(
}));
}

// We are here because there was a mismatch between the SplitShardCountSummary in the request
// and that on the primary shard node. We assume that the request is exactly 1 reshard split behind
// the current state.

Review comment (Contributor): Should this comment be on the method in the super class?
Review comment (Contributor Author): Sure, that should make it more clear.
@Override
protected Map<ShardId, BasicReplicationRequest> splitRequestOnPrimary(BasicReplicationRequest request) {
return ReplicationRequestSplitHelper.splitRequest(
request,
projectResolver.getProjectMetadata(clusterService.state()),
(targetShard, shardCountSummary) -> new BasicReplicationRequest(targetShard, shardCountSummary)
);
}

@Override
protected Tuple<ReplicationResponse, Exception> combineSplitResponses(
BasicReplicationRequest originalRequest,
Map<ShardId, BasicReplicationRequest> splitRequests,
Map<ShardId, Tuple<ReplicationResponse, Exception>> responses
) {
return ReplicationRequestSplitHelper.combineSplitResponses(originalRequest, splitRequests, responses);
}

@Override
protected void shardOperationOnReplica(ShardRefreshReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
replica.externalRefresh(SOURCE_API, listener.safeMap(refreshResult -> {
@@ -403,11 +403,9 @@ private void executeBulkRequestsByShard(
final List<BulkItemRequest> requests = entry.getValue();

// Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest
var indexMetadata = project.index(shardId.getIndexName());
SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
if (indexMetadata != null) {
reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
}
var indexMetadata = project.getIndexSafe(shardId.getIndex());
Review comment (Contributor): Why do we need this?
Review comment (Contributor Author): This is to replace the commented code below. The indexMetadata cannot really be null at this point, and we must have a valid SplitShardCountSummary to pass to the BulkShardRequest.
Review comment (Contributor): Can we remove the commented code before pushing?
Review comment (Contributor): Does this change the behavior in any way? I can't really imagine it does, but I want to check.
Review comment (Contributor Author): This is just to make sure we throw an assert if we do see null indexMetadata, which was not the case earlier. We never expected indexMetadata to be null in the first place, so the behavior has not changed. I will remove the commented code.
SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());

BulkShardRequest bulkShardRequest = new BulkShardRequest(
shardId,
reshardSplitShardCountSummary,
Expand All @@ -416,7 +414,7 @@ private void executeBulkRequestsByShard(
bulkRequest.isSimulated()
);

if (indexMetadata != null && indexMetadata.getInferenceFields().isEmpty() == false) {
if (indexMetadata.getInferenceFields().isEmpty() == false) {
bulkShardRequest.setInferenceFieldMap(indexMetadata.getInferenceFields());
}
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
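
To make the coordination-time step above concrete: for each shard-level bulk, the coordinator now requires the index metadata to exist (getIndexSafe) and stamps the per-shard request with the summary derived from it. The sketch below models just that step using invented stand-in types (IndexMeta, ShardKey, ShardBulk); the real summary computation lives in SplitShardCountSummary.forIndexing, so the effectiveShardCount field here is only an assumption for illustration.

// Hypothetical sketch of the coordination-time stamping shown above.
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

record IndexMeta(String index, int effectiveShardCount) {}

record ShardKey(String index, int shardId) {}

record ShardBulk(ShardKey shard, int shardCountSummary, List<String> items) {}

final class CoordinatorSketch {
    private CoordinatorSketch() {}

    /** Group items per shard and stamp each shard-level request with the summary from metadata. */
    static Map<ShardKey, ShardBulk> buildShardBulks(Map<ShardKey, List<String>> itemsByShard, Map<String, IndexMeta> project) {
        Map<ShardKey, ShardBulk> shardBulks = new LinkedHashMap<>();
        for (Map.Entry<ShardKey, List<String>> entry : itemsByShard.entrySet()) {
            ShardKey shard = entry.getKey();
            // Mirrors project.getIndexSafe(...): the metadata must exist here, a missing index is a bug.
            IndexMeta meta = Objects.requireNonNull(project.get(shard.index()), "missing metadata for " + shard.index());
            int summary = meta.effectiveShardCount(); // stand-in for SplitShardCountSummary.forIndexing(...)
            shardBulks.put(shard, new ShardBulk(shard, summary, List.copyOf(entry.getValue())));
        }
        return shardBulks;
    }
}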
@@ -10,8 +10,10 @@
package org.elasticsearch.action.bulk;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
@@ -33,7 +35,9 @@ private ShardBulkSplitHelper() {}
public static Map<ShardId, BulkShardRequest> splitRequests(BulkShardRequest request, ProjectMetadata project) {
final ShardId sourceShardId = request.shardId();
final Index index = sourceShardId.getIndex();
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(project.getIndexSafe(index));
IndexMetadata indexMetadata = project.getIndexSafe(index);
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(indexMetadata);
SplitShardCountSummary shardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, request.shardId().getId());

Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
Map<ShardId, BulkShardRequest> bulkRequestsPerShard = new HashMap<>();
@@ -57,17 +61,26 @@ public static Map<ShardId, BulkShardRequest> splitRequests(BulkShardRequest requ

// All items belong to either the source shard or target shard.
if (requestsByShard.size() == 1) {
// Return the original request if no items were split to target.
// Return the original request if no items were split to target. Note that
// this original request still contains the stale SplitShardCountSummary.
// This is alright because we hold primary indexing permits while calling this split
// method and we execute this request on the primary without letting go of the indexing permits.
// This means that a second split cannot occur in the meantime.
if (requestsByShard.containsKey(sourceShardId)) {
return Map.of(sourceShardId, request);
}
}

// Create new BulkShardRequest(s) with the updated SplitShardCountSummary. This is because
// we do not hold primary permits on the target shard, and hence it can proceed with
// a second split operation while this request is still pending. We must verify the
// SplitShardCountSummary again on the target.
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(
shardId,
shardCountSummary,
request.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[0]),
request.isSimulated()
@@ -174,6 +174,7 @@ private void handleMultiGetOnUnpromotableShard(
ShardId shardId = indexShard.shardId();
if (request.refresh()) {
logger.trace("send refresh action for shard {}", shardId);
// TODO: Do we need to pass in shardCountSummary here ?
Review comment (Contributor): I guess we'd look at this while working on get? I think it's a stray TODO in this PR though.
Review comment (Contributor Author): This is covered by ES-13508. Sorry, I should have mentioned it here. I think I have tickets for all the TODOs.
var refreshRequest = new BasicReplicationRequest(shardId);
refreshRequest.setParentTask(request.getParentTask());
client.executeLocally(
@@ -9,6 +9,7 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;

@@ -24,10 +25,20 @@ public class BasicReplicationRequest extends ReplicationRequest<BasicReplication
/**
* Creates a new request with resolved shard id
*/
// TODO: Check if callers of this need to be modified to pass in shardCountSummary and eventually remove this constructor
public BasicReplicationRequest(ShardId shardId) {
super(shardId);
}

/**
* Creates a new request with resolved shard id and SplitShardCountSummary (used
* to determine if the request needs to be executed on a split shard not yet seen by the
* coordinator that sent the request)
*/
public BasicReplicationRequest(ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
super(shardId, reshardSplitShardCountSummary);
}

public BasicReplicationRequest(StreamInput in) throws IOException {
super(in);
}