Skip to content

Commit

Permalink
Aggregations: change to default shard_size in terms aggregation
Browse files Browse the repository at this point in the history
The default shard size in the terms aggregation now uses BucketUtils.suggestShardSideQueueSize() to set the shard size if the user does not specify it as a parameter.

Closes #6857
  • Loading branch information
colings86 committed Jul 24, 2014
1 parent 5487c56 commit dc9e9cb
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 9 deletions.
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.BucketUtils;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
import org.elasticsearch.search.internal.SearchContext;
Expand All @@ -32,7 +33,6 @@
*/
public class TermsParser implements Aggregator.Parser {


@Override
public String type() {
return StringTerms.TYPE.name();
Expand All @@ -41,19 +41,22 @@ public String type() {
@Override
public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
TermsParametersParser aggParser = new TermsParametersParser();
ValuesSourceParser vsParser = ValuesSourceParser.any(aggregationName, StringTerms.TYPE, context)
.scriptable(true)
.formattable(true)
.requiresSortedValues(true)
.requiresUniqueValues(true)
.build();
ValuesSourceParser vsParser = ValuesSourceParser.any(aggregationName, StringTerms.TYPE, context).scriptable(true).formattable(true)
.requiresSortedValues(true).requiresUniqueValues(true).build();
IncludeExclude.Parser incExcParser = new IncludeExclude.Parser(aggregationName, StringTerms.TYPE, context);
aggParser.parse(aggregationName, parser, context, vsParser, incExcParser);

InternalOrder order = resolveOrder(aggParser.getOrderKey(), aggParser.isOrderAsc());
TermsAggregator.BucketCountThresholds bucketCountThresholds = aggParser.getBucketCountThresholds();
if (!(order == InternalOrder.TERM_ASC || order == InternalOrder.TERM_DESC)
&& bucketCountThresholds.getShardSize() == aggParser.getDefaultBucketCountThresholds().getShardSize()) {
// The user has not made a shardSize selection. Use default heuristic to avoid any wrong-ranking caused by distributed counting
bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards()));
}
bucketCountThresholds.ensureValidity();
InternalOrder order = resolveOrder(aggParser.getOrderKey(), aggParser.isOrderAsc());
return new TermsAggregatorFactory(aggregationName, vsParser.config(), order, bucketCountThresholds, aggParser.getIncludeExclude(), aggParser.getExecutionHint(), aggParser.getCollectionMode());
return new TermsAggregatorFactory(aggregationName, vsParser.config(), order, bucketCountThresholds, aggParser.getIncludeExclude(),
aggParser.getExecutionHint(), aggParser.getCollectionMode());
}

static InternalOrder resolveOrder(String key, boolean asc) {
Expand Down
Expand Up @@ -45,6 +45,31 @@ public void noShardSize_string() throws Exception {
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();

Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<String, Long> expected = ImmutableMap.<String, Long>builder()
.put("1", 8l)
.put("3", 8l)
.put("2", 5l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsText().string())));
}
}

@Test
public void shardSizeEqualsSize_string() throws Exception {
createIdx("type=string,index=not_analyzed");

indexData();

SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3).shardSize(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();

Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Expand Down Expand Up @@ -109,6 +134,31 @@ public void withShardSize_string_singleShard() throws Exception {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKey())));
}
}

@Test
public void noShardSizeTermOrder_string() throws Exception {
createIdx("type=string,index=not_analyzed");

indexData();

SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.term(true)))
.execute().actionGet();

Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<String, Long> expected = ImmutableMap.<String, Long>builder()
.put("1", 8l)
.put("2", 5l)
.put("3", 8l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsText().string())));
}
}

@Test
public void noShardSize_long() throws Exception {
Expand All @@ -123,6 +173,32 @@ public void noShardSize_long() throws Exception {
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();

Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder()
.put(1, 8l)
.put(3, 8l)
.put(2, 5l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue())));
}
}

@Test
public void shardSizeEqualsSize_long() throws Exception {

createIdx("type=long");

indexData();

SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3).shardSize(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();

Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Expand Down Expand Up @@ -188,6 +264,32 @@ public void withShardSize_long_singleShard() throws Exception {
}
}

@Test
public void noShardSizeTermOrder_long() throws Exception {

createIdx("type=long");

indexData();

SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.term(true)))
.execute().actionGet();

Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder()
.put(1, 8l)
.put(2, 5l)
.put(3, 8l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue())));
}
}

@Test
public void noShardSize_double() throws Exception {

Expand All @@ -201,6 +303,32 @@ public void noShardSize_double() throws Exception {
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();

Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder()
.put(1, 8l)
.put(3, 8l)
.put(2, 5l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue())));
}
}

@Test
public void shardSizeEqualsSize_double() throws Exception {

createIdx("type=double");

indexData();

SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3).shardSize(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();

Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Expand Down Expand Up @@ -265,4 +393,30 @@ public void withShardSize_double_singleShard() throws Exception {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue())));
}
}

@Test
public void noShardSizeTermOrder_double() throws Exception {

createIdx("type=double");

indexData();

SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.term(true)))
.execute().actionGet();

Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder()
.put(1, 8l)
.put(2, 5l)
.put(3, 8l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue())));
}
}
}

0 comments on commit dc9e9cb

Please sign in to comment.