Optimize the composite aggregation for match_all and range queries #28745

Merged

merged 24 commits into master from composite_sort_optim on Mar 26, 2018

Changes from 3 commits

Commits (24)
d46143c
Optimize the composite aggregation for match_all and range queries
jimczi Feb 16, 2018
6a8e866
fix checkstyle
jimczi Feb 20, 2018
0bc679e
handle null point values
jimczi Feb 20, 2018
0581226
restore global ord execution for normal execution
jimczi Feb 20, 2018
8fd25e1
add missing change
jimczi Feb 20, 2018
0d32ab0
fix checkstyle
jimczi Feb 20, 2018
32f0904
fix global ord comparaison
jimczi Feb 20, 2018
0634551
Merge branch 'master' into composite_sort_optim
jimczi Feb 21, 2018
a7f8ffe
add tests for the composite queue and address review comments
jimczi Feb 22, 2018
35c017e
Merge branch 'master' into composite_sort_optim
jimczi Feb 22, 2018
841118c
cosmetics
jimczi Feb 22, 2018
6445f23
adapt heuristic to disable sorted docs producer
jimczi Feb 22, 2018
4437f2e
protect against empty reader
jimczi Feb 22, 2018
cdba4c2
Add missing license
jimczi Feb 22, 2018
93e3345
refactor the composite source to create the sorted docs producer dire…
jimczi Feb 22, 2018
cc6539c
Merge branch 'master' into composite_sort_optim
jimczi Feb 22, 2018
fc91434
fail composite agg that contains an unmapped field and no missing value
jimczi Feb 23, 2018
f9d1eeb
implement deferring collection directly in the collector
jimczi Mar 11, 2018
f2588f2
Merge branch 'master' into composite_sort_optim
jimczi Mar 11, 2018
eb61b02
line len
jimczi Mar 12, 2018
f22dd2a
Merge branch 'master' into composite_sort_optim
jimczi Mar 19, 2018
8bf9703
more javadocs and cleanups
jimczi Mar 19, 2018
e58e540
make sure that the cost is within the integer range when building the…
jimczi Mar 23, 2018
1d71d98
Merge branch 'master' into composite_sort_optim
jimczi Mar 23, 2018
85 changes: 0 additions & 85 deletions docs/reference/aggregations/bucket/composite-aggregation.asciidoc
@@ -545,88 +545,3 @@
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\.//]

==== Index sorting

By default this aggregation runs on every document that matches the query.
However, if the index sort matches the composite sort, this aggregation can optimize
the execution and skip documents that contain composite buckets that would not
be part of the response.

For instance, the following aggregation:

[source,js]
--------------------------------------------------
GET /_search
{
    "aggs" : {
        "my_buckets": {
            "composite" : {
                "size": 2,
                "sources" : [
                    { "date": { "date_histogram": { "field": "timestamp", "interval": "1d", "order": "asc" } } },
                    { "product": { "terms": { "field": "product", "order": "asc" } } }
                ]
            }
        }
    }
}
--------------------------------------------------
// CONSOLE

\... is much faster on an index that uses the following sort:

[source,js]
--------------------------------------------------
PUT twitter
{
    "settings" : {
        "index" : {
            "sort.field" : ["timestamp", "product"],
            "sort.order" : ["asc", "asc"]
        }
    },
    "mappings": {
        "sales": {
            "properties": {
                "timestamp": {
                    "type": "date"
                },
                "product": {
                    "type": "keyword"
                }
            }
        }
    }
}
--------------------------------------------------
// CONSOLE

WARNING: The optimization takes effect only if the fields used for sorting are single-valued and follow
the same order as the aggregation (`desc` or `asc`).
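
For instance, assuming the `twitter` index above, the following request cannot use
the optimization because the order of the `date` source does not match the index
sort order (illustrative example):

[source,js]
--------------------------------------------------
GET /_search
{
    "aggs" : {
        "my_buckets": {
            "composite" : {
                "size": 2,
                "sources" : [
                    { "date": { "date_histogram": { "field": "timestamp", "interval": "1d", "order": "desc" } } },
                    { "product": { "terms": { "field": "product", "order": "asc" } } }
                ]
            }
        }
    }
}
--------------------------------------------------
// CONSOLE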

If only the aggregation results are needed, it is also better to set the query `size` to 0
and `track_total_hits` to `false` in order to remove other slowing factors:

[source,js]
--------------------------------------------------
GET /_search
{
    "size": 0,
    "track_total_hits": false,
    "aggs" : {
        "my_buckets": {
            "composite" : {
                "size": 2,
                "sources" : [
                    { "date": { "date_histogram": { "field": "timestamp", "interval": "1d" } } },
                    { "product": { "terms": { "field": "product" } } }
                ]
            }
        }
    }
}
--------------------------------------------------
// CONSOLE

See <<index-modules-index-sorting, index sorting>> for more details.
198 changes: 198 additions & 0 deletions …/search/aggregations/bucket/composite/BestCompositeBucketsDeferringCollector.java (new file)
@@ -0,0 +1,198 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.aggregations.bucket.composite;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.DocIdSetBuilder;
import org.apache.lucene.util.RoaringDocIdSet;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * A specialization of {@link DeferringBucketCollector} that collects all
 * matches and then, in a second pass, replays the documents that contain a top bucket
 * selected during the first pass.
 */
final class BestCompositeBucketsDeferringCollector extends DeferringBucketCollector {
    private static class Entry {
        final LeafReaderContext context;
        final DocIdSet docIdSet;

        Entry(LeafReaderContext context, DocIdSet docIdSet) {
            this.context = context;
            this.docIdSet = docIdSet;
        }
    }

    private final SearchContext searchContext;
    private final CompositeValuesCollectorQueue queue;
    private final boolean isCollectionSorted;
    private final List<Entry> entries = new ArrayList<>();

    private BucketCollector collector;
    private LeafReaderContext context;
    private RoaringDocIdSet.Builder sortedBuilder;
    private DocIdSetBuilder builder;
    private boolean finished = false;

    /**
     * Sole constructor.
     * @param context The search context.
     * @param queue The queue that is used to record the top composite buckets.
     * @param isCollectionSorted true if the parent aggregator will pass documents sorted by doc_id.
     */
    BestCompositeBucketsDeferringCollector(SearchContext context, CompositeValuesCollectorQueue queue, boolean isCollectionSorted) {
        this.searchContext = context;
        this.queue = queue;
        this.isCollectionSorted = isCollectionSorted;
    }

    @Override
    public boolean needsScores() {
        if (collector == null) {
            // [inline review, Contributor]: It seems that this should really never occur?
            // Should we make this an assertion instead?
            // [reply, jimczi (author)]: ++, I replaced it with an assert.
            throw new IllegalStateException();
        }
        return collector.needsScores();
    }
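
    // Per the inline review above, a later commit in this PR replaced this
    // IllegalStateException with an assertion. A hypothetical sketch of that variant
    // (the exact final form is assumed; it is not visible in this 3-commit view):
    //
    //     @Override
    //     public boolean needsScores() {
    //         assert collector != null;
    //         return collector.needsScores();
    //     }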

    /** Set the deferred collectors. */
    @Override
    public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
        this.collector = BucketCollector.wrap(deferredCollectors);
    }

    private void finishLeaf() {
        if (context != null) {
            DocIdSet docIdSet = isCollectionSorted ? sortedBuilder.build() : builder.build();
            entries.add(new Entry(context, docIdSet));
        }
        context = null;
    }

    @Override
    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
        finishLeaf();

        context = ctx;
        if (isCollectionSorted) {
            sortedBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
        } else {
            builder = new DocIdSetBuilder(ctx.reader().maxDoc());
        }

        return new LeafBucketCollector() {
            int lastDoc = -1;

            @Override
            public void collect(int doc, long bucket) throws IOException {
                if (lastDoc != doc) {
                    if (isCollectionSorted) {
                        sortedBuilder.add(doc);
                    } else {
                        builder.grow(1).add(doc);
                    }
                }
                lastDoc = doc;
            }
        };
    }

    @Override
    public void preCollection() throws IOException {
        collector.preCollection();
    }

    @Override
    public void postCollection() throws IOException {
        finishLeaf();
        finished = true;
    }

    @Override
    public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
        // the selected buckets are extracted directly from the queue
        assert selectedBuckets.length == 0;
        if (!finished) {
            throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
        }

        final boolean needsScores = needsScores();
        Weight weight = null;
        if (needsScores) {
            Query query = searchContext.query();
            weight = searchContext.searcher().createNormalizedWeight(query, true);
        }
        for (Entry entry : entries) {
            DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator();
            if (docIdSetIterator == null) {
                continue;
            }
            final LeafBucketCollector subCollector = collector.getLeafCollector(entry.context);
            final LeafBucketCollector collector =
                queue.getLeafCollector(entry.context, getSecondPassCollector(subCollector));
            DocIdSetIterator scorerIt = null;
            if (needsScores) {
                Scorer scorer = weight.scorer(entry.context);
                // We don't need to check if the scorer is null
                // since we are sure that there are documents to replay (docIdSetIterator is not empty).
                // [inline review, Contributor]: how do we know it is not empty?
                scorerIt = scorer.iterator();
                subCollector.setScorer(scorer);
            }
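
            // A possible answer to the inline review question above, stated as an
            // assumption rather than a guarantee: DocIdSet.iterator() may return null
            // when the set is empty, and null iterators are skipped at the top of this
            // loop, so a non-null iterator is expected to yield at least one document.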
            int docID;
            while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
                if (needsScores) {
                    assert scorerIt.docID() < docID;
                    scorerIt.advance(docID);
                    // aggregations should only be replayed on matching documents
                    assert scorerIt.docID() == docID;
                }
                collector.collect(docID);
            }
        }
        collector.postCollection();
    }

    /**
     * Replay the top buckets from the matching documents.
     */
    private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollector) {
        return new LeafBucketCollector() {
            @Override
            public void collect(int doc, long bucket) throws IOException {
                Integer slot = queue.compareCurrent();
                if (slot != null) {
                    // The candidate key is a top bucket.
                    // We can defer the collection of this document/bucket to the sub collector.
                    subCollector.collect(doc, slot);
                }
            }
        };
    }
}
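
// A minimal sketch of how this collector is driven (hypothetical caller; the real
// driver is the composite aggregator, which is not part of this diff):
//
//     BestCompositeBucketsDeferringCollector deferring =
//         new BestCompositeBucketsDeferringCollector(searchContext, queue, /* isCollectionSorted */ true);
//     deferring.setDeferredCollector(subCollectors); // wrap the sub-aggregations
//     deferring.preCollection();
//     // first pass: the searcher calls getLeafCollector(ctx) and collect(doc, bucket)
//     deferring.postCollection();
//     deferring.prepareSelectedBuckets(); // second pass: replay docs of the top buckets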
…/search/aggregations/bucket/composite/CompositeAggregationBuilder.java
@@ -19,16 +19,12 @@

 package org.elasticsearch.search.aggregations.bucket.composite;
 
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.index.IndexSortConfig;
-import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -154,16 +150,9 @@ protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<
         if (parent != null) {
             throw new IllegalArgumentException("[composite] aggregation cannot be used with a parent aggregation");
         }
-        final QueryShardContext shardContext = context.getQueryShardContext();
         CompositeValuesSourceConfig[] configs = new CompositeValuesSourceConfig[sources.size()];
-        SortField[] sortFields = new SortField[configs.length];
-        IndexSortConfig indexSortConfig = shardContext.getIndexSettings().getIndexSortConfig();
-        if (indexSortConfig.hasIndexSort()) {
-            Sort sort = indexSortConfig.buildIndexSort(shardContext::fieldMapper, shardContext::getForField);
-            System.arraycopy(sort.getSort(), 0, sortFields, 0, sortFields.length);
-        }
         for (int i = 0; i < configs.length; i++) {
-            configs[i] = sources.get(i).build(context, i, configs.length, sortFields[i]);
+            configs[i] = sources.get(i).build(context);
             if (configs[i].valuesSource().needsScores()) {
                 throw new IllegalArgumentException("[sources] cannot access _score");
             }

This file was deleted.