Added an option to show the upper bound of the error for the terms aggregation #6778

Closed · wants to merge 1 commit
158 changes: 157 additions & 1 deletion docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc
@@ -43,7 +43,7 @@ Response:
By default, the `terms` aggregation will return the buckets for the top ten terms ordered by the `doc_count`. One can
change this default behaviour by setting the `size` parameter.

- ==== Size & Shard Size
+ ==== Size

The `size` parameter can be set to define how many term buckets should be returned out of the overall terms list. By
default, the node coordinating the search process will request each shard to provide its own top `size` term buckets
@@ -52,6 +52,87 @@
This means that if the number of unique terms is greater than `size`, the returned list is slightly off and not accurate
(it could be that the term counts are slightly off and it could even be that a term that should have been in the top
size buckets was not returned). If set to `0`, the `size` will be set to `Integer.MAX_VALUE`.

==== Document counts are approximate

As described above, the document counts (and the results of any sub aggregations) in the terms aggregation are not always
accurate. This is because each shard provides its own view of what the ordered list of terms should be and these are
combined to give a final view. Consider the following scenario:

A request is made to obtain the top 5 terms in the field `product`, ordered by descending document count, from an index
with 3 shards. In this case each shard is asked to give its top 5 terms.

[source,js]
--------------------------------------------------
{
"aggs" : {
"products" : {
"terms" : {
"field" : "product",
"size" : 5
}
}
}
}
--------------------------------------------------

The terms for each of the three shards are shown below with their
respective document counts in brackets:

[width="100%",cols="^2,^2,^2,^2",options="header"]
|=========================================================
| | Shard A | Shard B | Shard C

| 1 | Product A (25) | Product A (30) | Product A (45)
| 2 | Product B (18) | Product B (25) | Product C (44)
| 3 | Product C (6) | Product F (17) | Product Z (36)
| 4 | Product D (3) | Product Z (16) | Product G (30)
| 5 | Product E (2) | Product G (15) | Product E (29)
| 6 | Product F (2) | Product H (14) | Product H (28)
| 7 | Product G (2) | Product I (10) | Product Q (2)
| 8 | Product H (2) | Product J (8) | Product D (1)
| 9 | Product I (1) | Product Q (6) |
| 10 | Product J (1) | Product C (4) |

|=========================================================

The shards will return their top 5 terms, so the results from the shards will be:


[width="100%",cols="^2,^2,^2,^2",options="header"]
|=========================================================
| | Shard A | Shard B | Shard C

| 1 | Product A (25) | Product A (30) | Product A (45)
| 2 | Product B (18) | Product B (25) | Product C (44)
| 3 | Product C (6) | Product F (17) | Product Z (36)
| 4 | Product D (3) | Product Z (16) | Product G (30)
| 5 | Product E (2) | Product G (15) | Product E (29)

|=========================================================

Taking the top 5 results from each of the shards (as requested) and combining them to make a final top 5 list produces
the following:

[width="40%",cols="^2,^2"]
|=========================================================

| 1 | Product A (100)
| 2 | Product Z (52)
| 3 | Product C (50)
| 4 | Product G (45)
| 5 | Product B (43)

|=========================================================

Because Product A was returned from all shards we know that its document count value is accurate. Product C was only
returned by shards A and C so its document count is shown as 50, but this is not an accurate count: Product C exists on
shard B too, but its count of 4 was not high enough to put Product C into that shard's top 5 list. Product Z was likewise
returned by only 2 shards, but in its case the third shard simply does not contain the term. At the point of combining
the results to produce the final list of terms, there is no way of knowing that the document count for Product C is in
error while the one for Product Z is not. Product H has a document count of 44 across all 3 shards but was not included
in the final list of terms because it did not make it into the top 5 terms on any of the shards.
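
To make the combine step concrete, here is a minimal sketch of the naive merge described above. It is illustrative only
(the class name and the hard-coded data are invented for this example) and is not the actual reduce code:

[source,java]
--------------------------------------------------
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the coordinating node's merge: sum the counts of the buckets each
// shard returned, then keep the top 5. Counts that fell below a shard's
// cut-off (e.g. Product C's count of 4 on Shard B) never take part.
public class NaiveTermsMerge {
    public static void main(String[] args) {
        List<Map<String, Long>> shardTop5 = List.of(
            Map.of("Product A", 25L, "Product B", 18L, "Product C", 6L, "Product D", 3L, "Product E", 2L),
            Map.of("Product A", 30L, "Product B", 25L, "Product F", 17L, "Product Z", 16L, "Product G", 15L),
            Map.of("Product A", 45L, "Product C", 44L, "Product Z", 36L, "Product G", 30L, "Product E", 29L));

        Map<String, Long> combined = new LinkedHashMap<>();
        for (Map<String, Long> shard : shardTop5) {
            shard.forEach((term, count) -> combined.merge(term, count, Long::sum));
        }

        combined.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
                .limit(5)
                .forEach(e -> System.out.println(e.getKey() + " (" + e.getValue() + ")"));
        // Prints: Product A (100), Product Z (52), Product C (50), Product G (45), Product B (43).
        // Product C's true count is 54; the 4 documents on Shard B were never reported.
    }
}
--------------------------------------------------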

==== Shard Size

The higher the requested `size` is, the more accurate the results will be, but also, the more expensive it will be to
compute the final results (both due to bigger priority queues that are managed on a shard level and due to bigger data
transfers between the nodes and the client).
@@ -70,6 +151,81 @@
NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense).
added[1.1.0] It is possible to not limit the number of terms that are returned by setting `size` to `0`. Don't use this
on high-cardinality fields as this will kill both your CPU, since terms need to be returned sorted, and your network.
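
As an illustration, such a request could look like the following (reusing the `product` field from the example above):

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "products" : {
            "terms" : {
                "field" : "product",
                "size" : 0
            }
        }
    }
}
--------------------------------------------------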

==== Calculating Document Count Error

coming[1.4.0]

There are two error values which can be shown on the terms aggregation. The first gives a value for the aggregation as
a whole which represents the maximum potential document count for a term which did not make it into the final list of
terms. This is calculated as the sum of the document count from the last term returned from each shard. For the example
given above the value would be 46 (2 + 15 + 29). This means that in the worst-case scenario a term which was not returned
could have the 4th highest document count.

[source,js]
--------------------------------------------------
{
...

"aggregations" : {
"products" : {
"doc_count_error_upper_bound" : 46,
"buckets" : [
{
"key" : "Product A",
"doc_count" : 100
},
{
"key" : "Product Z",
"doc_count" : 52
},
...
]
}
}
}
--------------------------------------------------
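
The arithmetic behind this bound can be sketched in a few lines (illustrative only; the names are invented for this
example):

[source,java]
--------------------------------------------------
// A term that was never returned could have sat just below the cut-off on
// every shard, so the bound is the sum of the smallest returned count per shard.
public class AggregationErrorBound {
    public static void main(String[] args) {
        // Last (smallest) counts actually returned by each shard in the example:
        // Shard A: Product E (2), Shard B: Product G (15), Shard C: Product E (29).
        long[] lastReturnedCount = {2, 15, 29};
        long docCountErrorUpperBound = 0;
        for (long count : lastReturnedCount) {
            docCountErrorUpperBound += count;
        }
        System.out.println(docCountErrorUpperBound); // 46
    }
}
--------------------------------------------------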

The second error value can be enabled by setting the `show_term_doc_count_error` parameter to `true`. This shows an error
value for each term returned by the aggregation which represents the 'worst case' error in the document count, and it can
be useful when deciding on a value for the `shard_size` parameter. This is calculated by summing the document counts for
the last term returned by all shards which did not return the term. In the example above the error in the document count
for Product C would be 15, as Shard B was the only shard not to return the term and the document count of the last term
it did return was 15. The actual document count of Product C was 54, so the count was only off by 4 even though the worst
case was that it would be off by 15. Product A, however, has an error of 0 for its document count: since every shard
returned it, we can be confident that the count returned is accurate.
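
A request enabling this per-bucket error (extending the earlier example) would look something like:

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "products" : {
            "terms" : {
                "field" : "product",
                "size" : 5,
                "show_term_doc_count_error" : true
            }
        }
    }
}
--------------------------------------------------

Each bucket in the response then carries its own `doc_count_error_upper_bound`: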

[source,js]
--------------------------------------------------
{
...

"aggregations" : {
"products" : {
"doc_count_error_upper_bound" : 46,
"buckets" : [
{
"key" : "Product A",
"doc_count" : 100,
"doc_count_error_upper_bound" : 0
},
{
"key" : "Product Z",
"doc_count" : 52,
"doc_count_error_upper_bound" : 2
},
...
]
}
}
}
--------------------------------------------------
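
The per-bucket calculation can likewise be sketched (illustrative only, reusing the example data; not the actual
implementation):

[source,java]
--------------------------------------------------
import java.util.List;
import java.util.Map;

// Worst-case error for a returned term: sum the last returned count of every
// shard that did NOT return it, since the term could have hidden just below
// that shard's cut-off.
public class PerTermDocCountError {
    public static void main(String[] args) {
        List<Map<String, Long>> shardTop5 = List.of(
            Map.of("Product A", 25L, "Product B", 18L, "Product C", 6L, "Product D", 3L, "Product E", 2L),
            Map.of("Product A", 30L, "Product B", 25L, "Product F", 17L, "Product Z", 16L, "Product G", 15L),
            Map.of("Product A", 45L, "Product C", 44L, "Product Z", 36L, "Product G", 30L, "Product E", 29L));
        long[] lastReturnedCount = {2, 15, 29};

        for (String term : new String[] {"Product A", "Product C", "Product Z"}) {
            long error = 0;
            for (int shard = 0; shard < shardTop5.size(); shard++) {
                if (!shardTop5.get(shard).containsKey(term)) {
                    error += lastReturnedCount[shard];
                }
            }
            System.out.println(term + " -> doc_count_error_upper_bound " + error);
        }
        // Product A -> 0  (every shard returned it)
        // Product C -> 15 (only Shard B missed it; its last returned count was 15)
        // Product Z -> 2  (only Shard A missed it; its last returned count was 2)
    }
}
--------------------------------------------------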

These errors can only be calculated in this way when the terms are ordered by descending document count. When the
aggregation is ordered by the term values themselves (either ascending or descending) there is no error in the document
count, since if a shard does not return a particular term which appears in the results from another shard, it must not
have that term in its index. When the aggregation is either sorted by a sub aggregation or in order of ascending document
count, the error in the document counts cannot be determined and is given a value of -1 to indicate this.

==== Order

The order of the buckets can be customized by setting the `order` parameter. By default, the buckets are ordered by
@@ -47,7 +47,7 @@ public GlobalOrdinalsSignificantTermsAggregator(String name, AggregatorFactories
IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent,
SignificantTermsAggregatorFactory termsAggFactory) {

- super(name, factories, valuesSource, estimatedBucketCount, maxOrd, null, bucketCountThresholds, includeExclude, aggregationContext, parent, SubAggCollectionMode.DEPTH_FIRST);
+ super(name, factories, valuesSource, estimatedBucketCount, maxOrd, null, bucketCountThresholds, includeExclude, aggregationContext, parent, SubAggCollectionMode.DEPTH_FIRST, false);
this.termsAggFactory = termsAggFactory;
}

@@ -42,7 +42,7 @@ public SignificantLongTermsAggregator(String name, AggregatorFactories factories
long estimatedBucketCount, BucketCountThresholds bucketCountThresholds,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {

- super(name, factories, valuesSource, format, estimatedBucketCount, null, bucketCountThresholds, aggregationContext, parent, SubAggCollectionMode.DEPTH_FIRST);
+ super(name, factories, valuesSource, format, estimatedBucketCount, null, bucketCountThresholds, aggregationContext, parent, SubAggCollectionMode.DEPTH_FIRST, false);
this.termsAggFactory = termsAggFactory;
}

@@ -46,7 +46,7 @@ public SignificantStringTermsAggregator(String name, AggregatorFactori
IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent,
SignificantTermsAggregatorFactory termsAggFactory) {

- super(name, factories, valuesSource, estimatedBucketCount, null, bucketCountThresholds, includeExclude, aggregationContext, parent, SubAggCollectionMode.DEPTH_FIRST);
+ super(name, factories, valuesSource, estimatedBucketCount, null, bucketCountThresholds, includeExclude, aggregationContext, parent, SubAggCollectionMode.DEPTH_FIRST, false);
this.termsAggFactory = termsAggFactory;
}

@@ -29,10 +29,13 @@
abstract class AbstractStringTermsAggregator extends TermsAggregator {


protected final boolean showTermDocCountError;

public AbstractStringTermsAggregator(String name, AggregatorFactories factories,
long estimatedBucketsCount, AggregationContext context, Aggregator parent,
- InternalOrder order, BucketCountThresholds bucketCountThresholds, SubAggCollectionMode subAggCollectMode) {
+ InternalOrder order, BucketCountThresholds bucketCountThresholds, SubAggCollectionMode subAggCollectMode, boolean showTermDocCountError) {
super(name, BucketAggregationMode.PER_BUCKET, factories, estimatedBucketsCount, context, parent, bucketCountThresholds, order, subAggCollectMode);
this.showTermDocCountError = showTermDocCountError;
}

@Override
@@ -42,7 +45,7 @@ public boolean shouldCollect() {

@Override
public InternalAggregation buildEmptyAggregation() {
- return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket>emptyList());
+ return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket>emptyList(), showTermDocCountError, 0);
}

}
@@ -36,6 +36,7 @@ public abstract class AbstractTermsParametersParser {
public static final ParseField MIN_DOC_COUNT_FIELD_NAME = new ParseField("min_doc_count");
public static final ParseField SHARD_MIN_DOC_COUNT_FIELD_NAME = new ParseField("shard_min_doc_count");
public static final ParseField REQUIRED_SIZE_FIELD_NAME = new ParseField("size");
public static final ParseField SHOW_TERM_DOC_COUNT_ERROR = new ParseField("show_term_doc_count_error");


//These are the results of the parsing.
@@ -64,7 +65,6 @@ public SubAggCollectionMode getCollectionMode() {
return collectMode;
}


public void parse(String aggregationName, XContentParser parser, SearchContext context, ValuesSourceParser vsParser, IncludeExclude.Parser incExcParser) throws IOException {
bucketCountThresholds = getDefaultBucketCountThresholds();
XContentParser.Token token;
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket.terms;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -31,7 +32,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
@@ -58,8 +58,8 @@ static class Bucket extends InternalTerms.Bucket {

double term;

- public Bucket(double term, long docCount, InternalAggregations aggregations) {
- super(docCount, aggregations);
+ public Bucket(double term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) {
+ super(docCount, aggregations, showDocCountError, docCountError);
this.term = term;
}

@@ -89,17 +89,17 @@ Object getKeyAsObject() {
}

@Override
- Bucket newBucket(long docCount, InternalAggregations aggs) {
- return new Bucket(term, docCount, aggs);
+ Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {
+ return new Bucket(term, docCount, aggs, showDocCountError, docCountError);
}
}

private @Nullable ValueFormatter formatter;

DoubleTerms() {} // for serialization

- public DoubleTerms(String name, InternalOrder order, @Nullable ValueFormatter formatter, int requiredSize, long minDocCount, Collection<InternalTerms.Bucket> buckets) {
- super(name, order, requiredSize, minDocCount, buckets);
+ public DoubleTerms(String name, InternalOrder order, @Nullable ValueFormatter formatter, int requiredSize, int shardSize, long minDocCount, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
+ super(name, order, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
this.formatter = formatter;
}

@@ -109,21 +109,40 @@ public Type type() {
}

@Override
- protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets) {
- return new DoubleTerms(name, order, formatter, requiredSize, minDocCount, buckets);
+ protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError) {
+ return new DoubleTerms(name, order, formatter, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError);
}

@Override
public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
this.docCountError = in.readLong();
} else {
this.docCountError = -1;
}
this.order = InternalOrder.Streams.readOrder(in);
this.formatter = ValueFormatterStreams.readOptional(in);
this.requiredSize = readSize(in);
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
this.shardSize = readSize(in);
this.showTermDocCountError = in.readBoolean();
} else {
this.shardSize = requiredSize;
this.showTermDocCountError = false;
}
this.minDocCount = in.readVLong();
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- buckets.add(new Bucket(in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in)));
+ double term = in.readDouble();
+ long docCount = in.readVLong();
+ long bucketDocCountError = -1;
+ if (in.getVersion().onOrAfter(Version.V_1_4_0) && showTermDocCountError) {
+ bucketDocCountError = in.readLong();
+ }
+ InternalAggregations aggregations = InternalAggregations.readAggregations(in);
+ buckets.add(new Bucket(term, docCount, aggregations, showTermDocCountError, bucketDocCountError));
}
this.buckets = buckets;
this.bucketMap = null;
@@ -132,20 +151,33 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeLong(docCountError);
}
InternalOrder.Streams.writeOrder(order, out);
ValueFormatterStreams.writeOptional(formatter, out);
writeSize(requiredSize, out);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
writeSize(shardSize, out);
out.writeBoolean(showTermDocCountError);
}
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
out.writeDouble(((Bucket) bucket).term);
out.writeVLong(bucket.getDocCount());
if (out.getVersion().onOrAfter(Version.V_1_4_0) && showTermDocCountError) {
out.writeLong(bucket.docCountError);
}
((InternalAggregations) bucket.getAggregations()).writeTo(out);
}
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
builder.startArray(CommonFields.BUCKETS);
for (InternalTerms.Bucket bucket : buckets) {
builder.startObject();
@@ -154,6 +186,9 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(CommonFields.KEY_AS_STRING, formatter.format(((Bucket) bucket).term));
}
builder.field(CommonFields.DOC_COUNT, bucket.getDocCount());
if (showTermDocCountError) {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, bucket.getDocCountError());
}
((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params);
builder.endObject();
}