Cumulative backport of async search changes (#53635)
* Submit async search to work only with POST (#53368)

Currently the submit async search API can be called at REST using both GET and POST, but given that it submits a search and creates internal state, POST should be the only allowed method.
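
For illustration, here is a minimal sketch of submitting an async search over HTTP with the low-level Java REST client. The endpoint path follows the async_search.submit spec linked further down; the host, index name, and request body are placeholder assumptions, not part of this change.

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;

    public class SubmitAsyncSearchExample {
        public static void main(String[] args) throws Exception {
            try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
                // Submit must now use POST; a GET to the same endpoint is rejected.
                Request request = new Request("POST", "/my-index/_async_search");
                request.setJsonEntity("{\"query\":{\"match_all\":{}}}");
                Response response = client.performRequest(request);
                System.out.println(response.getStatusLine());
            }
        }
    }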

* Refine SearchProgressListener internal API (#53373)

The following cumulative improvements have been made:
- rename `onReduce` and `notifyReduce` to `onFinalReduce` and `notifyFinalReduce`
- add unit test for `SearchShard`
- on* methods in `SearchProgressListener` don't need to be public: they are only meant to be overridden, never called directly, so they can be made protected (see the sketch after this list). They were in fact called directly from a test, which required some adapting, such as making the `AsyncSearchTask.Listener` class package-private instead of private
- Instead of overriding `getProgressListener` in `AsyncSearchTask`, which feels odd for a getter, add a dedicated method that retrieves the listener directly without needing a cast. The getter and setter for the listener are now final in the base class.
- rename the `SearchProgressListener#searchShards` methods to `buildSearchShards` and make them static given that they access no instance members
- make `SearchShard` and `SearchShardTask` classes final
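
As an illustration of the resulting listener API, here is a minimal sketch based on the signatures visible in the diff below. Since `SearchProgressListener` is package-private, such a listener would have to live in the `org.elasticsearch.action.search` package; the class and field names here are made up.

    package org.elasticsearch.action.search;

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.lucene.search.TotalHits;
    import org.elasticsearch.search.aggregations.InternalAggregations;

    // Counts reduce phases. The on* hooks are now protected and are only invoked
    // through the final notify* methods of the base class, never called directly.
    class ReduceCountingProgressListener extends SearchProgressListener {

        final AtomicInteger partialReduces = new AtomicInteger();
        final AtomicInteger finalReduces = new AtomicInteger();

        @Override
        protected void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
            partialReduces.incrementAndGet();
        }

        @Override
        protected void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
            finalReduces.incrementAndGet();
        }
    }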

* Move async search yaml tests to x-pack yaml test folder (#53537)

The yaml tests for async search currently sit in its qa folder. There is no reason, though, for them to live in a separate folder, as they don't require any particular setup. This commit moves them to the main folder together with the other x-pack yaml tests so that the client test runners will run them too.

* [DOCS] Add temporary redirect for async-search (#53454)

The following API spec files contain a link to a not-yet-created
async search docs page:

* [async_search.delete.json][0]
* [async_search.get.json][1]
* [async_search.submit.json][2]

The Elasticsearch-js client uses these spec files to generate its docs.
This created a broken link in the Elasticsearch-js docs, which broke
the docs build.

This PR adds a temporary redirect for the docs page. This redirect
should be removed when the actual API docs are added.

[0]: https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/src/test/resources/rest-api-spec/api/async_search.delete.json
[1]: https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/src/test/resources/rest-api-spec/api/async_search.get.json
[2]: https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/src/test/resources/rest-api-spec/api/async_search.submit.json

Co-authored-by: James Rodewig <james.rodewig@elastic.co>
javanna and jrodewig committed Mar 16, 2020
1 parent 9845dbb commit c3d2417
Showing 23 changed files with 151 additions and 182 deletions.
5 changes: 5 additions & 0 deletions docs/reference/redirects.asciidoc
@@ -602,3 +602,8 @@ See <<slm-api-stop>>.
=== How {ccs} works

See <<ccs-gateway-seed-nodes>> and <<ccs-min-roundtrips>>.

[role="exclude",id="async-search"]
=== Asynchronous search

coming::[7.x]
@@ -54,8 +54,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
this.searchPhaseController = searchPhaseController;
SearchProgressListener progressListener = task.getProgressListener();
SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
progressListener.notifyListShards(SearchProgressListener.buildSearchShards(this.shardsIts),
SearchProgressListener.buildSearchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}

@Override
@@ -679,8 +679,8 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
numReducePhases++;
index = 1;
if (hasAggs || hasTopDocs) {
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0] : null, numReducePhases);
progressListener.notifyPartialReduce(SearchProgressListener.buildSearchShards(processedShards),
topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0] : null, numReducePhases);
}
}
final int i = index++;
@@ -710,7 +710,7 @@ public ReducedQueryPhase reduce() {
ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(),
getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false,
aggReduceContextBuilder, performFinalReduce);
progressListener.notifyReduce(progressListener.searchShards(results.asList()),
progressListener.notifyFinalReduce(SearchProgressListener.buildSearchShards(results.asList()),
reducePhase.totalHits, reducePhase.aggregations, reducePhase.numReducePhases);
return reducePhase;
}
@@ -767,8 +767,8 @@ ReducedQueryPhase reduce() {
List<SearchPhaseResult> resultList = results.asList();
final ReducedQueryPhase reducePhase =
reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, aggReduceContextBuilder, request.isFinalReduce());
listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits,
reducePhase.aggregations, reducePhase.numReducePhases);
listener.notifyFinalReduce(SearchProgressListener.buildSearchShards(resultList),
reducePhase.totalHits, reducePhase.aggregations, reducePhase.numReducePhases);
return reducePhase;
}
};
@@ -54,14 +54,14 @@ abstract class SearchProgressListener {
* @param clusters The statistics for remote clusters included in the search.
* @param fetchPhase <code>true</code> if the search needs a fetch phase, <code>false</code> otherwise.
**/
public void onListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {}
protected void onListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {}

/**
* Executed when a shard returns a query result.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards} )}.
*/
public void onQueryResult(int shardIndex) {}
protected void onQueryResult(int shardIndex) {}

/**
* Executed when a shard reports a query failure.
@@ -70,7 +70,7 @@ public void onQueryResult(int shardIndex) {}
* @param shardTarget The last shard target that thrown an exception.
* @param exc The cause of the failure.
*/
public void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}
protected void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}

/**
* Executed when a partial reduce is created. The number of partial reduce can be controlled via
@@ -81,7 +81,7 @@ public void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Except
* @param aggs The partial result for aggregations.
* @param reducePhase The version number for this reduce.
*/
public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}
protected void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}

/**
* Executed once when the final reduce is created.
@@ -91,22 +91,22 @@ public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, Inter
* @param aggs The final result for aggregations.
* @param reducePhase The version number for this reduce.
*/
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}
protected void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}

/**
* Executed when a shard returns a fetch result.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
*/
public void onFetchResult(int shardIndex) {}
protected void onFetchResult(int shardIndex) {}

/**
* Executed when a shard reports a fetch failure.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
* @param exc The cause of the failure.
*/
public void onFetchFailure(int shardIndex, Exception exc) {}
protected void onFetchFailure(int shardIndex, Exception exc) {}

final void notifyListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {
this.shards = shards;
@@ -143,9 +143,9 @@ final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, In
}
}

final void notifyReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
protected final void notifyFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
try {
onReduce(shards, totalHits, aggs, reducePhase);
onFinalReduce(shards, totalHits, aggs, reducePhase);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on reduce"), e);
}
@@ -169,25 +169,25 @@ final void notifyFetchFailure(int shardIndex, Exception exc) {
}
}

final List<SearchShard> searchShards(List<? extends SearchPhaseResult> results) {
List<SearchShard> lst = results.stream()
static List<SearchShard> buildSearchShards(List<? extends SearchPhaseResult> results) {
List<SearchShard> lst = results.stream()
.filter(Objects::nonNull)
.map(SearchPhaseResult::getSearchShardTarget)
.map(e -> new SearchShard(e.getClusterAlias(), e.getShardId()))
.collect(Collectors.toList());
return Collections.unmodifiableList(lst);
}

final List<SearchShard> searchShards(SearchShardTarget[] results) {
List<SearchShard> lst = Arrays.stream(results)
static List<SearchShard> buildSearchShards(SearchShardTarget[] results) {
List<SearchShard> lst = Arrays.stream(results)
.filter(Objects::nonNull)
.map(e -> new SearchShard(e.getClusterAlias(), e.getShardId()))
.collect(Collectors.toList());
return Collections.unmodifiableList(lst);
}

final List<SearchShard> searchShards(GroupShardsIterator<SearchShardIterator> its) {
List<SearchShard> lst = StreamSupport.stream(its.spliterator(), false)
static List<SearchShard> buildSearchShards(GroupShardsIterator<SearchShardIterator> its) {
List<SearchShard> lst = StreamSupport.stream(its.spliterator(), false)
.map(e -> new SearchShard(e.getClusterAlias(), e.shardId()))
.collect(Collectors.toList());
return Collections.unmodifiableList(lst);
@@ -57,8 +57,8 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
this.progressListener = task.getProgressListener();
final SearchProgressListener progressListener = task.getProgressListener();
final SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
progressListener.notifyListShards(SearchProgressListener.buildSearchShards(this.shardsIts),
SearchProgressListener.buildSearchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}

protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
@@ -29,7 +29,7 @@
* A class that encapsulates the {@link ShardId} and the cluster alias
* of a shard used during the search action.
*/
public class SearchShard implements Comparable<SearchShard> {
public final class SearchShard implements Comparable<SearchShard> {
@Nullable
private final String clusterAlias;
private final ShardId shardId;
@@ -40,8 +40,7 @@ public SearchShard(@Nullable String clusterAlias, ShardId shardId) {
}

/**
* Return the cluster alias if the shard is on a remote cluster and <code>null</code>
* otherwise (local).
* Return the cluster alias if we are executing a cross cluster search request, <code>null</code> otherwise.
*/
@Nullable
public String getClusterAlias() {
@@ -51,7 +50,6 @@ public String getClusterAlias() {
/**
* Return the {@link ShardId} of this shard.
*/
@Nullable
public ShardId getShardId() {
return shardId;
}
@@ -40,5 +40,4 @@ public SearchShardTask(long id, String type, String action, String description,
public boolean shouldCancelChildrenOnCancellation() {
return false;
}

}
@@ -37,14 +37,14 @@ public SearchTask(long id, String type, String action, String description, TaskI
/**
* Attach a {@link SearchProgressListener} to this task.
*/
public void setProgressListener(SearchProgressListener progressListener) {
public final void setProgressListener(SearchProgressListener progressListener) {
this.progressListener = progressListener;
}

/**
* Return the {@link SearchProgressListener} attached to this task.
*/
public SearchProgressListener getProgressListener() {
public final SearchProgressListener getProgressListener() {
return progressListener;
}

@@ -847,7 +847,7 @@ public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, Inter
}

@Override
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
totalHitsListener.set(totalHits);
finalAggsListener.set(aggs);
numReduceListener.incrementAndGet();
@@ -178,7 +178,7 @@ public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, Inter
}

@Override
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
numReduces.incrementAndGet();
}

@@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.search;

import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SearchShardTests extends ESTestCase {

public void testEqualsAndHashcode() {
String index = randomAlphaOfLengthBetween(5, 10);
SearchShard searchShard = new SearchShard(randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10),
new ShardId(index, index + "-uuid", randomIntBetween(0, 1024)));
EqualsHashCodeTestUtils.checkEqualsAndHashCode(searchShard,
s -> new SearchShard(s.getClusterAlias(), s.getShardId()),
s -> {
if (randomBoolean()) {
return new SearchShard(s.getClusterAlias() == null ? randomAlphaOfLengthBetween(3, 10) : null, s.getShardId());
} else {
String indexName = s.getShardId().getIndexName();
int shardId = s.getShardId().getId();
if (randomBoolean()) {
indexName += randomAlphaOfLength(5);
} else {
shardId += randomIntBetween(1, 1024);
}
return new SearchShard(s.getClusterAlias(), new ShardId(indexName, indexName + "-uuid", shardId));
}
});
}

public void testCompareTo() {
List<SearchShard> searchShards = new ArrayList<>();
Index index0 = new Index("index0", "index0-uuid");
Index index1 = new Index("index1", "index1-uuid");
searchShards.add(new SearchShard(null, new ShardId(index0, 0)));
searchShards.add(new SearchShard(null, new ShardId(index1, 0)));
searchShards.add(new SearchShard(null, new ShardId(index0, 1)));
searchShards.add(new SearchShard(null, new ShardId(index1, 1)));
searchShards.add(new SearchShard(null, new ShardId(index0, 2)));
searchShards.add(new SearchShard(null, new ShardId(index1, 2)));
searchShards.add(new SearchShard("", new ShardId(index0, 0)));
searchShards.add(new SearchShard("", new ShardId(index1, 0)));
searchShards.add(new SearchShard("", new ShardId(index0, 1)));
searchShards.add(new SearchShard("", new ShardId(index1, 1)));

searchShards.add(new SearchShard("remote0", new ShardId(index0, 0)));
searchShards.add(new SearchShard("remote0", new ShardId(index1, 0)));
searchShards.add(new SearchShard("remote0", new ShardId(index0, 1)));
searchShards.add(new SearchShard("remote0", new ShardId(index0, 2)));
searchShards.add(new SearchShard("remote1", new ShardId(index0, 0)));
searchShards.add(new SearchShard("remote1", new ShardId(index1, 0)));
searchShards.add(new SearchShard("remote1", new ShardId(index0, 1)));
searchShards.add(new SearchShard("remote1", new ShardId(index1, 1)));

List<SearchShard> sorted = new ArrayList<>(searchShards);
Collections.sort(sorted);
assertEquals(searchShards, sorted);
}
}
30 changes: 0 additions & 30 deletions x-pack/plugin/async-search/qa/rest/build.gradle

This file was deleted.

