Skip to content

Commit

Permalink
Reduce InternalComposite in a streaming fashion (#106566)
Browse files Browse the repository at this point in the history
Use a priority queue with a hashmap to keep track of the competitive buckets. We still delayed the merging of the child 
aggregations by introducing a DelayedMultiBucketAggregatorsReducer.
  • Loading branch information
iverase committed Mar 25, 2024
1 parent d44b9b6 commit 12441f5
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.bucket;

import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.AggregatorsReducer;
import org.elasticsearch.search.aggregations.InternalAggregations;

import java.util.ArrayList;
import java.util.List;

/**
* Class for reducing a list of {@link MultiBucketsAggregation.Bucket} to a single
* {@link InternalAggregations} and the number of documents in a delayable fashion.
*
* This class can be reused by calling {@link #reset()}.
*
* @see MultiBucketAggregatorsReducer
*/
public final class DelayedMultiBucketAggregatorsReducer {

private final AggregationReduceContext context;
// the maximum size of this array is the number of shards to be reduced. We currently do it in a batches of 256
// if we expect bigger batches, we might consider to use ObjectArray.
private final List<InternalAggregations> internalAggregations;
private long count = 0;

public DelayedMultiBucketAggregatorsReducer(AggregationReduceContext context) {
this.context = context;
this.internalAggregations = new ArrayList<>();
}

/**
* Adds a {@link MultiBucketsAggregation.Bucket} for reduction.
*/
public void accept(MultiBucketsAggregation.Bucket bucket) {
count += bucket.getDocCount();
internalAggregations.add(bucket.getAggregations());
}

/**
* Reset the content of this reducer.
*/
public void reset() {
count = 0L;
internalAggregations.clear();
}

/**
* returns the reduced {@link InternalAggregations}.
*/
public InternalAggregations get() {
try (AggregatorsReducer aggregatorsReducer = new AggregatorsReducer(context, internalAggregations.size())) {
for (InternalAggregations agg : internalAggregations) {
aggregatorsReducer.accept(agg);
}
return aggregatorsReducer.get();
}
}

/**
* returns the number of docs
*/
public long getDocCount() {
return count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
public final class MultiBucketAggregatorsReducer implements Releasable {

private final AggregatorsReducer aggregatorsReducer;
long count = 0;
private long count = 0;

public MultiBucketAggregatorsReducer(AggregationReduceContext context, int size) {
this.aggregatorsReducer = new AggregatorsReducer(context, size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
import org.elasticsearch.common.util.ObjectObjectPagedHashMap;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
Expand All @@ -20,6 +22,7 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.bucket.DelayedMultiBucketAggregatorsReducer;
import org.elasticsearch.search.aggregations.support.SamplingContext;
import org.elasticsearch.xcontent.XContentBuilder;

Expand Down Expand Up @@ -201,56 +204,29 @@ int[] getReverseMuls() {
@Override
protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
return new AggregatorReducer() {
final ObjectArrayPriorityQueue<BucketIterator> pq = new ObjectArrayPriorityQueue<>(size, reduceContext.bigArrays()) {
@Override
protected boolean lessThan(BucketIterator a, BucketIterator b) {
return a.compareTo(b) < 0;
}
};
final BucketsQueue queue = new BucketsQueue(reduceContext);
boolean earlyTerminated = false;

@Override
public void accept(InternalAggregation aggregation) {
InternalComposite sortedAgg = (InternalComposite) aggregation;
earlyTerminated |= sortedAgg.earlyTerminated;
BucketIterator it = new BucketIterator(sortedAgg.buckets);
if (it.next() != null) {
pq.add(it);
for (InternalBucket bucket : sortedAgg.getBuckets()) {
if (queue.add(bucket) == false) {
// if the bucket is not competitive, we can break
// because incoming buckets are sorted
break;
}
}
}

@Override
public InternalAggregation get() {
InternalBucket lastBucket = null;
List<InternalBucket> buckets = new ArrayList<>();
List<InternalBucket> result = new ArrayList<>();
while (pq.size() > 0) {
BucketIterator bucketIt = pq.top();
if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
buckets.clear();
result.add(reduceBucket);
if (result.size() >= getSize()) {
break;
}
}
lastBucket = bucketIt.current;
buckets.add(bucketIt.current);
if (bucketIt.next() != null) {
pq.updateTop();
} else {
pq.pop();
}
}
if (buckets.size() > 0) {
InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
result.add(reduceBucket);
}

final List<InternalBucket> result = queue.get();
List<DocValueFormat> reducedFormats = formats;
CompositeKey lastKey = null;
if (result.size() > 0) {
lastBucket = result.get(result.size() - 1);
if (result.isEmpty() == false) {
InternalBucket lastBucket = result.get(result.size() - 1);
/* Attach the formats from the last bucket to the reduced composite
* so that we can properly format the after key. */
reducedFormats = lastBucket.formats;
Expand All @@ -275,11 +251,82 @@ public InternalAggregation get() {

@Override
public void close() {
Releasables.close(pq);
Releasables.close(queue);
}
};
}

private class BucketsQueue implements Releasable {
private final ObjectObjectPagedHashMap<Object, DelayedMultiBucketAggregatorsReducer> bucketReducers;
private final ObjectArrayPriorityQueue<InternalBucket> queue;
private final AggregationReduceContext reduceContext;

private BucketsQueue(AggregationReduceContext reduceContext) {
this.reduceContext = reduceContext;
bucketReducers = new ObjectObjectPagedHashMap<>(getSize(), reduceContext.bigArrays());
queue = new ObjectArrayPriorityQueue<>(getSize(), reduceContext.bigArrays()) {
@Override
protected boolean lessThan(InternalBucket a, InternalBucket b) {
return b.compareKey(a) < 0;
}
};
}

/** adds a bucket to the queue. Return false if the bucket is not competitive, otherwise true.*/
boolean add(InternalBucket bucket) {
DelayedMultiBucketAggregatorsReducer delayed = bucketReducers.get(bucket.key);
if (delayed == null) {
final InternalBucket out = queue.insertWithOverflow(bucket);
if (out == null) {
// bucket is added
delayed = new DelayedMultiBucketAggregatorsReducer(reduceContext);
} else if (out == bucket) {
// bucket is not competitive
return false;
} else {
// bucket replaces existing bucket
delayed = bucketReducers.remove(out.key);
assert delayed != null;
delayed.reset();
}
bucketReducers.put(bucket.key, delayed);
}
delayed.accept(bucket);
return true;
}

/** Return the list of reduced buckets */
List<InternalBucket> get() {
final int bucketsSize = (int) bucketReducers.size();
final InternalBucket[] result = new InternalBucket[bucketsSize];
for (int i = bucketsSize - 1; i >= 0; i--) {
final InternalBucket bucket = queue.pop();
assert bucket != null;
/* Use the formats from the bucket because they'll be right to format
* the key. The formats on the InternalComposite doing the reducing are
* just whatever formats make sense for *its* index. This can be real
* trouble when the index doing the reducing is unmapped. */
final var reducedFormats = bucket.formats;
final DelayedMultiBucketAggregatorsReducer reducer = Objects.requireNonNull(bucketReducers.get(bucket.key));
result[i] = new InternalBucket(
sourceNames,
reducedFormats,
bucket.key,
reverseMuls,
missingOrders,
reducer.getDocCount(),
reducer.get()
);
}
return List.of(result);
}

@Override
public void close() {
Releasables.close(bucketReducers, queue);
}
}

@Override
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
return new InternalComposite(
Expand All @@ -296,22 +343,6 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
);
}

private InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) {
assert buckets.isEmpty() == false;
long docCount = 0;
for (InternalBucket bucket : buckets) {
docCount += bucket.docCount;
}
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
/* Use the formats from the bucket because they'll be right to format
* the key. The formats on the InternalComposite doing the reducing are
* just whatever formats make sense for *its* index. This can be real
* trouble when the index doing the reducing is unmapped. */
final var reducedFormats = buckets.get(0).formats;
return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, missingOrders, docCount, aggs);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
Expand All @@ -331,24 +362,6 @@ public int hashCode() {
return Objects.hash(super.hashCode(), size, buckets, afterKey, Arrays.hashCode(reverseMuls), Arrays.hashCode(missingOrders));
}

private static class BucketIterator implements Comparable<BucketIterator> {
final Iterator<InternalBucket> it;
InternalBucket current;

private BucketIterator(List<InternalBucket> buckets) {
this.it = buckets.iterator();
}

@Override
public int compareTo(BucketIterator other) {
return current.compareKey(other.current);
}

InternalBucket next() {
return current = it.hasNext() ? it.next() : null;
}
}

public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket
implements
CompositeAggregation.Bucket,
Expand Down

0 comments on commit 12441f5

Please sign in to comment.