Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the number of concurrent requests per node #31206

Merged
merged 5 commits into from
Jun 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/reference/migration/migrate_7_0/search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,9 @@ for a particular index with the index setting `index.max_regex_length`.

Search requests with extra content after the main object will no longer be accepted
by the `_search` endpoint. A parsing exception will be thrown instead.

==== Semantics changed for `max_concurrent_shard_requests`

`max_concurrent_shard_requests` used to limit the total number of concurrent shard
requests a single high level search request can execute. In 7.0 this changed to be the
max number of concurrent shard requests per node. The default is now `5`.
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@
},
"max_concurrent_shard_requests" : {
"type" : "number",
"description" : "The number of concurrent shard requests this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests",
"default" : "The default grows with the number of nodes in the cluster but is at most 256."
"description" : "The number of concurrent shard requests per node this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests",
"default" : "The default is 5."
},
"pre_filter_shard_size" : {
"type" : "number",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentShardRequests,
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters) {
super(name, request, shardsIts, logger, maxConcurrentShardRequests, executor);
super(name, request, shardsIts, logger, maxConcurrentRequestsPerNode, executor);
this.timeProvider = timeProvider;
this.logger = logger;
this.searchTransportService = searchTransportService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,7 @@ private SearchRequest buildExpandSearchRequest(SearchRequest orig, SearchSourceB
if (orig.allowPartialSearchResults() != null){
groupRequest.allowPartialSearchResults(orig.allowPartialSearchResults());
}
if (orig.isMaxConcurrentShardRequestsSet()) {
groupRequest.setMaxConcurrentShardRequests(orig.getMaxConcurrentShardRequests());
}
groupRequest.setMaxConcurrentShardRequests(orig.getMaxConcurrentShardRequests());
return groupRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
Expand All @@ -52,12 +54,13 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
private final Logger logger;
private final int expectedTotalOps;
private final AtomicInteger totalOps = new AtomicInteger();
private final AtomicInteger shardExecutionIndex = new AtomicInteger(0);
private final int maxConcurrentShardRequests;
private final int maxConcurrentRequestsPerNode;
private final Executor executor;
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;

InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator<SearchShardIterator> shardsIts, Logger logger,
int maxConcurrentShardRequests, Executor executor) {
int maxConcurrentRequestsPerNode, Executor executor) {
super(name);
this.request = request;
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand All @@ -77,7 +80,9 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
// we process hence we add one for the non active partition here.
this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
this.maxConcurrentShardRequests = Math.min(maxConcurrentShardRequests, shardsIts.size());
this.maxConcurrentRequestsPerNode = Math.min(maxConcurrentRequestsPerNode, shardsIts.size());
// in the case where we have fewer shards than maxConcurrentRequestsPerNode we don't need to throttle
this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size();
this.executor = executor;
}

Expand Down Expand Up @@ -109,7 +114,6 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
if (!lastShard) {
performPhaseOnShard(shardIndex, shardIt, nextShard);
} else {
maybeExecuteNext(); // move to the next execution if needed
// no more shards active, add a failure
if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
Expand All @@ -123,15 +127,12 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
}

@Override
public final void run() throws IOException {
public final void run() {
for (final SearchShardIterator iterator : toSkipShardsIts) {
assert iterator.skip();
skipShard(iterator);
}
if (shardsIts.size() > 0) {
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
Expand All @@ -152,22 +153,14 @@ public final void run() throws IOException {
throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
}
}
for (int index = 0; index < maxConcurrentShardRequests; index++) {
for (int index = 0; index < shardsIts.size(); index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
assert shardRoutings.skip() == false;
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}
}
}

private void maybeExecuteNext() {
final int index = shardExecutionIndex.getAndIncrement();
if (index < shardsIts.size()) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}
}


private void maybeFork(final Thread thread, final Runnable runnable) {
if (thread == Thread.currentThread()) {
Expand Down Expand Up @@ -197,6 +190,59 @@ public boolean isForceExecution() {
});
}

/**
 * A per-node throttle for shard-level search requests. Holds a fixed number of
 * permits; callers submit work via {@link #tryRun(Runnable)} which either runs
 * the runnable immediately (if a permit is available) or enqueues it. When a
 * request finishes, {@link #finishAndRunNext()} releases its permit and runs the
 * next queued runnable, if any. All permit/queue state is guarded by {@code this}.
 */
private static final class PendingExecutions {
    /** Maximum number of requests that may be in flight concurrently. */
    private final int permits;
    /** Number of permits currently taken; guarded by {@code this}. */
    private int permitsTaken = 0;
    /** Runnables waiting for a permit, in FIFO order; guarded by {@code this}. */
    private final ArrayDeque<Runnable> queue = new ArrayDeque<>();

    PendingExecutions(int permits) {
        assert permits > 0 : "not enough permits: " + permits;
        this.permits = permits;
    }

    /**
     * Releases the permit held by a completed execution and, if work is queued,
     * runs the next runnable. The dequeued runnable is executed outside the
     * synchronized block to avoid running alien code under the lock.
     */
    void finishAndRunNext() {
        synchronized (this) {
            permitsTaken--;
            assert permitsTaken >= 0 : "illegal taken permits: " + permitsTaken;
        }
        tryRun(null);
    }

    /**
     * Runs {@code runnable} immediately if a permit is available, otherwise
     * queues it. Passing {@code null} means "run the next queued runnable if a
     * permit is free" (used after a permit is released).
     */
    void tryRun(Runnable runnable) {
        Runnable r = tryQueue(runnable);
        if (r != null) {
            r.run(); // executed outside the lock
        }
    }

    /**
     * Attempts to take a permit for {@code runnable}. Returns the runnable to
     * execute now (which may be a previously queued one when {@code runnable}
     * is {@code null}), or {@code null} if the work was queued or there is
     * nothing to run. The permit is released again if nothing is runnable.
     */
    private synchronized Runnable tryQueue(Runnable runnable) {
        Runnable toExecute = null;
        if (permitsTaken < permits) {
            permitsTaken++;
            toExecute = runnable;
            if (toExecute == null) { // only poll if we don't have anything to execute
                toExecute = queue.poll();
            }
            if (toExecute == null) { // nothing to run: give the permit back
                permitsTaken--;
            }
        } else if (runnable != null) {
            queue.add(runnable);
        }
        return toExecute;
    }
}

/**
 * Called after a shard request completes (successfully or not) to release the
 * per-node permit and kick off the next pending execution, if throttling is
 * active. A {@code null} {@code pendingExecutions} means throttling is disabled
 * (fewer shards than the per-node limit) and there is nothing to do.
 *
 * @param pendingExecutions the per-node permit tracker, or {@code null} when
 *                          concurrent-request throttling is not in effect
 * @param originalThread    the thread the current execution started on; used by
 *                          maybeFork to avoid stack overflow from re-entrant
 *                          same-thread callbacks
 */
private void executeNext(PendingExecutions pendingExecutions, Thread originalThread) {
if (pendingExecutions != null) {
// throttling must be enabled if we were handed a permit tracker
assert throttleConcurrentRequests;
// fork if we are still on the original thread to avoid unbounded recursion
maybeFork(originalThread, pendingExecutions::finishAndRunNext);
} else {
assert throttleConcurrentRequests == false;
}
}


private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
/*
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
Expand All @@ -205,29 +251,54 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
* could stack overflow. To prevent this, we fork if we are called back on the same thread that execution started on and otherwise
* we can continue (cf. InitialSearchPhase#maybeFork).
*/
final Thread thread = Thread.currentThread();
if (shard == null) {
fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
} else {
try {
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
@Override
public void innerOnResponse(FirstResult result) {
maybeFork(thread, () -> onShardResult(result, shardIt));
}
final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
: null;
Runnable r = () -> {
final Thread thread = Thread.currentThread();
try {
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
@Override
public void innerOnResponse(FirstResult result) {
try {
onShardResult(result, shardIt);
} finally {
executeNext(pendingExecutions, thread);
}
}

@Override
public void onFailure(Exception t) {
maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
@Override
public void onFailure(Exception t) {
try {
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
} finally {
executeNext(pendingExecutions, thread);
}
}
});


} catch (final Exception e) {
try {
/*
* It is possible to run into connection exceptions here because we are getting the connection early and might
* run in to nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
} finally {
executeNext(pendingExecutions, thread);
}
});
} catch (final Exception e) {
/*
* It is possible to run into connection exceptions here because we are getting the connection early and might run in to
* nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
}
};
if (pendingExecutions == null) {
r.run();
} else {
assert throttleConcurrentRequests;
pendingExecutions.tryRun(r);
}
}
}
Expand Down Expand Up @@ -257,8 +328,6 @@ private void successfulShardExecution(SearchShardIterator shardsIt) {
} else if (xTotalOps > expectedTotalOps) {
throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["
+ expectedTotalOps + "]");
} else if (shardsIt.skip() == false) {
maybeExecuteNext();
}
}

Expand Down Expand Up @@ -376,5 +445,4 @@ protected void skipShard(SearchShardIterator iterator) {
assert iterator.skip();
successfulShardExecution(iterator);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
private Boolean requestCache;

private Boolean allowPartialSearchResults;


private Scroll scroll;

private int batchedReduceSize = 512;
Expand Down Expand Up @@ -140,7 +140,7 @@ public SearchRequest(StreamInput in) throws IOException {
}
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
allowPartialSearchResults = in.readOptionalBoolean();
}
}
}

@Override
Expand All @@ -165,7 +165,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeOptionalBoolean(allowPartialSearchResults);
}
}
}

@Override
Expand Down Expand Up @@ -362,7 +362,7 @@ public SearchRequest requestCache(Boolean requestCache) {
public Boolean requestCache() {
return this.requestCache;
}

/**
* Sets if this request should allow partial results. (If method is not called,
* will default to the cluster level setting).
Expand All @@ -374,8 +374,8 @@ public SearchRequest allowPartialSearchResults(boolean allowPartialSearchResults

public Boolean allowPartialSearchResults() {
return this.allowPartialSearchResults;
}
}


/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
Expand All @@ -397,18 +397,18 @@ public int getBatchedReduceSize() {
}

/**
* Returns the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
* reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled
* with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}.
* Returns the number of shard requests that should be executed concurrently on a single node. This value should be used as a
* protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire
* cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}.
*/
public int getMaxConcurrentShardRequests() {
return maxConcurrentShardRequests == 0 ? 256 : maxConcurrentShardRequests;
return maxConcurrentShardRequests == 0 ? 5 : maxConcurrentShardRequests;
}

/**
* Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
* reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled
* with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}.
* Sets the number of shard requests that should be executed concurrently on a single node. This value should be used as a
* protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire
* cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}.
*/
public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
if (maxConcurrentShardRequests < 1) {
Expand Down Expand Up @@ -510,7 +510,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
allowPartialSearchResults);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ public SearchRequestBuilder setRequestCache(Boolean requestCache) {
request.requestCache(requestCache);
return this;
}


/**
* Sets if this request should allow partial results. (If method is not called,
Expand All @@ -509,7 +509,7 @@ public SearchRequestBuilder setRequestCache(Boolean requestCache) {
public SearchRequestBuilder setAllowPartialSearchResults(boolean allowPartialSearchResults) {
request.allowPartialSearchResults(allowPartialSearchResults);
return this;
}
}

/**
* Should the query be profiled. Defaults to <code>false</code>
Expand Down Expand Up @@ -549,9 +549,9 @@ public SearchRequestBuilder setBatchedReduceSize(int batchedReduceSize) {
}

/**
* Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
* reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled
* with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}.
* Sets the number of shard requests that should be executed concurrently on a single node. This value should be used as a
* protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire
* cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}.
*/
public SearchRequestBuilder setMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
this.request.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
Expand Down
Loading