Skip to content

Commit

Permalink
[SEARCH] Execute search reduce phase on the search threadpool
Browse files Browse the repository at this point in the history
Reduce Phases can be expensive and some of them like the aggregations
reduce phase might even execute a one-off call via an internal client
that might cause a deadlock due to execution on the network thread
that is needed to handle the one-off call. This commit dispatches
the reduce phase to the search threadpool to ensure we don't wait
for the current thread to be available.

Closes elastic#7623
  • Loading branch information
s1monw committed Sep 8, 2014
1 parent b28e56e commit 6dea95a
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 82 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
Expand Down Expand Up @@ -118,29 +119,33 @@ void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, in
}
}

void finishHim() {
private void finishHim() {
try {
innerFinishHim();
} catch (Throwable e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
listener.onFailure(failure);
} finally {
//
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
try {
boolean useScroll = !useSlowScroll && request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
} catch (Throwable e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
listener.onFailure(failure);
}
}
});
} catch (EsRejectedExecutionException ex) {
listener.onFailure(ex);
}
}

void innerFinishHim() throws Exception {
boolean useScroll = !useSlowScroll && request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
Expand Down Expand Up @@ -190,27 +191,37 @@ void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shar
}
}

void finishHim() {
private void finishHim() {
try {
innerFinishHim();
} catch (Throwable e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
try {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
} catch (Throwable e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
listener.onFailure(failure);
} finally {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
}
});
} catch (EsRejectedExecutionException ex) {
try {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
} finally {
listener.onFailure(ex);
}
listener.onFailure(failure);
} finally {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
}

void innerFinishHim() throws Exception {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
Expand All @@ -35,8 +36,6 @@
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;

import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId;

/**
Expand Down Expand Up @@ -74,25 +73,30 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest requ
@Override
protected void moveToSecondPhase() throws Exception {
try {
innerFinishHim();
} catch (Throwable e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
listener.onFailure(failure);
}
}

private void innerFinishHim() throws IOException {
boolean useScroll = !useSlowScroll && request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
try {
boolean useScroll = !useSlowScroll && request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
} catch (Throwable e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
listener.onFailure(failure);
}
}
});
} catch (EsRejectedExecutionException ex) {
listener.onFailure(ex);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}
Expand Up @@ -23,13 +23,15 @@
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
Expand Down Expand Up @@ -134,27 +136,36 @@ void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shar
}
}

void finishHim() {
private void finishHim() {
try {
innerFinishHim();
} catch (Throwable e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
try {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
} catch (Throwable e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
listener.onFailure(failure);
} finally {
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
}
}
});
} catch (EsRejectedExecutionException ex) {
try {
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
} finally {
listener.onFailure(ex);
}
listener.onFailure(failure);
} finally {
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
}
}

void innerFinishHim() throws Exception {
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}
7 changes: 1 addition & 6 deletions src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Expand Up @@ -24,7 +24,6 @@
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -34,13 +33,9 @@
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.SizeUnit;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.common.util.concurrent.*;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
Expand Down Expand Up @@ -101,7 +102,7 @@ public void onFailure(Throwable e) {
for (ShardSearchFailure failure : e.shardFailures()) {
assertTrue("got unexpected reason..." + failure.reason(), failure.reason().toLowerCase(Locale.ENGLISH).contains("rejected"));
}
} else {
} else if ((unwrap instanceof EsRejectedExecutionException) == false) {
throw new AssertionError("unexpected failure", (Throwable) response);
}
}
Expand Down

0 comments on commit 6dea95a

Please sign in to comment.