diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java index 32695ac69a88e..5e5e60760d790 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Objects; /** * A specialization of {@link DeferringBucketCollector} that collects all @@ -48,28 +49,28 @@ * this collector. */ public class BestBucketsDeferringCollector extends DeferringBucketCollector { - private static class Entry { + static class Entry { final LeafReaderContext context; final PackedLongValues docDeltas; final PackedLongValues buckets; Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) { - this.context = context; - this.docDeltas = docDeltas; - this.buckets = buckets; + this.context = Objects.requireNonNull(context); + this.docDeltas = Objects.requireNonNull(docDeltas); + this.buckets = Objects.requireNonNull(buckets); } } - final List entries = new ArrayList<>(); - BucketCollector collector; - final SearchContext searchContext; - final boolean isGlobal; - LeafReaderContext context; - PackedLongValues.Builder docDeltas; - PackedLongValues.Builder buckets; - long maxBucket = -1; - boolean finished = false; - LongHash selectedBuckets; + protected List entries = new ArrayList<>(); + protected BucketCollector collector; + protected final SearchContext searchContext; + protected final boolean isGlobal; + protected LeafReaderContext context; + protected PackedLongValues.Builder docDeltasBuilder; + protected PackedLongValues.Builder bucketsBuilder; + protected long maxBucket = -1; + protected boolean finished = false; + protected LongHash selectedBuckets; /** * Sole constructor. @@ -97,28 +98,32 @@ public void setDeferredCollector(Iterable deferredCollectors) { private void finishLeaf() { if (context != null) { - entries.add(new Entry(context, docDeltas.build(), buckets.build())); + assert docDeltasBuilder != null && bucketsBuilder != null; + entries.add(new Entry(context, docDeltasBuilder.build(), bucketsBuilder.build())); } - context = null; - docDeltas = null; - buckets = null; } @Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { finishLeaf(); - context = ctx; - docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT); - buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT); + context = null; + // allocates the builder lazily in case this segment doesn't contain any match + docDeltasBuilder = null; + bucketsBuilder = null; return new LeafBucketCollector() { int lastDoc = 0; @Override public void collect(int doc, long bucket) throws IOException { - docDeltas.add(doc - lastDoc); - buckets.add(bucket); + if (context == null) { + context = ctx; + docDeltasBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT); + bucketsBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT); + } + docDeltasBuilder.add(doc - lastDoc); + bucketsBuilder.add(bucket); lastDoc = doc; maxBucket = Math.max(maxBucket, bucket); } @@ -141,7 +146,7 @@ public void postCollection() throws IOException { */ @Override public void prepareSelectedBuckets(long... selectedBuckets) throws IOException { - if (!finished) { + if (finished == false) { throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called"); } if (this.selectedBuckets != null) { @@ -160,14 +165,16 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException { Query query = isGlobal ? new MatchAllDocsQuery() : searchContext.query(); weight = searchContext.searcher().createWeight(searchContext.searcher().rewrite(query), ScoreMode.COMPLETE, 1f); } + for (Entry entry : entries) { + assert entry.docDeltas.size() > 0 : "segment should have at least one document to replay, got 0"; final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context); - DocIdSetIterator docIt = null; - if (needsScores && entry.docDeltas.size() > 0) { + DocIdSetIterator scoreIt = null; + if (needsScores) { Scorer scorer = weight.scorer(entry.context); // We don't need to check if the scorer is null // since we are sure that there are documents to replay (entry.docDeltas it not empty). - docIt = scorer.iterator(); + scoreIt = scorer.iterator(); leafCollector.setScorer(scorer); } final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator(); @@ -179,17 +186,16 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException { final long rebasedBucket = hash.find(bucket); if (rebasedBucket != -1) { if (needsScores) { - if (docIt.docID() < doc) { - docIt.advance(doc); + if (scoreIt.docID() < doc) { + scoreIt.advance(doc); } // aggregations should only be replayed on matching documents - assert docIt.docID() == doc; + assert scoreIt.docID() == doc; } leafCollector.collect(doc, rebasedBucket); } } } - collector.postCollection(); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java index 53049d0301c2d..b293cc53a3629 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java @@ -19,98 +19,25 @@ package org.elasticsearch.search.aggregations.bucket; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Scorer; -import org.apache.lucene.search.Weight; import org.apache.lucene.util.packed.PackedInts; import org.apache.lucene.util.packed.PackedLongValues; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.LongHash; -import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.BucketCollector; -import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.LeafBucketCollector; -import org.elasticsearch.search.aggregations.MultiBucketCollector; import org.elasticsearch.search.internal.SearchContext; -import java.io.IOException; import java.util.ArrayList; import java.util.List; /** - * A specialization of {@link DeferringBucketCollector} that collects all + * A specialization of {@link BestBucketsDeferringCollector} that collects all * matches and then is able to replay a given subset of buckets. Exposes * mergeBuckets, which can be invoked by the aggregator when increasing the * rounding interval. */ -public class MergingBucketsDeferringCollector extends DeferringBucketCollector { - - List entries = new ArrayList<>(); - BucketCollector collector; - final SearchContext searchContext; - LeafReaderContext context; - PackedLongValues.Builder docDeltas; - PackedLongValues.Builder buckets; - long maxBucket = -1; - boolean finished = false; - LongHash selectedBuckets; - - public MergingBucketsDeferringCollector(SearchContext context) { - this.searchContext = context; - } - - @Override - public void setDeferredCollector(Iterable deferredCollectors) { - this.collector = MultiBucketCollector.wrap(deferredCollectors); - } - - @Override - public ScoreMode scoreMode() { - if (collector == null) { - throw new IllegalStateException(); - } - return collector.scoreMode(); - } - - @Override - public void preCollection() throws IOException { - collector.preCollection(); - } - - private void finishLeaf() { - if (context != null) { - entries.add(new Entry(context, docDeltas.build(), buckets.build())); - } - context = null; - docDeltas = null; - buckets = null; - } - - @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { - finishLeaf(); - - context = ctx; - docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT); - buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT); - - return new LeafBucketCollector() { - int lastDoc = 0; - - @Override - public void collect(int doc, long bucket) { - docDeltas.add(doc - lastDoc); - buckets.add(bucket); - lastDoc = doc; - maxBucket = Math.max(maxBucket, bucket); - } - }; +public class MergingBucketsDeferringCollector extends BestBucketsDeferringCollector { + public MergingBucketsDeferringCollector(SearchContext context, boolean isGlobal) { + super(context, isGlobal); } public void mergeBuckets(long[] mergeMap) { - List newEntries = new ArrayList<>(entries.size()); for (Entry sourceEntry : entries) { PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT); @@ -124,117 +51,14 @@ public void mergeBuckets(long[] mergeMap) { // if there are buckets that have been collected in the current segment // we need to update the bucket ordinals there too - if (buckets.size() > 0) { - PackedLongValues currentBuckets = buckets.build(); + if (bucketsBuilder.size() > 0) { + PackedLongValues currentBuckets = bucketsBuilder.build(); PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT); for (PackedLongValues.Iterator itr = currentBuckets.iterator(); itr.hasNext();) { long bucket = itr.next(); newBuckets.add(mergeMap[Math.toIntExact(bucket)]); } - buckets = newBuckets; - } - } - - @Override - public void postCollection() { - finishLeaf(); - finished = true; - } - - /** - * Replay the wrapped collector, but only on a selection of buckets. - */ - @Override - public void prepareSelectedBuckets(long... selectedBuckets) throws IOException { - if (finished == false) { - throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called"); - } - if (this.selectedBuckets != null) { - throw new IllegalStateException("Already been replayed"); - } - - final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE); - for (long bucket : selectedBuckets) { - hash.add(bucket); - } - this.selectedBuckets = hash; - - boolean needsScores = collector.scoreMode().needsScores(); - Weight weight = null; - if (needsScores) { - weight = searchContext.searcher().createWeight( - searchContext.searcher().rewrite(searchContext.query()), - ScoreMode.COMPLETE, 1f); - } - for (Entry entry : entries) { - final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context); - DocIdSetIterator docIt = null; - if (needsScores && entry.docDeltas.size() > 0) { - Scorer scorer = weight.scorer(entry.context); - // We don't need to check if the scorer is null - // since we are sure that there are documents to replay - // (entry.docDeltas it not empty). - docIt = scorer.iterator(); - leafCollector.setScorer(scorer); - } - final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator(); - final PackedLongValues.Iterator buckets = entry.buckets.iterator(); - int doc = 0; - for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) { - doc += docDeltaIterator.next(); - final long bucket = buckets.next(); - final long rebasedBucket = hash.find(bucket); - if (rebasedBucket != -1) { - if (needsScores) { - if (docIt.docID() < doc) { - docIt.advance(doc); - } - // aggregations should only be replayed on matching - // documents - assert docIt.docID() == doc; - } - leafCollector.collect(doc, rebasedBucket); - } - } - } - - collector.postCollection(); - } - - /** - * Wrap the provided aggregator so that it behaves (almost) as if it had - * been collected directly. - */ - @Override - public Aggregator wrap(final Aggregator in) { - - return new WrappedAggregator(in) { - - @Override - public InternalAggregation buildAggregation(long bucket) throws IOException { - if (selectedBuckets == null) { - throw new IllegalStateException("Collection has not been replayed yet."); - } - final long rebasedBucket = selectedBuckets.find(bucket); - if (rebasedBucket == -1) { - throw new IllegalStateException("Cannot build for a bucket which has not been collected [" + bucket + "]"); - } - return in.buildAggregation(rebasedBucket); - } - - }; - } - - private static class Entry { - final LeafReaderContext context; - final PackedLongValues docDeltas; - final PackedLongValues buckets; - - Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) { - this.context = context; - this.docDeltas = docDeltas; - this.buckets = buckets; + bucketsBuilder = newBuckets; } } - } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 1b982ea9deca2..b10507cd2ce65 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -93,7 +93,7 @@ protected boolean shouldDefer(Aggregator aggregator) { @Override public DeferringBucketCollector getDeferringCollector() { - deferringCollector = new MergingBucketsDeferringCollector(context); + deferringCollector = new MergingBucketsDeferringCollector(context, descendsFromGlobalAggregator(parent())); return deferringCollector; }