Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-sort shards based on the max/min value of the primary sort field #49092

Merged
merged 14 commits into from
Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
iterators.add(iterator);
}
}
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
this.shardsIts = new GroupShardsIterator<>(iterators);
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators, false);
this.shardsIts = new GroupShardsIterator<>(iterators, false);
// we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up
// it's number of active shards but use 1 as the default if no replica of a shard is active at this point.
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,25 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchService.CanMatchResponse;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.MinAndMax;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.Transport;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
Expand All @@ -40,8 +50,12 @@
* from the search. The extra round trip to the search shards is very cheap and is not subject to rejections
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
* large portion of the clusters indices.
* This phase can also be used to pre-sort shards based on min/max values in each shard of the provided primary sort.
* When the query primary sort is perform on a field, this phase extracts the min/max value in each shard and
* sort them according to the provided order. This can be useful for instance to ensure that shards that contain recent
* data are executed first when sorting by descending timestamp.
*/
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchService.CanMatchResponse> {
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMatchResponse> {

private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
Expand All @@ -58,26 +72,26 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
//We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
new CanMatchSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
}

@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
SearchActionListener<SearchService.CanMatchResponse> listener) {
SearchActionListener<CanMatchResponse> listener) {
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchService.CanMatchResponse> results,
protected SearchPhase getNextPhase(SearchPhaseResults<CanMatchResponse> results,
SearchPhaseContext context) {

return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
return phaseFactory.apply(getIterator((CanMatchSearchPhaseResults) results, shardsIts));
}

private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseResults results,
private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhaseResults results,
GroupShardsIterator<SearchShardIterator> shardsIts) {
int cardinality = results.getNumPossibleMatches();
FixedBitSet possibleMatches = results.getPossibleMatches();
Expand All @@ -86,6 +100,7 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
// to produce a valid search result with all the aggs etc.
possibleMatches.set(0);
}
SearchSourceBuilder source = getRequest().source();
int i = 0;
for (SearchShardIterator iter : shardsIts) {
if (possibleMatches.get(i++)) {
Expand All @@ -94,24 +109,48 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
iter.resetAndSkip();
}
}
return shardsIts;
if (shouldSortShards(results.minAndMaxes) == false) {
return shardsIts;
}
FieldSortBuilder fieldSort = FieldSortBuilder.getPrimaryFieldSortOrNull(source);
return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order()), false);
}

private static final class BitSetSearchPhaseResults extends SearchPhaseResults<SearchService.CanMatchResponse> {
private static List<SearchShardIterator> sortShards(GroupShardsIterator<SearchShardIterator> shardsIts,
MinAndMax<?>[] minAndMaxes,
SortOrder order) {
return IntStream.range(0, shardsIts.size())
.boxed()
.sorted(shardComparator(shardsIts, minAndMaxes, order))
.map(ord -> shardsIts.get(ord))
.collect(Collectors.toList());
}

private static boolean shouldSortShards(MinAndMax<?>[] minAndMaxes) {
return Arrays.stream(minAndMaxes).anyMatch(Objects::nonNull);
}

private static Comparator<Integer> shardComparator(GroupShardsIterator<SearchShardIterator> shardsIts,
MinAndMax<?>[] minAndMaxes,
SortOrder order) {
final Comparator<Integer> comparator = Comparator.comparing(index -> minAndMaxes[index], MinAndMax.getComparator(order));
return comparator.thenComparing(index -> shardsIts.get(index).shardId());
}

private static final class CanMatchSearchPhaseResults extends SearchPhaseResults<CanMatchResponse> {
private final FixedBitSet possibleMatches;
private final MinAndMax<?>[] minAndMaxes;
private int numPossibleMatches;

BitSetSearchPhaseResults(int size) {
CanMatchSearchPhaseResults(int size) {
super(size);
possibleMatches = new FixedBitSet(size);
minAndMaxes = new MinAndMax[size];
}

@Override
void consumeResult(SearchService.CanMatchResponse result) {
if (result.canMatch()) {
consumeShardFailure(result.getShardIndex());
}
void consumeResult(CanMatchResponse result) {
consumeResult(result.getShardIndex(), result.canMatch(), result.minAndMax());
}

@Override
Expand All @@ -120,12 +159,18 @@ boolean hasResult(int shardIndex) {
}

@Override
synchronized void consumeShardFailure(int shardIndex) {
void consumeShardFailure(int shardIndex) {
// we have to carry over shard failures in order to account for them in the response.
possibleMatches.set(shardIndex);
numPossibleMatches++;
consumeResult(shardIndex, true, null);
}

synchronized void consumeResult(int shardIndex, boolean canMatch, MinAndMax<?> minAndMax) {
if (canMatch) {
possibleMatches.set(shardIndex);
numPossibleMatches++;
}
minAndMaxes[shardIndex] = minAndMax;
}

synchronized int getNumPossibleMatches() {
return numPossibleMatches;
Expand All @@ -136,7 +181,7 @@ synchronized FixedBitSet getPossibleMatches() {
}

@Override
Stream<SearchService.CanMatchResponse> getSuccessfulResults() {
Stream<CanMatchResponse> getSuccessfulResults() {
return Stream.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
abstract class SearchActionListener<T extends SearchPhaseResult> implements ActionListener<T> {

private final int requestIndex;
final int requestIndex;
private final SearchShardTarget searchShardTarget;

protected SearchActionListener(SearchShardTarget searchShardTarget,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
Expand Down Expand Up @@ -615,9 +616,9 @@ static BiFunction<String, String, Transport.Connection> buildConnectionLookup(St
private static boolean shouldPreFilterSearchShards(SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators) {
SearchSourceBuilder source = searchRequest.source();
return searchRequest.searchType() == QUERY_THEN_FETCH && // we can't do this for DFS it needs to fan out to all shards all the time
SearchService.canRewriteToMatchNone(source) &&
searchRequest.getPreFilterShardSize() < shardIterators.size();
return searchRequest.searchType() == QUERY_THEN_FETCH // we can't do this for DFS it needs to fan out to all shards all the time
&& (SearchService.canRewriteToMatchNone(source) || FieldSortBuilder.hasPrimaryFieldSort(source))
&& searchRequest.getPreFilterShardSize() < shardIterators.size();
}

static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ public final class GroupShardsIterator<ShardIt extends ShardIterator> implements
* Constructs a enw GroupShardsIterator from the given list.
*/
public GroupShardsIterator(List<ShardIt> iterators) {
CollectionUtil.timSort(iterators);
this(iterators, true);
}

/**
* Constructs a new GroupShardsIterator from the given list.
*/
public GroupShardsIterator(List<ShardIt> iterators, boolean useSort) {
if (useSort) {
CollectionUtil.timSort(iterators);
}
this.iterators = iterators;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public static final class DateFieldType extends MappedFieldType {
protected DateMathParser dateMathParser;
protected Resolution resolution;

DateFieldType() {
public DateFieldType() {
super();
setTokenized(false);
setHasDocValues(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ public final String typeName() {
return name;
}
/** Get the associated numeric type */
final NumericType numericType() {
public final NumericType numericType() {
return numericType;
}
public abstract Query termQuery(String field, Object value);
Expand Down Expand Up @@ -909,6 +909,10 @@ public String typeName() {
return type.name;
}

public NumericType numericType() {
return type.numericType();
}

@Override
public Query existsQuery(QueryShardContext context) {
if (hasDocValues()) {
Expand Down
32 changes: 26 additions & 6 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.OriginalIndices;
Expand Down Expand Up @@ -92,6 +93,8 @@
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.search.rescore.RescorerBuilder;
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.MinAndMax;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.suggest.Suggest;
Expand Down Expand Up @@ -1012,7 +1015,7 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, Set<String
* This method can have false positives while if it returns <code>false</code> the query won't match any documents on the current
* shard.
*/
public boolean canMatch(ShardSearchRequest request) throws IOException {
public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException {
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
Expand All @@ -1022,18 +1025,20 @@ public boolean canMatch(ShardSearchRequest request) throws IOException {
QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher,
request::nowInMillis, request.getClusterAlias());
Rewriteable.rewrite(request.getRewriteable(), context, false);
FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
MinAndMax<?> minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
if (canRewriteToMatchNone(request.source())) {
QueryBuilder queryBuilder = request.source().query();
return queryBuilder instanceof MatchNoneQueryBuilder == false;
return new CanMatchResponse(queryBuilder instanceof MatchNoneQueryBuilder == false, minMax);
}
return true; // null query means match_all
// null query means match_all
return new CanMatchResponse(true, minMax);
}
}


public void canMatch(ShardSearchRequest request, ActionListener<CanMatchResponse> listener) {
try {
listener.onResponse(new CanMatchResponse(canMatch(request)));
listener.onResponse(canMatch(request));
} catch (IOException e) {
listener.onFailure(e);
}
Expand All @@ -1052,6 +1057,7 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) {
return aggregations == null || aggregations.mustVisitAllDocs() == false;
}


/*
* Rewrites the search request with a light weight rewrite context in order to fetch resources asynchronously
* The action listener is guaranteed to be executed on the search thread-pool
Expand Down Expand Up @@ -1087,24 +1093,38 @@ public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce

public static final class CanMatchResponse extends SearchPhaseResult {
private final boolean canMatch;
private final MinAndMax<?> minAndMax;

public CanMatchResponse(StreamInput in) throws IOException {
super(in);
this.canMatch = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
minAndMax = in.readOptionalWriteable(MinAndMax::new);
} else {
minAndMax = null;
}
}

public CanMatchResponse(boolean canMatch) {
public CanMatchResponse(boolean canMatch, MinAndMax<?> minAndMax) {
this.canMatch = canMatch;
this.minAndMax = minAndMax;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(canMatch);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalWriteable(minAndMax);
}
}

public boolean canMatch() {
return canMatch;
}

public MinAndMax<?> minAndMax() {
return minAndMax;
}
}

/**
Expand Down