diff --git a/docs/reference/migration/migrate_7_0/search.asciidoc b/docs/reference/migration/migrate_7_0/search.asciidoc index 53a7d88394bb9..123ad201cbbaf 100644 --- a/docs/reference/migration/migrate_7_0/search.asciidoc +++ b/docs/reference/migration/migrate_7_0/search.asciidoc @@ -84,3 +84,9 @@ for a particular index with the index setting `index.max_regex_length`. Search requests with extra content after the main object will no longer be accepted by the `_search` endpoint. A parsing exception will be thrown instead. + +==== Semantics changed for `max_concurrent_shard_requests` + +`max_concurrent_shard_requests` used to limit the total number of concurrent shard +requests a single high level search request can execute. In 7.0 this changed to be the +max number of concurrent shard requests per node. The default is now `5`. diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json index af2b3104a93f5..751c2797b9deb 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json @@ -175,8 +175,8 @@ }, "max_concurrent_shard_requests" : { "type" : "number", - "description" : "The number of concurrent shard requests this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests", - "default" : "The default grows with the number of nodes in the cluster but is at most 256." + "description" : "The number of concurrent shard requests per node this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests", + "default" : "The default is 5." }, "pre_filter_shard_size" : { "type" : "number", diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 91aec1171dcd6..8aa847f753682 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -79,9 +79,9 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, - SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentShardRequests, + SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, SearchResponse.Clusters clusters) { - super(name, request, shardsIts, logger, maxConcurrentShardRequests, executor); + super(name, request, shardsIts, logger, maxConcurrentRequestsPerNode, executor); this.timeProvider = timeProvider; this.logger = logger; this.searchTransportService = searchTransportService; diff --git a/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java index d9ed6e6792f85..a6a99137dc945 100644 --- a/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java @@ -131,9 +131,7 @@ private SearchRequest buildExpandSearchRequest(SearchRequest orig, SearchSourceB if (orig.allowPartialSearchResults() != null){ groupRequest.allowPartialSearchResults(orig.allowPartialSearchResults()); } - if (orig.isMaxConcurrentShardRequestsSet()) { - groupRequest.setMaxConcurrentShardRequests(orig.getMaxConcurrentShardRequests()); - } + groupRequest.setMaxConcurrentShardRequests(orig.getMaxConcurrentShardRequests()); return groupRequest; } diff --git a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index a3be7c39affe8..742ad995a8a50 100644 --- a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -30,9 +30,11 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import java.io.IOException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -52,12 +54,13 @@ abstract class InitialSearchPhase extends private final Logger logger; private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); - private final AtomicInteger shardExecutionIndex = new AtomicInteger(0); - private final int maxConcurrentShardRequests; + private final int maxConcurrentRequestsPerNode; private final Executor executor; + private final Map pendingExecutionsPerNode = new ConcurrentHashMap<>(); + private final boolean throttleConcurrentRequests; InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger, - int maxConcurrentShardRequests, Executor executor) { + int maxConcurrentRequestsPerNode, Executor executor) { super(name); this.request = request; final List toSkipIterators = new ArrayList<>(); @@ -77,7 +80,9 @@ abstract class InitialSearchPhase extends // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result // we process hence we add one for the non active partition here. this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); - this.maxConcurrentShardRequests = Math.min(maxConcurrentShardRequests, shardsIts.size()); + this.maxConcurrentRequestsPerNode = Math.min(maxConcurrentRequestsPerNode, shardsIts.size()); + // in the case were we have less shards than maxConcurrentRequestsPerNode we don't need to throttle + this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size(); this.executor = executor; } @@ -109,7 +114,6 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, if (!lastShard) { performPhaseOnShard(shardIndex, shardIt, nextShard); } else { - maybeExecuteNext(); // move to the next execution if needed // no more shards active, add a failure if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception if (e != null && !TransportActions.isShardNotAvailableException(e)) { @@ -123,15 +127,12 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, } @Override - public final void run() throws IOException { + public final void run() { for (final SearchShardIterator iterator : toSkipShardsIts) { assert iterator.skip(); skipShard(iterator); } if (shardsIts.size() > 0) { - int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size()); - final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests); - assert success; assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults"; if (request.allowPartialSearchResults() == false) { final StringBuilder missingShards = new StringBuilder(); @@ -152,7 +153,7 @@ public final void run() throws IOException { throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY); } } - for (int index = 0; index < maxConcurrentShardRequests; index++) { + for (int index = 0; index < shardsIts.size(); index++) { final SearchShardIterator shardRoutings = shardsIts.get(index); assert shardRoutings.skip() == false; performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); @@ -160,14 +161,6 @@ public final void run() throws IOException { } } - private void maybeExecuteNext() { - final int index = shardExecutionIndex.getAndIncrement(); - if (index < shardsIts.size()) { - final SearchShardIterator shardRoutings = shardsIts.get(index); - performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); - } - } - private void maybeFork(final Thread thread, final Runnable runnable) { if (thread == Thread.currentThread()) { @@ -197,6 +190,59 @@ public boolean isForceExecution() { }); } + private static final class PendingExecutions { + private final int permits; + private int permitsTaken = 0; + private ArrayDeque queue = new ArrayDeque<>(); + + PendingExecutions(int permits) { + assert permits > 0 : "not enough permits: " + permits; + this.permits = permits; + } + + void finishAndRunNext() { + synchronized (this) { + permitsTaken--; + assert permitsTaken >= 0 : "illegal taken permits: " + permitsTaken; + } + tryRun(null); + } + + void tryRun(Runnable runnable) { + Runnable r = tryQueue(runnable); + if (r != null) { + r.run(); + } + } + + private synchronized Runnable tryQueue(Runnable runnable) { + Runnable toExecute = null; + if (permitsTaken < permits) { + permitsTaken++; + toExecute = runnable; + if (toExecute == null) { // only poll if we don't have anything to execute + toExecute = queue.poll(); + } + if (toExecute == null) { + permitsTaken--; + } + } else if (runnable != null) { + queue.add(runnable); + } + return toExecute; + } + } + + private void executeNext(PendingExecutions pendingExecutions, Thread originalThread) { + if (pendingExecutions != null) { + assert throttleConcurrentRequests; + maybeFork(originalThread, pendingExecutions::finishAndRunNext); + } else { + assert throttleConcurrentRequests == false; + } + } + + private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) { /* * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the @@ -205,29 +251,54 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator * could stack overflow. To prevent this, we fork if we are called back on the same thread that execution started on and otherwise * we can continue (cf. InitialSearchPhase#maybeFork). */ - final Thread thread = Thread.currentThread(); if (shard == null) { fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()))); } else { - try { - executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(), - shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) { - @Override - public void innerOnResponse(FirstResult result) { - maybeFork(thread, () -> onShardResult(result, shardIt)); - } + final PendingExecutions pendingExecutions = throttleConcurrentRequests ? + pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode)) + : null; + Runnable r = () -> { + final Thread thread = Thread.currentThread(); + try { + executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(), + shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) { + @Override + public void innerOnResponse(FirstResult result) { + try { + onShardResult(result, shardIt); + } finally { + executeNext(pendingExecutions, thread); + } + } - @Override - public void onFailure(Exception t) { - maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t)); + @Override + public void onFailure(Exception t) { + try { + onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t); + } finally { + executeNext(pendingExecutions, thread); + } + } + }); + + + } catch (final Exception e) { + try { + /* + * It is possible to run into connection exceptions here because we are getting the connection early and might + * run in tonodes that are not connected. In this case, on shard failure will move us to the next shard copy. + */ + fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e)); + } finally { + executeNext(pendingExecutions, thread); } - }); - } catch (final Exception e) { - /* - * It is possible to run into connection exceptions here because we are getting the connection early and might run in to - * nodes that are not connected. In this case, on shard failure will move us to the next shard copy. - */ - fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e)); + } + }; + if (pendingExecutions == null) { + r.run(); + } else { + assert throttleConcurrentRequests; + pendingExecutions.tryRun(r); } } } @@ -257,8 +328,6 @@ private void successfulShardExecution(SearchShardIterator shardsIt) { } else if (xTotalOps > expectedTotalOps) { throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]"); - } else if (shardsIt.skip() == false) { - maybeExecuteNext(); } } @@ -376,5 +445,4 @@ protected void skipShard(SearchShardIterator iterator) { assert iterator.skip(); successfulShardExecution(iterator); } - } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 6149503a08d25..a390538ec2978 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -75,8 +75,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private Boolean requestCache; private Boolean allowPartialSearchResults; - - + + private Scroll scroll; private int batchedReduceSize = 512; @@ -140,7 +140,7 @@ public SearchRequest(StreamInput in) throws IOException { } if (in.getVersion().onOrAfter(Version.V_6_3_0)) { allowPartialSearchResults = in.readOptionalBoolean(); - } + } } @Override @@ -165,7 +165,7 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getVersion().onOrAfter(Version.V_6_3_0)) { out.writeOptionalBoolean(allowPartialSearchResults); - } + } } @Override @@ -362,7 +362,7 @@ public SearchRequest requestCache(Boolean requestCache) { public Boolean requestCache() { return this.requestCache; } - + /** * Sets if this request should allow partial results. (If method is not called, * will default to the cluster level setting). @@ -374,8 +374,8 @@ public SearchRequest allowPartialSearchResults(boolean allowPartialSearchResults public Boolean allowPartialSearchResults() { return this.allowPartialSearchResults; - } - + } + /** * Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection @@ -397,18 +397,18 @@ public int getBatchedReduceSize() { } /** - * Returns the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to - * reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled - * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}. + * Returns the number of shard requests that should be executed concurrently on a single node. This value should be used as a + * protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire + * cluster can be throttled with this number to reduce the cluster load. The default is {@code 5} */ public int getMaxConcurrentShardRequests() { - return maxConcurrentShardRequests == 0 ? 256 : maxConcurrentShardRequests; + return maxConcurrentShardRequests == 0 ? 5 : maxConcurrentShardRequests; } /** - * Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to - * reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled - * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}. + * Sets the number of shard requests that should be executed concurrently on a single node. This value should be used as a + * protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire + * cluster can be throttled with this number to reduce the cluster load. The default is {@code 5} */ public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) { if (maxConcurrentShardRequests < 1) { @@ -510,7 +510,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, - scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, + scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, allowPartialSearchResults); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 22b0b0c44d657..821a7d7be7f73 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -500,7 +500,7 @@ public SearchRequestBuilder setRequestCache(Boolean requestCache) { request.requestCache(requestCache); return this; } - + /** * Sets if this request should allow partial results. (If method is not called, @@ -509,7 +509,7 @@ public SearchRequestBuilder setRequestCache(Boolean requestCache) { public SearchRequestBuilder setAllowPartialSearchResults(boolean allowPartialSearchResults) { request.allowPartialSearchResults(allowPartialSearchResults); return this; - } + } /** * Should the query be profiled. Defaults to false @@ -549,9 +549,9 @@ public SearchRequestBuilder setBatchedReduceSize(int batchedReduceSize) { } /** - * Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to - * reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled - * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}. + * Sets the number of shard requests that should be executed concurrently on a single node. This value should be used as a + * protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire + * cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}. */ public SearchRequestBuilder setMaxConcurrentShardRequests(int maxConcurrentShardRequests) { this.request.setMaxConcurrentShardRequests(maxConcurrentShardRequests); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index ac9248ef98d41..52c3be8789416 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -191,8 +190,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); if (remoteClusterIndices.isEmpty()) { executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(), - (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, clusterState.getNodes() - .getDataNodes().size(), SearchResponse.Clusters.EMPTY); + (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY); } else { remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> { @@ -200,11 +198,9 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< Map remoteAliasFilters = new HashMap<>(); BiFunction clusterNodeLookup = processRemoteShards(searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); - int numNodesInvolved = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum() - + clusterState.getNodes().getDataNodes().size(); SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses); executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices, - remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvolved, + remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, clusters); }, listener::onFailure)); } @@ -280,7 +276,7 @@ static BiFunction processRemoteShards(Map remoteClusterIndices, List remoteShardIterators, BiFunction remoteConnections, ClusterState clusterState, - Map remoteAliasMap, ActionListener listener, int nodeCount, + Map remoteAliasMap, ActionListener listener, SearchResponse.Clusters clusters) { clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); @@ -340,15 +336,6 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea } return searchTransportService.getConnection(clusterName, discoveryNode); }; - if (searchRequest.isMaxConcurrentShardRequestsSet() == false) { - /* - * We try to set a default of max concurrent shard requests based on the node count but upper-bound it by 256 by default to keep - * it sane. A single search request that fans out to lots of shards should not hit a cluster too hard while 256 is already a - * lot. - */ - // we use nodeCount * 5 as we used to default this to the default number of shard which used to be 5. - searchRequest.setMaxConcurrentShardRequests(Math.min(256, nodeCount * 5)); - } boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators); searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(), Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start(); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 82e0fcaf5d667..c4042fffe1254 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -174,7 +174,8 @@ public void onFailure(Exception e) { } }; DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); - DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + // for the sake of this test we place the replica on the same node. ie. this is not a mistake since we limit per node now + DiscoveryNode replicaNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); AtomicInteger contextIdGenerator = new AtomicInteger(0); GroupShardsIterator shardsIter = getShardsIter("idx", @@ -242,7 +243,7 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { return new SearchPhase("test") { @Override - public void run() throws IOException { + public void run() { latch.countDown(); } };