From 33cf10aa0204673b40917b24cf107e5d4c12a20c Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 8 Jun 2018 16:50:18 +0200 Subject: [PATCH 1/4] Limit the number of concurrent requests per node With `max_concurrent_shard_requests` we used to throttle / limit the number of concurrent shard requests a high level search request can execute per node. This had several problems since it limited the number on a global level based on the number of nodes. This change now throttles the number of concurrent requests per node while still allowing concurrency across multiple nodes. Closes #31192 --- .../resources/rest-api-spec/api/search.json | 4 +- .../search/AbstractSearchAsyncAction.java | 4 +- .../action/search/ExpandSearchPhase.java | 4 +- .../action/search/InitialSearchPhase.java | 137 +++++++++++++----- .../action/search/SearchRequest.java | 30 ++-- .../action/search/SearchRequestBuilder.java | 10 +- .../action/search/SearchResponse.java | 2 +- .../action/search/SearchResponseSections.java | 2 +- .../action/search/SearchTransportService.java | 4 +- .../action/search/TransportSearchAction.java | 21 +-- .../action/search/SearchAsyncActionTests.java | 5 +- 11 files changed, 135 insertions(+), 88 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json index af2b3104a93f5..751c2797b9deb 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json @@ -175,8 +175,8 @@ }, "max_concurrent_shard_requests" : { "type" : "number", - "description" : "The number of concurrent shard requests this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests", - "default" : "The default grows with the number of nodes in the cluster but is at most 256." 
+ "description" : "The number of concurrent shard requests per node this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests", + "default" : "The default is 5." }, "pre_filter_shard_size" : { "type" : "number", diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 91aec1171dcd6..8aa847f753682 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -79,9 +79,9 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, - SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentShardRequests, + SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, SearchResponse.Clusters clusters) { - super(name, request, shardsIts, logger, maxConcurrentShardRequests, executor); + super(name, request, shardsIts, logger, maxConcurrentRequestsPerNode, executor); this.timeProvider = timeProvider; this.logger = logger; this.searchTransportService = searchTransportService; diff --git a/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java index d9ed6e6792f85..a6a99137dc945 100644 --- a/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java @@ -131,9 +131,7 @@ private SearchRequest buildExpandSearchRequest(SearchRequest orig, SearchSourceB if (orig.allowPartialSearchResults() 
!= null){ groupRequest.allowPartialSearchResults(orig.allowPartialSearchResults()); } - if (orig.isMaxConcurrentShardRequestsSet()) { - groupRequest.setMaxConcurrentShardRequests(orig.getMaxConcurrentShardRequests()); - } + groupRequest.setMaxConcurrentShardRequests(orig.getMaxConcurrentShardRequests()); return groupRequest; } diff --git a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index a3be7c39affe8..61738a34195d5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -30,10 +30,14 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -52,12 +56,13 @@ abstract class InitialSearchPhase extends private final Logger logger; private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); - private final AtomicInteger shardExecutionIndex = new AtomicInteger(0); - private final int maxConcurrentShardRequests; + private final int maxConcurrentRequestsPerNode; private final Executor executor; + private final Map pendingExecutionsPerNode = new ConcurrentHashMap<>(); + private final boolean throttleConcurrentRequests; InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger, - int maxConcurrentShardRequests, Executor executor) { + int maxConcurrentRequestsPerNode, Executor executor) { super(name); this.request = request; final List toSkipIterators = new 
ArrayList<>(); @@ -77,7 +82,9 @@ abstract class InitialSearchPhase extends // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result // we process hence we add one for the non active partition here. this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); - this.maxConcurrentShardRequests = Math.min(maxConcurrentShardRequests, shardsIts.size()); + this.maxConcurrentRequestsPerNode = Math.min(maxConcurrentRequestsPerNode, shardsIts.size()); + // in the case where we have fewer shards than maxConcurrentRequestsPerNode we don't need to throttle + this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size(); this.executor = executor; } @@ -109,7 +116,6 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, if (!lastShard) { performPhaseOnShard(shardIndex, shardIt, nextShard); } else { - maybeExecuteNext(); // move to the next execution if needed // no more shards active, add a failure if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception if (e != null && !TransportActions.isShardNotAvailableException(e)) { @@ -123,15 +129,12 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, } @Override - public final void run() throws IOException { + public final void run() { for (final SearchShardIterator iterator : toSkipShardsIts) { assert iterator.skip(); skipShard(iterator); } if (shardsIts.size() > 0) { - int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size()); - final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests); - assert success; assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults"; if (request.allowPartialSearchResults() == false) { final StringBuilder missingShards = new StringBuilder(); @@ -152,7 +155,7 @@ public final void run() throws IOException { 
throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY); } } - for (int index = 0; index < maxConcurrentShardRequests; index++) { + for (int index = 0; index < shardsIts.size(); index++) { final SearchShardIterator shardRoutings = shardsIts.get(index); assert shardRoutings.skip() == false; performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); @@ -160,14 +163,6 @@ public final void run() throws IOException { } } - private void maybeExecuteNext() { - final int index = shardExecutionIndex.getAndIncrement(); - if (index < shardsIts.size()) { - final SearchShardIterator shardRoutings = shardsIts.get(index); - performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); - } - } - private void maybeFork(final Thread thread, final Runnable runnable) { if (thread == Thread.currentThread()) { @@ -197,6 +192,49 @@ public boolean isForceExecution() { }); } + private static final class PendingExecutions { + private final Semaphore semaphore; + private Queue queue; + PendingExecutions(int permits) { + this.semaphore = new Semaphore(permits); + } + void finishAndRunNext() { + semaphore.release(); + if (semaphore.tryAcquire()) { + final Runnable poll; + synchronized (this) { + poll = queue != null ? 
queue.poll() : null; + } + if (poll != null) { + poll.run(); + } else { + semaphore.release(); + } + } + } + + synchronized void add(Runnable runnable) { + if (queue == null) { // create this lazily + queue = new LinkedList<>(); + } + queue.add(runnable); + } + + boolean tryAcquire() { + return semaphore.tryAcquire(); + } + } + + private void executeNext(PendingExecutions pendingExecutions, Thread originalThread) { + if (pendingExecutions != null) { + assert throttleConcurrentRequests; + maybeFork(originalThread, pendingExecutions::finishAndRunNext); + } else { + assert throttleConcurrentRequests == false; + } + } + + private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) { /* * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the @@ -209,25 +247,50 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator if (shard == null) { fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()))); } else { - try { - executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(), - shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) { - @Override - public void innerOnResponse(FirstResult result) { - maybeFork(thread, () -> onShardResult(result, shardIt)); - } + final PendingExecutions pendingExecutions = throttleConcurrentRequests ? 
pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), + n -> new PendingExecutions(maxConcurrentRequestsPerNode)) : null; + Runnable r = () -> { + try { + executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard + .currentNodeId(), + shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) { + @Override + public void innerOnResponse(FirstResult result) { + try { + onShardResult(result, shardIt); + } finally { + executeNext(pendingExecutions, thread); + } + } - @Override - public void onFailure(Exception t) { - maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t)); + @Override + public void onFailure(Exception t) { + try { + maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t)); + } finally { + executeNext(pendingExecutions, thread); + } + } + }); + + + } catch (final Exception e) { + try { + /* + * It is possible to run into connection exceptions here because we are getting the connection early and might run in to + * nodes that are not connected. In this case, on shard failure will move us to the next shard copy. + */ + fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e)); + } finally { + executeNext(pendingExecutions, thread); } - }); - } catch (final Exception e) { - /* - * It is possible to run into connection exceptions here because we are getting the connection early and might run in to - * nodes that are not connected. In this case, on shard failure will move us to the next shard copy. 
- */ - fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e)); + } + }; + if (pendingExecutions == null || pendingExecutions.tryAcquire()) { + r.run(); + } else { + assert throttleConcurrentRequests; + pendingExecutions.add(r); } } } @@ -257,8 +320,6 @@ private void successfulShardExecution(SearchShardIterator shardsIt) { } else if (xTotalOps > expectedTotalOps) { throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]"); - } else if (shardsIt.skip() == false) { - maybeExecuteNext(); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 6149503a08d25..a390538ec2978 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -75,8 +75,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private Boolean requestCache; private Boolean allowPartialSearchResults; - - + + private Scroll scroll; private int batchedReduceSize = 512; @@ -140,7 +140,7 @@ public SearchRequest(StreamInput in) throws IOException { } if (in.getVersion().onOrAfter(Version.V_6_3_0)) { allowPartialSearchResults = in.readOptionalBoolean(); - } + } } @Override @@ -165,7 +165,7 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getVersion().onOrAfter(Version.V_6_3_0)) { out.writeOptionalBoolean(allowPartialSearchResults); - } + } } @Override @@ -362,7 +362,7 @@ public SearchRequest requestCache(Boolean requestCache) { public Boolean requestCache() { return this.requestCache; } - + /** * Sets if this request should allow partial results. (If method is not called, * will default to the cluster level setting). 
@@ -374,8 +374,8 @@ public SearchRequest allowPartialSearchResults(boolean allowPartialSearchResults public Boolean allowPartialSearchResults() { return this.allowPartialSearchResults; - } - + } + /** * Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection @@ -397,18 +397,18 @@ public int getBatchedReduceSize() { } /** - * Returns the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to - * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled - * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}. + * Returns the number of shard requests that should be executed concurrently on a single node. This value should be used as a + * protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire + * cluster can be throttled with this number to reduce the cluster load. The default is {@code 5} */ public int getMaxConcurrentShardRequests() { - return maxConcurrentShardRequests == 0 ? 256 : maxConcurrentShardRequests; + return maxConcurrentShardRequests == 0 ? 5 : maxConcurrentShardRequests; } /** - * Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to - * reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled - * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}. + * Sets the number of shard requests that should be executed concurrently on a single node. This value should be used as a + * protection mechanism to reduce the number of shard requests fired per high level search request. 
Searches that hit the entire + * cluster can be throttled with this number to reduce the cluster load. The default is {@code 5} */ public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) { if (maxConcurrentShardRequests < 1) { @@ -510,7 +510,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, - scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, + scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, allowPartialSearchResults); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 22b0b0c44d657..821a7d7be7f73 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -500,7 +500,7 @@ public SearchRequestBuilder setRequestCache(Boolean requestCache) { request.requestCache(requestCache); return this; } - + /** * Sets if this request should allow partial results. (If method is not called, @@ -509,7 +509,7 @@ public SearchRequestBuilder setRequestCache(Boolean requestCache) { public SearchRequestBuilder setAllowPartialSearchResults(boolean allowPartialSearchResults) { request.allowPartialSearchResults(allowPartialSearchResults); return this; - } + } /** * Should the query be profiled. Defaults to false @@ -549,9 +549,9 @@ public SearchRequestBuilder setBatchedReduceSize(int batchedReduceSize) { } /** - * Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to - * reduce the number of shard requests fired per high level search request. 
Searches that hit the entire cluster can be throttled - * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}. + * Sets the number of shard requests that should be executed concurrently on a single node. This value should be used as a + * protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire + * cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}. */ public SearchRequestBuilder setMaxConcurrentShardRequests(int maxConcurrentShardRequests) { this.request.setMaxConcurrentShardRequests(maxConcurrentShardRequests); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 9ad8a20cb1770..e6216c9cd569f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -199,7 +199,7 @@ public void scrollId(String scrollId) { * If profiling was enabled, this returns an object containing the profile results from * each shard. If profiling was not enabled, this will return null * - * @return The profile results or an empty map + * @return The profile results or an empty pendingExecutionsPerNode */ @Nullable public Map getProfileResults() { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseSections.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseSections.java index 5b44c9a473ebb..967b38e39918f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseSections.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseSections.java @@ -90,7 +90,7 @@ public final int getNumReducePhases() { /** * Returns the profile results for this search response (including all shards). 
- * An empty map is returned if profiling was not enabled + * An empty pendingExecutionsPerNode is returned if profiling was not enabled * * @return Profile results */ diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index d2bb7e6e50d77..caaba68c1f0b2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -203,8 +203,8 @@ public RemoteClusterService getRemoteClusterService() { } /** - * Return a map of nodeId to pending number of search requests. - * This is a snapshot of the current pending search and not a live map. + * Return a pendingExecutionsPerNode of nodeId to pending number of search requests. + * This is a snapshot of the current pending search and not a live pendingExecutionsPerNode. */ public Map getPendingSearchRequests() { return new HashMap<>(clientConnections); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index ac9248ef98d41..7151aed0d616a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -191,8 +190,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< OriginalIndices localIndices = 
remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); if (remoteClusterIndices.isEmpty()) { executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(), - (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, clusterState.getNodes() - .getDataNodes().size(), SearchResponse.Clusters.EMPTY); + (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY); } else { remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> { @@ -200,11 +198,9 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< Map remoteAliasFilters = new HashMap<>(); BiFunction clusterNodeLookup = processRemoteShards(searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); - int numNodesInvolved = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum() - + clusterState.getNodes().getDataNodes().size(); SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses); executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices, - remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvolved, + remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, clusters); }, listener::onFailure)); } @@ -258,7 +254,7 @@ static BiFunction processRemoteShards(Map processRemoteShards(Map remoteClusterIndices, List remoteShardIterators, BiFunction remoteConnections, ClusterState clusterState, - Map remoteAliasMap, ActionListener listener, int nodeCount, + Map remoteAliasMap, ActionListener listener, SearchResponse.Clusters clusters) { 
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); @@ -340,15 +336,6 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea } return searchTransportService.getConnection(clusterName, discoveryNode); }; - if (searchRequest.isMaxConcurrentShardRequestsSet() == false) { - /* - * We try to set a default of max concurrent shard requests based on the node count but upper-bound it by 256 by default to keep - * it sane. A single search request that fans out to lots of shards should not hit a cluster too hard while 256 is already a - * lot. - */ - // we use nodeCount * 5 as we used to default this to the default number of shard which used to be 5. - searchRequest.setMaxConcurrentShardRequests(Math.min(256, nodeCount * 5)); - } boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators); searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(), Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start(); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 82e0fcaf5d667..c4042fffe1254 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -174,7 +174,8 @@ public void onFailure(Exception e) { } }; DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); - DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + // for the sake of this test we place the replica on the same node. ie. 
this is not a mistake since we limit per node now + DiscoveryNode replicaNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); AtomicInteger contextIdGenerator = new AtomicInteger(0); GroupShardsIterator shardsIter = getShardsIter("idx", @@ -242,7 +243,7 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { return new SearchPhase("test") { @Override - public void run() throws IOException { + public void run() { latch.countDown(); } }; From 48f79f021c492419a747486630f6fded34d1465f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 8 Jun 2018 17:13:07 +0200 Subject: [PATCH 2/4] make sure we try to execute again after queueing to ensure we don't lose it in a race --- .../action/search/InitialSearchPhase.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index 61738a34195d5..0bbe73844db92 100644 --- a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -200,6 +200,17 @@ private static final class PendingExecutions { } void finishAndRunNext() { semaphore.release(); + tryRun(); + } + + synchronized void add(Runnable runnable) { + if (queue == null) { // create this lazily + queue = new LinkedList<>(); + } + queue.add(runnable); + } + + void tryRun() { if (semaphore.tryAcquire()) { final Runnable poll; synchronized (this) { @@ -213,13 +224,6 @@ void finishAndRunNext() { } } - synchronized void add(Runnable runnable) { - if (queue == null) { // create this lazily - queue = new LinkedList<>(); - } - queue.add(runnable); - } - - boolean tryAcquire() { return semaphore.tryAcquire(); } @@ -291,6 +295,8 @@ public void 
onFailure(Exception t) { } else { assert throttleConcurrentRequests; pendingExecutions.add(r); + // now try to run it again in case we had a race while the other request was giving the permit back while we added it. + pendingExecutions.tryRun(); } } } From 86bda1c5423f406c416a3ad078720c67752c8fb6 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 8 Jun 2018 17:58:56 +0200 Subject: [PATCH 3/4] fix replace issue --- .../java/org/elasticsearch/action/search/SearchResponse.java | 2 +- .../elasticsearch/action/search/SearchResponseSections.java | 2 +- .../elasticsearch/action/search/SearchTransportService.java | 4 ++-- .../elasticsearch/action/search/TransportSearchAction.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java index e6216c9cd569f..9ad8a20cb1770 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -199,7 +199,7 @@ public void scrollId(String scrollId) { * If profiling was enabled, this returns an object containing the profile results from * each shard. 
If profiling was not enabled, this will return null * - * @return The profile results or an empty pendingExecutionsPerNode + * @return The profile results or an empty map */ @Nullable public Map getProfileResults() { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseSections.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseSections.java index 967b38e39918f..5b44c9a473ebb 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseSections.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseSections.java @@ -90,7 +90,7 @@ public final int getNumReducePhases() { /** * Returns the profile results for this search response (including all shards). - * An empty pendingExecutionsPerNode is returned if profiling was not enabled + * An empty map is returned if profiling was not enabled * * @return Profile results */ diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index caaba68c1f0b2..d2bb7e6e50d77 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -203,8 +203,8 @@ public RemoteClusterService getRemoteClusterService() { } /** - * Return a pendingExecutionsPerNode of nodeId to pending number of search requests. - * This is a snapshot of the current pending search and not a live pendingExecutionsPerNode. + * Return a map of nodeId to pending number of search requests. + * This is a snapshot of the current pending search and not a live map. 
*/ public Map getPendingSearchRequests() { return new HashMap<>(clientConnections); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 7151aed0d616a..52c3be8789416 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -254,7 +254,7 @@ static BiFunction processRemoteShards(Map Date: Fri, 8 Jun 2018 18:37:14 +0200 Subject: [PATCH 4/4] address comments --- .../migration/migrate_7_0/search.asciidoc | 6 ++ .../action/search/InitialSearchPhase.java | 77 ++++++++++--------- 2 files changed, 45 insertions(+), 38 deletions(-) diff --git a/docs/reference/migration/migrate_7_0/search.asciidoc b/docs/reference/migration/migrate_7_0/search.asciidoc index 53a7d88394bb9..123ad201cbbaf 100644 --- a/docs/reference/migration/migrate_7_0/search.asciidoc +++ b/docs/reference/migration/migrate_7_0/search.asciidoc @@ -84,3 +84,9 @@ for a particular index with the index setting `index.max_regex_length`. Search requests with extra content after the main object will no longer be accepted by the `_search` endpoint. A parsing exception will be thrown instead. + +==== Semantics changed for `max_concurrent_shard_requests` + +`max_concurrent_shard_requests` used to limit the total number of concurrent shard +requests a single high level search request can execute. In 7.0 this changed to be the +max number of concurrent shard requests per node. The default is now `5`. 
diff --git a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index 0bbe73844db92..742ad995a8a50 100644 --- a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -30,14 +30,12 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; +import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -193,39 +191,45 @@ public boolean isForceExecution() { } private static final class PendingExecutions { - private final Semaphore semaphore; - private Queue queue; + private final int permits; + private int permitsTaken = 0; + private ArrayDeque queue = new ArrayDeque<>(); + PendingExecutions(int permits) { - this.semaphore = new Semaphore(permits); + assert permits > 0 : "not enough permits: " + permits; + this.permits = permits; } + void finishAndRunNext() { - semaphore.release(); - tryRun(); + synchronized (this) { + permitsTaken--; + assert permitsTaken >= 0 : "illegal taken permits: " + permitsTaken; + } + tryRun(null); } - synchronized void add(Runnable runnable) { - if (queue == null) { // create this lazily - queue = new LinkedList<>(); + void tryRun(Runnable runnable) { + Runnable r = tryQueue(runnable); + if (r != null) { + r.run(); } - queue.add(runnable); } - void tryRun() { - if (semaphore.tryAcquire()) { - final Runnable poll; - synchronized (this) { - poll = queue != null ? 
queue.poll() : null; + private synchronized Runnable tryQueue(Runnable runnable) { + Runnable toExecute = null; + if (permitsTaken < permits) { + permitsTaken++; + toExecute = runnable; + if (toExecute == null) { // only poll if we don't have anything to execute + toExecute = queue.poll(); } - if (poll != null) { - poll.run(); - } else { - semaphore.release(); + if (toExecute == null) { + permitsTaken--; } + } else if (runnable != null) { + queue.add(runnable); } - } - - boolean tryAcquire() { - return semaphore.tryAcquire(); + return toExecute; } } @@ -247,16 +251,16 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator * could stack overflow. To prevent this, we fork if we are called back on the same thread that execution started on and otherwise * we can continue (cf. InitialSearchPhase#maybeFork). */ - final Thread thread = Thread.currentThread(); if (shard == null) { fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()))); } else { - final PendingExecutions pendingExecutions = throttleConcurrentRequests ? pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), - n -> new PendingExecutions(maxConcurrentRequestsPerNode)) : null; + final PendingExecutions pendingExecutions = throttleConcurrentRequests ? 
+ pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode)) + : null; Runnable r = () -> { + final Thread thread = Thread.currentThread(); try { - executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard - .currentNodeId(), + executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(), shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) { @Override public void innerOnResponse(FirstResult result) { @@ -270,7 +274,7 @@ public void innerOnResponse(FirstResult result) { @Override public void onFailure(Exception t) { try { - maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t)); + onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t); } finally { executeNext(pendingExecutions, thread); } @@ -281,8 +285,8 @@ public void onFailure(Exception t) { } catch (final Exception e) { try { /* - * It is possible to run into connection exceptions here because we are getting the connection early and might run in to - * nodes that are not connected. In this case, on shard failure will move us to the next shard copy. + * It is possible to run into connection exceptions here because we are getting the connection early and might + * run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy. */ fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e)); } finally { @@ -290,13 +294,11 @@ public void onFailure(Exception t) { } } }; - if (pendingExecutions == null || pendingExecutions.tryAcquire()) { + if (pendingExecutions == null) { r.run(); } else { assert throttleConcurrentRequests; - pendingExecutions.add(r); - // now try to run it again in case we had a race while the other request was giving the permit back while we added it. 
- pendingExecutions.tryRun(); + pendingExecutions.tryRun(r); } } } @@ -443,5 +445,4 @@ protected void skipShard(SearchShardIterator iterator) { assert iterator.skip(); successfulShardExecution(iterator); } - }