diff --git a/src/main/java/org/elasticsearch/search/SearchService.java b/src/main/java/org/elasticsearch/search/SearchService.java index 0573d8e7febad..7795a4e437996 100644 --- a/src/main/java/org/elasticsearch/search/SearchService.java +++ b/src/main/java/org/elasticsearch/search/SearchService.java @@ -175,8 +175,8 @@ protected void doStart() throws ElasticsearchException { @Override protected void doStop() throws ElasticsearchException { - for (SearchContext context : activeContexts.values()) { - freeContext(context); + for (final SearchContext context : activeContexts.values()) { + freeContext(context.id()); } activeContexts.clear(); } @@ -187,7 +187,7 @@ protected void doClose() throws ElasticsearchException { } public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws ElasticsearchException { - SearchContext context = createAndPutContext(request); + final SearchContext context = createAndPutContext(request); try { contextProcessing(context); dfsPhase.execute(context); @@ -195,7 +195,7 @@ public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws Elasti return context.dfsResult(); } catch (Throwable e) { logger.trace("Dfs phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -203,7 +203,7 @@ public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws Elasti } public QuerySearchResult executeScan(ShardSearchRequest request) throws ElasticsearchException { - SearchContext context = createAndPutContext(request); + final SearchContext context = createAndPutContext(request); try { if (context.aggregations() != null) { throw new ElasticsearchIllegalArgumentException("aggregations are not supported with search_type=scan"); @@ -221,7 +221,7 @@ public QuerySearchResult executeScan(ShardSearchRequest request) throws Elastics return context.queryResult(); } catch (Throwable e) { logger.trace("Scan phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -229,7 +229,7 @@ public QuerySearchResult executeScan(ShardSearchRequest request) throws Elastics } public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); contextProcessing(context); try { processScroll(request, context); @@ -249,7 +249,7 @@ public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest requ return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget()); } catch (Throwable e) { logger.trace("Scan phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -257,7 +257,7 @@ public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest requ } public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException { - SearchContext context = createAndPutContext(request); + final SearchContext context = createAndPutContext(request); try { context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); @@ -287,7 +287,7 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t } context.indexShard().searchService().onFailedQueryPhase(context); logger.trace("Query phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -295,7 +295,7 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t } public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); try { context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); @@ -308,7 +308,7 @@ public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest req } catch (Throwable e) { context.indexShard().searchService().onFailedQueryPhase(context); logger.trace("Query phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -316,12 +316,12 @@ public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest req } public QuerySearchResult executeQueryPhase(QuerySearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); contextProcessing(context); try { context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity())); } catch (Throwable e) { - freeContext(context); + freeContext(context.id()); cleanContext(context); throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e); } @@ -335,7 +335,7 @@ public QuerySearchResult executeQueryPhase(QuerySearchRequest request) throws El } catch (Throwable e) { context.indexShard().searchService().onFailedQueryPhase(context); logger.trace("Query phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -343,7 +343,7 @@ public QuerySearchResult executeQueryPhase(QuerySearchRequest request) throws El } public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException { - SearchContext context = createAndPutContext(request); + final SearchContext context = createAndPutContext(request); contextProcessing(context); try { context.indexShard().searchService().onPreQueryPhase(context); @@ -373,7 +373,7 @@ public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) thro return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } catch (Throwable e) { logger.trace("Fetch phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -381,12 +381,12 @@ public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) thro } public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); contextProcessing(context); try { context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity())); } catch (Throwable e) { - freeContext(context); + freeContext(context.id()); cleanContext(context); throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e); } @@ -418,7 +418,7 @@ public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) thro return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } catch (Throwable e) { logger.trace("Fetch phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -426,7 +426,7 @@ public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) thro } public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); contextProcessing(context); try { processScroll(request, context); @@ -457,7 +457,7 @@ public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchReques return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget()); } catch (Throwable e) { logger.trace("Fetch phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -465,7 +465,7 @@ public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchReques } public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); contextProcessing(context); try { if (request.lastEmittedDoc() != null) { @@ -485,7 +485,7 @@ public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws El } catch (Throwable e) { context.indexShard().searchService().onFailedFetchPhase(context); logger.trace("Fetch phase failed", e); - freeContext(context); // we just try to make sure this is freed - rethrow orig exception. + freeContext(context.id()); // we just try to make sure this is freed - rethrow orig exception. throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -511,7 +511,7 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws Elast return context; } finally { if (!success) { - freeContext(context); + freeContext(context.id()); } } } @@ -561,27 +561,22 @@ final SearchContext createContext(ShardSearchRequest request, @Nullable Engine.S } public boolean freeContext(long id) { - SearchContext context = activeContexts.remove(id); - if (context == null) { - return false; - } - context.indexShard().searchService().onFreeContext(context); - context.close(); - return true; - } - - private void freeContext(SearchContext context) { - SearchContext removed = activeContexts.remove(context.id()); - if (removed != null) { - removed.indexShard().searchService().onFreeContext(removed); + final SearchContext context = activeContexts.remove(id); + if (context != null) { + try { + context.indexShard().searchService().onFreeContext(context); + } finally { + context.close(); + } + return true; } - context.close(); + return false; } public void freeAllScrollContexts() { for (SearchContext searchContext : activeContexts.values()) { if (searchContext.scroll() != null) { - freeContext(searchContext); + freeContext(searchContext.id()); } } } @@ -969,7 +964,7 @@ public void run() { } finally { try { if (context != null) { - freeContext(context); + freeContext(context.id()); cleanContext(context); } } finally { @@ -1002,7 +997,7 @@ public void run() { } if ((time - lastAccessTime > context.keepAlive())) { logger.debug("freeing search context [{}], time [{}], lastAccessTime [{}], keepAlive [{}]", context.id(), time, lastAccessTime, context.keepAlive()); - freeContext(context); + freeContext(context.id()); } } } diff --git a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index 8ab244c1000b2..9fa8623bf81be 100644 --- a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -145,7 +145,7 @@ public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequ public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener actionListener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - boolean freed = searchService.freeContext(contextId); + final boolean freed = searchService.freeContext(contextId); actionListener.onResponse(freed); } else { transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener)); diff --git a/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/src/main/java/org/elasticsearch/search/internal/SearchContext.java index a89677801811d..71ea669a65905 100644 --- a/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -64,6 +64,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** */ @@ -87,12 +88,15 @@ public static SearchContext current() { } private Multimap clearables = null; + private final AtomicBoolean closed = new AtomicBoolean(false); public final void close() { - try { - clearReleasables(Lifetime.CONTEXT); - } finally { - doClose(); + if (closed.compareAndSet(false, true)) { // prevent double release + try { + clearReleasables(Lifetime.CONTEXT); + } finally { + doClose(); + } } }