Skip to content

Commit

Permalink
Aggregations: Return the sum of the doc counts of other buckets.
Browse files Browse the repository at this point in the history
This commit adds a new field to the response of the terms aggregation called
`sum_other_doc_count` which is equal to the sum of the doc counts of the buckets
that did not make it to the list of top buckets. It is typically useful to have
a sector called e.g. `other` when using terms aggregations to build pie charts.

Example query and response:

```json
GET test/_search?search_type=count
{
  "aggs": {
    "colors": {
      "terms": {
        "field": "color",
        "size": 3
      }
    }
  }
}
```

```json
{
   [...],
   "aggregations": {
      "colors": {
         "doc_count_error_upper_bound": 0,
         "sum_other_doc_count": 4,
         "buckets": [
            {
               "key": "blue",
               "doc_count": 65
            },
            {
               "key": "red",
               "doc_count": 14
            },
            {
               "key": "brown",
               "doc_count": 3
            }
         ]
      }
   }
}
```

Close elastic#8213
  • Loading branch information
jpountz committed Oct 27, 2014
1 parent 2319e73 commit 5e283da
Show file tree
Hide file tree
Showing 18 changed files with 187 additions and 43 deletions.
Expand Up @@ -25,7 +25,9 @@ Response:
"aggregations" : {
"genders" : {
"buckets" : [
"doc_count_error_upper_bound": 0, <1>
"sum_other_doc_count": 0, <2>
"buckets" : [ <3>
{
"key" : "male",
"doc_count" : 10
Expand All @@ -40,6 +42,10 @@ Response:
}
--------------------------------------------------

<1> an upper bound of the error on the document counts for each term, see <<search-aggregations-bucket-terms-aggregation-approximate-counts,below>>
<2> when there are many unique terms, Elasticsearch only returns the top terms; this number is the sum of the document counts for all buckets that are not part of the response
<3> the list of the top buckets, the meaning of `top` being defined by the <<search-aggregations-bucket-terms-aggregation-order,order>>

By default, the `terms` aggregation will return the buckets for the top ten terms ordered by the `doc_count`. One can
change this default behaviour by setting the `size` parameter.

Expand All @@ -52,6 +58,7 @@ This means that if the number of unique terms is greater than `size`, the return
(it could be that the term counts are slightly off and it could even be that a term that should have been in the top
size buckets was not returned). If set to `0`, the `size` will be set to `Integer.MAX_VALUE`.

[[search-aggregations-bucket-terms-aggregation-approximate-counts]]
==== Document counts are approximate

As described above, the document counts (and the results of any sub aggregations) in the terms aggregation are not always
Expand Down Expand Up @@ -226,6 +233,7 @@ does not return a particular term which appears in the results from another shar
aggregation is either sorted by a sub aggregation or in order of ascending document count, the error in the document counts cannot be
determined and is given a value of -1 to indicate this.

[[search-aggregations-bucket-terms-aggregation-order]]
==== Order

The order of the buckets can be customized by setting the `order` parameter. By default, the buckets are ordered by
Expand Down
Expand Up @@ -45,7 +45,7 @@ public boolean shouldCollect() {

@Override
public InternalAggregation buildEmptyAggregation() {
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket>emptyList(), showTermDocCountError, 0);
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket>emptyList(), showTermDocCountError, 0, 0);
}

}
Expand Up @@ -98,8 +98,8 @@ Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {

DoubleTerms() {} // for serialization

public DoubleTerms(String name, InternalOrder order, @Nullable ValueFormatter formatter, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
/**
 * Creates a {@link DoubleTerms} aggregation result.
 *
 * @param otherDocCount sum of the doc counts of the buckets that did not make
 *                      it into the top buckets (serialized as `sum_other_doc_count`)
 */
public DoubleTerms(String name, InternalOrder order, @Nullable ValueFormatter formatter, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
// formatter controls how double terms are rendered; all other state lives in InternalTerms
this.formatter = formatter;
}

Expand All @@ -109,8 +109,8 @@ public Type type() {
}

@Override
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
return new DoubleTerms(name, order, formatter, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
// Factory used by InternalTerms.reduce(...) to build the reduced result while
// preserving this aggregation's formatter and threshold settings.
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
return new DoubleTerms(name, order, formatter, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
}

@Override
Expand All @@ -132,6 +132,9 @@ public void readFrom(StreamInput in) throws IOException {
this.showTermDocCountError = false;
}
this.minDocCount = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
this.otherDocCount = in.readVLong();
}
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Expand Down Expand Up @@ -162,6 +165,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(showTermDocCountError);
}
out.writeVLong(minDocCount);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeVLong(otherDocCount);
}
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
out.writeDouble(((Bucket) bucket).term);
Expand All @@ -176,6 +182,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
builder.field(SUM_OF_OTHER_DOC_COUNTS, otherDocCount);
builder.startArray(CommonFields.BUCKETS);
for (InternalTerms.Bucket bucket : buckets) {
builder.startObject();
Expand Down
Expand Up @@ -69,7 +69,7 @@ private static DoubleTerms convertToDouble(LongTerms terms) {
for (int i = 0; i < buckets.length; ++i) {
buckets[i] = convertToDouble(buckets[i]);
}
return new DoubleTerms(terms.getName(), terms.order, terms.formatter, terms.requiredSize, terms.shardSize, terms.minDocCount, Arrays.asList(buckets), terms.showTermDocCountError, terms.docCountError);
return new DoubleTerms(terms.getName(), terms.order, terms.formatter, terms.requiredSize, terms.shardSize, terms.minDocCount, Arrays.asList(buckets), terms.showTermDocCountError, terms.docCountError, terms.otherDocCount);
}

}
Expand Up @@ -150,6 +150,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
}
long otherDocCount = 0;
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(this));
OrdBucket spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0);
for (long globalTermOrd = 0; globalTermOrd < globalOrds.getValueCount(); ++globalTermOrd) {
Expand All @@ -161,6 +162,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
if (bucketCountThresholds.getMinDocCount() > 0 && bucketDocCount == 0) {
continue;
}
otherDocCount += bucketDocCount;
spare.globalOrd = globalTermOrd;
spare.bucketOrd = bucketOrd;
spare.docCount = bucketDocCount;
Expand All @@ -182,6 +184,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
copy(globalOrds.lookupOrd(bucket.globalOrd), scratch);
list[i] = new StringTerms.Bucket(scratch, bucket.docCount, null, showTermDocCountError, 0);
list[i].bucketOrd = bucket.bucketOrd;
otherDocCount -= list[i].docCount;
}
//replay any deferred collections
runDeferredCollections(survivingBucketOrds);
Expand All @@ -193,7 +196,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
bucket.docCountError = 0;
}

return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Arrays.asList(list), showTermDocCountError, 0);
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Arrays.asList(list), showTermDocCountError, 0, otherDocCount);
}

/** This is used internally only, just for compare using global ordinal instead of term bytes in the PQ */
Expand Down
Expand Up @@ -37,6 +37,7 @@
public abstract class InternalTerms extends InternalAggregation implements Terms, ToXContent, Streamable {

protected static final String DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME = "doc_count_error_upper_bound";
protected static final String SUM_OF_OTHER_DOC_COUNTS = "sum_other_doc_count";

public static abstract class Bucket extends Terms.Bucket {

Expand Down Expand Up @@ -104,10 +105,11 @@ public Bucket reduce(List<? extends Bucket> buckets, ReduceContext context) {
protected Map<String, Bucket> bucketMap;
protected long docCountError;
protected boolean showTermDocCountError;
protected long otherDocCount;

protected InternalTerms() {} // for serialization

protected InternalTerms(String name, InternalOrder order, int requiredSize, int shardSize, long minDocCount, List<Bucket> buckets, boolean showTermDocCountError, long docCountError) {
protected InternalTerms(String name, InternalOrder order, int requiredSize, int shardSize, long minDocCount, List<Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
super(name);
this.order = order;
this.requiredSize = requiredSize;
Expand All @@ -116,6 +118,7 @@ protected InternalTerms(String name, InternalOrder order, int requiredSize, int
this.buckets = buckets;
this.showTermDocCountError = showTermDocCountError;
this.docCountError = docCountError;
this.otherDocCount = otherDocCount;
}

@Override
Expand All @@ -139,14 +142,21 @@ public long getDocCountError() {
return docCountError;
}

@Override
public long getSumOfOtherDocCounts() {
// Sum of the doc counts of buckets that were not returned as top buckets;
// exposed in the response as `sum_other_doc_count`.
return otherDocCount;
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();

Multimap<Object, InternalTerms.Bucket> buckets = ArrayListMultimap.create();
long sumDocCountError = 0;
long otherDocCount = 0;
for (InternalAggregation aggregation : aggregations) {
InternalTerms terms = (InternalTerms) aggregation;
otherDocCount += terms.getSumOfOtherDocCounts();
final long thisAggDocCountError;
if (terms.buckets.size() < this.shardSize || this.order == InternalOrder.TERM_ASC || this.order == InternalOrder.TERM_DESC) {
thisAggDocCountError = 0;
Expand Down Expand Up @@ -182,7 +192,10 @@ public InternalAggregation reduce(ReduceContext reduceContext) {
}
}
if (b.docCount >= minDocCount) {
ordered.insertWithOverflow(b);
Terms.Bucket removed = ordered.insertWithOverflow(b);
if (removed != null) {
otherDocCount += removed.getDocCount();
}
}
}
Bucket[] list = new Bucket[ordered.size()];
Expand All @@ -195,9 +208,9 @@ public InternalAggregation reduce(ReduceContext reduceContext) {
} else {
docCountError = aggregations.size() == 1 ? 0 : sumDocCountError;
}
return newAggregation(name, Arrays.asList(list), showTermDocCountError, docCountError);
return newAggregation(name, Arrays.asList(list), showTermDocCountError, docCountError, otherDocCount);
}

protected abstract InternalTerms newAggregation(String name, List<Bucket> buckets, boolean showTermDocCountError, long docCountError);
// Subclass factory invoked at the end of reduce(); implementations return the
// concrete terms type (StringTerms/LongTerms/DoubleTerms) carrying the reduced
// buckets, the doc-count error bound and the accumulated otherDocCount.
protected abstract InternalTerms newAggregation(String name, List<Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount);

}
Expand Up @@ -99,8 +99,8 @@ Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {

LongTerms() {} // for serialization

public LongTerms(String name, InternalOrder order, @Nullable ValueFormatter formatter, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
/**
 * Creates a {@link LongTerms} aggregation result.
 *
 * @param otherDocCount sum of the doc counts of the buckets that did not make
 *                      it into the top buckets (serialized as `sum_other_doc_count`)
 */
public LongTerms(String name, InternalOrder order, @Nullable ValueFormatter formatter, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
// formatter controls how long terms are rendered; all other state lives in InternalTerms
this.formatter = formatter;
}

Expand All @@ -110,8 +110,8 @@ public Type type() {
}

@Override
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
return new LongTerms(name, order, formatter, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
// Factory used by InternalTerms.reduce(...) to build the reduced result while
// preserving this aggregation's formatter and threshold settings.
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
return new LongTerms(name, order, formatter, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
}

@Override
Expand All @@ -133,6 +133,9 @@ public void readFrom(StreamInput in) throws IOException {
this.showTermDocCountError = false;
}
this.minDocCount = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
this.otherDocCount = in.readVLong();
}
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Expand Down Expand Up @@ -163,6 +166,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(showTermDocCountError);
}
out.writeVLong(minDocCount);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeVLong(otherDocCount);
}
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
out.writeLong(((Bucket) bucket).term);
Expand All @@ -177,6 +183,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
builder.field(SUM_OF_OTHER_DOC_COUNTS, otherDocCount);
builder.startArray(CommonFields.BUCKETS);
for (InternalTerms.Bucket bucket : buckets) {
builder.startObject();
Expand Down
Expand Up @@ -115,6 +115,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {

final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());

long otherDocCount = 0;
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(this));
LongTerms.Bucket spare = null;
for (long i = 0; i < bucketOrds.size(); i++) {
Expand All @@ -123,6 +124,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
}
spare.term = bucketOrds.get(i);
spare.docCount = bucketDocCount(i);
otherDocCount += spare.docCount;
spare.bucketOrd = i;
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
spare = (LongTerms.Bucket) ordered.insertWithOverflow(spare);
Expand All @@ -136,6 +138,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
final LongTerms.Bucket bucket = (LongTerms.Bucket) ordered.pop();
survivingBucketOrds[i] = bucket.bucketOrd;
list[i] = bucket;
otherDocCount -= bucket.docCount;
}

runDeferredCollections(survivingBucketOrds);
Expand All @@ -146,13 +149,13 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
list[i].docCountError = 0;
}

return new LongTerms(name, order, formatter, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Arrays.asList(list), showTermDocCountError, 0);
return new LongTerms(name, order, formatter, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Arrays.asList(list), showTermDocCountError, 0, otherDocCount);
}


@Override
public InternalAggregation buildEmptyAggregation() {
return new LongTerms(name, order, formatter, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket>emptyList(), showTermDocCountError, 0);
return new LongTerms(name, order, formatter, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket>emptyList(), showTermDocCountError, 0, 0);
}

@Override
Expand Down
Expand Up @@ -98,8 +98,8 @@ Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {

StringTerms() {} // for serialization

public StringTerms(String name, InternalOrder order, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
/**
 * Creates a {@link StringTerms} aggregation result.
 *
 * @param otherDocCount sum of the doc counts of the buckets that did not make
 *                      it into the top buckets (serialized as `sum_other_doc_count`)
 */
public StringTerms(String name, InternalOrder order, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
}

@Override
Expand All @@ -108,8 +108,8 @@ public Type type() {
}

@Override
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
return new StringTerms(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
// Factory used by InternalTerms.reduce(...) to build the reduced result with
// this aggregation's threshold settings.
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
return new StringTerms(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
}

@Override
Expand All @@ -130,6 +130,9 @@ public void readFrom(StreamInput in) throws IOException {
this.showTermDocCountError = false;
}
this.minDocCount = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
this.otherDocCount = in.readVLong();
}
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Expand Down Expand Up @@ -159,6 +162,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(showTermDocCountError);
}
out.writeVLong(minDocCount);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeVLong(otherDocCount);
}
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
out.writeBytesRef(((Bucket) bucket).termBytes);
Expand All @@ -173,6 +179,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
builder.field(SUM_OF_OTHER_DOC_COUNTS, otherDocCount);
builder.startArray(CommonFields.BUCKETS);
for (InternalTerms.Bucket bucket : buckets) {
builder.startObject();
Expand Down

0 comments on commit 5e283da

Please sign in to comment.