Skip to content

Commit

Permalink
[CORE] Unify search context cleanup
Browse files Browse the repository at this point in the history
Today there are two different ways to cleanup search contexts which can
potentially lead to double releasing of a context. This commit unifies
the methods and prevents double closing.

Closes #7625
  • Loading branch information
s1monw committed Sep 8, 2014
1 parent c85b49b commit b969950
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 47 deletions.
79 changes: 37 additions & 42 deletions src/main/java/org/elasticsearch/search/SearchService.java
Expand Up @@ -179,8 +179,8 @@ protected void doStart() throws ElasticsearchException {

@Override
protected void doStop() throws ElasticsearchException {
for (SearchContext context : activeContexts.values()) {
freeContext(context);
for (final SearchContext context : activeContexts.values()) {
freeContext(context.id());
}
activeContexts.clear();
}
Expand All @@ -191,23 +191,23 @@ protected void doClose() throws ElasticsearchException {
}

public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws ElasticsearchException {
SearchContext context = createAndPutContext(request);
final SearchContext context = createAndPutContext(request);
try {
contextProcessing(context);
dfsPhase.execute(context);
contextProcessedSuccessfully(context);
return context.dfsResult();
} catch (Throwable e) {
logger.trace("Dfs phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
}
}

public QuerySearchResult executeScan(ShardSearchRequest request) throws ElasticsearchException {
SearchContext context = createAndPutContext(request);
final SearchContext context = createAndPutContext(request);
try {
if (context.aggregations() != null) {
throw new ElasticsearchIllegalArgumentException("aggregations are not supported with search_type=scan");
Expand All @@ -225,15 +225,15 @@ public QuerySearchResult executeScan(ShardSearchRequest request) throws Elastics
return context.queryResult();
} catch (Throwable e) {
logger.trace("Scan phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
}
}

public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
processScroll(request, context);
Expand All @@ -253,15 +253,15 @@ public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest requ
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
} catch (Throwable e) {
logger.trace("Scan phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
}
}

public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
SearchContext context = createAndPutContext(request);
final SearchContext context = createAndPutContext(request);
try {
context.indexShard().searchService().onPreQueryPhase(context);
long time = System.nanoTime();
Expand Down Expand Up @@ -291,15 +291,15 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t
}
context.indexShard().searchService().onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
}
}

public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
try {
context.indexShard().searchService().onPreQueryPhase(context);
long time = System.nanoTime();
Expand All @@ -312,20 +312,20 @@ public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest req
} catch (Throwable e) {
context.indexShard().searchService().onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
}
}

public QuerySearchResult executeQueryPhase(QuerySearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity()));
} catch (Throwable e) {
freeContext(context);
freeContext(context.id());
cleanContext(context);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
Expand All @@ -339,15 +339,15 @@ public QuerySearchResult executeQueryPhase(QuerySearchRequest request) throws El
} catch (Throwable e) {
context.indexShard().searchService().onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
}
}

public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException {
SearchContext context = createAndPutContext(request);
final SearchContext context = createAndPutContext(request);
contextProcessing(context);
try {
context.indexShard().searchService().onPreQueryPhase(context);
Expand Down Expand Up @@ -377,20 +377,20 @@ public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) thro
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
}
}

public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity()));
} catch (Throwable e) {
freeContext(context);
freeContext(context.id());
cleanContext(context);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
Expand Down Expand Up @@ -422,15 +422,15 @@ public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) thro
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
}
}

public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
processScroll(request, context);
Expand Down Expand Up @@ -461,15 +461,15 @@ public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchReques
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
}
}

public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
if (request.lastEmittedDoc() != null) {
Expand All @@ -489,7 +489,7 @@ public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws El
} catch (Throwable e) {
context.indexShard().searchService().onFailedFetchPhase(context);
logger.trace("Fetch phase failed", e);
freeContext(context); // we just try to make sure this is freed - rethrow orig exception.
freeContext(context.id()); // we just try to make sure this is freed - rethrow orig exception.
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
Expand All @@ -515,7 +515,7 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws Elast
return context;
} finally {
if (!success) {
freeContext(context);
freeContext(context.id());
}
}
}
Expand Down Expand Up @@ -565,27 +565,22 @@ final SearchContext createContext(ShardSearchRequest request, @Nullable Engine.S
}

public boolean freeContext(long id) {
SearchContext context = activeContexts.remove(id);
if (context == null) {
return false;
}
context.indexShard().searchService().onFreeContext(context);
context.close();
return true;
}

private void freeContext(SearchContext context) {
SearchContext removed = activeContexts.remove(context.id());
if (removed != null) {
removed.indexShard().searchService().onFreeContext(removed);
final SearchContext context = activeContexts.remove(id);
if (context != null) {
try {
context.indexShard().searchService().onFreeContext(context);
} finally {
context.close();
}
return true;
}
context.close();
return false;
}

public void freeAllScrollContexts() {
for (SearchContext searchContext : activeContexts.values()) {
if (searchContext.scroll() != null) {
freeContext(searchContext);
freeContext(searchContext.id());
}
}
}
Expand Down Expand Up @@ -973,7 +968,7 @@ public void run() {
} finally {
try {
if (context != null) {
freeContext(context);
freeContext(context.id());
cleanContext(context);
}
} finally {
Expand Down Expand Up @@ -1006,7 +1001,7 @@ public void run() {
}
if ((time - lastAccessTime > context.keepAlive())) {
logger.debug("freeing search context [{}], time [{}], lastAccessTime [{}], keepAlive [{}]", context.id(), time, lastAccessTime, context.keepAlive());
freeContext(context);
freeContext(context.id());
}
}
}
Expand Down
Expand Up @@ -145,7 +145,7 @@ public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequ

public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener<Boolean> actionListener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
boolean freed = searchService.freeContext(contextId);
final boolean freed = searchService.freeContext(contextId);
actionListener.onResponse(freed);
} else {
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener));
Expand Down
Expand Up @@ -67,6 +67,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
*/
Expand All @@ -90,12 +91,15 @@ public static SearchContext current() {
}

private Multimap<Lifetime, Releasable> clearables = null;
private final AtomicBoolean closed = new AtomicBoolean(false);

public final void close() {
try {
clearReleasables(Lifetime.CONTEXT);
} finally {
doClose();
if (closed.compareAndSet(false, true)) { // prevent double release
try {
clearReleasables(Lifetime.CONTEXT);
} finally {
doClose();
}
}
}

Expand Down

0 comments on commit b969950

Please sign in to comment.