Skip to content

Commit

Permalink
Aggregations: Return the sum of the doc counts of other buckets.
Browse files Browse the repository at this point in the history
This commit adds a new field to the response of the terms aggregation called
`sum_other_doc_count` which is equal to the sum of the doc counts of the buckets
that did not make it to the list of top buckets. It is typically useful to have
a sector called e.g. `other` when using terms aggregations to build pie charts.

Example query and response:

```json
GET test/_search?search_type=count
{
  "aggs": {
    "colors": {
      "terms": {
        "field": "color",
        "size": 3
      }
    }
  }
}
```

```json
{
   [...],
   "aggregations": {
      "colors": {
         "doc_count_error_upper_bound": 0,
         "sum_other_doc_count": 4,
         "buckets": [
            {
               "key": "blue",
               "doc_count": 65
            },
            {
               "key": "red",
               "doc_count": 14
            },
            {
               "key": "brown",
               "doc_count": 3
            }
         ]
      }
   }
}
```

Close #8213
  • Loading branch information
jpountz committed Oct 27, 2014
1 parent b6221e0 commit 238e405
Show file tree
Hide file tree
Showing 18 changed files with 187 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ Response:
"aggregations" : {
"genders" : {
"buckets" : [
"doc_count_error_upper_bound": 0, <1>
"sum_other_doc_count": 0, <2>
"buckets" : [ <3>
{
"key" : "male",
"doc_count" : 10
Expand All @@ -40,6 +42,10 @@ Response:
}
--------------------------------------------------

<1> an upper bound of the error on the document counts for each term, see <<search-aggregations-bucket-terms-aggregation-approximate-counts,below>>
<2> when there are lots of unique terms, Elasticsearch only returns the top terms; this number is the sum of the document counts for all buckets that are not part of the response
<3> the list of the top buckets, the meaning of `top` being defined by the <<search-aggregations-bucket-terms-aggregation-order,order>>

By default, the `terms` aggregation will return the buckets for the top ten terms ordered by the `doc_count`. One can
change this default behaviour by setting the `size` parameter.

Expand All @@ -52,6 +58,7 @@ This means that if the number of unique terms is greater than `size`, the return
(it could be that the term counts are slightly off and it could even be that a term that should have been in the top
size buckets was not returned). If set to `0`, the `size` will be set to `Integer.MAX_VALUE`.

[[search-aggregations-bucket-terms-aggregation-approximate-counts]]
==== Document counts are approximate

As described above, the document counts (and the results of any sub aggregations) in the terms aggregation are not always
Expand Down Expand Up @@ -226,6 +233,7 @@ does not return a particular term which appears in the results from another shar
aggregation is either sorted by a sub aggregation or in order of ascending document count, the error in the document counts cannot be
determined and is given a value of -1 to indicate this.

[[search-aggregations-bucket-terms-aggregation-order]]
==== Order

The order of the buckets can be customized by setting the `order` parameter. By default, the buckets are ordered by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public boolean shouldCollect() {

@Override
public InternalAggregation buildEmptyAggregation() {
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket>emptyList(), showTermDocCountError, 0);
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket>emptyList(), showTermDocCountError, 0, 0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {

DoubleTerms() {} // for serialization

public DoubleTerms(String name, InternalOrder order, @Nullable ValueFormatter formatter, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
public DoubleTerms(String name, InternalOrder order, @Nullable ValueFormatter formatter, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
this.formatter = formatter;
}

Expand All @@ -109,8 +109,8 @@ public Type type() {
}

@Override
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
return new DoubleTerms(name, order, formatter, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
return new DoubleTerms(name, order, formatter, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
}

@Override
Expand All @@ -132,6 +132,9 @@ public void readFrom(StreamInput in) throws IOException {
this.showTermDocCountError = false;
}
this.minDocCount = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
this.otherDocCount = in.readVLong();
}
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Expand Down Expand Up @@ -162,6 +165,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(showTermDocCountError);
}
out.writeVLong(minDocCount);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeVLong(otherDocCount);
}
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
out.writeDouble(((Bucket) bucket).term);
Expand All @@ -176,6 +182,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
builder.field(SUM_OF_OTHER_DOC_COUNTS, otherDocCount);
builder.startArray(CommonFields.BUCKETS);
for (InternalTerms.Bucket bucket : buckets) {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private static DoubleTerms convertToDouble(LongTerms terms) {
for (int i = 0; i < buckets.length; ++i) {
buckets[i] = convertToDouble(buckets[i]);
}
return new DoubleTerms(terms.getName(), terms.order, terms.formatter, terms.requiredSize, terms.shardSize, terms.minDocCount, Arrays.asList(buckets), terms.showTermDocCountError, terms.docCountError);
return new DoubleTerms(terms.getName(), terms.order, terms.formatter, terms.requiredSize, terms.shardSize, terms.minDocCount, Arrays.asList(buckets), terms.showTermDocCountError, terms.docCountError, terms.otherDocCount);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
}
long otherDocCount = 0;
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(this));
OrdBucket spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0);
for (long globalTermOrd = 0; globalTermOrd < globalOrds.getValueCount(); ++globalTermOrd) {
Expand All @@ -161,6 +162,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
if (bucketCountThresholds.getMinDocCount() > 0 && bucketDocCount == 0) {
continue;
}
otherDocCount += bucketDocCount;
spare.globalOrd = globalTermOrd;
spare.bucketOrd = bucketOrd;
spare.docCount = bucketDocCount;
Expand All @@ -182,6 +184,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
copy(globalOrds.lookupOrd(bucket.globalOrd), scratch);
list[i] = new StringTerms.Bucket(scratch, bucket.docCount, null, showTermDocCountError, 0);
list[i].bucketOrd = bucket.bucketOrd;
otherDocCount -= list[i].docCount;
}
//replay any deferred collections
runDeferredCollections(survivingBucketOrds);
Expand All @@ -193,7 +196,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
bucket.docCountError = 0;
}

return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Arrays.asList(list), showTermDocCountError, 0);
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Arrays.asList(list), showTermDocCountError, 0, otherDocCount);
}

/** This is used internally only, just for compare using global ordinal instead of term bytes in the PQ */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
public abstract class InternalTerms extends InternalAggregation implements Terms, ToXContent, Streamable {

protected static final String DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME = "doc_count_error_upper_bound";
protected static final String SUM_OF_OTHER_DOC_COUNTS = "sum_other_doc_count";

public static abstract class Bucket extends Terms.Bucket {

Expand Down Expand Up @@ -104,10 +105,11 @@ public Bucket reduce(List<? extends Bucket> buckets, ReduceContext context) {
protected Map<String, Bucket> bucketMap;
protected long docCountError;
protected boolean showTermDocCountError;
protected long otherDocCount;

protected InternalTerms() {} // for serialization

protected InternalTerms(String name, InternalOrder order, int requiredSize, int shardSize, long minDocCount, List<Bucket> buckets, boolean showTermDocCountError, long docCountError) {
protected InternalTerms(String name, InternalOrder order, int requiredSize, int shardSize, long minDocCount, List<Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
super(name);
this.order = order;
this.requiredSize = requiredSize;
Expand All @@ -116,6 +118,7 @@ protected InternalTerms(String name, InternalOrder order, int requiredSize, int
this.buckets = buckets;
this.showTermDocCountError = showTermDocCountError;
this.docCountError = docCountError;
this.otherDocCount = otherDocCount;
}

@Override
Expand All @@ -139,14 +142,21 @@ public long getDocCountError() {
return docCountError;
}

@Override
public long getSumOfOtherDocCounts() {
// Sum of the doc counts of the buckets that did not make it into the
// returned top buckets; rendered as "sum_other_doc_count" in the response
// (see SUM_OF_OTHER_DOC_COUNTS). 0 when every bucket was returned.
return otherDocCount;
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();

Multimap<Object, InternalTerms.Bucket> buckets = ArrayListMultimap.create();
long sumDocCountError = 0;
long otherDocCount = 0;
for (InternalAggregation aggregation : aggregations) {
InternalTerms terms = (InternalTerms) aggregation;
otherDocCount += terms.getSumOfOtherDocCounts();
final long thisAggDocCountError;
if (terms.buckets.size() < this.shardSize || this.order == InternalOrder.TERM_ASC || this.order == InternalOrder.TERM_DESC) {
thisAggDocCountError = 0;
Expand Down Expand Up @@ -182,7 +192,10 @@ public InternalAggregation reduce(ReduceContext reduceContext) {
}
}
if (b.docCount >= minDocCount) {
ordered.insertWithOverflow(b);
Terms.Bucket removed = ordered.insertWithOverflow(b);
if (removed != null) {
otherDocCount += removed.getDocCount();
}
}
}
Bucket[] list = new Bucket[ordered.size()];
Expand All @@ -195,9 +208,9 @@ public InternalAggregation reduce(ReduceContext reduceContext) {
} else {
docCountError = aggregations.size() == 1 ? 0 : sumDocCountError;
}
return newAggregation(name, Arrays.asList(list), showTermDocCountError, docCountError);
return newAggregation(name, Arrays.asList(list), showTermDocCountError, docCountError, otherDocCount);
}

protected abstract InternalTerms newAggregation(String name, List<Bucket> buckets, boolean showTermDocCountError, long docCountError);
protected abstract InternalTerms newAggregation(String name, List<Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount);

}
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {

LongTerms() {} // for serialization

public LongTerms(String name, InternalOrder order, @Nullable ValueFormatter formatter, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
public LongTerms(String name, InternalOrder order, @Nullable ValueFormatter formatter, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
this.formatter = formatter;
}

Expand All @@ -110,8 +110,8 @@ public Type type() {
}

@Override
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
return new LongTerms(name, order, formatter, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
return new LongTerms(name, order, formatter, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
}

@Override
Expand All @@ -133,6 +133,9 @@ public void readFrom(StreamInput in) throws IOException {
this.showTermDocCountError = false;
}
this.minDocCount = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
this.otherDocCount = in.readVLong();
}
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Expand Down Expand Up @@ -163,6 +166,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(showTermDocCountError);
}
out.writeVLong(minDocCount);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeVLong(otherDocCount);
}
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
out.writeLong(((Bucket) bucket).term);
Expand All @@ -177,6 +183,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
builder.field(SUM_OF_OTHER_DOC_COUNTS, otherDocCount);
builder.startArray(CommonFields.BUCKETS);
for (InternalTerms.Bucket bucket : buckets) {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {

final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());

long otherDocCount = 0;
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(this));
LongTerms.Bucket spare = null;
for (long i = 0; i < bucketOrds.size(); i++) {
Expand All @@ -123,6 +124,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
}
spare.term = bucketOrds.get(i);
spare.docCount = bucketDocCount(i);
otherDocCount += spare.docCount;
spare.bucketOrd = i;
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
spare = (LongTerms.Bucket) ordered.insertWithOverflow(spare);
Expand All @@ -136,6 +138,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
final LongTerms.Bucket bucket = (LongTerms.Bucket) ordered.pop();
survivingBucketOrds[i] = bucket.bucketOrd;
list[i] = bucket;
otherDocCount -= bucket.docCount;
}

runDeferredCollections(survivingBucketOrds);
Expand All @@ -146,13 +149,13 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
list[i].docCountError = 0;
}

return new LongTerms(name, order, formatter, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Arrays.asList(list), showTermDocCountError, 0);
return new LongTerms(name, order, formatter, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Arrays.asList(list), showTermDocCountError, 0, otherDocCount);
}


@Override
public InternalAggregation buildEmptyAggregation() {
return new LongTerms(name, order, formatter, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket>emptyList(), showTermDocCountError, 0);
return new LongTerms(name, order, formatter, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket>emptyList(), showTermDocCountError, 0, 0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {

StringTerms() {} // for serialization

public StringTerms(String name, InternalOrder order, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
public StringTerms(String name, InternalOrder order, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
}

@Override
Expand All @@ -108,8 +108,8 @@ public Type type() {
}

@Override
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
return new StringTerms(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount) {
return new StringTerms(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount);
}

@Override
Expand All @@ -130,6 +130,9 @@ public void readFrom(StreamInput in) throws IOException {
this.showTermDocCountError = false;
}
this.minDocCount = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
this.otherDocCount = in.readVLong();
}
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Expand Down Expand Up @@ -159,6 +162,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(showTermDocCountError);
}
out.writeVLong(minDocCount);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeVLong(otherDocCount);
}
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
out.writeBytesRef(((Bucket) bucket).termBytes);
Expand All @@ -173,6 +179,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
builder.field(SUM_OF_OTHER_DOC_COUNTS, otherDocCount);
builder.startArray(CommonFields.BUCKETS);
for (InternalTerms.Bucket bucket : buckets) {
builder.startObject();
Expand Down
Loading

0 comments on commit 238e405

Please sign in to comment.