Skip to content

Commit

Permalink
Internal: make sure that multi_percolate request hands over its conte…
Browse files Browse the repository at this point in the history
…xt and headers to its corresponding shard requests

Closes #7371
  • Loading branch information
javanna authored and areek committed Sep 8, 2014
1 parent d44518e commit a0e8ba9
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
Expand Up @@ -122,7 +122,7 @@ public void onResponse(MultiGetResponse multiGetItemResponses) {
percolateRequests.set(slot, itemResponse.getFailure());
}
}
new ASyncAction(percolateRequests, listener, clusterState).run();
new ASyncAction(request, percolateRequests, listener, clusterState).run();
}

@Override
Expand All @@ -131,7 +131,7 @@ public void onFailure(Throwable e) {
}
});
} else {
new ASyncAction(percolateRequests, listener, clusterState).run();
new ASyncAction(request, percolateRequests, listener, clusterState).run();
}

}
Expand All @@ -140,6 +140,7 @@ private class ASyncAction {

final ActionListener<MultiPercolateResponse> finalListener;
final Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard;
final MultiPercolateRequest multiPercolateRequest;
final List<Object> percolateRequests;

final Map<ShardId, IntArrayList> shardToSlots;
Expand All @@ -148,8 +149,9 @@ private class ASyncAction {
final AtomicReferenceArray<AtomicInteger> expectedOperationsPerItem;
final AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard;

ASyncAction(List<Object> percolateRequests, ActionListener<MultiPercolateResponse> finalListener, ClusterState clusterState) {
ASyncAction(MultiPercolateRequest multiPercolateRequest, List<Object> percolateRequests, ActionListener<MultiPercolateResponse> finalListener, ClusterState clusterState) {
this.finalListener = finalListener;
this.multiPercolateRequest = multiPercolateRequest;
this.percolateRequests = percolateRequests;
responsesByItemAndShard = new AtomicReferenceArray<>(percolateRequests.size());
expectedOperationsPerItem = new AtomicReferenceArray<>(percolateRequests.size());
Expand Down Expand Up @@ -192,7 +194,7 @@ private class ASyncAction {
ShardId shardId = shard.shardId();
TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId);
if (requests == null) {
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shardId.getIndex(), shardId.getId(), percolateRequest.preference()));
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(multiPercolateRequest, shardId.getIndex(), shardId.getId(), percolateRequest.preference()));
}
logger.trace("Adding shard[{}] percolate request for item[{}]", shardId, slot);
requests.add(new TransportShardMultiPercolateAction.Request.Item(slot, new PercolateShardRequest(shardId, percolateRequest)));
Expand Down
Expand Up @@ -126,8 +126,8 @@ public static class Request extends SingleShardOperationRequest implements Indic
Request() {
}

Request(String concreteIndex, int shardId, String preference) {
this.index = concreteIndex;
Request(MultiPercolateRequest multiPercolateRequest, String concreteIndex, int shardId, String preference) {
super(multiPercolateRequest, concreteIndex);
this.shardId = shardId;
this.preference = preference;
this.items = new ArrayList<>();
Expand Down

0 comments on commit a0e8ba9

Please sign in to comment.