Commit
use shard_min_doc_count also in TermsAggregation
This was discussed in issues #6041 and #5998.

closes #6143
brwe committed May 14, 2014
1 parent d4a0eb8 commit 08e5789
Showing 8 changed files with 99 additions and 12 deletions.
@@ -290,7 +290,9 @@ default, the node coordinating the search process will request each shard to pro
and once all shards respond, it will reduce the results to the final list that will then be returned to the client.
If the number of unique terms is greater than `size`, the returned list can be slightly off and not accurate
(it could be that the term counts are slightly off and it could even be that a term that should have been in the top
size buckets was not returned).

coming[1.2.0] If set to `0`, the `size` will be set to `Integer.MAX_VALUE`.

To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard
using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter
@@ -300,7 +302,11 @@ Low-frequency terms can turn out to be the most interesting ones once all result
significant_terms aggregation can produce higher-quality results when the `shard_size` parameter is set to
values significantly higher than the `size` setting. This ensures that a bigger volume of promising candidate terms are given
a consolidated review by the reducing node before the final selection. Obviously, large candidate term lists
will cause extra network traffic and RAM usage, so this is a quality/cost trade-off that needs to be balanced. If `shard_size` is set to -1 (the default) then `shard_size` will be automatically estimated based on the number of shards and the `size` parameter.


coming[1.2.0] If set to `0`, the `shard_size` will be set to `Integer.MAX_VALUE`.


NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense). When it is, elasticsearch will
override it and reset it to be equal to `size`.
@@ -330,6 +336,8 @@ The above aggregation would only return tags which have been found in 10 hits or

Terms that score highly will be collected on a shard level and merged with the terms collected from other shards in a second step. However, the shard does not have the information about the global term frequencies available. The decision whether a term is added to a candidate list depends only on the score computed on the shard using local shard frequencies, not the global frequencies of the word. The `min_doc_count` criterion is only applied after merging the local term statistics of all shards. In a way, the decision to add a term as a candidate is made without being very _certain_ that the term will actually reach the required `min_doc_count`. This might cause many (globally) high-frequency terms to be missing from the final result if low-frequency but high-scoring terms populated the candidate lists. To avoid this, the `shard_size` parameter can be increased to allow more candidate terms on the shards. However, this increases memory consumption and network traffic.

coming[1.2.0] `shard_min_doc_count` parameter

The parameter `shard_min_doc_count` regulates the _certainty_ a shard has, with respect to `min_doc_count`, whether a term should actually be added to the candidate list. Terms will only be considered if their local shard frequency within the set is higher than `shard_min_doc_count`. If your dictionary contains many low-frequency terms and you are not interested in those (for example misspellings), then you can set the `shard_min_doc_count` parameter to filter out candidate terms on a shard level that will, with reasonable certainty, not reach the required `min_doc_count` even after merging the local frequencies. `shard_min_doc_count` is set to `1` by default and has no effect unless you explicitly set it.
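The tradeoff described above can be made concrete with a self-contained simulation; the class name, shard contents, and thresholds below are illustrative, not the actual Elasticsearch code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardMinDocCountTradeoff {
    public static void main(String[] args) {
        // Two shards each see the term "rare" once: globally it occurs twice.
        List<Map<String, Long>> shards = List.of(
                Map.of("rare", 1L, "common", 3L),
                Map.of("rare", 1L, "common", 4L));
        long minDocCount = 2;        // applied only after merging
        long shardMinDocCount = 2;   // applied on each shard before merging

        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> shard : shards) {
            for (Map.Entry<String, Long> e : shard.entrySet()) {
                if (e.getValue() >= shardMinDocCount) {   // shard-level filter
                    merged.merge(e.getKey(), e.getValue(), Long::sum);
                }
            }
        }
        // "rare" reaches min_doc_count globally (1 + 1 = 2) but was filtered
        // on every shard, so it cannot appear in the final result.
        System.out.println(merged.containsKey("rare"));        // prints false
        System.out.println(merged.get("common") >= minDocCount); // prints true
    }
}
```

This is why the warning below recommends keeping `shard_min_doc_count` well below `min_doc_count` divided by the number of shards.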


@@ -50,17 +50,19 @@ default, the node coordinating the search process will request each shard to pro
and once all shards respond, it will reduce the results to the final list that will then be returned to the client.
This means that if the number of unique terms is greater than `size`, the returned list is slightly off and not accurate
(it could be that the term counts are slightly off and it could even be that a term that should have been in the top
size buckets was not returned). If set to `0`, the `size` will be set to `Integer.MAX_VALUE`.


The higher the requested `size` is, the more accurate the results will be, but also, the more expensive it will be to
compute the final results (both due to bigger priority queues that are managed on a shard level and due to bigger data
transfers between the nodes and the client).

The `shard_size` parameter can be used to minimize the extra work that comes with bigger requested `size`. When defined,
it will determine how many terms the coordinating node will request from each shard. Once all the shards responded, the
coordinating node will then reduce them to a final result which will be based on the `size` parameter - this way,
one can increase the accuracy of the returned terms and avoid the overhead of streaming a big list of buckets back to
the client. If set to `0`, the `shard_size` will be set to `Integer.MAX_VALUE`.


NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense). When it is, elasticsearch will
override it and reset it to be equal to `size`.
@@ -184,7 +186,7 @@ PATH := <AGG_NAME>[<AGG_SEPARATOR><AGG_NAME>]*[<METRIC_SEPARATOR

The above will sort the countries buckets based on the average height among the female population.

==== Minimum document count

It is possible to only return terms that match more than a configured number of hits using the `min_doc_count` option:

@@ -204,13 +206,23 @@ It is possible to only return terms that match more than a configured number of

The above aggregation would only return tags which have been found in 10 hits or more. Default value is `1`.


Terms are collected and ordered on a shard level and merged with the terms collected from other shards in a second step. However, the shard does not have the information about the global document count available. The decision whether a term is added to a candidate list depends only on the order computed on the shard using local shard frequencies. The `min_doc_count` criterion is only applied after merging the local term statistics of all shards. In a way, the decision to add a term as a candidate is made without being very _certain_ that the term will actually reach the required `min_doc_count`. This might cause many (globally) high-frequency terms to be missing from the final result if low-frequency terms populated the candidate lists. To avoid this, the `shard_size` parameter can be increased to allow more candidate terms on the shards. However, this increases memory consumption and network traffic.

coming[1.2.0] `shard_min_doc_count` parameter

The parameter `shard_min_doc_count` regulates the _certainty_ a shard has, with respect to `min_doc_count`, whether a term should actually be added to the candidate list. Terms will only be considered if their local shard frequency within the set is higher than `shard_min_doc_count`. If your dictionary contains many low-frequency terms and you are not interested in those (for example misspellings), then you can set the `shard_min_doc_count` parameter to filter out candidate terms on a shard level that will, with reasonable certainty, not reach the required `min_doc_count` even after merging the local counts. `shard_min_doc_count` is set to `0` by default and has no effect unless you explicitly set it.
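The two phases described above (shard-level collection filtered by `shard_min_doc_count`, then a reduce that applies `min_doc_count` to the merged statistics) can be sketched as follows; the class, method names, and sample data are illustrative, not the actual aggregator implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TermsReduceSketch {
    // Shard phase: keep only terms at or above shard_min_doc_count.
    static Map<String, Long> collect(Map<String, Long> localCounts, long shardMinDocCount) {
        Map<String, Long> candidates = new HashMap<>();
        localCounts.forEach((term, count) -> {
            if (count >= shardMinDocCount) candidates.put(term, count);
        });
        return candidates;
    }

    // Reduce phase: sum the local statistics, then apply min_doc_count.
    static List<String> reduce(List<Map<String, Long>> perShard, long minDocCount) {
        Map<String, Long> global = new HashMap<>();
        perShard.forEach(shard -> shard.forEach((t, c) -> global.merge(t, c, Long::sum)));
        List<String> result = new ArrayList<>();
        global.forEach((t, c) -> { if (c >= minDocCount) result.add(t); });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> shard1 = Map.of("a", 1L, "b", 2L);
        Map<String, Long> shard2 = Map.of("a", 1L, "b", 2L);
        // With shard_min_doc_count = 0 (the default), "a" survives to the
        // reduce phase and meets min_doc_count = 2 there (1 + 1 = 2).
        List<String> terms = reduce(List.of(collect(shard1, 0), collect(shard2, 0)), 2);
        System.out.println(terms.size()); // prints 2
    }
}
```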



NOTE: Setting `min_doc_count`=`0` will also return buckets for terms that didn't match any hit. However, some of
the returned terms which have a document count of zero might only belong to deleted documents, so there is
no guarantee that a `match_all` query would find a positive document count for those terms.

WARNING: When NOT sorting on `doc_count` descending, high values of `min_doc_count` may return a number of buckets
which is less than `size` because not enough data was gathered from the shards. Missing buckets can be
brought back by increasing `shard_size`.
Setting `shard_min_doc_count` too high will cause terms to be filtered out on a shard level. This value should be set much lower than `min_doc_count/#shards`.
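The rule of thumb above follows from simple arithmetic; the numbers here are illustrative:

```java
public class ShardThresholdRuleOfThumb {
    public static void main(String[] args) {
        long minDocCount = 10; // global requirement (illustrative value)
        int shards = 5;
        // A term that just meets the global threshold may be spread evenly
        // across shards, so each shard could see as few as
        // minDocCount / shards occurrences of it.
        long evenSpreadPerShard = minDocCount / shards;
        System.out.println(evenSpreadPerShard); // prints 2
        // A shard_min_doc_count above this value could filter out a term that
        // would still reach min_doc_count after merging, hence the advice to
        // keep it much lower than min_doc_count/#shards.
    }
}
```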

==== Script

@@ -112,7 +112,9 @@ public DoubleTerms buildAggregation(long owningBucketOrdinal) {
spare.term = Double.longBitsToDouble(bucketOrds.get(i));
spare.docCount = bucketDocCount(i);
spare.bucketOrd = i;
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
spare = (DoubleTerms.Bucket) ordered.insertWithOverflow(spare);
}
}

final InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
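The same guard is applied in each aggregator below: a bucket only enters the size-bounded priority queue if it meets the shard-level threshold. A self-contained sketch of that pattern (the `Bucket` record, queue bound, and sample counts are illustrative; Elasticsearch uses Lucene's `PriorityQueue` with `insertWithOverflow`, not `java.util.PriorityQueue`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class GuardedQueueSketch {
    record Bucket(String term, long docCount) {}

    // Buckets below shardMinDocCount never enter the bounded candidate queue.
    static List<Bucket> topCandidates(List<Bucket> shardBuckets, long shardMinDocCount, int shardSize) {
        PriorityQueue<Bucket> ordered = new PriorityQueue<>((a, b) -> Long.compare(a.docCount(), b.docCount()));
        for (Bucket spare : shardBuckets) {
            if (shardMinDocCount <= spare.docCount()) {
                ordered.offer(spare);
                if (ordered.size() > shardSize) {
                    ordered.poll(); // overflow: evict the smallest bucket
                }
            }
        }
        return new ArrayList<>(ordered);
    }

    public static void main(String[] args) {
        List<Bucket> shard = List.of(new Bucket("a", 1), new Bucket("b", 3), new Bucket("c", 5));
        // A threshold of 2 filters out the singleton term "a" on the shard.
        System.out.println(topCandidates(shard, 2, 10).size()); // prints 2
    }
}
```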
@@ -138,7 +138,9 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
spare.bucketOrd = bucketOrd;
spare.docCount = bucketDocCount;
copy(globalValues.getValueByOrd(globalTermOrd), spare.termBytes);
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
spare = (StringTerms.Bucket) ordered.insertWithOverflow(spare);
}
}

final InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
@@ -112,7 +112,9 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
spare.term = bucketOrds.get(i);
spare.docCount = bucketDocCount(i);
spare.bucketOrd = i;
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
spare = (LongTerms.Bucket) ordered.insertWithOverflow(spare);
}
}

final InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
@@ -221,7 +221,9 @@ public boolean apply(BytesRef input) {
bucketOrds.get(i, spare.termBytes);
spare.docCount = bucketDocCount(i);
spare.bucketOrd = i;
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
spare = (StringTerms.Bucket) ordered.insertWithOverflow(spare);
}
}

final InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
@@ -69,6 +69,14 @@ public TermsBuilder minDocCount(long minDocCount) {
return this;
}

/**
* Set the minimum document count terms should have on the shard in order to appear in the response.
*/
public TermsBuilder shardMinDocCount(long shardMinDocCount) {
bucketCountThresholds.setShardMinDocCount(shardMinDocCount);
return this;
}

/**
* Define a regular expression that will determine what terms should be aggregated. The regular expression is based
* on the {@link java.util.regex.Pattern} class.
@@ -26,6 +26,8 @@
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

@@ -41,7 +43,7 @@
/**
*
*/
public class TermsShardMinDocCountTests extends ElasticsearchIntegrationTest {
private static final String index = "someindex";
private static final String type = "testtype";
public String randomExecutionHint() {
@@ -50,7 +52,7 @@ public String randomExecutionHint() {

// see https://github.com/elasticsearch/elasticsearch/issues/5998
@Test
public void shardMinDocCountSignificantTermsTest() throws Exception {

String termtype = "string";
if (randomBoolean()) {
@@ -108,4 +110,53 @@ private void addTermsDocs(String term, int numInClass, int numNotInClass, List<I
}
}

// see https://github.com/elasticsearch/elasticsearch/issues/5998
@Test
public void shardMinDocCountTermsTest() throws Exception {
final String [] termTypes = {"string", "long", "integer", "float", "double"};
String termtype = termTypes[randomInt(termTypes.length - 1)];

assertAcked(prepareCreate(index).setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0).addMapping(type, "{\"properties\":{\"text\": {\"type\": \"" + termtype + "\"}}}"));
ensureYellow(index);
List<IndexRequestBuilder> indexBuilders = new ArrayList<>();

addTermsDocs("1", 1, indexBuilders);//low doc freq but high score
addTermsDocs("2", 1, indexBuilders);
addTermsDocs("3", 1, indexBuilders);
addTermsDocs("4", 1, indexBuilders);
addTermsDocs("5", 3, indexBuilders);//low score but high doc freq
addTermsDocs("6", 3, indexBuilders);
indexRandom(true, indexBuilders);

// first, check that indeed, when the shardMinDocCount parameter is not set, 0 terms are returned
SearchResponse response = client().prepareSearch(index)
.addAggregation(
new TermsBuilder("myTerms").field("text").minDocCount(2).size(2).executionHint(randomExecutionHint()).order(Terms.Order.term(true))
)
.execute()
.actionGet();
assertSearchResponse(response);
Terms sigterms = response.getAggregations().get("myTerms");
assertThat(sigterms.getBuckets().size(), equalTo(0));


response = client().prepareSearch(index)
.addAggregation(
new TermsBuilder("myTerms").field("text").minDocCount(2).shardMinDocCount(2).size(2).executionHint(randomExecutionHint()).order(Terms.Order.term(true))
)
.execute()
.actionGet();
assertSearchResponse(response);
sigterms = response.getAggregations().get("myTerms");
assertThat(sigterms.getBuckets().size(), equalTo(2));

}

private void addTermsDocs(String term, int numDocs, List<IndexRequestBuilder> builders) {
String sourceClass = "{\"text\": \"" + term + "\"}";
for (int i = 0; i < numDocs; i++) {
builders.add(client().prepareIndex(index, type).setSource(sourceClass));
}

}
}
