
Deferred aggregations prevent combinatorial explosion #6128

Closed
wants to merge 15 commits

Conversation · 5 participants
markharwood (Contributor) commented May 12, 2014

New BucketCollector classes aid the recording and subsequent playback of "collect" streams in aggs, reducing combinatorial explosions where pruning of parent buckets should occur before calculating child aggs.
The Aggregator base class now wraps the sub-agg BucketCollectors with any required caching of collect streams for sub-aggregations that are marked as deferred. Aggregator subclasses should override shouldDefer to indicate any aggs that are expensive to compute, and in the buildAggregation call should subsequently call runDeferredCollections with the subset of bucket ordinals that represent the pruned parent buckets of interest.
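The two-pass scheme described above can be sketched in a few lines. This is a simplified, hypothetical model (not the actual Elasticsearch classes): a deferring collector records the (docId, bucketOrdinal) stream on the first pass instead of forwarding it to the expensive child agg, then replays only the collects whose parent bucket survived pruning.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeferredCollectionSketch {
    interface BucketCollector {
        void collect(int docId, long bucketOrdinal);
    }

    static class DeferringBucketCollector implements BucketCollector {
        private final List<long[]> recorded = new ArrayList<>(); // {docId, bucketOrdinal}
        private final BucketCollector delegate; // the expensive child agg

        DeferringBucketCollector(BucketCollector delegate) {
            this.delegate = delegate;
        }

        @Override
        public void collect(int docId, long bucketOrdinal) {
            // First pass: record instead of forwarding to the child agg.
            recorded.add(new long[] { docId, bucketOrdinal });
        }

        // Second pass: replay only the buckets that survived pruning.
        void runDeferredCollections(long... survivingBucketOrds) {
            Set<Long> surviving = new HashSet<>();
            for (long ord : survivingBucketOrds) surviving.add(ord);
            for (long[] entry : recorded) {
                if (surviving.contains(entry[1])) {
                    delegate.collect((int) entry[0], entry[1]);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> replayed = new ArrayList<>();
        DeferringBucketCollector deferring =
                new DeferringBucketCollector((doc, ord) -> replayed.add(doc + ":" + ord));
        deferring.collect(0, 0);
        deferring.collect(1, 1);
        deferring.collect(2, 0);
        deferring.runDeferredCollections(0); // prune: only bucket 0 survives
        System.out.println(replayed); // [0:0, 2:0]
    }
}
```

The real implementation additionally has to deal with per-segment readers, scoring, and memory accounting, which is what most of the review below is about.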

@jpountz jpountz added v1.2.0 and removed v1.2.0 labels May 12, 2014


src/main/java/org/elasticsearch/search/aggregations/FilteringBucketCollector.java Outdated

@Override
public final void collect(int docId, long bucketOrdinal) throws IOException {
int pos = Arrays.binarySearch(sortedOrds, bucketOrdinal);

jpountz (Contributor) · May 20, 2014

Should it be a hash table instead to make the access constant-time? I think it wouldn't matter with the default size of 10, but maybe it would if the user sets shard_size to e.g. 1000?

markharwood (Author, Contributor) · May 20, 2014

We already make a split in choice of collector impl for the case where the number of buckets is 1 or >1 so maybe there could be another break-point where we choose between hash table and sorted array?
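The break-point idea could look like the sketch below (a hypothetical threshold; the cut-over value would need benchmarking): pick the membership test for surviving bucket ordinals based on how many there are. A single ordinal needs only an == test, a small set is fine with Arrays.binarySearch, and a large set (e.g. shard_size of 1000) is better served by a hash table with constant-time lookups.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongPredicate;

public class OrdinalFilterSketch {
    static final int BINARY_SEARCH_THRESHOLD = 128; // assumed cut-over, needs benchmarking

    static LongPredicate filterFor(long[] survivingOrds) {
        if (survivingOrds.length == 1) {
            long only = survivingOrds[0];
            return ord -> ord == only;            // single-bucket case: plain == test
        } else if (survivingOrds.length <= BINARY_SEARCH_THRESHOLD) {
            long[] sorted = survivingOrds.clone();
            Arrays.sort(sorted);
            return ord -> Arrays.binarySearch(sorted, ord) >= 0; // small set: O(log n)
        } else {
            Set<Long> set = new HashSet<>();
            for (long ord : survivingOrds) set.add(ord);
            return set::contains;                  // large set: O(1) expected
        }
    }
}
```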


src/main/java/org/elasticsearch/search/aggregations/FilteringBucketCollector.java Outdated
public final void collect(int docId, long bucketOrdinal) throws IOException {
int pos = Arrays.binarySearch(sortedOrds, bucketOrdinal);
if(pos>=0){
delegate.collect(docId, bucketOrdinal);

jpountz (Contributor) · May 20, 2014

Since sub aggregations are going to build arrays of a length which is the largest bucket ordinal that has been seen so far, I think we should make the bucket ordinals dense to save memory?

markharwood (Author, Contributor) · May 20, 2014

So if I understand correctly, if the top agg collects 10k buckets but keeps only the top 5 then the deferred collector should map these parent ordinals to a number between 0 and 4 when feeding collect stream to deferred child aggs?
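That rebasing step can be sketched as below. Surviving parent ordinals (say 5 out of 10k) are remapped to a dense range [0, 5) before the collect stream is replayed, so child aggs size their arrays to the number of survivors rather than to the largest sparse ordinal seen. (The PR ends up using LongHash for this; a plain HashMap is shown here for brevity.)

```java
import java.util.HashMap;
import java.util.Map;

public class DenseOrdinalsSketch {
    private final Map<Long, Long> sparseToDense = new HashMap<>();

    // Returns the dense ordinal for a surviving sparse ordinal,
    // assigning the next free slot on first sight.
    long rebase(long sparseOrd) {
        return sparseToDense.computeIfAbsent(sparseOrd, k -> (long) sparseToDense.size());
    }
}
```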


src/main/java/org/elasticsearch/search/aggregations/RecordingBucketCollector.java Outdated
protected static final int INITIAL_CAPACITY = 50; // TODO sizing
protected IntArray docs;
protected IntArray readerIds;
private LongArray buckets;

jpountz (Contributor) · May 20, 2014

I think we should store the recorded docs and buckets on a per-AtomicReader basis: this will prevent us from having to record a readerId for each (docId, bucketOrdinal) pair?

Regarding the recording of doc IDs and bucket ordinals, I think we should compress them, even if it means that we need to drop recycling. Typically, we could force in-order collection (via AggregationContext.ensureScoreDocsInOrder) and then store doc ID deltas and bucket ordinals in two AppendingPackedLongBuffer instances?

markharwood (Author, Contributor) · May 20, 2014

The rationale for existing data structures was that a small number of recyclable big arrays would be better than many per-segment collections. I guess it's hard to predict as it depends on queries/data. Would it be worth undertaking some benchmarking or are you happy that this is indeed the best route? I deliberately kept RecordingBucketCollector separate from DeferringBucketCollector to allow for alternative data structures to be employed.

jpountz (Contributor) · May 23, 2014

Given that the number of segments is bounded, I don't think it is an issue to cache per segment. And it would also probably help save memory since you wouldn't need to store the reader ids anymore?


src/main/java/org/elasticsearch/search/aggregations/Aggregator.java Outdated
BucketCollector deferreds=BucketCollector.wrap(nextPassCollectors);
recordingWrapper=new DeferringBucketCollector(deferreds, context);
// //TODO. Without line below we are dependent on subclass aggs delegating setNextReader calls on to child aggs
// // which they don't seem to do as a matter of course? Is there a cleaner way around this?

jpountz (Contributor) · May 20, 2014

+1 on making aggregators responsible for calling setNextReader on their children. Maybe we can leave it as-is for now (I think it's ok) and do it as part of another issue/PR?


src/main/java/org/elasticsearch/search/aggregations/Aggregator.java Outdated
doPostCollection();
}

/** Called upon release of the aggregator. */
@Override
public void close() {
Releasables.close(recordingWrapper);
doClose();

jpountz (Contributor) · May 20, 2014

Can we make sure that doClose is called even if Releasables.close(recordingWrapper) throws an exception? For example:

try (Releasable _ = recordingWrapper) {
  doClose();
}

src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java Outdated
deferred.postCollection();
}};
BucketCollector filteredCollector;
//Optimize choice of collector for the simple case of a single bucket

jpountz (Contributor) · May 20, 2014

Do we need this specialization?

markharwood (Author, Contributor) · May 20, 2014

Without it your collector is doing a binarySearch or hash-table lookup on a set of 1 for each doc collected - that seemed inefficient compared to a simple == test on the bucket ordinal.

jpountz (Contributor) · May 20, 2014

But is it common to only request the top term?

markharwood (Author, Contributor) · May 20, 2014

Maybe, but it may have to collect a million docs with that single term - each time looking in a collection of 1 to see if that same bucket ordinal is in there.

jpountz (Contributor) · May 23, 2014

If it isn't used often, I think we shouldn't have this specialization. For example, we don't specialize the data structure to use to collect top hits if only 1 or 2 of them are requested, we always use a heap?


docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc Outdated
"terms" : {
"field" : "actors",
"size" : 10,
"collect_mode" : "prune_first"

jpountz (Contributor) · May 20, 2014

Since we call it "Deferring calculation of child aggregations", should this collect_mode be called deferred?

markharwood (Author, Contributor) · May 20, 2014

In future we may need to consider other deferral modes like @kimchy 's serial approach (although I tried adding that to this PR and failed due to the many extra changes that would be required - I'll save that discussion for another issue)

markharwood (Author, Contributor) · May 20, 2014

@colings86 also had the idea of "breadth_first" vs "depth_first" as mode names?

jpountz (Contributor) · May 23, 2014

I tend to like these names!
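For reference, the request from the docs snippet above, rewritten with the breadth_first name that the thread converges on (and that the merge commit ultimately adopts). The surrounding request body and the nested sub-agg are an illustrative reconstruction, not the exact asciidoc example:

```json
{
  "aggs": {
    "actors": {
      "terms": {
        "field": "actors",
        "size": 10,
        "collect_mode": "breadth_first"
      },
      "aggs": {
        "costars": {
          "terms": { "field": "actors", "size": 5 }
        }
      }
    }
  }
}
```

With breadth_first, the top-level terms agg is computed and pruned to its top 10 buckets before the costars sub-agg is run, avoiding the combinatorial explosion of computing sub-aggs for every candidate actor.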


src/main/java/org/elasticsearch/search/aggregations/Aggregator.java Outdated
context.searchContext().addReleasable(this, Lifetime.PHASE);
// Register a safeguard to highlight any invalid construction logic (call to this constructor without subsequent initialize call)

jpountz (Contributor) · May 20, 2014

alternatively, maybe the constructor should just set collectableSubAggregators to null?

markharwood (Author, Contributor) · May 20, 2014

That's where I was originally - a failure to call initialize threw an NPE, but this felt like a more useful way of reporting any future issues with code neglecting to call initialize.


src/main/java/org/elasticsearch/search/aggregations/Aggregator.java Outdated
thisPassCollectors.add(recordingWrapper);
}
collectableSubAggregators=BucketCollector.wrap(thisPassCollectors);
}

jpountz (Contributor) · May 20, 2014

👍 I like how this method abstracts the way sub aggregators should be collected and how you can mix deferred aggregation with sorting.


src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java Outdated
* @param analysis an object that represents the summary e.g. an {@link Aggregation}
*/
void add(Object analysis);
}

jpountz (Contributor) · May 23, 2014

Can we have generics here so that implementers do not need to do unchecked casts?


src/main/java/org/elasticsearch/search/aggregations/FilteringBucketCollector.java Outdated
public class FilteringBucketCollector extends BucketCollector implements Releasable {

private LongHash denseMap;
private BucketCollector delegate;

jpountz (Contributor) · May 23, 2014

I think both could be final?


src/main/java/org/elasticsearch/search/aggregations/FilteringBucketCollector.java Outdated
long ordinal = denseMap.find(bucketOrdinal);
if (ordinal >= 0) {
delegate.gatherAnalysis(analysisCollector, ordinal);
}

jpountz (Contributor) · May 23, 2014

else throw an exception? Or do we rely on the fact that this is a no-op when the bucket ordinal is not known?


src/main/java/org/elasticsearch/search/aggregations/Aggregator.java Outdated
}
};
}
protected void initialize() {

jpountz (Contributor) · May 30, 2014

Let's maybe call it preCollect? (symmetric with postCollect)

jpountz (Contributor) · Jun 3, 2014

What do you think?


src/main/java/org/elasticsearch/search/aggregations/RecordingBucketCollector.java Outdated
private PerSegmentCollects currentCollection;
private boolean recordingComplete;

class PerSegmentCollects {

jpountz (Contributor) · May 30, 2014

I think it can be static?


src/main/java/org/elasticsearch/search/aggregations/RecordingBucketCollector.java Outdated
// No way of accurately predicting how many docs will be collected
docs = new AppendingPackedLongBuffer();
}
docs.add(doc);

jpountz (Contributor) · May 30, 2014

Instead of storing doc IDs as-is, I think we should force collection to happen in order (AggregationContext.ensureScoreDocsInOrder) and store doc ID deltas: docId0, docId1 - docId0, ..., docIdN - docId(N-1) to save memory
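The delta-encoding suggestion can be sketched as below: when collection happens in doc-ID order, storing docId0, docId1 - docId0, ... keeps the stored values small, so a packed structure (AppendingPackedLongBuffer in the PR) needs fewer bits per value. A plain long[] stands in for the packed buffer here.

```java
public class DocDeltaSketch {
    // Encode in-order doc IDs as gaps from the previous doc ID.
    static long[] encodeDeltas(int[] inOrderDocIds) {
        long[] deltas = new long[inOrderDocIds.length];
        int previous = 0;
        for (int i = 0; i < inOrderDocIds.length; i++) {
            deltas[i] = inOrderDocIds[i] - previous; // small, non-negative gaps
            previous = inOrderDocIds[i];
        }
        return deltas;
    }

    // Decode by accumulating the gaps back into absolute doc IDs.
    static int[] decodeDeltas(long[] deltas) {
        int[] docIds = new int[deltas.length];
        int current = 0;
        for (int i = 0; i < deltas.length; i++) {
            current += (int) deltas[i];
            docIds[i] = current;
        }
        return docIds;
    }
}
```

The gain comes from the packed buffer sizing itself to the largest stored value: gaps are typically far smaller than absolute doc IDs, so each entry takes fewer bits.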


src/main/java/org/elasticsearch/search/aggregations/RecordingBucketCollector.java Outdated
int numDocs = docs.get(pos, docsBuffer, 0, docsBuffer.length);
int numBuckets = buckets.get(pos, bucketsBuffer, 0, bucketsBuffer.length);
// TODO not sure why numDocs and numBuckets differs, but
// they do and this min statement was required!

jpountz (Contributor) · May 30, 2014

They do because the bulk GET API tries to only decode full blocks and might return fewer values than asked for. Maybe switch to the iterator API to make it less trappy? (It should still be quite efficient since it does bulk decoding under the hood.)


src/main/java/org/elasticsearch/search/aggregations/RecordingBucketCollector.java Outdated
if(recordingComplete){
// The way registration works for listening on reader changes we have the potential to be called > once
// TODO fixup the aggs framework so setNextReader calls are delegated to child aggs and not reliant on
// registering a listener.

jpountz (Contributor) · May 30, 2014

Agreed, this is troublesome. Can you fix it so that setNextReader is called by the parent aggregator, or the collector for the root aggs? (either as part of this change or as another one, but IMO we need it for this change)


...n/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java Outdated
@Override
protected boolean shouldDefer(Aggregator aggregator) {
return (subAggCollectMode == SubAggCollectionMode.PRUNE_FIRST) && (aggregator != aggUsedForSorting);
}

jpountz (Contributor) · May 30, 2014

Could we try to share this logic among Double/Long/String terms aggregators by having a common superclass?

jpountz (Contributor) commented May 30, 2014

I left a few comments, but I like the new per-segment buffering of documents/buckets. I also think we should remove FilteringSingleBucketCollector; I don't like having a specialization for something that would be used so rarely: it is only used when shard_size is 1.

jpountz (Contributor) commented May 30, 2014

I quickly looked at the last changes and they look good! Before we pull that in, I think we should make sure users get a meaningful error if they try to use scores while replaying doc IDs, and take another look at the formatting (some missing spaces around operators/brackets and lines with trailing spaces).

markharwood and others added some commits May 2, 2014

Updated Aggregation Benchmark tests for collect mode
Added 'Deferred Aggregation' to the TermsAggregationSearchBenchmark and created a new benchmark for testing nested aggregations with different combinations of collect mode at each level
Optimisation to use rebased ordinals for deferred aggregations to allow for more compact data structures downstream where heavy pruning reduces the numbers of buckets under consideration
// A scorer used for the deferred collection mode to handle any child aggs asking for scores that are not
// recorded.
static final Scorer unavailableScorer=new Scorer(null){
private final String MSG="A limitation of the "+SubAggCollectionMode.DEPTH_FIRST.parseField.getPreferredName()+

jpountz (Contributor) · Jun 3, 2014

s/DEPTH_FIRST/BREADTH_FIRST/ ?

return mode;
}
}
throw new ElasticsearchParseException("No collectionMode found for value [" + value + "]");

jpountz (Contributor) · Jun 3, 2014

s/collectionMode/collect_mode/
(should we have a static ParseField for it that we would reuse in this message?)

markharwood (Author, Contributor) commented Jun 4, 2014

jpountz (Contributor) commented Jun 4, 2014

I don't think it would help: building buckets based on counts is not practical, as you would need the global counts to make a decision while a shard only has shard-local knowledge.

// A scorer used for the deferred collection mode to handle any child aggs asking for scores that are not
// recorded.
static final Scorer unavailableScorer=new Scorer(null){
private final String MSG="A limitation of the "+SubAggCollectionMode.BREADTH_FIRST.parseField.getPreferredName()+

jpountz (Contributor) · Jun 4, 2014

Can you add spaces around '=' and '+'?

for (Aggregator aggregator : collectables) {
if(shouldDefer(aggregator)){
nextPassCollectors.add(aggregator);
}else{

jpountz (Contributor) · Jun 4, 2014

Can you add spaces after if and around else?

protected void preCollection() {
Iterable<Aggregator> collectables = Iterables.filter(Arrays.asList(subAggregators), COLLECTABLE_AGGREGATOR);
List<BucketCollector> nextPassCollectors=new ArrayList<>();
List<BucketCollector> thisPassCollectors=new ArrayList<>();

jpountz (Contributor) · Jun 4, 2014

spaces

}
if(nextPassCollectors.size()>0){
BucketCollector deferreds=BucketCollector.wrap(nextPassCollectors);
recordingWrapper=new DeferringBucketCollector(deferreds, context);

jpountz (Contributor) · Jun 4, 2014

spaces

@Override
public void setNextReader(AtomicReaderContext reader) {
// Need to set AggregationContext otherwise ValueSources in aggs
// don't read any values

jpountz (Contributor) · Jun 4, 2014

looks like these comments have some extra indentation

*/
public void prepareSelectedBuckets(long... survivingBucketOrds) {

BucketCollector subs=new BucketCollector(){

jpountz (Contributor) · Jun 4, 2014

spaces around '='

@@ -69,6 +80,8 @@ public void parse(String aggregationName, XContentParser parser, SearchContext c
} else if (token == XContentParser.Token.VALUE_STRING) {
if (EXECUTION_HINT_FIELD_NAME.match(currentFieldName)) {
executionHint = parser.text();
} else if(Aggregator.COLLECT_MODE.match(currentFieldName)){
collectMode=SubAggCollectionMode.parse(parser.text());

jpountz (Contributor) · Jun 4, 2014

spaces after if and around '='

list[i] = bucket;
}
//replay any deferred collections
runDeferredCollections(survivingBucketOrds);
//Now build the aggs

jpountz (Contributor) · Jun 4, 2014

spaces after '//'

list[i] = bucket;
}
//replay any deferred collections
runDeferredCollections(survivingBucketOrds);
//Now build the aggs

jpountz (Contributor) · Jun 4, 2014

spaces after '//'


//replay any deferred collections
runDeferredCollections(survivingBucketOrds);
//Now build the aggs

jpountz (Contributor) · Jun 4, 2014

spaces after '//'

jpountz (Contributor) commented Jun 4, 2014

LGTM, I just left comments about formatting. Can you fix these before pushing?

jpountz (Contributor) commented Jun 6, 2014

LGTM

markharwood added a commit that referenced this pull request Jun 6, 2014

Aggregations optimisation for memory usage. Added changes to core Aggregator class to support a new mode of deferred collection.

A new "breadth_first" collection mode allows upper branches of the aggregation tree to be calculated and then pruned to a smaller selection before executing collection on the child branches.

Closes #6128

@s1monw s1monw removed the review label Jun 18, 2014

@s1monw s1monw added feature labels Jul 9, 2014

@clintongormley clintongormley changed the title Deferred aggregation Aggregations: Deferred aggregations prevent combinatorial explosion Jul 16, 2014

@clintongormley clintongormley changed the title Aggregations: Deferred aggregations prevent combinatorial explosion Deferred aggregations prevent combinatorial explosion Jun 6, 2015
