Skip to content

Commit

Permalink
Pre-sort shards based on the max/min value of the primary sort field (#…
Browse files Browse the repository at this point in the history
…49092)

This change automatically pre-sort search shards on search requests that use a primary sort based on the value of a field. When possible, the can_match phase will extract the min/max (depending on the provided sort order) values of each shard and use it to pre-sort the shards prior to running the subsequent phases. This feature can be useful to ensure that shards that contain recent data are executed first so that intermediate merge have more chance to contain contiguous data (think of date_histogram for instance) but it could also be used in a follow up to early terminate sorted top-hits queries that don't require the total hit count. The latter could significantly speed up the retrieval of the most/least
recent documents from time-based indices.

Relates #49091
  • Loading branch information
jimczi committed Nov 22, 2019
1 parent c13fce6 commit ed4eecc
Show file tree
Hide file tree
Showing 15 changed files with 616 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
iterators.add(iterator);
}
}
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
this.shardsIts = new GroupShardsIterator<>(iterators);
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators, false);
this.shardsIts = new GroupShardsIterator<>(iterators, false);
// we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up
// it's number of active shards but use 1 as the default if no replica of a shard is active at this point.
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,25 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchService.CanMatchResponse;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.MinAndMax;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.Transport;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
Expand All @@ -40,8 +50,12 @@
* from the search. The extra round trip to the search shards is very cheap and is not subject to rejections
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
* large portion of the clusters indices.
* This phase can also be used to pre-sort shards based on min/max values in each shard of the provided primary sort.
* When the query primary sort is perform on a field, this phase extracts the min/max value in each shard and
* sort them according to the provided order. This can be useful for instance to ensure that shards that contain recent
* data are executed first when sorting by descending timestamp.
*/
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchService.CanMatchResponse> {
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMatchResponse> {

private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
Expand All @@ -58,26 +72,26 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
//We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
new CanMatchSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
}

@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
SearchActionListener<SearchService.CanMatchResponse> listener) {
SearchActionListener<CanMatchResponse> listener) {
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchService.CanMatchResponse> results,
protected SearchPhase getNextPhase(SearchPhaseResults<CanMatchResponse> results,
SearchPhaseContext context) {

return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
return phaseFactory.apply(getIterator((CanMatchSearchPhaseResults) results, shardsIts));
}

private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseResults results,
private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhaseResults results,
GroupShardsIterator<SearchShardIterator> shardsIts) {
int cardinality = results.getNumPossibleMatches();
FixedBitSet possibleMatches = results.getPossibleMatches();
Expand All @@ -86,6 +100,7 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
// to produce a valid search result with all the aggs etc.
possibleMatches.set(0);
}
SearchSourceBuilder source = getRequest().source();
int i = 0;
for (SearchShardIterator iter : shardsIts) {
if (possibleMatches.get(i++)) {
Expand All @@ -94,24 +109,48 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
iter.resetAndSkip();
}
}
return shardsIts;
if (shouldSortShards(results.minAndMaxes) == false) {
return shardsIts;
}
FieldSortBuilder fieldSort = FieldSortBuilder.getPrimaryFieldSortOrNull(source);
return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order()), false);
}

private static final class BitSetSearchPhaseResults extends SearchPhaseResults<SearchService.CanMatchResponse> {
private static List<SearchShardIterator> sortShards(GroupShardsIterator<SearchShardIterator> shardsIts,
MinAndMax<?>[] minAndMaxes,
SortOrder order) {
return IntStream.range(0, shardsIts.size())
.boxed()
.sorted(shardComparator(shardsIts, minAndMaxes, order))
.map(ord -> shardsIts.get(ord))
.collect(Collectors.toList());
}

private static boolean shouldSortShards(MinAndMax<?>[] minAndMaxes) {
return Arrays.stream(minAndMaxes).anyMatch(Objects::nonNull);
}

private static Comparator<Integer> shardComparator(GroupShardsIterator<SearchShardIterator> shardsIts,
MinAndMax<?>[] minAndMaxes,
SortOrder order) {
final Comparator<Integer> comparator = Comparator.comparing(index -> minAndMaxes[index], MinAndMax.getComparator(order));
return comparator.thenComparing(index -> shardsIts.get(index).shardId());
}

private static final class CanMatchSearchPhaseResults extends SearchPhaseResults<CanMatchResponse> {
private final FixedBitSet possibleMatches;
private final MinAndMax<?>[] minAndMaxes;
private int numPossibleMatches;

BitSetSearchPhaseResults(int size) {
CanMatchSearchPhaseResults(int size) {
super(size);
possibleMatches = new FixedBitSet(size);
minAndMaxes = new MinAndMax[size];
}

@Override
void consumeResult(SearchService.CanMatchResponse result) {
if (result.canMatch()) {
consumeShardFailure(result.getShardIndex());
}
void consumeResult(CanMatchResponse result) {
consumeResult(result.getShardIndex(), result.canMatch(), result.minAndMax());
}

@Override
Expand All @@ -120,12 +159,18 @@ boolean hasResult(int shardIndex) {
}

@Override
synchronized void consumeShardFailure(int shardIndex) {
void consumeShardFailure(int shardIndex) {
// we have to carry over shard failures in order to account for them in the response.
possibleMatches.set(shardIndex);
numPossibleMatches++;
consumeResult(shardIndex, true, null);
}

synchronized void consumeResult(int shardIndex, boolean canMatch, MinAndMax<?> minAndMax) {
if (canMatch) {
possibleMatches.set(shardIndex);
numPossibleMatches++;
}
minAndMaxes[shardIndex] = minAndMax;
}

synchronized int getNumPossibleMatches() {
return numPossibleMatches;
Expand All @@ -136,7 +181,7 @@ synchronized FixedBitSet getPossibleMatches() {
}

@Override
Stream<SearchService.CanMatchResponse> getSuccessfulResults() {
Stream<CanMatchResponse> getSuccessfulResults() {
return Stream.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
abstract class SearchActionListener<T extends SearchPhaseResult> implements ActionListener<T> {

private final int requestIndex;
final int requestIndex;
private final SearchShardTarget searchShardTarget;

protected SearchActionListener(SearchShardTarget searchShardTarget,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
Expand Down Expand Up @@ -615,9 +616,9 @@ static BiFunction<String, String, Transport.Connection> buildConnectionLookup(St
private static boolean shouldPreFilterSearchShards(SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators) {
SearchSourceBuilder source = searchRequest.source();
return searchRequest.searchType() == QUERY_THEN_FETCH && // we can't do this for DFS it needs to fan out to all shards all the time
SearchService.canRewriteToMatchNone(source) &&
searchRequest.getPreFilterShardSize() < shardIterators.size();
return searchRequest.searchType() == QUERY_THEN_FETCH // we can't do this for DFS it needs to fan out to all shards all the time
&& (SearchService.canRewriteToMatchNone(source) || FieldSortBuilder.hasPrimaryFieldSort(source))
&& searchRequest.getPreFilterShardSize() < shardIterators.size();
}

static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ public final class GroupShardsIterator<ShardIt extends ShardIterator> implements
* Constructs a enw GroupShardsIterator from the given list.
*/
public GroupShardsIterator(List<ShardIt> iterators) {
CollectionUtil.timSort(iterators);
this(iterators, true);
}

/**
* Constructs a new GroupShardsIterator from the given list.
*/
public GroupShardsIterator(List<ShardIt> iterators, boolean useSort) {
if (useSort) {
CollectionUtil.timSort(iterators);
}
this.iterators = iterators;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public static final class DateFieldType extends MappedFieldType {
protected DateMathParser dateMathParser;
protected Resolution resolution;

DateFieldType() {
public DateFieldType() {
super();
setTokenized(false);
setHasDocValues(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ public final String typeName() {
return name;
}
/** Get the associated numeric type */
final NumericType numericType() {
public final NumericType numericType() {
return numericType;
}
public abstract Query termQuery(String field, Object value);
Expand Down Expand Up @@ -909,6 +909,10 @@ public String typeName() {
return type.name;
}

public NumericType numericType() {
return type.numericType();
}

@Override
public Query existsQuery(QueryShardContext context) {
if (hasDocValues()) {
Expand Down
32 changes: 26 additions & 6 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.OriginalIndices;
Expand Down Expand Up @@ -92,6 +93,8 @@
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.search.rescore.RescorerBuilder;
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.MinAndMax;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.suggest.Suggest;
Expand Down Expand Up @@ -1013,7 +1016,7 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, Set<String
* This method can have false positives while if it returns <code>false</code> the query won't match any documents on the current
* shard.
*/
public boolean canMatch(ShardSearchRequest request) throws IOException {
public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException {
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
Expand All @@ -1023,18 +1026,20 @@ public boolean canMatch(ShardSearchRequest request) throws IOException {
QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher,
request::nowInMillis, request.getClusterAlias());
Rewriteable.rewrite(request.getRewriteable(), context, false);
FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
MinAndMax<?> minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
if (canRewriteToMatchNone(request.source())) {
QueryBuilder queryBuilder = request.source().query();
return queryBuilder instanceof MatchNoneQueryBuilder == false;
return new CanMatchResponse(queryBuilder instanceof MatchNoneQueryBuilder == false, minMax);
}
return true; // null query means match_all
// null query means match_all
return new CanMatchResponse(true, minMax);
}
}


public void canMatch(ShardSearchRequest request, ActionListener<CanMatchResponse> listener) {
try {
listener.onResponse(new CanMatchResponse(canMatch(request)));
listener.onResponse(canMatch(request));
} catch (IOException e) {
listener.onFailure(e);
}
Expand All @@ -1053,6 +1058,7 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) {
return aggregations == null || aggregations.mustVisitAllDocs() == false;
}


/*
* Rewrites the search request with a light weight rewrite context in order to fetch resources asynchronously
* The action listener is guaranteed to be executed on the search thread-pool
Expand Down Expand Up @@ -1088,24 +1094,38 @@ public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce

public static final class CanMatchResponse extends SearchPhaseResult {
private final boolean canMatch;
private final MinAndMax<?> minAndMax;

public CanMatchResponse(StreamInput in) throws IOException {
super(in);
this.canMatch = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
minAndMax = in.readOptionalWriteable(MinAndMax::new);
} else {
minAndMax = null;
}
}

public CanMatchResponse(boolean canMatch) {
public CanMatchResponse(boolean canMatch, MinAndMax<?> minAndMax) {
this.canMatch = canMatch;
this.minAndMax = minAndMax;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(canMatch);
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeOptionalWriteable(minAndMax);
}
}

public boolean canMatch() {
return canMatch;
}

public MinAndMax<?> minAndMax() {
return minAndMax;
}
}

/**
Expand Down

0 comments on commit ed4eecc

Please sign in to comment.