Skip to content

Commit

Permalink
Shortcut query phase using the results of other shards (#51852)
Browse files Browse the repository at this point in the history
This commit, built on top of #51708, allows to modify shard search requests based on informations collected on other shards. It is intended to speed up sorted queries on time-based indices. For queries that are only interested in the top documents.

This change will rewrite the shard queries to match none if the bottom sort value computed in prior shards is better than all values in the shard.
For queries that mix top documents and aggregations this change will reset the size of the top documents to 0 instead of rewriting to match none.
This means that we don't need to keep a search context open for this shard since we know in advance that it doesn't contain any competitive hit.
  • Loading branch information
jimczi committed Mar 17, 2020
1 parent f65e4d6 commit ff94792
Show file tree
Hide file tree
Showing 18 changed files with 1,036 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ static SearchRequest[] sliceIntoSubRequests(SearchRequest request, String field,
if (request.source().slice() != null) {
throw new IllegalStateException("Can't slice a request that already has a slice configuration");
}
slicedSource = request.source().copyWithNewSlice(sliceBuilder);
slicedSource = request.source().shallowCopy().slice(sliceBuilder);
}
SearchRequest searchRequest = new SearchRequest(request);
searchRequest.source(slicedSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ setup:
mappings:
properties:
created_at:
type: date
type: date_nanos
format: "yyyy-MM-dd"
- do:
indices.create:
Expand Down Expand Up @@ -154,10 +154,9 @@ setup:
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

# check that empty responses are correctly handled when rewriting to match_no_docs
# check that empty responses are correctly handled when rewriting to match_no_docs
- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }
Expand All @@ -166,12 +165,11 @@ setup:
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- match: { hits.total.value: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 2
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }
Expand All @@ -180,5 +178,47 @@ setup:
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 0 }
- match: { hits.total.value: 0 }
- length: { aggregations.idx_terms.buckets: 0 }

# check field sort is correct when skipping query phase
- do:
search:
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
pre_filter_shard_size: 1
body:
"size": 1
"track_total_hits": 1
"sort": [{ "created_at": { "order": "desc", "numeric_type": "date" } }]

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped: 0 }
- match: { _shards.failed: 0 }
- match: { hits.total.value: 1 }
- match: { hits.total.relation: "gte" }
- length: { hits.hits: 1 }
- match: { hits.hits.0._id: "3" }

# same with aggs
- do:
search:
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
pre_filter_shard_size: 1
body:
"size": 1
"track_total_hits": 1
"sort": [{ "created_at": { "order": "desc", "numeric_type": "date" } }]
"aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped: 0 }
- match: { _shards.failed: 0 }
- match: { hits.total.value: 1 }
- match: { hits.total.relation: "gte" }
- length: { hits.hits: 1 }
- match: {hits.hits.0._id: "3" }
- length: { aggregations.idx_terms.buckets: 3 }
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
**/
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
private final SearchPhaseResults<Result> results;
final SearchPhaseResults<Result> results;
private final ClusterState clusterState;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
Expand Down Expand Up @@ -467,7 +467,7 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg
* @param result the result returned form the shard
* @param shardIt the shard iterator
*/
private void onShardResult(Result result, SearchShardIterator shardIt) {
protected void onShardResult(Result result, SearchShardIterator shardIt) {
assert result.getShardIndex() != -1 : "shard index is not set";
assert result.getSearchShardTarget() != null : "search shard target must not be null";
successfulOps.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.search;

import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopFieldDocs;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchSortValuesAndFormats;

/**
* Utility class to keep track of the bottom doc's sort values in a distributed search.
*/
class BottomSortValuesCollector {
private final int topNSize;
private final SortField[] sortFields;
private final FieldComparator[] comparators;
private final int[] reverseMuls;

private volatile long totalHits;
private volatile SearchSortValuesAndFormats bottomSortValues;

BottomSortValuesCollector(int topNSize, SortField[] sortFields) {
this.topNSize = topNSize;
this.comparators = new FieldComparator[sortFields.length];
this.reverseMuls = new int[sortFields.length];
this.sortFields = sortFields;
for (int i = 0; i < sortFields.length; i++) {
comparators[i] = sortFields[i].getComparator(1, i);
reverseMuls[i] = sortFields[i].getReverse() ? -1 : 1;
}
}

long getTotalHits() {
return totalHits;
}

/**
* @return The best bottom sort values consumed so far.
*/
SearchSortValuesAndFormats getBottomSortValues() {
return bottomSortValues;
}

synchronized void consumeTopDocs(TopFieldDocs topDocs, DocValueFormat[] sortValuesFormat) {
totalHits += topDocs.totalHits.value;
if (validateShardSortFields(topDocs.fields) == false) {
return;
}

FieldDoc shardBottomDoc = extractBottom(topDocs);
if (shardBottomDoc == null) {
return;
}
if (bottomSortValues == null
|| compareValues(shardBottomDoc.fields, bottomSortValues.getRawSortValues()) < 0) {
bottomSortValues = new SearchSortValuesAndFormats(shardBottomDoc.fields, sortValuesFormat);
}
}

/**
* @return <code>false</code> if the provided {@link SortField} array differs
* from the initial {@link BottomSortValuesCollector#sortFields}.
*/
private boolean validateShardSortFields(SortField[] shardSortFields) {
for (int i = 0; i < shardSortFields.length; i++) {
if (shardSortFields[i].equals(sortFields[i]) == false) {
// ignore shards response that would make the sort incompatible
// (e.g.: mixing keyword/numeric or long/double).
// TODO: we should fail the entire request because the topdocs
// merge will likely fail later but this is not possible with
// the current async logic that only allows shard failures here.
return false;
}
}
return true;
}

private FieldDoc extractBottom(TopFieldDocs topDocs) {
return topNSize > 0 && topDocs.scoreDocs.length == topNSize ?
(FieldDoc) topDocs.scoreDocs[topNSize-1] : null;
}

private int compareValues(Object[] v1, Object[] v2) {
for (int i = 0; i < v1.length; i++) {
int cmp = reverseMuls[i] * comparators[i].compareValues(v1[i], v2[i]);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
for (SearchPhaseResult entry : queryResults) {
QuerySearchResult result = entry.queryResult();
from = result.from();
size = result.size();
// sorted queries can set the size to 0 if they have enough competitive hits.
size = Math.max(result.size(), size);
if (hasSuggest) {
assert result.suggest() != null;
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
Expand Down Expand Up @@ -724,15 +725,6 @@ int getNumBuffered() {
int getNumReducePhases() { return numReducePhases; }
}

private int resolveTrackTotalHits(SearchRequest request) {
if (request.scroll() != null) {
// no matter what the value of track_total_hits is
return SearchContext.TRACK_TOTAL_HITS_ACCURATE;
}
return request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : request.source().trackTotalHitsUpTo() == null ?
SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : request.source().trackTotalHitsUpTo();
}

/**
* Returns a new ArraySearchPhaseResults instance. This might return an instance that reduces search responses incrementally.
*/
Expand All @@ -743,7 +735,7 @@ ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchProgressL
boolean isScrollRequest = request.scroll() != null;
final boolean hasAggs = source != null && source.aggregations() != null;
final boolean hasTopDocs = source == null || source.size() != 0;
final int trackTotalHitsUpTo = resolveTrackTotalHits(request);
final int trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder = requestToAggReduceContextBuilder.apply(request);
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.search;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TopFieldDocs;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
Expand All @@ -28,18 +29,28 @@
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.Transport;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPhaseResult> {
import static org.elasticsearch.action.search.SearchPhaseController.getTopDocsSize;

class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPhaseResult> {

private final SearchPhaseController searchPhaseController;
private final SearchProgressListener progressListener;

// informations to track the best bottom top doc globally.
private final int topDocsSize;
private final int trackTotalHitsUpTo;
private volatile BottomSortValuesCollector bottomSortCollector;

SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection,
final Map<String, AliasFilter> aliasFilter,
Expand All @@ -53,27 +64,64 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
executor, request, listener, shardsIts, timeProvider, clusterState, task,
searchPhaseController.newSearchPhaseResults(task.getProgressListener(), request, shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
this.topDocsSize = getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
this.searchPhaseController = searchPhaseController;
this.progressListener = task.getProgressListener();
final SearchProgressListener progressListener = task.getProgressListener();
final SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(SearchProgressListener.buildSearchShards(this.shardsIts),
SearchProgressListener.buildSearchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}

protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
final SearchActionListener<SearchPhaseResult> listener) {
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));
getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
request, getTask(), listener);
}

@Override
protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
progressListener.notifyQueryFailure(shardIndex, shardTarget, exc);
}

@Override
protected void onShardResult(SearchPhaseResult result, SearchShardIterator shardIt) {
QuerySearchResult queryResult = result.queryResult();
if (queryResult.isNull() == false && queryResult.topDocs().topDocs instanceof TopFieldDocs) {
TopFieldDocs topDocs = (TopFieldDocs) queryResult.topDocs().topDocs;
if (bottomSortCollector == null) {
synchronized (this) {
if (bottomSortCollector == null) {
bottomSortCollector = new BottomSortValuesCollector(topDocsSize, topDocs.fields);
}
}
}
bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats());
}
super.onShardResult(result, shardIt);
}

@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
return new FetchSearchPhase(results, searchPhaseController, context, clusterState());
}

private ShardSearchRequest rewriteShardSearchRequest(ShardSearchRequest request) {
if (bottomSortCollector == null) {
return request;
}

// disable tracking total hits if we already reached the required estimation.
if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_ACCURATE
&& bottomSortCollector.getTotalHits() > trackTotalHitsUpTo) {
request.source(request.source().shallowCopy().trackTotalHits(false));
}

// set the current best bottom field doc
if (bottomSortCollector.getBottomSortValues() != null) {
request.setBottomSortValues(bottomSortCollector.getBottomSortValues());
}
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,19 @@ public boolean isSuggestOnly() {
return source != null && source.isSuggestOnly();
}

public int resolveTrackTotalHitsUpTo() {
return resolveTrackTotalHitsUpTo(scroll, source);
}

public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder source) {
if (scroll != null) {
// no matter what the value of track_total_hits is
return SearchContext.TRACK_TOTAL_HITS_ACCURATE;
}
return source == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo() == null ?
SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo();
}

@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// generating description in a lazy way since source can be quite big
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public DateMathParser getDateMathParser() {
return parser;
}

@Override
public String format(long value) {
return formatter.format(resolution.toInstant(value).atZone(timeZone));
Expand Down
Loading

0 comments on commit ff94792

Please sign in to comment.