diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java index fbcb3d51efac3..6f1948ad70167 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java @@ -203,7 +203,7 @@ static SearchRequest[] sliceIntoSubRequests(SearchRequest request, String field, if (request.source().slice() != null) { throw new IllegalStateException("Can't slice a request that already has a slice configuration"); } - slicedSource = request.source().copyWithNewSlice(sliceBuilder); + slicedSource = request.source().shallowCopy().slice(sliceBuilder); } SearchRequest searchRequest = new SearchRequest(request); searchRequest.source(slicedSource); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml index 636f3d88e10cf..15cea782c4679 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml @@ -19,7 +19,7 @@ setup: mappings: properties: created_at: - type: date + type: date_nanos format: "yyyy-MM-dd" - do: indices.create: @@ -154,10 +154,9 @@ setup: - match: { hits.total: 2 } - length: { aggregations.idx_terms.buckets: 2 } - # check that empty responses are correctly handled when rewriting to match_no_docs + # check that empty responses are correctly handled when rewriting to match_no_docs - do: search: - rest_total_hits_as_int: true # ensure that one shard can return empty response max_concurrent_shard_requests: 1 body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } } @@ -166,12 +165,11 @@ setup: - match: { _shards.successful: 3 } - match: { _shards.skipped : 0 } - match: { _shards.failed: 0 } - - match: { hits.total: 2 } + - match: { hits.total.value: 2 } - length: { aggregations.idx_terms.buckets: 2 } - do: search: - rest_total_hits_as_int: true # ensure that one shard can return empty response max_concurrent_shard_requests: 2 body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } } @@ -180,5 +178,47 @@ setup: - match: { _shards.successful: 3 } - match: { _shards.skipped : 0 } - match: { _shards.failed: 0 } - - match: { hits.total: 0 } + - match: { hits.total.value: 0 } - length: { aggregations.idx_terms.buckets: 0 } + + # check field sort is correct when skipping query phase + - do: + search: + # ensure that one shard can return empty response + max_concurrent_shard_requests: 1 + pre_filter_shard_size: 1 + body: + "size": 1 + "track_total_hits": 1 + "sort": [{ "created_at": { "order": "desc", "numeric_type": "date" } }] + + - match: { _shards.total: 3 } + - match: { _shards.successful: 3 } + - match: { _shards.skipped: 0 } + - match: { _shards.failed: 0 } + - match: { hits.total.value: 1 } + - match: { hits.total.relation: "gte" } + - length: { hits.hits: 1 } + - match: { hits.hits.0._id: "3" } + + # same with aggs + - do: + search: + # ensure that one shard can return empty response + 
max_concurrent_shard_requests: 1 + pre_filter_shard_size: 1 + body: + "size": 1 + "track_total_hits": 1 + "sort": [{ "created_at": { "order": "desc", "numeric_type": "date" } }] + "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } + + - match: { _shards.total: 3 } + - match: { _shards.successful: 3 } + - match: { _shards.skipped: 0 } + - match: { _shards.failed: 0 } + - match: { hits.total.value: 1 } + - match: { hits.total.relation: "gte" } + - length: { hits.hits: 1 } + - match: {hits.hits.0._id: "3" } + - length: { aggregations.idx_terms.buckets: 3 } diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 1d2339dd9e6fe..f6f90cc7ccbb3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -77,7 +77,7 @@ abstract class AbstractSearchAsyncAction exten **/ private final BiFunction nodeIdToConnection; private final SearchTask task; - private final SearchPhaseResults results; + final SearchPhaseResults results; private final ClusterState clusterState; private final Map aliasFilter; private final Map concreteIndexBoosts; @@ -467,7 +467,7 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg * @param result the result returned form the shard * @param shardIt the shard iterator */ - private void onShardResult(Result result, SearchShardIterator shardIt) { + protected void onShardResult(Result result, SearchShardIterator shardIt) { assert result.getShardIndex() != -1 : "shard index is not set"; assert result.getSearchShardTarget() != null : "search shard target must not be null"; successfulOps.incrementAndGet(); diff --git a/server/src/main/java/org/elasticsearch/action/search/BottomSortValuesCollector.java b/server/src/main/java/org/elasticsearch/action/search/BottomSortValuesCollector.java new file mode 100644 index 0000000000000..06df5cd6123b7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/BottomSortValuesCollector.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopFieldDocs; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchSortValuesAndFormats; + +/** + * Utility class to keep track of the bottom doc's sort values in a distributed search. 
+ */ +class BottomSortValuesCollector { + private final int topNSize; + private final SortField[] sortFields; + private final FieldComparator[] comparators; + private final int[] reverseMuls; + + private volatile long totalHits; + private volatile SearchSortValuesAndFormats bottomSortValues; + + BottomSortValuesCollector(int topNSize, SortField[] sortFields) { + this.topNSize = topNSize; + this.comparators = new FieldComparator[sortFields.length]; + this.reverseMuls = new int[sortFields.length]; + this.sortFields = sortFields; + for (int i = 0; i < sortFields.length; i++) { + comparators[i] = sortFields[i].getComparator(1, i); + reverseMuls[i] = sortFields[i].getReverse() ? -1 : 1; + } + } + + long getTotalHits() { + return totalHits; + } + + /** + * @return The best bottom sort values consumed so far. + */ + SearchSortValuesAndFormats getBottomSortValues() { + return bottomSortValues; + } + + synchronized void consumeTopDocs(TopFieldDocs topDocs, DocValueFormat[] sortValuesFormat) { + totalHits += topDocs.totalHits.value; + if (validateShardSortFields(topDocs.fields) == false) { + return; + } + + FieldDoc shardBottomDoc = extractBottom(topDocs); + if (shardBottomDoc == null) { + return; + } + if (bottomSortValues == null + || compareValues(shardBottomDoc.fields, bottomSortValues.getRawSortValues()) < 0) { + bottomSortValues = new SearchSortValuesAndFormats(shardBottomDoc.fields, sortValuesFormat); + } + } + + /** + * @return false if the provided {@link SortField} array differs + * from the initial {@link BottomSortValuesCollector#sortFields}. + */ + private boolean validateShardSortFields(SortField[] shardSortFields) { + for (int i = 0; i < shardSortFields.length; i++) { + if (shardSortFields[i].equals(sortFields[i]) == false) { + // ignore shards response that would make the sort incompatible + // (e.g.: mixing keyword/numeric or long/double). + // TODO: we should fail the entire request because the topdocs + // merge will likely fail later but this is not possible with + // the current async logic that only allows shard failures here. + return false; + } + } + return true; + } + + private FieldDoc extractBottom(TopFieldDocs topDocs) { + return topNSize > 0 && topDocs.scoreDocs.length == topNSize ? + (FieldDoc) topDocs.scoreDocs[topNSize-1] : null; + } + + private int compareValues(Object[] v1, Object[] v2) { + for (int i = 0; i < v1.length; i++) { + int cmp = reverseMuls[i] * comparators[i].compareValues(v1[i], v2[i]); + if (cmp != 0) { + return cmp; + } + } + return 0; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index d8a3e26b8d82b..f2cf6b199dcbe 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -478,7 +478,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection> suggestion : result.suggest()) { @@ -724,15 +725,6 @@ int getNumBuffered() { int getNumReducePhases() { return numReducePhases; } } - private int resolveTrackTotalHits(SearchRequest request) { - if (request.scroll() != null) { - // no matter what the value of track_total_hits is - return SearchContext.TRACK_TOTAL_HITS_ACCURATE; - } - return request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : request.source().trackTotalHitsUpTo() == null ? 
- SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : request.source().trackTotalHitsUpTo(); - } - /** * Returns a new ArraySearchPhaseResults instance. This might return an instance that reduces search responses incrementally. */ @@ -743,7 +735,7 @@ ArraySearchPhaseResults newSearchPhaseResults(SearchProgressL boolean isScrollRequest = request.scroll() != null; final boolean hasAggs = source != null && source.aggregations() != null; final boolean hasTopDocs = source == null || source.size() != 0; - final int trackTotalHitsUpTo = resolveTrackTotalHits(request); + final int trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo(); InternalAggregation.ReduceContextBuilder aggReduceContextBuilder = requestToAggReduceContextBuilder.apply(request); if (isScrollRequest == false && (hasAggs || hasTopDocs)) { // no incremental reduce if scroll is used - we only hit a single shard or sometimes more... diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 24345c606003a..e42d8405da5b0 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.Logger; +import org.apache.lucene.search.TopFieldDocs; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -28,6 +29,9 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.transport.Transport; import java.util.Map; @@ -35,11 +39,18 @@ import java.util.concurrent.Executor; import java.util.function.BiFunction; -final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { +import static org.elasticsearch.action.search.SearchPhaseController.getTopDocsSize; + +class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { private final SearchPhaseController searchPhaseController; private final SearchProgressListener progressListener; + // information used to track the best bottom doc globally.
+ private final int topDocsSize; + private final int trackTotalHitsUpTo; + private volatile BottomSortValuesCollector bottomSortCollector; + SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService, final BiFunction nodeIdToConnection, final Map aliasFilter, @@ -53,9 +64,10 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener) { + ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt)); getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()), - buildShardSearchRequest(shardIt), getTask(), listener); + request, getTask(), listener); } @Override @@ -72,8 +85,43 @@ protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget progressListener.notifyQueryFailure(shardIndex, shardTarget, exc); } + @Override + protected void onShardResult(SearchPhaseResult result, SearchShardIterator shardIt) { + QuerySearchResult queryResult = result.queryResult(); + if (queryResult.isNull() == false && queryResult.topDocs().topDocs instanceof TopFieldDocs) { + TopFieldDocs topDocs = (TopFieldDocs) queryResult.topDocs().topDocs; + if (bottomSortCollector == null) { + synchronized (this) { + if (bottomSortCollector == null) { + bottomSortCollector = new BottomSortValuesCollector(topDocsSize, topDocs.fields); + } + } + } + bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats()); + } + super.onShardResult(result, shardIt); + } + @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, final SearchPhaseContext context) { return new FetchSearchPhase(results, searchPhaseController, context, clusterState()); } + + private ShardSearchRequest rewriteShardSearchRequest(ShardSearchRequest request) { + if (bottomSortCollector == null) { + return request; + } + + // disable tracking total hits if we already reached the required estimation. + if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_ACCURATE + && bottomSortCollector.getTotalHits() > trackTotalHitsUpTo) { + request.source(request.source().shallowCopy().trackTotalHits(false)); + } + + // set the current best bottom field doc + if (bottomSortCollector.getBottomSortValues() != null) { + request.setBottomSortValues(bottomSortCollector.getBottomSortValues()); + } + return request; + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 96206aa4bcd58..761a50bfa4168 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -558,6 +558,19 @@ public boolean isSuggestOnly() { return source != null && source.isSuggestOnly(); } + public int resolveTrackTotalHitsUpTo() { + return resolveTrackTotalHitsUpTo(scroll, source); + } + + public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder source) { + if (scroll != null) { + // no matter what the value of track_total_hits is + return SearchContext.TRACK_TOTAL_HITS_ACCURATE; + } + return source == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo() == null ? 
+ SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo(); + } + @Override public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { // generating description in a lazy way since source can be quite big diff --git a/server/src/main/java/org/elasticsearch/search/DocValueFormat.java b/server/src/main/java/org/elasticsearch/search/DocValueFormat.java index 7f681bfa47c92..5b45cd660697a 100644 --- a/server/src/main/java/org/elasticsearch/search/DocValueFormat.java +++ b/server/src/main/java/org/elasticsearch/search/DocValueFormat.java @@ -230,6 +230,10 @@ public void writeTo(StreamOutput out) throws IOException { } } + public DateMathParser getDateMathParser() { + return parser; + } + @Override public String format(long value) { return formatter.format(resolution.toInstant(value).atZone(timeZone)); diff --git a/server/src/main/java/org/elasticsearch/search/SearchSortValuesAndFormats.java b/server/src/main/java/org/elasticsearch/search/SearchSortValuesAndFormats.java new file mode 100644 index 0000000000000..f5f80836cde8f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/SearchSortValuesAndFormats.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.lucene.Lucene; + +import java.io.IOException; +import java.util.Arrays; + +public class SearchSortValuesAndFormats implements Writeable { + private final Object[] rawSortValues; + private final Object[] formattedSortValues; + private final DocValueFormat[] sortValueFormats; + + public SearchSortValuesAndFormats(Object[] rawSortValues, DocValueFormat[] sortValueFormats) { + assert rawSortValues.length == sortValueFormats.length; + this.rawSortValues = rawSortValues; + this.sortValueFormats = sortValueFormats; + this.formattedSortValues = Arrays.copyOf(rawSortValues, rawSortValues.length); + for (int i = 0; i < rawSortValues.length; ++i) { + Object sortValue = rawSortValues[i]; + if (sortValue instanceof BytesRef) { + this.formattedSortValues[i] = sortValueFormats[i].format((BytesRef) sortValue); + } else if (sortValue instanceof Long) { + this.formattedSortValues[i] = sortValueFormats[i].format((long) sortValue); + } else if (sortValue instanceof Double) { + this.formattedSortValues[i] = sortValueFormats[i].format((double) sortValue); + } else if (sortValue instanceof Float || sortValue instanceof Integer) { + // sort by _score or _doc + this.formattedSortValues[i] = sortValue; + } else { + assert sortValue == null : "Sort values must be a BytesRef, Long, Integer, Double or Float, but got " + + sortValue.getClass() + ": " + sortValue; + this.formattedSortValues[i] = sortValue; + } + } + } + + public SearchSortValuesAndFormats(StreamInput in) throws IOException { + this.rawSortValues = in.readArray(Lucene::readSortValue, Object[]::new); + this.formattedSortValues = in.readArray(Lucene::readSortValue, Object[]::new); + this.sortValueFormats = new DocValueFormat[formattedSortValues.length]; + for (int i = 0; i < sortValueFormats.length; ++i) { + sortValueFormats[i] = in.readNamedWriteable(DocValueFormat.class); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeArray(Lucene::writeSortValue, rawSortValues); + out.writeArray(Lucene::writeSortValue, formattedSortValues); + for (int i = 0; i < sortValueFormats.length; i++) { + out.writeNamedWriteable(sortValueFormats[i]); + } + } + + public Object[] getRawSortValues() { + return rawSortValues; + } + + public Object[] getFormattedSortValues() { + return formattedSortValues; + } + + public DocValueFormat[] getSortValueFormats() { + return sortValueFormats; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SearchSortValuesAndFormats that = (SearchSortValuesAndFormats) o; + return Arrays.equals(rawSortValues, that.rawSortValues) && + Arrays.equals(formattedSortValues, that.formattedSortValues) && + Arrays.equals(sortValueFormats, that.sortValueFormats); + } + + @Override + public int hashCode() { + int result = Arrays.hashCode(rawSortValues); + result = 31 * result + Arrays.hashCode(formattedSortValues); + result = 31 * result + Arrays.hashCode(sortValueFormats); + return result; + } +} diff --git a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index e95845b225164..14c61311bddf6 100644 --- 
a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -959,13 +959,13 @@ public SearchSourceBuilder rewrite(QueryRewriteContext context) throws IOExcepti /** * Create a shallow copy of this builder with a new slice configuration. */ - public SearchSourceBuilder copyWithNewSlice(SliceBuilder slice) { - return shallowCopy(queryBuilder, postQueryBuilder, aggregations, slice, sorts, rescoreBuilders, highlightBuilder); + public SearchSourceBuilder shallowCopy() { + return shallowCopy(queryBuilder, postQueryBuilder, aggregations, sliceBuilder, sorts, rescoreBuilders, highlightBuilder); } /** * Create a shallow copy of this source replaced {@link #queryBuilder}, {@link #postQueryBuilder}, and {@link #sliceBuilder}. Used by - * {@link #rewrite(QueryRewriteContext)} and {@link #copyWithNewSlice(SliceBuilder)}. + * {@link #rewrite(QueryRewriteContext)}. */ @SuppressWarnings("rawtypes") private SearchSourceBuilder shallowCopy(QueryBuilder queryBuilder, QueryBuilder postQueryBuilder, diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 412b53f2d2e3e..6c1fc7e959b40 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -39,15 +39,19 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.MatchNoneQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryRewriteContext; +import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.AliasFilterParsingException; import org.elasticsearch.indices.InvalidAliasNameException; +import org.elasticsearch.search.SearchSortValuesAndFormats; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportRequest; @@ -57,6 +61,8 @@ import java.util.Map; import java.util.function.Function; +import static org.elasticsearch.search.internal.SearchContext.TRACK_TOTAL_HITS_DISABLED; + /** * Shard level request that represents a search. * It provides all the methods that the {@link SearchContext} needs.
@@ -77,6 +83,7 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque private final OriginalIndices originalIndices; private boolean canReturnNullResponseIfMatchNoDocs; + private SearchSortValuesAndFormats bottomSortValues; //these are the only mutable fields, as they are subject to rewriting private AliasFilter aliasFilter; @@ -175,6 +182,11 @@ public ShardSearchRequest(StreamInput in) throws IOException { } else { canReturnNullResponseIfMatchNoDocs = false; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + bottomSortValues = in.readOptionalWriteable(SearchSortValuesAndFormats::new); + } else { + bottomSortValues = null; + } originalIndices = OriginalIndices.readOriginalIndices(in); } @@ -212,6 +224,9 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce if (out.getVersion().onOrAfter(Version.V_7_7_0)) { out.writeBoolean(canReturnNullResponseIfMatchNoDocs); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalWriteable(bottomSortValues); + } } @Override @@ -286,6 +301,20 @@ public String preference() { return preference; } + /** + * Sets the bottom sort values that can be used by the searcher to filter documents + * that are after it. This value is computed by the coordinating node that throttles the + * query phase. After a partial merge of successful shards, the sort values of the + * bottom document are passed as a hint on subsequent shard requests. + */ + public void setBottomSortValues(SearchSortValuesAndFormats values) { + this.bottomSortValues = values; + } + + public SearchSortValuesAndFormats getBottomSortValues() { + return bottomSortValues; + } + /** * Returns true if the caller can handle null response {@link QuerySearchResult#nullInstance()}. * Defaults to false since the coordinator node needs at least one shard response to build the global @@ -343,6 +372,27 @@ static class RequestRewritable implements Rewriteable { public Rewriteable rewrite(QueryRewriteContext ctx) throws IOException { SearchSourceBuilder newSource = request.source() == null ?
null : Rewriteable.rewrite(request.source(), ctx); AliasFilter newAliasFilter = Rewriteable.rewrite(request.getAliasFilter(), ctx); + + QueryShardContext shardContext = ctx.convertToShardContext(); + + FieldSortBuilder primarySort = FieldSortBuilder.getPrimaryFieldSortOrNull(newSource); + if (shardContext != null + && primarySort != null + && primarySort.isBottomSortShardDisjoint(shardContext, request.getBottomSortValues())) { + assert newSource != null : "source should contain a primary sort field"; + newSource = newSource.shallowCopy(); + int trackTotalHitsUpTo = SearchRequest.resolveTrackTotalHitsUpTo(request.scroll, request.source); + if (trackTotalHitsUpTo == TRACK_TOTAL_HITS_DISABLED + && newSource.suggest() == null + && newSource.aggregations() == null) { + newSource.query(new MatchNoneQueryBuilder()); + } else { + newSource.size(0); + } + request.source(newSource); + request.setBottomSortValues(null); + } + if (newSource == request.source() && newAliasFilter == request.getAliasFilter()) { return this; } else { diff --git a/server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorContext.java b/server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorContext.java index f912d1e99129a..3f8ef30863584 100644 --- a/server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorContext.java +++ b/server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorContext.java @@ -99,6 +99,7 @@ boolean shouldRescore() { } static class EmptyTopDocsCollectorContext extends TopDocsCollectorContext { + private final Sort sort; private final Collector collector; private final Supplier hitCountSupplier; @@ -109,9 +110,13 @@ static class EmptyTopDocsCollectorContext extends TopDocsCollectorContext { * @param trackTotalHitsUpTo True if the total number of hits should be tracked * @param hasFilterCollector True if the collector chain contains a filter */ - private EmptyTopDocsCollectorContext(IndexReader reader, Query query, - int trackTotalHitsUpTo, boolean hasFilterCollector) throws IOException { + private EmptyTopDocsCollectorContext(IndexReader reader, + Query query, + @Nullable SortAndFormats sortAndFormats, + int trackTotalHitsUpTo, + boolean hasFilterCollector) throws IOException { super(REASON_SEARCH_COUNT, 0); + this.sort = sortAndFormats == null ? 
null : sortAndFormats.sort; if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_DISABLED) { this.collector = new EarlyTerminatingCollector(new TotalHitCountCollector(), 0, false); // for bwc hit count is set to 0, it will be converted to -1 by the coordinating node @@ -147,7 +152,13 @@ Collector create(Collector in) { @Override void postProcess(QuerySearchResult result) { final TotalHits totalHitCount = hitCountSupplier.get(); - result.topDocs(new TopDocsAndMaxScore(new TopDocs(totalHitCount, Lucene.EMPTY_SCORE_DOCS), Float.NaN), null); + final TopDocs topDocs; + if (sort != null) { + topDocs = new TopFieldDocs(totalHitCount, Lucene.EMPTY_SCORE_DOCS, sort.getSort()); + } else { + topDocs = new TopDocs(totalHitCount, Lucene.EMPTY_SCORE_DOCS); + } + result.topDocs(new TopDocsAndMaxScore(topDocs, Float.NaN), null); } } @@ -421,7 +432,8 @@ static TopDocsCollectorContext createTopDocsCollectorContext(SearchContext searc final int totalNumDocs = Math.max(1, reader.numDocs()); if (searchContext.size() == 0) { // no matter what the value of from is - return new EmptyTopDocsCollectorContext(reader, query, searchContext.trackTotalHitsUpTo(), hasFilterCollector); + return new EmptyTopDocsCollectorContext(reader, query, searchContext.sort(), + searchContext.trackTotalHitsUpTo(), hasFilterCollector); } else if (searchContext.scrollContext() != null) { // we can disable the tracking of total hits after the initial scroll query // since the total hits is preserved in the scroll context. diff --git a/server/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java b/server/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java index 65a3a4137ed59..044c59d24a809 100644 --- a/server/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java @@ -26,10 +26,12 @@ import org.apache.lucene.index.PointValues; import org.apache.lucene.index.Terms; import org.apache.lucene.search.SortField; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.time.DateMathParser; import org.elasticsearch.common.time.DateUtils; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser.ValueType; @@ -50,6 +52,7 @@ import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.QueryShardException; +import org.elasticsearch.search.SearchSortValuesAndFormats; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.MultiValueMode; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -351,6 +354,55 @@ public SortFieldAndFormat build(QueryShardContext context) throws IOException { return new SortFieldAndFormat(field, fieldType.docValueFormat(null, null)); } + public boolean canRewriteToMatchNone() { + return nestedSort == null && (missing == null || "_last".equals(missing)); + } + + /** + * Returns whether some values of the given {@link QueryShardContext#getIndexReader()} are within the + * primary sort value provided in the bottomSortValues. 
+ */ + public boolean isBottomSortShardDisjoint(QueryShardContext context, SearchSortValuesAndFormats bottomSortValues) throws IOException { + if (bottomSortValues == null || bottomSortValues.getRawSortValues().length == 0) { + return false; + } + + if (canRewriteToMatchNone() == false) { + return false; + } + MappedFieldType fieldType = context.fieldMapper(fieldName); + if (fieldType == null) { + // unmapped + return false; + } + if (fieldType.indexOptions() == IndexOptions.NONE) { + return false; + } + DocValueFormat docValueFormat = bottomSortValues.getSortValueFormats()[0]; + final DateMathParser dateMathParser; + if (docValueFormat instanceof DocValueFormat.DateTime) { + if (fieldType instanceof DateFieldType && ((DateFieldType) fieldType).resolution() == NANOSECONDS) { + // we parse the formatted value with the resolution of the local field because + // the provided format can use a different one (date vs date_nanos). + docValueFormat = DocValueFormat.withNanosecondResolution(docValueFormat); + } + dateMathParser = ((DocValueFormat.DateTime) docValueFormat).getDateMathParser(); + } else { + dateMathParser = null; + } + Object bottomSortValue = bottomSortValues.getFormattedSortValues()[0]; + Object minValue = order() == SortOrder.DESC ? bottomSortValue : null; + Object maxValue = order() == SortOrder.DESC ? null : bottomSortValue; + try { + MappedFieldType.Relation relation = fieldType.isFieldWithinQuery(context.getIndexReader(), minValue, maxValue, + true, true, null, dateMathParser, context); + return relation == MappedFieldType.Relation.DISJOINT; + } catch (ElasticsearchParseException exc) { + // can happen if the sort field is mapped differently in another search index + return false; + } + } + @Override public BucketedSort buildBucketedSort(QueryShardContext context, int bucketSize, BucketedSort.ExtraData extra) throws IOException { if (DOC_FIELD_NAME.equals(fieldName)) { diff --git a/server/src/test/java/org/elasticsearch/action/search/BottomSortValuesCollectorTests.java b/server/src/test/java/org/elasticsearch/action/search/BottomSortValuesCollectorTests.java new file mode 100644 index 0000000000000..7c626f33e9c40 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/search/BottomSortValuesCollectorTests.java @@ -0,0 +1,263 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopFieldDocs; +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.time.DateUtils; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.test.ESTestCase; + +import java.time.ZoneId; +import java.util.Arrays; + +import static org.apache.lucene.search.TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO; +import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class BottomSortValuesCollectorTests extends ESTestCase { + public void testWithStrings() { + for (boolean reverse : new boolean[] { true, false }) { + SortField[] sortFields = new SortField[] { new SortField("foo", SortField.Type.STRING_VAL, reverse) }; + DocValueFormat[] sortFormats = new DocValueFormat[] { DocValueFormat.RAW }; + BottomSortValuesCollector collector = new BottomSortValuesCollector(3, sortFields); + collector.consumeTopDocs(createTopDocs(sortFields[0], 100, + newBytesArray("foo", "goo", "hoo")), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 100, + newBytesArray("bar", "car", "zar")), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 50, + newBytesArray()), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 50, + newBytesArray("tar", "zar", "zzz")), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 50, + newBytesArray(null, null, "zzz")), sortFormats); + assertThat(collector.getTotalHits(), equalTo(350L)); + assertNotNull(collector.getBottomSortValues()); + assertThat(collector.getBottomSortValues().getSortValueFormats().length, equalTo(1)); + assertThat(collector.getBottomSortValues().getSortValueFormats()[0], instanceOf(DocValueFormat.RAW.getClass())); + assertThat(collector.getBottomSortValues().getRawSortValues().length, equalTo(1)); + assertThat(collector.getBottomSortValues().getFormattedSortValues().length, equalTo(1)); + if (reverse) { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(new BytesRef("tar"))); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo("tar")); + } else { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(new BytesRef("hoo"))); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo("hoo")); + } + } + } + + public void testWithLongs() { + for (boolean reverse : new boolean[] { true, false }) { + SortField[] sortFields = new SortField[] { new SortField("foo", SortField.Type.LONG, reverse) }; + DocValueFormat[] sortFormats = new DocValueFormat[]{ DocValueFormat.RAW }; + BottomSortValuesCollector collector = new BottomSortValuesCollector(3, sortFields); + collector.consumeTopDocs(createTopDocs(sortFields[0], 100, + newLongArray(5L, 10L, 15L)), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 100, + newLongArray(25L, 350L, 3500L)), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 50, + newLongArray(1L, 2L, 3L)), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 50, + newLongArray()), sortFormats); + // ignore bottom if we have less top docs than the requested 
size + collector.consumeTopDocs(createTopDocs(sortFields[0], 1, + newLongArray(-100L)), sortFormats); + assertNotNull(collector.getBottomSortValues()); + assertThat(collector.getTotalHits(), equalTo(301L)); + assertThat(collector.getBottomSortValues().getSortValueFormats().length, equalTo(1)); + assertThat(collector.getBottomSortValues().getSortValueFormats()[0], instanceOf(DocValueFormat.RAW.getClass())); + assertThat(collector.getBottomSortValues().getRawSortValues().length, equalTo(1)); + assertThat(collector.getBottomSortValues().getFormattedSortValues().length, equalTo(1)); + if (reverse) { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(25L)); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo(25L)); + } else { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(3L)); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo(3L)); + } + } + } + + public void testWithDoubles() { + for (boolean reverse : new boolean[] { true, false }) { + SortField[] sortFields = new SortField[]{ new SortField("foo", SortField.Type.DOUBLE, reverse) }; + DocValueFormat[] sortFormats = new DocValueFormat[] { DocValueFormat.RAW }; + BottomSortValuesCollector collector = new BottomSortValuesCollector(3, sortFields); + collector.consumeTopDocs(createTopDocs(sortFields[0], 100, + newDoubleArray(500d, 5000d, 6755d)), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 100, + newDoubleArray(0.1d, 1.5d, 3.5d)), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 50, + newDoubleArray()), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 50, + newDoubleArray(100d, 101d, 102d)), sortFormats); + // ignore bottom if we have less top docs than the requested size + collector.consumeTopDocs(createTopDocs(sortFields[0], 2, + newDoubleArray(0d, 1d)), sortFormats); + assertThat(collector.getTotalHits(), equalTo(302L)); + assertNotNull(collector.getBottomSortValues()); + assertThat(collector.getBottomSortValues().getSortValueFormats().length, equalTo(1)); + assertThat(collector.getBottomSortValues().getSortValueFormats()[0], instanceOf(DocValueFormat.RAW.getClass())); + assertThat(collector.getBottomSortValues().getRawSortValues().length, equalTo(1)); + assertThat(collector.getBottomSortValues().getFormattedSortValues().length, equalTo(1)); + if (reverse) { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(500d)); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo(500d)); + } else { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(3.5d)); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo(3.5d)); + } + } + } + + public void testWithDates() { + for (boolean reverse : new boolean[] { true, false }) { + SortField[] sortFields = new SortField[]{ new SortField("foo", SortField.Type.LONG, reverse) }; + DocValueFormat[] sortFormats = new DocValueFormat[] { + new DocValueFormat.DateTime(DEFAULT_DATE_TIME_FORMATTER, ZoneId.of("UTC"), DateFieldMapper.Resolution.MILLISECONDS)}; + BottomSortValuesCollector collector = new BottomSortValuesCollector(3, sortFields); + collector.consumeTopDocs(createTopDocs(sortFields[0], 100, + newDateArray("2017-06-01T12:18:20Z", "2018-04-03T15:10:27Z", "2013-06-01T13:10:20Z")), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 100, + newDateArray("2018-05-21T08:10:10Z", "2015-02-08T15:12:34Z",
"2015-01-01T13:10:30Z")), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 50, + newDateArray()), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 50, + newDateArray("2019-12-30T07:34:20Z", "2017-03-01T12:10:30Z", "2015-07-09T14:00:30Z")), sortFormats); + assertThat(collector.getTotalHits(), equalTo(300L)); + assertNotNull(collector.getBottomSortValues()); + assertThat(collector.getBottomSortValues().getSortValueFormats().length, equalTo(1)); + assertThat(collector.getBottomSortValues().getSortValueFormats()[0], instanceOf(DocValueFormat.DateTime.class)); + assertThat(collector.getBottomSortValues().getRawSortValues().length, equalTo(1)); + if (reverse) { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(1436450430000L)); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo("2015-07-09T14:00:30.000Z")); + } else { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(1522768227000L)); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo("2018-04-03T15:10:27.000Z")); + } + } + } + + public void testWithDateNanos() { + for (boolean reverse : new boolean[] { true, false }) { + SortField[] sortFields = new SortField[]{ new SortField("foo", SortField.Type.LONG, reverse) }; + DocValueFormat[] sortFormats = new DocValueFormat[] { + new DocValueFormat.DateTime(DEFAULT_DATE_TIME_FORMATTER, ZoneId.of("UTC"), DateFieldMapper.Resolution.NANOSECONDS)}; + BottomSortValuesCollector collector = new BottomSortValuesCollector(3, sortFields); + collector.consumeTopDocs(createTopDocs(sortFields[0], 100, + newDateNanoArray("2017-06-01T12:18:20Z", "2018-04-03T15:10:27Z", "2013-06-01T13:10:20Z")), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 100, + newDateNanoArray("2018-05-21T08:10:10Z", "2015-02-08T15:12:34Z", "2015-01-01T13:10:30Z")), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 50, + newDateNanoArray()), sortFormats); + collector.consumeTopDocs(createTopDocs(sortFields[0], 50, + newDateNanoArray("2019-12-30T07:34:20Z", "2017-03-01T12:10:30Z", "2015-07-09T14:00:30Z")), sortFormats); + assertThat(collector.getTotalHits(), equalTo(300L)); + assertNotNull(collector.getBottomSortValues()); + assertThat(collector.getBottomSortValues().getSortValueFormats().length, equalTo(1)); + assertThat(collector.getBottomSortValues().getSortValueFormats()[0], instanceOf(DocValueFormat.DateTime.class)); + assertThat(collector.getBottomSortValues().getRawSortValues().length, equalTo(1)); + if (reverse) { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(1436450430000000000L)); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo("2015-07-09T14:00:30.000Z")); + } else { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(1522768227000000000L)); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo("2018-04-03T15:10:27.000Z")); + } + } + } + + public void testWithMixedTypes() { + for (boolean reverse : new boolean[] { true, false }) { + SortField[] sortFields = new SortField[] { new SortField("foo", SortField.Type.LONG, reverse) }; + SortField[] otherSortFields = new SortField[] { new SortField("foo", SortField.Type.STRING_VAL, reverse) }; + DocValueFormat[] sortFormats = new DocValueFormat[] { DocValueFormat.RAW }; + BottomSortValuesCollector collector = new BottomSortValuesCollector(3, sortFields); + 
collector.consumeTopDocs(createTopDocs(sortFields[0], 100, + newLongArray(1000L, 100L, 10L)), sortFormats); + collector.consumeTopDocs(createTopDocs(otherSortFields[0], 50, + newBytesArray("foo", "bar", "zoo")), sortFormats); + assertThat(collector.getTotalHits(), equalTo(150L)); + assertNotNull(collector.getBottomSortValues()); + assertThat(collector.getBottomSortValues().getSortValueFormats().length, equalTo(1)); + assertThat(collector.getBottomSortValues().getSortValueFormats()[0], instanceOf(DocValueFormat.RAW.getClass())); + assertThat(collector.getBottomSortValues().getRawSortValues().length, equalTo(1)); + if (reverse) { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(10L)); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo(10L)); + } else { + assertThat(collector.getBottomSortValues().getRawSortValues()[0], equalTo(1000L)); + assertThat(collector.getBottomSortValues().getFormattedSortValues()[0], equalTo(1000L)); + } + } + } + + private Object[] newDoubleArray(Double... values) { + return values; + } + + private Object[] newLongArray(Long... values) { + return values; + } + + private Object[] newBytesArray(String... values) { + BytesRef[] bytesRefs = new BytesRef[values.length]; + for (int i = 0; i < bytesRefs.length; i++) { + bytesRefs[i] = values[i] == null ? null : new BytesRef(values[i]); + } + return bytesRefs; + } + + private Object[] newDateArray(String... values) { + Long[] longs = new Long[values.length]; + for (int i = 0; i < values.length; i++) { + longs[i] = DEFAULT_DATE_TIME_FORMATTER.parseMillis(values[i]); + } + return longs; + } + + private Object[] newDateNanoArray(String... values) { + Long[] longs = new Long[values.length]; + for (int i = 0; i < values.length; i++) { + longs[i] = DateUtils.toNanoSeconds(DEFAULT_DATE_TIME_FORMATTER.parseMillis(values[i])); + } + return longs; + } + + private TopFieldDocs createTopDocs(SortField sortField, int totalHits, Object[] values) { + FieldDoc[] fieldDocs = new FieldDoc[values.length]; + FieldComparator cmp = sortField.getComparator(1, 0); + for (int i = 0; i < values.length; i++) { + fieldDocs[i] = new FieldDoc(i, Float.NaN, new Object[] { values[i] }); + } + int reverseMul = sortField.getReverse() ? -1 : 1; + Arrays.sort(fieldDocs, (o1, o2) -> reverseMul * cmp.compareValues(o1.fields[0], o2.fields[0])); + return new TopFieldDocs(new TotalHits(totalHits, GREATER_THAN_OR_EQUAL_TO), + fieldDocs, new SortField[] { sortField }); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java new file mode 100644 index 0000000000000..0d4e0ede61167 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -0,0 +1,157 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopFieldDocs; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.Version; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.SearchContextId; +import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.Transport; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; + +public class SearchQueryThenFetchAsyncActionTests extends ESTestCase { + public void testBottomFieldSort() throws InterruptedException { + final TransportSearchAction.SearchTimeProvider timeProvider = + new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); + + Map lookup = new ConcurrentHashMap<>(); + DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); + + int numShards = randomIntBetween(10, 20); + int numConcurrent = randomIntBetween(1, 4); + AtomicInteger numWithTopDocs = new AtomicInteger(); + AtomicInteger successfulOps = new AtomicInteger(); + AtomicBoolean canReturnNullResponse = new AtomicBoolean(false); + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + @Override + public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest request, + SearchTask task, SearchActionListener listener) { + int shardId = request.shardId().id(); + if 
(request.canReturnNullResponseIfMatchNoDocs()) { + canReturnNullResponse.set(true); + } + if (request.getBottomSortValues() != null) { + assertNotEquals(shardId, (int) request.getBottomSortValues().getFormattedSortValues()[0]); + numWithTopDocs.incrementAndGet(); + } + QuerySearchResult queryResult = new QuerySearchResult(new SearchContextId("N/A", 123), + new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null, OriginalIndices.NONE)); + SortField sortField = new SortField("timestamp", SortField.Type.LONG); + queryResult.topDocs(new TopDocsAndMaxScore(new TopFieldDocs( + new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), + new FieldDoc[] { + new FieldDoc(randomInt(1000), Float.NaN, new Object[] { request.shardId().id() }) + }, new SortField[] { sortField }), Float.NaN), + new DocValueFormat[] { DocValueFormat.RAW }); + queryResult.from(0); + queryResult.size(1); + successfulOps.incrementAndGet(); + new Thread(() -> listener.onResponse(queryResult)).start(); + } + }; + CountDownLatch latch = new CountDownLatch(1); + GroupShardsIterator shardsIter = SearchAsyncActionTests.getShardsIter("idx", + new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS), + numShards, randomBoolean(), primaryNode, replicaNode); + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.setMaxConcurrentShardRequests(numConcurrent); + searchRequest.setBatchedReduceSize(2); + searchRequest.source(new SearchSourceBuilder() + .size(1) + .trackTotalHitsUpTo(2) + .sort(SortBuilders.fieldSort("timestamp"))); + searchRequest.allowPartialSearchResults(false); + SearchPhaseController controller = new SearchPhaseController((b) -> new InternalAggregation.ReduceContextBuilder() { + @Override + public InternalAggregation.ReduceContext forPartialReduction() { + return InternalAggregation.ReduceContext.forPartialReduction(BigArrays.NON_RECYCLING_INSTANCE, null); + } + + public InternalAggregation.ReduceContext forFinalReduction() { + return InternalAggregation.ReduceContext.forFinalReduction( + BigArrays.NON_RECYCLING_INSTANCE, null, b -> {}, PipelineAggregator.PipelineTree.EMPTY); + }; + }); + SearchTask task = new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap()); + SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, + searchTransportService, (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), + Collections.emptyMap(), Collections.emptyMap(), controller, EsExecutors.newDirectExecutorService(), searchRequest, + null, shardsIter, timeProvider, null, task, + SearchResponse.Clusters.EMPTY) { + @Override + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { + return new SearchPhase("test") { + @Override + public void run() { + latch.countDown(); + } + }; + } + }; + action.start(); + latch.await(); + assertThat(successfulOps.get(), equalTo(numShards)); + assertTrue(canReturnNullResponse.get()); + assertThat(numWithTopDocs.get(), greaterThanOrEqualTo(1)); + SearchPhaseController.ReducedQueryPhase phase = action.results.reduce(); + assertThat(phase.numReducePhases, greaterThanOrEqualTo(1)); + assertThat(phase.totalHits.value, equalTo(2L)); + assertThat(phase.totalHits.relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)); + assertThat(phase.sortedTopDocs.scoreDocs.length, equalTo(1)); + assertThat(phase.sortedTopDocs.scoreDocs[0], instanceOf(FieldDoc.class)); + assertThat(((FieldDoc) 
phase.sortedTopDocs.scoreDocs[0]).fields.length, equalTo(1));
+        assertThat(((FieldDoc) phase.sortedTopDocs.scoreDocs[0]).fields[0], equalTo(0));
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/search/SearchSortValuesAndFormatsTests.java b/server/src/test/java/org/elasticsearch/search/SearchSortValuesAndFormatsTests.java
new file mode 100644
index 0000000000000..2273ab3af0db7
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/search/SearchSortValuesAndFormatsTests.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class SearchSortValuesAndFormatsTests extends AbstractWireSerializingTestCase<SearchSortValuesAndFormats> {
+    private NamedWriteableRegistry namedWriteableRegistry;
+
+    @Before
+    public void initRegistry() {
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
+        List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
+        entries.addAll(searchModule.getNamedWriteables());
+        namedWriteableRegistry = new NamedWriteableRegistry(entries);
+    }
+
+    @Override
+    protected NamedWriteableRegistry getNamedWriteableRegistry() {
+        return namedWriteableRegistry;
+    }
+
+    @Override
+    protected Writeable.Reader<SearchSortValuesAndFormats> instanceReader() {
+        return SearchSortValuesAndFormats::new;
+    }
+
+    @Override
+    protected SearchSortValuesAndFormats createTestInstance() {
+        return randomInstance();
+    }
+
+    @Override
+    protected SearchSortValuesAndFormats mutateInstance(SearchSortValuesAndFormats instance) {
+        Object[] sortValues = instance.getRawSortValues();
+        Object[] newValues = Arrays.copyOf(sortValues, sortValues.length + 1);
+        DocValueFormat[] newFormats = Arrays.copyOf(instance.getSortValueFormats(), sortValues.length + 1);
+        newValues[sortValues.length] = randomSortValue();
+        newFormats[sortValues.length] = DocValueFormat.RAW;
+        return new SearchSortValuesAndFormats(newValues, newFormats);
+    }
+
+    private static Object randomSortValue() {
+        switch(randomIntBetween(0, 5)) {
+            case 0:
+                return null;
+            case 1:
+                return new BytesRef(randomAlphaOfLengthBetween(3, 10));
+            case 2:
+                return randomInt();
+            case 3:
+                return randomLong();
+            case 4:
+                return randomFloat();
+            case 5:
+                return randomDouble();
+            default:
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    public static SearchSortValuesAndFormats randomInstance() {
+        int size = randomIntBetween(1, 20);
+        Object[] values = new Object[size];
+        DocValueFormat[] sortValueFormats = new DocValueFormat[size];
+        for (int i = 0; i < size; i++) {
+            values[i] = randomSortValue();
+            sortValueFormats[i] = DocValueFormat.RAW;
+        }
+        return new SearchSortValuesAndFormats(values, sortValueFormats);
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java
index 2d0aa2f591d8d..a9759c6298e83 100644
--- a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java
+++ b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java
@@ -40,6 +40,7 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.InvalidAliasNameException;
 import org.elasticsearch.search.AbstractSearchTestCase;
+import org.elasticsearch.search.SearchSortValuesAndFormatsTests;

 import java.io.IOException;

@@ -94,6 +95,9 @@ private ShardSearchRequest createShardSearchRequest() throws IOException {
             randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), Math.abs(randomLong()),
             randomAlphaOfLengthBetween(3, 10), routings);
         req.canReturnNullResponseIfMatchNoDocs(randomBoolean());
+        if (randomBoolean()) {
+            req.setBottomSortValues(SearchSortValuesAndFormatsTests.randomInstance());
+        }
         return req;
     }

diff --git a/server/src/test/java/org/elasticsearch/search/sort/FieldSortBuilderTests.java b/server/src/test/java/org/elasticsearch/search/sort/FieldSortBuilderTests.java
index b4482e5545503..b457e560c65d4 100644
--- a/server/src/test/java/org/elasticsearch/search/sort/FieldSortBuilderTests.java
+++ b/server/src/test/java/org/elasticsearch/search/sort/FieldSortBuilderTests.java
@@ -27,12 +27,14 @@
 import org.apache.lucene.document.HalfFloatPoint;
 import org.apache.lucene.document.IntPoint;
 import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.SortedNumericDocValuesField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.AssertingIndexSearcher;
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortedNumericSelector;
 import org.apache.lucene.search.SortedNumericSortField;
@@ -60,6 +62,7 @@
 import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.MultiValueMode;
+import org.elasticsearch.search.SearchSortValuesAndFormats;
 import org.elasticsearch.search.builder.SearchSourceBuilder;

 import java.io.IOException;
@@ -91,6 +94,8 @@ protected FieldSortBuilder createTestItem() {
             randomInt());
+
+
     public FieldSortBuilder randomFieldSortBuilder() {
         String fieldName = rarely() ? FieldSortBuilder.DOC_FIELD_NAME : randomAlphaOfLengthBetween(1, 10);
         FieldSortBuilder builder = new FieldSortBuilder(fieldName);
@@ -565,6 +570,55 @@ public void testGetMaxKeywordValue() throws IOException {
         }
     }

+    public void testIsBottomSortShardDisjoint() throws Exception {
+        try (Directory dir = newDirectory()) {
+            int numDocs = randomIntBetween(5, 10);
+            long maxValue = -1;
+            long minValue = Integer.MAX_VALUE;
+            try (RandomIndexWriter writer = new RandomIndexWriter(random(), dir, new KeywordAnalyzer())) {
+                FieldSortBuilder fieldSort = SortBuilders.fieldSort("custom-date");
+                try (DirectoryReader reader = writer.getReader()) {
+                    QueryShardContext context = createMockShardContext(new IndexSearcher(reader));
+                    assertTrue(fieldSort.isBottomSortShardDisjoint(context,
+                        new SearchSortValuesAndFormats(new Object[] { 0L }, new DocValueFormat[] { DocValueFormat.RAW })));
+                }
+                for (int i = 0; i < numDocs; i++) {
+                    Document doc = new Document();
+                    long value = randomLongBetween(1, Integer.MAX_VALUE);
+                    doc.add(new LongPoint("custom-date", value));
+                    doc.add(new SortedNumericDocValuesField("custom-date", value));
+                    writer.addDocument(doc);
+                    maxValue = Math.max(maxValue, value);
+                    minValue = Math.min(minValue, value);
+                }
+                try (DirectoryReader reader = writer.getReader()) {
+                    QueryShardContext context = createMockShardContext(new IndexSearcher(reader));
+                    assertFalse(fieldSort.isBottomSortShardDisjoint(context, null));
+                    assertFalse(fieldSort.isBottomSortShardDisjoint(context,
+                        new SearchSortValuesAndFormats(new Object[] { minValue }, new DocValueFormat[] { DocValueFormat.RAW })));
+                    assertTrue(fieldSort.isBottomSortShardDisjoint(context,
+                        new SearchSortValuesAndFormats(new Object[] { minValue-1 }, new DocValueFormat[] { DocValueFormat.RAW })));
+                    assertFalse(fieldSort.isBottomSortShardDisjoint(context,
+                        new SearchSortValuesAndFormats(new Object[] { minValue+1 }, new DocValueFormat[] { DocValueFormat.RAW })));
+                    fieldSort.order(SortOrder.DESC);
+                    assertTrue(fieldSort.isBottomSortShardDisjoint(context,
+                        new SearchSortValuesAndFormats(new Object[] { maxValue+1 }, new DocValueFormat[] { DocValueFormat.RAW })));
+                    assertFalse(fieldSort.isBottomSortShardDisjoint(context,
+                        new SearchSortValuesAndFormats(new Object[] { maxValue }, new DocValueFormat[] { DocValueFormat.RAW })));
+                    assertFalse(fieldSort.isBottomSortShardDisjoint(context,
+                        new SearchSortValuesAndFormats(new Object[] { minValue }, new DocValueFormat[] { DocValueFormat.RAW })));
+                    fieldSort.setNestedSort(new NestedSortBuilder("empty"));
+                    assertFalse(fieldSort.isBottomSortShardDisjoint(context,
+                        new SearchSortValuesAndFormats(new Object[] { minValue-1 }, new DocValueFormat[] { DocValueFormat.RAW })));
+                    fieldSort.setNestedSort(null);
+                    fieldSort.missing("100");
+                    assertFalse(fieldSort.isBottomSortShardDisjoint(context,
+                        new SearchSortValuesAndFormats(new Object[] { maxValue+1 }, new DocValueFormat[] { DocValueFormat.RAW })));
+                }
+            }
+        }
+    }
+
     @Override
     protected FieldSortBuilder fromXContent(XContentParser parser, String fieldName) throws IOException {
         return FieldSortBuilder.fromXContent(parser, fieldName);