Add shard_min_doc_count parameter to terms aggregation #6143

Closed
wants to merge 14 commits into from
@@ -290,7 +290,9 @@ default, the node coordinating the search process will request each shard to pro
and once all shards respond, it will reduce the results to the final list that will then be returned to the client.
If the number of unique terms is greater than `size`, the returned list can be slightly off and not accurate
(it could be that the term counts are slightly off and it could even be that a term that should have been in the top
size buckets was not returned).

coming[1.2.0] If set to `0`, the `size` will be set to `Integer.MAX_VALUE`.

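For illustration, a minimal request sketch (the aggregation name and field here are hypothetical, not part of this change) with `size` set to `0`, which per the note above is interpreted as `Integer.MAX_VALUE`, i.e. all terms are returned:

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "significant_crime_types" : {
            "significant_terms" : {
                "field" : "crime_type",
                "size" : 0
            }
        }
    }
}
--------------------------------------------------
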
To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard
using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter
@@ -300,7 +302,11 @@ Low-frequency terms can turn out to be the most interesting ones once all result
significant_terms aggregation can produce higher-quality results when the `shard_size` parameter is set to
values significantly higher than the `size` setting. This ensures that a bigger volume of promising candidate terms are given
a consolidated review by the reducing node before the final selection. Obviously large candidate term lists
will cause extra network traffic and RAM usage so this is a quality/cost trade-off that needs to be balanced. If `shard_size` is set to -1 (the default), then `shard_size` will be automatically estimated based on the number of shards and the `size` parameter.


coming[1.2.0] If set to `0`, the `shard_size` will be set to `Integer.MAX_VALUE`.
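
As a sketch (again with hypothetical aggregation and field names), raising `shard_size` well above `size` as recommended above could look like this; each shard returns its top 200 candidates and the reducing node trims the merged list down to the final 10 buckets:

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "significant_crime_types" : {
            "significant_terms" : {
                "field" : "crime_type",
                "size" : 10,
                "shard_size" : 200
            }
        }
    }
}
--------------------------------------------------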


NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense). When it is, elasticsearch will
override it and reset it to be equal to `size`.
@@ -330,6 +336,8 @@ The above aggregation would only return tags which have been found in 10 hits or

Terms that score highly will be collected on a shard level and merged with the terms collected from other shards in a second step. However, the shard does not have the information about the global term frequencies available. The decision whether a term is added to a candidate list depends only on the score computed on the shard using local shard frequencies, not the global frequencies of the word. The `min_doc_count` criterion is only applied after merging the local term statistics of all shards. In a way, the decision to add the term as a candidate is made without being very _certain_ about whether the term will actually reach the required `min_doc_count`. This might cause many (globally) high-frequency terms to be missing in the final result if low-frequency but high-scoring terms populated the candidate lists. To avoid this, the `shard_size` parameter can be increased to allow more candidate terms on the shards. However, this increases memory consumption and network traffic.

coming[1.2.0] `shard_min_doc_count` parameter

The parameter `shard_min_doc_count` regulates the _certainty_ a shard has, with respect to `min_doc_count`, whether a term should actually be added to the candidate list. Terms will only be considered if their local shard frequency within the set is higher than the `shard_min_doc_count`. If your dictionary contains many low-frequency words and you are not interested in these (for example misspellings), then you can set the `shard_min_doc_count` parameter to filter out candidate terms on a shard level that will, with reasonable certainty, not reach the required `min_doc_count` even after merging the local frequencies. `shard_min_doc_count` is set to `1` by default and has no effect unless you explicitly set it.
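
A minimal sketch combining the two thresholds (aggregation and field names are hypothetical, values purely illustrative): each shard drops candidates that do not clear its local `shard_min_doc_count` before sending its list to the coordinating node, which then applies the global `min_doc_count` of 10.

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "significant_crime_types" : {
            "significant_terms" : {
                "field" : "crime_type",
                "min_doc_count" : 10,
                "shard_min_doc_count" : 2
            }
        }
    }
}
--------------------------------------------------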


@@ -50,17 +50,19 @@ default, the node coordinating the search process will request each shard to pro
and once all shards respond, it will reduce the results to the final list that will then be returned to the client.
This means that if the number of unique terms is greater than `size`, the returned list is slightly off and not accurate
(it could be that the term counts are slightly off and it could even be that a term that should have been in the top
size buckets was not returned). If set to `0`, the `size` will be set to `Integer.MAX_VALUE`.


The higher the requested `size` is, the more accurate the results will be, but also, the more expensive it will be to
compute the final results (both due to bigger priority queues that are managed on a shard level and due to bigger data
transfers between the nodes and the client).

The `shard_size` parameter can be used to minimize the extra work that comes with bigger requested `size`. When defined,
it will determine how many terms the coordinating node will request from each shard. Once all the shards responded, the
coordinating node will then reduce them to a final result which will be based on the `size` parameter - this way,
one can increase the accuracy of the returned terms and avoid the overhead of streaming a big list of buckets back to
the client. If set to `0`, the `shard_size` will be set to `Integer.MAX_VALUE`.
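
A minimal sketch of the same idea for the terms aggregation (aggregation and field names are hypothetical): the coordinating node requests 50 term buckets from every shard, but only the top 5 buckets of the merged result are returned to the client.

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "tags" : {
            "terms" : {
                "field" : "tags",
                "size" : 5,
                "shard_size" : 50
            }
        }
    }
}
--------------------------------------------------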


NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense). When it is, elasticsearch will
override it and reset it to be equal to `size`.
@@ -184,7 +186,7 @@ PATH := <AGG_NAME>[<AGG_SEPARATOR><AGG_NAME>]*[<METRIC_SEPARATOR

The above will sort the countries buckets based on the average height among the female population.

==== Minimum document count

It is possible to only return terms that match more than a configured number of hits using the `min_doc_count` option:

@@ -204,13 +206,23 @@ It is possible to only return terms that match more than a configured number of

The above aggregation would only return tags which have been found in 10 hits or more. Default value is `1`.


Terms are collected and ordered on a shard level and merged with the terms collected from other shards in a second step. However, the shard does not have the information about the global document count available. The decision whether a term is added to a candidate list depends only on the order computed on the shard using local shard frequencies. The `min_doc_count` criterion is only applied after merging the local term statistics of all shards. In a way, the decision to add the term as a candidate is made without being very _certain_ about whether the term will actually reach the required `min_doc_count`. This might cause many (globally) high-frequency terms to be missing in the final result if low-frequency terms populated the candidate lists. To avoid this, the `shard_size` parameter can be increased to allow more candidate terms on the shards. However, this increases memory consumption and network traffic.

coming[1.2.0] `shard_min_doc_count` parameter

The parameter `shard_min_doc_count` regulates the _certainty_ a shard has, with respect to `min_doc_count`, whether a term should actually be added to the candidate list. Terms will only be considered if their local shard frequency within the set is higher than the `shard_min_doc_count`. If your dictionary contains many low-frequency terms and you are not interested in those (for example misspellings), then you can set the `shard_min_doc_count` parameter to filter out candidate terms on a shard level that will, with reasonable certainty, not reach the required `min_doc_count` even after merging the local counts. `shard_min_doc_count` is set to `0` by default and has no effect unless you explicitly set it.

Contributor
can you add coming[1.2.0]?

Contributor Author
done

NOTE: Setting `min_doc_count`=`0` will also return buckets for terms that didn't match any hit. However, some of
the returned terms which have a document count of zero might only belong to deleted documents, so there is
no guarantee that a `match_all` query would find a positive document count for those terms.

WARNING: When NOT sorting on `doc_count` descending, high values of `min_doc_count` may return a number of buckets
which is less than `size` because not enough data was gathered from the shards. Missing buckets can be
brought back by increasing `shard_size`.
Setting `shard_min_doc_count` too high will cause terms to be filtered out on a shard level. This value should be set much lower than `min_doc_count/#shards`.
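
For example, with `min_doc_count` set to `10` and an index spread over 5 shards, `min_doc_count/#shards` is 2, so a `shard_min_doc_count` of `1` is a safe choice, while higher values risk filtering out terms too early. A minimal sketch (aggregation and field names are hypothetical):

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "tags" : {
            "terms" : {
                "field" : "tags",
                "min_doc_count" : 10,
                "shard_min_doc_count" : 1
            }
        }
    }
}
--------------------------------------------------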

==== Script

@@ -42,16 +42,14 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri

protected long numCollectedDocs;
protected final SignificantTermsAggregatorFactory termsAggFactory;
protected long shardMinDocCount;

public GlobalOrdinalsSignificantTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource,
long estimatedBucketCount, long maxOrd, int requiredSize, int shardSize, long minDocCount, long shardMinDocCount,
long estimatedBucketCount, long maxOrd, BucketCountThresholds bucketCountThresholds,
IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent,
SignificantTermsAggregatorFactory termsAggFactory) {

super(name, factories, valuesSource, estimatedBucketCount, maxOrd, null, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent);
super(name, factories, valuesSource, estimatedBucketCount, maxOrd, null, bucketCountThresholds, includeExclude, aggregationContext, parent);
this.termsAggFactory = termsAggFactory;
this.shardMinDocCount = shardMinDocCount;
}

@Override
@@ -68,11 +66,11 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) {
}

final int size;
if (minDocCount == 0) {
if (bucketCountThresholds.getMinDocCount() == 0) {
// if minDocCount == 0 then we can end up with more buckets than maxBucketOrd() returns
size = (int) Math.min(globalOrdinals.getMaxOrd(), shardSize);
size = (int) Math.min(globalOrdinals.getMaxOrd(), bucketCountThresholds.getShardSize());
} else {
size = (int) Math.min(maxBucketOrd(), shardSize);
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
}
long supersetSize = termsAggFactory.prepareBackground(context);
long subsetSize = numCollectedDocs;
@@ -85,7 +83,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) {
}
final long bucketOrd = getBucketOrd(globalTermOrd);
final long bucketDocCount = bucketOrd < 0 ? 0 : bucketDocCount(bucketOrd);
if (minDocCount > 0 && bucketDocCount == 0) {
if (bucketCountThresholds.getMinDocCount() > 0 && bucketDocCount == 0) {
continue;
}
if (spare == null) {
@@ -102,7 +100,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) {
// Back at the central reducer these properties will be updated with
// global stats
spare.updateScore();
if (spare.subsetDf >= shardMinDocCount) {
if (spare.subsetDf >= bucketCountThresholds.getShardMinDocCount()) {
spare = (SignificantStringTerms.Bucket) ordered.insertWithOverflow(spare);
}
}
@@ -116,7 +114,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) {
list[i] = bucket;
}

return new SignificantStringTerms(subsetSize, supersetSize, name, requiredSize, minDocCount, Arrays.asList(list));
return new SignificantStringTerms(subsetSize, supersetSize, name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), Arrays.asList(list));
}

@Override
@@ -125,7 +123,7 @@ public SignificantStringTerms buildEmptyAggregation() {
ContextIndexSearcher searcher = context.searchContext().searcher();
IndexReader topReader = searcher.getIndexReader();
int supersetSize = topReader.numDocs();
return new SignificantStringTerms(0, supersetSize, name, requiredSize, minDocCount, Collections.<InternalSignificantTerms.Bucket>emptyList());
return new SignificantStringTerms(0, supersetSize, name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalSignificantTerms.Bucket>emptyList());
}

@Override
@@ -137,8 +135,8 @@ public static class WithHash extends GlobalOrdinalsSignificantTermsAggregator {

private final LongHash bucketOrds;

public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, long shardMinDocCount, IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
super(name, factories, valuesSource, estimatedBucketCount, estimatedBucketCount, requiredSize, shardSize, minDocCount, shardMinDocCount, includeExclude, aggregationContext, parent, termsAggFactory);
public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, long estimatedBucketCount, BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
super(name, factories, valuesSource, estimatedBucketCount, estimatedBucketCount, bucketCountThresholds, includeExclude, aggregationContext, parent, termsAggFactory);
bucketOrds = new LongHash(estimatedBucketCount, aggregationContext.bigArrays());
}

@@ -39,17 +39,15 @@
public class SignificantLongTermsAggregator extends LongTermsAggregator {

public SignificantLongTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, @Nullable ValueFormat format,
long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, long shardMinDocCount,
long estimatedBucketCount, BucketCountThresholds bucketCountThresholds,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {

super(name, factories, valuesSource, format, estimatedBucketCount, null, requiredSize, shardSize, minDocCount, aggregationContext, parent);
super(name, factories, valuesSource, format, estimatedBucketCount, null, bucketCountThresholds, aggregationContext, parent);
this.termsAggFactory = termsAggFactory;
this.shardMinDocCount = shardMinDocCount;
}

protected long numCollectedDocs;
private final SignificantTermsAggregatorFactory termsAggFactory;
protected long shardMinDocCount;

@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
@@ -61,7 +59,7 @@ public void collect(int doc, long owningBucketOrdinal) throws IOException {
public SignificantLongTerms buildAggregation(long owningBucketOrdinal) {
assert owningBucketOrdinal == 0;

final int size = (int) Math.min(bucketOrds.size(), shardSize);
final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());

long supersetSize = termsAggFactory.prepareBackground(context);
long subsetSize = numCollectedDocs;
@@ -83,7 +81,7 @@ public SignificantLongTerms buildAggregation(long owningBucketOrdinal) {
spare.updateScore();

spare.bucketOrd = i;
if (spare.subsetDf >= shardMinDocCount) {
if (spare.subsetDf >= bucketCountThresholds.getShardMinDocCount()) {
spare = (SignificantLongTerms.Bucket) ordered.insertWithOverflow(spare);
}
}
@@ -94,7 +92,7 @@ public SignificantLongTerms buildAggregation(long owningBucketOrdinal) {
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i] = bucket;
}
return new SignificantLongTerms(subsetSize, supersetSize, name, formatter, requiredSize, minDocCount, Arrays.asList(list));
return new SignificantLongTerms(subsetSize, supersetSize, name, formatter, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), Arrays.asList(list));
}

@Override
@@ -103,7 +101,7 @@ public SignificantLongTerms buildEmptyAggregation() {
ContextIndexSearcher searcher = context.searchContext().searcher();
IndexReader topReader = searcher.getIndexReader();
int supersetSize = topReader.numDocs();
return new SignificantLongTerms(0, supersetSize, name, formatter, requiredSize, minDocCount, Collections.<InternalSignificantTerms.Bucket>emptyList());
return new SignificantLongTerms(0, supersetSize, name, formatter, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), Collections.<InternalSignificantTerms.Bucket>emptyList());
}

@Override