Refactor MultiBucketAggregatorsReducer and DelayedMultiBucketAggregatorsReducer (#106725)

Renamed to BucketReducer and DelayedBucketReducer; both now carry a new property containing the prototype bucket.
iverase committed Mar 26, 2024
1 parent dc40eef commit 47dbd61
Showing 11 changed files with 151 additions and 119 deletions.
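In summary, MultiBucketAggregatorsReducer becomes the generic BucketReducer<B extends MultiBucketsAggregation.Bucket> and DelayedMultiBucketAggregatorsReducer becomes DelayedBucketReducer<B>; both now keep the first bucket they are handed as a prototype, exposed through getProto(), and their get() accessor is renamed getAggregations(). A minimal usage sketch of the new shape (the loop, reduceContext, expectedSize, and createBucket helper are illustrative stand-ins, not code from this commit):

    // Hedged sketch: reducing several same-keyed buckets coming from different shards.
    BucketReducer<InternalBucket> reducer = null;
    for (InternalBucket bucket : bucketsForOneKey) {
        if (reducer == null) {
            reducer = new BucketReducer<>(bucket, reduceContext, expectedSize); // first bucket becomes the prototype
        }
        reducer.accept(bucket); // accumulates doc count and sub-aggregations
    }
    InternalBucket merged = createBucket(reducer.getProto(), reducer.getDocCount(), reducer.getAggregations());
    Releasables.close(reducer); // BucketReducer is Releasable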
InternalAdjacencyMatrix.java
@@ -11,20 +11,20 @@
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.common.util.ObjectObjectPagedHashMap;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.AggregatorReducer;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
-import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregatorsReducer;
+import org.elasticsearch.search.aggregations.bucket.BucketReducer;
 import org.elasticsearch.search.aggregations.support.SamplingContext;
 import org.elasticsearch.xcontent.XContentBuilder;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -180,36 +180,49 @@ public InternalBucket getBucketByKey(String key) {
     @Override
     protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
         return new AggregatorReducer() {
-            final Map<String, MultiBucketAggregatorsReducer> bucketsReducer = new HashMap<>(getBuckets().size());
+            final ObjectObjectPagedHashMap<String, BucketReducer<InternalBucket>> bucketsReducer = new ObjectObjectPagedHashMap<>(
+                getBuckets().size(),
+                reduceContext.bigArrays()
+            );

             @Override
             public void accept(InternalAggregation aggregation) {
                 final InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation;
                 for (InternalBucket bucket : filters.buckets) {
-                    MultiBucketAggregatorsReducer reducer = bucketsReducer.computeIfAbsent(
-                        bucket.key,
-                        k -> new MultiBucketAggregatorsReducer(reduceContext, size)
-                    );
+                    BucketReducer<InternalBucket> reducer = bucketsReducer.get(bucket.key);
+                    if (reducer == null) {
+                        reducer = new BucketReducer<>(bucket, reduceContext, size);
+                        boolean success = false;
+                        try {
+                            bucketsReducer.put(bucket.key, reducer);
+                            success = true;
+                        } finally {
+                            if (success == false) {
+                                Releasables.close(reducer);
+                            }
+                        }
+                    }
                     reducer.accept(bucket);
                 }
             }

             @Override
             public InternalAggregation get() {
-                List<InternalBucket> reducedBuckets = new ArrayList<>(bucketsReducer.size());
-                for (Map.Entry<String, MultiBucketAggregatorsReducer> entry : bucketsReducer.entrySet()) {
-                    if (entry.getValue().getDocCount() >= 1) {
-                        reducedBuckets.add(new InternalBucket(entry.getKey(), entry.getValue().getDocCount(), entry.getValue().get()));
+                List<InternalBucket> reducedBuckets = new ArrayList<>((int) bucketsReducer.size());
+                bucketsReducer.forEach(entry -> {
+                    if (entry.value.getDocCount() >= 1) {
+                        reducedBuckets.add(new InternalBucket(entry.key, entry.value.getDocCount(), entry.value.getAggregations()));
                     }
-                }
+                });
                 reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
                 reducedBuckets.sort(Comparator.comparing(InternalBucket::getKey));
                 return new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata());
             }

             @Override
             public void close() {
-                Releasables.close(bucketsReducer.values());
+                bucketsReducer.forEach(entry -> Releasables.close(entry.value));
+                Releasables.close(bucketsReducer);
             }
         };
     }
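The success flag in accept() above is the usual guard for Releasable values: putting into the big-arrays-backed ObjectObjectPagedHashMap can fail (for instance if growing the backing pages trips a circuit breaker), and without the guard the freshly created reducer would leak. The idiom in isolation (acquire() and register() are hypothetical placeholders; Releasable and Releasables are the org.elasticsearch.core types imported above):

    // Hedged sketch of the release-on-failure idiom; acquire() and register() are placeholders.
    Releasable resource = acquire();
    boolean success = false;
    try {
        register(resource); // may throw, e.g. on a circuit-breaker trip
        success = true;
    } finally {
        if (success == false) {
            Releasables.close(resource); // release rather than leak
        }
    }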
InternalAutoDateHistogram.java
@@ -21,7 +21,7 @@
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.KeyComparable;
-import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregatorsReducer;
+import org.elasticsearch.search.aggregations.bucket.BucketReducer;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
@@ -444,7 +444,7 @@ static int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, Rou
     @Override
     protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
         return new AggregatorReducer() {
-            private final LongObjectPagedHashMap<MultiBucketAggregatorsReducer> bucketsReducer = new LongObjectPagedHashMap<>(
+            private final LongObjectPagedHashMap<BucketReducer<Bucket>> bucketsReducer = new LongObjectPagedHashMap<>(
                 getBuckets().size(),
                 reduceContext.bigArrays()
             );
@@ -460,9 +460,9 @@ public void accept(InternalAggregation aggregation) {
                     min = Math.min(min, histogram.buckets.get(0).key);
                     max = Math.max(max, histogram.buckets.get(histogram.buckets.size() - 1).key);
                     for (Bucket bucket : histogram.buckets) {
-                        MultiBucketAggregatorsReducer reducer = bucketsReducer.get(bucket.key);
+                        BucketReducer<Bucket> reducer = bucketsReducer.get(bucket.key);
                         if (reducer == null) {
-                            reducer = new MultiBucketAggregatorsReducer(reduceContext, size);
+                            reducer = new BucketReducer<>(bucket, reduceContext, size);
                             bucketsReducer.put(bucket.key, reducer);
                         }
                         reducer.accept(bucket);
@@ -480,34 +480,34 @@ public InternalAggregation get() {
                 {
                     // fill the array and sort it
                     final int[] index = new int[] { 0 };
-                    bucketsReducer.iterator().forEachRemaining(c -> keys[index[0]++] = c.key);
+                    bucketsReducer.forEach(c -> keys[index[0]++] = c.key);
                     Arrays.sort(keys);
                 }

                 final List<Bucket> reducedBuckets = new ArrayList<>();
                 if (keys.length > 0) {
                     // list of buckets coming from different shards that have the same key
-                    MultiBucketAggregatorsReducer currentReducer = null;
+                    BucketReducer<Bucket> currentReducer = null;
                     long key = reduceRounding.round(keys[0]);
                     for (long top : keys) {
                         if (reduceRounding.round(top) != key) {
                             assert currentReducer != null;
                             // the key changes, reduce what we already buffered and reset the buffer for current buckets
-                            reducedBuckets.add(createBucket(key, currentReducer.getDocCount(), currentReducer.get()));
+                            reducedBuckets.add(createBucket(key, currentReducer.getDocCount(), currentReducer.getAggregations()));
                             currentReducer = null;
                             key = reduceRounding.round(top);
                         }

-                        final MultiBucketAggregatorsReducer nextReducer = bucketsReducer.get(top);
+                        final BucketReducer<Bucket> nextReducer = bucketsReducer.get(top);
                         if (currentReducer == null) {
                             currentReducer = nextReducer;
                         } else {
-                            currentReducer.accept(createBucket(key, nextReducer.getDocCount(), nextReducer.get()));
+                            currentReducer.accept(createBucket(key, nextReducer.getDocCount(), nextReducer.getAggregations()));
                         }
                     }

                     if (currentReducer != null) {
-                        reducedBuckets.add(createBucket(key, currentReducer.getDocCount(), currentReducer.get()));
+                        reducedBuckets.add(createBucket(key, currentReducer.getDocCount(), currentReducer.getAggregations()));
                     }
                 }

@@ -546,7 +546,7 @@ public InternalAggregation get() {

             @Override
             public void close() {
-                bucketsReducer.iterator().forEachRemaining(c -> Releasables.close(c.value));
+                bucketsReducer.forEach(c -> Releasables.close(c.value));
                 Releasables.close(bucketsReducer);
             }
         };
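For context, get() above merges neighbouring buckets whose keys collapse to the same value under the coarser reduceRounding: keys are collected, sorted, and consecutive runs that round to the same key are folded into one bucket. A standalone sketch of that idea, with a fixed interval standing in for the rounding (all names illustrative, not from the commit):

    import java.util.ArrayList;
    import java.util.List;

    class RoundingMergeSketch {
        public static void main(String[] args) {
            long[] keys = { 0, 10, 20, 110, 120 };  // sorted bucket keys
            long interval = 100;                     // stand-in for reduceRounding.round()
            List<long[]> merged = new ArrayList<>(); // {roundedKey, mergedBucketCount} pairs
            long current = keys[0] / interval * interval;
            long count = 0;
            for (long key : keys) {
                long rounded = key / interval * interval;
                if (rounded != current) {
                    merged.add(new long[] { current, count }); // key changed: flush the buffered run
                    current = rounded;
                    count = 0;
                }
                count++;
            }
            merged.add(new long[] { current, count });
            // merged now holds {0, 3} and {100, 2}
        }
    }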
BucketReducer.java (renamed from MultiBucketAggregatorsReducer.java)
@@ -15,32 +15,40 @@
 import org.elasticsearch.search.aggregations.InternalAggregations;

 /**
- * Class for reducing a list of {@link MultiBucketsAggregation.Bucket} to a single
- * {@link InternalAggregations} and the number of documents.
+ * Class for reducing a list of {@link B} to a single {@link InternalAggregations}
+ * and the number of documents.
  */
-public final class MultiBucketAggregatorsReducer implements Releasable {
+public final class BucketReducer<B extends MultiBucketsAggregation.Bucket> implements Releasable {

     private final AggregatorsReducer aggregatorsReducer;
+    private final B proto;
     private long count = 0;

-    public MultiBucketAggregatorsReducer(AggregationReduceContext context, int size) {
+    public BucketReducer(B proto, AggregationReduceContext context, int size) {
         this.aggregatorsReducer = new AggregatorsReducer(context, size);
+        this.proto = proto;
     }

     /**
-     * Adds a {@link MultiBucketsAggregation.Bucket} for reduction.
+     * Adds a {@link B} for reduction.
      */
-    public void accept(MultiBucketsAggregation.Bucket bucket) {
+    public void accept(B bucket) {
         count += bucket.getDocCount();
         aggregatorsReducer.accept(bucket.getAggregations());
     }

+    /**
+     * returns the bucket prototype.
+     */
+    public B getProto() {
+        return proto;
+    }
+
     /**
      * returns the reduced {@link InternalAggregations}.
      */
-    public InternalAggregations get() {
+    public InternalAggregations getAggregations() {
         return aggregatorsReducer.get();

     }

     /**
DelayedBucketReducer.java (renamed from DelayedMultiBucketAggregatorsReducer.java)
@@ -16,46 +16,57 @@
 import java.util.List;

 /**
- * Class for reducing a list of {@link MultiBucketsAggregation.Bucket} to a single
- * {@link InternalAggregations} and the number of documents in a delayable fashion.
+ * Class for reducing a list of {@link B} to a single {@link InternalAggregations}
+ * and the number of documents in a delayable fashion.
  *
- * This class can be reused by calling {@link #reset()}.
+ * This class can be reused by calling {@link #reset(B)}.
  *
- * @see MultiBucketAggregatorsReducer
+ * @see BucketReducer
  */
-public final class DelayedMultiBucketAggregatorsReducer {
+public final class DelayedBucketReducer<B extends MultiBucketsAggregation.Bucket> {

     private final AggregationReduceContext context;
+    // changes at reset time
+    private B proto;
     // the maximum size of this array is the number of shards to be reduced. We currently do it in a batches of 256
-    // if we expect bigger batches, we might consider to use ObjectArray.
+    // by default. if we expect bigger batches, we might consider to use ObjectArray.
     private final List<InternalAggregations> internalAggregations;
     private long count = 0;

-    public DelayedMultiBucketAggregatorsReducer(AggregationReduceContext context) {
+    public DelayedBucketReducer(B proto, AggregationReduceContext context) {
+        this.proto = proto;
         this.context = context;
         this.internalAggregations = new ArrayList<>();
     }

     /**
-     * Adds a {@link MultiBucketsAggregation.Bucket} for reduction.
+     * Adds a {@link B} for reduction.
      */
-    public void accept(MultiBucketsAggregation.Bucket bucket) {
+    public void accept(B bucket) {
         count += bucket.getDocCount();
         internalAggregations.add(bucket.getAggregations());
     }

+    /**
+     * returns the bucket prototype.
+     */
+    public B getProto() {
+        return proto;
+    }
+
     /**
      * Reset the content of this reducer.
      */
-    public void reset() {
+    public void reset(B proto) {
+        this.proto = proto;
         count = 0L;
         internalAggregations.clear();
     }

     /**
      * returns the reduced {@link InternalAggregations}.
      */
-    public InternalAggregations get() {
+    public InternalAggregations getAggregations() {
         try (AggregatorsReducer aggregatorsReducer = new AggregatorsReducer(context, internalAggregations.size())) {
             for (InternalAggregations agg : internalAggregations) {
                 aggregatorsReducer.accept(agg);
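Unlike BucketReducer, the delayed variant only buffers sub-aggregations and reduces them in one shot when getAggregations() is called, and reset(proto) lets a single instance be recycled for a different key — which is how the composite aggregation's queue reuses the reducers of evicted buckets further below. A hedged usage sketch (all bucket variables are illustrative stand-ins):

    // Sketch: buffer buckets for one key, reduce once, then recycle for another key.
    DelayedBucketReducer<InternalBucket> delayed = new DelayedBucketReducer<>(first, reduceContext);
    delayed.accept(first);
    delayed.accept(sameKeyFromOtherShard);
    InternalAggregations aggs = delayed.getAggregations(); // reduces the buffered list now
    long docCount = delayed.getDocCount();

    delayed.reset(otherKeyBucket); // clears the buffer and swaps the prototype
    delayed.accept(otherKeyBucket);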
FixedMultiBucketAggregatorsReducer.java
@@ -23,15 +23,13 @@
 public abstract class FixedMultiBucketAggregatorsReducer<B extends MultiBucketsAggregation.Bucket> implements Releasable {

     // we could use an ObjectArray here but these arrays are in normally small, so it is not worthy
-    private final MultiBucketAggregatorsReducer[] bucketsReducer;
-    private final List<B> protoList;
+    private final List<BucketReducer<B>> bucketReducer;

     public FixedMultiBucketAggregatorsReducer(AggregationReduceContext reduceContext, int size, List<B> protoList) {
         reduceContext.consumeBucketsAndMaybeBreak(protoList.size());
-        this.protoList = protoList;
-        this.bucketsReducer = new MultiBucketAggregatorsReducer[protoList.size()];
+        this.bucketReducer = new ArrayList<>(protoList.size());
         for (int i = 0; i < protoList.size(); ++i) {
-            bucketsReducer[i] = new MultiBucketAggregatorsReducer(reduceContext, size);
+            bucketReducer.add(new BucketReducer<>(protoList.get(i), reduceContext, size));
         }
     }

@@ -40,30 +38,27 @@ public FixedMultiBucketAggregatorsReducer(AggregationReduceContext reduceContext
      * of the list passed on the constructor
      */
     public final void accept(List<B> buckets) {
-        assert buckets.size() == protoList.size();
-        int i = 0;
-        for (B bucket : buckets) {
-            bucketsReducer[i++].accept(bucket);
+        assert buckets.size() == bucketReducer.size();
+        for (int i = 0; i < buckets.size(); i++) {
+            bucketReducer.get(i).accept(buckets.get(i));
         }
     }

     /**
      * returns the reduced buckets.
      */
     public final List<B> get() {
-        final List<B> reduceBuckets = new ArrayList<>(protoList.size());
-        for (int i = 0; i < protoList.size(); i++) {
-            final B proto = protoList.get(i);
-            final MultiBucketAggregatorsReducer reducer = bucketsReducer[i];
-            reduceBuckets.add(createBucket(proto, reducer.getDocCount(), reducer.get()));
+        final List<B> reduceBuckets = new ArrayList<>(bucketReducer.size());
+        for (final BucketReducer<B> reducer : bucketReducer) {
+            reduceBuckets.add(createBucket(reducer.getProto(), reducer.getDocCount(), reducer.getAggregations()));
         }
         return reduceBuckets;
     }

-    protected abstract B createBucket(B proto, long focCount, InternalAggregations aggregations);
+    protected abstract B createBucket(B proto, long docCount, InternalAggregations aggregations);

     @Override
     public final void close() {
-        Releasables.close(bucketsReducer);
+        Releasables.close(bucketReducer);
     }
 }
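With the separate protoList gone, subclasses now get each prototype back through getProto() when createBucket is invoked. A minimal concrete subclass, sketched for illustration (MyBucket and its constructor are hypothetical, not code from this commit):

    // Illustrative subclass; MyBucket and its constructor are assumptions.
    class MyFixedReducer extends FixedMultiBucketAggregatorsReducer<MyBucket> {
        MyFixedReducer(AggregationReduceContext ctx, int size, List<MyBucket> protos) {
            super(ctx, size, protos);
        }

        @Override
        protected MyBucket createBucket(MyBucket proto, long docCount, InternalAggregations aggregations) {
            // clone the prototype, swapping in the reduced doc count and sub-aggregations
            return new MyBucket(proto.getKey(), docCount, aggregations);
        }
    }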
InternalComposite.java
@@ -22,7 +22,7 @@
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.KeyComparable;
-import org.elasticsearch.search.aggregations.bucket.DelayedMultiBucketAggregatorsReducer;
+import org.elasticsearch.search.aggregations.bucket.DelayedBucketReducer;
 import org.elasticsearch.search.aggregations.support.SamplingContext;
 import org.elasticsearch.xcontent.XContentBuilder;

Expand Down Expand Up @@ -257,7 +257,7 @@ public void close() {
}

private class BucketsQueue implements Releasable {
private final ObjectObjectPagedHashMap<Object, DelayedMultiBucketAggregatorsReducer> bucketReducers;
private final ObjectObjectPagedHashMap<Object, DelayedBucketReducer<InternalBucket>> bucketReducers;
private final ObjectArrayPriorityQueue<InternalBucket> queue;
private final AggregationReduceContext reduceContext;

@@ -274,20 +274,20 @@ protected boolean lessThan(InternalBucket a, InternalBucket b) {

         /** adds a bucket to the queue. Return false if the bucket is not competitive, otherwise true.*/
         boolean add(InternalBucket bucket) {
-            DelayedMultiBucketAggregatorsReducer delayed = bucketReducers.get(bucket.key);
+            DelayedBucketReducer<InternalBucket> delayed = bucketReducers.get(bucket.key);
             if (delayed == null) {
                 final InternalBucket out = queue.insertWithOverflow(bucket);
                 if (out == null) {
                     // bucket is added
-                    delayed = new DelayedMultiBucketAggregatorsReducer(reduceContext);
+                    delayed = new DelayedBucketReducer<>(bucket, reduceContext);
                 } else if (out == bucket) {
                     // bucket is not competitive
                     return false;
                 } else {
                     // bucket replaces existing bucket
                     delayed = bucketReducers.remove(out.key);
                     assert delayed != null;
-                    delayed.reset();
+                    delayed.reset(bucket);
                 }
                 bucketReducers.put(bucket.key, delayed);
             }
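The three branches above follow the insertWithOverflow contract: a null return means the bucket was added with capacity to spare, getting the same bucket back means it was not competitive, and any other return value is the evicted former minimum, whose reducer is then recycled via reset(bucket). Lucene's PriorityQueue, which ObjectArrayPriorityQueue appears to mirror (an assumption, not verified here), demonstrates the contract:

    import org.apache.lucene.util.PriorityQueue;

    class OverflowContractSketch {
        public static void main(String[] args) {
            PriorityQueue<Integer> queue = new PriorityQueue<Integer>(2) {
                @Override
                protected boolean lessThan(Integer a, Integer b) {
                    return a < b; // min-heap of size 2: keeps the two largest values
                }
            };
            System.out.println(queue.insertWithOverflow(5)); // null: room available
            System.out.println(queue.insertWithOverflow(7)); // null: room available
            System.out.println(queue.insertWithOverflow(3)); // 3: not competitive, returned unchanged
            System.out.println(queue.insertWithOverflow(9)); // 5: evicted former minimum
        }
    }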
@@ -307,15 +307,15 @@ List<InternalBucket> get() {
              * just whatever formats make sense for *its* index. This can be real
              * trouble when the index doing the reducing is unmapped. */
             final var reducedFormats = bucket.formats;
-            final DelayedMultiBucketAggregatorsReducer reducer = Objects.requireNonNull(bucketReducers.get(bucket.key));
+            final DelayedBucketReducer<InternalBucket> reducer = Objects.requireNonNull(bucketReducers.get(bucket.key));
             result[i] = new InternalBucket(
                 sourceNames,
                 reducedFormats,
                 bucket.key,
                 reverseMuls,
                 missingOrders,
                 reducer.getDocCount(),
-                reducer.get()
+                reducer.getAggregations()
             );
         }
         return List.of(result);