Skip to content

Commit

Permalink
Improve reduction of terms aggregations (#61779)
Browse files Browse the repository at this point in the history
* Improve reduction of terms aggregations

Today, the terms aggregation reduces multiple aggregations at once by using a map
to group the same buckets together. This operation can be costly since it requires
looking up every bucket in a global map, in no particular order.
This commit changes how term buckets are sorted by shards and during partial reduces,
so that results can be reduced with a merge-sort strategy.
For backward compatibility, results are merged with the legacy code if any of the
aggregations uses a different sort (i.e. it was returned by a node running a prior version).

Relates #51857
  • Loading branch information
jimczi committed Sep 4, 2020
1 parent 50a74f9 commit 49ae2bb
Show file tree
Hide file tree
Showing 24 changed files with 421 additions and 226 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket;

import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;

import java.util.Iterator;

/**
 * An {@link Iterator} wrapper that keeps the most recently consumed bucket
 * accessible through {@link #current()}. Used by merge-sort style reductions
 * that compare the head buckets of several shard-level aggregations inside a
 * priority queue.
 *
 * <p>The constructor eagerly consumes the first element, so the wrapped
 * iterator must be non-empty.
 */
public class IteratorAndCurrent<B extends InternalMultiBucketAggregation.InternalBucket> implements Iterator<B> {
    private final Iterator<B> delegate;
    private B current;

    /**
     * Wraps {@code iterator} and advances it once to initialize {@link #current()}.
     *
     * @throws java.util.NoSuchElementException if {@code iterator} is empty
     */
    public IteratorAndCurrent(Iterator<B> iterator) {
        this.delegate = iterator;
        this.current = iterator.next();
    }

    /**
     * Returns the underlying iterator. Note that advancing it directly, rather
     * than through {@link #next()}, desynchronizes {@link #current()}.
     */
    public Iterator<B> getIterator() {
        return delegate;
    }

    /** The element most recently returned by {@link #next()}, or the first element if {@link #next()} was never called. */
    public B current() {
        return current;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public B next() {
        current = delegate.next();
        return current;
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;

Expand All @@ -38,7 +39,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
Expand Down Expand Up @@ -270,17 +270,6 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
return new Bucket(prototype.key, prototype.docCount, prototype.format, aggregations);
}

private static class IteratorAndCurrent {

private final Iterator<Bucket> iterator;
private Bucket current;

IteratorAndCurrent(Iterator<Bucket> iterator) {
this.iterator = iterator;
current = iterator.next();
}
}

/**
* This method works almost exactly the same as
* InternalDateHistogram#reduceBuckets(List, ReduceContext), the different
Expand All @@ -305,10 +294,10 @@ private BucketReduceResult reduceBuckets(List<InternalAggregation> aggregations,
}
Rounding.Prepared reduceRounding = prepare(reduceRoundingIdx, min, max);

final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
return a.current.key < b.current.key;
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
return a.current().key < b.current().key;
}
};
for (InternalAggregation aggregation : aggregations) {
Expand All @@ -322,25 +311,24 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
if (pq.size() > 0) {
// list of buckets coming from different shards that have the same key
List<Bucket> currentBuckets = new ArrayList<>();
long key = reduceRounding.round(pq.top().current.key);
long key = reduceRounding.round(pq.top().current().key);

do {
final IteratorAndCurrent top = pq.top();
final IteratorAndCurrent<Bucket> top = pq.top();

if (reduceRounding.round(top.current.key) != key) {
if (reduceRounding.round(top.current().key) != key) {
// the key changes, reduce what we already buffered and reset the buffer for current buckets
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
reducedBuckets.add(reduced);
currentBuckets.clear();
key = reduceRounding.round(top.current.key);
key = reduceRounding.round(top.current().key);
}

currentBuckets.add(top.current);
currentBuckets.add(top.current());

if (top.iterator.hasNext()) {
final Bucket next = top.iterator.next();
assert next.key > top.current.key : "shards must return data sorted by key";
top.current = next;
if (top.hasNext()) {
top.next();
assert top.current().key > key: "shards must return data sorted by key";
pq.updateTop();
} else {
pq.pop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
Expand Down Expand Up @@ -289,24 +289,12 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
}

private static class IteratorAndCurrent {

private final Iterator<Bucket> iterator;
private Bucket current;

IteratorAndCurrent(Iterator<Bucket> iterator) {
this.iterator = iterator;
current = iterator.next();
}

}

private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {

final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
return a.current.key < b.current.key;
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
return a.current().key < b.current().key;
}
};
for (InternalAggregation aggregation : aggregations) {
Expand All @@ -320,27 +308,26 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
if (pq.size() > 0) {
// list of buckets coming from different shards that have the same key
List<Bucket> currentBuckets = new ArrayList<>();
double key = pq.top().current.key;
double key = pq.top().current().key;

do {
final IteratorAndCurrent top = pq.top();
final IteratorAndCurrent<Bucket> top = pq.top();

if (top.current.key != key) {
if (top.current().key != key) {
// the key changes, reduce what we already buffered and reset the buffer for current buckets
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
reducedBuckets.add(reduced);
}
currentBuckets.clear();
key = top.current.key;
key = top.current().key;
}

currentBuckets.add(top.current);
currentBuckets.add(top.current());

if (top.iterator.hasNext()) {
final Bucket next = top.iterator.next();
assert next.key > top.current.key : "shards must return data sorted by key";
top.current = next;
if (top.hasNext()) {
top.next();
assert top.current().key > key : "shards must return data sorted by key";
pq.updateTop();
} else {
pq.pop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
Expand Down Expand Up @@ -279,24 +279,12 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
}

private static class IteratorAndCurrent {

private final Iterator<Bucket> iterator;
private Bucket current;

IteratorAndCurrent(Iterator<Bucket> iterator) {
this.iterator = iterator;
current = iterator.next();
}

}

private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {

final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
return Double.compare(a.current.key, b.current.key) < 0;
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
return Double.compare(a.current().key, b.current().key) < 0;
}
};
for (InternalAggregation aggregation : aggregations) {
Expand All @@ -310,28 +298,27 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
if (pq.size() > 0) {
// list of buckets coming from different shards that have the same key
List<Bucket> currentBuckets = new ArrayList<>();
double key = pq.top().current.key;
double key = pq.top().current().key;

do {
final IteratorAndCurrent top = pq.top();
final IteratorAndCurrent<Bucket> top = pq.top();

if (Double.compare(top.current.key, key) != 0) {
if (Double.compare(top.current().key, key) != 0) {
// The key changes, reduce what we already buffered and reset the buffer for current buckets.
// Using Double.compare instead of != to handle NaN correctly.
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
reducedBuckets.add(reduced);
}
currentBuckets.clear();
key = top.current.key;
key = top.current().key;
}

currentBuckets.add(top.current);
currentBuckets.add(top.current());

if (top.iterator.hasNext()) {
final Bucket next = top.iterator.next();
assert Double.compare(next.key, top.current.key) > 0 : "shards must return data sorted by key";
top.current = next;
if (top.hasNext()) {
top.next();
assert Double.compare(top.current().key, key) > 0 : "shards must return data sorted by key";
pq.updateTop();
} else {
pq.pop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -317,18 +317,6 @@ public Number nextKey(Number key) {
= */
private double nextKey(double key){ return key + 1; }

private static class IteratorAndCurrent {

private final Iterator<Bucket> iterator;
private Bucket current;

IteratorAndCurrent(Iterator<Bucket> iterator) {
this.iterator = iterator;
current = iterator.next();
}

}

@Override
protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
Expand All @@ -350,10 +338,10 @@ protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
}

public List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
return Double.compare(a.current.centroid, b.current.centroid) < 0;
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
return Double.compare(a.current().centroid, b.current().centroid) < 0;
}
};
for (InternalAggregation aggregation : aggregations) {
Expand All @@ -365,27 +353,27 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {

List<Bucket> reducedBuckets = new ArrayList<>();
if(pq.size() > 0) {
double key = pq.top().current.centroid();
double key = pq.top().current().centroid();
// list of buckets coming from different shards that have the same key
List<Bucket> currentBuckets = new ArrayList<>();
do {
IteratorAndCurrent top = pq.top();
IteratorAndCurrent<Bucket> top = pq.top();

if (Double.compare(top.current.centroid(), key) != 0) {
if (Double.compare(top.current().centroid(), key) != 0) {
// The key changes, reduce what we already buffered and reset the buffer for current buckets.
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reduced);
currentBuckets.clear();
key = top.current.centroid();
key = top.current().centroid();
}

currentBuckets.add(top.current);
currentBuckets.add(top.current());

if (top.iterator.hasNext()) {
Bucket next = top.iterator.next();
assert next.compareKey(top.current) >= 0 : "shards must return data sorted by centroid";
top.current = next;
if (top.hasNext()) {
Bucket prev = top.current();
top.next();
assert top.current().compareKey(prev) >= 0 : "shards must return data sorted by centroid";
pq.updateTop();
} else {
pq.pop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {
}

protected StringTerms buildEmptyTermsAggregation() {
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
return new StringTerms(name, order, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
}

Expand Down
Loading

0 comments on commit 49ae2bb

Please sign in to comment.