diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 2d2b9213c63ff..ca68bb4008146 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; @@ -87,7 +88,7 @@ abstract class AbstractSearchAsyncAction exten private final SearchResponse.Clusters clusters; private final GroupShardsIterator toSkipShardsIts; - private final GroupShardsIterator shardsIts; + protected final GroupShardsIterator shardsIts; private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); private final int maxConcurrentRequestsPerNode; @@ -381,6 +382,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, logger.trace(new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e); } } + onShardGroupFailure(shardIndex, e); onPhaseDone(); } else { final ShardRouting nextShard = shardIt.nextOrNull(); @@ -389,7 +391,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, logger.trace(() -> new ParameterizedMessage( "{}: Failed to execute [{}] lastShard [{}]", shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e); - if (!lastShard) { + if (lastShard == false) { performPhaseOnShard(shardIndex, shardIt, nextShard); } else { // no more shards active, add a failure @@ -400,10 +402,19 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e); } } + onShardGroupFailure(shardIndex, e); } } } + /** + * Executed once for every {@link ShardId} that failed on all available shard routing. + * + * @param shardIndex the shard target that failed + * @param exc the final failure reason + */ + protected void onShardGroupFailure(int shardIndex, Exception exc) {} + /** * Executed once for every failed shard level request. This method is invoked before the next replica is tried for the given * shard target. diff --git a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index 6fb6a49e3f557..b4d52fa418e10 100644 --- a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.query.QuerySearchRequest; @@ -46,13 +47,15 @@ final class DfsQueryPhase extends SearchPhase { private final Function, SearchPhase> nextPhaseFactory; private final SearchPhaseContext context; private final SearchTransportService searchTransportService; + private final SearchProgressListener progressListener; DfsQueryPhase(AtomicArray dfsSearchResults, SearchPhaseController searchPhaseController, Function, SearchPhase> nextPhaseFactory, SearchPhaseContext context) { super("dfs_query"); - this.queryResult = searchPhaseController.newSearchPhaseResults(context.getRequest(), context.getNumShards()); + this.progressListener = context.getTask().getProgressListener(); + this.queryResult = searchPhaseController.newSearchPhaseResults(progressListener, context.getRequest(), context.getNumShards()); this.searchPhaseController = searchPhaseController; this.dfsSearchResults = dfsSearchResults; this.nextPhaseFactory = nextPhaseFactory; @@ -69,6 +72,8 @@ public void run() throws IOException { final CountedCollector counter = new CountedCollector<>(queryResult::consumeResult, resultList.size(), () -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context); + final SearchSourceBuilder sourceBuilder = context.getRequest().source(); + progressListener.notifyListShards(progressListener.searchShards(resultList), sourceBuilder == null || sourceBuilder.size() != 0); for (final DfsSearchResult dfsResult : resultList) { final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); @@ -92,6 +97,7 @@ public void onFailure(Exception exception) { try { context.getLogger().debug(() -> new ParameterizedMessage("[{}] Failed to execute query phase", querySearchRequest.id()), exception); + progressListener.notifyQueryFailure(shardIndex, exception); counter.onFailure(shardIndex, searchShardTarget, exception); } finally { // the query might not have been executed at all (for example because thread pool rejected diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index cc16e2d9f182a..41d216072e4b2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -49,6 +49,7 @@ final class FetchSearchPhase extends SearchPhase { private final SearchPhaseContext context; private final Logger logger; private final SearchPhaseResults resultConsumer; + private final SearchProgressListener progressListener; FetchSearchPhase(SearchPhaseResults resultConsumer, SearchPhaseController searchPhaseController, @@ -72,6 +73,7 @@ final class FetchSearchPhase extends SearchPhase { this.context = context; this.logger = context.getLogger(); this.resultConsumer = resultConsumer; + this.progressListener = context.getTask().getProgressListener(); } @Override @@ -133,6 +135,7 @@ private void innerRun() throws IOException { // we do this as we go since it will free up resources and passing on the request on the // transport layer is cheap. releaseIrrelevantSearchContext(queryResult.queryResult()); + progressListener.notifyFetchResult(i); } // in any case we count down this result since we don't talk to this shard anymore counter.countDown(); @@ -165,6 +168,7 @@ private void executeFetch(final int shardIndex, final SearchShardTarget shardTar @Override public void innerOnResponse(FetchSearchResult result) { try { + progressListener.notifyFetchResult(shardIndex); counter.onResult(result); } catch (Exception e) { context.onPhaseFailure(FetchSearchPhase.this, "", e); @@ -175,6 +179,7 @@ public void innerOnResponse(FetchSearchResult result) { public void onFailure(Exception e) { try { logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e); + progressListener.notifyFetchFailure(shardIndex, e); counter.onFailure(shardIndex, shardTarget, e); } finally { // the search context might not be cleared on the node where the fetch was executed for example diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchActionListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchActionListener.java index e9b5598556ff7..356da9e967623 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchActionListener.java @@ -23,7 +23,7 @@ import org.elasticsearch.search.SearchShardTarget; /** - * An base action listener that ensures shard target and shard index is set on all responses + * A base action listener that ensures shard target and shard index is set on all responses * received by this listener. */ abstract class SearchActionListener implements ActionListener { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 09811fd1d2ff1..b3562cc002b5b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -571,19 +571,23 @@ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults= 2 if there is more than one expected result"); @@ -595,6 +599,7 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR throw new IllegalArgumentException("either aggs or top docs must be present"); } this.controller = controller; + this.progressListener = progressListener; // no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time. this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0]; this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0]; @@ -610,6 +615,7 @@ public void consumeResult(SearchPhaseResult result) { super.consumeResult(result); QuerySearchResult queryResult = result.queryResult(); consumeInternal(queryResult); + progressListener.notifyQueryResult(queryResult.getShardIndex()); } private synchronized void consumeInternal(QuerySearchResult querySearchResult) { @@ -629,6 +635,10 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { } numReducePhases++; index = 1; + if (hasAggs) { + progressListener.notifyPartialReduce(progressListener.searchShards(results.asList()), + topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases); + } } final int i = index++; if (hasAggs) { @@ -652,8 +662,11 @@ private synchronized List getRemainingTopDocs() { @Override public ReducedQueryPhase reduce() { - return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats, - numReducePhases, false, performFinalReduce); + ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(), + getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false, performFinalReduce); + progressListener.notifyReduce(progressListener.searchShards(results.asList()), + reducePhase.totalHits, reducePhase.aggregations); + return reducePhase; } /** @@ -678,7 +691,9 @@ private int resolveTrackTotalHits(SearchRequest request) { /** * Returns a new ArraySearchPhaseResults instance. This might return an instance that reduces search responses incrementally. */ - ArraySearchPhaseResults newSearchPhaseResults(SearchRequest request, int numShards) { + ArraySearchPhaseResults newSearchPhaseResults(SearchProgressListener listener, + SearchRequest request, + int numShards) { SearchSourceBuilder source = request.source(); boolean isScrollRequest = request.scroll() != null; final boolean hasAggs = source != null && source.aggregations() != null; @@ -688,14 +703,24 @@ ArraySearchPhaseResults newSearchPhaseResults(SearchRequest r // no incremental reduce if scroll is used - we only hit a single shard or sometimes more... if (request.getBatchedReduceSize() < numShards) { // only use this if there are aggs and if there are more shards than we should reduce at once - return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, + return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, trackTotalHitsUpTo, request.isFinalReduce()); } } return new ArraySearchPhaseResults(numShards) { + @Override + void consumeResult(SearchPhaseResult result) { + super.consumeResult(result); + listener.notifyQueryResult(result.queryResult().getShardIndex()); + } + @Override ReducedQueryPhase reduce() { - return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce()); + List resultList = results.asList(); + final ReducedQueryPhase reducePhase = + reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce()); + listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations); + return reducePhase; } }; } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchProgressActionListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchProgressActionListener.java new file mode 100644 index 0000000000000..508b059c95952 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/SearchProgressActionListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; + +/** + * An {@link ActionListener} for search requests that allows to track progress of the {@link SearchAction}. + * See {@link SearchProgressListener}. + */ +public abstract class SearchProgressActionListener extends SearchProgressListener implements ActionListener { +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java new file mode 100644 index 0000000000000..504f4aa0d6579 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java @@ -0,0 +1,180 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.aggregations.InternalAggregations; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * A listener that allows to track progress of the {@link SearchAction}. + */ +abstract class SearchProgressListener { + private static final Logger logger = LogManager.getLogger(SearchProgressListener.class); + + public static final SearchProgressListener NOOP = new SearchProgressListener() {}; + + private List shards; + + /** + * Executed when shards are ready to be queried. + * + * @param shards The list of shards to query. + * @param fetchPhase true if the search needs a fetch phase, false otherwise. + **/ + public void onListShards(List shards, boolean fetchPhase) {} + + /** + * Executed when a shard returns a query result. + * + * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)} )}. + */ + public void onQueryResult(int shardIndex) {} + + /** + * Executed when a shard reports a query failure. + * + * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}. + * @param exc The cause of the failure. + */ + public void onQueryFailure(int shardIndex, Exception exc) {} + + /** + * Executed when a partial reduce is created. The number of partial reduce can be controlled via + * {@link SearchRequest#setBatchedReduceSize(int)}. + * + * @param shards The list of shards that are part of this reduce. + * @param totalHits The total number of hits in this reduce. + * @param aggs The partial result for aggregations. + * @param version The version number for this reduce. + */ + public void onPartialReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int version) {} + + /** + * Executed once when the final reduce is created. + * + * @param shards The list of shards that are part of this reduce. + * @param totalHits The total number of hits in this reduce. + * @param aggs The final result for aggregations. + */ + public void onReduce(List shards, TotalHits totalHits, InternalAggregations aggs) {} + + /** + * Executed when a shard returns a fetch result. + * + * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}. + */ + public void onFetchResult(int shardIndex) {} + + /** + * Executed when a shard reports a fetch failure. + * + * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}. + * @param exc The cause of the failure. + */ + public void onFetchFailure(int shardIndex, Exception exc) {} + + final void notifyListShards(List shards, boolean fetchPhase) { + this.shards = shards; + try { + onListShards(shards, fetchPhase); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on list shards"), e); + } + } + + final void notifyQueryResult(int shardIndex) { + try { + onQueryResult(shardIndex); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on query result", + shards.get(shardIndex)), e); + } + } + + final void notifyQueryFailure(int shardIndex, Exception exc) { + try { + onQueryFailure(shardIndex, exc); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on query failure", + shards.get(shardIndex)), e); + } + } + + final void notifyPartialReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int version) { + try { + onPartialReduce(shards, totalHits, aggs, version); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on partial reduce"), e); + } + } + + final void notifyReduce(List shards, TotalHits totalHits, InternalAggregations aggs) { + try { + onReduce(shards, totalHits, aggs); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on reduce"), e); + } + } + + final void notifyFetchResult(int shardIndex) { + try { + onFetchResult(shardIndex); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on fetch result", + shards.get(shardIndex)), e); + } + } + + final void notifyFetchFailure(int shardIndex, Exception exc) { + try { + onFetchFailure(shardIndex, exc); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on fetch failure", + shards.get(shardIndex)), e); + } + } + + final List searchShards(List results) { + List lst = results.stream() + .filter(Objects::nonNull) + .map(SearchPhaseResult::getSearchShardTarget) + .map(e -> new SearchShard(e.getClusterAlias(), e.getShardId())) + .collect(Collectors.toList()); + return Collections.unmodifiableList(lst); + } + + final List searchShards(GroupShardsIterator its) { + List lst = StreamSupport.stream(its.spliterator(), false) + .map(e -> new SearchShard(e.getClusterAlias(), e.shardId())) + .collect(Collectors.toList()); + return Collections.unmodifiableList(lst); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index bbd84011de00b..d5060b728347d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.transport.Transport; @@ -35,6 +36,7 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { private final SearchPhaseController searchPhaseController; + private final SearchProgressListener progressListener; SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService, final BiFunction nodeIdToConnection, final Map aliasFilter, @@ -45,8 +47,14 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction results, final SearchPhaseContext context) { return new FetchSearchPhase(results, searchPhaseController, context); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShard.java b/server/src/main/java/org/elasticsearch/action/search/SearchShard.java new file mode 100644 index 0000000000000..16459d81885ce --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShard.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.index.shard.ShardId; + +import java.util.Comparator; +import java.util.Objects; + +/** + * A class that encapsulates the {@link ShardId} and the cluster alias + * of a shard used during the search action. + */ +public class SearchShard implements Comparable { + @Nullable + private final String clusterAlias; + private final ShardId shardId; + + SearchShard(@Nullable String clusterAlias, ShardId shardId) { + this.clusterAlias = clusterAlias; + this.shardId = shardId; + } + + /** + * Return the cluster alias if the shard is on a remote cluster and null + * otherwise (local). + */ + @Nullable + public String getClusterAlias() { + return clusterAlias; + } + + /** + * Return the {@link ShardId} of this shard. + */ + @Nullable + public ShardId getShardId() { + return shardId; + } + + @Override + public int compareTo(SearchShard o) { + int cmp = Objects.compare(clusterAlias, o.clusterAlias, Comparator.nullsFirst(Comparator.naturalOrder())); + return cmp != 0 ? cmp : shardId.compareTo(o.shardId); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SearchShard that = (SearchShard) o; + return Objects.equals(clusterAlias, that.clusterAlias) + && shardId.equals(that.shardId); + } + + @Override + public int hashCode() { + return Objects.hash(clusterAlias, shardId); + } + + @Override + public String toString() { + return "SearchShard{" + + "clusterAlias='" + clusterAlias + '\'' + + ", shardId=" + shardId + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShardTask.java b/server/src/main/java/org/elasticsearch/action/search/SearchShardTask.java new file mode 100644 index 0000000000000..4719c1fda9d53 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShardTask.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.search.fetch.ShardFetchSearchRequest; +import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; + +import java.util.Map; + +/** + * Task storing information about a currently running search shard request. + * See {@link ShardSearchRequest}, {@link ShardFetchSearchRequest}, ... + */ +public class SearchShardTask extends CancellableTask { + + public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); + } + + @Override + public boolean shouldCancelChildrenOnCancellation() { + return false; + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java index 699448909a2b5..97247e443bb64 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java @@ -25,17 +25,31 @@ import java.util.Map; /** - * Task storing information about a currently running search request. + * Task storing information about a currently running {@link SearchRequest}. */ public class SearchTask extends CancellableTask { + private SearchProgressListener progressListener = SearchProgressListener.NOOP; public SearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { super(id, type, action, description, parentTaskId, headers); } + /** + * Attach a {@link SearchProgressListener} to this task. + */ + public void setProgressListener(SearchProgressListener progressListener) { + this.progressListener = progressListener; + } + + /** + * Return the {@link SearchProgressListener} attached to this task. + */ + public SearchProgressListener getProgressListener() { + return progressListener; + } + @Override public boolean shouldCancelChildrenOnCancellation() { return true; } - } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 4b66ed885db20..822557696f7e5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -307,7 +307,7 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new, (request, channel, task) -> { - searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener() { + searchService.executeDfsPhase(request, (SearchShardTask) task, new ActionListener() { @Override public void onResponse(SearchPhaseResult searchPhaseResult) { try { @@ -331,44 +331,44 @@ public void onFailure(Exception e) { transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new, (request, channel, task) -> { - searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>( - channel, QUERY_ACTION_NAME, request)); + searchService.executeQueryPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request)); }); TransportActionProxy.registerProxyActionWithDynamicResponseType(transportService, QUERY_ACTION_NAME, (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new); transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new, (request, channel, task) -> { - searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_ID_ACTION_NAME, - request)); + searchService.executeQueryPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, QUERY_ID_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new); transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new, (request, channel, task) -> { - searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_SCROLL_ACTION_NAME, - request)); + searchService.executeQueryPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, QUERY_SCROLL_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new); transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new, (request, channel, task) -> { - searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, - QUERY_FETCH_SCROLL_ACTION_NAME, request)); + searchService.executeFetchPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, QUERY_FETCH_SCROLL_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new); transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ShardFetchRequest::new, (request, channel, task) -> { - searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, - FETCH_ID_SCROLL_ACTION_NAME, request)); + searchService.executeFetchPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, FETCH_ID_SCROLL_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new); transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new, (request, channel, task) -> { - searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, - request)); + searchService.executeFetchPhase(request, (SearchShardTask) task, + new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request)); }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index bca785102c9c5..74d3e783adbfa 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -474,89 +473,10 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap); - Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState); - - if (shouldSplitIndices(searchRequest)) { - //Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible. - //Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other - //indices (possibly slower) being searched at the same time. - List writeIndicesList = new ArrayList<>(); - List readOnlyIndicesList = new ArrayList<>(); - splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList); - String[] writeIndices = writeIndicesList.toArray(new String[0]); - String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]); - - if (readOnlyIndices.length == 0) { - executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap, - aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); - } else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) { - executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap, - aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); - } else { - //Split the search in two whenever throttled indices are searched together with ordinary indices (local or remote), so - //that we don't keep the search context open for too long between query and fetch for ordinary indices due to slow indices. - CountDown countDown = new CountDown(2); - AtomicReference exceptions = new AtomicReference<>(); - SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, - searchService::createReduceContext); - CountDownActionListener countDownActionListener = - new CountDownActionListener(countDown, exceptions, listener) { - @Override - void innerOnResponse(SearchResponse searchResponse) { - searchResponseMerger.add(searchResponse); - } - - @Override - SearchResponse createFinalResponse() { - return searchResponseMerger.getMergedResponse(clusters); - } - }; - - //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and - //will be provided separately to executeSearch. - SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices, - RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); - executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap, - aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener, - SearchResponse.Clusters.EMPTY); - - //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and - //will be provided separately to executeSearch. - SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices, - RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); - executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap, - aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener, - SearchResponse.Clusters.EMPTY); - } - } else { - String[] concreteIndices = Arrays.stream(indices).map(Index::getName).toArray(String[]::new); - executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap, - aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); - } - } - - static boolean shouldSplitIndices(SearchRequest searchRequest) { - return searchRequest.scroll() == null && searchRequest.searchType() != DFS_QUERY_THEN_FETCH - && (searchRequest.source() == null || searchRequest.source().size() != 0); - } - - static void splitIndices(Index[] indices, ClusterState clusterState, List writeIndices, List readOnlyIndices) { - for (Index index : indices) { - ClusterBlockException writeBlock = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, index.getName()); - if (writeBlock == null) { - writeIndices.add(index.getName()); - } else { - readOnlyIndices.add(index.getName()); - } + String[] concreteIndices = new String[indices.length]; + for (int i = 0; i < indices.length; i++) { + concreteIndices[i] = indices[i].getName(); } - } - - private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, - OriginalIndices localIndices, String[] concreteIndices, Map> routingMap, - Map aliasFilter, Map concreteIndexBoosts, - List remoteShardIterators, BiFunction remoteConnections, - ClusterState clusterState, ActionListener listener, SearchResponse.Clusters clusters) { - Map nodeSearchCounts = searchTransportService.getPendingSearchRequests(); GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts); @@ -565,6 +485,8 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea failIfOverShardCountLimit(clusterService, shardIterators.size()); + Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState); + // optimize search type for cases where there is only one shard group to search on if (shardIterators.size() == 1) { // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard @@ -577,9 +499,11 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea if (searchRequest.isSuggestOnly()) { // disable request cache if we have only suggest searchRequest.requestCache(false); - if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) { - // convert to Q_T_F if we have only suggest - searchRequest.searchType(QUERY_THEN_FETCH); + switch (searchRequest.searchType()) { + case DFS_QUERY_THEN_FETCH: + // convert to Q_T_F if we have only suggest + searchRequest.searchType(QUERY_THEN_FETCH); + break; } } @@ -688,16 +612,22 @@ private static void failIfOverShardCountLimit(ClusterService clusterService, int } } - abstract static class CountDownActionListener implements ActionListener { + abstract static class CCSActionListener implements ActionListener { + private final String clusterAlias; + private final boolean skipUnavailable; private final CountDown countDown; + private final AtomicInteger skippedClusters; private final AtomicReference exceptions; - private final ActionListener delegateListener; + private final ActionListener originalListener; - CountDownActionListener(CountDown countDown, AtomicReference exceptions, - ActionListener delegateListener) { + CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters, + AtomicReference exceptions, ActionListener originalListener) { + this.clusterAlias = clusterAlias; + this.skipUnavailable = skipUnavailable; this.countDown = countDown; + this.skippedClusters = skippedClusters; this.exceptions = exceptions; - this.delegateListener = delegateListener; + this.originalListener = originalListener; } @Override @@ -708,7 +638,26 @@ public final void onResponse(Response response) { abstract void innerOnResponse(Response response); - final void maybeFinish() { + @Override + public final void onFailure(Exception e) { + if (skipUnavailable) { + skippedClusters.incrementAndGet(); + } else { + Exception exception = e; + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) { + exception = wrapRemoteClusterFailure(clusterAlias, e); + } + if (exceptions.compareAndSet(null, exception) == false) { + exceptions.accumulateAndGet(exception, (previous, current) -> { + current.addSuppressed(previous); + return current; + }); + } + } + maybeFinish(); + } + + private void maybeFinish() { if (countDown.countDown()) { Exception exception = exceptions.get(); if (exception == null) { @@ -716,56 +665,17 @@ final void maybeFinish() { try { response = createFinalResponse(); } catch(Exception e) { - delegateListener.onFailure(e); + originalListener.onFailure(e); return; } - delegateListener.onResponse(response); + originalListener.onResponse(response); } else { - delegateListener.onFailure(exceptions.get()); + originalListener.onFailure(exceptions.get()); } } } abstract FinalResponse createFinalResponse(); - - @Override - public void onFailure(Exception e) { - if (exceptions.compareAndSet(null, e) == false) { - exceptions.accumulateAndGet(e, (previous, current) -> { - current.addSuppressed(previous); - return current; - }); - } - maybeFinish(); - } - } - - abstract static class CCSActionListener extends CountDownActionListener { - private final String clusterAlias; - private final boolean skipUnavailable; - private final AtomicInteger skippedClusters; - - CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters, - AtomicReference exceptions, ActionListener originalListener) { - super(countDown, exceptions, originalListener); - this.clusterAlias = clusterAlias; - this.skipUnavailable = skipUnavailable; - this.skippedClusters = skippedClusters; - } - - @Override - public final void onFailure(Exception e) { - if (skipUnavailable) { - skippedClusters.incrementAndGet(); - maybeFinish(); - } else { - Exception exception = e; - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) { - exception = wrapRemoteClusterFailure(clusterAlias, e); - } - super.onFailure(exception); - } - } } private static RemoteTransportException wrapRemoteClusterFailure(String clusterAlias, Exception e) { diff --git a/server/src/main/java/org/elasticsearch/action/support/TransportAction.java b/server/src/main/java/org/elasticsearch/action/support/TransportAction.java index a500df5c643f4..abba923f72a41 100644 --- a/server/src/main/java/org/elasticsearch/action/support/TransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/TransportAction.java @@ -32,7 +32,7 @@ public abstract class TransportAction { - protected final String actionName; + public final String actionName; private final ActionFilter[] filters; protected final TaskManager taskManager; /** diff --git a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java index 44f65f0936541..d90cc39fa4e8a 100644 --- a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java +++ b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java @@ -23,6 +23,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchProgressActionListener; +import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.client.Client; import org.elasticsearch.client.support.AbstractClient; @@ -30,6 +36,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskListener; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterService; @@ -42,6 +49,9 @@ public class NodeClient extends AbstractClient { private Map actions; + + private TaskManager taskManager; + /** * The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by * {@link #executeLocally(ActionType, ActionRequest, TaskListener)}. @@ -53,9 +63,10 @@ public NodeClient(Settings settings, ThreadPool threadPool) { super(settings, threadPool); } - public void initialize(Map actions, Supplier localNodeId, + public void initialize(Map actions, TaskManager taskManager, Supplier localNodeId, RemoteClusterService remoteClusterService) { this.actions = actions; + this.taskManager = taskManager; this.localNodeId = localNodeId; this.remoteClusterService = remoteClusterService; } @@ -93,6 +104,38 @@ > Task executeLocally(ActionType action, Request request, TaskListener return transportAction(action).execute(request, listener); } + /** + * Execute a {@link SearchRequest} locally and track the progress of the request through + * a {@link SearchProgressActionListener}. + */ + public SearchTask executeSearchLocally(SearchRequest request, SearchProgressActionListener listener) { + // we cannot track the progress if remote cluster requests are splitted. + request.setCcsMinimizeRoundtrips(false); + TransportSearchAction action = (TransportSearchAction) actions.get(SearchAction.INSTANCE); + SearchTask task = (SearchTask) taskManager.register("transport", action.actionName, request); + task.setProgressListener(listener); + action.execute(task, request, new ActionListener() { + @Override + public void onResponse(SearchResponse response) { + try { + taskManager.unregister(task); + } finally { + listener.onResponse(response); + } + } + + @Override + public void onFailure(Exception e) { + try { + taskManager.unregister(task); + } finally { + listener.onFailure(e); + } + } + }); + return task; + } + /** * The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by * {@link #executeLocally(ActionType, ActionRequest, TaskListener)}. diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index dfd0b80cd82e7..9f53069640f1d 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -595,7 +595,7 @@ protected Node( resourcesToClose.addAll(pluginLifecycleComponents); resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class)); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); - client.initialize(injector.getInstance(new Key>() {}), + client.initialize(injector.getInstance(new Key>() {}), transportService.getTaskManager(), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); logger.debug("initializing HTTP handlers ..."); diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index ebbbe5a93ec6f..289a001c8666e 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -25,7 +25,7 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; import org.elasticsearch.Version; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -123,10 +123,9 @@ final class DefaultSearchContext extends SearchContext { private boolean lowLevelCancellation; // filter for sliced scroll private SliceBuilder sliceBuilder; - private SearchTask task; + private SearchShardTask task; private final Version minNodeVersion; - /** * The original query as sent by the user without the types and aliases * applied. Putting things in here leaks them into highlighting so don't add @@ -828,12 +827,12 @@ public void setProfilers(Profilers profilers) { } @Override - public void setTask(SearchTask task) { + public void setTask(SearchShardTask task) { this.task = task; } @Override - public SearchTask getTask() { + public SearchShardTask getTask() { return task; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 230b9bdc431ea..b175149a560c3 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -28,7 +28,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; @@ -304,11 +304,11 @@ protected void doClose() { keepAliveReaper.cancel(); } - public void executeDfsPhase(ShardSearchRequest request, SearchTask task, ActionListener listener) { + public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { rewriteShardRequest(request, ActionListener.map(listener, r -> executeDfsPhase(r, task))); } - private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException { + private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardTask task) throws IOException { final SearchContext context = createAndPutContext(request); context.incRef(); try { @@ -339,7 +339,7 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } } - public void executeQueryPhase(ShardSearchRequest request, SearchTask task, ActionListener listener) { + public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { rewriteShardRequest(request, ActionListener.map(listener, r -> executeQueryPhase(r, task))); } @@ -347,7 +347,7 @@ private void runAsync(long id, Supplier executable, ActionListener lis getExecutor(id).execute(ActionRunnable.supply(listener, executable::get)); } - private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception { + private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception { final SearchContext context = createAndPutContext(request); context.incRef(); try { @@ -395,7 +395,9 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, long aft return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } - public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener listener) { + public void executeQueryPhase(InternalScrollSearchRequest request, + SearchShardTask task, + ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); context.incRef(); @@ -417,7 +419,7 @@ public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask ta }, listener); } - public void executeQueryPhase(QuerySearchRequest request, SearchTask task, ActionListener listener) { + public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); context.setTask(task); @@ -467,7 +469,7 @@ private Executor getExecutor(IndexShard indexShard) { return threadPool.executor(indexShard.indexSettings().isSearchThrottled() ? Names.SEARCH_THROTTLED : Names.SEARCH); } - public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask task, + public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); @@ -490,7 +492,7 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta }, listener); } - public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener listener) { + public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); context.incRef(); diff --git a/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java b/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java index cb2bc99370bca..2e7c59b329874 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java @@ -22,7 +22,7 @@ import com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; @@ -112,7 +112,7 @@ public ScoreDoc lastEmittedDoc() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java index 10eb90afc04fe..8c04954a4efcf 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java @@ -22,7 +22,7 @@ import org.apache.lucene.search.Collector; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -535,12 +535,12 @@ public QueryShardContext getQueryShardContext() { } @Override - public void setTask(SearchTask task) { + public void setTask(SearchShardTask task) { in.setTask(task); } @Override - public SearchTask getTask() { + public SearchShardTask getTask() { return in.getTask(); } diff --git a/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java index 9d7ba557cc260..fe8d173711c69 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java @@ -20,7 +20,7 @@ package org.elasticsearch.search.internal; import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.Scroll; @@ -73,7 +73,7 @@ public InternalScrollSearchRequest scroll(Scroll scroll) { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java index fba80d5f3c6e0..c0477ece69aa0 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -22,7 +22,7 @@ import org.apache.lucene.search.Collector; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; @@ -93,9 +93,9 @@ protected SearchContext() { super("search_context"); } - public abstract void setTask(SearchTask task); + public abstract void setTask(SearchShardTask task); - public abstract SearchTask getTask(); + public abstract SearchShardTask getTask(); public abstract boolean isCancelled(); diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 17a2e2cb76453..c827a750e71f6 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.metadata.AliasMetaData; @@ -307,7 +307,7 @@ public String getClusterAlias() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 7f3a7a5b1b513..a715d9494ca5d 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -37,7 +37,7 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHits; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor; @@ -224,7 +224,7 @@ static boolean execute(SearchContext searchContext, final Runnable cancellationRunnable; if (searchContext.lowLevelCancellation()) { - SearchTask task = searchContext.getTask(); + SearchShardTask task = searchContext.getTask(); cancellationRunnable = () -> { if (task.isCancelled()) throw new TaskCancelledException("cancelled"); }; } else { cancellationRunnable = null; diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java index e458603310ce4..d3919ec3aba48 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java @@ -21,7 +21,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -86,7 +86,7 @@ public IndicesOptions indicesOptions() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); } public String getDescription() { diff --git a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 86d7a7ff7b5dc..5eec29dbf8039 100644 --- a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -42,17 +42,21 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import static org.elasticsearch.action.search.SearchProgressListener.NOOP; + public class FetchSearchPhaseTests extends ESTestCase { public void testShortcutQueryAndFetchOptimization() { SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); - ArraySearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 1); + ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 1); boolean hasHits = randomBoolean(); final int numHits; if (hasHits) { QuerySearchResult queryResult = new QuerySearchResult(); + queryResult.setSearchShardTarget(new SearchShardTarget("node0", + new ShardId("index", "index", 0), null, OriginalIndices.NONE)); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 1.0F), new DocValueFormat[0]); queryResult.size(1); @@ -89,7 +93,7 @@ public void testFetchTwoDocument() { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); - ArraySearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); + ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)); @@ -147,7 +151,7 @@ public void testFailFetchOneDoc() { SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); ArraySearchPhaseResults results = - controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); + controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)); @@ -208,7 +212,8 @@ public void testFetchDocsConcurrently() throws InterruptedException { SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits); - ArraySearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), numHits); + ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, + mockSearchPhaseContext.getRequest(), numHits); for (int i = 0; i < numHits; i++) { QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)); @@ -265,7 +270,7 @@ public void testExceptionFailsPhase() { SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); ArraySearchPhaseResults results = - controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); + controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)); @@ -321,7 +326,7 @@ public void testCleanupIrrelevantContexts() { // contexts that are not fetched s SearchPhaseController controller = new SearchPhaseController( (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); ArraySearchPhaseResults results = - controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); + controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = 1; QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 5ec4d18457ee5..05b7633571ba1 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -68,12 +68,15 @@ import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.action.search.SearchProgressListener.NOOP; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -339,7 +342,7 @@ public void testConsumer() { SearchRequest request = randomSearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); - ArraySearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, 3); + ArraySearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(NOOP, request, 3); assertEquals(0, reductions.size()); QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); @@ -402,7 +405,7 @@ public void testConsumerConcurrently() throws InterruptedException { request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); AtomicInteger max = new AtomicInteger(); Thread[] threads = new Thread[expectedNumResults]; for (int i = 0; i < expectedNumResults; i++) { @@ -449,7 +452,7 @@ public void testConsumerOnlyAggs() { request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0)); request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); AtomicInteger max = new AtomicInteger(); for (int i = 0; i < expectedNumResults; i++) { int number = randomIntBetween(1, 1000); @@ -487,7 +490,7 @@ public void testConsumerOnlyHits() { } request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); AtomicInteger max = new AtomicInteger(); for (int i = 0; i < expectedNumResults; i++) { int number = randomIntBetween(1, 1000); @@ -540,7 +543,7 @@ public void testNewSearchPhaseResults() { } request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer - = searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + = searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); if ((hasAggs || hasTopDocs) && expectedNumResults > bufferSize) { assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize, consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)); @@ -556,7 +559,7 @@ public void testReduceTopNWithFromOffset() { request.source(new SearchSourceBuilder().size(5).from(5)); request.setBatchedReduceSize(randomIntBetween(2, 4)); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, 4); + searchPhaseController.newSearchPhaseResults(NOOP, request, 4); int score = 100; for (int i = 0; i < 4; i++) { QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i), @@ -592,7 +595,7 @@ public void testConsumerSortByField() { int size = randomIntBetween(1, 10); request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); AtomicInteger max = new AtomicInteger(); SortField[] sortFields = {new SortField("field", SortField.Type.INT, true)}; DocValueFormat[] docValueFormats = {DocValueFormat.RAW}; @@ -628,7 +631,7 @@ public void testConsumerFieldCollapsing() { int size = randomIntBetween(5, 10); request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); SortField[] sortFields = {new SortField("field", SortField.Type.STRING)}; BytesRef a = new BytesRef("a"); BytesRef b = new BytesRef("b"); @@ -667,7 +670,7 @@ public void testConsumerSuggestions() { SearchRequest request = randomSearchRequest(); request.setBatchedReduceSize(bufferSize); ArraySearchPhaseResults consumer = - searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + searchPhaseController.newSearchPhaseResults(NOOP, request, expectedNumResults); int maxScoreTerm = -1; int maxScorePhrase = -1; int maxScoreCompletion = -1; @@ -752,4 +755,89 @@ public void testConsumerSuggestions() { assertNull(reduce.sortedTopDocs.collapseField); assertNull(reduce.sortedTopDocs.collapseValues); } + + public void testProgressListener() throws InterruptedException { + int expectedNumResults = randomIntBetween(10, 100); + for (int bufferSize : new int[] {expectedNumResults, expectedNumResults/2, expectedNumResults/4, 2}) { + SearchRequest request = randomSearchRequest(); + request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); + request.setBatchedReduceSize(bufferSize); + AtomicInteger numQueryResultListener = new AtomicInteger(); + AtomicInteger numQueryFailureListener = new AtomicInteger(); + AtomicInteger numReduceListener = new AtomicInteger(); + AtomicReference finalAggsListener = new AtomicReference<>(); + AtomicReference totalHitsListener = new AtomicReference<>(); + SearchProgressListener progressListener = new SearchProgressListener() { + @Override + public void onQueryResult(int shardIndex) { + assertThat(shardIndex, lessThan(expectedNumResults)); + numQueryResultListener.incrementAndGet(); + } + + @Override + public void onQueryFailure(int shardIndex, Exception exc) { + assertThat(shardIndex, lessThan(expectedNumResults)); + numQueryFailureListener.incrementAndGet(); + } + + @Override + public void onPartialReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int version) { + assertEquals(numReduceListener.incrementAndGet(), version); + } + + @Override + public void onReduce(List shards, TotalHits totalHits, InternalAggregations aggs) { + totalHitsListener.set(totalHits); + finalAggsListener.set(aggs); + numReduceListener.incrementAndGet(); + } + }; + ArraySearchPhaseResults consumer = + searchPhaseController.newSearchPhaseResults(progressListener, request, expectedNumResults); + AtomicInteger max = new AtomicInteger(); + Thread[] threads = new Thread[expectedNumResults]; + for (int i = 0; i < expectedNumResults; i++) { + int id = i; + threads[i] = new Thread(() -> { + int number = randomIntBetween(1, 1000); + max.updateAndGet(prev -> Math.max(prev, number)); + QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new ShardId("a", "b", id), + null, OriginalIndices.NONE)); + result.topDocs(new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[]{new ScoreDoc(0, number)}), number), + new DocValueFormat[0]); + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", (double) number, + DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); + result.aggregations(aggs); + result.setShardIndex(id); + result.size(1); + consumer.consumeResult(result); + }); + threads[i].start(); + } + for (int i = 0; i < expectedNumResults; i++) { + threads[i].join(); + } + SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); + assertFinalReduction(request); + InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0); + assertEquals(max.get(), internalMax.getValue(), 0.0D); + assertEquals(1, reduce.sortedTopDocs.scoreDocs.length); + assertEquals(max.get(), reduce.maxScore, 0.0f); + assertEquals(expectedNumResults, reduce.totalHits.value); + assertEquals(max.get(), reduce.sortedTopDocs.scoreDocs[0].score, 0.0f); + assertFalse(reduce.sortedTopDocs.isSortedByField); + assertNull(reduce.sortedTopDocs.sortFields); + assertNull(reduce.sortedTopDocs.collapseField); + assertNull(reduce.sortedTopDocs.collapseValues); + + assertEquals(reduce.aggregations, finalAggsListener.get()); + assertEquals(reduce.totalHits, totalHitsListener.get()); + + assertEquals(expectedNumResults, numQueryResultListener.get()); + assertEquals(0, numQueryFailureListener.get()); + assertEquals(numReduceListener.get(), reduce.numReducePhases); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java b/server/src/test/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java new file mode 100644 index 0000000000000..f33701ffe9b92 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java @@ -0,0 +1,222 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; + +public class SearchProgressActionListenerIT extends ESSingleNodeTestCase { + private List shards; + + public void setUp() throws Exception { + super.setUp(); + shards = createRandomIndices(client()); + } + + public void testSearchProgressSimple() throws Exception { + for (SearchType searchType : SearchType.values()) { + SearchRequest request = new SearchRequest("index-*") + .searchType(searchType) + .source(new SearchSourceBuilder().size(0)); + testCase((NodeClient) client(), request, shards, false); + } + } + + public void testSearchProgressWithHits() throws Exception { + for (SearchType searchType : SearchType.values()) { + SearchRequest request = new SearchRequest("index-*") + .searchType(searchType) + .source( + new SearchSourceBuilder() + .size(10) + ); + testCase((NodeClient) client(), request, shards, true); + } + } + + public void testSearchProgressWithAggs() throws Exception { + for (SearchType searchType : SearchType.values()) { + SearchRequest request = new SearchRequest("index-*") + .searchType(searchType) + .source( + new SearchSourceBuilder() + .size(0) + .aggregation(AggregationBuilders.max("max").field("number")) + ); + testCase((NodeClient) client(), request, shards, false); + } + } + + public void testSearchProgressWithHitsAndAggs() throws Exception { + for (SearchType searchType : SearchType.values()) { + SearchRequest request = new SearchRequest("index-*") + .searchType(searchType) + .source( + new SearchSourceBuilder() + .size(10) + .aggregation(AggregationBuilders.max("max").field("number")) + ); + testCase((NodeClient) client(), request, shards, true); + } + } + + public void testSearchProgressWithQuery() throws Exception { + for (SearchType searchType : SearchType.values()) { + SearchRequest request = new SearchRequest("index-*") + .searchType(searchType) + .source( + new SearchSourceBuilder() + .size(10) + .query(QueryBuilders.termQuery("foo", "bar")) + ); + testCase((NodeClient) client(), request, shards, true); + } + } + + public void testSearchProgressWithShardSort() throws Exception { + SearchRequest request = new SearchRequest("index-*") + .source( + new SearchSourceBuilder() + .size(0) + .sort(new FieldSortBuilder("number").order(SortOrder.DESC)) + ); + request.setPreFilterShardSize(1); + List sortShards = new ArrayList<>(shards); + Collections.sort(sortShards, Comparator.reverseOrder()); + testCase((NodeClient) client(), request, sortShards, false); + } + + private static void testCase(NodeClient client, SearchRequest request, + List expectedShards, boolean hasFetchPhase) throws InterruptedException { + AtomicInteger numQueryResults = new AtomicInteger(); + AtomicInteger numQueryFailures = new AtomicInteger(); + AtomicInteger numFetchResults = new AtomicInteger(); + AtomicInteger numFetchFailures = new AtomicInteger(); + AtomicInteger numReduces = new AtomicInteger(); + AtomicReference searchResponse = new AtomicReference<>(); + AtomicReference> shardsListener = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + SearchProgressActionListener listener = new SearchProgressActionListener() { + @Override + public void onListShards(List shards, boolean fetchPhase) { + shardsListener.set(shards); + assertEquals(fetchPhase, hasFetchPhase); + } + + @Override + public void onQueryResult(int shardIndex) { + assertThat(shardIndex, lessThan(shardsListener.get().size())); + numQueryResults.incrementAndGet(); + } + + @Override + public void onQueryFailure(int shardIndex, Exception exc) { + assertThat(shardIndex, lessThan(shardsListener.get().size())); + numQueryFailures.incrementAndGet(); + } + + @Override + public void onFetchResult(int shardIndex) { + assertThat(shardIndex, lessThan(shardsListener.get().size())); + numFetchResults.incrementAndGet(); + } + + @Override + public void onFetchFailure(int shardIndex, Exception exc) { + assertThat(shardIndex, lessThan(shardsListener.get().size())); + numFetchFailures.incrementAndGet(); + } + + @Override + public void onPartialReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int version) { + numReduces.incrementAndGet(); + } + + @Override + public void onReduce(List shards, TotalHits totalHits, InternalAggregations aggs) { + numReduces.incrementAndGet(); + } + + @Override + public void onResponse(SearchResponse response) { + searchResponse.set(response); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(); + } + }; + client.executeSearchLocally(request, listener); + latch.await(); + + assertThat(shardsListener.get(), equalTo(expectedShards)); + assertThat(numQueryResults.get(), equalTo(searchResponse.get().getSuccessfulShards())); + assertThat(numQueryFailures.get(), equalTo(searchResponse.get().getFailedShards())); + if (hasFetchPhase) { + assertThat(numFetchResults.get(), equalTo(searchResponse.get().getSuccessfulShards())); + assertThat(numFetchFailures.get(), equalTo(0)); + } else { + assertThat(numFetchResults.get(), equalTo(0)); + assertThat(numFetchFailures.get(), equalTo(0)); + } + assertThat(numReduces.get(), equalTo(searchResponse.get().getNumReducePhases())); + } + + private static List createRandomIndices(Client client) { + int numIndices = randomIntBetween(3, 20); + for (int i = 0; i < numIndices; i++) { + String indexName = String.format(Locale.ROOT, "index-%03d" , i); + assertAcked(client.admin().indices().prepareCreate(indexName).get()); + client.prepareIndex(indexName, "doc", Integer.toString(i)).setSource("number", i, "foo", "bar").get(); + } + client.admin().indices().prepareRefresh("index-*").get(); + ClusterSearchShardsResponse resp = client.admin().cluster().prepareSearchShards("index-*").get(); + return Arrays.stream(resp.getGroups()) + .map(e -> new SearchShard(null, e.getShardId())) + .sorted() + .collect(Collectors.toList()); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java index fa6160839d2a9..10f252c30dc3b 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java @@ -19,14 +19,11 @@ package org.elasticsearch.action.search; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -177,62 +174,4 @@ public void testFinalReduce() { assertEquals(2, longTerms.getBuckets().size()); } } - - public void testSplitIndices() { - { - CreateIndexResponse response = client().admin().indices().prepareCreate("write").get(); - assertTrue(response.isAcknowledged()); - } - { - CreateIndexResponse response = client().admin().indices().prepareCreate("readonly").get(); - assertTrue(response.isAcknowledged()); - } - { - SearchResponse response = client().prepareSearch("readonly").get(); - assertEquals(1, response.getTotalShards()); - assertEquals(1, response.getSuccessfulShards()); - assertEquals(1, response.getNumReducePhases()); - } - { - SearchResponse response = client().prepareSearch("write").get(); - assertEquals(1, response.getTotalShards()); - assertEquals(1, response.getSuccessfulShards()); - assertEquals(1, response.getNumReducePhases()); - } - { - SearchResponse response = client().prepareSearch("readonly", "write").get(); - assertEquals(2, response.getTotalShards()); - assertEquals(2, response.getSuccessfulShards()); - assertEquals(1, response.getNumReducePhases()); - } - { - Settings settings = Settings.builder().put("index.blocks.read_only", "true").build(); - AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get(); - assertTrue(response.isAcknowledged()); - } - try { - { - SearchResponse response = client().prepareSearch("readonly").get(); - assertEquals(1, response.getTotalShards()); - assertEquals(1, response.getSuccessfulShards()); - assertEquals(1, response.getNumReducePhases()); - } - { - SearchResponse response = client().prepareSearch("write").get(); - assertEquals(1, response.getTotalShards()); - assertEquals(1, response.getSuccessfulShards()); - assertEquals(1, response.getNumReducePhases()); - } - { - SearchResponse response = client().prepareSearch("readonly", "write").get(); - assertEquals(2, response.getTotalShards()); - assertEquals(2, response.getSuccessfulShards()); - assertEquals(3, response.getNumReducePhases()); - } - } finally { - Settings settings = Settings.builder().put("index.blocks.read_only", "false").build(); - AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get(); - assertTrue(response.isAcknowledged()); - } - } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index a2c83c43a9770..e9aeff6847ad5 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -29,10 +29,6 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.GroupShardsIteratorTests; @@ -841,75 +837,4 @@ public void testShouldMinimizeRoundtrips() throws Exception { assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); } } - - public void testShouldSplitIndices() { - { - SearchRequest searchRequest = new SearchRequest(); - assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.source(new SearchSourceBuilder()); - assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.source(new SearchSourceBuilder().size(randomIntBetween(1, 100))); - assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.scroll("5s"); - assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.source(new SearchSourceBuilder().size(0)); - assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.searchType(SearchType.DFS_QUERY_THEN_FETCH); - assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); - } - } - - public void testSplitIndices() { - int numIndices = randomIntBetween(1, 10); - Index[] indices = new Index[numIndices]; - for (int i = 0; i < numIndices; i++) { - String indexName = randomAlphaOfLengthBetween(5, 10); - indices[i] = new Index(indexName, indexName + "-uuid"); - } - { - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build(); - List writeIndices = new ArrayList<>(); - List readOnlyIndices = new ArrayList<>(); - TransportSearchAction.splitIndices(indices, clusterState, writeIndices, readOnlyIndices); - assertEquals(0, readOnlyIndices.size()); - assertEquals(numIndices, writeIndices.size()); - } - { - List expectedWrite = new ArrayList<>(); - List expectedReadOnly = new ArrayList<>(); - ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder(); - for (Index index : indices) { - if (randomBoolean()) { - blocksBuilder.addIndexBlock(index.getName(), IndexMetaData.INDEX_WRITE_BLOCK); - expectedReadOnly.add(index.getName()); - } else if(randomBoolean() ){ - blocksBuilder.addIndexBlock(index.getName(), IndexMetaData.INDEX_READ_ONLY_BLOCK); - expectedReadOnly.add(index.getName()); - } else { - expectedWrite.add(index.getName()); - } - } - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).blocks(blocksBuilder).build(); - List writeIndices = new ArrayList<>(); - List readOnlyIndices = new ArrayList<>(); - TransportSearchAction.splitIndices(indices, clusterState, writeIndices, readOnlyIndices); - assertEquals(writeIndices, expectedWrite); - assertEquals(readOnlyIndices, expectedReadOnly); - } - } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java index ffc41cab847b9..e16227c4ac6f8 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java @@ -69,5 +69,4 @@ public void testShardCountLimit() throws Exception { TransportSearchAction.SHARD_COUNT_LIMIT_SETTING.getKey(), null))); } } - } diff --git a/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java b/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java index 31705bfa5f720..4dbbe100259cd 100644 --- a/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java +++ b/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java @@ -41,9 +41,10 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTestCase { @Override protected Client buildClient(Settings headersSettings, ActionType[] testedActions) { Settings settings = HEADER_SETTINGS; + TaskManager taskManager = new TaskManager(settings, threadPool, Collections.emptySet()); Actions actions = new Actions(settings, threadPool, testedActions); NodeClient client = new NodeClient(settings, threadPool); - client.initialize(actions, () -> "test", null); + client.initialize(actions, taskManager, () -> "test", null); return client; } diff --git a/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java b/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java index 88e636fe8e9e7..803eea1c06d19 100644 --- a/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java +++ b/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.index; import org.elasticsearch.Version; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -77,7 +77,7 @@ public void testSlowLogHasJsonFields() throws IOException { SearchContext searchContext = createSearchContext(index); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); searchContext.request().source(source); - searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, + searchContext.setTask(new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"))); SearchSlowLog.SearchSlowLogMessage p = new SearchSlowLog.SearchSlowLogMessage(searchContext, 10); @@ -97,7 +97,7 @@ public void testSlowLogWithTypes() throws IOException { SearchContext searchContext = createSearchContext(index); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); searchContext.request().source(source); - searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, + searchContext.setTask(new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"))); searchContext.getQueryShardContext().setTypes("type1", "type2"); SearchSlowLog.SearchSlowLogMessage p = new SearchSlowLog.SearchSlowLogMessage(searchContext, 10); @@ -118,7 +118,7 @@ public void testSlowLogsWithStats() throws IOException { SearchContext searchContext = createSearchContext(index,"group1"); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); searchContext.request().source(source); - searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, + searchContext.setTask(new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"))); SearchSlowLog.SearchSlowLogMessage p = new SearchSlowLog.SearchSlowLogMessage(searchContext, 10); @@ -127,7 +127,7 @@ public void testSlowLogsWithStats() throws IOException { searchContext = createSearchContext(index, "group1", "group2"); source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); searchContext.request().source(source); - searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, + searchContext.setTask(new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"))); p = new SearchSlowLog.SearchSlowLogMessage(searchContext, 10); assertThat(p.getValueFor("stats"), equalTo("[\\\"group1\\\", \\\"group2\\\"]")); @@ -138,7 +138,7 @@ public void testSlowLogSearchContextPrinterToLog() throws IOException { SearchContext searchContext = createSearchContext(index); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); searchContext.request().source(source); - searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, + searchContext.setTask(new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"))); SearchSlowLog.SearchSlowLogMessage p = new SearchSlowLog.SearchSlowLogMessage(searchContext, 10); assertThat(p.getFormattedMessage(), startsWith("[foo][0]")); diff --git a/server/src/test/java/org/elasticsearch/rest/action/admin/indices/RestValidateQueryActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/admin/indices/RestValidateQueryActionTests.java index 86dfc2a86fbfe..e1e66b503e791 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/admin/indices/RestValidateQueryActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/admin/indices/RestValidateQueryActionTests.java @@ -82,7 +82,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener listen final Map actions = new HashMap<>(); actions.put(ValidateQueryAction.INSTANCE, transportAction); - client.initialize(actions, () -> "local", null); + client.initialize(actions, taskManager, () -> "local", null); } @AfterClass diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 4aa819f384bc9..c410066dbe693 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -32,7 +32,7 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; @@ -313,13 +313,13 @@ public void onFailure(Exception e) { new ShardSearchRequest(OriginalIndices.NONE, useScroll ? scrollSearchRequest : searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), - new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result); + new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), result); SearchPhaseResult searchPhaseResult = result.get(); IntArrayList intCursors = new IntArrayList(1); intCursors.add(0); ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null/* not a scroll */); PlainActionFuture listener = new PlainActionFuture<>(); - service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener); + service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), listener); listener.get(); if (useScroll) { service.freeContext(searchPhaseResult.getRequestId()); diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index 2190e573707e6..53aa0a65bfba9 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -63,7 +63,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.index.search.ESToParentBlockJoinQuery; @@ -110,7 +110,7 @@ private void countTestCase(Query query, IndexReader reader, boolean shouldCollec TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(query)); context.setSize(0); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); final IndexSearcher searcher = shouldCollectSearch ? new IndexSearcher(reader) : getAssertingEarlyTerminationSearcher(reader, 0); @@ -198,7 +198,7 @@ public void testPostFilterDisablesCountOptimization() throws Exception { IndexReader reader = DirectoryReader.open(dir); IndexSearcher contextSearcher = getAssertingEarlyTerminationSearcher(reader, 0); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); QueryPhase.execute(context, contextSearcher, checkCancelled -> {}); @@ -228,7 +228,7 @@ public void testTerminateAfterWithFilter() throws Exception { IndexReader reader = DirectoryReader.open(dir); IndexSearcher contextSearcher = new IndexSearcher(reader); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.terminateAfter(1); context.setSize(10); @@ -257,7 +257,7 @@ public void testMinScoreDisablesCountOptimization() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(0); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); QueryPhase.execute(context, contextSearcher, checkCancelled -> {}); assertEquals(1, context.queryResult().topDocs().topDocs.totalHits.value); @@ -271,7 +271,7 @@ public void testMinScoreDisablesCountOptimization() throws Exception { public void testQueryCapturesThreadPoolStats() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); Directory dir = newDirectory(); @@ -284,6 +284,8 @@ public void testQueryCapturesThreadPoolStats() throws Exception { w.close(); IndexReader reader = DirectoryReader.open(dir); IndexSearcher contextSearcher = new IndexSearcher(reader); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); QueryPhase.execute(context, contextSearcher, checkCancelled -> {}); QuerySearchResult results = context.queryResult(); @@ -313,7 +315,7 @@ public void testInOrderScrollOptimization() throws Exception { scrollContext.maxScore = Float.NaN; scrollContext.totalHits = null; context.scrollContext(scrollContext); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); int size = randomIntBetween(2, 5); context.setSize(size); @@ -351,7 +353,7 @@ public void testTerminateAfterEarlyTermination() throws Exception { } w.close(); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); final IndexReader reader = DirectoryReader.open(dir); @@ -458,7 +460,7 @@ public void testIndexSortingEarlyTermination() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(1); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.sort(new SortAndFormats(sort, new DocValueFormat[] {DocValueFormat.RAW})); final IndexReader reader = DirectoryReader.open(dir); @@ -545,7 +547,7 @@ public void testIndexSortScrollOptimization() throws Exception { scrollContext.maxScore = Float.NaN; scrollContext.totalHits = null; context.scrollContext(scrollContext); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.setSize(10); context.sort(searchSortAndFormat); @@ -599,7 +601,7 @@ public void testDisableTopScoreCollection() throws Exception { IndexReader reader = DirectoryReader.open(dir); IndexSearcher contextSearcher = new IndexSearcher(reader); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); Query q = new SpanNearQuery.Builder("title", true) .addClause(new SpanTermQuery(new Term("title", "foo"))) .addClause(new SpanTermQuery(new Term("title", "bar"))) @@ -704,7 +706,7 @@ public void testMinScore() throws Exception { .build() )); context.minimumScore(0.01f); - context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.setSize(1); context.trackTotalHitsUpTo(5); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 9f6575428bdbc..bdf6cd3eabdb0 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1240,7 +1240,8 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon transportService, clusterService, threadPool, snapshotsService, actionFilters, indexNameExpressionResolver )); - client.initialize(actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); + client.initialize(actions, transportService.getTaskManager(), + () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); } private Repository.Factory getRepoFactory(Environment environment) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java index 83da817f64bf2..32b1245650e94 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java @@ -22,7 +22,7 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -79,7 +79,7 @@ public class TestSearchContext extends SearchContext { ParsedQuery postFilter; Query query; Float minScore; - SearchTask task; + SearchShardTask task; SortAndFormats sort; boolean trackScores = false; int trackTotalHitsUpTo = SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO; @@ -604,12 +604,12 @@ public QueryShardContext getQueryShardContext() { } @Override - public void setTask(SearchTask task) { + public void setTask(SearchShardTask task) { this.task = task; } @Override - public SearchTask getTask() { + public SearchShardTask getTask() { return task; } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml index f74d6ae900037..16a0aace0e444 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml @@ -82,25 +82,6 @@ - match: {hits.total: 0} -- do: - index: - index: ordinary - id: "1" - body: { "foo": "Hello: 1" } - refresh: wait_for - -- do: - search: - rest_total_hits_as_int: true - index: [test, ordinary] - ignore_throttled: false - body: - query: - match: - foo: hello - -- match: {hits.total: 3} - --- "Test index options":