diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java b/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java index 626ecab0f4a34..551e57abb62a1 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java @@ -158,8 +158,12 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException { topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE); maxScoreCollector = new MaxScoreCollector(); } - for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) { - intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx); + try { + for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) { + intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx); + } + } finally { + clearReleasables(Lifetime.COLLECTION); } TopDocs topDocs = topDocsCollector.topDocs(from(), size()); float maxScore = Float.NaN; diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 53d8d266ea795..16e8c17688906 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -285,13 +285,13 @@ public void writeTo(StreamOutput out) throws IOException { public static void registerRequestHandler(TransportService transportService, SearchService searchService) { transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new, (request, channel, task) -> { - boolean freed = searchService.freeReaderContext(request.id()); + boolean freed = searchService.freeContext(request.id()); channel.sendResponse(new SearchFreeContextResponse(freed)); }); TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new); transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new, (request, channel, task) -> { - boolean freed = searchService.freeReaderContext(request.id()); + boolean freed = searchService.freeContext(request.id()); channel.sendResponse(new SearchFreeContextResponse(freed)); }); TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new); diff --git a/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java b/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java index 55ec170c1e02b..fecf5c8407e98 100644 --- a/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java @@ -421,7 +421,11 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException { topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE); maxScoreCollector = new MaxScoreCollector(); } - intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx); + try { + intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx); + } finally { + clearReleasables(Lifetime.COLLECTION); + } TopDocs td = topDocsCollector.topDocs(from(), size()); float maxScore = Float.NaN; diff --git a/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java b/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java index 664b631249f68..5446b1ed0e372 100644 --- a/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java +++ b/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java @@ -24,8 +24,6 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.shard.SearchOperationListener; -import org.elasticsearch.search.internal.ReaderContext; -import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import java.util.HashMap; @@ -148,25 +146,25 @@ private StatsHolder groupStats(String group) { } @Override - public void onNewReaderContext(ReaderContext readerContext) { + public void onNewContext(SearchContext context) { openContexts.inc(); } @Override - public void onFreeReaderContext(ReaderContext readerContext) { + public void onFreeContext(SearchContext context) { openContexts.dec(); } @Override - public void onNewScrollContext(ScrollContext scrollContext) { + public void onNewScrollContext(SearchContext context) { totalStats.scrollCurrent.inc(); } @Override - public void onFreeScrollContext(ScrollContext scrollContext) { + public void onFreeScrollContext(SearchContext context) { totalStats.scrollCurrent.dec(); assert totalStats.scrollCurrent.count() >= 0; - totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - scrollContext.getStartTimeInNano())); + totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - context.getOriginNanoTime())); } static final class StatsHolder { diff --git a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java index 77e83edcdd18f..ede86e6ec222d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java @@ -21,8 +21,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.search.internal.ReaderContext; -import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.transport.TransportRequest; @@ -78,34 +76,34 @@ default void onFailedFetchPhase(SearchContext searchContext) {} default void onFetchPhase(SearchContext searchContext, long tookInNanos) {} /** - * Executed when a new reader context was created - * @param readerContext the created context + * Executed when a new search context was created + * @param context the created context */ - default void onNewReaderContext(ReaderContext readerContext) {} + default void onNewContext(SearchContext context) {} /** - * Executed when a previously created reader context is freed. + * Executed when a previously created search context is freed. * This happens either when the search execution finishes, if the * execution failed or if the search context as idle for and needs to be * cleaned up. - * @param readerContext the freed reader context + * @param context the freed search context */ - default void onFreeReaderContext(ReaderContext readerContext) {} + default void onFreeContext(SearchContext context) {} /** * Executed when a new scroll search {@link SearchContext} was created - * @param scrollContext the created search context + * @param context the created search context */ - default void onNewScrollContext(ScrollContext scrollContext) {} + default void onNewScrollContext(SearchContext context) {} /** * Executed when a scroll search {@link SearchContext} is freed. * This happens either when the scroll search execution finishes, if the * execution failed or if the search context as idle for and needs to be * cleaned up. - * @param scrollContext the freed search context + * @param context the freed search context */ - default void onFreeScrollContext(ScrollContext scrollContext) {} + default void onFreeScrollContext(SearchContext context) {} /** * Executed prior to using a {@link SearchContext} that has been retrieved @@ -116,14 +114,6 @@ default void onFreeScrollContext(ScrollContext scrollContext) {} */ default void validateSearchContext(SearchContext context, TransportRequest transportRequest) {} - /** - * Executed when a search context was freed. The implementor can implement - * this method to release resources used by the search context. - */ - default void onFreeSearchContext(SearchContext context) { - - } - /** * A Composite listener that multiplexes calls to each of the listeners methods. */ @@ -203,10 +193,10 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) { } @Override - public void onNewReaderContext(ReaderContext readerContext) { + public void onNewContext(SearchContext context) { for (SearchOperationListener listener : listeners) { try { - listener.onNewReaderContext(readerContext); + listener.onNewContext(context); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onNewContext listener [{}] failed", listener), e); } @@ -214,10 +204,10 @@ public void onNewReaderContext(ReaderContext readerContext) { } @Override - public void onFreeReaderContext(ReaderContext readerContext) { + public void onFreeContext(SearchContext context) { for (SearchOperationListener listener : listeners) { try { - listener.onFreeReaderContext(readerContext); + listener.onFreeContext(context); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onFreeContext listener [{}] failed", listener), e); } @@ -225,10 +215,10 @@ public void onFreeReaderContext(ReaderContext readerContext) { } @Override - public void onNewScrollContext(ScrollContext scrollContext) { + public void onNewScrollContext(SearchContext context) { for (SearchOperationListener listener : listeners) { try { - listener.onNewScrollContext(scrollContext); + listener.onNewScrollContext(context); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onNewScrollContext listener [{}] failed", listener), e); } @@ -236,10 +226,10 @@ public void onNewScrollContext(ScrollContext scrollContext) { } @Override - public void onFreeScrollContext(ScrollContext scrollContext) { + public void onFreeScrollContext(SearchContext context) { for (SearchOperationListener listener : listeners) { try { - listener.onFreeScrollContext(scrollContext); + listener.onFreeScrollContext(context); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onFreeScrollContext listener [{}] failed", listener), e); } @@ -258,16 +248,5 @@ public void validateSearchContext(SearchContext context, TransportRequest reques } ExceptionsHelper.reThrowIfNotNull(exception); } - - @Override - public void onFreeSearchContext(SearchContext context) { - for (SearchOperationListener listener : listeners) { - try { - listener.onFreeSearchContext(context); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("onFreeSearchContext listener [{}] failed", listener), e); - } - } - } } } diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index e74c52b70f3ec..5ea5ed3f23e35 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery; import org.elasticsearch.common.lucene.search.function.WeightFactorFunction; @@ -60,7 +61,6 @@ import org.elasticsearch.search.fetch.subphase.ScriptFieldsContext; import org.elasticsearch.search.fetch.subphase.highlight.SearchContextHighlight; import org.elasticsearch.search.internal.ContextIndexSearcher; -import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; @@ -75,6 +75,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,11 +83,12 @@ final class DefaultSearchContext extends SearchContext { - private final ReaderContext readerContext; + private final long id; private final ShardSearchRequest request; private final SearchShardTarget shardTarget; private final LongSupplier relativeTimeSupplier; private SearchType searchType; + private final Engine.Searcher engineSearcher; private final BigArrays bigArrays; private final IndexShard indexShard; private final ClusterService clusterService; @@ -100,6 +102,7 @@ final class DefaultSearchContext extends SearchContext { // terminate after count private int terminateAfter = DEFAULT_TERMINATE_AFTER; private List groupStats; + private ScrollContext scrollContext; private boolean explain; private boolean version = false; // by default, we don't return versions private boolean seqAndPrimaryTerm = false; @@ -139,6 +142,10 @@ final class DefaultSearchContext extends SearchContext { private SearchContextAggregations aggregations; private SearchContextHighlight highlight; private SuggestionSearchContext suggest; + private List rescore; + private volatile long keepAlive; + private final long originNanoTime = System.nanoTime(); + private volatile long lastAccessTime = -1; private Profilers profilers; private final Map searchExtBuilders = new HashMap<>(); @@ -146,36 +153,36 @@ final class DefaultSearchContext extends SearchContext { private final QueryShardContext queryShardContext; private final FetchPhase fetchPhase; - DefaultSearchContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget shardTarget, - ClusterService clusterService, IndexService indexService, IndexShard indexShard, BigArrays bigArrays, - LongSupplier relativeTimeSupplier, TimeValue timeout, FetchPhase fetchPhase) { - this.readerContext = readerContext; + DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, + Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService, + IndexShard indexShard, BigArrays bigArrays, LongSupplier relativeTimeSupplier, TimeValue timeout, + FetchPhase fetchPhase) { + this.id = id; this.request = request; this.fetchPhase = fetchPhase; this.searchType = request.searchType(); this.shardTarget = shardTarget; + this.engineSearcher = engineSearcher; // SearchContexts use a BigArrays that can circuit break this.bigArrays = bigArrays.withCircuitBreaking(); - this.dfsResult = new DfsSearchResult(readerContext.id(), shardTarget); - this.queryResult = new QuerySearchResult(readerContext.id(), shardTarget); - this.fetchResult = new FetchSearchResult(readerContext.id(), shardTarget); + this.dfsResult = new DfsSearchResult(id, shardTarget); + this.queryResult = new QuerySearchResult(id, shardTarget); + this.fetchResult = new FetchSearchResult(id, shardTarget); this.indexShard = indexShard; this.indexService = indexService; this.clusterService = clusterService; - final Engine.Searcher engineSearcher = readerContext.engineSearcher(); this.searcher = new ContextIndexSearcher(engineSearcher.getIndexReader(), engineSearcher.getSimilarity(), engineSearcher.getQueryCache(), engineSearcher.getQueryCachingPolicy()); - searcher.setAggregatedDfs(readerContext.aggregatedDfs()); this.relativeTimeSupplier = relativeTimeSupplier; this.timeout = timeout; - queryShardContext = indexService.newQueryShardContext(request.shardId().id(), this.searcher, + queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher, request::nowInMillis, shardTarget.getClusterAlias()); queryBoost = request.indexBoost(); } @Override public void doClose() { - readerContext.decRef(); + Releasables.close(engineSearcher); } /** @@ -192,7 +199,7 @@ public void preProcess(boolean rewrite) { int maxResultWindow = indexService.getIndexSettings().getMaxResultWindow(); if (resultWindow > maxResultWindow) { - if (scrollContext() == null) { + if (scrollContext == null) { throw new IllegalArgumentException( "Result window is too large, from + size must be less than or equal to: [" + maxResultWindow + "] but was [" + resultWindow + "]. See the scroll api for a more efficient way to request large data sets. " @@ -204,12 +211,12 @@ public void preProcess(boolean rewrite) { + "]. Scroll batch sizes cost as much memory as result windows so they are controlled by the [" + IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey() + "] index level setting."); } - if (rescore().isEmpty() == false) { + if (rescore != null) { if (sort != null) { throw new IllegalArgumentException("Cannot use [sort] option in conjunction with [rescore]."); } int maxWindow = indexService.getIndexSettings().getMaxRescoreWindow(); - for (RescoreContext rescoreContext: rescore()) { + for (RescoreContext rescoreContext: rescore) { if (rescoreContext.getWindowSize() > maxWindow) { throw new IllegalArgumentException("Rescore window [" + rescoreContext.getWindowSize() + "] is too large. " + "It must be less than [" + maxWindow + "]. This prevents allocating massive heaps for storing the results " @@ -290,12 +297,12 @@ && new NestedHelper(mapperService()).mightMatchNestedDocs(query) @Override public long id() { - return readerContext.id(); + return this.id; } @Override public String source() { - return readerContext.source(); + return engineSearcher.source(); } @Override @@ -323,11 +330,21 @@ public float queryBoost() { return queryBoost; } + @Override + public long getOriginNanoTime() { + return originNanoTime; + } + @Override public ScrollContext scrollContext() { - return readerContext.scrollContext(); + return this.scrollContext; } + @Override + public SearchContext scrollContext(ScrollContext scrollContext) { + this.scrollContext = scrollContext; + return this; + } @Override public SearchContextAggregations aggregations() { @@ -374,7 +391,18 @@ public void suggest(SuggestionSearchContext suggest) { @Override public List rescore() { - return readerContext.rescore(); + if (rescore == null) { + return Collections.emptyList(); + } + return rescore; + } + + @Override + public void addRescore(RescoreContext rescore) { + if (this.rescore == null) { + this.rescore = new ArrayList<>(); + } + this.rescore.add(rescore); } @Override @@ -708,6 +736,26 @@ public SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int return this; } + @Override + public void accessed(long accessTime) { + this.lastAccessTime = accessTime; + } + + @Override + public long lastAccessTime() { + return this.lastAccessTime; + } + + @Override + public long keepAlive() { + return this.keepAlive; + } + + @Override + public void keepAlive(long keepAlive) { + this.keepAlive = keepAlive; + } + @Override public DfsSearchResult dfsResult() { return dfsResult; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 84d47729f45b5..f65ccf8d6638d 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -32,12 +32,10 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -86,8 +84,9 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.InternalScrollSearchRequest; -import org.elasticsearch.search.internal.ReaderContext; +import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.SearchContext.Lifetime; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.profile.Profilers; import org.elasticsearch.search.query.QueryPhase; @@ -105,6 +104,7 @@ import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportRequest; import java.io.IOException; import java.util.ArrayList; @@ -120,6 +120,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; +import java.util.function.Supplier; import static org.elasticsearch.common.unit.TimeValue.timeValueHours; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -190,7 +191,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final AtomicLong idGenerator = new AtomicLong(); - private final ConcurrentMapLong activeReaders = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); + private final ConcurrentMapLong activeContexts = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); private final MultiBucketConsumerService multiBucketConsumerService; @@ -280,13 +281,13 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem } - protected void putReaderContext(ReaderContext context) { - final ReaderContext previous = activeReaders.put(context.id(), context); + protected void putContext(SearchContext context) { + final SearchContext previous = activeContexts.put(context.id(), context); assert previous == null; } - protected ReaderContext removeReaderContext(long id) { - return activeReaders.remove(id); + protected SearchContext removeContext(long id) { + return activeContexts.remove(id); } @Override @@ -295,8 +296,8 @@ protected void doStart() { @Override protected void doStop() { - for (final ReaderContext context : activeReaders.values()) { - freeReaderContext(context.id()); + for (final SearchContext context : activeContexts.values()) { + freeContext(context.id()); } } @@ -311,14 +312,20 @@ public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, Ac } private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardTask task) throws IOException { - final ReaderContext reader = createAndPutReaderContext(request); - try (SearchContext context = createContext(reader, reader.request(), task, true)) { + final SearchContext context = createAndPutContext(request); + context.incRef(); + try { + context.setTask(task); + contextProcessing(context); dfsPhase.execute(context); + contextProcessedSuccessfully(context); return context.dfsResult(); } catch (Exception e) { logger.trace("Dfs phase failed", e); - processFailure(reader, e); + processFailure(context, e); throw e; + } finally { + cleanContext(context); } } @@ -339,18 +346,23 @@ public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, rewriteShardRequest(request, ActionListener.map(listener, r -> executeQueryPhase(r, task))); } - private void runAsync(long id, CheckedSupplier executable, ActionListener listener) { + private void runAsync(long id, Supplier executable, ActionListener listener) { getExecutor(id).execute(ActionRunnable.supply(listener, executable::get)); } private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception { - final ReaderContext reader = createAndPutReaderContext(request); - try (SearchContext context = createContext(reader, reader.request(), task, true)) { + final SearchContext context = createAndPutContext(request); + context.incRef(); + try { + context.setTask(task); final long afterQueryTime; try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + contextProcessing(context); loadOrExecuteQueryPhase(request, context); if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { - freeReaderContext(reader.id()); + freeContext(context.id()); + } else { + contextProcessedSuccessfully(context); } afterQueryTime = executor.success(); } @@ -365,8 +377,10 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh (Exception) e.getCause() : new ElasticsearchException(e.getCause()); } logger.trace("Query phase failed", e); - processFailure(reader, e); + processFailure(context, e); throw e; + } finally { + cleanContext(context); } } @@ -375,7 +389,9 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, long aft shortcutDocIdsToLoad(context); fetchPhase.execute(context); if (fetchPhaseShouldFreeContext(context)) { - freeReaderContext(context.id()); + freeContext(context.id()); + } else { + contextProcessedSuccessfully(context); } executor.success(); } @@ -386,41 +402,49 @@ public void executeQueryPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { - final ReaderContext reader = findReaderContext(request.id()); - try (SearchContext context = createContext(reader, reader.request(), task, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); - processScroll(request, reader, context); + final SearchContext context = findContext(request.id(), request); + context.incRef(); + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + context.setTask(task); + contextProcessing(context); + processScroll(request, context); queryPhase.execute(context); + contextProcessedSuccessfully(context); executor.success(); return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); } catch (Exception e) { logger.trace("Query phase failed", e); - processFailure(reader, e); + processFailure(context, e); throw e; + } finally { + cleanContext(context); } }, listener); } public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { - final ReaderContext reader = findReaderContext(request.id()); - reader.aggregatedDfs(request.dfs()); - try (SearchContext context = createContext(reader, reader.request(), task, true); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); + final SearchContext context = findContext(request.id(), request); + context.setTask(task); + context.incRef(); + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + contextProcessing(context); context.searcher().setAggregatedDfs(request.dfs()); queryPhase.execute(context); if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { // no hits, we can release the context since there will be no fetch phase - freeReaderContext(reader.id()); + freeContext(context.id()); + } else { + contextProcessedSuccessfully(context); } executor.success(); return context.queryResult(); } catch (Exception e) { logger.trace("Query phase failed", e); - processFailure(reader, e); + processFailure(context, e); throw e; + } finally { + cleanContext(context); } }, listener); } @@ -436,7 +460,11 @@ private boolean fetchPhaseShouldFreeContext(SearchContext context) { } final Executor getExecutor(long id) { - return getExecutor(findReaderContext(id).indexShard()); + SearchContext context = activeContexts.get(id); + if (context == null) { + throw new SearchContextMissingException(id); + } + return getExecutor(context.indexShard()); } private Executor getExecutor(IndexShard indexShard) { @@ -447,28 +475,33 @@ private Executor getExecutor(IndexShard indexShard) { public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { - final ReaderContext reader = findReaderContext(request.id()); - try (SearchContext context = createContext(reader, reader.request(), task, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); - processScroll(request, reader, context); + final SearchContext context = findContext(request.id(), request); + context.setTask(task); + context.incRef(); + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)){ + contextProcessing(context); + processScroll(request, context); queryPhase.execute(context); final long afterQueryTime = executor.success(); QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, afterQueryTime); return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget()); } catch (Exception e) { logger.trace("Fetch phase failed", e); - processFailure(reader, e); + processFailure(context, e); throw e; + } finally { + cleanContext(context); } }, listener); } public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { - final ReaderContext reader = findReaderContext(request.id()); - try (SearchContext context = createContext(reader, reader.request(), task, false)) { - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); + final SearchContext context = findContext(request.id(), request); + context.incRef(); + try { + context.setTask(task); + contextProcessing(context); if (request.lastEmittedDoc() != null) { context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } @@ -476,74 +509,89 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) { fetchPhase.execute(context); if (fetchPhaseShouldFreeContext(context)) { - freeReaderContext(request.id()); + freeContext(request.id()); + } else { + contextProcessedSuccessfully(context); } executor.success(); } return context.fetchResult(); } catch (Exception e) { logger.trace("Fetch phase failed", e); - processFailure(reader, e); + processFailure(context, e); throw e; + } finally { + cleanContext(context); } }, listener); } - private ReaderContext findReaderContext(long id) throws SearchContextMissingException { - final ReaderContext reader = activeReaders.get(id); - if (reader == null) { + private SearchContext findContext(long id, TransportRequest request) throws SearchContextMissingException { + SearchContext context = activeContexts.get(id); + if (context == null) { throw new SearchContextMissingException(id); } - return reader; + + SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); + try { + operationListener.validateSearchContext(context, request); + return context; + } catch (Exception e) { + processFailure(context, e); + throw e; + } } - final ReaderContext createAndPutReaderContext(ShardSearchRequest request) { + final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException { if (request.scroll() != null && openScrollContexts.get() >= maxOpenScrollContext) { throw new ElasticsearchException( "Trying to create too many scroll contexts. Must be less than or equal to: [" + maxOpenScrollContext + "]. " + "This limit can be set by changing the [" + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); } - final IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().getId()); - final SearchOperationListener searchOperationListener = indexShard.getSearchOperationListener(); - Engine.Searcher engineSearcher = indexShard.acquireSearcher("search"); - ReaderContext readerContext = null; + + SearchContext context = createContext(request); + onNewContext(context); + boolean success = false; try { - readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexShard, engineSearcher, request); - engineSearcher = null; - final ReaderContext finalReaderContext = readerContext; - searchOperationListener.onNewReaderContext(finalReaderContext); - if (finalReaderContext.scrollContext() != null) { - searchOperationListener.onNewScrollContext(finalReaderContext.scrollContext()); + putContext(context); + success = true; + return context; + } finally { + if (success == false) { + freeContext(context.id()); + } + } + } + + private void onNewContext(SearchContext context) { + boolean success = false; + try { + if (context.scrollContext() != null) { openScrollContexts.incrementAndGet(); + context.indexShard().getSearchOperationListener().onNewScrollContext(context); } - readerContext.addOnClose(() -> { - try { - if (finalReaderContext.scrollContext() != null) { - openScrollContexts.decrementAndGet(); - searchOperationListener.onFreeScrollContext(finalReaderContext.scrollContext()); - } - } finally { - searchOperationListener.onFreeReaderContext(finalReaderContext); - } - }); - putReaderContext(finalReaderContext); - readerContext = null; - return finalReaderContext; + context.indexShard().getSearchOperationListener().onNewContext(context); + success = true; } finally { - Releasables.close(engineSearcher, readerContext); + // currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the + // right thing by closing and notifying onFreeXXX in case one of the listeners fails with an exception in the future. + if (success == false) { + try (context) { + onFreeContext(context); + } + } } } - final SearchContext createContext(ReaderContext reader, ShardSearchRequest request, SearchShardTask task, - boolean includeAggregations) throws IOException { - final DefaultSearchContext context = createSearchContext(reader, request, defaultSearchTimeout); + final SearchContext createContext(ShardSearchRequest request) throws IOException { + final DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout); try { - context.setTask(task); if (request.scroll() != null) { + context.scrollContext(new ScrollContext()); context.scrollContext().scroll = request.scroll(); } - parseSource(reader, context, request.source(), includeAggregations); + parseSource(context, request.source()); // if the from and size are still not set, default them if (context.from() == -1) { @@ -563,12 +611,8 @@ final SearchContext createContext(ReaderContext reader, ShardSearchRequest reque if (request.scroll() != null && request.scroll().keepAlive() != null) { keepAlive = request.scroll().keepAlive().millis(); } - contextScrollKeepAlive(reader, keepAlive); + contextScrollKeepAlive(context, keepAlive); context.lowLevelCancellation(lowLevelCancellation); - // Disable timeout while searching and mark the reader as accessed when releasing the context. - context.addReleasable(() -> reader.accessed(threadPool.relativeTimeInMillis())); - reader.accessed(-1L); - context.addReleasable(() -> reader.indexShard().getSearchOperationListener().onFreeSearchContext(context)); } catch (Exception e) { context.close(); throw e; @@ -578,34 +622,28 @@ final SearchContext createContext(ReaderContext reader, ShardSearchRequest reque } public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException { - IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().getId()); - Engine.Searcher engineSearcher = indexShard.acquireSearcher("search"); - try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexShard, engineSearcher, request)) { - engineSearcher = null; // transfer ownership to readerContext - return createSearchContext(readerContext, request, timeout); - } finally { - Releasables.close(engineSearcher); - } + return createSearchContext(request, timeout, true, "search"); } - private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSearchRequest request, TimeValue timeout) + private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, + boolean assertAsyncActions, String source) throws IOException { IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = indexService.getShard(request.shardId().getId()); SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(), indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE); + Engine.Searcher searcher = indexShard.acquireSearcher(source); boolean success = false; DefaultSearchContext searchContext = null; - reader.incRef(); try { - searchContext = new DefaultSearchContext(reader, request, shardTarget, - clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout, fetchPhase); + searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, + searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout, fetchPhase); // we clone the query shard context here just for rewriting otherwise we // might end up with incorrect state since we are using now() or script services // during rewrite and normalized / evaluate templates etc. QueryShardContext context = new QueryShardContext(searchContext.getQueryShardContext()); - Rewriteable.rewrite(request.getRewriteable(), context, true); + Rewriteable.rewrite(request.getRewriteable(), context, assertAsyncActions); assert searchContext.getQueryShardContext().isCacheable(); success = true; } finally { @@ -614,7 +652,7 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear if (searchContext == null) { // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions). - IOUtils.closeWhileHandlingException(reader::decRef); + IOUtils.closeWhileHandlingException(searcher); } } } @@ -623,41 +661,72 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear private void freeAllContextForIndex(Index index) { assert index != null; - for (ReaderContext ctx : activeReaders.values()) { + for (SearchContext ctx : activeContexts.values()) { if (index.equals(ctx.indexShard().shardId().getIndex())) { - freeReaderContext(ctx.id()); + freeContext(ctx.id()); } } } - public boolean freeReaderContext(long id) { - try (ReaderContext context = removeReaderContext(id)) { - return context != null; + public boolean freeContext(long id) { + try (SearchContext context = removeContext(id)) { + if (context != null) { + onFreeContext(context); + return true; + } + return false; } } + private void onFreeContext(SearchContext context) { + assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount(); + assert activeContexts.containsKey(context.id()) == false; + context.indexShard().getSearchOperationListener().onFreeContext(context); + if (context.scrollContext() != null) { + openScrollContexts.decrementAndGet(); + context.indexShard().getSearchOperationListener().onFreeScrollContext(context); + } + } public void freeAllScrollContexts() { - for (ReaderContext readerContext : activeReaders.values()) { - if (readerContext.scrollContext() != null) { - freeReaderContext(readerContext.id()); + for (SearchContext searchContext : activeContexts.values()) { + if (searchContext.scrollContext() != null) { + freeContext(searchContext.id()); } } } - private void contextScrollKeepAlive(ReaderContext readerContext, long keepAlive) { + private void contextScrollKeepAlive(SearchContext context, long keepAlive) { if (keepAlive > maxKeepAlive) { throw new IllegalArgumentException( "Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive) + ") is too large. " + "It must be less than (" + TimeValue.timeValueMillis(maxKeepAlive) + "). " + "This limit can be set by changing the [" + MAX_KEEPALIVE_SETTING.getKey() + "] cluster level setting."); } - readerContext.keepAlive(keepAlive); + context.keepAlive(keepAlive); + } + + private void contextProcessing(SearchContext context) { + // disable timeout while executing a search + context.accessed(-1); + } + + private void contextProcessedSuccessfully(SearchContext context) { + context.accessed(threadPool.relativeTimeInMillis()); + } + + private void cleanContext(SearchContext context) { + try { + context.clearReleasables(Lifetime.PHASE); + context.setTask(null); + } finally { + context.decRef(); + } } - private void processFailure(ReaderContext context, Exception e) { - freeReaderContext(context.id()); + private void processFailure(SearchContext context, Exception e) { + freeContext(context.id()); try { if (Lucene.isCorruptionException(e)) { context.indexShard().failShard("search execution corruption failure", e); @@ -668,8 +737,7 @@ private void processFailure(ReaderContext context, Exception e) { } } - private void parseSource(ReaderContext reader, DefaultSearchContext context, - SearchSourceBuilder source, boolean includeAggregations) throws SearchException { + private void parseSource(DefaultSearchContext context, SearchSourceBuilder source) throws SearchException { // nothing to parse... if (source == null) { return; @@ -725,7 +793,7 @@ private void parseSource(ReaderContext reader, DefaultSearchContext context, context.timeout(source.timeout()); } context.terminateAfter(source.terminateAfter()); - if (source.aggregations() != null && includeAggregations) { + if (source.aggregations() != null) { try { AggregatorFactories factories = source.aggregations().build(queryShardContext, null); context.aggregations(new SearchContextAggregations(factories, multiBucketConsumerService.create())); @@ -740,10 +808,10 @@ private void parseSource(ReaderContext reader, DefaultSearchContext context, throw new SearchException(shardTarget, "failed to create SuggestionSearchContext", e); } } - if (source.rescores() != null && reader.rescore().isEmpty()) { + if (source.rescores() != null) { try { for (RescorerBuilder rescore : source.rescores()) { - reader.addRescore(rescore.buildContext(queryShardContext)); + context.addRescore(rescore.buildContext(queryShardContext)); } } catch (IOException e) { throw new SearchException(shardTarget, "failed to create RescoreSearchContext", e); @@ -901,13 +969,13 @@ private void shortcutDocIdsToLoad(SearchContext context) { context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length); } - private void processScroll(InternalScrollSearchRequest request, ReaderContext reader, SearchContext context) { + private void processScroll(InternalScrollSearchRequest request, SearchContext context) { // process scroll context.from(context.from() + context.size()); context.scrollContext().scroll = request.scroll(); // update the context keep alive based on the new scroll value if (request.scroll() != null && request.scroll().keepAlive() != null) { - contextScrollKeepAlive(reader, request.scroll().keepAlive().millis()); + contextScrollKeepAlive(context, request.scroll().keepAlive().millis()); } } @@ -916,7 +984,7 @@ private void processScroll(InternalScrollSearchRequest request, ReaderContext re * SearchService */ public int getActiveContexts() { - return this.activeReaders.size(); + return this.activeContexts.size(); } public ResponseCollectorService getResponseCollectorService() { @@ -927,7 +995,7 @@ class Reaper implements Runnable { @Override public void run() { final long time = threadPool.relativeTimeInMillis(); - for (ReaderContext context : activeReaders.values()) { + for (SearchContext context : activeContexts.values()) { // Use the same value for both checks since lastAccessTime can // be modified by another thread between checks! final long lastAccessTime = context.lastAccessTime(); @@ -937,7 +1005,7 @@ public void run() { if ((time - lastAccessTime > context.keepAlive())) { logger.debug("freeing search context [{}], time [{}], lastAccessTime [{}], keepAlive [{}]", context.id(), time, lastAccessTime, context.keepAlive()); - freeReaderContext(context.id()); + freeContext(context.id()); } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index fca5b14e34be3..c150f99cfe196 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -117,6 +117,8 @@ public void execute(SearchContext context) { context.searcher().search(query, collector); } catch (Exception e) { throw new QueryPhaseExecutionException(context.shardTarget(), "Failed to execute global aggregators", e); + } finally { + context.clearReleasables(SearchContext.Lifetime.COLLECTION); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java index 43e38c731bbe1..ea89700521798 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java @@ -26,6 +26,7 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.SearchContext.Lifetime; import org.elasticsearch.search.query.QueryPhaseExecutionException; import java.io.IOException; @@ -75,7 +76,7 @@ protected AggregatorBase(String name, AggregatorFactories factories, SearchConte this.breakerService = context.bigArrays().breakerService(); assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead"; this.subAggregators = factories.createSubAggregators(context, this); - context.addReleasable(this); + context.addReleasable(this, Lifetime.PHASE); final SearchShardTarget shardTarget = context.shardTarget(); // Register a safeguard to highlight any invalid construction logic (call to this constructor without subsequent preCollection call) collectableSubAggregators = new BucketCollector() { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java index 5451285a379c8..970ef725f027d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.SearchContext.Lifetime; import java.io.IOException; import java.util.List; @@ -49,7 +50,7 @@ public static final class MultiBucketAggregatorWrapper extends Aggregator { this.parent = parent; this.factory = factory; this.first = first; - context.addReleasable(this); + context.addReleasable(this, Lifetime.PHASE); aggregators = bigArrays.newObjectArray(1); aggregators.set(0, first); collectors = bigArrays.newObjectArray(1); diff --git a/server/src/main/java/org/elasticsearch/search/fetch/subphase/ExplainFetchSubPhase.java b/server/src/main/java/org/elasticsearch/search/fetch/subphase/ExplainFetchSubPhase.java index 345a4c5a0a7ad..3a7d5c96b8179 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/subphase/ExplainFetchSubPhase.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/subphase/ExplainFetchSubPhase.java @@ -48,6 +48,8 @@ public void hitExecute(SearchContext context, HitContext hitContext) { } catch (IOException e) { throw new FetchPhaseExecutionException(context.shardTarget(), "Failed to explain doc [" + hitContext.hit().getId() + "]", e); + } finally { + context.clearReleasables(SearchContext.Lifetime.COLLECTION); } } } diff --git a/server/src/main/java/org/elasticsearch/search/fetch/subphase/MatchedQueriesFetchSubPhase.java b/server/src/main/java/org/elasticsearch/search/fetch/subphase/MatchedQueriesFetchSubPhase.java index 319bd40cfbd70..c2f6980781dba 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/subphase/MatchedQueriesFetchSubPhase.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/subphase/MatchedQueriesFetchSubPhase.java @@ -31,6 +31,7 @@ import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.SearchContext.Lifetime; import java.io.IOException; import java.util.ArrayList; @@ -91,6 +92,8 @@ public void hitsExecute(SearchContext context, SearchHit[] hits) { } } catch (IOException e) { throw ExceptionsHelper.convertToElastic(e); + } finally { + context.clearReleasables(Lifetime.COLLECTION); } } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java index ab605062de50a..8c04954a4efcf 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java @@ -140,11 +140,21 @@ public float queryBoost() { return in.queryBoost(); } + @Override + public long getOriginNanoTime() { + return in.getOriginNanoTime(); + } + @Override public ScrollContext scrollContext() { return in.scrollContext(); } + @Override + public SearchContext scrollContext(ScrollContext scroll) { + return in.scrollContext(scroll); + } + @Override public SearchContextAggregations aggregations() { return in.aggregations(); @@ -185,6 +195,11 @@ public List rescore() { return in.rescore(); } + @Override + public void addRescore(RescoreContext rescore) { + in.addRescore(rescore); + } + @Override public boolean hasScriptFields() { return in.hasScriptFields(); @@ -436,6 +451,26 @@ public SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int return in.docIdsToLoad(docIdsToLoad, docsIdsToLoadFrom, docsIdsToLoadSize); } + @Override + public void accessed(long accessTime) { + in.accessed(accessTime); + } + + @Override + public long lastAccessTime() { + return in.lastAccessTime(); + } + + @Override + public long keepAlive() { + return in.keepAlive(); + } + + @Override + public void keepAlive(long keepAlive) { + in.keepAlive(keepAlive); + } + @Override public SearchLookup lookup() { return in.lookup(); diff --git a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java deleted file mode 100644 index 7c9dfe14dc9f8..0000000000000 --- a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.internal; - -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.util.concurrent.AbstractRefCounted; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.search.dfs.AggregatedDfs; -import org.elasticsearch.search.rescore.RescoreContext; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Holds a reference to a point in time {@link Engine.Searcher} that will be used to construct {@link SearchContext}. - * This class also implements {@link org.elasticsearch.common.util.concurrent.RefCounted} since in some situations like - * in {@link org.elasticsearch.search.SearchService} a SearchContext can be closed concurrently due to independent events - * ie. when an index gets removed. To prevent accessing closed IndexReader / IndexSearcher instances the SearchContext - * can be guarded by a reference count and fail if it's been closed by an external event. - * - * For reference why we use RefCounted here see https://github.com/elastic/elasticsearch/pull/20095. - */ -public class ReaderContext extends AbstractRefCounted implements Releasable { - private final long id; - private final IndexShard indexShard; - private final Engine.Searcher engineSearcher; - private final AtomicBoolean closed = new AtomicBoolean(false); - - private volatile long keepAlive; - private volatile long lastAccessTime = -1L; - - // BWC - private final ShardSearchRequest request; - private final ScrollContext scrollContext; - private AggregatedDfs aggregatedDfs; - private List rescore; - - private final List onCloses = new CopyOnWriteArrayList<>(); - - public ReaderContext(long id, IndexShard indexShard, Engine.Searcher engineSearcher, ShardSearchRequest request) { - super("reader_context"); - this.id = id; - this.indexShard = indexShard; - this.engineSearcher = engineSearcher; - this.request = request; - if (request.scroll() != null) { - this.scrollContext = new ScrollContext(); - } else { - this.scrollContext = null; - } - } - - @Override - public final void close() { - if (closed.compareAndSet(false, true)) { - decRef(); - } else { - assert false : "ReaderContext was closed already"; - } - } - - @Override - protected void closeInternal() { - Releasables.close(Releasables.wrap(onCloses), engineSearcher); - } - - public void addOnClose(Releasable releasable) { - onCloses.add(releasable); - } - - public long id() { - return id; - } - - public IndexShard indexShard() { - return indexShard; - } - - public Engine.Searcher engineSearcher() { - return engineSearcher; - } - - public String source() { - return engineSearcher.source(); - } - - public ShardSearchRequest request() { - return request; - } - - public ScrollContext scrollContext() { - return scrollContext; - } - - public AggregatedDfs aggregatedDfs() { - return aggregatedDfs; - } - - public void aggregatedDfs(AggregatedDfs aggregatedDfs) { - this.aggregatedDfs = aggregatedDfs; - } - - public void accessed(long accessTime) { - this.lastAccessTime = accessTime; - } - - public long lastAccessTime() { - return this.lastAccessTime; - } - - public long keepAlive() { - return this.keepAlive; - } - - public void keepAlive(long keepAlive) { - this.keepAlive = keepAlive; - } - - public List rescore() { - if (rescore == null) { - return Collections.emptyList(); - } else { - return Collections.unmodifiableList(rescore); - } - } - - public void addRescore(RescoreContext rescore) { - if (this.rescore == null) { - this.rescore = new ArrayList<>(); - } - this.rescore.add(rescore); - } -} diff --git a/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java b/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java index 440408646d127..41d7680a780b0 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java @@ -36,8 +36,6 @@ public final class ScrollContext { public ScoreDoc lastEmittedDoc; public Scroll scroll; - private final long startTimeInNano = System.nanoTime(); - /** * Returns the object or null if the given key does not have a * value in the context @@ -56,9 +54,4 @@ public void putInContext(String key, Object value) { } context.put(key, value); } - - - public long getStartTimeInNano() { - return startTimeInNano; - } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java index d0eddac1d86c6..c0477ece69aa0 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -29,6 +29,9 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; +import org.elasticsearch.common.util.concurrent.RefCounted; +import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.mapper.MappedFieldType; @@ -58,29 +61,36 @@ import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.suggest.SuggestionSearchContext; +import java.util.ArrayList; +import java.util.EnumMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; /** * This class encapsulates the state needed to execute a search. It holds a reference to the * shards point in time snapshot (IndexReader / ContextIndexSearcher) and allows passing on * state from one query / fetch phase to another. + * + * This class also implements {@link RefCounted} since in some situations like in {@link org.elasticsearch.search.SearchService} + * a SearchContext can be closed concurrently due to independent events ie. when an index gets removed. To prevent accessing closed + * IndexReader / IndexSearcher instances the SearchContext can be guarded by a reference count and fail if it's been closed by + * an external event. */ -public abstract class SearchContext implements Releasable { +// For reference why we use RefCounted here see #20095 +public abstract class SearchContext extends AbstractRefCounted implements Releasable { public static final int DEFAULT_TERMINATE_AFTER = 0; public static final int TRACK_TOTAL_HITS_ACCURATE = Integer.MAX_VALUE; public static final int TRACK_TOTAL_HITS_DISABLED = -1; public static final int DEFAULT_TRACK_TOTAL_HITS_UP_TO = 10000; - private final List releasables = new CopyOnWriteArrayList<>(); + private Map> clearables = null; private final AtomicBoolean closed = new AtomicBoolean(false); private InnerHitsContext innerHitsContext; protected SearchContext() { - + super("search_context"); } public abstract void setTask(SearchShardTask task); @@ -91,15 +101,25 @@ protected SearchContext() { @Override public final void close() { - if (closed.compareAndSet(false, true)) { - try { - Releasables.close(releasables); - } finally { - doClose(); - } + if (closed.compareAndSet(false, true)) { // prevent double closing + decRef(); + } + } + + @Override + protected final void closeInternal() { + try { + clearReleasables(Lifetime.CONTEXT); + } finally { + doClose(); } } + @Override + protected void alreadyClosed() { + throw new IllegalStateException("search context is already closed can't increment refCount current count [" + refCount() + "]"); + } + protected abstract void doClose(); /** @@ -126,8 +146,12 @@ public final void close() { public abstract float queryBoost(); + public abstract long getOriginNanoTime(); + public abstract ScrollContext scrollContext(); + public abstract SearchContext scrollContext(ScrollContext scroll); + public abstract SearchContextAggregations aggregations(); public abstract SearchContext aggregations(SearchContextAggregations aggregations); @@ -156,6 +180,8 @@ public InnerHitsContext innerHits() { */ public abstract List rescore(); + public abstract void addRescore(RescoreContext rescore); + public abstract boolean hasScriptFields(); public abstract ScriptFieldsContext scriptFields(); @@ -296,6 +322,14 @@ public InnerHitsContext innerHits() { public abstract SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int docsIdsToLoadSize); + public abstract void accessed(long accessTime); + + public abstract long lastAccessTime(); + + public abstract long keepAlive(); + + public abstract void keepAlive(long keepAlive); + public SearchLookup lookup() { return getQueryShardContext().lookup(); } @@ -313,14 +347,38 @@ public SearchLookup lookup() { */ public abstract Profilers getProfilers(); - /** - * Adds a releasable that will be freed when this context is closed. + * Schedule the release of a resource. The time when {@link Releasable#close()} will be called on this object + * is function of the provided {@link Lifetime}. */ - public void addReleasable(Releasable releasable) { + public void addReleasable(Releasable releasable, Lifetime lifetime) { + if (clearables == null) { + clearables = new EnumMap<>(Lifetime.class); + } + List releasables = clearables.get(lifetime); + if (releasables == null) { + releasables = new ArrayList<>(); + clearables.put(lifetime, releasables); + } releasables.add(releasable); } + public void clearReleasables(Lifetime lifetime) { + if (clearables != null) { + List>releasables = new ArrayList<>(); + for (Lifetime lc : Lifetime.values()) { + if (lc.compareTo(lifetime) > 0) { + break; + } + List remove = clearables.remove(lc); + if (remove != null) { + releasables.add(remove); + } + } + Releasables.close(Iterables.flatten(releasables)); + } + } + /** * @return true if the request contains only suggest */ @@ -345,6 +403,24 @@ public final boolean hasOnlySuggest() { /** Return a view of the additional query collectors that should be run for this context. */ public abstract Map, Collector> queryCollectors(); + /** + * The life time of an object that is used during search execution. + */ + public enum Lifetime { + /** + * This life time is for objects that only live during collection time. + */ + COLLECTION, + /** + * This life time is for objects that need to live until the end of the current search phase. + */ + PHASE, + /** + * This life time is for objects that need to live until the search context they are attached to is destroyed. + */ + CONTEXT + } + public abstract QueryShardContext getQueryShardContext(); @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java index 8ce36550f7b94..e918aed603750 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java @@ -30,6 +30,7 @@ import org.elasticsearch.search.fetch.subphase.ScriptFieldsContext; import org.elasticsearch.search.fetch.subphase.highlight.SearchContextHighlight; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.rescore.RescoreContext; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.suggest.SuggestionSearchContext; @@ -84,6 +85,11 @@ public Query buildFilteredQuery(Query query) { throw new UnsupportedOperationException("this context should be read only"); } + @Override + public SearchContext scrollContext(ScrollContext scrollContext) { + throw new UnsupportedOperationException("Not supported"); + } + @Override public SearchContext aggregations(SearchContextAggregations aggregations) { throw new UnsupportedOperationException("Not supported"); @@ -104,6 +110,11 @@ public void suggest(SuggestionSearchContext suggest) { throw new UnsupportedOperationException("Not supported"); } + @Override + public void addRescore(RescoreContext rescore) { + throw new UnsupportedOperationException("Not supported"); + } + @Override public boolean hasScriptFields() { return scriptFields != null; @@ -321,6 +332,16 @@ public CollapseContext collapse() { return null; } + @Override + public void accessed(long accessTime) { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public void keepAlive(long keepAlive) { + throw new UnsupportedOperationException("Not supported"); + } + @Override public QuerySearchResult queryResult() { return querySearchResult; diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 01636b2d7f1c3..e58cf5ad8f7c6 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -339,6 +339,8 @@ private static boolean searchWithCollector(SearchContext searchContext, ContextI throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded"); } queryResult.searchTimedOut(true); + } finally { + searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION); } if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) { queryResult.terminatedEarly(false); @@ -393,6 +395,8 @@ private static boolean searchWithCollectorManager(SearchContext searchContext, C throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded"); } searchContext.queryResult().searchTimedOut(true); + } finally { + searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION); } return false; // no rescoring when sorting by field } diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 18b9b5f4f6d58..2c11b26580f3d 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -78,10 +78,11 @@ import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.internal.ReaderContext; +import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.TestSearchContext; import org.elasticsearch.test.engine.MockEngineFactory; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -102,7 +103,6 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; -import static org.mockito.Mockito.mock; public class IndexModuleTests extends ESTestCase { private Index index; @@ -272,8 +272,9 @@ public void testAddSearchOperationListener() throws IOException { IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap()); AtomicBoolean executed = new AtomicBoolean(false); SearchOperationListener listener = new SearchOperationListener() { + @Override - public void onNewReaderContext(ReaderContext readerContext) { + public void onNewContext(SearchContext context) { executed.set(true); } }; @@ -286,8 +287,9 @@ public void onNewReaderContext(ReaderContext readerContext) { assertEquals(2, indexService.getSearchOperationListener().size()); assertEquals(SearchSlowLog.class, indexService.getSearchOperationListener().get(0).getClass()); assertSame(listener, indexService.getSearchOperationListener().get(1)); + for (SearchOperationListener l : indexService.getSearchOperationListener()) { - l.onNewReaderContext(mock(ReaderContext.class)); + l.onNewContext(new TestSearchContext(null)); } assertTrue(executed.get()); indexService.close("simon says", false); diff --git a/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java b/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java index 34c528dfeda6a..28bab8da0fdfb 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.index.shard; -import org.elasticsearch.search.internal.ReaderContext; -import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestSearchContext; @@ -35,7 +33,6 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Mockito.mock; public class SearchOperationListenerTests extends ESTestCase { @@ -93,26 +90,26 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) { } @Override - public void onNewReaderContext(ReaderContext readerContext) { - assertNotNull(readerContext); + public void onNewContext(SearchContext context) { + assertNotNull(context); newContext.incrementAndGet(); } @Override - public void onFreeReaderContext(ReaderContext readerContext) { - assertNotNull(readerContext); + public void onFreeContext(SearchContext context) { + assertNotNull(context); freeContext.incrementAndGet(); } @Override - public void onNewScrollContext(ScrollContext scrollContext) { - assertNotNull(scrollContext); + public void onNewScrollContext(SearchContext context) { + assertNotNull(context); newScrollContext.incrementAndGet(); } @Override - public void onFreeScrollContext(ScrollContext scrollContext) { - assertNotNull(scrollContext); + public void onFreeScrollContext(SearchContext context) { + assertNotNull(context); freeScrollContext.incrementAndGet(); } @@ -219,7 +216,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, validateSearchContext.get()); - compositeListener.onNewReaderContext(mock(ReaderContext.class)); + compositeListener.onNewContext(ctx); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); @@ -232,7 +229,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, validateSearchContext.get()); - compositeListener.onNewScrollContext(new ScrollContext()); + compositeListener.onNewScrollContext(ctx); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); @@ -245,7 +242,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, validateSearchContext.get()); - compositeListener.onFreeReaderContext(mock(ReaderContext.class)); + compositeListener.onFreeContext(ctx); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); @@ -258,7 +255,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, validateSearchContext.get()); - compositeListener.onFreeScrollContext(new ScrollContext()); + compositeListener.onFreeScrollContext(ctx); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 198daf0f41a11..560f052664604 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -50,7 +50,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.ReaderContext; +import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.rescore.RescoreContext; import org.elasticsearch.search.slice.SliceBuilder; @@ -111,33 +111,26 @@ public void testPreProcess() throws Exception { try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir); - IndexReader reader = w.getReader()) { - - final Engine.Searcher engineSearcher = new Engine.Searcher("test", w.getReader(), - IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), reader); + IndexReader reader = w.getReader(); + Engine.Searcher searcher = new Engine.Searcher("test", reader, + IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), reader)) { SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE); - ReaderContext readerWithoutScroll = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); - DefaultSearchContext contextWithoutScroll = new DefaultSearchContext(readerWithoutScroll, shardSearchRequest, target, null, - indexService, indexShard, bigArrays, null, timeout, null); - contextWithoutScroll.from(300); - contextWithoutScroll.close(); + DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, target, searcher, null, indexService, + indexShard, bigArrays, null, timeout, null); + context1.from(300); // resultWindow greater than maxResultWindow and scrollContext is null - IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> contextWithoutScroll.preProcess(false)); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false)); assertThat(exception.getMessage(), equalTo("Result window is too large, from + size must be less than or equal to:" + " [" + maxResultWindow + "] but was [310]. See the scroll api for a more efficient way to request large data sets. " + "This limit can be set by changing the [" + IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey() + "] index level setting.")); // resultWindow greater than maxResultWindow and scrollContext isn't null - when(shardSearchRequest.scroll()).thenReturn(new Scroll(TimeValue.timeValueMillis(randomInt(1000)))); - ReaderContext readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); - DefaultSearchContext context1 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, indexService, - indexShard, bigArrays, null, timeout, null); - context1.from(300); + context1.scrollContext(new ScrollContext()); exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false)); assertThat(exception.getMessage(), equalTo("Batch size is too large, size must be less than or equal to: [" + maxResultWindow + "] but was [310]. Scroll batch sizes cost as much memory as result windows so they are " @@ -151,7 +144,7 @@ public void testPreProcess() throws Exception { RescoreContext rescoreContext = mock(RescoreContext.class); when(rescoreContext.getWindowSize()).thenReturn(500); - readerContext.addRescore(rescoreContext); + context1.addRescore(rescoreContext); exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false)); assertThat(exception.getMessage(), equalTo("Cannot use [sort] option in conjunction with [rescore].")); @@ -165,10 +158,8 @@ public void testPreProcess() throws Exception { + "to be rescored. This limit can be set by changing the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey() + "] index level setting.")); - readerContext.close(); - readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); // rescore is null but sliceBuilder is not null - DefaultSearchContext context2 = new DefaultSearchContext(readerContext, shardSearchRequest, target, + DefaultSearchContext context2 = new DefaultSearchContext(2L, shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null); SliceBuilder sliceBuilder = mock(SliceBuilder.class); @@ -185,7 +176,7 @@ public void testPreProcess() throws Exception { when(shardSearchRequest.getAliasFilter()).thenReturn(AliasFilter.EMPTY); when(shardSearchRequest.indexBoost()).thenReturn(AbstractQueryBuilder.DEFAULT_BOOST); - DefaultSearchContext context3 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, + DefaultSearchContext context3 = new DefaultSearchContext(3L, shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null); ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery(); context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false); @@ -195,9 +186,7 @@ public void testPreProcess() throws Exception { when(queryShardContext.fieldMapper(anyString())).thenReturn(mock(MappedFieldType.class)); when(shardSearchRequest.indexRoutings()).thenReturn(new String[0]); - readerContext.close(); - readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); - DefaultSearchContext context4 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, + DefaultSearchContext context4 = new DefaultSearchContext(4L, shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null); context4.sliceBuilder(new SliceBuilder(1,2)).parsedQuery(parsedQuery).preProcess(false); Query query1 = context4.query(); @@ -205,7 +194,6 @@ public void testPreProcess() throws Exception { Query query2 = context4.query(); assertTrue(query1 instanceof MatchNoDocsQuery || query2 instanceof MatchNoDocsQuery); - readerContext.close(); } } } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index c47ccefca3217..5691530dd32fe 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -47,7 +47,6 @@ import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.MatchNoneQueryBuilder; @@ -76,7 +75,6 @@ import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.suggest.SuggestBuilder; @@ -169,8 +167,8 @@ protected Map, Object>> pluginScripts() { public void onIndexModule(IndexModule indexModule) { indexModule.addSearchOperationListener(new SearchOperationListener() { @Override - public void onNewReaderContext(ReaderContext readerContext) { - if ("throttled_threadpool_index".equals(readerContext.indexShard().shardId().getIndex().getName())) { + public void onNewContext(SearchContext context) { + if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); } else { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); @@ -324,7 +322,7 @@ public void onFailure(Exception e) { service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), listener); listener.get(); if (useScroll) { - service.freeReaderContext(searchPhaseResult.getRequestId()); + service.freeContext(searchPhaseResult.getRequestId()); } } catch (ExecutionException ex) { assertThat(ex.getCause(), instanceOf(RuntimeException.class)); @@ -333,7 +331,7 @@ public void onFailure(Exception e) { } catch (AlreadyClosedException ex) { throw ex; } catch (IllegalStateException ex) { - assertEquals("reader_context is already closed can't increment refCount current count [0]", ex.getMessage()); + assertEquals("search context is already closed can't increment refCount current count [0]", ex.getMessage()); } catch (SearchContextMissingException ex) { // that's fine } @@ -359,34 +357,42 @@ public void testTimeout() throws IOException { final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); final IndexShard indexShard = indexService.getShard(0); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - final ShardSearchRequest requestWithDefaultTimeout = new ShardSearchRequest( - OriginalIndices.NONE, - searchRequest, - indexShard.shardId(), - 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null); - - try (ReaderContext reader = createReaderContext(indexShard, requestWithDefaultTimeout); - SearchContext contextWithDefaultTimeout = service.createContext(reader, requestWithDefaultTimeout, null, randomBoolean())) { + final SearchContext contextWithDefaultTimeout = service.createContext( + new ShardSearchRequest( + OriginalIndices.NONE, + searchRequest, + indexShard.shardId(), + 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, -1, null, null) + ); + try { // the search context should inherit the default timeout assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5))); + } finally { + contextWithDefaultTimeout.decRef(); + service.freeContext(contextWithDefaultTimeout.id()); } final long seconds = randomIntBetween(6, 10); searchRequest.source(new SearchSourceBuilder().timeout(TimeValue.timeValueSeconds(seconds))); - final ShardSearchRequest requestWithCustomTimeout = new ShardSearchRequest( - OriginalIndices.NONE, - searchRequest, - indexShard.shardId(), - 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null); - try (ReaderContext reader = createReaderContext(indexShard, requestWithCustomTimeout); - SearchContext context = service.createContext(reader, requestWithCustomTimeout, null, randomBoolean())) { + final SearchContext context = service.createContext( + new ShardSearchRequest( + OriginalIndices.NONE, + searchRequest, + indexShard.shardId(), + 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, -1, null, null) + ); + try { // the search context should inherit the query timeout assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds))); + } finally { + context.decRef(); + service.freeContext(context.id()); } + } /** @@ -406,20 +412,19 @@ public void testMaxDocvalueFieldsSearch() throws IOException { for (int i = 0; i < indexService.getIndexSettings().getMaxDocvalueFields(); i++) { searchSourceBuilder.docValueField("field" + i); } - final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); - try (ReaderContext reader = createReaderContext(indexShard, request); - SearchContext context = service.createContext(reader, request, null, randomBoolean())) { + try (SearchContext context = service.createContext( + new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null)) + ) { assertNotNull(context); - } - searchSourceBuilder.docValueField("one_field_too_much"); - try (ReaderContext reader = createReaderContext(indexShard, request)) { + searchSourceBuilder.docValueField("one_field_too_much"); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> service.createContext(reader, request, null, randomBoolean())); + () -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null))); assertEquals( - "Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. " - + "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", - ex.getMessage()); + "Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. " + + "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", + ex.getMessage()); } } @@ -442,22 +447,20 @@ public void testMaxScriptFieldsSearch() throws IOException { searchSourceBuilder.scriptField("field" + i, new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); } - final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); - - try(ReaderContext reader = createReaderContext(indexShard, request)) { - try (SearchContext context = service.createContext(reader, request, null, randomBoolean())) { - assertNotNull(context); - } + try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, + indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, -1, null, null))) { + assertNotNull(context); searchSourceBuilder.scriptField("anotherScriptField", - new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); + new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> service.createContext(reader, request, null, randomBoolean())); + () -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null))); assertEquals( - "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was [" - + (maxScriptFields + 1) - + "]. This limit can be set by changing the [index.max_script_fields] index level setting.", - ex.getMessage()); + "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was [" + + (maxScriptFields + 1) + + "]. This limit can be set by changing the [index.max_script_fields] index level setting.", + ex.getMessage()); } } @@ -474,12 +477,10 @@ public void testIgnoreScriptfieldIfSizeZero() throws IOException { searchSourceBuilder.scriptField("field" + 0, new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); searchSourceBuilder.size(0); - final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, + try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null); - try (ReaderContext reader = createReaderContext(indexShard, request); - SearchContext context = service.createContext(reader, request, null, randomBoolean())) { - assertEquals(0, context.scriptFields().fields().size()); + 1.0f, -1, null, null))) { + assertEquals(0, context.scriptFields().fields().size()); } } @@ -513,7 +514,7 @@ public void testMaxOpenScrollContexts() throws RuntimeException { } ElasticsearchException ex = expectThrows(ElasticsearchException.class, - () -> service.createAndPutReaderContext(new ShardScrollRequestTest(indexShard.shardId()))); + () -> service.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId()))); assertEquals( "Trying to create too many scroll contexts. Must be less than or equal to: [" + SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " + @@ -625,11 +626,10 @@ public void testCanMatch() throws IOException { assertEquals(numWrapReader, numWrapInvocations.get()); // make sure that the wrapper is called when the context is actually created - try (ReaderContext reader = createReaderContext(indexShard, new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)); - SearchContext context = service.createContext(reader, reader.request(), null, randomBoolean())) { - } - assertEquals(numWrapReader + 1, numWrapInvocations.get()); + service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, + indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), + 1f, -1, null, null)).close(); + assertEquals(numWrapReader+1, numWrapInvocations.get()); } public void testCanRewriteToMatchNone() { @@ -744,23 +744,18 @@ public void testCreateSearchContextFailure() throws IOException { final IndexService indexService = createIndex(index); final SearchService service = getInstanceFromNode(SearchService.class); final ShardId shardId = new ShardId(indexService.index(), 0); - final ShardSearchRequest request = new ShardSearchRequest(shardId, 0, null) { - @Override - public SearchType searchType() { - // induce an artificial NPE - throw new NullPointerException("expected"); - } - }; - try (ReaderContext reader = createReaderContext(indexService.getShard(shardId.id()), request)) { - NullPointerException e = expectThrows(NullPointerException.class, - () -> service.createContext(reader, request, null, randomBoolean())); - assertEquals("expected", e.getMessage()); - } - assertEquals("should have 2 store refs (IndexService + InternalEngine)", 2, indexService.getShard(0).store().refCount()); - } - private ReaderContext createReaderContext(IndexShard shard, ShardSearchRequest request) { - Engine.Searcher searcher = shard.acquireSearcher("test"); - return new ReaderContext(randomNonNegativeLong(), shard, searcher, request); + NullPointerException e = expectThrows(NullPointerException.class, + () -> service.createContext( + new ShardSearchRequest(shardId, 0, null) { + @Override + public SearchType searchType() { + // induce an artificial NPE + throw new NullPointerException("expected"); + } + } + )); + assertEquals("expected", e.getMessage()); + assertEquals("should have 2 store refs (IndexService + InternalEngine)", 2, indexService.getShard(0).store().refCount()); } } diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index b2d4314b3a0a5..4c33177e268f9 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -316,12 +316,13 @@ public void testInOrderScrollOptimization() throws Exception { } w.close(); IndexReader reader = DirectoryReader.open(dir); - ScrollContext scrollContext = new ScrollContext(); - TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader), scrollContext); + TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader)); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + ScrollContext scrollContext = new ScrollContext(); scrollContext.lastEmittedDoc = null; scrollContext.maxScore = Float.NaN; scrollContext.totalHits = null; + context.scrollContext(scrollContext); context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); int size = randomIntBetween(2, 5); context.setSize(size); @@ -544,12 +545,13 @@ public void testIndexSortScrollOptimization() throws Exception { // search sort is a prefix of the index sort searchSortAndFormats.add(new SortAndFormats(new Sort(indexSort.getSort()[0]), new DocValueFormat[]{DocValueFormat.RAW})); for (SortAndFormats searchSortAndFormat : searchSortAndFormats) { - ScrollContext scrollContext = new ScrollContext(); - TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader), scrollContext); + TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader)); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + ScrollContext scrollContext = new ScrollContext(); scrollContext.lastEmittedDoc = null; scrollContext.maxScore = Float.NaN; scrollContext.totalHits = null; + context.scrollContext(scrollContext); context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.setSize(10); context.sort(searchSortAndFormat); diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index 1a5c3d7815440..9653f3b66a515 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -27,7 +27,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.fetch.FetchPhase; -import org.elasticsearch.search.internal.ReaderContext; +import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.threadpool.ThreadPool; import java.util.HashMap; @@ -40,11 +40,11 @@ public class MockSearchService extends SearchService { */ public static class TestPlugin extends Plugin {} - private static final Map ACTIVE_SEARCH_CONTEXTS = new ConcurrentHashMap<>(); + private static final Map ACTIVE_SEARCH_CONTEXTS = new ConcurrentHashMap<>(); /** Throw an {@link AssertionError} if there are still in-flight contexts. */ public static void assertNoInFlightContext() { - final Map copy = new HashMap<>(ACTIVE_SEARCH_CONTEXTS); + final Map copy = new HashMap<>(ACTIVE_SEARCH_CONTEXTS); if (copy.isEmpty() == false) { throw new AssertionError( "There are still [" + copy.size() @@ -56,14 +56,14 @@ public static void assertNoInFlightContext() { /** * Add an active search context to the list of tracked contexts. Package private for testing. */ - static void addActiveContext(ReaderContext context) { + static void addActiveContext(SearchContext context) { ACTIVE_SEARCH_CONTEXTS.put(context, new RuntimeException(context.toString())); } /** * Clear an active search context from the list of tracked contexts. Package private for testing. */ - static void removeActiveContext(ReaderContext context) { + static void removeActiveContext(SearchContext context) { ACTIVE_SEARCH_CONTEXTS.remove(context); } @@ -74,14 +74,14 @@ public MockSearchService(ClusterService clusterService, } @Override - protected void putReaderContext(ReaderContext context) { + protected void putContext(SearchContext context) { addActiveContext(context); - super.putReaderContext(context); + super.putContext(context); } @Override - protected ReaderContext removeReaderContext(long id) { - final ReaderContext removed = super.removeReaderContext(id); + protected SearchContext removeContext(long id) { + final SearchContext removed = super.removeContext(id); if (removed != null) { removeActiveContext(removed); } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index cb461009b7a0d..c671b9c109017 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -249,7 +249,7 @@ public boolean shouldCache(Query query) { * close their sub-aggregations. This is fairly similar to what the production code does. */ releasables.add((Releasable) invocation.getArguments()[0]); return null; - }).when(searchContext).addReleasable(anyObject()); + }).when(searchContext).addReleasable(anyObject(), anyObject()); return searchContext; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java index fc9a8fe7efa62..4edb9adacd13b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java @@ -110,18 +110,12 @@ public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexSh } public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexShard, ContextIndexSearcher searcher) { - this(queryShardContext, indexShard, searcher, null); - } - - public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexShard, - ContextIndexSearcher searcher, ScrollContext scrollContext) { this.bigArrays = null; this.indexService = null; this.fixedBitSetFilterCache = null; this.indexShard = indexShard; this.queryShardContext = queryShardContext; this.searcher = searcher; - this.scrollContext = scrollContext; } public void setSearcher(ContextIndexSearcher searcher) { @@ -172,11 +166,22 @@ public float queryBoost() { return 0; } + @Override + public long getOriginNanoTime() { + return originNanoTime; + } + @Override public ScrollContext scrollContext() { return scrollContext; } + @Override + public SearchContext scrollContext(ScrollContext scrollContext) { + this.scrollContext = scrollContext; + return this; + } + @Override public SearchContextAggregations aggregations() { return aggregations; @@ -221,6 +226,10 @@ public List rescore() { return Collections.emptyList(); } + @Override + public void addRescore(RescoreContext rescore) { + } + @Override public boolean hasScriptFields() { return false; @@ -527,6 +536,24 @@ public SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int return null; } + @Override + public void accessed(long accessTime) { + } + + @Override + public long lastAccessTime() { + return 0; + } + + @Override + public long keepAlive() { + return 0; + } + + @Override + public void keepAlive(long keepAlive) { + } + @Override public DfsSearchResult dfsResult() { return null; diff --git a/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java b/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java index b49f1fb0acaf5..8a8842487f14a 100644 --- a/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java +++ b/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java @@ -19,25 +19,59 @@ package org.elasticsearch.search; -import org.elasticsearch.search.internal.ReaderContext; +import org.apache.lucene.search.Query; +import org.elasticsearch.Version; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.lucene.search.Queries; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.ESTestCase; - -import static org.mockito.Mockito.mock; +import org.elasticsearch.test.TestSearchContext; public class MockSearchServiceTests extends ESTestCase { + public static final IndexMetaData EMPTY_INDEX_METADATA = IndexMetaData.builder("") + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(1).numberOfReplicas(0).build(); public void testAssertNoInFlightContext() { - ReaderContext reader = mock(ReaderContext.class); - MockSearchService.addActiveContext(reader); + final long nowInMillis = randomNonNegativeLong(); + SearchContext s = new TestSearchContext(new QueryShardContext(0, + new IndexSettings(EMPTY_INDEX_METADATA, Settings.EMPTY), BigArrays.NON_RECYCLING_INSTANCE, null, null, null, null, null, + xContentRegistry(), writableRegistry(), null, null, () -> nowInMillis, null, null)) { + + @Override + public SearchShardTarget shardTarget() { + return new SearchShardTarget("node", new ShardId("idx", "ignored", 0), null, OriginalIndices.NONE); + } + + @Override + public SearchType searchType() { + return SearchType.DEFAULT; + } + + @Override + public Query query() { + return Queries.newMatchAllQuery(); + } + }; + MockSearchService.addActiveContext(s); try { Throwable e = expectThrows(AssertionError.class, () -> MockSearchService.assertNoInFlightContext()); assertEquals("There are still [1] in-flight contexts. The first one's creation site is listed as the cause of this exception.", e.getMessage()); e = e.getCause(); + // The next line with throw an exception if the date looks wrong + assertEquals("[node][idx][0] query=[*:*]", e.getMessage()); assertEquals(MockSearchService.class.getName(), e.getStackTrace()[0].getClassName()); assertEquals(MockSearchServiceTests.class.getName(), e.getStackTrace()[1].getClassName()); } finally { - MockSearchService.removeActiveContext(reader); + MockSearchService.removeActiveContext(s); } } } diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index e78e937a1f142..5a22857494557 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -37,7 +37,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.shard.SearchOperationListener; -import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.transport.TransportRequest; @@ -286,34 +285,27 @@ public void validateSearchContext(SearchContext context, TransportRequest transp } catch (IOException e) { throw new UncheckedIOException(e); } + // also register a release resource in this case if we have multiple roundtrips like in DFS + registerRelease(context, lazyDirectoryReader); } } - @Override - public void onFreeSearchContext(SearchContext context) { - DirectoryReader dirReader = context.searcher().getDirectoryReader(); - LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(dirReader); - if (lazyDirectoryReader != null) { + private void registerRelease(SearchContext context, LazyDirectoryReader lazyDirectoryReader) { + context.addReleasable(() -> { try { lazyDirectoryReader.release(); } catch (IOException e) { throw new UncheckedIOException(e); } - } + }, SearchContext.Lifetime.PHASE); } @Override - public void onNewReaderContext(ReaderContext readerContext) { - DirectoryReader dirReader = readerContext.engineSearcher().getDirectoryReader(); + public void onNewContext(SearchContext context) { + DirectoryReader dirReader = context.searcher().getDirectoryReader(); LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(dirReader); if (lazyDirectoryReader != null) { - readerContext.addOnClose(() -> { - try { - lazyDirectoryReader.release(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); + registerRelease(context, lazyDirectoryReader); } } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java index aef1652b42fc5..5e0c2945caadc 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java @@ -46,9 +46,10 @@ public SecuritySearchOperationListener(ThreadContext threadContext, XPackLicense * Adds the {@link Authentication} to the {@link ScrollContext} */ @Override - public void onNewScrollContext(ScrollContext scrollContext) { + public void onNewScrollContext(SearchContext searchContext) { if (licenseState.isAuthAllowed()) { - scrollContext.putInContext(AuthenticationField.AUTHENTICATION_KEY, Authentication.getAuthentication(threadContext)); + searchContext.scrollContext().putInContext(AuthenticationField.AUTHENTICATION_KEY, + Authentication.getAuthentication(threadContext)); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java index 05f0ed7f0c42f..5b73d6d212fc5 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java @@ -49,19 +49,18 @@ public void testUnlicensed() { ThreadContext threadContext = new ThreadContext(Settings.EMPTY); AuditTrailService auditTrailService = mock(AuditTrailService.class); SearchContext searchContext = mock(SearchContext.class); - ScrollContext scrollContext = new ScrollContext(); - when(searchContext.scrollContext()).thenReturn(scrollContext); + when(searchContext.scrollContext()).thenReturn(new ScrollContext()); SecuritySearchOperationListener listener = new SecuritySearchOperationListener(threadContext, licenseState, auditTrailService); - listener.onNewScrollContext(scrollContext); + listener.onNewScrollContext(searchContext); listener.validateSearchContext(searchContext, Empty.INSTANCE); verify(licenseState, times(2)).isAuthAllowed(); verifyZeroInteractions(auditTrailService, searchContext); } public void testOnNewContextSetsAuthentication() throws Exception { - ScrollContext scrollContext = new ScrollContext(); - TestSearchContext testSearchContext = new TestSearchContext(null, null, null, scrollContext); + TestScrollSearchContext testSearchContext = new TestScrollSearchContext(); + testSearchContext.scrollContext(new ScrollContext()); final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(2L)); testSearchContext.scrollContext().scroll = scroll; XPackLicenseState licenseState = mock(XPackLicenseState.class); @@ -72,7 +71,7 @@ public void testOnNewContextSetsAuthentication() throws Exception { authentication.writeToContext(threadContext); SecuritySearchOperationListener listener = new SecuritySearchOperationListener(threadContext, licenseState, auditTrailService); - listener.onNewScrollContext(scrollContext); + listener.onNewScrollContext(testSearchContext); Authentication contextAuth = testSearchContext.scrollContext().getFromContext(AuthenticationField.AUTHENTICATION_KEY); assertEquals(authentication, contextAuth); @@ -83,8 +82,8 @@ public void testOnNewContextSetsAuthentication() throws Exception { } public void testValidateSearchContext() throws Exception { - ScrollContext scrollContext = new ScrollContext(); - TestSearchContext testSearchContext = new TestSearchContext(null, null, null, scrollContext); + TestScrollSearchContext testSearchContext = new TestScrollSearchContext(); + testSearchContext.scrollContext(new ScrollContext()); testSearchContext.scrollContext().putInContext(AuthenticationField.AUTHENTICATION_KEY, new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null)); testSearchContext.scrollContext().scroll = new Scroll(TimeValue.timeValueSeconds(2L)); @@ -236,4 +235,24 @@ public void testEnsuredAuthenticatedUserIsSame() { verify(auditTrail).accessDenied(eq(auditId), eq(runAsDiffType), eq(action), eq(request), authzInfoRoles(original.getUser().roles())); } + + static class TestScrollSearchContext extends TestSearchContext { + + private ScrollContext scrollContext; + + TestScrollSearchContext() { + super(null); + } + + @Override + public ScrollContext scrollContext() { + return scrollContext; + } + + @Override + public SearchContext scrollContext(ScrollContext scrollContext) { + this.scrollContext = scrollContext; + return this; + } + } }