Skip to content

Commit

Permalink
Add new x-pack endpoints to track the progress of a search asynchrono…
Browse files Browse the repository at this point in the history
…usly (#49931)

### High level view

This change introduces a new API in x-pack basic that allows to track the progress of a search.
Users can submit an asynchronous search through a new endpoint called `_async_search` that
works exactly the same as the `_search` endpoint but instead of blocking and returning the final response when available, it returns a response after a provided `wait_for_completion` time.

````
# Submit an _async_search and waits up to 100ms for a final response
GET my_index_pattern*/_async_search?wait_for_completion=100ms
{
  "aggs": {
    "date_histogram": {
      "field": "@timestamp",
      "fixed_interval": "1h"
    }
  }
}
````

If after 100ms the final response is not available, a `partial_response` is included in the body:

````
{
  "id": "9N3J1m4BgyzUDzqgC15b",
  "version": 1,
  "is_running": true,
  "is_partial": true,
  "response": {
   "_shards": {
       "total": 100,
       "successful": 5,
       "failed": 0
    },
    "total_hits": {
      "value": 1653433,
      "relation": "eq"
    },
    "aggs": {
      ...
    }
  }
}
````

The partial response contains the total number of requested shards, the number of shards that successfully returned and the number of shards that failed.
It also contains the total hits as well as partial aggregations computed from the successful shards.
To continue to monitor the progress of the search users can call the get `_async_search` API like the following:

````
GET _async_search/9N3J1m4BgyzUDzqgC15b/?wait_for_completion=100ms
````

That returns a new response that can contain the same partial response than the previous call if the search didn't progress, in such case the returned `version`
should be the same. If new partial results are available, the version is incremented and the `partial_response` contains the updated progress.
Finally if the response is fully available while or after waiting for completion, the `partial_response` is replaced by a `response` section that contains the usual _search response:

````
{
  "id": "9N3J1m4BgyzUDzqgC15b",
  "version": 10,
  "is_running": false,
  "response": {
     "is_partial": false,
     ...
  }
}
````

## Persistency

Asynchronous search are stored in a restricted index called `.async-search` if they survive (still running) after the initial submit. Each request has a keep alive that defaults to 5 days but this value can be changed/updated any time:
`````
GET my_index_pattern*/_async_search?wait_for_completion=100ms&keep_alive=10d
`````
The default can be changed when submitting the search, the example above raises the default value for the search to `10d`. 
`````
GET _async_search/9N3J1m4BgyzUDzqgC15b/?wait_for_completion=100ms&keep_alive=10d
`````
The time to live for a specific search can be extended when getting the progress/result. In the example above we extend the keep alive to 10 more days.
A background service that runs only on the node that holds the first primary shard of the `async-search` index is responsible for deleting the expired results. It runs every hour but the expiration is also checked by running queries (if they take longer than the keep_alive) and when getting a result.

Like a normal `_search`, if the http channel that is used to submit a request is closed before getting a response, the search is automatically cancelled. Note that this behavior is only for the submit API, subsequent GET requests will not cancel if they are closed. 

## Resiliency

Asynchronous search are not persistent, if the coordinator node crashes or is restarted during the search, the asynchronous search will stop. To know if the search is still running or not the response contains a field called `is_running` that indicates if the task is up or not. It is the responsibility of the user to resume an asynchronous search that didn't reach a final response by re-submitting the query. However final responses and failures are persisted in a system index that allows
to retrieve a response even if the task finishes.

````
DELETE _async_search/9N3J1m4BgyzUDzqgC15b
````

The response is also not stored if the initial submit action returns a final response. This allows to not add any overhead to queries that completes within the initial `wait_for_completion`.

## Security

The `.async-search` index is a restricted index (should be migrated to a system index in +8.0) that is accessible only through the async search APIs. These APIs also ensure that only the user that submitted the initial query can retrieve or delete the running search. Note that admins/superusers would still be able to cancel the search task through the task manager like any other tasks.

Relates #49091

Co-authored-by: Luca Cavanna <javanna@users.noreply.github.com>
  • Loading branch information
jimczi and javanna committed Mar 10, 2020
1 parent 0089805 commit 146b2a8
Show file tree
Hide file tree
Showing 58 changed files with 4,602 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

private final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
private final int expectedTotalOps;
private final AtomicInteger totalOps = new AtomicInteger();
Expand Down Expand Up @@ -385,7 +385,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
logger.trace(new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e);
}
}
onShardGroupFailure(shardIndex, e);
onShardGroupFailure(shardIndex, shardTarget, e);
onPhaseDone();
} else {
final ShardRouting nextShard = shardIt.nextOrNull();
Expand All @@ -405,18 +405,19 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e);
}
}
onShardGroupFailure(shardIndex, e);
onShardGroupFailure(shardIndex, shardTarget, e);
}
}
}

/**
* Executed once for every {@link ShardId} that failed on all available shard routing.
*
* @param shardIndex the shard target that failed
* @param exc the final failure reason
* @param shardIndex the shard index that failed
* @param shardTarget the last shard target for this failure
* @param exc the last failure reason
*/
protected void onShardGroupFailure(int shardIndex, Exception exc) {}
protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}

/**
* Executed once for every failed shard level request. This method is invoked before the next replica is tried for the given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.query.QuerySearchRequest;
Expand Down Expand Up @@ -72,8 +71,6 @@ public void run() throws IOException {
final CountedCollector<SearchPhaseResult> counter = new CountedCollector<>(queryResult::consumeResult,
resultList.size(),
() -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context);
final SearchSourceBuilder sourceBuilder = context.getRequest().source();
progressListener.notifyListShards(progressListener.searchShards(resultList), sourceBuilder == null || sourceBuilder.size() != 0);
for (final DfsSearchResult dfsResult : resultList) {
final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
Expand All @@ -97,7 +94,7 @@ public void onFailure(Exception exception) {
try {
context.getLogger().debug(() -> new ParameterizedMessage("[{}] Failed to execute query phase",
querySearchRequest.contextId()), exception);
progressListener.notifyQueryFailure(shardIndex, exception);
progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception);
counter.onFailure(shardIndex, searchShardTarget, exception);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;
Expand Down Expand Up @@ -51,6 +52,10 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
shardsIts, timeProvider, clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
SearchProgressListener progressListener = task.getProgressListener();
SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,9 +664,9 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
}
numReducePhases++;
index = 1;
if (hasAggs) {
if (hasAggs || hasTopDocs) {
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0] : null, numReducePhases);
}
}
final int i = index++;
Expand Down Expand Up @@ -696,7 +696,7 @@ public ReducedQueryPhase reduce() {
ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(),
getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false, performFinalReduce);
progressListener.notifyReduce(progressListener.searchShards(results.asList()),
reducePhase.totalHits, reducePhase.aggregations);
reducePhase.totalHits, reducePhase.aggregations, reducePhase.numReducePhases);
return reducePhase;
}

Expand Down Expand Up @@ -751,7 +751,8 @@ ReducedQueryPhase reduce() {
List<SearchPhaseResult> resultList = results.asList();
final ReducedQueryPhase reducePhase =
reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations);
listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits,
reducePhase.aggregations, reducePhase.numReducePhases);
return reducePhase;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
Expand All @@ -48,24 +49,27 @@ abstract class SearchProgressListener {
* Executed when shards are ready to be queried.
*
* @param shards The list of shards to query.
* @param skippedShards The list of skipped shards.
* @param clusters The statistics for remote clusters included in the search.
* @param fetchPhase <code>true</code> if the search needs a fetch phase, <code>false</code> otherwise.
**/
public void onListShards(List<SearchShard> shards, boolean fetchPhase) {}
public void onListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {}

/**
* Executed when a shard returns a query result.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)} )}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards} )}.
*/
public void onQueryResult(int shardIndex) {}

/**
* Executed when a shard reports a query failure.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
* @param shardTarget The last shard target that thrown an exception.
* @param exc The cause of the failure.
*/
public void onQueryFailure(int shardIndex, Exception exc) {}
public void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}

/**
* Executed when a partial reduce is created. The number of partial reduce can be controlled via
Expand All @@ -74,38 +78,39 @@ public void onQueryFailure(int shardIndex, Exception exc) {}
* @param shards The list of shards that are part of this reduce.
* @param totalHits The total number of hits in this reduce.
* @param aggs The partial result for aggregations.
* @param version The version number for this reduce.
* @param reducePhase The version number for this reduce.
*/
public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int version) {}
public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}

/**
* Executed once when the final reduce is created.
*
* @param shards The list of shards that are part of this reduce.
* @param totalHits The total number of hits in this reduce.
* @param aggs The final result for aggregations.
* @param reducePhase The version number for this reduce.
*/
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs) {}
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}

/**
* Executed when a shard returns a fetch result.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
*/
public void onFetchResult(int shardIndex) {}

/**
* Executed when a shard reports a fetch failure.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
* @param exc The cause of the failure.
*/
public void onFetchFailure(int shardIndex, Exception exc) {}

final void notifyListShards(List<SearchShard> shards, boolean fetchPhase) {
final void notifyListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {
this.shards = shards;
try {
onListShards(shards, fetchPhase);
onListShards(shards, skippedShards, clusters, fetchPhase);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on list shards"), e);
}
Expand All @@ -120,26 +125,26 @@ final void notifyQueryResult(int shardIndex) {
}
}

final void notifyQueryFailure(int shardIndex, Exception exc) {
final void notifyQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
try {
onQueryFailure(shardIndex, exc);
onQueryFailure(shardIndex, shardTarget, exc);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on query failure",
shards.get(shardIndex)), e);
}
}

final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int version) {
final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
try {
onPartialReduce(shards, totalHits, aggs, version);
onPartialReduce(shards, totalHits, aggs, reducePhase);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on partial reduce"), e);
}
}

final void notifyReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs) {
final void notifyReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
try {
onReduce(shards, totalHits, aggs);
onReduce(shards, totalHits, aggs, reducePhase);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on reduce"), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;
Expand Down Expand Up @@ -57,7 +58,7 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
final SearchProgressListener progressListener = task.getProgressListener();
final SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
sourceBuilder == null || sourceBuilder.size() != 0);
progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}

protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
Expand All @@ -67,8 +68,8 @@ protected void executePhaseOnShard(final SearchShardIterator shardIt, final Shar
}

@Override
protected void onShardGroupFailure(int shardIndex, Exception exc) {
progressListener.notifyQueryFailure(shardIndex, exc);
protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
progressListener.notifyQueryFailure(shardIndex, shardTarget, exc);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
Expand All @@ -56,9 +55,9 @@
* @see org.elasticsearch.client.Client#search(SearchRequest)
* @see SearchResponse
*/
public final class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {
public class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
Expand Down Expand Up @@ -560,7 +559,7 @@ public boolean isSuggestOnly() {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// generating description in a lazy way since source can be quite big
return new SearchTask(id, type, action, null, parentTaskId, headers) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public SearchRequestBuilder setVersion(boolean version) {
sourceBuilder().version(version);
return this;
}

/**
* Should each {@link org.elasticsearch.search.SearchHit} be returned with the
* sequence number and primary term of the last modification of the document.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ public RestStatus status() {
return RestStatus.status(successfulShards, totalShards, shardFailures);
}

public SearchResponseSections getInternalResponse() {
return internalResponse;
}

/**
* The search hits.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class SearchShard implements Comparable<SearchShard> {
private final String clusterAlias;
private final ShardId shardId;

SearchShard(@Nullable String clusterAlias, ShardId shardId) {
public SearchShard(@Nullable String clusterAlias, ShardId shardId) {
this.clusterAlias = clusterAlias;
this.shardId = shardId;
}
Expand Down
38 changes: 0 additions & 38 deletions server/src/main/java/org/elasticsearch/client/node/NodeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchProgressActionListener;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
Expand Down Expand Up @@ -108,38 +102,6 @@ > Task executeLocally(ActionType<Response> action, Request request, TaskListener
listener::onResponse, listener::onFailure);
}

/**
* Execute a {@link SearchRequest} locally and track the progress of the request through
* a {@link SearchProgressActionListener}.
*/
public SearchTask executeSearchLocally(SearchRequest request, SearchProgressActionListener listener) {
// we cannot track the progress if remote cluster requests are splitted.
request.setCcsMinimizeRoundtrips(false);
TransportSearchAction action = (TransportSearchAction) actions.get(SearchAction.INSTANCE);
SearchTask task = (SearchTask) taskManager.register("transport", action.actionName, request);
task.setProgressListener(listener);
action.execute(task, request, new ActionListener<>() {
@Override
public void onResponse(SearchResponse response) {
try {
taskManager.unregister(task);
} finally {
listener.onResponse(response);
}
}

@Override
public void onFailure(Exception e) {
try {
taskManager.unregister(task);
} finally {
listener.onFailure(e);
}
}
});
return task;
}

/**
* The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by
* {@link #executeLocally(ActionType, ActionRequest, TaskListener)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
searchRequest.routing(request.param("routing"));
searchRequest.preference(request.param("preference"));
searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", true));
searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));

checkRestTotalHits(request, searchRequest);
}
Expand Down
Loading

0 comments on commit 146b2a8

Please sign in to comment.