Add a listener to track the progress of a search request locally (#49471) (#49691)

This commit adds a function to NodeClient that makes it possible to track the progress
of a search request locally. Progress is tracked through a SearchProgressListener
that exposes query and fetch responses as well as partial and final reduces.
Modules and plugins inside a node can use this new method to track the
progress of a local search request.

Relates #49091
jimczi committed Nov 28, 2019
1 parent 2dafecc commit 496bb9e
Showing 39 changed files with 923 additions and 393 deletions.
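
As a usage sketch (not part of the diff below): a plugin inside the node could subclass the new SearchProgressActionListener to follow each phase of a local search. The listener class itself is not shown in this excerpt, so the protected hook names and signatures below (onListShards, onQueryResult, onQueryFailure, onPartialReduce, onReduce, onFetchResult, onFetchFailure) are assumptions that mirror the notify* calls visible in the hunks that follow; SearchShard is likewise assumed to be the shard descriptor returned by searchShards(...).

import java.util.List;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchProgressActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchShard;
import org.elasticsearch.search.aggregations.InternalAggregations;

// Sketch only: hook names and signatures are assumed from the notify* calls in this commit.
public class LoggingProgressListener extends SearchProgressActionListener {

    @Override
    protected void onListShards(List<SearchShard> shards, boolean fetchPhase) {
        // Reported once before the query phase starts: the shards that will be queried
        // and whether a fetch phase is expected to follow.
    }

    @Override
    protected void onQueryResult(int shardIndex) {
        // One successful query-phase response per shard.
    }

    @Override
    protected void onQueryFailure(int shardIndex, Exception exc) {
        // Query-phase failure for a shard.
    }

    @Override
    protected void onPartialReduce(List<SearchShard> shards, TotalHits totalHits,
                                   InternalAggregations aggs, int reducePhase) {
        // Incremental (partial) reduce of buffered aggregation results.
    }

    @Override
    protected void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs) {
        // Final reduce of the query phase.
    }

    @Override
    protected void onFetchResult(int shardIndex) {
        // One successful fetch-phase response per shard.
    }

    @Override
    protected void onFetchFailure(int shardIndex, Exception exc) {
        // Fetch-phase failure for a shard.
    }

    @Override
    public void onResponse(SearchResponse response) {
        // Complete search response, as for any ActionListener<SearchResponse>.
    }

    @Override
    public void onFailure(Exception e) {
        // The search failed as a whole.
    }
}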
@@ -34,6 +34,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
@@ -87,7 +88,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final SearchResponse.Clusters clusters;

private final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
private final int expectedTotalOps;
private final AtomicInteger totalOps = new AtomicInteger();
private final int maxConcurrentRequestsPerNode;
@@ -381,6 +382,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
logger.trace(new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e);
}
}
onShardGroupFailure(shardIndex, e);
onPhaseDone();
} else {
final ShardRouting nextShard = shardIt.nextOrNull();
@@ -389,7 +391,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
logger.trace(() -> new ParameterizedMessage(
"{}: Failed to execute [{}] lastShard [{}]",
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e);
if (!lastShard) {
if (lastShard == false) {
performPhaseOnShard(shardIndex, shardIt, nextShard);
} else {
// no more shards active, add a failure
@@ -400,10 +402,19 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e);
}
}
onShardGroupFailure(shardIndex, e);
}
}
}

/**
* Executed once for every {@link ShardId} that failed on all available shard routings.
*
* @param shardIndex the index of the shard target that failed
* @param exc the final failure reason
*/
protected void onShardGroupFailure(int shardIndex, Exception exc) {}

/**
* Executed once for every failed shard level request. This method is invoked before the next replica is tried for the given
* shard target.
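
As an illustration only (this excerpt does not show which concrete async action overrides the new hook), a subclass of AbstractSearchAsyncAction could forward shard-group failures to the task's progress listener, mirroring the notifyQueryFailure call used in DfsQueryPhase below:

// Illustrative sketch, not part of this diff: a concrete subclass forwarding shard-group
// failures to the progress listener (task wiring assumed; compare context.getTask() below).
@Override
protected void onShardGroupFailure(int shardIndex, Exception exc) {
    getTask().getProgressListener().notifyQueryFailure(shardIndex, exc);
}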
@@ -22,6 +22,7 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.query.QuerySearchRequest;
@@ -46,13 +47,15 @@ final class DfsQueryPhase extends SearchPhase {
private final Function<ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
private final SearchPhaseContext context;
private final SearchTransportService searchTransportService;
private final SearchProgressListener progressListener;

DfsQueryPhase(AtomicArray<DfsSearchResult> dfsSearchResults,
SearchPhaseController searchPhaseController,
Function<ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
SearchPhaseContext context) {
super("dfs_query");
this.queryResult = searchPhaseController.newSearchPhaseResults(context.getRequest(), context.getNumShards());
this.progressListener = context.getTask().getProgressListener();
this.queryResult = searchPhaseController.newSearchPhaseResults(progressListener, context.getRequest(), context.getNumShards());
this.searchPhaseController = searchPhaseController;
this.dfsSearchResults = dfsSearchResults;
this.nextPhaseFactory = nextPhaseFactory;
@@ -69,6 +72,8 @@ public void run() throws IOException {
final CountedCollector<SearchPhaseResult> counter = new CountedCollector<>(queryResult::consumeResult,
resultList.size(),
() -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context);
final SearchSourceBuilder sourceBuilder = context.getRequest().source();
progressListener.notifyListShards(progressListener.searchShards(resultList), sourceBuilder == null || sourceBuilder.size() != 0);
for (final DfsSearchResult dfsResult : resultList) {
final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
@@ -92,6 +97,7 @@ public void onFailure(Exception exception) {
try {
context.getLogger().debug(() -> new ParameterizedMessage("[{}] Failed to execute query phase",
querySearchRequest.id()), exception);
progressListener.notifyQueryFailure(shardIndex, exception);
counter.onFailure(shardIndex, searchShardTarget, exception);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
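
The second argument passed to notifyListShards above reports whether a fetch phase is expected: it is true unless the request explicitly sets size to 0 (an aggregations-only search fetches no hits). A listener can use it to decide whether onFetchResult callbacks will ever arrive; a minimal sketch, with the hook signature assumed to mirror the notifyListShards call:

// Assumed hook signature, mirroring notifyListShards(List<SearchShard>, boolean) above.
@Override
protected void onListShards(List<SearchShard> shards, boolean fetchPhase) {
    this.totalShards = shards.size();        // field assumed on the listener subclass
    this.expectFetchResults = fetchPhase;    // false only for size-0 (aggregations-only) requests
}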
@@ -49,6 +49,7 @@ final class FetchSearchPhase extends SearchPhase {
private final SearchPhaseContext context;
private final Logger logger;
private final SearchPhaseResults<SearchPhaseResult> resultConsumer;
private final SearchProgressListener progressListener;

FetchSearchPhase(SearchPhaseResults<SearchPhaseResult> resultConsumer,
SearchPhaseController searchPhaseController,
@@ -72,6 +73,7 @@ final class FetchSearchPhase extends SearchPhase {
this.context = context;
this.logger = context.getLogger();
this.resultConsumer = resultConsumer;
this.progressListener = context.getTask().getProgressListener();
}

@Override
@@ -133,6 +135,7 @@ private void innerRun() throws IOException {
// we do this as we go since it will free up resources and passing on the request on the
// transport layer is cheap.
releaseIrrelevantSearchContext(queryResult.queryResult());
progressListener.notifyFetchResult(i);
}
// in any case we count down this result since we don't talk to this shard anymore
counter.countDown();
@@ -165,6 +168,7 @@ private void executeFetch(final int shardIndex, final SearchShardTarget shardTar
@Override
public void innerOnResponse(FetchSearchResult result) {
try {
progressListener.notifyFetchResult(shardIndex);
counter.onResult(result);
} catch (Exception e) {
context.onPhaseFailure(FetchSearchPhase.this, "", e);
@@ -175,6 +179,7 @@ public void innerOnResponse(FetchSearchResult result) {
public void onFailure(Exception e) {
try {
logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e);
progressListener.notifyFetchFailure(shardIndex, e);
counter.onFailure(shardIndex, shardTarget, e);
} finally {
// the search context might not be cleared on the node where the fetch was executed for example
@@ -23,7 +23,7 @@
import org.elasticsearch.search.SearchShardTarget;

/**
* An base action listener that ensures shard target and shard index is set on all responses
* A base action listener that ensures shard target and shard index is set on all responses
* received by this listener.
*/
abstract class SearchActionListener<T extends SearchPhaseResult> implements ActionListener<T> {
@@ -571,19 +571,23 @@ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<Sear
private final int bufferSize;
private int index;
private final SearchPhaseController controller;
private final SearchProgressListener progressListener;
private int numReducePhases = 0;
private final TopDocsStats topDocsStats;
private final boolean performFinalReduce;

/**
* Creates a new {@link QueryPhaseResultConsumer}
* @param progressListener a progress listener to be notified when a successful response is received
* and when a partial or final reduce has completed.
* @param controller a controller instance to reduce the query response objects
* @param expectedResultSize the expected number of query results. Corresponds to the number of shards queried
* @param bufferSize the size of the reduce buffer. if the buffer size is smaller than the number of expected results
* the buffer is used to incrementally reduce aggregation results before all shards responded.
*/
private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize,
boolean hasTopDocs, boolean hasAggs, int trackTotalHitsUpTo, boolean performFinalReduce) {
private QueryPhaseResultConsumer(SearchProgressListener progressListener, SearchPhaseController controller,
int expectedResultSize, int bufferSize, boolean hasTopDocs, boolean hasAggs,
int trackTotalHitsUpTo, boolean performFinalReduce) {
super(expectedResultSize);
if (expectedResultSize != 1 && bufferSize < 2) {
throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
@@ -595,6 +599,7 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR
throw new IllegalArgumentException("either aggs or top docs must be present");
}
this.controller = controller;
this.progressListener = progressListener;
// no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time.
this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0];
this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0];
@@ -610,6 +615,7 @@ public void consumeResult(SearchPhaseResult result) {
super.consumeResult(result);
QuerySearchResult queryResult = result.queryResult();
consumeInternal(queryResult);
progressListener.notifyQueryResult(queryResult.getShardIndex());
}

private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
@@ -629,6 +635,10 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
}
numReducePhases++;
index = 1;
if (hasAggs) {
progressListener.notifyPartialReduce(progressListener.searchShards(results.asList()),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
}
}
final int i = index++;
if (hasAggs) {
@@ -652,8 +662,11 @@ private synchronized List<TopDocs> getRemainingTopDocs() {

@Override
public ReducedQueryPhase reduce() {
return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
numReducePhases, false, performFinalReduce);
ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(),
getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false, performFinalReduce);
progressListener.notifyReduce(progressListener.searchShards(results.asList()),
reducePhase.totalHits, reducePhase.aggregations);
return reducePhase;
}

/**
@@ -678,7 +691,9 @@ private int resolveTrackTotalHits(SearchRequest request) {
/**
* Returns a new ArraySearchPhaseResults instance. This might return an instance that reduces search responses incrementally.
*/
ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest request, int numShards) {
ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchProgressListener listener,
SearchRequest request,
int numShards) {
SearchSourceBuilder source = request.source();
boolean isScrollRequest = request.scroll() != null;
final boolean hasAggs = source != null && source.aggregations() != null;
@@ -688,14 +703,24 @@ ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest r
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
trackTotalHitsUpTo, request.isFinalReduce());
}
}
return new ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
void consumeResult(SearchPhaseResult result) {
super.consumeResult(result);
listener.notifyQueryResult(result.queryResult().getShardIndex());
}

@Override
ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
List<SearchPhaseResult> resultList = results.asList();
final ReducedQueryPhase reducePhase =
reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations);
return reducePhase;
}
};
}
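
A partial reduce (and its notifyPartialReduce callback) is only reported when the request carries aggregations and its batched reduce size is smaller than the number of shards queried; every path still reports per-shard query results and one final reduce. A minimal sketch of a request that opts into incremental reduction (index name and field are placeholders):

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

// With aggregations present and batched_reduce_size (here 5) smaller than the number of
// shards queried, QueryPhaseResultConsumer buffers shard results and fires notifyPartialReduce
// each time the buffer is flushed, then notifyReduce once for the final reduce.
SearchRequest request = new SearchRequest("logs-*")
    .source(new SearchSourceBuilder()
        .size(0)
        .aggregation(AggregationBuilders.terms("by_host").field("host")));
request.setBatchedReduceSize(5);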
@@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.search;

import org.elasticsearch.action.ActionListener;

/**
* An {@link ActionListener} for search requests that allows to track progress of the {@link SearchAction}.
* See {@link SearchProgressListener}.
*/
public abstract class SearchProgressActionListener extends SearchProgressListener implements ActionListener<SearchResponse> {
}
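
A caller inside the node hands such a listener to the search when it submits the request. The NodeClient method added by this commit is not part of this excerpt, so the entry point below is a placeholder rather than the actual API; the listener type is the class sketched above the diff:

// Sketch only: "executeSearchLocally" is a placeholder for the NodeClient method this commit
// adds (the method itself is not shown in this excerpt).
SearchRequest request = new SearchRequest("my-index");
SearchProgressActionListener progress = new LoggingProgressListener(); // sketched earlier
nodeClient.executeSearchLocally(request, progress);                    // hypothetical entry point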