Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure that multi_percolate request hands over its context and headers to its corresponding shard requests #7371

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -122,7 +122,7 @@ public void onResponse(MultiGetResponse multiGetItemResponses) {
percolateRequests.set(slot, itemResponse.getFailure());
}
}
new ASyncAction(percolateRequests, listener, clusterState).run();
new ASyncAction(request, percolateRequests, listener, clusterState).run();
}

@Override
Expand All @@ -131,7 +131,7 @@ public void onFailure(Throwable e) {
}
});
} else {
new ASyncAction(percolateRequests, listener, clusterState).run();
new ASyncAction(request, percolateRequests, listener, clusterState).run();
}

}
Expand All @@ -140,6 +140,7 @@ private class ASyncAction {

final ActionListener<MultiPercolateResponse> finalListener;
final Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard;
final MultiPercolateRequest multiPercolateRequest;
final List<Object> percolateRequests;

final Map<ShardId, IntArrayList> shardToSlots;
Expand All @@ -148,8 +149,9 @@ private class ASyncAction {
final AtomicReferenceArray<AtomicInteger> expectedOperationsPerItem;
final AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard;

ASyncAction(List<Object> percolateRequests, ActionListener<MultiPercolateResponse> finalListener, ClusterState clusterState) {
ASyncAction(MultiPercolateRequest multiPercolateRequest, List<Object> percolateRequests, ActionListener<MultiPercolateResponse> finalListener, ClusterState clusterState) {
this.finalListener = finalListener;
this.multiPercolateRequest = multiPercolateRequest;
this.percolateRequests = percolateRequests;
responsesByItemAndShard = new AtomicReferenceArray<>(percolateRequests.size());
expectedOperationsPerItem = new AtomicReferenceArray<>(percolateRequests.size());
Expand Down Expand Up @@ -192,7 +194,7 @@ private class ASyncAction {
ShardId shardId = shard.shardId();
TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId);
if (requests == null) {
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shardId.getIndex(), shardId.getId(), percolateRequest.preference()));
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(multiPercolateRequest, shardId.getIndex(), shardId.getId(), percolateRequest.preference()));
}
logger.trace("Adding shard[{}] percolate request for item[{}]", shardId, slot);
requests.add(new TransportShardMultiPercolateAction.Request.Item(slot, new PercolateShardRequest(shardId, percolateRequest)));
Expand Down
Expand Up @@ -126,8 +126,8 @@ public static class Request extends SingleShardOperationRequest implements Indic
Request() {
}

Request(String concreteIndex, int shardId, String preference) {
this.index = concreteIndex;
Request(MultiPercolateRequest multiPercolateRequest, String concreteIndex, int shardId, String preference) {
super(multiPercolateRequest, concreteIndex);
this.shardId = shardId;
this.preference = preference;
this.items = new ArrayList<>();
Expand Down