From 5ef42803a0a761affd03b3b0eb39bbee7f10e8e7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 20 Jan 2020 18:03:37 -0500 Subject: [PATCH 1/3] Cut over from SearchContext to ReaderContext --- .../action/search/SearchTransportService.java | 4 +- .../index/search/stats/ShardSearchStats.java | 12 +- .../index/shard/SearchOperationListener.java | 38 +-- .../search/DefaultSearchContext.java | 88 ++--- .../elasticsearch/search/SearchService.java | 304 +++++++----------- .../search/aggregations/AggregatorBase.java | 2 +- .../aggregations/AggregatorFactory.java | 2 +- .../internal/FilteredSearchContext.java | 35 -- .../search/internal/ReaderContext.java | 155 +++++++++ .../search/internal/ScrollContext.java | 7 + .../search/internal/SearchContext.java | 42 +-- .../search/internal/SubSearchContext.java | 21 -- .../elasticsearch/index/IndexModuleTests.java | 10 +- .../shard/SearchOperationListenerTests.java | 27 +- .../search/DefaultSearchContextTests.java | 36 ++- .../search/SearchServiceTests.java | 149 ++++----- .../search/query/QueryPhaseTests.java | 10 +- .../search/MockSearchService.java | 18 +- .../elasticsearch/test/TestSearchContext.java | 39 +-- .../search/MockSearchServiceTests.java | 46 +-- .../index/engine/FrozenEngine.java | 31 +- .../SecuritySearchOperationListener.java | 5 +- .../SecuritySearchOperationListenerTests.java | 35 +- 23 files changed, 504 insertions(+), 612 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 822557696f7e5..76aa1fa274823 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -286,13 +286,13 @@ public void writeTo(StreamOutput out) throws IOException { public static void registerRequestHandler(TransportService transportService, SearchService searchService) { transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new, (request, channel, task) -> { - boolean freed = searchService.freeContext(request.id()); + boolean freed = searchService.freeReaderContext(request.id()); channel.sendResponse(new SearchFreeContextResponse(freed)); }); TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new); transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new, (request, channel, task) -> { - boolean freed = searchService.freeContext(request.id()); + boolean freed = searchService.freeReaderContext(request.id()); channel.sendResponse(new SearchFreeContextResponse(freed)); }); TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new); diff --git a/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java b/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java index 5446b1ed0e372..664b631249f68 100644 --- a/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java +++ b/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java @@ -24,6 +24,8 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.shard.SearchOperationListener; +import org.elasticsearch.search.internal.ReaderContext; +import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import java.util.HashMap; @@ -146,25 +148,25 @@ private StatsHolder groupStats(String group) { } @Override - public void onNewContext(SearchContext context) { + public void onNewReaderContext(ReaderContext readerContext) { openContexts.inc(); } @Override - public void onFreeContext(SearchContext context) { + public void onFreeReaderContext(ReaderContext readerContext) { openContexts.dec(); } @Override - public void onNewScrollContext(SearchContext context) { + public void onNewScrollContext(ScrollContext scrollContext) { totalStats.scrollCurrent.inc(); } @Override - public void onFreeScrollContext(SearchContext context) { + public void onFreeScrollContext(ScrollContext scrollContext) { totalStats.scrollCurrent.dec(); assert totalStats.scrollCurrent.count() >= 0; - totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - context.getOriginNanoTime())); + totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - scrollContext.getStartTimeInNano())); } static final class StatsHolder { diff --git a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java index ede86e6ec222d..7c58fd96b8439 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java @@ -21,6 +21,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.search.internal.ReaderContext; +import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.transport.TransportRequest; @@ -76,34 +78,34 @@ default void onFailedFetchPhase(SearchContext searchContext) {} default void onFetchPhase(SearchContext searchContext, long tookInNanos) {} /** - * Executed when a new search context was created - * @param context the created context + * Executed when a new reader context was created + * @param readerContext the created context */ - default void onNewContext(SearchContext context) {} + default void onNewReaderContext(ReaderContext readerContext) {} /** - * Executed when a previously created search context is freed. + * Executed when a previously created reader context is freed. * This happens either when the search execution finishes, if the * execution failed or if the search context as idle for and needs to be * cleaned up. - * @param context the freed search context + * @param readerContext the freed reader context */ - default void onFreeContext(SearchContext context) {} + default void onFreeReaderContext(ReaderContext readerContext) {} /** * Executed when a new scroll search {@link SearchContext} was created - * @param context the created search context + * @param scrollContext the created search context */ - default void onNewScrollContext(SearchContext context) {} + default void onNewScrollContext(ScrollContext scrollContext) {} /** * Executed when a scroll search {@link SearchContext} is freed. * This happens either when the scroll search execution finishes, if the * execution failed or if the search context as idle for and needs to be * cleaned up. - * @param context the freed search context + * @param scrollContext the freed search context */ - default void onFreeScrollContext(SearchContext context) {} + default void onFreeScrollContext(ScrollContext scrollContext) {} /** * Executed prior to using a {@link SearchContext} that has been retrieved @@ -193,10 +195,10 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) { } @Override - public void onNewContext(SearchContext context) { + public void onNewReaderContext(ReaderContext readerContext) { for (SearchOperationListener listener : listeners) { try { - listener.onNewContext(context); + listener.onNewReaderContext(readerContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onNewContext listener [{}] failed", listener), e); } @@ -204,10 +206,10 @@ public void onNewContext(SearchContext context) { } @Override - public void onFreeContext(SearchContext context) { + public void onFreeReaderContext(ReaderContext readerContext) { for (SearchOperationListener listener : listeners) { try { - listener.onFreeContext(context); + listener.onFreeReaderContext(readerContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onFreeContext listener [{}] failed", listener), e); } @@ -215,10 +217,10 @@ public void onFreeContext(SearchContext context) { } @Override - public void onNewScrollContext(SearchContext context) { + public void onNewScrollContext(ScrollContext scrollContext) { for (SearchOperationListener listener : listeners) { try { - listener.onNewScrollContext(context); + listener.onNewScrollContext(scrollContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onNewScrollContext listener [{}] failed", listener), e); } @@ -226,10 +228,10 @@ public void onNewScrollContext(SearchContext context) { } @Override - public void onFreeScrollContext(SearchContext context) { + public void onFreeScrollContext(ScrollContext scrollContext) { for (SearchOperationListener listener : listeners) { try { - listener.onFreeScrollContext(context); + listener.onFreeScrollContext(scrollContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onFreeScrollContext listener [{}] failed", listener), e); } diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index c7f0f885d793b..384f6b9633937 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -28,7 +28,6 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery; import org.elasticsearch.common.lucene.search.function.WeightFactorFunction; @@ -60,6 +59,7 @@ import org.elasticsearch.search.fetch.subphase.ScriptFieldsContext; import org.elasticsearch.search.fetch.subphase.highlight.SearchContextHighlight; import org.elasticsearch.search.internal.ContextIndexSearcher; +import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; @@ -74,7 +74,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,12 +81,11 @@ final class DefaultSearchContext extends SearchContext { - private final long id; + private final ReaderContext readerContext; private final ShardSearchRequest request; private final SearchShardTarget shardTarget; private final LongSupplier relativeTimeSupplier; private SearchType searchType; - private final Engine.Searcher engineSearcher; private final BigArrays bigArrays; private final IndexShard indexShard; private final ClusterService clusterService; @@ -101,7 +99,6 @@ final class DefaultSearchContext extends SearchContext { // terminate after count private int terminateAfter = DEFAULT_TERMINATE_AFTER; private List groupStats; - private ScrollContext scrollContext; private boolean explain; private boolean version = false; // by default, we don't return versions private boolean seqAndPrimaryTerm = false; @@ -141,10 +138,6 @@ final class DefaultSearchContext extends SearchContext { private SearchContextAggregations aggregations; private SearchContextHighlight highlight; private SuggestionSearchContext suggest; - private List rescore; - private volatile long keepAlive; - private final long originNanoTime = System.nanoTime(); - private volatile long lastAccessTime = -1; private Profilers profilers; private final Map searchExtBuilders = new HashMap<>(); @@ -152,36 +145,36 @@ final class DefaultSearchContext extends SearchContext { private final QueryShardContext queryShardContext; private final FetchPhase fetchPhase; - DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, - Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService, - IndexShard indexShard, BigArrays bigArrays, LongSupplier relativeTimeSupplier, TimeValue timeout, - FetchPhase fetchPhase) { - this.id = id; + DefaultSearchContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget shardTarget, + ClusterService clusterService, IndexService indexService, IndexShard indexShard, BigArrays bigArrays, + LongSupplier relativeTimeSupplier, TimeValue timeout, FetchPhase fetchPhase) { + this.readerContext = readerContext; this.request = request; this.fetchPhase = fetchPhase; this.searchType = request.searchType(); this.shardTarget = shardTarget; - this.engineSearcher = engineSearcher; // SearchContexts use a BigArrays that can circuit break this.bigArrays = bigArrays.withCircuitBreaking(); - this.dfsResult = new DfsSearchResult(id, shardTarget); - this.queryResult = new QuerySearchResult(id, shardTarget); - this.fetchResult = new FetchSearchResult(id, shardTarget); + this.dfsResult = new DfsSearchResult(readerContext.id(), shardTarget); + this.queryResult = new QuerySearchResult(readerContext.id(), shardTarget); + this.fetchResult = new FetchSearchResult(readerContext.id(), shardTarget); this.indexShard = indexShard; this.indexService = indexService; this.clusterService = clusterService; + final Engine.Searcher engineSearcher = readerContext.engineSearcher(); this.searcher = new ContextIndexSearcher(engineSearcher.getIndexReader(), engineSearcher.getSimilarity(), engineSearcher.getQueryCache(), engineSearcher.getQueryCachingPolicy()); + searcher.setAggregatedDfs(readerContext.aggregatedDfs()); this.relativeTimeSupplier = relativeTimeSupplier; this.timeout = timeout; - queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher, + queryShardContext = indexService.newQueryShardContext(request.shardId().id(), this.searcher, request::nowInMillis, shardTarget.getClusterAlias()); queryBoost = request.indexBoost(); } @Override public void doClose() { - Releasables.close(engineSearcher); + readerContext.decRef(); } /** @@ -198,7 +191,7 @@ public void preProcess(boolean rewrite) { int maxResultWindow = indexService.getIndexSettings().getMaxResultWindow(); if (resultWindow > maxResultWindow) { - if (scrollContext == null) { + if (scrollContext() == null) { throw new IllegalArgumentException( "Result window is too large, from + size must be less than or equal to: [" + maxResultWindow + "] but was [" + resultWindow + "]. See the scroll api for a more efficient way to request large data sets. " @@ -210,12 +203,12 @@ public void preProcess(boolean rewrite) { + "]. Scroll batch sizes cost as much memory as result windows so they are controlled by the [" + IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey() + "] index level setting."); } - if (rescore != null) { + if (rescore().isEmpty() == false) { if (sort != null) { throw new IllegalArgumentException("Cannot use [sort] option in conjunction with [rescore]."); } int maxWindow = indexService.getIndexSettings().getMaxRescoreWindow(); - for (RescoreContext rescoreContext: rescore) { + for (RescoreContext rescoreContext: rescore()) { if (rescoreContext.getWindowSize() > maxWindow) { throw new IllegalArgumentException("Rescore window [" + rescoreContext.getWindowSize() + "] is too large. " + "It must be less than [" + maxWindow + "]. This prevents allocating massive heaps for storing the results " @@ -291,12 +284,12 @@ && new NestedHelper(mapperService()).mightMatchNestedDocs(query) @Override public long id() { - return this.id; + return readerContext.id(); } @Override public String source() { - return engineSearcher.source(); + return readerContext.source(); } @Override @@ -324,21 +317,11 @@ public float queryBoost() { return queryBoost; } - @Override - public long getOriginNanoTime() { - return originNanoTime; - } - @Override public ScrollContext scrollContext() { - return this.scrollContext; + return readerContext.scrollContext(); } - @Override - public SearchContext scrollContext(ScrollContext scrollContext) { - this.scrollContext = scrollContext; - return this; - } @Override public SearchContextAggregations aggregations() { @@ -385,18 +368,7 @@ public void suggest(SuggestionSearchContext suggest) { @Override public List rescore() { - if (rescore == null) { - return Collections.emptyList(); - } - return rescore; - } - - @Override - public void addRescore(RescoreContext rescore) { - if (this.rescore == null) { - this.rescore = new ArrayList<>(); - } - this.rescore.add(rescore); + return readerContext.rescore(); } @Override @@ -730,26 +702,6 @@ public SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int return this; } - @Override - public void accessed(long accessTime) { - this.lastAccessTime = accessTime; - } - - @Override - public long lastAccessTime() { - return this.lastAccessTime; - } - - @Override - public long keepAlive() { - return this.keepAlive; - } - - @Override - public void keepAlive(long keepAlive) { - this.keepAlive = keepAlive; - } - @Override public DfsSearchResult dfsResult() { return dfsResult; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 84c4903dfd838..1afc05eccb550 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -32,9 +32,11 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -82,7 +84,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.InternalScrollSearchRequest; -import org.elasticsearch.search.internal.ScrollContext; +import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext.Lifetime; import org.elasticsearch.search.internal.ShardSearchRequest; @@ -102,7 +104,6 @@ import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; -import org.elasticsearch.transport.TransportRequest; import java.io.IOException; import java.util.ArrayList; @@ -118,7 +119,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; -import java.util.function.Supplier; import static org.elasticsearch.common.unit.TimeValue.timeValueHours; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -189,7 +189,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final AtomicLong idGenerator = new AtomicLong(); - private final ConcurrentMapLong activeContexts = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); + private final ConcurrentMapLong activeReaders = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); private final MultiBucketConsumerService multiBucketConsumerService; @@ -278,13 +278,13 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem } - protected void putContext(SearchContext context) { - final SearchContext previous = activeContexts.put(context.id(), context); + protected void putReaderContext(ReaderContext context) { + final ReaderContext previous = activeReaders.put(context.id(), context); assert previous == null; } - protected SearchContext removeContext(long id) { - return activeContexts.remove(id); + protected ReaderContext removeReaderContext(long id) { + return activeReaders.remove(id); } @Override @@ -293,8 +293,8 @@ protected void doStart() { @Override protected void doStop() { - for (final SearchContext context : activeContexts.values()) { - freeContext(context.id()); + for (final ReaderContext context : activeReaders.values()) { + freeReaderContext(context.id()); } } @@ -309,20 +309,14 @@ public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, Ac } private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardTask task) throws IOException { - final SearchContext context = createAndPutContext(request); - context.incRef(); - try { - context.setTask(task); - contextProcessing(context); + final ReaderContext reader = createAndPutReaderContext(request); + try (SearchContext context = createContext(reader, reader.request(), task, true)) { dfsPhase.execute(context); - contextProcessedSuccessfully(context); return context.dfsResult(); } catch (Exception e) { logger.trace("Dfs phase failed", e); - processFailure(context, e); + processFailure(reader, e); throw e; - } finally { - cleanContext(context); } } @@ -343,23 +337,18 @@ public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, rewriteShardRequest(request, ActionListener.map(listener, r -> executeQueryPhase(r, task))); } - private void runAsync(long id, Supplier executable, ActionListener listener) { + private void runAsync(long id, CheckedSupplier executable, ActionListener listener) { getExecutor(id).execute(ActionRunnable.supply(listener, executable::get)); } private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception { - final SearchContext context = createAndPutContext(request); - context.incRef(); - try { - context.setTask(task); + final ReaderContext reader = createAndPutReaderContext(request); + try (SearchContext context = createContext(reader, reader.request(), task, true)) { final long afterQueryTime; try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { - contextProcessing(context); loadOrExecuteQueryPhase(request, context); if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { - freeContext(context.id()); - } else { - contextProcessedSuccessfully(context); + freeReaderContext(reader.id()); } afterQueryTime = executor.success(); } @@ -374,10 +363,8 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh (Exception) e.getCause() : new ElasticsearchException(e.getCause()); } logger.trace("Query phase failed", e); - processFailure(context, e); + processFailure(reader, e); throw e; - } finally { - cleanContext(context); } } @@ -386,9 +373,7 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, long aft shortcutDocIdsToLoad(context); fetchPhase.execute(context); if (fetchPhaseShouldFreeContext(context)) { - freeContext(context.id()); - } else { - contextProcessedSuccessfully(context); + freeReaderContext(context.id()); } executor.success(); } @@ -399,49 +384,41 @@ public void executeQueryPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { - final SearchContext context = findContext(request.id(), request); - context.incRef(); - try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { - context.setTask(task); - contextProcessing(context); - processScroll(request, context); + final ReaderContext reader = findReaderContext(request.id()); + try (SearchContext context = createContext(reader, reader.request(), task, false); + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); + processScroll(request, reader, context); queryPhase.execute(context); - contextProcessedSuccessfully(context); executor.success(); return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); } catch (Exception e) { logger.trace("Query phase failed", e); - processFailure(context, e); + processFailure(reader, e); throw e; - } finally { - cleanContext(context); } }, listener); } public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { - final SearchContext context = findContext(request.id(), request); - context.setTask(task); - context.incRef(); - try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { - contextProcessing(context); + final ReaderContext reader = findReaderContext(request.id()); + reader.aggregatedDfs(request.dfs()); + try (SearchContext context = createContext(reader, reader.request(), task, true); + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); context.searcher().setAggregatedDfs(request.dfs()); queryPhase.execute(context); if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { // no hits, we can release the context since there will be no fetch phase - freeContext(context.id()); - } else { - contextProcessedSuccessfully(context); + freeReaderContext(reader.id()); } executor.success(); return context.queryResult(); } catch (Exception e) { logger.trace("Query phase failed", e); - processFailure(context, e); + processFailure(reader, e); throw e; - } finally { - cleanContext(context); } }, listener); } @@ -457,11 +434,7 @@ private boolean fetchPhaseShouldFreeContext(SearchContext context) { } final Executor getExecutor(long id) { - SearchContext context = activeContexts.get(id); - if (context == null) { - throw new SearchContextMissingException(id); - } - return getExecutor(context.indexShard()); + return getExecutor(findReaderContext(id).indexShard()); } private Executor getExecutor(IndexShard indexShard) { @@ -472,33 +445,28 @@ private Executor getExecutor(IndexShard indexShard) { public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { - final SearchContext context = findContext(request.id(), request); - context.setTask(task); - context.incRef(); - try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)){ - contextProcessing(context); - processScroll(request, context); + final ReaderContext reader = findReaderContext(request.id()); + try (SearchContext context = createContext(reader, reader.request(), task, false); + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); + processScroll(request, reader, context); queryPhase.execute(context); final long afterQueryTime = executor.success(); QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, afterQueryTime); return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget()); } catch (Exception e) { logger.trace("Fetch phase failed", e); - processFailure(context, e); + processFailure(reader, e); throw e; - } finally { - cleanContext(context); } }, listener); } public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { - final SearchContext context = findContext(request.id(), request); - context.incRef(); - try { - context.setTask(task); - contextProcessing(context); + final ReaderContext reader = findReaderContext(request.id()); + try (SearchContext context = createContext(reader, reader.request(), task, false)) { + reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); if (request.lastEmittedDoc() != null) { context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } @@ -506,89 +474,74 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) { fetchPhase.execute(context); if (fetchPhaseShouldFreeContext(context)) { - freeContext(request.id()); - } else { - contextProcessedSuccessfully(context); + freeReaderContext(request.id()); } executor.success(); } return context.fetchResult(); } catch (Exception e) { logger.trace("Fetch phase failed", e); - processFailure(context, e); + processFailure(reader, e); throw e; - } finally { - cleanContext(context); } }, listener); } - private SearchContext findContext(long id, TransportRequest request) throws SearchContextMissingException { - SearchContext context = activeContexts.get(id); - if (context == null) { + private ReaderContext findReaderContext(long id) throws SearchContextMissingException { + final ReaderContext reader = activeReaders.get(id); + if (reader == null) { throw new SearchContextMissingException(id); } - - SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); - try { - operationListener.validateSearchContext(context, request); - return context; - } catch (Exception e) { - processFailure(context, e); - throw e; - } + return reader; } - final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException { + final ReaderContext createAndPutReaderContext(ShardSearchRequest request) { if (request.scroll() != null && openScrollContexts.get() >= maxOpenScrollContext) { throw new ElasticsearchException( "Trying to create too many scroll contexts. Must be less than or equal to: [" + maxOpenScrollContext + "]. " + "This limit can be set by changing the [" + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); } - - SearchContext context = createContext(request); - onNewContext(context); - boolean success = false; + final IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().getId()); + final SearchOperationListener searchOperationListener = indexShard.getSearchOperationListener(); + Engine.Searcher engineSearcher = indexShard.acquireSearcher("search"); + ReaderContext readerContext = null; try { - putContext(context); - success = true; - return context; - } finally { - if (success == false) { - freeContext(context.id()); - } - } - } - - private void onNewContext(SearchContext context) { - boolean success = false; - try { - if (context.scrollContext() != null) { + readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexShard, engineSearcher, request); + engineSearcher = null; + final ReaderContext finalReaderContext = readerContext; + searchOperationListener.onNewReaderContext(finalReaderContext); + if (finalReaderContext.scrollContext() != null) { + searchOperationListener.onNewScrollContext(finalReaderContext.scrollContext()); openScrollContexts.incrementAndGet(); - context.indexShard().getSearchOperationListener().onNewScrollContext(context); } - context.indexShard().getSearchOperationListener().onNewContext(context); - success = true; - } finally { - // currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the - // right thing by closing and notifying onFreeXXX in case one of the listeners fails with an exception in the future. - if (success == false) { - try (context) { - onFreeContext(context); + readerContext.addOnClose(() -> { + try { + if (finalReaderContext.scrollContext() != null) { + openScrollContexts.decrementAndGet(); + searchOperationListener.onFreeScrollContext(finalReaderContext.scrollContext()); + } + } finally { + searchOperationListener.onFreeReaderContext(finalReaderContext); } - } + }); + putReaderContext(finalReaderContext); + readerContext = null; + return finalReaderContext; + } finally { + Releasables.close(engineSearcher, readerContext); } } - final SearchContext createContext(ShardSearchRequest request) throws IOException { - final DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout); + final SearchContext createContext(ReaderContext reader, ShardSearchRequest request, SearchShardTask task, + boolean includeAggregations) throws IOException { + final DefaultSearchContext context = createSearchContext(reader, request, defaultSearchTimeout); try { + context.setTask(task); if (request.scroll() != null) { - context.scrollContext(new ScrollContext()); context.scrollContext().scroll = request.scroll(); } - parseSource(context, request.source()); + parseSource(reader, context, request.source(), includeAggregations); // if the from and size are still not set, default them if (context.from() == -1) { @@ -608,8 +561,11 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException if (request.scroll() != null && request.scroll().keepAlive() != null) { keepAlive = request.scroll().keepAlive().millis(); } - contextScrollKeepAlive(context, keepAlive); + contextScrollKeepAlive(reader, keepAlive); context.lowLevelCancellation(lowLevelCancellation); + // Disable timeout while searching and mark the reader as accessed when releasing the context. + context.addReleasable(() -> reader.accessed(threadPool.relativeTimeInMillis()), Lifetime.CONTEXT); + reader.accessed(-1L); } catch (Exception e) { context.close(); throw e; @@ -619,28 +575,34 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException } public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException { - return createSearchContext(request, timeout, true, "search"); + IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().getId()); + Engine.Searcher engineSearcher = indexShard.acquireSearcher("search"); + try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexShard, engineSearcher, request)) { + engineSearcher = null; // transfer ownership to readerContext + return createSearchContext(readerContext, request, timeout); + } finally { + Releasables.close(engineSearcher); + } } - private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, - boolean assertAsyncActions, String source) + private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSearchRequest request, TimeValue timeout) throws IOException { IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = indexService.getShard(request.shardId().getId()); SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(), indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE); - Engine.Searcher searcher = indexShard.acquireSearcher(source); boolean success = false; DefaultSearchContext searchContext = null; + reader.incRef(); try { - searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, - searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout, fetchPhase); + searchContext = new DefaultSearchContext(reader, request, shardTarget, + clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout, fetchPhase); // we clone the query shard context here just for rewriting otherwise we // might end up with incorrect state since we are using now() or script services // during rewrite and normalized / evaluate templates etc. QueryShardContext context = new QueryShardContext(searchContext.getQueryShardContext()); - Rewriteable.rewrite(request.getRewriteable(), context, assertAsyncActions); + Rewriteable.rewrite(request.getRewriteable(), context, true); assert searchContext.getQueryShardContext().isCacheable(); success = true; } finally { @@ -649,7 +611,7 @@ private DefaultSearchContext createSearchContext(ShardSearchRequest request, Tim if (searchContext == null) { // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions). - IOUtils.closeWhileHandlingException(searcher); + IOUtils.closeWhileHandlingException(reader::decRef); } } } @@ -658,72 +620,41 @@ private DefaultSearchContext createSearchContext(ShardSearchRequest request, Tim private void freeAllContextForIndex(Index index) { assert index != null; - for (SearchContext ctx : activeContexts.values()) { + for (ReaderContext ctx : activeReaders.values()) { if (index.equals(ctx.indexShard().shardId().getIndex())) { - freeContext(ctx.id()); + freeReaderContext(ctx.id()); } } } - public boolean freeContext(long id) { - try (SearchContext context = removeContext(id)) { - if (context != null) { - onFreeContext(context); - return true; - } - return false; + public boolean freeReaderContext(long id) { + try (ReaderContext context = removeReaderContext(id)) { + return context != null; } } - private void onFreeContext(SearchContext context) { - assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount(); - assert activeContexts.containsKey(context.id()) == false; - context.indexShard().getSearchOperationListener().onFreeContext(context); - if (context.scrollContext() != null) { - openScrollContexts.decrementAndGet(); - context.indexShard().getSearchOperationListener().onFreeScrollContext(context); - } - } public void freeAllScrollContexts() { - for (SearchContext searchContext : activeContexts.values()) { - if (searchContext.scrollContext() != null) { - freeContext(searchContext.id()); + for (ReaderContext readerContext : activeReaders.values()) { + if (readerContext.scrollContext() != null) { + freeReaderContext(readerContext.id()); } } } - private void contextScrollKeepAlive(SearchContext context, long keepAlive) { + private void contextScrollKeepAlive(ReaderContext readerContext, long keepAlive) { if (keepAlive > maxKeepAlive) { throw new IllegalArgumentException( "Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive) + ") is too large. " + "It must be less than (" + TimeValue.timeValueMillis(maxKeepAlive) + "). " + "This limit can be set by changing the [" + MAX_KEEPALIVE_SETTING.getKey() + "] cluster level setting."); } - context.keepAlive(keepAlive); - } - - private void contextProcessing(SearchContext context) { - // disable timeout while executing a search - context.accessed(-1); - } - - private void contextProcessedSuccessfully(SearchContext context) { - context.accessed(threadPool.relativeTimeInMillis()); - } - - private void cleanContext(SearchContext context) { - try { - context.clearReleasables(Lifetime.PHASE); - context.setTask(null); - } finally { - context.decRef(); - } + readerContext.keepAlive(keepAlive); } - private void processFailure(SearchContext context, Exception e) { - freeContext(context.id()); + private void processFailure(ReaderContext context, Exception e) { + freeReaderContext(context.id()); try { if (Lucene.isCorruptionException(e)) { context.indexShard().failShard("search execution corruption failure", e); @@ -734,7 +665,8 @@ private void processFailure(SearchContext context, Exception e) { } } - private void parseSource(DefaultSearchContext context, SearchSourceBuilder source) throws SearchException { + private void parseSource(ReaderContext reader, DefaultSearchContext context, + SearchSourceBuilder source, boolean includeAggregations) throws SearchException { // nothing to parse... if (source == null) { return; @@ -790,7 +722,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.timeout(source.timeout()); } context.terminateAfter(source.terminateAfter()); - if (source.aggregations() != null) { + if (source.aggregations() != null && includeAggregations) { try { AggregatorFactories factories = source.aggregations().build(queryShardContext, null); context.aggregations(new SearchContextAggregations(factories, multiBucketConsumerService.create())); @@ -805,10 +737,10 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc throw new SearchException(shardTarget, "failed to create SuggestionSearchContext", e); } } - if (source.rescores() != null) { + if (source.rescores() != null && reader.rescore().isEmpty()) { try { for (RescorerBuilder rescore : source.rescores()) { - context.addRescore(rescore.buildContext(queryShardContext)); + reader.addRescore(rescore.buildContext(queryShardContext)); } } catch (IOException e) { throw new SearchException(shardTarget, "failed to create RescoreSearchContext", e); @@ -966,13 +898,13 @@ private void shortcutDocIdsToLoad(SearchContext context) { context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length); } - private void processScroll(InternalScrollSearchRequest request, SearchContext context) { + private void processScroll(InternalScrollSearchRequest request, ReaderContext reader, SearchContext context) { // process scroll context.from(context.from() + context.size()); context.scrollContext().scroll = request.scroll(); // update the context keep alive based on the new scroll value if (request.scroll() != null && request.scroll().keepAlive() != null) { - contextScrollKeepAlive(context, request.scroll().keepAlive().millis()); + contextScrollKeepAlive(reader, request.scroll().keepAlive().millis()); } } @@ -981,7 +913,7 @@ private void processScroll(InternalScrollSearchRequest request, SearchContext co * SearchService */ public int getActiveContexts() { - return this.activeContexts.size(); + return this.activeReaders.size(); } public ResponseCollectorService getResponseCollectorService() { @@ -992,7 +924,7 @@ class Reaper implements Runnable { @Override public void run() { final long time = threadPool.relativeTimeInMillis(); - for (SearchContext context : activeContexts.values()) { + for (ReaderContext context : activeReaders.values()) { // Use the same value for both checks since lastAccessTime can // be modified by another thread between checks! final long lastAccessTime = context.lastAccessTime(); @@ -1002,7 +934,7 @@ public void run() { if ((time - lastAccessTime > context.keepAlive())) { logger.debug("freeing search context [{}], time [{}], lastAccessTime [{}], keepAlive [{}]", context.id(), time, lastAccessTime, context.keepAlive()); - freeContext(context.id()); + freeReaderContext(context.id()); } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java index ea89700521798..8788f97de0b2a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java @@ -76,7 +76,7 @@ protected AggregatorBase(String name, AggregatorFactories factories, SearchConte this.breakerService = context.bigArrays().breakerService(); assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead"; this.subAggregators = factories.createSubAggregators(context, this); - context.addReleasable(this, Lifetime.PHASE); + context.addReleasable(this, Lifetime.CONTEXT); final SearchShardTarget shardTarget = context.shardTarget(); // Register a safeguard to highlight any invalid construction logic (call to this constructor without subsequent preCollection call) collectableSubAggregators = new BucketCollector() { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java index 970ef725f027d..743d8d730033a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java @@ -50,7 +50,7 @@ public static final class MultiBucketAggregatorWrapper extends Aggregator { this.parent = parent; this.factory = factory; this.first = first; - context.addReleasable(this, Lifetime.PHASE); + context.addReleasable(this, Lifetime.CONTEXT); aggregators = bigArrays.newObjectArray(1); aggregators.set(0, first); collectors = bigArrays.newObjectArray(1); diff --git a/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java index 8c04954a4efcf..ab605062de50a 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java @@ -140,21 +140,11 @@ public float queryBoost() { return in.queryBoost(); } - @Override - public long getOriginNanoTime() { - return in.getOriginNanoTime(); - } - @Override public ScrollContext scrollContext() { return in.scrollContext(); } - @Override - public SearchContext scrollContext(ScrollContext scroll) { - return in.scrollContext(scroll); - } - @Override public SearchContextAggregations aggregations() { return in.aggregations(); @@ -195,11 +185,6 @@ public List rescore() { return in.rescore(); } - @Override - public void addRescore(RescoreContext rescore) { - in.addRescore(rescore); - } - @Override public boolean hasScriptFields() { return in.hasScriptFields(); @@ -451,26 +436,6 @@ public SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int return in.docIdsToLoad(docIdsToLoad, docsIdsToLoadFrom, docsIdsToLoadSize); } - @Override - public void accessed(long accessTime) { - in.accessed(accessTime); - } - - @Override - public long lastAccessTime() { - return in.lastAccessTime(); - } - - @Override - public long keepAlive() { - return in.keepAlive(); - } - - @Override - public void keepAlive(long keepAlive) { - in.keepAlive(keepAlive); - } - @Override public SearchLookup lookup() { return in.lookup(); diff --git a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java new file mode 100644 index 0000000000000..7c9dfe14dc9f8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java @@ -0,0 +1,155 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.internal; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.search.dfs.AggregatedDfs; +import org.elasticsearch.search.rescore.RescoreContext; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Holds a reference to a point in time {@link Engine.Searcher} that will be used to construct {@link SearchContext}. + * This class also implements {@link org.elasticsearch.common.util.concurrent.RefCounted} since in some situations like + * in {@link org.elasticsearch.search.SearchService} a SearchContext can be closed concurrently due to independent events + * ie. when an index gets removed. To prevent accessing closed IndexReader / IndexSearcher instances the SearchContext + * can be guarded by a reference count and fail if it's been closed by an external event. + * + * For reference why we use RefCounted here see https://github.com/elastic/elasticsearch/pull/20095. + */ +public class ReaderContext extends AbstractRefCounted implements Releasable { + private final long id; + private final IndexShard indexShard; + private final Engine.Searcher engineSearcher; + private final AtomicBoolean closed = new AtomicBoolean(false); + + private volatile long keepAlive; + private volatile long lastAccessTime = -1L; + + // BWC + private final ShardSearchRequest request; + private final ScrollContext scrollContext; + private AggregatedDfs aggregatedDfs; + private List rescore; + + private final List onCloses = new CopyOnWriteArrayList<>(); + + public ReaderContext(long id, IndexShard indexShard, Engine.Searcher engineSearcher, ShardSearchRequest request) { + super("reader_context"); + this.id = id; + this.indexShard = indexShard; + this.engineSearcher = engineSearcher; + this.request = request; + if (request.scroll() != null) { + this.scrollContext = new ScrollContext(); + } else { + this.scrollContext = null; + } + } + + @Override + public final void close() { + if (closed.compareAndSet(false, true)) { + decRef(); + } else { + assert false : "ReaderContext was closed already"; + } + } + + @Override + protected void closeInternal() { + Releasables.close(Releasables.wrap(onCloses), engineSearcher); + } + + public void addOnClose(Releasable releasable) { + onCloses.add(releasable); + } + + public long id() { + return id; + } + + public IndexShard indexShard() { + return indexShard; + } + + public Engine.Searcher engineSearcher() { + return engineSearcher; + } + + public String source() { + return engineSearcher.source(); + } + + public ShardSearchRequest request() { + return request; + } + + public ScrollContext scrollContext() { + return scrollContext; + } + + public AggregatedDfs aggregatedDfs() { + return aggregatedDfs; + } + + public void aggregatedDfs(AggregatedDfs aggregatedDfs) { + this.aggregatedDfs = aggregatedDfs; + } + + public void accessed(long accessTime) { + this.lastAccessTime = accessTime; + } + + public long lastAccessTime() { + return this.lastAccessTime; + } + + public long keepAlive() { + return this.keepAlive; + } + + public void keepAlive(long keepAlive) { + this.keepAlive = keepAlive; + } + + public List rescore() { + if (rescore == null) { + return Collections.emptyList(); + } else { + return Collections.unmodifiableList(rescore); + } + } + + public void addRescore(RescoreContext rescore) { + if (this.rescore == null) { + this.rescore = new ArrayList<>(); + } + this.rescore.add(rescore); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java b/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java index 41d7680a780b0..440408646d127 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java @@ -36,6 +36,8 @@ public final class ScrollContext { public ScoreDoc lastEmittedDoc; public Scroll scroll; + private final long startTimeInNano = System.nanoTime(); + /** * Returns the object or null if the given key does not have a * value in the context @@ -54,4 +56,9 @@ public void putInContext(String key, Object value) { } context.put(key, value); } + + + public long getStartTimeInNano() { + return startTimeInNano; + } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java index c0477ece69aa0..937ea85097d79 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -29,8 +29,6 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.concurrent.AbstractRefCounted; -import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.fielddata.IndexFieldData; @@ -71,14 +69,8 @@ * This class encapsulates the state needed to execute a search. It holds a reference to the * shards point in time snapshot (IndexReader / ContextIndexSearcher) and allows passing on * state from one query / fetch phase to another. - * - * This class also implements {@link RefCounted} since in some situations like in {@link org.elasticsearch.search.SearchService} - * a SearchContext can be closed concurrently due to independent events ie. when an index gets removed. To prevent accessing closed - * IndexReader / IndexSearcher instances the SearchContext can be guarded by a reference count and fail if it's been closed by - * an external event. */ -// For reference why we use RefCounted here see #20095 -public abstract class SearchContext extends AbstractRefCounted implements Releasable { +public abstract class SearchContext implements Releasable { public static final int DEFAULT_TERMINATE_AFTER = 0; public static final int TRACK_TOTAL_HITS_ACCURATE = Integer.MAX_VALUE; @@ -90,7 +82,7 @@ public abstract class SearchContext extends AbstractRefCounted implements Releas private InnerHitsContext innerHitsContext; protected SearchContext() { - super("search_context"); + } public abstract void setTask(SearchShardTask task); @@ -101,13 +93,6 @@ protected SearchContext() { @Override public final void close() { - if (closed.compareAndSet(false, true)) { // prevent double closing - decRef(); - } - } - - @Override - protected final void closeInternal() { try { clearReleasables(Lifetime.CONTEXT); } finally { @@ -115,11 +100,6 @@ protected final void closeInternal() { } } - @Override - protected void alreadyClosed() { - throw new IllegalStateException("search context is already closed can't increment refCount current count [" + refCount() + "]"); - } - protected abstract void doClose(); /** @@ -146,12 +126,8 @@ protected void alreadyClosed() { public abstract float queryBoost(); - public abstract long getOriginNanoTime(); - public abstract ScrollContext scrollContext(); - public abstract SearchContext scrollContext(ScrollContext scroll); - public abstract SearchContextAggregations aggregations(); public abstract SearchContext aggregations(SearchContextAggregations aggregations); @@ -180,8 +156,6 @@ public InnerHitsContext innerHits() { */ public abstract List rescore(); - public abstract void addRescore(RescoreContext rescore); - public abstract boolean hasScriptFields(); public abstract ScriptFieldsContext scriptFields(); @@ -322,14 +296,6 @@ public InnerHitsContext innerHits() { public abstract SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int docsIdsToLoadSize); - public abstract void accessed(long accessTime); - - public abstract long lastAccessTime(); - - public abstract long keepAlive(); - - public abstract void keepAlive(long keepAlive); - public SearchLookup lookup() { return getQueryShardContext().lookup(); } @@ -411,10 +377,6 @@ public enum Lifetime { * This life time is for objects that only live during collection time. */ COLLECTION, - /** - * This life time is for objects that need to live until the end of the current search phase. - */ - PHASE, /** * This life time is for objects that need to live until the search context they are attached to is destroyed. */ diff --git a/server/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java index e918aed603750..8ce36550f7b94 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java @@ -30,7 +30,6 @@ import org.elasticsearch.search.fetch.subphase.ScriptFieldsContext; import org.elasticsearch.search.fetch.subphase.highlight.SearchContextHighlight; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.rescore.RescoreContext; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.suggest.SuggestionSearchContext; @@ -85,11 +84,6 @@ public Query buildFilteredQuery(Query query) { throw new UnsupportedOperationException("this context should be read only"); } - @Override - public SearchContext scrollContext(ScrollContext scrollContext) { - throw new UnsupportedOperationException("Not supported"); - } - @Override public SearchContext aggregations(SearchContextAggregations aggregations) { throw new UnsupportedOperationException("Not supported"); @@ -110,11 +104,6 @@ public void suggest(SuggestionSearchContext suggest) { throw new UnsupportedOperationException("Not supported"); } - @Override - public void addRescore(RescoreContext rescore) { - throw new UnsupportedOperationException("Not supported"); - } - @Override public boolean hasScriptFields() { return scriptFields != null; @@ -332,16 +321,6 @@ public CollapseContext collapse() { return null; } - @Override - public void accessed(long accessTime) { - throw new UnsupportedOperationException("Not supported"); - } - - @Override - public void keepAlive(long keepAlive) { - throw new UnsupportedOperationException("Not supported"); - } - @Override public QuerySearchResult queryResult() { return querySearchResult; diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 2c11b26580f3d..18b9b5f4f6d58 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -78,11 +78,10 @@ import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; -import org.elasticsearch.test.TestSearchContext; import org.elasticsearch.test.engine.MockEngineFactory; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -103,6 +102,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; public class IndexModuleTests extends ESTestCase { private Index index; @@ -272,9 +272,8 @@ public void testAddSearchOperationListener() throws IOException { IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap()); AtomicBoolean executed = new AtomicBoolean(false); SearchOperationListener listener = new SearchOperationListener() { - @Override - public void onNewContext(SearchContext context) { + public void onNewReaderContext(ReaderContext readerContext) { executed.set(true); } }; @@ -287,9 +286,8 @@ public void onNewContext(SearchContext context) { assertEquals(2, indexService.getSearchOperationListener().size()); assertEquals(SearchSlowLog.class, indexService.getSearchOperationListener().get(0).getClass()); assertSame(listener, indexService.getSearchOperationListener().get(1)); - for (SearchOperationListener l : indexService.getSearchOperationListener()) { - l.onNewContext(new TestSearchContext(null)); + l.onNewReaderContext(mock(ReaderContext.class)); } assertTrue(executed.get()); indexService.close("simon says", false); diff --git a/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java b/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java index 28bab8da0fdfb..34c528dfeda6a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.index.shard; +import org.elasticsearch.search.internal.ReaderContext; +import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestSearchContext; @@ -33,6 +35,7 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; public class SearchOperationListenerTests extends ESTestCase { @@ -90,26 +93,26 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) { } @Override - public void onNewContext(SearchContext context) { - assertNotNull(context); + public void onNewReaderContext(ReaderContext readerContext) { + assertNotNull(readerContext); newContext.incrementAndGet(); } @Override - public void onFreeContext(SearchContext context) { - assertNotNull(context); + public void onFreeReaderContext(ReaderContext readerContext) { + assertNotNull(readerContext); freeContext.incrementAndGet(); } @Override - public void onNewScrollContext(SearchContext context) { - assertNotNull(context); + public void onNewScrollContext(ScrollContext scrollContext) { + assertNotNull(scrollContext); newScrollContext.incrementAndGet(); } @Override - public void onFreeScrollContext(SearchContext context) { - assertNotNull(context); + public void onFreeScrollContext(ScrollContext scrollContext) { + assertNotNull(scrollContext); freeScrollContext.incrementAndGet(); } @@ -216,7 +219,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, validateSearchContext.get()); - compositeListener.onNewContext(ctx); + compositeListener.onNewReaderContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); @@ -229,7 +232,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, validateSearchContext.get()); - compositeListener.onNewScrollContext(ctx); + compositeListener.onNewScrollContext(new ScrollContext()); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); @@ -242,7 +245,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, validateSearchContext.get()); - compositeListener.onFreeContext(ctx); + compositeListener.onFreeReaderContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); @@ -255,7 +258,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, validateSearchContext.get()); - compositeListener.onFreeScrollContext(ctx); + compositeListener.onFreeScrollContext(new ScrollContext()); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 5dd4d1c24bd8f..408d1c8f21cfe 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -47,7 +47,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.ScrollContext; +import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.rescore.RescoreContext; import org.elasticsearch.search.slice.SliceBuilder; @@ -108,26 +108,33 @@ public void testPreProcess() throws Exception { try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir); - IndexReader reader = w.getReader(); - Engine.Searcher searcher = new Engine.Searcher("test", reader, - IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), reader)) { + IndexReader reader = w.getReader()) { + + final Engine.Searcher engineSearcher = new Engine.Searcher("test", w.getReader(), + IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), reader); SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE); - DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, target, searcher, null, indexService, - indexShard, bigArrays, null, timeout, null); - context1.from(300); + ReaderContext readerWithoutScroll = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); + DefaultSearchContext contextWithoutScroll = new DefaultSearchContext(readerWithoutScroll, shardSearchRequest, target, null, + indexService, indexShard, bigArrays, null, timeout, null); + contextWithoutScroll.from(300); + contextWithoutScroll.close(); // resultWindow greater than maxResultWindow and scrollContext is null - IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false)); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> contextWithoutScroll.preProcess(false)); assertThat(exception.getMessage(), equalTo("Result window is too large, from + size must be less than or equal to:" + " [" + maxResultWindow + "] but was [310]. See the scroll api for a more efficient way to request large data sets. " + "This limit can be set by changing the [" + IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey() + "] index level setting.")); // resultWindow greater than maxResultWindow and scrollContext isn't null - context1.scrollContext(new ScrollContext()); + when(shardSearchRequest.scroll()).thenReturn(new Scroll(TimeValue.timeValueMillis(randomInt(1000)))); + ReaderContext readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); + DefaultSearchContext context1 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, indexService, + indexShard, bigArrays, null, timeout, null); + context1.from(300); exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false)); assertThat(exception.getMessage(), equalTo("Batch size is too large, size must be less than or equal to: [" + maxResultWindow + "] but was [310]. Scroll batch sizes cost as much memory as result windows so they are " @@ -141,7 +148,7 @@ public void testPreProcess() throws Exception { RescoreContext rescoreContext = mock(RescoreContext.class); when(rescoreContext.getWindowSize()).thenReturn(500); - context1.addRescore(rescoreContext); + readerContext.addRescore(rescoreContext); exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false)); assertThat(exception.getMessage(), equalTo("Cannot use [sort] option in conjunction with [rescore].")); @@ -155,8 +162,10 @@ public void testPreProcess() throws Exception { + "to be rescored. This limit can be set by changing the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey() + "] index level setting.")); + readerContext.close(); + readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); // rescore is null but sliceBuilder is not null - DefaultSearchContext context2 = new DefaultSearchContext(2L, shardSearchRequest, target, searcher, + DefaultSearchContext context2 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, indexService, indexShard, bigArrays, null, timeout, null); SliceBuilder sliceBuilder = mock(SliceBuilder.class); @@ -173,11 +182,12 @@ public void testPreProcess() throws Exception { when(shardSearchRequest.getAliasFilter()).thenReturn(AliasFilter.EMPTY); when(shardSearchRequest.indexBoost()).thenReturn(AbstractQueryBuilder.DEFAULT_BOOST); - DefaultSearchContext context3 = new DefaultSearchContext(3L, shardSearchRequest, target, searcher, null, + DefaultSearchContext context3 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, indexService, indexShard, bigArrays, null, timeout, null); ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery(); context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false); assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query())); + readerContext.close(); } } } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 5691530dd32fe..c47ccefca3217 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -47,6 +47,7 @@ import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.MatchNoneQueryBuilder; @@ -75,6 +76,7 @@ import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.suggest.SuggestBuilder; @@ -167,8 +169,8 @@ protected Map, Object>> pluginScripts() { public void onIndexModule(IndexModule indexModule) { indexModule.addSearchOperationListener(new SearchOperationListener() { @Override - public void onNewContext(SearchContext context) { - if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + public void onNewReaderContext(ReaderContext readerContext) { + if ("throttled_threadpool_index".equals(readerContext.indexShard().shardId().getIndex().getName())) { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); } else { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); @@ -322,7 +324,7 @@ public void onFailure(Exception e) { service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), listener); listener.get(); if (useScroll) { - service.freeContext(searchPhaseResult.getRequestId()); + service.freeReaderContext(searchPhaseResult.getRequestId()); } } catch (ExecutionException ex) { assertThat(ex.getCause(), instanceOf(RuntimeException.class)); @@ -331,7 +333,7 @@ public void onFailure(Exception e) { } catch (AlreadyClosedException ex) { throw ex; } catch (IllegalStateException ex) { - assertEquals("search context is already closed can't increment refCount current count [0]", ex.getMessage()); + assertEquals("reader_context is already closed can't increment refCount current count [0]", ex.getMessage()); } catch (SearchContextMissingException ex) { // that's fine } @@ -357,42 +359,34 @@ public void testTimeout() throws IOException { final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); final IndexShard indexShard = indexService.getShard(0); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - final SearchContext contextWithDefaultTimeout = service.createContext( - new ShardSearchRequest( - OriginalIndices.NONE, - searchRequest, - indexShard.shardId(), - 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null) - ); - try { + final ShardSearchRequest requestWithDefaultTimeout = new ShardSearchRequest( + OriginalIndices.NONE, + searchRequest, + indexShard.shardId(), + 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, -1, null, null); + + try (ReaderContext reader = createReaderContext(indexShard, requestWithDefaultTimeout); + SearchContext contextWithDefaultTimeout = service.createContext(reader, requestWithDefaultTimeout, null, randomBoolean())) { // the search context should inherit the default timeout assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5))); - } finally { - contextWithDefaultTimeout.decRef(); - service.freeContext(contextWithDefaultTimeout.id()); } final long seconds = randomIntBetween(6, 10); searchRequest.source(new SearchSourceBuilder().timeout(TimeValue.timeValueSeconds(seconds))); - final SearchContext context = service.createContext( - new ShardSearchRequest( - OriginalIndices.NONE, - searchRequest, - indexShard.shardId(), - 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null) - ); - try { + final ShardSearchRequest requestWithCustomTimeout = new ShardSearchRequest( + OriginalIndices.NONE, + searchRequest, + indexShard.shardId(), + 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, -1, null, null); + try (ReaderContext reader = createReaderContext(indexShard, requestWithCustomTimeout); + SearchContext context = service.createContext(reader, requestWithCustomTimeout, null, randomBoolean())) { // the search context should inherit the query timeout assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds))); - } finally { - context.decRef(); - service.freeContext(context.id()); } - } /** @@ -412,19 +406,20 @@ public void testMaxDocvalueFieldsSearch() throws IOException { for (int i = 0; i < indexService.getIndexSettings().getMaxDocvalueFields(); i++) { searchSourceBuilder.docValueField("field" + i); } - try (SearchContext context = service.createContext( - new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null)) - ) { + final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + try (ReaderContext reader = createReaderContext(indexShard, request); + SearchContext context = service.createContext(reader, request, null, randomBoolean())) { assertNotNull(context); - searchSourceBuilder.docValueField("one_field_too_much"); + } + searchSourceBuilder.docValueField("one_field_too_much"); + try (ReaderContext reader = createReaderContext(indexShard, request)) { IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null))); + () -> service.createContext(reader, request, null, randomBoolean())); assertEquals( - "Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. " - + "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", - ex.getMessage()); + "Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. " + + "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", + ex.getMessage()); } } @@ -447,20 +442,22 @@ public void testMaxScriptFieldsSearch() throws IOException { searchSourceBuilder.scriptField("field" + i, new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); } - try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null))) { - assertNotNull(context); + final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, + indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + + try(ReaderContext reader = createReaderContext(indexShard, request)) { + try (SearchContext context = service.createContext(reader, request, null, randomBoolean())) { + assertNotNull(context); + } searchSourceBuilder.scriptField("anotherScriptField", - new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); + new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null))); + () -> service.createContext(reader, request, null, randomBoolean())); assertEquals( - "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was [" - + (maxScriptFields + 1) - + "]. This limit can be set by changing the [index.max_script_fields] index level setting.", - ex.getMessage()); + "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was [" + + (maxScriptFields + 1) + + "]. This limit can be set by changing the [index.max_script_fields] index level setting.", + ex.getMessage()); } } @@ -477,10 +474,12 @@ public void testIgnoreScriptfieldIfSizeZero() throws IOException { searchSourceBuilder.scriptField("field" + 0, new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); searchSourceBuilder.size(0); - try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, + final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null))) { - assertEquals(0, context.scriptFields().fields().size()); + 1.0f, -1, null, null); + try (ReaderContext reader = createReaderContext(indexShard, request); + SearchContext context = service.createContext(reader, request, null, randomBoolean())) { + assertEquals(0, context.scriptFields().fields().size()); } } @@ -514,7 +513,7 @@ public void testMaxOpenScrollContexts() throws RuntimeException { } ElasticsearchException ex = expectThrows(ElasticsearchException.class, - () -> service.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId()))); + () -> service.createAndPutReaderContext(new ShardScrollRequestTest(indexShard.shardId()))); assertEquals( "Trying to create too many scroll contexts. Must be less than or equal to: [" + SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " + @@ -626,10 +625,11 @@ public void testCanMatch() throws IOException { assertEquals(numWrapReader, numWrapInvocations.get()); // make sure that the wrapper is called when the context is actually created - service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1f, -1, null, null)).close(); - assertEquals(numWrapReader+1, numWrapInvocations.get()); + try (ReaderContext reader = createReaderContext(indexShard, new ShardSearchRequest(OriginalIndices.NONE, searchRequest, + indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)); + SearchContext context = service.createContext(reader, reader.request(), null, randomBoolean())) { + } + assertEquals(numWrapReader + 1, numWrapInvocations.get()); } public void testCanRewriteToMatchNone() { @@ -744,18 +744,23 @@ public void testCreateSearchContextFailure() throws IOException { final IndexService indexService = createIndex(index); final SearchService service = getInstanceFromNode(SearchService.class); final ShardId shardId = new ShardId(indexService.index(), 0); - - NullPointerException e = expectThrows(NullPointerException.class, - () -> service.createContext( - new ShardSearchRequest(shardId, 0, null) { - @Override - public SearchType searchType() { - // induce an artificial NPE - throw new NullPointerException("expected"); - } - } - )); - assertEquals("expected", e.getMessage()); + final ShardSearchRequest request = new ShardSearchRequest(shardId, 0, null) { + @Override + public SearchType searchType() { + // induce an artificial NPE + throw new NullPointerException("expected"); + } + }; + try (ReaderContext reader = createReaderContext(indexService.getShard(shardId.id()), request)) { + NullPointerException e = expectThrows(NullPointerException.class, + () -> service.createContext(reader, request, null, randomBoolean())); + assertEquals("expected", e.getMessage()); + } assertEquals("should have 2 store refs (IndexService + InternalEngine)", 2, indexService.getShard(0).store().refCount()); } + + private ReaderContext createReaderContext(IndexShard shard, ShardSearchRequest request) { + Engine.Searcher searcher = shard.acquireSearcher("test"); + return new ReaderContext(randomNonNegativeLong(), shard, searcher, request); + } } diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index 4c33177e268f9..b2d4314b3a0a5 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -316,13 +316,12 @@ public void testInOrderScrollOptimization() throws Exception { } w.close(); IndexReader reader = DirectoryReader.open(dir); - TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader)); - context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); ScrollContext scrollContext = new ScrollContext(); + TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader), scrollContext); + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); scrollContext.lastEmittedDoc = null; scrollContext.maxScore = Float.NaN; scrollContext.totalHits = null; - context.scrollContext(scrollContext); context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); int size = randomIntBetween(2, 5); context.setSize(size); @@ -545,13 +544,12 @@ public void testIndexSortScrollOptimization() throws Exception { // search sort is a prefix of the index sort searchSortAndFormats.add(new SortAndFormats(new Sort(indexSort.getSort()[0]), new DocValueFormat[]{DocValueFormat.RAW})); for (SortAndFormats searchSortAndFormat : searchSortAndFormats) { - TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader)); - context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); ScrollContext scrollContext = new ScrollContext(); + TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader), scrollContext); + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); scrollContext.lastEmittedDoc = null; scrollContext.maxScore = Float.NaN; scrollContext.totalHits = null; - context.scrollContext(scrollContext); context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.setSize(10); context.sort(searchSortAndFormat); diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index c1ebb1213495b..554f47814a8ab 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -26,7 +26,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.fetch.FetchPhase; -import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.threadpool.ThreadPool; import java.util.HashMap; @@ -39,11 +39,11 @@ public class MockSearchService extends SearchService { */ public static class TestPlugin extends Plugin {} - private static final Map ACTIVE_SEARCH_CONTEXTS = new ConcurrentHashMap<>(); + private static final Map ACTIVE_SEARCH_CONTEXTS = new ConcurrentHashMap<>(); /** Throw an {@link AssertionError} if there are still in-flight contexts. */ public static void assertNoInFlightContext() { - final Map copy = new HashMap<>(ACTIVE_SEARCH_CONTEXTS); + final Map copy = new HashMap<>(ACTIVE_SEARCH_CONTEXTS); if (copy.isEmpty() == false) { throw new AssertionError( "There are still [" + copy.size() @@ -55,14 +55,14 @@ public static void assertNoInFlightContext() { /** * Add an active search context to the list of tracked contexts. Package private for testing. */ - static void addActiveContext(SearchContext context) { + static void addActiveContext(ReaderContext context) { ACTIVE_SEARCH_CONTEXTS.put(context, new RuntimeException(context.toString())); } /** * Clear an active search context from the list of tracked contexts. Package private for testing. */ - static void removeActiveContext(SearchContext context) { + static void removeActiveContext(ReaderContext context) { ACTIVE_SEARCH_CONTEXTS.remove(context); } @@ -73,14 +73,14 @@ public MockSearchService(ClusterService clusterService, } @Override - protected void putContext(SearchContext context) { + protected void putReaderContext(ReaderContext context) { addActiveContext(context); - super.putContext(context); + super.putReaderContext(context); } @Override - protected SearchContext removeContext(long id) { - final SearchContext removed = super.removeContext(id); + protected ReaderContext removeReaderContext(long id) { + final ReaderContext removed = super.removeReaderContext(id); if (removed != null) { removeActiveContext(removed); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java index 4edb9adacd13b..fc9a8fe7efa62 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java @@ -110,12 +110,18 @@ public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexSh } public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexShard, ContextIndexSearcher searcher) { + this(queryShardContext, indexShard, searcher, null); + } + + public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexShard, + ContextIndexSearcher searcher, ScrollContext scrollContext) { this.bigArrays = null; this.indexService = null; this.fixedBitSetFilterCache = null; this.indexShard = indexShard; this.queryShardContext = queryShardContext; this.searcher = searcher; + this.scrollContext = scrollContext; } public void setSearcher(ContextIndexSearcher searcher) { @@ -166,22 +172,11 @@ public float queryBoost() { return 0; } - @Override - public long getOriginNanoTime() { - return originNanoTime; - } - @Override public ScrollContext scrollContext() { return scrollContext; } - @Override - public SearchContext scrollContext(ScrollContext scrollContext) { - this.scrollContext = scrollContext; - return this; - } - @Override public SearchContextAggregations aggregations() { return aggregations; @@ -226,10 +221,6 @@ public List rescore() { return Collections.emptyList(); } - @Override - public void addRescore(RescoreContext rescore) { - } - @Override public boolean hasScriptFields() { return false; @@ -536,24 +527,6 @@ public SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int return null; } - @Override - public void accessed(long accessTime) { - } - - @Override - public long lastAccessTime() { - return 0; - } - - @Override - public long keepAlive() { - return 0; - } - - @Override - public void keepAlive(long keepAlive) { - } - @Override public DfsSearchResult dfsResult() { return null; diff --git a/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java b/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java index 8a8842487f14a..b49f1fb0acaf5 100644 --- a/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java +++ b/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java @@ -19,59 +19,25 @@ package org.elasticsearch.search; -import org.apache.lucene.search.Query; -import org.elasticsearch.Version; -import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.lucene.search.Queries; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.query.QueryShardContext; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.TestSearchContext; + +import static org.mockito.Mockito.mock; public class MockSearchServiceTests extends ESTestCase { - public static final IndexMetaData EMPTY_INDEX_METADATA = IndexMetaData.builder("") - .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) - .numberOfShards(1).numberOfReplicas(0).build(); public void testAssertNoInFlightContext() { - final long nowInMillis = randomNonNegativeLong(); - SearchContext s = new TestSearchContext(new QueryShardContext(0, - new IndexSettings(EMPTY_INDEX_METADATA, Settings.EMPTY), BigArrays.NON_RECYCLING_INSTANCE, null, null, null, null, null, - xContentRegistry(), writableRegistry(), null, null, () -> nowInMillis, null, null)) { - - @Override - public SearchShardTarget shardTarget() { - return new SearchShardTarget("node", new ShardId("idx", "ignored", 0), null, OriginalIndices.NONE); - } - - @Override - public SearchType searchType() { - return SearchType.DEFAULT; - } - - @Override - public Query query() { - return Queries.newMatchAllQuery(); - } - }; - MockSearchService.addActiveContext(s); + ReaderContext reader = mock(ReaderContext.class); + MockSearchService.addActiveContext(reader); try { Throwable e = expectThrows(AssertionError.class, () -> MockSearchService.assertNoInFlightContext()); assertEquals("There are still [1] in-flight contexts. The first one's creation site is listed as the cause of this exception.", e.getMessage()); e = e.getCause(); - // The next line with throw an exception if the date looks wrong - assertEquals("[node][idx][0] query=[*:*]", e.getMessage()); assertEquals(MockSearchService.class.getName(), e.getStackTrace()[0].getClassName()); assertEquals(MockSearchServiceTests.class.getName(), e.getStackTrace()[1].getClassName()); } finally { - MockSearchService.removeActiveContext(s); + MockSearchService.removeActiveContext(reader); } } } diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 694781c5ebec7..f35e45e760b0b 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.shard.SearchOperationListener; +import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.transport.TransportRequest; @@ -283,26 +284,28 @@ public void validateSearchContext(SearchContext context, TransportRequest transp throw new UncheckedIOException(e); } // also register a release resource in this case if we have multiple roundtrips like in DFS - registerRelease(context, lazyDirectoryReader); + context.addReleasable(() -> { + try { + lazyDirectoryReader.release(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, SearchContext.Lifetime.CONTEXT); } } - private void registerRelease(SearchContext context, LazyDirectoryReader lazyDirectoryReader) { - context.addReleasable(() -> { - try { - lazyDirectoryReader.release(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }, SearchContext.Lifetime.PHASE); - } - @Override - public void onNewContext(SearchContext context) { - DirectoryReader dirReader = context.searcher().getDirectoryReader(); + public void onNewReaderContext(ReaderContext readerContext) { + DirectoryReader dirReader = readerContext.engineSearcher().getDirectoryReader(); LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(dirReader); if (lazyDirectoryReader != null) { - registerRelease(context, lazyDirectoryReader); + readerContext.addOnClose(() -> { + try { + lazyDirectoryReader.release(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); } } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java index 5e0c2945caadc..aef1652b42fc5 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java @@ -46,10 +46,9 @@ public SecuritySearchOperationListener(ThreadContext threadContext, XPackLicense * Adds the {@link Authentication} to the {@link ScrollContext} */ @Override - public void onNewScrollContext(SearchContext searchContext) { + public void onNewScrollContext(ScrollContext scrollContext) { if (licenseState.isAuthAllowed()) { - searchContext.scrollContext().putInContext(AuthenticationField.AUTHENTICATION_KEY, - Authentication.getAuthentication(threadContext)); + scrollContext.putInContext(AuthenticationField.AUTHENTICATION_KEY, Authentication.getAuthentication(threadContext)); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java index 5b73d6d212fc5..05f0ed7f0c42f 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java @@ -49,18 +49,19 @@ public void testUnlicensed() { ThreadContext threadContext = new ThreadContext(Settings.EMPTY); AuditTrailService auditTrailService = mock(AuditTrailService.class); SearchContext searchContext = mock(SearchContext.class); - when(searchContext.scrollContext()).thenReturn(new ScrollContext()); + ScrollContext scrollContext = new ScrollContext(); + when(searchContext.scrollContext()).thenReturn(scrollContext); SecuritySearchOperationListener listener = new SecuritySearchOperationListener(threadContext, licenseState, auditTrailService); - listener.onNewScrollContext(searchContext); + listener.onNewScrollContext(scrollContext); listener.validateSearchContext(searchContext, Empty.INSTANCE); verify(licenseState, times(2)).isAuthAllowed(); verifyZeroInteractions(auditTrailService, searchContext); } public void testOnNewContextSetsAuthentication() throws Exception { - TestScrollSearchContext testSearchContext = new TestScrollSearchContext(); - testSearchContext.scrollContext(new ScrollContext()); + ScrollContext scrollContext = new ScrollContext(); + TestSearchContext testSearchContext = new TestSearchContext(null, null, null, scrollContext); final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(2L)); testSearchContext.scrollContext().scroll = scroll; XPackLicenseState licenseState = mock(XPackLicenseState.class); @@ -71,7 +72,7 @@ public void testOnNewContextSetsAuthentication() throws Exception { authentication.writeToContext(threadContext); SecuritySearchOperationListener listener = new SecuritySearchOperationListener(threadContext, licenseState, auditTrailService); - listener.onNewScrollContext(testSearchContext); + listener.onNewScrollContext(scrollContext); Authentication contextAuth = testSearchContext.scrollContext().getFromContext(AuthenticationField.AUTHENTICATION_KEY); assertEquals(authentication, contextAuth); @@ -82,8 +83,8 @@ public void testOnNewContextSetsAuthentication() throws Exception { } public void testValidateSearchContext() throws Exception { - TestScrollSearchContext testSearchContext = new TestScrollSearchContext(); - testSearchContext.scrollContext(new ScrollContext()); + ScrollContext scrollContext = new ScrollContext(); + TestSearchContext testSearchContext = new TestSearchContext(null, null, null, scrollContext); testSearchContext.scrollContext().putInContext(AuthenticationField.AUTHENTICATION_KEY, new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null)); testSearchContext.scrollContext().scroll = new Scroll(TimeValue.timeValueSeconds(2L)); @@ -235,24 +236,4 @@ public void testEnsuredAuthenticatedUserIsSame() { verify(auditTrail).accessDenied(eq(auditId), eq(runAsDiffType), eq(action), eq(request), authzInfoRoles(original.getUser().roles())); } - - static class TestScrollSearchContext extends TestSearchContext { - - private ScrollContext scrollContext; - - TestScrollSearchContext() { - super(null); - } - - @Override - public ScrollContext scrollContext() { - return scrollContext; - } - - @Override - public SearchContext scrollContext(ScrollContext scrollContext) { - this.scrollContext = scrollContext; - return this; - } - } } From b2d899bd9bf175fb44e17bd1a576748cc80e99ac Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 22 Jan 2020 17:31:38 -0500 Subject: [PATCH 2/3] notify when releasing a search context --- .../index/shard/SearchOperationListener.java | 19 +++++++++++++++++ .../elasticsearch/search/SearchService.java | 1 + .../index/engine/FrozenEngine.java | 21 ++++++++++++------- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java index 7c58fd96b8439..77e83edcdd18f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java @@ -116,6 +116,14 @@ default void onFreeScrollContext(ScrollContext scrollContext) {} */ default void validateSearchContext(SearchContext context, TransportRequest transportRequest) {} + /** + * Executed when a search context was freed. The implementor can implement + * this method to release resources used by the search context. + */ + default void onFreeSearchContext(SearchContext context) { + + } + /** * A Composite listener that multiplexes calls to each of the listeners methods. */ @@ -250,5 +258,16 @@ public void validateSearchContext(SearchContext context, TransportRequest reques } ExceptionsHelper.reThrowIfNotNull(exception); } + + @Override + public void onFreeSearchContext(SearchContext context) { + for (SearchOperationListener listener : listeners) { + try { + listener.onFreeSearchContext(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onFreeSearchContext listener [{}] failed", listener), e); + } + } + } } } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 1afc05eccb550..e0ea7d768711c 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -566,6 +566,7 @@ final SearchContext createContext(ReaderContext reader, ShardSearchRequest reque // Disable timeout while searching and mark the reader as accessed when releasing the context. context.addReleasable(() -> reader.accessed(threadPool.relativeTimeInMillis()), Lifetime.CONTEXT); reader.accessed(-1L); + context.addReleasable(() -> reader.indexShard().getSearchOperationListener().onFreeSearchContext(context), Lifetime.CONTEXT); } catch (Exception e) { context.close(); throw e; diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index f35e45e760b0b..0530723044354 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -283,14 +283,19 @@ public void validateSearchContext(SearchContext context, TransportRequest transp } catch (IOException e) { throw new UncheckedIOException(e); } - // also register a release resource in this case if we have multiple roundtrips like in DFS - context.addReleasable(() -> { - try { - lazyDirectoryReader.release(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }, SearchContext.Lifetime.CONTEXT); + } + } + + @Override + public void onFreeSearchContext(SearchContext context) { + DirectoryReader dirReader = context.searcher().getDirectoryReader(); + LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(dirReader); + if (lazyDirectoryReader != null) { + try { + lazyDirectoryReader.release(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } From 34ed39bcae42806fb5c016884f1a56ad504c9af6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 27 Jan 2020 09:32:25 -0500 Subject: [PATCH 3/3] Remove lifetime --- .../ParentChildInnerHitContextBuilder.java | 8 +-- .../index/query/NestedQueryBuilder.java | 6 +- .../elasticsearch/search/SearchService.java | 5 +- .../search/aggregations/AggregationPhase.java | 2 - .../search/aggregations/AggregatorBase.java | 3 +- .../aggregations/AggregatorFactory.java | 3 +- .../fetch/subphase/ExplainFetchSubPhase.java | 2 - .../subphase/MatchedQueriesFetchSubPhase.java | 3 - .../search/internal/SearchContext.java | 60 ++++--------------- .../search/query/QueryPhase.java | 4 -- .../search/DefaultSearchContextTests.java | 1 - .../aggregations/AggregatorTestCase.java | 2 +- 12 files changed, 19 insertions(+), 80 deletions(-) diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java b/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java index 551e57abb62a1..626ecab0f4a34 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java @@ -158,12 +158,8 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException { topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE); maxScoreCollector = new MaxScoreCollector(); } - try { - for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) { - intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx); - } - } finally { - clearReleasables(Lifetime.COLLECTION); + for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) { + intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx); } TopDocs topDocs = topDocsCollector.topDocs(from(), size()); float maxScore = Float.NaN; diff --git a/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java b/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java index fecf5c8407e98..55ec170c1e02b 100644 --- a/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java @@ -421,11 +421,7 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException { topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE); maxScoreCollector = new MaxScoreCollector(); } - try { - intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx); - } finally { - clearReleasables(Lifetime.COLLECTION); - } + intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx); TopDocs td = topDocsCollector.topDocs(from(), size()); float maxScore = Float.NaN; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index e0ea7d768711c..2421baf5fb72c 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -86,7 +86,6 @@ import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; -import org.elasticsearch.search.internal.SearchContext.Lifetime; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.profile.Profilers; import org.elasticsearch.search.query.QueryPhase; @@ -564,9 +563,9 @@ final SearchContext createContext(ReaderContext reader, ShardSearchRequest reque contextScrollKeepAlive(reader, keepAlive); context.lowLevelCancellation(lowLevelCancellation); // Disable timeout while searching and mark the reader as accessed when releasing the context. - context.addReleasable(() -> reader.accessed(threadPool.relativeTimeInMillis()), Lifetime.CONTEXT); + context.addReleasable(() -> reader.accessed(threadPool.relativeTimeInMillis())); reader.accessed(-1L); - context.addReleasable(() -> reader.indexShard().getSearchOperationListener().onFreeSearchContext(context), Lifetime.CONTEXT); + context.addReleasable(() -> reader.indexShard().getSearchOperationListener().onFreeSearchContext(context)); } catch (Exception e) { context.close(); throw e; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index c150f99cfe196..fca5b14e34be3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -117,8 +117,6 @@ public void execute(SearchContext context) { context.searcher().search(query, collector); } catch (Exception e) { throw new QueryPhaseExecutionException(context.shardTarget(), "Failed to execute global aggregators", e); - } finally { - context.clearReleasables(SearchContext.Lifetime.COLLECTION); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java index 8788f97de0b2a..43e38c731bbe1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java @@ -26,7 +26,6 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; -import org.elasticsearch.search.internal.SearchContext.Lifetime; import org.elasticsearch.search.query.QueryPhaseExecutionException; import java.io.IOException; @@ -76,7 +75,7 @@ protected AggregatorBase(String name, AggregatorFactories factories, SearchConte this.breakerService = context.bigArrays().breakerService(); assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead"; this.subAggregators = factories.createSubAggregators(context, this); - context.addReleasable(this, Lifetime.CONTEXT); + context.addReleasable(this); final SearchShardTarget shardTarget = context.shardTarget(); // Register a safeguard to highlight any invalid construction logic (call to this constructor without subsequent preCollection call) collectableSubAggregators = new BucketCollector() { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java index 743d8d730033a..5451285a379c8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java @@ -28,7 +28,6 @@ import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; -import org.elasticsearch.search.internal.SearchContext.Lifetime; import java.io.IOException; import java.util.List; @@ -50,7 +49,7 @@ public static final class MultiBucketAggregatorWrapper extends Aggregator { this.parent = parent; this.factory = factory; this.first = first; - context.addReleasable(this, Lifetime.CONTEXT); + context.addReleasable(this); aggregators = bigArrays.newObjectArray(1); aggregators.set(0, first); collectors = bigArrays.newObjectArray(1); diff --git a/server/src/main/java/org/elasticsearch/search/fetch/subphase/ExplainFetchSubPhase.java b/server/src/main/java/org/elasticsearch/search/fetch/subphase/ExplainFetchSubPhase.java index 3a7d5c96b8179..345a4c5a0a7ad 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/subphase/ExplainFetchSubPhase.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/subphase/ExplainFetchSubPhase.java @@ -48,8 +48,6 @@ public void hitExecute(SearchContext context, HitContext hitContext) { } catch (IOException e) { throw new FetchPhaseExecutionException(context.shardTarget(), "Failed to explain doc [" + hitContext.hit().getId() + "]", e); - } finally { - context.clearReleasables(SearchContext.Lifetime.COLLECTION); } } } diff --git a/server/src/main/java/org/elasticsearch/search/fetch/subphase/MatchedQueriesFetchSubPhase.java b/server/src/main/java/org/elasticsearch/search/fetch/subphase/MatchedQueriesFetchSubPhase.java index c2f6980781dba..319bd40cfbd70 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/subphase/MatchedQueriesFetchSubPhase.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/subphase/MatchedQueriesFetchSubPhase.java @@ -31,7 +31,6 @@ import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.internal.SearchContext; -import org.elasticsearch.search.internal.SearchContext.Lifetime; import java.io.IOException; import java.util.ArrayList; @@ -92,8 +91,6 @@ public void hitsExecute(SearchContext context, SearchHit[] hits) { } } catch (IOException e) { throw ExceptionsHelper.convertToElastic(e); - } finally { - context.clearReleasables(Lifetime.COLLECTION); } } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java index 937ea85097d79..d0eddac1d86c6 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.mapper.MappedFieldType; @@ -59,10 +58,9 @@ import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.suggest.SuggestionSearchContext; -import java.util.ArrayList; -import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -77,7 +75,7 @@ public abstract class SearchContext implements Releasable { public static final int TRACK_TOTAL_HITS_DISABLED = -1; public static final int DEFAULT_TRACK_TOTAL_HITS_UP_TO = 10000; - private Map> clearables = null; + private final List releasables = new CopyOnWriteArrayList<>(); private final AtomicBoolean closed = new AtomicBoolean(false); private InnerHitsContext innerHitsContext; @@ -93,10 +91,12 @@ protected SearchContext() { @Override public final void close() { - try { - clearReleasables(Lifetime.CONTEXT); - } finally { - doClose(); + if (closed.compareAndSet(false, true)) { + try { + Releasables.close(releasables); + } finally { + doClose(); + } } } @@ -313,38 +313,14 @@ public SearchLookup lookup() { */ public abstract Profilers getProfilers(); + /** - * Schedule the release of a resource. The time when {@link Releasable#close()} will be called on this object - * is function of the provided {@link Lifetime}. + * Adds a releasable that will be freed when this context is closed. */ - public void addReleasable(Releasable releasable, Lifetime lifetime) { - if (clearables == null) { - clearables = new EnumMap<>(Lifetime.class); - } - List releasables = clearables.get(lifetime); - if (releasables == null) { - releasables = new ArrayList<>(); - clearables.put(lifetime, releasables); - } + public void addReleasable(Releasable releasable) { releasables.add(releasable); } - public void clearReleasables(Lifetime lifetime) { - if (clearables != null) { - List>releasables = new ArrayList<>(); - for (Lifetime lc : Lifetime.values()) { - if (lc.compareTo(lifetime) > 0) { - break; - } - List remove = clearables.remove(lc); - if (remove != null) { - releasables.add(remove); - } - } - Releasables.close(Iterables.flatten(releasables)); - } - } - /** * @return true if the request contains only suggest */ @@ -369,20 +345,6 @@ public final boolean hasOnlySuggest() { /** Return a view of the additional query collectors that should be run for this context. */ public abstract Map, Collector> queryCollectors(); - /** - * The life time of an object that is used during search execution. - */ - public enum Lifetime { - /** - * This life time is for objects that only live during collection time. - */ - COLLECTION, - /** - * This life time is for objects that need to live until the search context they are attached to is destroyed. - */ - CONTEXT - } - public abstract QueryShardContext getQueryShardContext(); @Override diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index ba5ae725cab6f..afcd7355f155f 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -343,8 +343,6 @@ private static boolean searchWithCollector(SearchContext searchContext, ContextI throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded"); } queryResult.searchTimedOut(true); - } finally { - searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION); } if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) { queryResult.terminatedEarly(false); @@ -399,8 +397,6 @@ private static boolean searchWithCollectorManager(SearchContext searchContext, C throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded"); } searchContext.queryResult().searchTimedOut(true); - } finally { - searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION); } return false; // no rescoring when sorting by field } diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 62192f86cf63e..198daf0f41a11 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -63,7 +63,6 @@ import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.refEq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 7c02fb2481f4c..705912da3a8f5 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -248,7 +248,7 @@ public boolean shouldCache(Query query) { * close their sub-aggregations. This is fairly similar to what the production code does. */ releasables.add((Releasable) invocation.getArguments()[0]); return null; - }).when(searchContext).addReleasable(anyObject(), anyObject()); + }).when(searchContext).addReleasable(anyObject()); return searchContext; }