Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always rewrite search shard request outside of the search thread pool #51708

Merged
merged 12 commits into from
Feb 6, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,32 @@ setup:
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

# check that empty responses are correctly handled when rewriting to match_no_docs
- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 2
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 0 }
- length: { aggregations.idx_terms.buckets: 0 }
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -82,6 +83,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, Set<String>> indexRoutings;
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger skippedOps = new AtomicInteger();
private final SearchTimeProvider timeProvider;
Expand Down Expand Up @@ -467,6 +469,7 @@ private void onShardResult(Result result, SearchShardIterator shardIt) {
assert result.getSearchShardTarget() != null : "search shard target must not be null";
successfulOps.incrementAndGet();
results.consumeResult(result);
hasShardResponse.set(true);
if (logger.isTraceEnabled()) {
logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
}
Expand Down Expand Up @@ -602,8 +605,13 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
String indexName = shardIt.shardId().getIndex().getName();
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
.toArray(new String[0]);
return new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings);
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
// than creating an empty response in the search thread pool.
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get());
return shardRequest;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
Expand All @@ -65,6 +66,7 @@
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;

public final class SearchPhaseController {

Expand Down Expand Up @@ -427,6 +429,15 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
}
int total = queryResults.size();
queryResults = queryResults.stream()
.filter(res -> res.queryResult().isNull() == false)
.collect(Collectors.toList());
String errorMsg = "must have at least one non-empty search result, got 0 out of " + total;
assert queryResults.isEmpty() == false : errorMsg;
if (queryResults.isEmpty()) {
throw new IllegalStateException(errorMsg);
}
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
final boolean hasProfileResults = firstResult.hasProfileResults();
Expand Down Expand Up @@ -497,6 +508,18 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}

/*
* Returns the size of the requested top documents (from + size)
*/
static int getTopDocsSize(SearchRequest request) {
if (request.source() == null) {
return SearchService.DEFAULT_SIZE;
}
SearchSourceBuilder source = request.source();
return (source.size() == -1 ? SearchService.DEFAULT_SIZE : source.size()) +
(source.from() == -1 ? SearchService.DEFAULT_FROM : source.from());
}

public static final class ReducedQueryPhase {
// the sum of all hits across all reduces shards
final TotalHits totalHits;
Expand Down Expand Up @@ -576,6 +599,7 @@ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<Sear
private final SearchProgressListener progressListener;
private int numReducePhases = 0;
private final TopDocsStats topDocsStats;
private final int topNSize;
private final boolean performFinalReduce;

/**
Expand All @@ -589,7 +613,7 @@ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<Sear
*/
private QueryPhaseResultConsumer(SearchProgressListener progressListener, SearchPhaseController controller,
int expectedResultSize, int bufferSize, boolean hasTopDocs, boolean hasAggs,
int trackTotalHitsUpTo, boolean performFinalReduce) {
int trackTotalHitsUpTo, int topNSize, boolean performFinalReduce) {
super(expectedResultSize);
if (expectedResultSize != 1 && bufferSize < 2) {
throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
Expand All @@ -610,6 +634,7 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search
this.hasAggs = hasAggs;
this.bufferSize = bufferSize;
this.topDocsStats = new TopDocsStats(trackTotalHitsUpTo);
this.topNSize = topNSize;
this.performFinalReduce = performFinalReduce;
}

Expand All @@ -622,36 +647,38 @@ public void consumeResult(SearchPhaseResult result) {
}

private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (index == bufferSize) {
if (querySearchResult.isNull() == false) {
if (index == bufferSize) {
if (hasAggs) {
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
}
if (hasTopDocs) {
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
// we have to merge here in the same way we collect on a shard
topNSize, 0);
Arrays.fill(topDocsBuffer, null);
topDocsBuffer[0] = reducedTopDocs;
}
numReducePhases++;
index = 1;
if (hasAggs) {
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
}
}
final int i = index++;
if (hasAggs) {
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
}
if (hasTopDocs) {
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
// we have to merge here in the same way we collect on a shard
querySearchResult.from() + querySearchResult.size(), 0);
Arrays.fill(topDocsBuffer, null);
topDocsBuffer[0] = reducedTopDocs;
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
numReducePhases++;
index = 1;
if (hasAggs) {
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
}
}
final int i = index++;
if (hasAggs) {
aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
}
if (hasTopDocs) {
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
}
Expand Down Expand Up @@ -706,9 +733,10 @@ ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchProgressL
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
int topNSize = getTopDocsSize(request);
// only use this if there are aggs and if there are more shards than we should reduce at once
return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
trackTotalHitsUpTo, request.isFinalReduce());
trackTotalHitsUpTo, topNSize, request.isFinalReduce());
}
}
return new ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
Expand All @@ -731,7 +759,7 @@ ReducedQueryPhase reduce() {

static final class TopDocsStats {
final int trackTotalHitsUpTo;
private long totalHits;
long totalHits;
private TotalHits.Relation totalHitsRelation;
long fetchHits;
private float maxScore = Float.NEGATIVE_INFINITY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,13 @@ private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scop
markSearcherAccessed();
final Engine engine = getEngine();
final Engine.Searcher searcher = engine.acquireSearcher(source, scope);
return wrapSearcher(searcher);
}

/**
* Wraps the provided searcher acquired with {@link #acquireSearcherNoWrap(String)}.
*/
public Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader())
!= null : "DirectoryReader must be an instance or ElasticsearchDirectoryReader";
boolean success = false;
Expand Down
Loading