Allocate memory lazily in BestBucketsDeferringCollector (#43339)
While investigating the memory consumption of deeply nested aggregations for #43091,
the memory used to keep track of the doc ids and buckets in the BestBucketsDeferringCollector
showed up as one of the main contributors. In my tests, half of the memory held by the
BestBucketsDeferringCollector is associated with segments that have no matching docs
in the selected buckets. This is expected on high-cardinality fields, since each
bucket can appear in very few segments. By allocating the builders lazily, this change
reduces the memory consumption by a factor of 2 (from 1GB to 512MB), hence reducing the
GC impact of these short-lived allocations. This commit also replaces the PackedLongValues.Builder
with a RoaringDocIdSet in order to handle very sparse buckets more efficiently.
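
The diff below contains the actual change; as a standalone illustration, here is a minimal sketch of the lazy-allocation pattern, with hypothetical class and field names rather than the exact Elasticsearch code:

````java
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;

// Sketch only: the per-segment builders are created on the first matching document
// instead of eagerly when the leaf collector is opened, so segments that never match
// any of the selected buckets never allocate them.
class LazyLeafState {
    private LeafReaderContext context;                  // null until the first match in the segment
    private PackedLongValues.Builder docDeltasBuilder;
    private PackedLongValues.Builder bucketsBuilder;
    private int lastDoc = 0;

    void collect(LeafReaderContext ctx, int doc, long bucket) {
        if (context == null) {
            // First match in this segment: allocate now rather than up front.
            context = ctx;
            docDeltasBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
            bucketsBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
        }
        docDeltasBuilder.add(doc - lastDoc);             // doc deltas pack better than absolute doc ids
        bucketsBuilder.add(bucket);
        lastDoc = doc;
    }
}
````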

I ran all my tests on the `geoname` rally track with the following query:

````
{
    "size": 0,
    "aggs": {
        "country_population": {
            "terms": {
                "size": 100,
                "field": "country_code.raw"
            },
            "aggs": {
                "admin1_code": {
                    "terms": {
                        "size": 100,
                        "field": "admin1_code.raw"
                    },
                    "aggs": {
                        "admin2_code": {
                            "terms": {
                                "size": 100,
                                "field": "admin2_code.raw"
                            },
                            "aggs": {
                                "sum_population": {
                                    "sum": {
                                        "field": "population"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
````
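
For reference, the equivalent request can also be built with the Java API (a sketch assuming the standard `AggregationBuilders` and `SearchSourceBuilder` helpers; aggregation names and fields match the JSON above):

````java
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

class NestedTermsQueryExample {
    // Builds the same three-level terms aggregation as the JSON query above.
    static SearchSourceBuilder nestedTermsSource() {
        return new SearchSourceBuilder()
            .size(0)
            .aggregation(
                AggregationBuilders.terms("country_population").field("country_code.raw").size(100)
                    .subAggregation(AggregationBuilders.terms("admin1_code").field("admin1_code.raw").size(100)
                        .subAggregation(AggregationBuilders.terms("admin2_code").field("admin2_code.raw").size(100)
                            .subAggregation(AggregationBuilders.sum("sum_population").field("population")))));
    }
}
````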
jimczi committed Jun 19, 2019
1 parent 838d4c4 commit 5b1de3c
Showing 3 changed files with 45 additions and 215 deletions.
@@ -40,6 +40,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* A specialization of {@link DeferringBucketCollector} that collects all
@@ -48,28 +49,28 @@
* this collector.
*/
public class BestBucketsDeferringCollector extends DeferringBucketCollector {
private static class Entry {
static class Entry {
final LeafReaderContext context;
final PackedLongValues docDeltas;
final PackedLongValues buckets;

Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) {
this.context = context;
this.docDeltas = docDeltas;
this.buckets = buckets;
this.context = Objects.requireNonNull(context);
this.docDeltas = Objects.requireNonNull(docDeltas);
this.buckets = Objects.requireNonNull(buckets);
}
}

final List<Entry> entries = new ArrayList<>();
BucketCollector collector;
final SearchContext searchContext;
final boolean isGlobal;
LeafReaderContext context;
PackedLongValues.Builder docDeltas;
PackedLongValues.Builder buckets;
long maxBucket = -1;
boolean finished = false;
LongHash selectedBuckets;
protected List<Entry> entries = new ArrayList<>();
protected BucketCollector collector;
protected final SearchContext searchContext;
protected final boolean isGlobal;
protected LeafReaderContext context;
protected PackedLongValues.Builder docDeltasBuilder;
protected PackedLongValues.Builder bucketsBuilder;
protected long maxBucket = -1;
protected boolean finished = false;
protected LongHash selectedBuckets;

/**
* Sole constructor.
@@ -97,28 +98,32 @@ public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {

private void finishLeaf() {
if (context != null) {
entries.add(new Entry(context, docDeltas.build(), buckets.build()));
assert docDeltasBuilder != null && bucketsBuilder != null;
entries.add(new Entry(context, docDeltasBuilder.build(), bucketsBuilder.build()));
}
context = null;
docDeltas = null;
buckets = null;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
finishLeaf();

context = ctx;
docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
context = null;
// allocates the builder lazily in case this segment doesn't contain any match
docDeltasBuilder = null;
bucketsBuilder = null;

return new LeafBucketCollector() {
int lastDoc = 0;

@Override
public void collect(int doc, long bucket) throws IOException {
docDeltas.add(doc - lastDoc);
buckets.add(bucket);
if (context == null) {
context = ctx;
docDeltasBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
bucketsBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
}
docDeltasBuilder.add(doc - lastDoc);
bucketsBuilder.add(bucket);
lastDoc = doc;
maxBucket = Math.max(maxBucket, bucket);
}
@@ -141,7 +146,7 @@ public void postCollection() throws IOException {
*/
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
if (!finished) {
if (finished == false) {
throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
}
if (this.selectedBuckets != null) {
@@ -160,14 +165,16 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
Query query = isGlobal ? new MatchAllDocsQuery() : searchContext.query();
weight = searchContext.searcher().createWeight(searchContext.searcher().rewrite(query), ScoreMode.COMPLETE, 1f);
}

for (Entry entry : entries) {
assert entry.docDeltas.size() > 0 : "segment should have at least one document to replay, got 0";
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
DocIdSetIterator docIt = null;
if (needsScores && entry.docDeltas.size() > 0) {
DocIdSetIterator scoreIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay (entry.docDeltas is not empty).
docIt = scorer.iterator();
scoreIt = scorer.iterator();
leafCollector.setScorer(scorer);
}
final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
@@ -179,17 +186,16 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
final long rebasedBucket = hash.find(bucket);
if (rebasedBucket != -1) {
if (needsScores) {
if (docIt.docID() < doc) {
docIt.advance(doc);
if (scoreIt.docID() < doc) {
scoreIt.advance(doc);
}
// aggregations should only be replayed on matching documents
assert docIt.docID() == doc;
assert scoreIt.docID() == doc;
}
leafCollector.collect(doc, rebasedBucket);
}
}
}

collector.postCollection();
}

@@ -19,98 +19,25 @@

package org.elasticsearch.search.aggregations.bucket;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* A specialization of {@link DeferringBucketCollector} that collects all
* A specialization of {@link BestBucketsDeferringCollector} that collects all
* matches and then is able to replay a given subset of buckets. Exposes
* mergeBuckets, which can be invoked by the aggregator when increasing the
* rounding interval.
*/
public class MergingBucketsDeferringCollector extends DeferringBucketCollector {

List<Entry> entries = new ArrayList<>();
BucketCollector collector;
final SearchContext searchContext;
LeafReaderContext context;
PackedLongValues.Builder docDeltas;
PackedLongValues.Builder buckets;
long maxBucket = -1;
boolean finished = false;
LongHash selectedBuckets;

public MergingBucketsDeferringCollector(SearchContext context) {
this.searchContext = context;
}

@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.collector = MultiBucketCollector.wrap(deferredCollectors);
}

@Override
public ScoreMode scoreMode() {
if (collector == null) {
throw new IllegalStateException();
}
return collector.scoreMode();
}

@Override
public void preCollection() throws IOException {
collector.preCollection();
}

private void finishLeaf() {
if (context != null) {
entries.add(new Entry(context, docDeltas.build(), buckets.build()));
}
context = null;
docDeltas = null;
buckets = null;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
finishLeaf();

context = ctx;
docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);

return new LeafBucketCollector() {
int lastDoc = 0;

@Override
public void collect(int doc, long bucket) {
docDeltas.add(doc - lastDoc);
buckets.add(bucket);
lastDoc = doc;
maxBucket = Math.max(maxBucket, bucket);
}
};
public class MergingBucketsDeferringCollector extends BestBucketsDeferringCollector {
public MergingBucketsDeferringCollector(SearchContext context, boolean isGlobal) {
super(context, isGlobal);
}

public void mergeBuckets(long[] mergeMap) {

List<Entry> newEntries = new ArrayList<>(entries.size());
for (Entry sourceEntry : entries) {
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
@@ -124,117 +51,14 @@ public void mergeBuckets(long[] mergeMap) {

// if there are buckets that have been collected in the current segment
// we need to update the bucket ordinals there too
if (buckets.size() > 0) {
PackedLongValues currentBuckets = buckets.build();
if (bucketsBuilder.size() > 0) {
PackedLongValues currentBuckets = bucketsBuilder.build();
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
for (PackedLongValues.Iterator itr = currentBuckets.iterator(); itr.hasNext();) {
long bucket = itr.next();
newBuckets.add(mergeMap[Math.toIntExact(bucket)]);
}
buckets = newBuckets;
}
}

@Override
public void postCollection() {
finishLeaf();
finished = true;
}

/**
* Replay the wrapped collector, but only on a selection of buckets.
*/
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
if (finished == false) {
throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
}
if (this.selectedBuckets != null) {
throw new IllegalStateException("Already been replayed");
}

final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
for (long bucket : selectedBuckets) {
hash.add(bucket);
}
this.selectedBuckets = hash;

boolean needsScores = collector.scoreMode().needsScores();
Weight weight = null;
if (needsScores) {
weight = searchContext.searcher().createWeight(
searchContext.searcher().rewrite(searchContext.query()),
ScoreMode.COMPLETE, 1f);
}
for (Entry entry : entries) {
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
DocIdSetIterator docIt = null;
if (needsScores && entry.docDeltas.size() > 0) {
Scorer scorer = weight.scorer(entry.context);
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay
// (entry.docDeltas is not empty).
docIt = scorer.iterator();
leafCollector.setScorer(scorer);
}
final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
final PackedLongValues.Iterator buckets = entry.buckets.iterator();
int doc = 0;
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
doc += docDeltaIterator.next();
final long bucket = buckets.next();
final long rebasedBucket = hash.find(bucket);
if (rebasedBucket != -1) {
if (needsScores) {
if (docIt.docID() < doc) {
docIt.advance(doc);
}
// aggregations should only be replayed on matching
// documents
assert docIt.docID() == doc;
}
leafCollector.collect(doc, rebasedBucket);
}
}
}

collector.postCollection();
}

/**
* Wrap the provided aggregator so that it behaves (almost) as if it had
* been collected directly.
*/
@Override
public Aggregator wrap(final Aggregator in) {

return new WrappedAggregator(in) {

@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
if (selectedBuckets == null) {
throw new IllegalStateException("Collection has not been replayed yet.");
}
final long rebasedBucket = selectedBuckets.find(bucket);
if (rebasedBucket == -1) {
throw new IllegalStateException("Cannot build for a bucket which has not been collected [" + bucket + "]");
}
return in.buildAggregation(rebasedBucket);
}

};
}

private static class Entry {
final LeafReaderContext context;
final PackedLongValues docDeltas;
final PackedLongValues buckets;

Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) {
this.context = context;
this.docDeltas = docDeltas;
this.buckets = buckets;
bucketsBuilder = newBuckets;
}
}

}
@@ -93,7 +93,7 @@ protected boolean shouldDefer(Aggregator aggregator) {

@Override
public DeferringBucketCollector getDeferringCollector() {
deferringCollector = new MergingBucketsDeferringCollector(context);
deferringCollector = new MergingBucketsDeferringCollector(context, descendsFromGlobalAggregator(parent()));
return deferringCollector;
}
