Fixed an issue where, when sub aggregations execute on a single shard, the reduce call was not propagated properly down the agg hierarchy.

Closes elastic#4843
uboness committed Jan 23, 2014
1 parent 049ca67 commit 3fbce4d
Showing 11 changed files with 469 additions and 114 deletions.
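Every hunk below follows the same pattern: the single-shard fast path in reduce(...) used to return the lone per-shard aggregation as-is, so the reduce never reached its sub aggregations. The following sketch (simplified stand-in types, not the actual Elasticsearch classes or API) illustrates the shape of the fix: even with a single shard result, recurse into the sub aggregations.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified stand-ins for InternalAggregation/InternalAggregations;
// a sketch of the fix's shape, not the actual Elasticsearch source.
public class SingleShardReduceSketch {

    interface Agg {
        Agg reduce(List<Agg> shardResults);
    }

    // a leaf aggregation that records whether reduce ever reached it
    static class LeafAgg implements Agg {
        boolean reduced = false;

        @Override
        public Agg reduce(List<Agg> shardResults) {
            LeafAgg result = (LeafAgg) shardResults.get(0);
            result.reduced = true;
            return result;
        }
    }

    // a bucketing aggregation that owns sub aggregations
    static class BucketAgg implements Agg {
        final List<Agg> subAggs;

        BucketAgg(List<Agg> subAggs) {
            this.subAggs = subAggs;
        }

        @Override
        public Agg reduce(List<Agg> shardResults) {
            if (shardResults.size() == 1) {
                BucketAgg only = (BucketAgg) shardResults.get(0);
                // pre-fix this branch was just `return only;`, so the
                // sub aggregations were never reduced; post-fix the reduce
                // is propagated down the hierarchy first
                for (int i = 0; i < only.subAggs.size(); i++) {
                    Agg sub = only.subAggs.get(i);
                    only.subAggs.set(i, sub.reduce(Collections.singletonList(sub)));
                }
                return only;
            }
            // multi-shard merge elided; that path already reduced sub aggs
            return shardResults.get(0);
        }
    }

    public static void main(String[] args) {
        LeafAgg leaf = new LeafAgg();
        BucketAgg root = new BucketAgg(new ArrayList<Agg>(Arrays.asList((Agg) leaf)));
        root.reduce(Collections.singletonList((Agg) root));
        System.out.println("sub aggregation reduced: " + leaf.reduced); // true
    }
}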
@@ -115,15 +115,15 @@ public <A extends Aggregation> A get(String name) {
/**
* Reduces the given lists of addAggregation.
*
* @param aggregationsList A list of addAggregation to reduce
* @param aggregationsList A list of aggregations to reduce
* @return The reduced addAggregation
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, CacheRecycler cacheRecycler) {
if (aggregationsList.isEmpty()) {
return null;
}

// first we collect all addAggregation of the same type and list them together
// first we collect all aggregations of the same type and list them together

Map<String, List<InternalAggregation>> aggByName = new HashMap<String, List<InternalAggregation>>();
for (InternalAggregations aggregations : aggregationsList) {
@@ -150,6 +150,17 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
return result;
}

/**
* Reduces these aggregations, effectively propagating the reduce to all the sub aggregations
* @param cacheRecycler the recycler passed down to each sub aggregation's reduce
*/
public void reduce(CacheRecycler cacheRecycler) {
for (int i = 0; i < aggregations.size(); i++) {
InternalAggregation aggregation = aggregations.get(i);
aggregations.set(i, aggregation.reduce(new InternalAggregation.ReduceContext(ImmutableList.of(aggregation), cacheRecycler)));
}
}

/** The fields required to write this addAggregation to xcontent */
static class Fields {
public static final XContentBuilderString AGGREGATIONS = new XContentBuilderString("aggregations");
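For orientation: the static reduce(List<InternalAggregations>, CacheRecycler) above merges per-shard results by grouping same-named aggregations and reducing each group, while the new instance-level reduce(CacheRecycler) handles the single-shard case by reducing each aggregation against a singleton list of itself, which recurses into its own sub aggregations. A rough sketch of the grouping step, with a stand-in Agg type and a hypothetical name() accessor (not the real Elasticsearch API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the "collect same-named aggregations, reduce each group" step
// described in the javadoc above.
public class GroupedReduceSketch {

    interface Agg {
        String name();
        Agg reduce(List<Agg> sameNamed);
    }

    static List<Agg> reduce(List<List<Agg>> perShardResults) {
        Map<String, List<Agg>> byName = new HashMap<String, List<Agg>>();
        for (List<Agg> shardAggs : perShardResults) {
            for (Agg agg : shardAggs) {
                List<Agg> group = byName.get(agg.name());
                if (group == null) {
                    byName.put(agg.name(), group = new ArrayList<Agg>());
                }
                group.add(agg);
            }
        }
        // each group now holds one entry per shard for the same aggregation
        List<Agg> reduced = new ArrayList<Agg>();
        for (List<Agg> group : byName.values()) {
            reduced.add(group.get(0).reduce(group));
        }
        return reduced;
    }
}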
@@ -64,7 +64,9 @@ public InternalAggregations getAggregations() {
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
return aggregations.get(0);
B reduced = ((B) aggregations.get(0));
reduced.aggregations.reduce(reduceContext.cacheRecycler());
return reduced;
}
B reduced = null;
List<InternalAggregations> subAggregationsList = new ArrayList<InternalAggregations>(aggregations.size());
@@ -26,15 +26,14 @@

/**
* Creates an aggregation based on bucketing points into GeoHashes
*
*/
public class GeoHashGridBuilder extends AggregationBuilder<GeoHashGridBuilder> {


private String field;
private int precision=GeoHashGridParser.DEFAULT_PRECISION;
private int requiredSize=GeoHashGridParser.DEFAULT_MAX_NUM_CELLS;
private int shardSize=0;
private int precision = GeoHashGridParser.DEFAULT_PRECISION;
private int requiredSize = GeoHashGridParser.DEFAULT_MAX_NUM_CELLS;
private int shardSize = 0;

public GeoHashGridBuilder(String name) {
super(name, InternalGeoHashGrid.TYPE.name());
@@ -46,18 +45,19 @@ public GeoHashGridBuilder field(String field) {
}

public GeoHashGridBuilder precision(int precision) {
if((precision<1)||(precision>12))
{
throw new ElasticsearchIllegalArgumentException("Invalid geohash aggregation precision of "+precision
+"must be between 1 and 12");
if ((precision < 1) || (precision > 12)) {
throw new ElasticsearchIllegalArgumentException("Invalid geohash aggregation precision of " + precision
+ "must be between 1 and 12");
}
this.precision = precision;
return this;
}

public GeoHashGridBuilder size(int requiredSize) {
this.requiredSize = requiredSize;
return this;
}

public GeoHashGridBuilder shardSize(int shardSize) {
this.shardSize = shardSize;
return this;
@@ -102,7 +102,10 @@ public int compareTo(Bucket other) {
}
public Bucket reduce(List<? extends Bucket> buckets, CacheRecycler cacheRecycler) {
if (buckets.size() == 1) {
return buckets.get(0);
// we still need to reduce the sub aggs
Bucket bucket = buckets.get(0);
bucket.aggregations.reduce(cacheRecycler);
return bucket;
}
Bucket reduced = null;
List<InternalAggregations> aggregationsList = new ArrayList<InternalAggregations>(buckets.size());
@@ -166,7 +169,7 @@ public InternalGeoHashGrid reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0);
grid.trimExcessEntries();
grid.trimExcessEntries(reduceContext.cacheRecycler());
return grid;
}
InternalGeoHashGrid reduced = null;
@@ -227,21 +230,14 @@ public int getNumberOfBuckets() {
}


protected void trimExcessEntries() {
if (requiredSize >= buckets.size()) {
return;
}

if (buckets instanceof List) {
buckets = ((List) buckets).subList(0, requiredSize);
return;
}

protected void trimExcessEntries(CacheRecycler cacheRecycler) {
int i = 0;
for (Iterator<Bucket> iter = buckets.iterator(); iter.hasNext();) {
iter.next();
Bucket bucket = iter.next();
if (i++ >= requiredSize) {
iter.remove();
} else {
bucket.aggregations.reduce(cacheRecycler);
}
}
}
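The trimExcessEntries rewrite above drops the old early-return and subList shortcuts: since every kept bucket now needs its sub aggregations reduced, the loop must visit all buckets, removing the excess ones and reducing the rest. A standalone sketch of that keep-and-reduce loop, with a stand-in Bucket type in place of the real class and a reduceSubAggs() hook in place of bucket.aggregations.reduce(cacheRecycler):

import java.util.Iterator;
import java.util.List;

// Sketch of the trim loop's shape, not the actual Elasticsearch source.
public class TrimSketch {

    static class Bucket {
        boolean subAggsReduced = false;

        void reduceSubAggs() {
            subAggsReduced = true;
        }
    }

    // keep the first requiredSize buckets (reducing their sub aggregations),
    // drop the rest; every bucket must be visited, which is why the commit
    // removed the old subList() shortcut
    static void trimExcessEntries(List<Bucket> buckets, int requiredSize) {
        int i = 0;
        for (Iterator<Bucket> iter = buckets.iterator(); iter.hasNext(); ) {
            Bucket bucket = iter.next();
            if (i++ >= requiredSize) {
                iter.remove();
            } else {
                bucket.reduceSubAggs();
            }
        }
    }
}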
@@ -77,7 +77,10 @@ public Aggregations getAggregations() {

Bucket reduce(List<Bucket> buckets, CacheRecycler cacheRecycler) {
if (buckets.size() == 1) {
return buckets.get(0);
// we only need to reduce the sub aggregations
Bucket bucket = buckets.get(0);
bucket.aggregations.reduce(cacheRecycler);
return bucket;
}
List<InternalAggregations> aggregations = new ArrayList<InternalAggregations>(buckets.size());
Bucket reduced = null;
@@ -172,21 +175,27 @@ public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {

AbstractHistogramBase<B> histo = (AbstractHistogramBase<B>) aggregations.get(0);

if (minDocCount == 1) {
return aggregations.get(0);
for (B bucket : histo.buckets) {
((Bucket) bucket).aggregations.reduce(reduceContext.cacheRecycler());
}
return histo;
}

AbstractHistogramBase histo = (AbstractHistogramBase) aggregations.get(0);

CollectionUtil.introSort(histo.buckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator());
List<HistogramBase.Bucket> list = order.asc ? histo.buckets : Lists.reverse(histo.buckets);
List<B> list = order.asc ? histo.buckets : Lists.reverse(histo.buckets);
HistogramBase.Bucket prevBucket = null;
ListIterator<HistogramBase.Bucket> iter = list.listIterator();
ListIterator<B> iter = list.listIterator();
if (minDocCount == 0) {
// we need to fill the gaps with empty buckets
while (iter.hasNext()) {
// look ahead on the next bucket without advancing the iter
// so we'll be able to insert elements at the right position
HistogramBase.Bucket nextBucket = list.get(iter.nextIndex());
((Bucket) nextBucket).aggregations.reduce(reduceContext.cacheRecycler());
if (prevBucket != null) {
long key = emptyBucketInfo.rounding.nextRoundingValue(prevBucket.getKey());
while (key != nextBucket.getKey()) {
@@ -198,8 +207,11 @@ public InternalAggregation reduce(ReduceContext reduceContext) {
}
} else {
while (iter.hasNext()) {
if (iter.next().getDocCount() < minDocCount) {
Bucket bucket = (Bucket) iter.next();
if (bucket.getDocCount() < minDocCount) {
iter.remove();
} else {
bucket.aggregations.reduce(reduceContext.cacheRecycler());
}
}
}
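In the histogram hunk above, the single-shard path now reduces each bucket's sub aggregations as it walks the bucket list, both when filling gaps with empty buckets (minDocCount == 0) and when filtering sparse buckets out (minDocCount > 0). The gap-filling walk itself, sketched with plain long keys and a fixed interval standing in for emptyBucketInfo.rounding.nextRoundingValue(...):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the gap-filling idea, not the actual Elasticsearch source.
public class GapFillSketch {

    // stand-in for emptyBucketInfo.rounding.nextRoundingValue(key)
    static long nextRoundingValue(long key, long interval) {
        return key + interval;
    }

    // given sorted, distinct bucket keys, return the keys with gaps filled
    // by empty-bucket keys at every missing rounding value
    static List<Long> fillGaps(List<Long> keys, long interval) {
        List<Long> filled = new ArrayList<Long>();
        Long prev = null;
        for (long next : keys) {
            if (prev != null) {
                for (long key = nextRoundingValue(prev, interval); key != next;
                        key = nextRoundingValue(key, interval)) {
                    filled.add(key); // an empty bucket would be inserted here
                }
            }
            filled.add(next);
            prev = next;
        }
        return filled;
    }

    public static void main(String[] args) {
        // buckets at 0, 30, 40 with interval 10 -> empty buckets at 10 and 20
        System.out.println(fillGaps(Arrays.asList(0L, 30L, 40L), 10L));
        // prints: [0, 10, 20, 30, 40]
    }
}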
@@ -86,7 +86,10 @@ public Aggregations getAggregations() {

Bucket reduce(List<Bucket> ranges, CacheRecycler cacheRecycler) {
if (ranges.size() == 1) {
return ranges.get(0);
// we still need to call reduce on all the sub aggregations
Bucket bucket = ranges.get(0);
bucket.aggregations.reduce(cacheRecycler);
return bucket;
}
Bucket reduced = null;
List<InternalAggregations> aggregationsList = Lists.newArrayListWithCapacity(ranges.size());
@@ -196,7 +199,11 @@ public List<B> buckets() {
public AbstractRangeBase reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
return (AbstractRangeBase) aggregations.get(0);
AbstractRangeBase<B> reduced = (AbstractRangeBase<B>) aggregations.get(0);
for (B bucket : reduced.buckets()) {
((Bucket) bucket).aggregations.reduce(reduceContext.cacheRecycler());
}
return reduced;
}
List<List<Bucket>> rangesList = null;
for (InternalAggregation aggregation : aggregations) {
@@ -107,7 +107,7 @@ public InternalTerms reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalTerms terms = (InternalTerms) aggregations.get(0);
terms.trimExcessEntries();
terms.trimExcessEntries(reduceContext.cacheRecycler());
return terms;
}
InternalTerms reduced = null;
@@ -60,7 +60,9 @@ public Aggregations getAggregations() {

public Bucket reduce(List<? extends Bucket> buckets, CacheRecycler cacheRecycler) {
if (buckets.size() == 1) {
return buckets.get(0);
Bucket bucket = buckets.get(0);
bucket.aggregations.reduce(cacheRecycler);
return bucket;
}
Bucket reduced = null;
List<InternalAggregations> aggregationsList = new ArrayList<InternalAggregations>(buckets.size());
@@ -121,12 +123,11 @@ public InternalTerms reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalTerms terms = (InternalTerms) aggregations.get(0);
terms.trimExcessEntries();
terms.trimExcessEntries(reduceContext.cacheRecycler());
return terms;
}
InternalTerms reduced = null;

// TODO: would it be better to use a hppc map and then directly work on the backing array instead of using a PQ?
InternalTerms reduced = null;

Map<Text, List<InternalTerms.Bucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) {
@@ -172,14 +173,15 @@ public InternalTerms reduce(ReduceContext reduceContext) {
return reduced;
}

final void trimExcessEntries() {
final void trimExcessEntries(CacheRecycler cacheRecycler) {
final List<Bucket> newBuckets = Lists.newArrayList();
for (Bucket b : buckets) {
if (newBuckets.size() >= requiredSize) {
break;
}
if (b.docCount >= minDocCount) {
newBuckets.add(b);
b.aggregations.reduce(cacheRecycler);
}
}
buckets = newBuckets;
@@ -104,7 +104,7 @@ public InternalTerms reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalTerms terms = (InternalTerms) aggregations.get(0);
terms.trimExcessEntries();
terms.trimExcessEntries(reduceContext.cacheRecycler());
return terms;
}
InternalTerms reduced = null;
