diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index 3a42abe08804e..dc520e2e2ff02 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; @@ -118,29 +119,33 @@ void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, in } } - void finishHim() { + private void finishHim() { try { - innerFinishHim(); - } catch (Throwable e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - listener.onFailure(failure); - } finally { - // + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + try { + boolean useScroll = !useSlowScroll && request.scroll() != null; + sortedShardList = searchPhaseController.sortDocs(useScroll, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); + } catch (Throwable e) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + listener.onFailure(failure); + } + } + }); + } catch (EsRejectedExecutionException ex) { + listener.onFailure(ex); } - } - void innerFinishHim() throws Exception { - boolean useScroll = !useSlowScroll && request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); } } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 8a38cbeadfd68..f3f1bbef2dfaf 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; @@ -190,27 +191,37 @@ void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shar } } - void finishHim() { + private void finishHim() { try { - innerFinishHim(); - } catch (Throwable e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + try { + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); + } catch (Throwable e) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + listener.onFailure(failure); + } finally { + releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); + } + } + }); + } catch (EsRejectedExecutionException ex) { + try { + releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); + } finally { + listener.onFailure(ex); } - listener.onFailure(failure); - } finally { - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); } - } - void innerFinishHim() throws Exception { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); } } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java index 7ddfeb0f2f524..b969deaec078e 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; @@ -35,8 +36,6 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; - import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId; /** @@ -74,25 +73,30 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest requ @Override protected void moveToSecondPhase() throws Exception { try { - innerFinishHim(); - } catch (Throwable e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - listener.onFailure(failure); - } - } - - private void innerFinishHim() throws IOException { - boolean useScroll = !useSlowScroll && request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), firstResults, null); + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + try { + boolean useScroll = !useSlowScroll && request.scroll() != null; + sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); + } catch (Throwable e) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + listener.onFailure(failure); + } + } + }); + } catch (EsRejectedExecutionException ex) { + listener.onFailure(ex); } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); } } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index 92914667545f1..8d1ec947c071a 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -23,6 +23,7 @@ import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; @@ -30,6 +31,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; @@ -134,27 +136,36 @@ void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shar } } - void finishHim() { + private void finishHim() { try { - innerFinishHim(); - } catch (Throwable e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + try { + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); + } catch (Throwable e) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + listener.onFailure(failure); + } finally { + releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); + } + } + }); + } catch (EsRejectedExecutionException ex) { + try { + releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); + } finally { + listener.onFailure(ex); } - listener.onFailure(failure); - } finally { - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); - } - } - - void innerFinishHim() throws Exception { - InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); } } } diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 1936dfbc06fd2..01c87a5aaa048 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -24,7 +24,6 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -34,13 +33,9 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; -import org.elasticsearch.common.unit.SizeUnit; import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsAbortPolicy; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; -import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler; +import org.elasticsearch.common.util.concurrent.*; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; diff --git a/src/test/java/org/elasticsearch/action/RejectionActionTests.java b/src/test/java/org/elasticsearch/action/RejectionActionTests.java index 313339b1515d5..6ad02df5eecf3 100644 --- a/src/test/java/org/elasticsearch/action/RejectionActionTests.java +++ b/src/test/java/org/elasticsearch/action/RejectionActionTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; @@ -101,7 +102,7 @@ public void onFailure(Throwable e) { for (ShardSearchFailure failure : e.shardFailures()) { assertTrue("got unexpected reason..." + failure.reason(), failure.reason().toLowerCase(Locale.ENGLISH).contains("rejected")); } - } else { + } else if ((unwrap instanceof EsRejectedExecutionException) == false) { throw new AssertionError("unexpected failure", (Throwable) response); } }