@@ -9,19 +9,20 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorReducer;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregatorsReducer;
import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.support.SamplingContext;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
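
The two imports added in this hunk, Releasables and MultiBucketAggregatorsReducer, support the reworked reducer below: each per-term entry now holds a releasable sub-aggregation reducer, so the new close() method has to release every one of them even when the reduce is abandoned early. As a generic, self-contained sketch of that close-everything pattern (plain AutoCloseable, not the Releasables implementation):

import java.util.List;

// Sketch only: close every resource, keeping the first failure and suppressing the rest,
// so one failing resource does not prevent the others from being released.
final class CloseAllSketch {
    static void closeAll(List<? extends AutoCloseable> resources) {
        RuntimeException failure = null;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = new RuntimeException("failed to close resource", e);
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}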
@@ -201,50 +202,44 @@ protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceCont
return new AggregatorReducer() {
long globalSubsetSize = 0;
long globalSupersetSize = 0;
final Map<String, ReducerAndProto<B>> buckets = new HashMap<>();

final List<InternalSignificantTerms<A, B>> aggregations = new ArrayList<>(size);

// Compute the overall result set size and the corpus size using the
// top-level Aggregations from each shard
@Override
public void accept(InternalAggregation aggregation) {
@SuppressWarnings("unchecked")
InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
final InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
// Compute the overall result set size and the corpus size using the
// top-level Aggregations from each shard
globalSubsetSize += terms.getSubsetSize();
globalSupersetSize += terms.getSupersetSize();
aggregations.add(terms);
for (B bucket : terms.getBuckets()) {
final ReducerAndProto<B> reducerAndProto = buckets.computeIfAbsent(
bucket.getKeyAsString(),
k -> new ReducerAndProto<>(new MultiBucketAggregatorsReducer(reduceContext, size), bucket)
);
reducerAndProto.reducer.accept(bucket);
reducerAndProto.subsetDf[0] += bucket.subsetDf;
reducerAndProto.supersetDf[0] += bucket.supersetDf;
}
}

@Override
public InternalAggregation get() {
final Map<String, List<B>> buckets = new HashMap<>();
for (InternalSignificantTerms<A, B> terms : aggregations) {
for (B bucket : terms.getBuckets()) {
List<B> existingBuckets = buckets.computeIfAbsent(bucket.getKeyAsString(), k -> new ArrayList<>(size));
// Adjust the buckets with the global stats representing the
// total size of the pots from which the stats are drawn
existingBuckets.add(
createBucket(
bucket.getSubsetDf(),
globalSubsetSize,
bucket.getSupersetDf(),
globalSupersetSize,
bucket.aggregations,
bucket
)
);
}
}

final SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext);
final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size());
final BucketSignificancePriorityQueue<B> ordered = new BucketSignificancePriorityQueue<>(size);
for (Map.Entry<String, List<B>> entry : buckets.entrySet()) {
List<B> sameTermBuckets = entry.getValue();
final B b = reduceBucket(sameTermBuckets, reduceContext);
for (ReducerAndProto<B> reducerAndProto : buckets.values()) {
final B b = createBucket(
reducerAndProto.subsetDf[0],
globalSubsetSize,
reducerAndProto.supersetDf[0],
globalSupersetSize,
reducerAndProto.reducer.get(),
reducerAndProto.proto
);
b.updateScore(heuristic);
if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) {
B removed = ordered.insertWithOverflow(b);
final B removed = ordered.insertWithOverflow(b);
if (removed == null) {
reduceContext.consumeBucketsAndMaybeBreak(1);
} else {
@@ -254,15 +249,28 @@ public InternalAggregation get() {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
}
}
B[] list = createBucketsArray(ordered.size());
final B[] list = createBucketsArray(ordered.size());
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = ordered.pop();
}
return create(globalSubsetSize, globalSupersetSize, Arrays.asList(list));
}

@Override
public void close() {
for (ReducerAndProto<B> reducerAndProto : buckets.values()) {
Releasables.close(reducerAndProto.reducer);
}
}
};
}
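
The anonymous AggregatorReducer above follows an accept/get/close life cycle: accept() folds each shard-level result into per-term state keyed by the term string, get() emits one merged bucket per term carrying the global subset/superset sizes, and close() releases the per-term sub-aggregation reducers. A minimal sketch of that accumulation shape, using hypothetical ShardBucket/ShardResult types rather than the Elasticsearch ones:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: illustrates the accept/get shape of the reducer above with made-up types.
final class StreamingTermsReducerSketch {
    record ShardBucket(String term, long subsetDf, long supersetDf) {}
    record ShardResult(long subsetSize, long supersetSize, List<ShardBucket> buckets) {}

    private final Map<String, long[]> countsByTerm = new HashMap<>(); // [0]=subsetDf, [1]=supersetDf
    private long globalSubsetSize = 0;
    private long globalSupersetSize = 0;

    // accept(): fold one shard-level result into the running per-term state.
    void accept(ShardResult shard) {
        globalSubsetSize += shard.subsetSize();
        globalSupersetSize += shard.supersetSize();
        for (ShardBucket bucket : shard.buckets()) {
            long[] counts = countsByTerm.computeIfAbsent(bucket.term(), k -> new long[2]);
            counts[0] += bucket.subsetDf();
            counts[1] += bucket.supersetDf();
        }
    }

    // get(): emit one merged line per term, each carrying the global pot sizes.
    List<String> get() {
        List<String> merged = new ArrayList<>();
        countsByTerm.forEach((term, counts) -> merged.add(
            term + ": subsetDf=" + counts[0] + "/" + globalSubsetSize
                + ", supersetDf=" + counts[1] + "/" + globalSupersetSize));
        return merged;
    }
}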

private record ReducerAndProto<B>(MultiBucketAggregatorsReducer reducer, B proto, long[] subsetDf, long[] supersetDf) {
private ReducerAndProto(MultiBucketAggregatorsReducer reducer, B proto) {
this(reducer, proto, new long[] { 0 }, new long[] { 0 });
}
}
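
ReducerAndProto is a record, so its fields cannot be reassigned; the single-element long[] fields are what let it keep mutable running document-frequency totals across repeated accept() calls. A tiny standalone sketch of the same trick (the names here are illustrative only):

// Sketch only: a record cannot reassign its fields, but it can mutate the contents of an
// array field, so a one-element long[] serves as an in-place counter.
record MutableCounter(long[] value) {
    MutableCounter() {
        this(new long[] { 0 });
    }

    void add(long delta) {
        value[0] += delta;
    }

    long current() {
        return value[0];
    }
}

A plain mutable class would work just as well; the array-in-a-record form keeps the compact canonical constructor while still allowing in-place increments.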

@Override
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
long supersetSize = samplingContext.scaleUp(getSupersetSize());
@@ -285,19 +293,6 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
);
}
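
finalizeSampling, only partially visible in this hunk, rescales counts gathered under random sampling back to corpus-level values via samplingContext.scaleUp. Conceptually that is extrapolation by the inverse of the sampling probability; a hedged, self-contained sketch of the arithmetic (not the SamplingContext API):

// Sketch only: counts measured on a p-fraction random sample are extrapolated to the full
// index by dividing by p. For example, 42 sampled docs at p = 0.1 extrapolate to ~420 docs.
final class ScaleUpSketch {
    static long scaleUp(long sampledCount, double samplingProbability) {
        return Math.round(sampledCount / samplingProbability);
    }

    public static void main(String[] args) {
        System.out.println(scaleUp(42, 0.1)); // prints 420
    }
}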

private B reduceBucket(List<B> buckets, AggregationReduceContext context) {
assert buckets.isEmpty() == false;
long subsetDf = 0;
long supersetDf = 0;
for (B bucket : buckets) {
subsetDf += bucket.subsetDf;
supersetDf += bucket.supersetDf;
}
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return createBucket(subsetDf, buckets.get(0).subsetSize, supersetDf, buckets.get(0).supersetSize, aggs, buckets.get(0));
}

abstract B createBucket(
long subsetDf,
long subsetSize,