From eb69c6fe7cf5620e1726ceecb77b35cd0f4e016d Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Thu, 6 Feb 2020 08:55:20 +0100 Subject: [PATCH] Always rewrite search shard request outside of the search thread pool (#51708) This change ensures that the rewrite of the shard request is executed in the network thread or in the refresh listener when waiting for an active shard. This allows queries that rewrite to match_no_docs to bypass the search thread pool entirely even if the can_match phase was skipped (pre_filter_shard_size > number of shards). Coordinating nodes don't have the ability to create empty responses so this change also ensures that at least one shard creates a full empty response while the other can return null ones. This is needed since creating true empty responses on shards require to create concrete aggregators which would be too costly to build on a network thread. We should move this functionality to aggregation builders in a follow up but that would be a much bigger change. This change is also important for #49601 since we want to add the ability to use the result of other shards to rewrite the request of subsequent ones. For instance if the first M shards have their top N computed, the top worst document in the global queue can be pass to subsequent shards that can then rewrite to match_no_docs if they can guarantee that they don't have any document better than the provided one. --- .../search/140_pre_filter_search_shards.yml | 29 ++ .../search/AbstractSearchAsyncAction.java | 10 +- .../action/search/SearchPhaseController.java | 86 ++++-- .../elasticsearch/index/shard/IndexShard.java | 7 + .../elasticsearch/search/SearchService.java | 200 ++++++++++--- .../search/internal/ShardSearchRequest.java | 26 +- .../search/query/QuerySearchResult.java | 55 +++- .../search/SearchPhaseControllerTests.java | 61 +++- .../indices/IndicesRequestCacheIT.java | 13 +- .../search/SearchServiceTests.java | 268 +++++++++++++----- .../internal/ShardSearchRequestTests.java | 6 +- .../search/query/QuerySearchResultTests.java | 7 + .../index/engine/FrozenIndexTests.java | 27 +- 13 files changed, 625 insertions(+), 170 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml index e9fb959406e0e..636f3d88e10cf 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml @@ -153,3 +153,32 @@ setup: - match: { _shards.failed: 0 } - match: { hits.total: 2 } - length: { aggregations.idx_terms.buckets: 2 } + + # check that empty responses are correctly handled when rewriting to match_no_docs + - do: + search: + rest_total_hits_as_int: true + # ensure that one shard can return empty response + max_concurrent_shard_requests: 1 + body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } } + + - match: { _shards.total: 3 } + - match: { _shards.successful: 3 } + - match: { _shards.skipped : 0 } + - match: { _shards.failed: 0 } + - match: { hits.total: 2 } + - length: { aggregations.idx_terms.buckets: 2 } + + - do: + search: + rest_total_hits_as_int: true + # ensure that one shard can return empty response + max_concurrent_shard_requests: 2 + body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } } + + - match: { _shards.total: 3 } + - match: { _shards.successful: 3 } + - match: { _shards.skipped : 0 } + - match: { _shards.failed: 0 } + - match: { hits.total: 0 } + - length: { aggregations.idx_terms.buckets: 0 } diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index ca68bb4008146..d4d313c0afab1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -51,6 +51,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -82,6 +83,7 @@ abstract class AbstractSearchAsyncAction exten private final Map> indexRoutings; private final SetOnce> shardFailures = new SetOnce<>(); private final Object shardFailuresMutex = new Object(); + private final AtomicBoolean hasShardResponse = new AtomicBoolean(false); private final AtomicInteger successfulOps = new AtomicInteger(); private final AtomicInteger skippedOps = new AtomicInteger(); private final SearchTimeProvider timeProvider; @@ -467,6 +469,7 @@ private void onShardResult(Result result, SearchShardIterator shardIt) { assert result.getSearchShardTarget() != null : "search shard target must not be null"; successfulOps.incrementAndGet(); results.consumeResult(result); + hasShardResponse.set(true); if (logger.isTraceEnabled()) { logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null); } @@ -602,8 +605,13 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar String indexName = shardIt.shardId().getIndex().getName(); final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet()) .toArray(new String[0]); - return new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), + ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings); + // if we already received a search result we can inform the shard that it + // can return a null response if the request rewrites to match none rather + // than creating an empty response in the search thread pool. + shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get()); + return shardRequest; } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 27b5c9cf3b2a8..59a5082ffe922 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -39,6 +39,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; @@ -65,6 +66,7 @@ import java.util.Map; import java.util.function.Function; import java.util.function.IntFunction; +import java.util.stream.Collectors; public final class SearchPhaseController { @@ -427,6 +429,15 @@ private ReducedQueryPhase reducedQueryPhase(Collection res.queryResult().isNull() == false) + .collect(Collectors.toList()); + String errorMsg = "must have at least one non-empty search result, got 0 out of " + total; + assert queryResults.isEmpty() == false : errorMsg; + if (queryResults.isEmpty()) { + throw new IllegalStateException(errorMsg); + } final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult(); final boolean hasSuggest = firstResult.suggest() != null; final boolean hasProfileResults = firstResult.hasProfileResults(); @@ -497,6 +508,18 @@ private ReducedQueryPhase reducedQueryPhase(Collection= 2 if there is more than one expected result"); @@ -610,6 +634,7 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search this.hasAggs = hasAggs; this.bufferSize = bufferSize; this.topDocsStats = new TopDocsStats(trackTotalHitsUpTo); + this.topNSize = topNSize; this.performFinalReduce = performFinalReduce; } @@ -622,36 +647,38 @@ public void consumeResult(SearchPhaseResult result) { } private synchronized void consumeInternal(QuerySearchResult querySearchResult) { - if (index == bufferSize) { + if (querySearchResult.isNull() == false) { + if (index == bufferSize) { + if (hasAggs) { + ReduceContext reduceContext = controller.reduceContextFunction.apply(false); + InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext); + Arrays.fill(aggsBuffer, null); + aggsBuffer[0] = reducedAggs; + } + if (hasTopDocs) { + TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer), + // we have to merge here in the same way we collect on a shard + topNSize, 0); + Arrays.fill(topDocsBuffer, null); + topDocsBuffer[0] = reducedTopDocs; + } + numReducePhases++; + index = 1; + if (hasAggs) { + progressListener.notifyPartialReduce(progressListener.searchShards(processedShards), + topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases); + } + } + final int i = index++; if (hasAggs) { - ReduceContext reduceContext = controller.reduceContextFunction.apply(false); - InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext); - Arrays.fill(aggsBuffer, null); - aggsBuffer[0] = reducedAggs; + aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs(); } if (hasTopDocs) { - TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer), - // we have to merge here in the same way we collect on a shard - querySearchResult.from() + querySearchResult.size(), 0); - Arrays.fill(topDocsBuffer, null); - topDocsBuffer[0] = reducedTopDocs; + final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null + topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly()); + setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex()); + topDocsBuffer[i] = topDocs.topDocs; } - numReducePhases++; - index = 1; - if (hasAggs) { - progressListener.notifyPartialReduce(progressListener.searchShards(processedShards), - topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases); - } - } - final int i = index++; - if (hasAggs) { - aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs(); - } - if (hasTopDocs) { - final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null - topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly()); - setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex()); - topDocsBuffer[i] = topDocs.topDocs; } processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget(); } @@ -706,9 +733,10 @@ ArraySearchPhaseResults newSearchPhaseResults(SearchProgressL if (isScrollRequest == false && (hasAggs || hasTopDocs)) { // no incremental reduce if scroll is used - we only hit a single shard or sometimes more... if (request.getBatchedReduceSize() < numShards) { + int topNSize = getTopDocsSize(request); // only use this if there are aggs and if there are more shards than we should reduce at once return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, - trackTotalHitsUpTo, request.isFinalReduce()); + trackTotalHitsUpTo, topNSize, request.isFinalReduce()); } } return new ArraySearchPhaseResults(numShards) { @@ -731,7 +759,7 @@ ReducedQueryPhase reduce() { static final class TopDocsStats { final int trackTotalHitsUpTo; - private long totalHits; + long totalHits; private TotalHits.Relation totalHitsRelation; long fetchHits; private float maxScore = Float.NEGATIVE_INFINITY; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 759feb1d7c68e..7ab04ab6d48b3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1210,6 +1210,13 @@ private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scop markSearcherAccessed(); final Engine engine = getEngine(); final Engine.Searcher searcher = engine.acquireSearcher(source, scope); + return wrapSearcher(searcher); + } + + /** + * Wraps the provided searcher acquired with {@link #acquireSearcherNoWrap(String)}. + */ + public Engine.Searcher wrapSearcher(Engine.Searcher searcher) { assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader()) != null : "DirectoryReader must be an instance or ElasticsearchDirectoryReader"; boolean success = false; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index f65ccf8d6638d..c5483e1b2740d 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -308,11 +308,31 @@ protected void doClose() { } public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { - rewriteShardRequest(request, ActionListener.map(listener, r -> executeDfsPhase(r, task))); + rewriteShardRequest(request, ActionListener.wrap( + // fork the execution in the search thread pool and wraps the searcher + // to execute the query + context -> { + try { + context.wrapSearcher().execute(() -> { + final SearchPhaseResult result; + try { + result = executeDfsPhase(context, task); + } catch (Exception exc) { + listener.onFailure(exc); + return; + } + listener.onResponse(result); + }); + } catch (Exception exc) { + // if the execution is rejected we need to close the searcher + IOUtils.closeWhileHandlingException(context.searcher); + listener.onFailure(exc); + } + }, listener::onFailure)); } - private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardTask task) throws IOException { - final SearchContext context = createAndPutContext(request); + private DfsSearchResult executeDfsPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws IOException { + final SearchContext context = createAndPutContext(rewriteContext); context.incRef(); try { context.setTask(task); @@ -343,15 +363,59 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { - rewriteShardRequest(request, ActionListener.map(listener, r -> executeQueryPhase(r, task))); + assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1 + : "empty responses require more than one shard"; + rewriteShardRequest(request, ActionListener.wrap( + context -> { + try { + ShardSearchRequest rewritten = context.request; + if (rewritten.canReturnNullResponseIfMatchNoDocs() + && canRewriteToMatchNone(rewritten.source()) + && rewritten.source().query() instanceof MatchNoneQueryBuilder) { + onMatchNoDocs(context, listener); + } else { + // fork the execution in the search thread pool and wraps the searcher + // to execute the query + context.wrapSearcher().execute(() -> { + final SearchPhaseResult result; + try { + result = executeQueryPhase(context, task); + } catch (Exception exc) { + listener.onFailure(exc); + return; + } + listener.onResponse(result); + }); + } + } catch (Exception exc) { + // if the execution is rejected we need to close the searcher + IOUtils.closeWhileHandlingException(context.searcher); + listener.onFailure(exc); + } + }, listener::onFailure)); + } + + private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener listener) { + // creates a lightweight search context that we use to inform context listeners + // before closing + SearchContext searchContext = createSearchContext(rewriteContext, defaultSearchTimeout); + try (searchContext) { + onNewContext(searchContext); + onFreeContext(searchContext); + } catch (Exception exc) { + listener.onFailure(exc); + return; + } + listener.onResponse(QuerySearchResult.nullInstance()); } private void runAsync(long id, Supplier executable, ActionListener listener) { getExecutor(id).execute(ActionRunnable.supply(listener, executable::get)); } - private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception { - final SearchContext context = createAndPutContext(request); + private SearchPhaseResult executeQueryPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws Exception { + final SearchContext context = createAndPutContext(rewriteContext); + final ShardSearchRequest request = rewriteContext.request; context.incRef(); try { context.setTask(task); @@ -542,15 +606,8 @@ private SearchContext findContext(long id, TransportRequest request) throws Sear } } - final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException { - if (request.scroll() != null && openScrollContexts.get() >= maxOpenScrollContext) { - throw new ElasticsearchException( - "Trying to create too many scroll contexts. Must be less than or equal to: [" + - maxOpenScrollContext + "]. " + "This limit can be set by changing the [" - + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); - } - - SearchContext context = createContext(request); + final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) { + SearchContext context = createContext(rewriteContext); onNewContext(context); boolean success = false; try { @@ -584,9 +641,16 @@ private void onNewContext(SearchContext context) { } } - final SearchContext createContext(ShardSearchRequest request) throws IOException { - final DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout); + final SearchContext createContext(SearchRewriteContext rewriteContext) { + final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout); try { + if (rewriteContext.request != null && openScrollContexts.get() >= maxOpenScrollContext) { + throw new ElasticsearchException( + "Trying to create too many scroll contexts. Must be less than or equal to: [" + + maxOpenScrollContext + "]. " + "This limit can be set by changing the [" + + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); + } + final ShardSearchRequest request = rewriteContext.request; if (request.scroll() != null) { context.scrollContext(new ScrollContext()); context.scrollContext().scroll = request.scroll(); @@ -622,41 +686,32 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException } public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException { - return createSearchContext(request, timeout, true, "search"); + IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); + SearchRewriteContext rewriteContext = acquireSearcherAndRewrite(request, shard); + // make sure that we wrap the searcher when executing the query + return createSearchContext(rewriteContext.wrapSearcher(), timeout); } - private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, - boolean assertAsyncActions, String source) - throws IOException { + private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) { + final ShardSearchRequest request = rewriteContext.request; + final Engine.Searcher searcher = rewriteContext.searcher; IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = indexService.getShard(request.shardId().getId()); SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(), indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE); - Engine.Searcher searcher = indexShard.acquireSearcher(source); - boolean success = false; - DefaultSearchContext searchContext = null; try { - searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, + DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout, fetchPhase); - // we clone the query shard context here just for rewriting otherwise we - // might end up with incorrect state since we are using now() or script services - // during rewrite and normalized / evaluate templates etc. - QueryShardContext context = new QueryShardContext(searchContext.getQueryShardContext()); - Rewriteable.rewrite(request.getRewriteable(), context, assertAsyncActions); - assert searchContext.getQueryShardContext().isCacheable(); success = true; + return searchContext; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(searchContext); - if (searchContext == null) { - // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise - // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions). - IOUtils.closeWhileHandlingException(searcher); - } + // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise + // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions). + IOUtils.closeWhileHandlingException(rewriteContext.searcher); } } - return searchContext; } private void freeAllContextForIndex(Index index) { @@ -1062,24 +1117,50 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) { return aggregations == null || aggregations.mustVisitAllDocs() == false; } - /* * Rewrites the search request with a light weight rewrite context in order to fetch resources asynchronously - * The action listener is guaranteed to be executed on the search thread-pool + * and then rewrites with a searcher when the shard is active. + * The provided action listener is executed on the same thread or in a listener threadpool. */ - private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { + private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); - Executor executor = getExecutor(shard); ActionListener actionListener = ActionListener.wrap(r -> // now we need to check if there is a pending refresh and register - shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.supply(listener, () -> request))), - listener::onFailure); + shard.awaitShardSearchActive(b -> { + try { + // we can now acquire a searcher and rewrite the request with it + SearchRewriteContext rewriteContext = acquireSearcherAndRewrite(request, shard); + listener.onResponse(rewriteContext); + } catch (Exception e) { + listener.onFailure(e); + } + }), listener::onFailure); // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // adding a lot of overhead Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener); } + SearchRewriteContext acquireSearcherAndRewrite(ShardSearchRequest request, IndexShard shard) throws IOException { + // acquire the searcher for rewrite with no wrapping in order to avoid costly + // operations. We'll wrap the searcher at a later stage (when executing the query). + Engine.Searcher searcher = shard.acquireSearcherNoWrap("search"); + boolean success = false; + try { + IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher, + request::nowInMillis, request.getClusterAlias()); + Rewriteable.rewrite(request.getRewriteable(), context, true); + SearchRewriteContext rewrite = new SearchRewriteContext(request, shard, searcher, getExecutor(shard)); + success = true; + return rewrite; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(searcher); + } + } + } + /** * Returns a new {@link QueryRewriteContext} with the given {@code now} provider */ @@ -1096,6 +1177,37 @@ public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce finalReduce ? multiBucketConsumerService.create() : bucketCount -> {}, finalReduce); } + static class SearchRewriteContext { + private final ShardSearchRequest request; + private final IndexShard shard; + private Engine.Searcher searcher; + private final Executor executor; + + private boolean isWrapped; + + private SearchRewriteContext(ShardSearchRequest request, + IndexShard shard, + Engine.Searcher searcher, + Executor executor) { + this.request = request; + this.shard = shard; + this.searcher = searcher; + this.executor = executor; + } + + SearchRewriteContext wrapSearcher() { + assert isWrapped == false : "searcher already wrapped"; + isWrapped = true; + searcher = shard.wrapSearcher(searcher); + return this; + } + + void execute(Runnable runnable) { + assert isWrapped : "searcher is not wrapped"; + executor.execute(runnable); + } + } + public static final class CanMatchResponse extends SearchPhaseResult { private final boolean canMatch; private final MinAndMax minAndMax; diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 27543a2285d32..f46be3755cee5 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -47,6 +47,7 @@ import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportRequest; @@ -75,7 +76,9 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque private final String preference; private final OriginalIndices originalIndices; - //these are the only two mutable fields, as they are subject to rewriting + private boolean canReturnNullResponseIfMatchNoDocs; + + //these are the only mutable fields, as they are subject to rewriting private AliasFilter aliasFilter; private SearchSourceBuilder source; @@ -167,6 +170,11 @@ public ShardSearchRequest(StreamInput in) throws IOException { allowPartialSearchResults = in.readBoolean(); indexRoutings = in.readStringArray(); preference = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + canReturnNullResponseIfMatchNoDocs = in.readBoolean(); + } else { + canReturnNullResponseIfMatchNoDocs = false; + } originalIndices = OriginalIndices.readOriginalIndices(in); } @@ -201,6 +209,9 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce out.writeStringArray(indexRoutings); out.writeOptionalString(preference); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(canReturnNullResponseIfMatchNoDocs); + } } @Override @@ -275,6 +286,19 @@ public String preference() { return preference; } + /** + * Returns true if the caller can handle null response {@link QuerySearchResult#nullInstance()}. + * Defaults to false since the coordinator node needs at least one shard response to build the global + * response. + */ + public boolean canReturnNullResponseIfMatchNoDocs() { + return canReturnNullResponseIfMatchNoDocs; + } + + public void canReturnNullResponseIfMatchNoDocs(boolean value) { + this.canReturnNullResponseIfMatchNoDocs = value; + } + /** * Returns the cache key for this shard search request, based on its content */ diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 3d49c96d56119..3151dadadde6b 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -63,18 +63,56 @@ public final class QuerySearchResult extends SearchPhaseResult { private long serviceTimeEWMA = -1; private int nodeQueueSize = -1; + private final boolean isNull; + public QuerySearchResult() { + this(false); } public QuerySearchResult(StreamInput in) throws IOException { super(in); - long id = in.readLong(); - readFromWithId(id, in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + isNull = in.readBoolean(); + } else { + isNull = false; + } + if (isNull == false) { + long id = in.readLong(); + readFromWithId(id, in); + } } public QuerySearchResult(long id, SearchShardTarget shardTarget) { this.requestId = id; setSearchShardTarget(shardTarget); + isNull = false; + } + + private QuerySearchResult(boolean isNull) { + this.isNull = isNull; + } + + private static final QuerySearchResult nullInstance = new QuerySearchResult(true); + + /** + * Returns an instance that contains no response. + */ + public static QuerySearchResult nullInstance() { + return nullInstance; + } + + /** + * Returns true if the result doesn't contain any useful information. + * It is used by the search action to avoid creating an empty response on + * shard request that rewrites to match_no_docs. + * + * TODO: Currently we need the concrete aggregators to build empty responses. This means that we cannot + * build an empty response in the coordinating node so we rely on this hack to ensure that at least one shard + * returns a valid empty response. We should move the ability to create empty responses to aggregation builders + * in order to allow building empty responses directly from the coordinating node. + */ + public boolean isNull() { + return isNull; } @Override @@ -173,6 +211,10 @@ public void aggregations(InternalAggregations aggregations) { hasAggs = aggregations != null; } + public InternalAggregations aggregations() { + return aggregations; + } + /** * Returns and nulls out the profiled results for this search, or potentially null if result was empty. * This allows to free up memory once the profiled result is consumed. @@ -300,8 +342,13 @@ public void readFromWithId(long id, StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeLong(requestId); - writeToNoId(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(isNull); + } + if (isNull == false) { + out.writeLong(requestId); + writeToNoId(out); + } } public void writeToNoId(StreamOutput out) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 641d5bf2c59b4..f49d3a69caca0 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -338,16 +338,37 @@ private static SearchRequest randomSearchRequest() { } public void testConsumer() { + consumerTestCase(0); + } + + public void testConsumerWithEmptyResponse() { + consumerTestCase(randomIntBetween(1, 5)); + } + + private void consumerTestCase(int numEmptyResponses) { + int numShards = 3 + numEmptyResponses; int bufferSize = randomIntBetween(2, 3); SearchRequest request = randomSearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); - ArraySearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(NOOP, request, 3); - assertEquals(0, reductions.size()); + ArraySearchPhaseResults consumer = + searchPhaseController.newSearchPhaseResults(NOOP, request, 3+numEmptyResponses); + if (numEmptyResponses == 0) { + assertEquals(0, reductions.size()); + } + if (numEmptyResponses > 0) { + QuerySearchResult empty = QuerySearchResult.nullInstance(); + int shardId = 2 + numEmptyResponses; + empty.setShardIndex(2+numEmptyResponses); + empty.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null, OriginalIndices.NONE)); + consumer.consumeResult(empty); + numEmptyResponses --; + } + QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0]); + new DocValueFormat[0]); InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 1.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); @@ -356,7 +377,7 @@ public void testConsumer() { result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0]); + new DocValueFormat[0]); aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 3.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); @@ -365,20 +386,38 @@ public void testConsumer() { result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0]); + new DocValueFormat[0]); aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 2.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); result.setShardIndex(1); consumer.consumeResult(result); + + while (numEmptyResponses > 0) { + result = QuerySearchResult.nullInstance(); + int shardId = 2 + numEmptyResponses; + result.setShardIndex(shardId); + result.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null, OriginalIndices.NONE)); + consumer.consumeResult(result); + numEmptyResponses--; + + } + final int numTotalReducePhases; - if (bufferSize == 2) { + if (numShards > bufferSize) { assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)); - assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumReducePhases()); - assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumBuffered()); - assertEquals(1, reductions.size()); - assertEquals(false, reductions.get(0)); - numTotalReducePhases = 2; + if (bufferSize == 2) { + assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumReducePhases()); + assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumBuffered()); + assertEquals(1, reductions.size()); + assertEquals(false, reductions.get(0)); + numTotalReducePhases = 2; + } else { + assertEquals(0, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumReducePhases()); + assertEquals(3, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumBuffered()); + assertEquals(0, reductions.size()); + numTotalReducePhases = 1; + } } else { assertThat(consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class))); assertEquals(0, reductions.size()); diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java index 3ea41962e4855..ca660aa1a2134 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.index.cache.request.RequestCacheStats; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; @@ -123,20 +124,26 @@ public void testQueryRewrite() throws Exception { assertCacheState(client, "index", 0, 0); final SearchResponse r1 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) - .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")).setPreFilterShardSize(Integer.MAX_VALUE).get(); + .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")) + // to ensure that query is executed even if it rewrites to match_no_docs + .addAggregation(new GlobalAggregationBuilder("global")) + .setPreFilterShardSize(Integer.MAX_VALUE).get(); ElasticsearchAssertions.assertAllSuccessful(r1); assertThat(r1.getHits().getTotalHits().value, equalTo(7L)); assertCacheState(client, "index", 0, 5); final SearchResponse r2 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) - .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) + .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) + .addAggregation(new GlobalAggregationBuilder("global")) .setPreFilterShardSize(Integer.MAX_VALUE).get(); ElasticsearchAssertions.assertAllSuccessful(r2); assertThat(r2.getHits().getTotalHits().value, equalTo(7L)); assertCacheState(client, "index", 3, 7); final SearchResponse r3 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) - .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")).setPreFilterShardSize(Integer.MAX_VALUE) + .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")) + .addAggregation(new GlobalAggregationBuilder("global")) + .setPreFilterShardSize(Integer.MAX_VALUE) .get(); ElasticsearchAssertions.assertAllSuccessful(r3); assertThat(r3.getHits().getTotalHits().value, equalTo(7L)); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 5691530dd32fe..c9dc231f0997b 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -66,6 +66,7 @@ import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; +import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; @@ -77,6 +78,7 @@ import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.test.ESSingleNodeTestCase; import org.junit.Before; @@ -168,10 +170,12 @@ public void onIndexModule(IndexModule indexModule) { indexModule.addSearchOperationListener(new SearchOperationListener() { @Override public void onNewContext(SearchContext context) { - if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); - } else { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + if (context.query() != null) { + if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); + } else { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + } } } @@ -357,15 +361,11 @@ public void testTimeout() throws IOException { final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); final IndexShard indexShard = indexService.getShard(0); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - final SearchContext contextWithDefaultTimeout = service.createContext( - new ShardSearchRequest( - OriginalIndices.NONE, - searchRequest, - indexShard.shardId(), - 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null) - ); + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + indexShard); + final SearchContext contextWithDefaultTimeout = service.createContext(rewriteContext); try { // the search context should inherit the default timeout assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5))); @@ -376,15 +376,11 @@ public void testTimeout() throws IOException { final long seconds = randomIntBetween(6, 10); searchRequest.source(new SearchSourceBuilder().timeout(TimeValue.timeValueSeconds(seconds))); - final SearchContext context = service.createContext( - new ShardSearchRequest( - OriginalIndices.NONE, - searchRequest, - indexShard.shardId(), - 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null) - ); + rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + indexShard); + final SearchContext context = service.createContext(rewriteContext); try { // the search context should inherit the query timeout assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds))); @@ -412,19 +408,25 @@ public void testMaxDocvalueFieldsSearch() throws IOException { for (int i = 0; i < indexService.getIndexSettings().getMaxDocvalueFields(); i++) { searchSourceBuilder.docValueField("field" + i); } - try (SearchContext context = service.createContext( - new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null)) - ) { - assertNotNull(context); + + ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + + { + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); + try (SearchContext context = service.createContext(rewriteContext)) { + assertNotNull(context); + } + } + + { + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); searchSourceBuilder.docValueField("one_field_too_much"); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null))); + () -> service.createContext(rewriteContext)); assertEquals( - "Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. " - + "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", - ex.getMessage()); + "Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. " + + "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", ex.getMessage()); } } @@ -447,20 +449,28 @@ public void testMaxScriptFieldsSearch() throws IOException { searchSourceBuilder.scriptField("field" + i, new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); } - try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null))) { - assertNotNull(context); + + ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + + { + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); + try (SearchContext context = service.createContext(rewriteContext)) { + assertNotNull(context); + } + } + + { searchSourceBuilder.scriptField("anotherScriptField", - new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); + new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null))); + () -> service.createContext(rewriteContext)); assertEquals( - "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was [" - + (maxScriptFields + 1) - + "]. This limit can be set by changing the [index.max_script_fields] index level setting.", - ex.getMessage()); + "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was [" + + (maxScriptFields + 1) + + "]. This limit can be set by changing the [index.max_script_fields] index level setting.", + ex.getMessage()); } } @@ -477,17 +487,19 @@ public void testIgnoreScriptfieldIfSizeZero() throws IOException { searchSourceBuilder.scriptField("field" + 0, new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); searchSourceBuilder.size(0); - try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, - searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null))) { - assertEquals(0, context.scriptFields().fields().size()); + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + indexShard); + try (SearchContext context = service.createContext(rewriteContext)) { + assertEquals(0, context.scriptFields().fields().size()); } } /** * test that creating more than the allowed number of scroll contexts throws an exception */ - public void testMaxOpenScrollContexts() throws RuntimeException { + public void testMaxOpenScrollContexts() throws RuntimeException, IOException { createIndex("index"); client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); @@ -513,8 +525,10 @@ public void testMaxOpenScrollContexts() throws RuntimeException { client().prepareSearch("index").setSize(1).setScroll("1m").get(); } + SearchService.SearchRewriteContext rewriteContext = + service.acquireSearcherAndRewrite(new ShardScrollRequestTest(indexShard.shardId()), indexShard); ElasticsearchException ex = expectThrows(ElasticsearchException.class, - () -> service.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId()))); + () -> service.createAndPutContext(rewriteContext)); assertEquals( "Trying to create too many scroll contexts. Must be less than or equal to: [" + SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " + @@ -592,7 +606,7 @@ public Scroll scroll() { } } - public void testCanMatch() throws IOException { + public void testCanMatch() throws IOException, InterruptedException { createIndex("index"); final SearchService service = getInstanceFromNode(SearchService.class); final IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -625,11 +639,32 @@ public void testCanMatch() throws IOException { new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); assertEquals(numWrapReader, numWrapInvocations.get()); - // make sure that the wrapper is called when the context is actually created - service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1f, -1, null, null)).close(); - assertEquals(numWrapReader+1, numWrapInvocations.get()); + ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + + CountDownLatch latch = new CountDownLatch(1); + SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); + service.executeQueryPhase(request, task, new ActionListener() { + @Override + public void onResponse(SearchPhaseResult searchPhaseResult) { + try { + // make sure that the wrapper is called when the query is actually executed + assertEquals(numWrapReader+1, numWrapInvocations.get()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }); + latch.await(); } public void testCanRewriteToMatchNone() { @@ -744,18 +779,123 @@ public void testCreateSearchContextFailure() throws IOException { final IndexService indexService = createIndex(index); final SearchService service = getInstanceFromNode(SearchService.class); final ShardId shardId = new ShardId(indexService.index(), 0); + IndexShard indexShard = indexService.getShard(0); - NullPointerException e = expectThrows(NullPointerException.class, - () -> service.createContext( - new ShardSearchRequest(shardId, 0, null) { - @Override - public SearchType searchType() { - // induce an artificial NPE - throw new NullPointerException("expected"); - } + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(shardId, 0, AliasFilter.EMPTY) { + @Override + public SearchType searchType() { + // induce an artificial NPE + throw new NullPointerException("expected"); } - )); + }, indexShard); + NullPointerException e = expectThrows(NullPointerException.class, + () -> service.createContext(rewriteContext)); assertEquals("expected", e.getMessage()); assertEquals("should have 2 store refs (IndexService + InternalEngine)", 2, indexService.getShard(0).store().refCount()); } + + public void testMatchNoDocsEmptyResponse() throws InterruptedException { + createIndex("index"); + Thread currentThread = Thread.currentThread(); + SearchService service = getInstanceFromNode(SearchService.class); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); + IndexShard indexShard = indexService.getShard(0); + SearchRequest searchRequest = new SearchRequest() + .allowPartialSearchResults(false) + .source(new SearchSourceBuilder() + .aggregation(AggregationBuilders.count("count").field("value"))); + ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), + 5, AliasFilter.EMPTY, 1.0f, 0, null, null); + SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); + + { + CountDownLatch latch = new CountDownLatch(1); + shardRequest.source().query(new MatchAllQueryBuilder()); + service.executeQueryPhase(shardRequest, task, new ActionListener<>() { + @Override + public void onResponse(SearchPhaseResult result) { + try { + assertNotSame(Thread.currentThread(), currentThread); + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + assertThat(result, instanceOf(QuerySearchResult.class)); + assertFalse(result.queryResult().isNull()); + assertNotNull(result.queryResult().topDocs()); + assertNotNull(result.queryResult().aggregations()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception exc) { + try { + throw new AssertionError(exc); + } finally { + latch.countDown(); + } + } + }); + latch.await(); + } + + { + CountDownLatch latch = new CountDownLatch(1); + shardRequest.source().query(new MatchNoneQueryBuilder()); + service.executeQueryPhase(shardRequest, task, new ActionListener<>() { + @Override + public void onResponse(SearchPhaseResult result) { + try { + assertNotSame(Thread.currentThread(), currentThread); + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + assertThat(result, instanceOf(QuerySearchResult.class)); + assertFalse(result.queryResult().isNull()); + assertNotNull(result.queryResult().topDocs()); + assertNotNull(result.queryResult().aggregations()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception exc) { + try { + throw new AssertionError(exc); + } finally { + latch.countDown(); + } + } + }); + latch.await(); + } + + { + CountDownLatch latch = new CountDownLatch(1); + shardRequest.canReturnNullResponseIfMatchNoDocs(true); + service.executeQueryPhase(shardRequest, task, new ActionListener<>() { + @Override + public void onResponse(SearchPhaseResult result) { + try { + // make sure we don't use the search threadpool + assertSame(Thread.currentThread(), currentThread); + assertThat(result, instanceOf(QuerySearchResult.class)); + assertTrue(result.queryResult().isNull()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }); + latch.await(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java index c913bbe4b9a37..2d0aa2f591d8d 100644 --- a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java @@ -75,6 +75,8 @@ public void testSerialization() throws Exception { assertEquals(deserializedRequest.indexBoost(), shardSearchTransportRequest.indexBoost(), 0.0f); assertEquals(deserializedRequest.getClusterAlias(), shardSearchTransportRequest.getClusterAlias()); assertEquals(shardSearchTransportRequest.allowPartialSearchResults(), deserializedRequest.allowPartialSearchResults()); + assertEquals(deserializedRequest.canReturnNullResponseIfMatchNoDocs(), + shardSearchTransportRequest.canReturnNullResponseIfMatchNoDocs()); } private ShardSearchRequest createShardSearchRequest() throws IOException { @@ -88,9 +90,11 @@ private ShardSearchRequest createShardSearchRequest() throws IOException { filteringAliases = new AliasFilter(null, Strings.EMPTY_ARRAY); } final String[] routings = generateRandomStringArray(5, 10, false, true); - return new ShardSearchRequest(new OriginalIndices(searchRequest), searchRequest, shardId, + ShardSearchRequest req = new ShardSearchRequest(new OriginalIndices(searchRequest), searchRequest, shardId, randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), routings); + req.canReturnNullResponseIfMatchNoDocs(randomBoolean()); + return req; } public void testFilteringAliases() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index ef49d8e436e3e..c67850436b393 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -99,4 +99,11 @@ public void testSerialization() throws Exception { } assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly()); } + + public void testNullResponse() throws Exception { + QuerySearchResult querySearchResult = QuerySearchResult.nullInstance(); + QuerySearchResult deserialized = + copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, Version.CURRENT); + assertEquals(querySearchResult.isNull(), deserialized.isNull()); + } } diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 3d06431c554b9..eb89391fac177 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.index.engine; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; @@ -46,9 +45,9 @@ import org.hamcrest.Matchers; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; -import java.util.concurrent.CountDownLatch; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -118,33 +117,37 @@ public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOEx } assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index")).actionGet()); int numRequests = randomIntBetween(20, 50); - CountDownLatch latch = new CountDownLatch(numRequests); int numRefreshes = 0; for (int i = 0; i < numRequests; i++) { numRefreshes++; - switch (randomIntBetween(0, 3)) { + // make sure that we don't share the frozen reader in concurrent requests since we acquire the + // searcher and rewrite the request outside of the search-throttle thread pool + switch (randomFrom(Arrays.asList(0, 1, 2))) { case 0: - client().prepareGet("index", "" + randomIntBetween(0, 9)).execute(ActionListener.wrap(latch::countDown)); + client().prepareGet("index", "" + randomIntBetween(0, 9)) + .get(); break; case 1: client().prepareSearch("index").setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED) .setSearchType(SearchType.QUERY_THEN_FETCH) - .execute(ActionListener.wrap(latch::countDown)); + .get(); // in total 4 refreshes 1x query & 1x fetch per shard (we have 2) numRefreshes += 3; break; case 2: - client().prepareTermVectors("index", "" + randomIntBetween(0, 9)).execute(ActionListener.wrap(latch::countDown)); + client().prepareTermVectors("index", "" + randomIntBetween(0, 9)) + .get(); break; case 3: - client().prepareExplain("index", "" + randomIntBetween(0, 9)).setQuery(new MatchAllQueryBuilder()) - .execute(ActionListener.wrap(latch::countDown)); + client().prepareExplain("index", "" + randomIntBetween(0, 9)) + .setQuery(new MatchAllQueryBuilder()) + .get(); break; - default: - assert false; + + default: + assert false; } } - latch.await(); IndicesStatsResponse index = client().admin().indices().prepareStats("index").clear().setRefresh(true).get(); assertEquals(numRefreshes, index.getTotal().refresh.getTotal()); }