Optimize sort on long field (#48804)

* Optimize sort on numeric long and date fields (#39770)

Optimize sort on numeric long and date fields when the system property
`es.search.long_sort_optimized` is set to true.
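
For context, a JVM system property like this is usually read once into a static flag; below is a minimal sketch of that pattern (the class and field names are made up for illustration, the real code may wire this differently):

```java
final class LongSortFeatureFlag {
    // Hypothetical flag holder for illustration; the real code may read the property differently.
    static final boolean LONG_SORT_OPTIMIZED =
        Boolean.parseBoolean(System.getProperty("es.search.long_sort_optimized", "false"));
}
```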

* Skip optimization if the index has duplicate data (#43121)

Skip the sort optimization if the index has 50% or more documents with
the same value. When an index has many documents with the same value,
the sort optimization doesn't make sense: the DistanceFeatureQuery will
produce the same score for those documents, and Lucene will fall back to
the secondary sort to break ties, which can be slower than a regular sort.
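
For illustration, here is a rough sketch of how such a duplicate-data check can be expressed with Lucene point statistics: repeatedly halve the value range, keep the half that holds the majority of documents, and report duplicate data if a single value ends up covering at least half of the segment. The class below is a simplified assumption, not the exact code from this change.

```java
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.PointValues;

import java.io.IOException;

final class DuplicateDataCheck {

    /** Returns true if one value appears to cover 50% or more of the documents in this segment. */
    static boolean probablyHasDuplicateData(PointValues points) throws IOException {
        long low = LongPoint.decodeDimension(points.getMinPackedValue(), 0);
        long high = LongPoint.decodeDimension(points.getMaxPackedValue(), 0);
        final long threshold = points.getDocCount() / 2;
        while (low < high) {
            // divide each bound first to avoid long overflow; the midpoint may be off by one
            long mid = Math.floorDiv(low, 2) + Math.floorDiv(high, 2);
            long lowerHalf = estimateCountInRange(points, low, mid);
            long upperHalf = estimateCountInRange(points, mid + 1, high);
            if (lowerHalf >= upperHalf && lowerHalf > threshold) {
                high = mid;       // the majority of docs sits in the lower half, keep narrowing
            } else if (upperHalf > lowerHalf && upperHalf > threshold) {
                low = mid + 1;    // the majority of docs sits in the upper half, keep narrowing
            } else {
                return false;     // values are spread out, the optimization is worth trying
            }
        }
        return true;              // a single value holds at least half of the documents
    }

    /** Estimates how many points fall into [from, to] using the BKD tree, without visiting documents. */
    private static long estimateCountInRange(PointValues points, long from, long to) throws IOException {
        return points.estimatePointCount(new PointValues.IntersectVisitor() {
            @Override
            public void visit(int docID) {
                // never called by estimatePointCount
            }

            @Override
            public void visit(int docID, byte[] packedValue) {
                // never called by estimatePointCount
            }

            @Override
            public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
                long cellMin = LongPoint.decodeDimension(minPackedValue, 0);
                long cellMax = LongPoint.decodeDimension(maxPackedValue, 0);
                if (cellMax < from || cellMin > to) {
                    return PointValues.Relation.CELL_OUTSIDE_QUERY;
                }
                if (cellMin >= from && cellMax <= to) {
                    return PointValues.Relation.CELL_INSIDE_QUERY;
                }
                return PointValues.Relation.CELL_CROSSES_QUERY;
            }
        });
    }
}
```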

* Sort leaves on search according to the primary numeric sort field (#44021)

This change pre-sorts the index reader leaves (segments) prior to the search
when the primary sort is a numeric field eligible for the distance feature
optimization (a rough sketch of the leaf ordering follows the benchmark numbers
below). It also adds a tie-breaker on `_doc` to the rewritten sort to compensate
for the fact that leaves are collected in a non-deterministic order.
I ran this patch on the http_logs benchmark and the results are very promising:

```
|                                       50th percentile latency | desc_sort_timestamp |    220.706 |      136544 |   136324 |     ms |
|                                       90th percentile latency | desc_sort_timestamp |    244.847 |      162084 |   161839 |     ms |
|                                       99th percentile latency | desc_sort_timestamp |    316.627 |      172005 |   171688 |     ms |
|                                      100th percentile latency | desc_sort_timestamp |    335.306 |      173325 |   172989 |     ms |
|                                  50th percentile service time | desc_sort_timestamp |    218.369 |     1968.11 |  1749.74 |     ms |
|                                  90th percentile service time | desc_sort_timestamp |    244.182 |      2447.2 |  2203.02 |     ms |
|                                  99th percentile service time | desc_sort_timestamp |    313.176 |     2950.85 |  2637.67 |     ms |
|                                 100th percentile service time | desc_sort_timestamp |    332.924 |     2959.38 |  2626.45 |     ms |
|                                                    error rate | desc_sort_timestamp |          0 |           0 |        0 |      % |
|                                                Min Throughput |  asc_sort_timestamp |   0.801824 |    0.800855 | -0.00097 |  ops/s |
|                                             Median Throughput |  asc_sort_timestamp |   0.802595 |    0.801104 | -0.00149 |  ops/s |
|                                                Max Throughput |  asc_sort_timestamp |   0.803282 |    0.801351 | -0.00193 |  ops/s |
|                                       50th percentile latency |  asc_sort_timestamp |    220.761 |     824.098 |  603.336 |     ms |
|                                       90th percentile latency |  asc_sort_timestamp |    251.741 |     853.984 |  602.243 |     ms |
|                                       99th percentile latency |  asc_sort_timestamp |    368.761 |     893.943 |  525.182 |     ms |
|                                      100th percentile latency |  asc_sort_timestamp |    431.042 |      908.85 |  477.808 |     ms |
|                                  50th percentile service time |  asc_sort_timestamp |    218.547 |     820.757 |  602.211 |     ms |
|                                  90th percentile service time |  asc_sort_timestamp |    249.578 |     849.886 |  600.308 |     ms |
|                                  99th percentile service time |  asc_sort_timestamp |    366.317 |     888.894 |  522.577 |     ms |
|                                 100th percentile service time |  asc_sort_timestamp |    430.952 |     908.401 |   477.45 |     ms |
|                                                    error rate |  asc_sort_timestamp |          0 |           0 |        0 |      % |
```

So roughly 10x faster for the descending sort and 2-3x faster in the ascending case. Note
that I indexed the http_logs data with a single client in order to simulate real time-based indices,
where documents are indexed in timestamp order.

Relates #37043
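
For illustration, a rough sketch of the leaf pre-sorting idea referenced above: order the segments by the minimum (ascending sort) or maximum (descending sort) point value of the primary sort field, e.g. a timestamp, so that segments likely to contain competitive hits are visited first. The helper below is an assumption for illustration, not the exact code added by this change.

```java
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class LeafSorter {

    /**
     * Returns the segments ordered by the minimum (ascending sort) or maximum (descending sort)
     * indexed value of {@code field}, so that segments likely to hold competitive hits come first.
     */
    static List<LeafReaderContext> sortLeaves(List<LeafReaderContext> leaves, String field, boolean descending) {
        Comparator<LeafReaderContext> byBound = Comparator.comparingLong(ctx -> {
            try {
                PointValues points = ctx.reader().getPointValues(field);
                if (points == null) {
                    // leaves without any value for the field are visited last
                    return descending ? Long.MIN_VALUE : Long.MAX_VALUE;
                }
                byte[] bound = descending ? points.getMaxPackedValue() : points.getMinPackedValue();
                return LongPoint.decodeDimension(bound, 0);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        List<LeafReaderContext> sorted = new ArrayList<>(leaves);
        sorted.sort(descending ? byBound.reversed() : byBound);
        return sorted;
    }
}
```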

* Remove nested collector in docs response

As we no longer use the CancellableCollector, it should be removed from
the expected docs response.

* Use collector manager for search when necessary (#45829)

When we optimize a sort, we sort segments by their min/max values.
As a collector expects to receive segments in index order,
we cannot use a single collector over the re-sorted segments.
For this case we use a CollectorManager instead, which creates
a dedicated collector for every segment and merges their results at the end.
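
In condensed form, that per-segment collector pattern looks roughly like the sketch below (the real method added by this commit, shown further down in the diff, takes additional parameters for the query result and doc value formats):

```java
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Weight;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

abstract class PerSegmentSearch {

    /** Collects one segment with the given collector (as searchLeaf does in ContextIndexSearcher). */
    abstract void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException;

    /** One dedicated collector per (re-ordered) segment, merged at the end by the manager. */
    <C extends Collector, T> T search(List<LeafReaderContext> leaves, Weight weight,
                                      CollectorManager<C, T> manager) throws IOException {
        List<C> collectors = new ArrayList<>(leaves.size());
        for (LeafReaderContext ctx : leaves) {
            C collector = manager.newCollector();
            searchLeaf(ctx, weight, collector);
            collectors.add(collector);
        }
        return manager.reduce(collectors);
    }
}
```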

* Use shared TopFieldCollector manager

Use a shared TopFieldCollector manager for the sort optimization.
This collector manager is able to exchange the minimum competitive
score between its collectors.
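
Assuming Lucene 8.x, the shared manager described here is presumably along the lines of `TopFieldCollector.createSharedManager`, whose collectors share the bottom value / minimum competitive score so that segments can skip non-competitive documents. A minimal sketch (field name and sizes are illustrative):

```java
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;

final class SharedTopDocsManagerExample {

    /** Builds a manager whose collectors share the minimum competitive score. */
    static CollectorManager<TopFieldCollector, TopFieldDocs> newSharedManager() {
        Sort sort = new Sort(new SortField("timestamp", SortField.Type.LONG, true)); // descending long sort
        int numHits = 10;                // size of the top-hits priority queue
        int totalHitsThreshold = 10_000; // hit counting becomes approximate past this point
        return TopFieldCollector.createSharedManager(sort, numHits, null /* no search_after */, totalHitsThreshold);
    }
}
```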

* Correct calculation of avg value to avoid overflow

* Optimize calculating if index has duplicate data
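
As a worked example of the overflow fix mentioned above: averaging two large longs by adding them first can wrap around, while dividing each operand first cannot (the exact expression used in the commit may differ).

```java
public final class SafeMidpoint {
    public static void main(String[] args) {
        long min = Long.MAX_VALUE - 10;
        long max = Long.MAX_VALUE;

        long naive = (min + max) / 2;                              // min + max overflows and wraps to a negative number
        long safe = Math.floorDiv(min, 2) + Math.floorDiv(max, 2); // divide first, then add: no overflow,
                                                                   // off by at most one from the true average

        System.out.println(naive); // -6
        System.out.println(safe);  // 9223372036854775801 (Long.MAX_VALUE - 6)
    }
}
```
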
mayya-sharipova committed Nov 26, 2019
1 parent 2ba00c8 commit 79d9b36
Showing 12 changed files with 767 additions and 297 deletions.
@@ -720,6 +720,9 @@ class BuildPlugin implements Plugin<Project> {
// TODO: remove this once ctx isn't added to update script params in 7.0
test.systemProperty 'es.scripting.update.ctx_in_params', 'false'

// TODO: remove this property in 8.0
test.systemProperty 'es.search.rewrite_sort', 'true'

// TODO: remove this once cname is prepended to transport.publish_address by default in 8.0
test.systemProperty 'es.transport.cname_in_publish_address', 'true'

61 changes: 20 additions & 41 deletions docs/reference/search/profile.asciidoc
@@ -153,16 +153,9 @@ The API returns the following result:
"rewrite_time": 51443,
"collector": [
{
"name": "CancellableCollector",
"reason": "search_cancelled",
"time_in_nanos": "304311",
"children": [
{
"name": "SimpleTopScoreDocCollector",
"reason": "search_top_hits",
"time_in_nanos": "32273"
}
]
"name": "SimpleTopScoreDocCollector",
"reason": "search_top_hits",
"time_in_nanos": "32273"
}
]
}
@@ -445,16 +438,9 @@ Looking at the previous example:
--------------------------------------------------
"collector": [
{
"name": "CancellableCollector",
"reason": "search_cancelled",
"time_in_nanos": "304311",
"children": [
{
"name": "SimpleTopScoreDocCollector",
"reason": "search_top_hits",
"time_in_nanos": "32273"
}
]
"name": "SimpleTopScoreDocCollector",
"reason": "search_top_hits",
"time_in_nanos": "32273"
}
]
--------------------------------------------------
@@ -657,33 +643,26 @@ The API returns the following result:
"rewrite_time": 7208,
"collector": [
{
"name": "CancellableCollector",
"reason": "search_cancelled",
"time_in_nanos": 2390,
"name": "MultiCollector",
"reason": "search_multi",
"time_in_nanos": 1820,
"children": [
{
"name": "MultiCollector",
"reason": "search_multi",
"time_in_nanos": 1820,
"name": "FilteredCollector",
"reason": "search_post_filter",
"time_in_nanos": 7735,
"children": [
{
"name": "FilteredCollector",
"reason": "search_post_filter",
"time_in_nanos": 7735,
"children": [
{
"name": "SimpleTopScoreDocCollector",
"reason": "search_top_hits",
"time_in_nanos": 1328
}
]
},
{
"name": "MultiBucketCollector: [[my_scoped_agg, my_global_agg]]",
"reason": "aggregation",
"time_in_nanos": 8273
"name": "SimpleTopScoreDocCollector",
"reason": "search_top_hits",
"time_in_nanos": 1328
}
]
},
{
"name": "MultiBucketCollector: [[my_scoped_agg, my_global_agg]]",
"reason": "aggregation",
"time_in_nanos": 8273
}
]
}
@@ -27,6 +27,7 @@
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.ConjunctionDISI;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
@@ -35,24 +36,31 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BitSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.CombinedBitSet;
import org.apache.lucene.util.SparseFixedBitSet;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.profile.Timer;
import org.elasticsearch.search.profile.query.ProfileWeight;
import org.elasticsearch.search.profile.query.QueryProfileBreakdown;
import org.elasticsearch.search.profile.query.QueryProfiler;
import org.elasticsearch.search.profile.query.QueryTimingType;
import org.elasticsearch.search.query.QuerySearchResult;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
@@ -131,12 +139,86 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
}
}

private void checkCancelled() {
if (checkCancelled != null) {
checkCancelled.run();
}
}

public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager,
QuerySearchResult result, DocValueFormat[] formats, TotalHits totalHits) throws IOException {
final List<Collector> collectors = new ArrayList<>(leaves.size());
for (LeafReaderContext ctx : leaves) {
final Collector collector = manager.newCollector();
searchLeaf(ctx, weight, collector);
collectors.add(collector);
}
TopFieldDocs mergedTopDocs = (TopFieldDocs) manager.reduce(collectors);
// Lucene sets shards indexes during merging of topDocs from different collectors
// We need to reset shard index; ES will set shard index later during reduce stage
for (ScoreDoc scoreDoc : mergedTopDocs.scoreDocs) {
scoreDoc.shardIndex = -1;
}
if (totalHits != null) { // we have already precalculated totalHits for the whole index
mergedTopDocs = new TopFieldDocs(totalHits, mergedTopDocs.scoreDocs, mergedTopDocs.fields);
}
result.topDocs(new TopDocsAndMaxScore(mergedTopDocs, Float.NaN), formats);
}

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
final Weight cancellableWeight;
if (checkCancelled != null) {
cancellableWeight = new Weight(weight.getQuery()) {
for (LeafReaderContext ctx : leaves) { // search each subreader
searchLeaf(ctx, weight, collector);
}
}

/**
* Lower-level search API.
*
* {@link LeafCollector#collect(int)} is called for every matching document in
* the provided <code>ctx</code>.
*/
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
checkCancelled();
weight = wrapWeight(weight);
final LeafCollector leafCollector;
try {
leafCollector = collector.getLeafCollector(ctx);
} catch (CollectionTerminatedException e) {
// there is no doc of interest in this reader context
// continue with the following leaf
return;
}
Bits liveDocs = ctx.reader().getLiveDocs();
BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
if (liveDocsBitSet == null) {
BulkScorer bulkScorer = weight.bulkScorer(ctx);
if (bulkScorer != null) {
try {
bulkScorer.score(leafCollector, liveDocs);
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
}
}
} else {
// if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
Scorer scorer = weight.scorer(ctx);
if (scorer != null) {
try {
intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
checkCancelled == null ? () -> { } : checkCancelled);
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
}
}
}
}

private Weight wrapWeight(Weight weight) {
if (checkCancelled != null) {
return new Weight(weight.getQuery()) {
@Override
public void extractTerms(Set<Term> terms) {
throw new UnsupportedOperationException();
@@ -168,48 +250,10 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
}
};
} else {
cancellableWeight = weight;
return weight;
}
searchInternal(leaves, cancellableWeight, collector);
}

private void searchInternal(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
for (LeafReaderContext ctx : leaves) { // search each subreader
final LeafCollector leafCollector;
try {
leafCollector = collector.getLeafCollector(ctx);
} catch (CollectionTerminatedException e) {
// there is no doc of interest in this reader context
// continue with the following leaf
continue;
}
Bits liveDocs = ctx.reader().getLiveDocs();
BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
if (liveDocsBitSet == null) {
BulkScorer bulkScorer = weight.bulkScorer(ctx);
if (bulkScorer != null) {
try {
bulkScorer.score(leafCollector, liveDocs);
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
}
}
} else {
// if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
Scorer scorer = weight.scorer(ctx);
if (scorer != null) {
try {
intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
checkCancelled == null ? () -> {} : checkCancelled);
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
}
}
}
}
}

private static BitSet getSparseBitSetOrNull(Bits liveDocs) {
if (liveDocs instanceof SparseFixedBitSet) {
@@ -49,8 +49,6 @@ public class CollectorResult implements ToXContentObject, Writeable {
public static final String REASON_SEARCH_POST_FILTER = "search_post_filter";
public static final String REASON_SEARCH_MIN_SCORE = "search_min_score";
public static final String REASON_SEARCH_MULTI = "search_multi";
public static final String REASON_SEARCH_TIMEOUT = "search_timeout";
public static final String REASON_SEARCH_CANCELLED = "search_cancelled";
public static final String REASON_AGGREGATION = "aggregation";
public static final String REASON_AGGREGATION_GLOBAL = "aggregation_global";


This file was deleted.

@@ -28,16 +28,13 @@
import org.elasticsearch.common.lucene.MinimumScoreCollector;
import org.elasticsearch.common.lucene.search.FilteredCollector;
import org.elasticsearch.search.profile.query.InternalProfileCollector;
import org.elasticsearch.tasks.TaskCancelledException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.BooleanSupplier;

import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_CANCELLED;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MIN_SCORE;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MULTI;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_POST_FILTER;
@@ -150,18 +147,6 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i
};
}

/**
* Creates a collector that throws {@link TaskCancelledException} if the search is cancelled
*/
static QueryCollectorContext createCancellableCollectorContext(BooleanSupplier cancelled) {
return new QueryCollectorContext(REASON_SEARCH_CANCELLED) {
@Override
Collector create(Collector in) throws IOException {
return new CancellableCollector(cancelled, in);
}
};
}

/**
* Creates collector limiting the collection to the first <code>numHits</code> documents
*/