Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-sort shards based on the max/min value of the primary sort field #49092

Merged
merged 14 commits into from
Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
iterators.add(iterator);
}
}
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
this.shardsIts = new GroupShardsIterator<>(iterators);
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators, false);
this.shardsIts = new GroupShardsIterator<>(iterators, false);
// we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up
// it's number of active shards but use 1 as the default if no replica of a shard is active at this point.
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,23 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService.CanMatchResponse;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.Transport;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
Expand All @@ -40,8 +48,12 @@
* from the search. The extra round trip to the search shards is very cheap and is not subject to rejections
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
* large portion of the clusters indices.
* This phase can also be used to pre-sort shards based on the maximum value in each shard of the provided primary sort.
jimczi marked this conversation as resolved.
Show resolved Hide resolved
* When the query primary sort is perform on a field, this phase extracts the maximum possible value in each shard and
jimczi marked this conversation as resolved.
Show resolved Hide resolved
* sort them according to the provided order. This can be useful for instance to ensure that shards that contain recent
* data are executed first.
jimczi marked this conversation as resolved.
Show resolved Hide resolved
*/
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchService.CanMatchResponse> {
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMatchResponse> {

private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
Expand All @@ -58,26 +70,26 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
//We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
new CanMatchSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
}

@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
SearchActionListener<SearchService.CanMatchResponse> listener) {
SearchActionListener<CanMatchResponse> listener) {
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchService.CanMatchResponse> results,
protected SearchPhase getNextPhase(SearchPhaseResults<CanMatchResponse> results,
SearchPhaseContext context) {

return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
return phaseFactory.apply(getIterator((CanMatchSearchPhaseResults) results, shardsIts));
}

private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseResults results,
private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhaseResults results,
GroupShardsIterator<SearchShardIterator> shardsIts) {
int cardinality = results.getNumPossibleMatches();
FixedBitSet possibleMatches = results.getPossibleMatches();
Expand All @@ -86,6 +98,7 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
// to produce a valid search result with all the aggs etc.
possibleMatches.set(0);
}
SearchSourceBuilder source = getRequest().source();
int i = 0;
for (SearchShardIterator iter : shardsIts) {
if (possibleMatches.get(i++)) {
Expand All @@ -94,24 +107,59 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
iter.resetAndSkip();
}
}
return shardsIts;
if (shouldSortShards(results.sortValues) == false) {
return shardsIts;
}
int sortMul = FieldSortBuilder.getPrimaryFieldSortOrNull(source).order() == SortOrder.ASC ? 1 : -1;
jimczi marked this conversation as resolved.
Show resolved Hide resolved
return new GroupShardsIterator<>(sortShards(shardsIts, results.sortValues, sortMul), false);
}

private static List<SearchShardIterator> sortShards(GroupShardsIterator<SearchShardIterator> shardsIts,
Comparable[] sortValues,
int sortMul) {
return IntStream.range(0, shardsIts.size())
.boxed()
.sorted((a, b) -> compareShardSortValues(shardsIts.get(a).shardId(), shardsIts.get(b).shardId(),
sortValues[a], sortValues[b], sortMul))
.map(ord -> shardsIts.get(ord))
.collect(Collectors.toList());
}

private static final class BitSetSearchPhaseResults extends SearchPhaseResults<SearchService.CanMatchResponse> {
private static boolean shouldSortShards(Comparable[] sortValues) {
return Arrays.stream(sortValues).anyMatch(e -> e != null);
}

static int compareShardSortValues(ShardId shard1, ShardId shard2, Comparable v1, Comparable v2, int sortMul) {
final int cmp;
if (v1 == v2) {
cmp = 0;
} else if (v1 == null || v2 == null) {
// sort null values last
if (v1 == null) {
cmp = -1;
} else {
cmp = 1;
}
} else {
cmp = v1.compareTo(v2) * sortMul;
}
return cmp != 0 ? cmp : shard1.compareTo(shard2);
}

private static final class CanMatchSearchPhaseResults extends SearchPhaseResults<CanMatchResponse> {
private final FixedBitSet possibleMatches;
private final Comparable[] sortValues;
private int numPossibleMatches;

BitSetSearchPhaseResults(int size) {
CanMatchSearchPhaseResults(int size) {
super(size);
possibleMatches = new FixedBitSet(size);
sortValues = new Comparable[size];
}

@Override
void consumeResult(SearchService.CanMatchResponse result) {
if (result.canMatch()) {
consumeShardFailure(result.getShardIndex());
}
void consumeResult(CanMatchResponse result) {
consumeResult(result.getShardIndex(), result.canMatch(), result.sortValue());
}

@Override
Expand All @@ -120,10 +168,18 @@ boolean hasResult(int shardIndex) {
}

@Override
synchronized void consumeShardFailure(int shardIndex) {
void consumeShardFailure(int shardIndex) {
// we have to carry over shard failures in order to account for them in the response.
possibleMatches.set(shardIndex);
numPossibleMatches++;
consumeResult(shardIndex, true, null);
}

synchronized void consumeResult(int shardIndex, boolean canMatch, Comparable sortValue) {
if (canMatch) {
possibleMatches.set(shardIndex);
numPossibleMatches++;
}
sortValues[shardIndex] = sortValue;

}


Expand All @@ -136,7 +192,7 @@ synchronized FixedBitSet getPossibleMatches() {
}

@Override
Stream<SearchService.CanMatchResponse> getSuccessfulResults() {
Stream<CanMatchResponse> getSuccessfulResults() {
return Stream.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
abstract class SearchActionListener<T extends SearchPhaseResult> implements ActionListener<T> {

private final int requestIndex;
final int requestIndex;
private final SearchShardTarget searchShardTarget;

protected SearchActionListener(SearchShardTarget searchShardTarget,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
Expand Down Expand Up @@ -616,8 +617,8 @@ private static boolean shouldPreFilterSearchShards(SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators) {
SearchSourceBuilder source = searchRequest.source();
return searchRequest.searchType() == QUERY_THEN_FETCH && // we can't do this for DFS it needs to fan out to all shards all the time
SearchService.canRewriteToMatchNone(source) &&
searchRequest.getPreFilterShardSize() < shardIterators.size();
(SearchService.canRewriteToMatchNone(source) || FieldSortBuilder.hasPrimaryFieldSort(source)) &&
searchRequest.getPreFilterShardSize() < shardIterators.size();
}

static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ public final class GroupShardsIterator<ShardIt extends ShardIterator> implements
* Constructs a enw GroupShardsIterator from the given list.
*/
public GroupShardsIterator(List<ShardIt> iterators) {
CollectionUtil.timSort(iterators);
this(iterators, true);
}

/**
* Constructs a enw GroupShardsIterator from the given list.
jimczi marked this conversation as resolved.
Show resolved Hide resolved
*/
public GroupShardsIterator(List<ShardIt> iterators, boolean useSort) {
if (useSort) {
CollectionUtil.timSort(iterators);
}
this.iterators = iterators;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public static final class DateFieldType extends MappedFieldType {
protected DateMathParser dateMathParser;
protected Resolution resolution;

DateFieldType() {
public DateFieldType() {
super();
setTokenized(false);
setHasDocValues(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ public final String typeName() {
return name;
}
/** Get the associated numeric type */
final NumericType numericType() {
public final NumericType numericType() {
return numericType;
}
public abstract Query termQuery(String field, Object value);
Expand Down
33 changes: 27 additions & 6 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.search.rescore.RescorerBuilder;
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.suggest.Suggest;
Expand Down Expand Up @@ -1012,7 +1015,7 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, Set<String
* This method can have false positives while if it returns <code>false</code> the query won't match any documents on the current
* shard.
*/
public boolean canMatch(ShardSearchRequest request) throws IOException {
public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException {
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
Expand All @@ -1022,18 +1025,20 @@ public boolean canMatch(ShardSearchRequest request) throws IOException {
QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher,
request::nowInMillis, request.getClusterAlias());
Rewriteable.rewrite(request.getRewriteable(), context, false);
FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
Comparable sortValue = sortBuilder != null ? FieldSortBuilder.getMaxSortValueOrNull(context, sortBuilder) : null;
if (canRewriteToMatchNone(request.source())) {
QueryBuilder queryBuilder = request.source().query();
return queryBuilder instanceof MatchNoneQueryBuilder == false;
return new CanMatchResponse(queryBuilder instanceof MatchNoneQueryBuilder == false, sortValue);
}
return true; // null query means match_all
// null query means match_all
return new CanMatchResponse(true, sortValue);
}
}


public void canMatch(ShardSearchRequest request, ActionListener<CanMatchResponse> listener) {
try {
listener.onResponse(new CanMatchResponse(canMatch(request)));
listener.onResponse(canMatch(request));
} catch (IOException e) {
listener.onFailure(e);
}
Expand All @@ -1052,6 +1057,7 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) {
return aggregations == null || aggregations.mustVisitAllDocs() == false;
}


/*
* Rewrites the search request with a light weight rewrite context in order to fetch resources asynchronously
* The action listener is guaranteed to be executed on the search thread-pool
Expand Down Expand Up @@ -1087,24 +1093,39 @@ public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce

public static final class CanMatchResponse extends SearchPhaseResult {
private final boolean canMatch;
private final Comparable sortValue;

public CanMatchResponse(StreamInput in) throws IOException {
super(in);
this.canMatch = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
sortValue = Lucene.readSortValue(in);
} else {
sortValue = null;
}
}

public CanMatchResponse(boolean canMatch) {
public CanMatchResponse(boolean canMatch, @Nullable Comparable sortValue) {
this.canMatch = canMatch;
this.sortValue = sortValue;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(canMatch);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
Lucene.writeSortValue(out, sortValue);
}
}

public boolean canMatch() {
return canMatch;
}

@Nullable
public Comparable sortValue() {
return sortValue;
}
}

/**
Expand Down