From da9063d76643786aef7d32cffbf323f3db70b362 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 8 Jan 2014 15:08:18 +0100 Subject: [PATCH] Add new option `min_doc_count` to terms and histogram aggregations. `min_doc_count` is the minimum number of hits that a term or histogram key should match in order to appear in the response. `min_doc_count=0` replaces `compute_empty_buckets` for histograms and will behave exactly like facets' `all_terms=true` for terms aggregations. Close #4662 --- .../bucket/histogram-aggregation.asciidoc | 57 ++++ .../bucket/terms-aggregation.asciidoc | 28 ++ .../common/collect/Iterators2.java | 64 ++++ .../histogram/AbstractHistogramBase.java | 54 ++-- .../bucket/histogram/DateHistogramParser.java | 18 +- .../bucket/histogram/HistogramAggregator.java | 24 +- .../bucket/histogram/HistogramBuilder.java | 10 +- .../bucket/histogram/HistogramParser.java | 12 +- .../histogram/InternalDateHistogram.java | 8 +- .../bucket/histogram/InternalHistogram.java | 8 +- .../bucket/terms/DoubleTerms.java | 15 +- .../bucket/terms/DoubleTermsAggregator.java | 24 +- .../bucket/terms/InternalTerms.java | 34 +- .../aggregations/bucket/terms/LongTerms.java | 11 +- .../bucket/terms/LongTermsAggregator.java | 24 +- .../bucket/terms/StringTerms.java | 6 +- .../bucket/terms/StringTermsAggregator.java | 124 +++++++- .../bucket/terms/TermsAggregatorFactory.java | 14 +- .../bucket/terms/TermsBuilder.java | 12 + .../bucket/terms/TermsParser.java | 9 +- .../bucket/terms/UnmappedTerms.java | 6 +- .../bucket/terms/UnmappedTermsAggregator.java | 8 +- .../common/collect/Iterators2Tests.java | 50 +++ .../bucket/DateHistogramTests.java | 2 +- .../aggregations/bucket/DateRangeTests.java | 2 +- .../aggregations/bucket/DoubleTermsTests.java | 2 +- .../aggregations/bucket/FilterTests.java | 4 +- .../aggregations/bucket/GeoDistanceTests.java | 2 +- .../aggregations/bucket/HistogramTests.java | 2 +- .../aggregations/bucket/IPv4RangeTests.java | 2 +- .../aggregations/bucket/LongTermsTests.java | 2 +- .../aggregations/bucket/MinDocCountTests.java | 298 ++++++++++++++++++ .../aggregations/bucket/MissingTests.java | 2 +- .../aggregations/bucket/NestedTests.java | 2 +- .../aggregations/bucket/RangeTests.java | 2 +- .../aggregations/bucket/StringTermsTests.java | 4 +- .../search/aggregations/metrics/AvgTests.java | 2 +- .../metrics/ExtendedStatsTests.java | 2 +- .../search/aggregations/metrics/MaxTests.java | 2 +- .../search/aggregations/metrics/MinTests.java | 2 +- .../aggregations/metrics/StatsTests.java | 2 +- .../search/aggregations/metrics/SumTests.java | 2 +- 42 files changed, 822 insertions(+), 136 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/collect/Iterators2.java create mode 100644 src/test/java/org/elasticsearch/common/collect/Iterators2Tests.java create mode 100644 src/test/java/org/elasticsearch/search/aggregations/bucket/MinDocCountTests.java diff --git a/docs/reference/search/aggregations/bucket/histogram-aggregation.asciidoc b/docs/reference/search/aggregations/bucket/histogram-aggregation.asciidoc index eb13117fee60b..4703e724817c9 100644 --- a/docs/reference/search/aggregations/bucket/histogram-aggregation.asciidoc +++ b/docs/reference/search/aggregations/bucket/histogram-aggregation.asciidoc @@ -159,6 +159,63 @@ If the histogram aggregation has a direct metrics sub-aggregation, the latter ca <2> There is no need to configure the `price` field for the `price_stats` aggregation as it will inherit it by default from its parent histogram aggregation. +==== Minimum document count + +It is possible to only return buckets that have a document count that is greater than or equal to a configured limit through the `min_doc_count` option. + +[source,js] +-------------------------------------------------- +{ + "aggs" : { + "prices" : { + "histogram" : { + "field" : "price", + "interval" : 50, + "min_doc_count": 10 + } + } + } +} +-------------------------------------------------- + +The above aggregation would only return buckets that contain 10 documents or more. Default value is `1`. + +NOTE: The special value `0` can be used to add empty buckets to the response between the minimum and the maximum buckets. Here is an example of what the response could look like: + +[source,js] +-------------------------------------------------- +{ + "aggregations": { + "prices": { + "0": { + "key": 0, + "doc_count": 2 + }, + "50": { + "key": 50, + "doc_count": 0 + }, + "150": { + "key": 150, + "doc_count": 3 + }, + "200": { + "key": 150, + "doc_count": 0 + }, + "250": { + "key": 150, + "doc_count": 0 + }, + "300": { + "key": 150, + "doc_count": 1 + } + } + } +} +-------------------------------------------------- + ==== Response Format By default, the buckets are retuned as an ordered array. It is also possilbe to request the response as a hash instead keyed by the buckets keys: diff --git a/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc b/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc index 35cfeb52d4268..d6d89c49dd25e 100644 --- a/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc +++ b/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc @@ -142,6 +142,34 @@ Ordering the buckets by multi value metrics sub-aggregation (identified by the a } -------------------------------------------------- +==== Minimum document count + +It is possible to only return terms that match more than a configured number of hits using the `min_doc_count` option: + +[source,js] +-------------------------------------------------- +{ + "aggs" : { + "tags" : { + "terms" : { + "field" : "tag", + "min_doc_count": 10 + } + } + } +} +-------------------------------------------------- + +The above aggregation would only return tags which have been found in 10 hits or more. Default value is `1`. + +NOTE: Setting `min_doc_count`=`0` will also return buckets for terms that didn't match any hit. However, some of + the returned terms which have a document count of zero might only belong to deleted documents, so there is + no warranty that a `match_all` query would find a positive document count for those terms. + +WARNING: When NOT sorting on `doc_count` descending, high values of `min_doc_count` may return a number of buckets + which is less than `size` because not enough data was gathered from the shards. Missing buckets can be + back by increasing `shard_size`. + ==== Script Generating the terms using a script: diff --git a/src/main/java/org/elasticsearch/common/collect/Iterators2.java b/src/main/java/org/elasticsearch/common/collect/Iterators2.java new file mode 100644 index 0000000000000..b2ca021a93987 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/collect/Iterators2.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.collect; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import com.google.common.collect.UnmodifiableIterator; + +import java.util.Comparator; +import java.util.Iterator; + +public enum Iterators2 { + ; + + /** Remove duplicated elements from an iterator over sorted content. */ + public static Iterator deduplicateSorted(Iterator iterator, final Comparator comparator) { + final PeekingIterator it = Iterators.peekingIterator(iterator); + return new UnmodifiableIterator() { + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public T next() { + final T ret = it.next(); + while (it.hasNext() && comparator.compare(ret, it.peek()) == 0) { + it.next(); + } + assert !it.hasNext() || comparator.compare(ret, it.peek()) < 0 : "iterator is not sorted: " + ret + " > " + it.peek(); + return ret; + } + + }; + } + + /** Return a merged view over several iterators, optionally deduplicating equivalent entries. */ + public static Iterator mergeSorted(Iterable> iterators, Comparator comparator, boolean deduplicate) { + Iterator it = Iterators.mergeSorted(iterators, comparator); + if (deduplicate) { + it = deduplicateSorted(it, comparator); + } + return it; + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramBase.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramBase.java index 8811d737be7e1..9b8db04b750e9 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramBase.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramBase.java @@ -118,7 +118,7 @@ public static interface Factory { String type(); - AbstractHistogramBase create(String name, List buckets, InternalOrder order, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed); + AbstractHistogramBase create(String name, List buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed); Bucket createBucket(long key, long docCount, InternalAggregations aggregations); @@ -129,14 +129,17 @@ public static interface Factory { private InternalOrder order; private ValueFormatter formatter; private boolean keyed; + private long minDocCount; private EmptyBucketInfo emptyBucketInfo; protected AbstractHistogramBase() {} // for serialization - protected AbstractHistogramBase(String name, List buckets, InternalOrder order, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) { + protected AbstractHistogramBase(String name, List buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) { super(name); this.buckets = buckets; this.order = order; + assert (minDocCount == 0) == (emptyBucketInfo != null); + this.minDocCount = minDocCount; this.emptyBucketInfo = emptyBucketInfo; this.formatter = formatter; this.keyed = keyed; @@ -170,28 +173,36 @@ public InternalAggregation reduce(ReduceContext reduceContext) { List aggregations = reduceContext.aggregations(); if (aggregations.size() == 1) { - if (emptyBucketInfo == null) { + if (minDocCount == 1) { return aggregations.get(0); } - // we need to fill the gaps with empty buckets AbstractHistogramBase histo = (AbstractHistogramBase) aggregations.get(0); CollectionUtil.introSort(histo.buckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator()); List list = order.asc ? histo.buckets : Lists.reverse(histo.buckets); HistogramBase.Bucket prevBucket = null; ListIterator iter = list.listIterator(); - while (iter.hasNext()) { - // look ahead on the next bucket without advancing the iter - // so we'll be able to insert elements at the right position - HistogramBase.Bucket nextBucket = list.get(iter.nextIndex()); - if (prevBucket != null) { - long key = emptyBucketInfo.rounding.nextRoundingValue(prevBucket.getKey()); - while (key != nextBucket.getKey()) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations)); - key = emptyBucketInfo.rounding.nextRoundingValue(key); + if (minDocCount == 0) { + // we need to fill the gaps with empty buckets + while (iter.hasNext()) { + // look ahead on the next bucket without advancing the iter + // so we'll be able to insert elements at the right position + HistogramBase.Bucket nextBucket = list.get(iter.nextIndex()); + if (prevBucket != null) { + long key = emptyBucketInfo.rounding.nextRoundingValue(prevBucket.getKey()); + while (key != nextBucket.getKey()) { + iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations)); + key = emptyBucketInfo.rounding.nextRoundingValue(key); + } + } + prevBucket = iter.next(); + } + } else { + while (iter.hasNext()) { + if (iter.next().getDocCount() < minDocCount) { + iter.remove(); } } - prevBucket = iter.next(); } if (order != InternalOrder.KEY_ASC && order != InternalOrder.KEY_DESC) { @@ -223,7 +234,9 @@ public InternalAggregation reduce(ReduceContext reduceContext) { for (int i = 0; i < allocated.length; i++) { if (allocated[i]) { Bucket bucket = ((List) buckets[i]).get(0).reduce(((List) buckets[i]), reduceContext.cacheRecycler()); - reducedBuckets.add(bucket); + if (bucket.getDocCount() >= minDocCount) { + reducedBuckets.add(bucket); + } } } bucketsByKey.release(); @@ -231,7 +244,7 @@ public InternalAggregation reduce(ReduceContext reduceContext) { // adding empty buckets in needed - if (emptyBucketInfo != null) { + if (minDocCount == 0) { CollectionUtil.introSort(reducedBuckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator()); List list = order.asc ? reducedBuckets : Lists.reverse(reducedBuckets); HistogramBase.Bucket prevBucket = null; @@ -269,7 +282,8 @@ protected B createBucket(long key, long docCount, InternalAggregations aggregati public void readFrom(StreamInput in) throws IOException { name = in.readString(); order = InternalOrder.Streams.readOrder(in); - if (in.readBoolean()) { + minDocCount = in.readVLong(); + if (minDocCount == 0) { emptyBucketInfo = EmptyBucketInfo.readFrom(in); } formatter = ValueFormatterStreams.readOptional(in); @@ -287,10 +301,8 @@ public void readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeString(name); InternalOrder.Streams.writeOrder(order, out); - if (emptyBucketInfo == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); + out.writeVLong(minDocCount); + if (minDocCount == 0) { EmptyBucketInfo.writeTo(emptyBucketInfo, out); } ValueFormatterStreams.writeOptional(formatter, out); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramParser.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramParser.java index 237448daf56ee..152bd956c5e5e 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramParser.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramParser.java @@ -87,7 +87,7 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se String scriptLang = null; Map scriptParams = null; boolean keyed = false; - boolean computeEmptyBuckets = false; + long minDocCount = 1; InternalOrder order = (InternalOrder) Histogram.Order.KEY_ASC; String interval = null; boolean preZoneAdjustLargeInterval = false; @@ -132,13 +132,17 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se } else if (token == XContentParser.Token.VALUE_BOOLEAN) { if ("keyed".equals(currentFieldName)) { keyed = parser.booleanValue(); - } else if ("compute_empty_buckets".equals(currentFieldName) || "computeEmptyBuckets".equals(currentFieldName)) { - computeEmptyBuckets = parser.booleanValue(); } else if ("script_values_sorted".equals(currentFieldName)) { assumeSorted = parser.booleanValue(); } else { throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("min_doc_count".equals(currentFieldName) || "minDocCount".equals(currentFieldName)) { + minDocCount = parser.longValue(); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); + } } else if (token == XContentParser.Token.START_OBJECT) { if ("params".equals(currentFieldName)) { scriptParams = parser.map(); @@ -200,17 +204,17 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se if (searchScript != null) { ValueParser valueParser = new ValueParser.DateMath(new DateMathParser(DateFieldMapper.Defaults.DATE_TIME_FORMATTER, DateFieldMapper.Defaults.TIME_UNIT)); config.parser(valueParser); - return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, computeEmptyBuckets, InternalDateHistogram.FACTORY); + return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, minDocCount, InternalDateHistogram.FACTORY); } // falling back on the get field data context - return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, computeEmptyBuckets, InternalDateHistogram.FACTORY); + return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, minDocCount, InternalDateHistogram.FACTORY); } FieldMapper mapper = context.smartNameFieldMapper(field); if (mapper == null) { config.unmapped(true); - return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, computeEmptyBuckets, InternalDateHistogram.FACTORY); + return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, minDocCount, InternalDateHistogram.FACTORY); } if (!(mapper instanceof DateFieldMapper)) { @@ -219,7 +223,7 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se IndexFieldData indexFieldData = context.fieldData().getForField(mapper); config.fieldContext(new FieldContext(field, indexFieldData)); - return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, computeEmptyBuckets, InternalDateHistogram.FACTORY); + return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, minDocCount, InternalDateHistogram.FACTORY); } private static InternalOrder resolveOrder(String key, boolean asc) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java index e9fd21f87dd8c..120d16938d4d7 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java @@ -46,7 +46,7 @@ public class HistogramAggregator extends BucketsAggregator { private final Rounding rounding; private final InternalOrder order; private final boolean keyed; - private final boolean computeEmptyBuckets; + private final long minDocCount; private final AbstractHistogramBase.Factory histogramFactory; private final LongHash bucketOrds; @@ -56,7 +56,7 @@ public HistogramAggregator(String name, Rounding rounding, InternalOrder order, boolean keyed, - boolean computeEmptyBuckets, + long minDocCount, @Nullable NumericValuesSource valuesSource, long initialCapacity, AbstractHistogramBase.Factory histogramFactory, @@ -68,7 +68,7 @@ public HistogramAggregator(String name, this.rounding = rounding; this.order = order; this.keyed = keyed; - this.computeEmptyBuckets = computeEmptyBuckets; + this.minDocCount = minDocCount; this.histogramFactory = histogramFactory; bucketOrds = new LongHash(initialCapacity, aggregationContext.pageCacheRecycler()); @@ -119,15 +119,15 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) { // value source will be null for unmapped fields ValueFormatter formatter = valuesSource != null ? valuesSource.formatter() : null; - AbstractHistogramBase.EmptyBucketInfo emptyBucketInfo = computeEmptyBuckets ? new AbstractHistogramBase.EmptyBucketInfo(rounding, buildEmptySubAggregations()) : null; - return histogramFactory.create(name, buckets, order, emptyBucketInfo, formatter, keyed); + AbstractHistogramBase.EmptyBucketInfo emptyBucketInfo = minDocCount == 0 ? new AbstractHistogramBase.EmptyBucketInfo(rounding, buildEmptySubAggregations()) : null; + return histogramFactory.create(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed); } @Override public InternalAggregation buildEmptyAggregation() { ValueFormatter formatter = valuesSource != null ? valuesSource.formatter() : null; - AbstractHistogramBase.EmptyBucketInfo emptyBucketInfo = computeEmptyBuckets ? new AbstractHistogramBase.EmptyBucketInfo(rounding, buildEmptySubAggregations()) : null; - return histogramFactory.create(name, (List) Collections.EMPTY_LIST, order, emptyBucketInfo, formatter, keyed); + AbstractHistogramBase.EmptyBucketInfo emptyBucketInfo = minDocCount == 0 ? new AbstractHistogramBase.EmptyBucketInfo(rounding, buildEmptySubAggregations()) : null; + return histogramFactory.create(name, Collections.emptyList(), order, minDocCount, emptyBucketInfo, formatter, keyed); } @Override @@ -140,28 +140,28 @@ public static class Factory extends ValueSourceAggregatorFactory histogramFactory; public Factory(String name, ValuesSourceConfig valueSourceConfig, - Rounding rounding, InternalOrder order, boolean keyed, boolean computeEmptyBuckets, AbstractHistogramBase.Factory histogramFactory) { + Rounding rounding, InternalOrder order, boolean keyed, long minDocCount, AbstractHistogramBase.Factory histogramFactory) { super(name, histogramFactory.type(), valueSourceConfig); this.rounding = rounding; this.order = order; this.keyed = keyed; - this.computeEmptyBuckets = computeEmptyBuckets; + this.minDocCount = minDocCount; this.histogramFactory = histogramFactory; } @Override protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent) { - return new HistogramAggregator(name, factories, rounding, order, keyed, computeEmptyBuckets, null, 0, histogramFactory, aggregationContext, parent); + return new HistogramAggregator(name, factories, rounding, order, keyed, minDocCount, null, 0, histogramFactory, aggregationContext, parent); } @Override protected Aggregator create(NumericValuesSource valuesSource, long expectedBucketsCount, AggregationContext aggregationContext, Aggregator parent) { // todo if we'll keep track of min/max values in IndexFieldData, we could use the max here to come up with a better estimation for the buckets count - return new HistogramAggregator(name, factories, rounding, order, keyed, computeEmptyBuckets, valuesSource, 50, histogramFactory, aggregationContext, parent); + return new HistogramAggregator(name, factories, rounding, order, keyed, minDocCount, valuesSource, 50, histogramFactory, aggregationContext, parent); } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramBuilder.java index 3a703959b8856..de8f4a7756f64 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramBuilder.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramBuilder.java @@ -32,7 +32,7 @@ public class HistogramBuilder extends ValuesSourceAggregationBuilder scriptParams = null; boolean keyed = false; - boolean emptyBuckets = false; + long minDocCount = 1; InternalOrder order = (InternalOrder) InternalOrder.KEY_ASC; long interval = -1; boolean assumeSorted = false; @@ -81,14 +81,14 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se } else if (token == XContentParser.Token.VALUE_NUMBER) { if ("interval".equals(currentFieldName)) { interval = parser.longValue(); + } else if ("min_doc_count".equals(currentFieldName) || "minDocCount".equals(currentFieldName)) { + minDocCount = parser.longValue(); } else { throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); } } else if (token == XContentParser.Token.VALUE_BOOLEAN) { if ("keyed".equals(currentFieldName)) { keyed = parser.booleanValue(); - } else if ("empty_buckets".equals(currentFieldName) || "emptyBuckets".equals(currentFieldName)) { - emptyBuckets = parser.booleanValue(); } else if ("script_values_sorted".equals(currentFieldName)) { assumeSorted = parser.booleanValue(); } else { @@ -131,13 +131,13 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se } if (field == null) { - return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, emptyBuckets, InternalHistogram.FACTORY); + return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, minDocCount, InternalHistogram.FACTORY); } FieldMapper mapper = context.smartNameFieldMapper(field); if (mapper == null) { config.unmapped(true); - return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, emptyBuckets, InternalHistogram.FACTORY); + return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, minDocCount, InternalHistogram.FACTORY); } IndexFieldData indexFieldData = context.fieldData().getForField(mapper); @@ -147,7 +147,7 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se config.formatter(new ValueFormatter.Number.Pattern(format)); } - return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, emptyBuckets, InternalHistogram.FACTORY); + return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, minDocCount, InternalHistogram.FACTORY); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 6569ec34fe1b1..776a25bb876c5 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -77,8 +77,8 @@ public String type() { } @Override - public AbstractHistogramBase create(String name, List buckets, InternalOrder order, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) { - return new InternalDateHistogram(name, buckets, order, emptyBucketInfo, formatter, keyed); + public AbstractHistogramBase create(String name, List buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) { + return new InternalDateHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed); } @Override @@ -89,8 +89,8 @@ public AbstractHistogramBase.Bucket createBucket(long key, long docCount, Intern InternalDateHistogram() {} // for serialization - InternalDateHistogram(String name, List buckets, InternalOrder order, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) { - super(name, buckets, order, emptyBucketInfo, formatter, keyed); + InternalDateHistogram(String name, List buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) { + super(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index 3f031fbda22e9..ac9e5ea3631a7 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -66,8 +66,8 @@ public String type() { return TYPE.name(); } - public AbstractHistogramBase create(String name, List buckets, InternalOrder order, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) { - return new InternalHistogram(name, buckets, order, emptyBucketInfo, formatter, keyed); + public AbstractHistogramBase create(String name, List buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) { + return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed); } public Bucket createBucket(long key, long docCount, InternalAggregations aggregations) { @@ -78,8 +78,8 @@ public Bucket createBucket(long key, long docCount, InternalAggregations aggrega public InternalHistogram() {} // for serialization - public InternalHistogram(String name, List buckets, InternalOrder order, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) { - super(name, buckets, order, emptyBucketInfo, formatter, keyed); + public InternalHistogram(String name, List buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) { + super(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java index 11582107810a2..3b6b290af9692 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -89,12 +89,12 @@ int compareTerm(Terms.Bucket other) { DoubleTerms() {} // for serialization - public DoubleTerms(String name, InternalOrder order, int requiredSize, Collection buckets) { - this(name, order, null, requiredSize, buckets); + public DoubleTerms(String name, InternalOrder order, int requiredSize, long minDocCount, Collection buckets) { + this(name, order, null, requiredSize, minDocCount, buckets); } - public DoubleTerms(String name, InternalOrder order, ValueFormatter valueFormatter, int requiredSize, Collection buckets) { - super(name, order, requiredSize, buckets); + public DoubleTerms(String name, InternalOrder order, ValueFormatter valueFormatter, int requiredSize, long minDocCount, Collection buckets) { + super(name, order, requiredSize, minDocCount, buckets); this.valueFormatter = valueFormatter; } @@ -148,7 +148,10 @@ public InternalTerms reduce(ReduceContext reduceContext) { for (int i = 0; i < states.length; i++) { if (states[i]) { List sameTermBuckets = (List) internalBuckets[i]; - ordered.insertWithOverflow(sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler())); + final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler()); + if (b.getDocCount() >= minDocCount) { + ordered.insertWithOverflow(b); + } } } buckets.release(); @@ -166,6 +169,7 @@ public void readFrom(StreamInput in) throws IOException { this.order = InternalOrder.Streams.readOrder(in); this.valueFormatter = ValueFormatterStreams.readOptional(in); this.requiredSize = in.readVInt(); + this.minDocCount = in.readVLong(); int size = in.readVInt(); List buckets = new ArrayList(size); for (int i = 0; i < size; i++) { @@ -181,6 +185,7 @@ public void writeTo(StreamOutput out) throws IOException { InternalOrder.Streams.writeOrder(order, out); ValueFormatterStreams.writeOptional(valueFormatter, out); out.writeVInt(requiredSize); + out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { out.writeDouble(((Bucket) bucket).term); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java index d35e0238b7234..5506e43570c82 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.bucket.terms; +import org.apache.lucene.index.AtomicReaderContext; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.index.fielddata.DoubleValues; import org.elasticsearch.search.aggregations.Aggregator; @@ -41,16 +42,18 @@ public class DoubleTermsAggregator extends BucketsAggregator { private final InternalOrder order; private final int requiredSize; private final int shardSize; + private final long minDocCount; private final NumericValuesSource valuesSource; private final LongHash bucketOrds; public DoubleTermsAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource, long estimatedBucketCount, - InternalOrder order, int requiredSize, int shardSize, AggregationContext aggregationContext, Aggregator parent) { + InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent) { super(name, BucketAggregationMode.PER_BUCKET, factories, estimatedBucketCount, aggregationContext, parent); this.valuesSource = valuesSource; this.order = InternalOrder.validate(order, this); this.requiredSize = requiredSize; this.shardSize = shardSize; + this.minDocCount = minDocCount; bucketOrds = new LongHash(estimatedBucketCount, aggregationContext.pageCacheRecycler()); } @@ -79,6 +82,21 @@ public void collect(int doc, long owningBucketOrdinal) throws IOException { @Override public DoubleTerms buildAggregation(long owningBucketOrdinal) { assert owningBucketOrdinal == 0; + + if (minDocCount == 0 && (order != InternalOrder.COUNT_DESC || bucketOrds.size() < requiredSize)) { + // we need to fill-in the blanks + for (AtomicReaderContext ctx : context.searchContext().searcher().getTopReaderContext().leaves()) { + context.setNextReader(ctx); + final DoubleValues values = valuesSource.doubleValues(); + for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) { + final int valueCount = values.setDocument(docId); + for (int i = 0; i < valueCount; ++i) { + bucketOrds.add(Double.doubleToLongBits(values.nextValue())); + } + } + } + } + final int size = (int) Math.min(bucketOrds.size(), shardSize); BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(this)); @@ -105,12 +123,12 @@ public DoubleTerms buildAggregation(long owningBucketOrdinal) { bucket.aggregations = bucketAggregations(bucket.bucketOrd); list[i] = bucket; } - return new DoubleTerms(name, order, valuesSource.formatter(), requiredSize, Arrays.asList(list)); + return new DoubleTerms(name, order, valuesSource.formatter(), requiredSize, minDocCount, Arrays.asList(list)); } @Override public DoubleTerms buildEmptyAggregation() { - return new DoubleTerms(name, order, valuesSource.formatter(), requiredSize, Collections.emptyList()); + return new DoubleTerms(name, order, valuesSource.formatter(), requiredSize, minDocCount, Collections.emptyList()); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index a8f6d1f6a7dc4..670e06295a104 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.bucket.terms; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.common.io.stream.Streamable; @@ -79,15 +80,17 @@ public Bucket reduce(List buckets, CacheRecycler cacheRecycler protected InternalOrder order; protected int requiredSize; + protected long minDocCount; protected Collection buckets; protected Map bucketMap; protected InternalTerms() {} // for serialization - protected InternalTerms(String name, InternalOrder order, int requiredSize, Collection buckets) { + protected InternalTerms(String name, InternalOrder order, int requiredSize, long minDocCount, Collection buckets) { super(name); this.order = order; this.requiredSize = requiredSize; + this.minDocCount = minDocCount; this.buckets = buckets; } @@ -157,7 +160,10 @@ public InternalTerms reduce(ReduceContext reduceContext) { BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null)); for (Map.Entry> entry : buckets.entrySet()) { List sameTermBuckets = entry.getValue(); - ordered.insertWithOverflow(sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler())); + final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler()); + if (b.docCount >= minDocCount) { + ordered.insertWithOverflow(b); + } } Bucket[] list = new Bucket[ordered.size()]; for (int i = ordered.size() - 1; i >= 0; i--) { @@ -167,23 +173,17 @@ public InternalTerms reduce(ReduceContext reduceContext) { return reduced; } - protected void trimExcessEntries() { - if (requiredSize >= buckets.size()) { - return; - } - - if (buckets instanceof List) { - buckets = ((List) buckets).subList(0, requiredSize); - return; - } - - int i = 0; - for (Iterator iter = buckets.iterator(); iter.hasNext();) { - iter.next(); - if (i++ >= requiredSize) { - iter.remove(); + final void trimExcessEntries() { + final List newBuckets = Lists.newArrayList(); + for (Bucket b : buckets) { + if (newBuckets.size() >= requiredSize) { + break; + } + if (b.docCount >= minDocCount) { + newBuckets.add(b); } } + buckets = newBuckets; } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java index 4f71f534fdb2d..2116491083311 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java @@ -90,8 +90,8 @@ int compareTerm(Terms.Bucket other) { LongTerms() {} // for serialization - public LongTerms(String name, InternalOrder order, ValueFormatter valueFormatter, int requiredSize, Collection buckets) { - super(name, order, requiredSize, buckets); + public LongTerms(String name, InternalOrder order, ValueFormatter valueFormatter, int requiredSize, long minDocCount, Collection buckets) { + super(name, order, requiredSize, minDocCount, buckets); this.valueFormatter = valueFormatter; } @@ -145,7 +145,10 @@ public InternalTerms reduce(ReduceContext reduceContext) { for (int i = 0; i < states.length; i++) { if (states[i]) { List sameTermBuckets = (List) internalBuckets[i]; - ordered.insertWithOverflow(sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler())); + final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler()); + if (b.getDocCount() >= minDocCount) { + ordered.insertWithOverflow(b); + } } } buckets.release(); @@ -163,6 +166,7 @@ public void readFrom(StreamInput in) throws IOException { this.order = InternalOrder.Streams.readOrder(in); this.valueFormatter = ValueFormatterStreams.readOptional(in); this.requiredSize = in.readVInt(); + this.minDocCount = in.readVLong(); int size = in.readVInt(); List buckets = new ArrayList(size); for (int i = 0; i < size; i++) { @@ -178,6 +182,7 @@ public void writeTo(StreamOutput out) throws IOException { InternalOrder.Streams.writeOrder(order, out); ValueFormatterStreams.writeOptional(valueFormatter, out); out.writeVInt(requiredSize); + out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { out.writeLong(((Bucket) bucket).term); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java index cc962b756c6bf..9b9eab8845f54 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.bucket.terms; +import org.apache.lucene.index.AtomicReaderContext; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.index.fielddata.LongValues; import org.elasticsearch.search.aggregations.Aggregator; @@ -41,16 +42,18 @@ public class LongTermsAggregator extends BucketsAggregator { private final InternalOrder order; private final int requiredSize; private final int shardSize; + private final long minDocCount; private final NumericValuesSource valuesSource; private final LongHash bucketOrds; public LongTermsAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource, long estimatedBucketCount, - InternalOrder order, int requiredSize, int shardSize, AggregationContext aggregationContext, Aggregator parent) { + InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent) { super(name, BucketAggregationMode.PER_BUCKET, factories, estimatedBucketCount, aggregationContext, parent); this.valuesSource = valuesSource; this.order = InternalOrder.validate(order, this); this.requiredSize = requiredSize; this.shardSize = shardSize; + this.minDocCount = minDocCount; bucketOrds = new LongHash(estimatedBucketCount, aggregationContext.pageCacheRecycler()); } @@ -78,6 +81,21 @@ public void collect(int doc, long owningBucketOrdinal) throws IOException { @Override public LongTerms buildAggregation(long owningBucketOrdinal) { assert owningBucketOrdinal == 0; + + if (minDocCount == 0 && (order != InternalOrder.COUNT_DESC || bucketOrds.size() < requiredSize)) { + // we need to fill-in the blanks + for (AtomicReaderContext ctx : context.searchContext().searcher().getTopReaderContext().leaves()) { + context.setNextReader(ctx); + final LongValues values = valuesSource.longValues(); + for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) { + final int valueCount = values.setDocument(docId); + for (int i = 0; i < valueCount; ++i) { + bucketOrds.add(values.nextValue()); + } + } + } + } + final int size = (int) Math.min(bucketOrds.size(), shardSize); BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(this)); @@ -104,12 +122,12 @@ public LongTerms buildAggregation(long owningBucketOrdinal) { bucket.aggregations = bucketAggregations(bucket.bucketOrd); list[i] = bucket; } - return new LongTerms(name, order, valuesSource.formatter(), requiredSize, Arrays.asList(list)); + return new LongTerms(name, order, valuesSource.formatter(), requiredSize, minDocCount, Arrays.asList(list)); } @Override public LongTerms buildEmptyAggregation() { - return new LongTerms(name, order, valuesSource.formatter(), requiredSize, Collections.emptyList()); + return new LongTerms(name, order, valuesSource.formatter(), requiredSize, minDocCount, Collections.emptyList()); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java index 9f2cef5e1a1e4..7e917c4a483eb 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java @@ -84,8 +84,8 @@ int compareTerm(Terms.Bucket other) { StringTerms() {} // for serialization - public StringTerms(String name, InternalOrder order, int requiredSize, Collection buckets) { - super(name, order, requiredSize, buckets); + public StringTerms(String name, InternalOrder order, int requiredSize, long minDocCount, Collection buckets) { + super(name, order, requiredSize, minDocCount, buckets); } @Override @@ -98,6 +98,7 @@ public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); this.order = InternalOrder.Streams.readOrder(in); this.requiredSize = in.readVInt(); + this.minDocCount = in.readVLong(); int size = in.readVInt(); List buckets = new ArrayList(size); for (int i = 0; i < size; i++) { @@ -112,6 +113,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(name); InternalOrder.Streams.writeOrder(order, out); out.writeVInt(requiredSize); + out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { out.writeBytesRef(((Bucket) bucket).termBytes); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java index 2f0c9d164e95f..08f72d32f7f47 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java @@ -19,30 +19,29 @@ package org.elasticsearch.search.aggregations.bucket.terms; -import com.sun.corba.se.impl.naming.cosnaming.InterOperableNamingImpl; +import com.google.common.collect.Lists; +import com.google.common.collect.UnmodifiableIterator; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.collect.Iterators2; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.ReaderContextAware; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.index.fielddata.BytesValues; import org.elasticsearch.index.fielddata.ordinals.Ordinals; -import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.BytesRefHash; import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue; import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; -import org.elasticsearch.search.aggregations.metrics.MetricsAggregator; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.bytes.BytesValuesSource; import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; +import java.util.*; /** * An aggregator of string values. @@ -53,11 +52,12 @@ public class StringTermsAggregator extends BucketsAggregator { private final InternalOrder order; private final int requiredSize; private final int shardSize; + private final long minDocCount; protected final BytesRefHash bucketOrds; private final IncludeExclude includeExclude; public StringTermsAggregator(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount, - InternalOrder order, int requiredSize, int shardSize, + InternalOrder order, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent) { super(name, BucketAggregationMode.PER_BUCKET, factories, estimatedBucketCount, aggregationContext, parent); @@ -65,6 +65,7 @@ public StringTermsAggregator(String name, AggregatorFactories factories, ValuesS this.order = InternalOrder.validate(order, this); this.requiredSize = requiredSize; this.shardSize = shardSize; + this.minDocCount = minDocCount; this.includeExclude = includeExclude; bucketOrds = new BytesRefHash(estimatedBucketCount, aggregationContext.pageCacheRecycler()); } @@ -95,9 +96,111 @@ public void collect(int doc, long owningBucketOrdinal) throws IOException { } } + /** Returns an iterator over the field data terms. */ + private static Iterator terms(final BytesValues.WithOrdinals bytesValues, boolean reverse) { + final Ordinals.Docs ordinals = bytesValues.ordinals(); + if (reverse) { + return new UnmodifiableIterator() { + + long i = ordinals.getMaxOrd() - 1; + + @Override + public boolean hasNext() { + return i >= Ordinals.MIN_ORDINAL; + } + + @Override + public BytesRef next() { + bytesValues.getValueByOrd(i--); + return bytesValues.copyShared(); + } + + }; + } else { + return new UnmodifiableIterator() { + + long i = Ordinals.MIN_ORDINAL; + + @Override + public boolean hasNext() { + return i < ordinals.getMaxOrd(); + } + + @Override + public BytesRef next() { + bytesValues.getValueByOrd(i++); + return bytesValues.copyShared(); + } + + }; + } + } + @Override public StringTerms buildAggregation(long owningBucketOrdinal) { assert owningBucketOrdinal == 0; + + if (minDocCount == 0 && (order != InternalOrder.COUNT_DESC || bucketOrds.size() < requiredSize)) { + // we need to fill-in the blanks + List valuesWithOrdinals = Lists.newArrayList(); + for (AtomicReaderContext ctx : context.searchContext().searcher().getTopReaderContext().leaves()) { + context.setNextReader(ctx); + final BytesValues values = valuesSource.bytesValues(); + if (values instanceof BytesValues.WithOrdinals) { + valuesWithOrdinals.add((BytesValues.WithOrdinals) values); + } else { + // brute force + for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) { + final int valueCount = values.setDocument(docId); + for (int i = 0; i < valueCount; ++i) { + bucketOrds.add(values.nextValue(), values.currentValueHash()); + } + } + } + } + + // With ordinals we can be smarter and add just as many terms as necessary to the hash table + // For instance, if sorting by term asc, we only need to get the first `requiredSize` terms as other terms would + // either be excluded by the priority queue or at reduce time. + if (valuesWithOrdinals.size() > 0) { + final boolean reverse = order == InternalOrder.TERM_DESC; + Comparator comparator = BytesRef.getUTF8SortedAsUnicodeComparator(); + if (reverse) { + comparator = Collections.reverseOrder(comparator); + } + Iterator[] iterators = new Iterator[valuesWithOrdinals.size()]; + for (int i = 0; i < valuesWithOrdinals.size(); ++i) { + iterators[i] = terms(valuesWithOrdinals.get(i), reverse); + } + Iterator terms = Iterators2.mergeSorted(Arrays.asList(iterators), comparator, true); + if (order == InternalOrder.COUNT_ASC) { + // let's try to find `shardSize` terms that matched no hit + // this one needs shardSize and not requiredSize because even though terms have a count of 0 here, + // they might have higher counts on other shards + for (int added = 0; added < shardSize && terms.hasNext(); ) { + if (bucketOrds.add(terms.next()) >= 0) { + ++added; + } + } + } else if (order == InternalOrder.COUNT_DESC) { + // add terms until there are enough buckets + while (bucketOrds.size() < requiredSize && terms.hasNext()) { + bucketOrds.add(terms.next()); + } + } else if (order == InternalOrder.TERM_ASC || order == InternalOrder.TERM_DESC) { + // add the `requiredSize` least terms + for (int i = 0; i < requiredSize && terms.hasNext(); ++i) { + bucketOrds.add(terms.next()); + } + } else { + // other orders (aggregations) are not optimizable + while (terms.hasNext()) { + bucketOrds.add(terms.next()); + } + } + } + } + final int size = (int) Math.min(bucketOrds.size(), shardSize); BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(this)); @@ -118,12 +221,13 @@ public StringTerms buildAggregation(long owningBucketOrdinal) { bucket.aggregations = bucketAggregations(bucket.bucketOrd); list[i] = bucket; } - return new StringTerms(name, order, requiredSize, Arrays.asList(list)); + + return new StringTerms(name, order, requiredSize, minDocCount, Arrays.asList(list)); } @Override public StringTerms buildEmptyAggregation() { - return new StringTerms(name, order, requiredSize, Collections.emptyList()); + return new StringTerms(name, order, requiredSize, minDocCount, Collections.emptyList()); } @Override @@ -142,8 +246,8 @@ public static class WithOrdinals extends StringTermsAggregator implements Reader private LongArray ordinalToBucket; public WithOrdinals(String name, AggregatorFactories factories, BytesValuesSource.WithOrdinals valuesSource, long esitmatedBucketCount, - InternalOrder order, int requiredSize, int shardSize, AggregationContext aggregationContext, Aggregator parent) { - super(name, factories, valuesSource, esitmatedBucketCount, order, requiredSize, shardSize, null, aggregationContext, parent); + InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent) { + super(name, factories, valuesSource, esitmatedBucketCount, order, requiredSize, shardSize, minDocCount, null, aggregationContext, parent); this.valuesSource = valuesSource; } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 126647075568a..3d84eeeabc875 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -42,21 +42,23 @@ public class TermsAggregatorFactory extends ValueSourceAggregatorFactory { private final InternalOrder order; private final int requiredSize; private final int shardSize; + private final long minDocCount; private final IncludeExclude includeExclude; private final String executionHint; - public TermsAggregatorFactory(String name, ValuesSourceConfig valueSourceConfig, InternalOrder order, int requiredSize, int shardSize, IncludeExclude includeExclude, String executionHint) { + public TermsAggregatorFactory(String name, ValuesSourceConfig valueSourceConfig, InternalOrder order, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude, String executionHint) { super(name, StringTerms.TYPE.name(), valueSourceConfig); this.order = order; this.requiredSize = requiredSize; this.shardSize = shardSize; + this.minDocCount = minDocCount; this.includeExclude = includeExclude; this.executionHint = executionHint; } @Override protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent) { - return new UnmappedTermsAggregator(name, order, requiredSize, aggregationContext, parent); + return new UnmappedTermsAggregator(name, order, requiredSize, minDocCount, aggregationContext, parent); } private static boolean hasParentBucketAggregator(Aggregator parent) { @@ -108,11 +110,11 @@ protected Aggregator create(ValuesSource valuesSource, long expectedBucketsCount if (execution.equals(EXECUTION_HINT_VALUE_ORDINALS)) { assert includeExclude == null; final StringTermsAggregator.WithOrdinals aggregator = new StringTermsAggregator.WithOrdinals(name, - factories, (BytesValuesSource.WithOrdinals) valuesSource, estimatedBucketCount, order, requiredSize, shardSize, aggregationContext, parent); + factories, (BytesValuesSource.WithOrdinals) valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, aggregationContext, parent); aggregationContext.registerReaderContextAware(aggregator); return aggregator; } else { - return new StringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, order, requiredSize, shardSize, includeExclude, aggregationContext, parent); + return new StringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent); } } @@ -123,9 +125,9 @@ protected Aggregator create(ValuesSource valuesSource, long expectedBucketsCount if (valuesSource instanceof NumericValuesSource) { if (((NumericValuesSource) valuesSource).isFloatingPoint()) { - return new DoubleTermsAggregator(name, factories, (NumericValuesSource) valuesSource, estimatedBucketCount, order, requiredSize, shardSize, aggregationContext, parent); + return new DoubleTermsAggregator(name, factories, (NumericValuesSource) valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, aggregationContext, parent); } - return new LongTermsAggregator(name, factories, (NumericValuesSource) valuesSource, estimatedBucketCount, order, requiredSize, shardSize, aggregationContext, parent); + return new LongTermsAggregator(name, factories, (NumericValuesSource) valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, aggregationContext, parent); } throw new AggregationExecutionException("terms aggregation cannot be applied to field [" + valuesSourceConfig.fieldContext().field() + diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsBuilder.java index 769e40bb7f2a6..0a0dac9b99756 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsBuilder.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsBuilder.java @@ -33,6 +33,7 @@ public class TermsBuilder extends ValuesSourceAggregationBuilder { private int size = -1; private int shardSize = -1; + private long minDocCount = -1; private Terms.ValueType valueType; private Terms.Order order; private String includePattern; @@ -62,6 +63,14 @@ public TermsBuilder shardSize(int shardSize) { return this; } + /** + * Set the minimum document count terms should have in order to appear in the response. + */ + public TermsBuilder minDocCount(long minDocCount) { + this.minDocCount = minDocCount; + return this; + } + /** * Define a regular expression that will determine what terms should be aggregated. The regular expression is based * on the {@link java.util.regex.Pattern} class. @@ -135,6 +144,9 @@ protected XContentBuilder doInternalXContent(XContentBuilder builder, Params par if (shardSize >= 0) { builder.field("shard_size", shardSize); } + if (minDocCount >= 0) { + builder.field("min_doc_count", minDocCount); + } if (valueType != null) { builder.field("value_type", valueType.name().toLowerCase(Locale.ROOT)); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java index fc89050100b1b..edf5655c58888 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java @@ -73,6 +73,7 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se String exclude = null; int excludeFlags = 0; // 0 means no flags String executionHint = null; + long minDocCount = 1; XContentParser.Token token; @@ -111,6 +112,8 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se requiredSize = parser.intValue(); } else if ("shard_size".equals(currentFieldName) || "shardSize".equals(currentFieldName)) { shardSize = parser.intValue(); + } else if ("min_doc_count".equals(currentFieldName) || "minDocCount".equals(currentFieldName)) { + minDocCount = parser.intValue(); } else { throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); } @@ -207,14 +210,14 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se if (!assumeUnique) { config.ensureUnique(true); } - return new TermsAggregatorFactory(aggregationName, config, order, requiredSize, shardSize, includeExclude, executionHint); + return new TermsAggregatorFactory(aggregationName, config, order, requiredSize, shardSize, minDocCount, includeExclude, executionHint); } FieldMapper mapper = context.smartNameFieldMapper(field); if (mapper == null) { ValuesSourceConfig config = new ValuesSourceConfig(BytesValuesSource.class); config.unmapped(true); - return new TermsAggregatorFactory(aggregationName, config, order, requiredSize, shardSize, includeExclude, executionHint); + return new TermsAggregatorFactory(aggregationName, config, order, requiredSize, shardSize, minDocCount, includeExclude, executionHint); } IndexFieldData indexFieldData = context.fieldData().getForField(mapper); @@ -256,7 +259,7 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se config.ensureUnique(true); } - return new TermsAggregatorFactory(aggregationName, config, order, requiredSize, shardSize, includeExclude, executionHint); + return new TermsAggregatorFactory(aggregationName, config, order, requiredSize, shardSize, minDocCount, includeExclude, executionHint); } static InternalOrder resolveOrder(String key, boolean asc) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index 96fd596b6fe0e..3bc2937cce288 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -54,8 +54,8 @@ public static void registerStreams() { UnmappedTerms() {} // for serialization - public UnmappedTerms(String name, InternalOrder order, int requiredSize) { - super(name, order, requiredSize, BUCKETS); + public UnmappedTerms(String name, InternalOrder order, int requiredSize, long minDocCount) { + super(name, order, requiredSize, minDocCount, BUCKETS); } @Override @@ -68,6 +68,7 @@ public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); this.order = InternalOrder.Streams.readOrder(in); this.requiredSize = in.readVInt(); + this.minDocCount = in.readVLong(); this.buckets = BUCKETS; this.bucketMap = BUCKETS_MAP; } @@ -77,6 +78,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(name); InternalOrder.Streams.writeOrder(order, out); out.writeVInt(requiredSize); + out.writeVLong(minDocCount); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTermsAggregator.java index 4eaf70d34ab3b..a4655208d8616 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTermsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTermsAggregator.java @@ -33,11 +33,13 @@ public class UnmappedTermsAggregator extends Aggregator { private final InternalOrder order; private final int requiredSize; + private final long minDocCount; - public UnmappedTermsAggregator(String name, InternalOrder order, int requiredSize, AggregationContext aggregationContext, Aggregator parent) { + public UnmappedTermsAggregator(String name, InternalOrder order, int requiredSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent) { super(name, BucketAggregationMode.PER_BUCKET, AggregatorFactories.EMPTY, 0, aggregationContext, parent); this.order = order; this.requiredSize = requiredSize; + this.minDocCount = minDocCount; } @Override @@ -52,11 +54,11 @@ public void collect(int doc, long owningBucketOrdinal) throws IOException { @Override public InternalAggregation buildAggregation(long owningBucketOrdinal) { assert owningBucketOrdinal == 0; - return new UnmappedTerms(name, order, requiredSize); + return new UnmappedTerms(name, order, requiredSize, minDocCount); } @Override public InternalAggregation buildEmptyAggregation() { - return new UnmappedTerms(name, order, requiredSize); + return new UnmappedTerms(name, order, requiredSize, minDocCount); } } diff --git a/src/test/java/org/elasticsearch/common/collect/Iterators2Tests.java b/src/test/java/org/elasticsearch/common/collect/Iterators2Tests.java new file mode 100644 index 0000000000000..65aa51c8ec0c1 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/collect/Iterators2Tests.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.collect; + +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; +import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.test.ElasticsearchTestCase; + +import java.util.Iterator; +import java.util.List; + +public class Iterators2Tests extends ElasticsearchTestCase { + + public void testDeduplicateSorted() { + final List list = Lists.newArrayList(); + for (int i = randomInt(100); i >= 0; --i) { + final int frequency = randomIntBetween(1, 10); + final String s = randomAsciiOfLength(randomIntBetween(2, 20)); + for (int j = 0; j < frequency; ++j) { + list.add(s); + } + } + CollectionUtil.introSort(list); + final List deduplicated = Lists.newArrayList(); + for (Iterator it = Iterators2.deduplicateSorted(list.iterator(), Ordering.natural()); it.hasNext(); ) { + deduplicated.add(it.next()); + } + assertEquals(Lists.newArrayList(Sets.newTreeSet(list)), deduplicated); + } + +} diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/DateHistogramTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/DateHistogramTests.java index 14ecd7fd8d7a2..02b2ce6f9f26f 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/DateHistogramTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/DateHistogramTests.java @@ -889,7 +889,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true).subAggregation(dateHistogram("date_histo").interval(1))) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0).subAggregation(dateHistogram("date_histo").interval(1))) .execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/DateRangeTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/DateRangeTests.java index 0e08d6308b03f..82b3a5ff1bcc3 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/DateRangeTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/DateRangeTests.java @@ -995,7 +995,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true).subAggregation(dateRange("date_range").addRange("0-1", 0, 1))) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0).subAggregation(dateRange("date_range").addRange("0-1", 0, 1))) .execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java index 838058216b437..355be3f060b84 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java @@ -601,7 +601,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) .subAggregation(terms("terms"))) .execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/FilterTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/FilterTests.java index f610d38c5f21f..aa5d9b1d5aa6c 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/FilterTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/FilterTests.java @@ -24,8 +24,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.filter.Filter; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.metrics.avg.Avg; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.hamcrest.Matchers; @@ -160,7 +160,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) .subAggregation(filter("filter").filter(matchAllFilter()))) .execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/GeoDistanceTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/GeoDistanceTests.java index ef3509475e5de..2e5839a5965a5 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/GeoDistanceTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/GeoDistanceTests.java @@ -372,7 +372,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) .subAggregation(geoDistance("geo_dist").field("location").point("52.3760, 4.894").addRange("0-100", 0.0, 100.0))) .execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/HistogramTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/HistogramTests.java index 269a43178d082..01d27aab1f41a 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/HistogramTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/HistogramTests.java @@ -757,7 +757,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) .subAggregation(histogram("sub_histo").interval(1l))) .execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/IPv4RangeTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/IPv4RangeTests.java index 707dade41dddc..967cb64ca768a 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/IPv4RangeTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/IPv4RangeTests.java @@ -843,7 +843,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) .subAggregation(ipRange("ip_range").field("ip").addRange("r1", "10.0.0.1", "10.0.0.10"))) .execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java index a21ae18d2daed..4c5df643d2f83 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java @@ -595,7 +595,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) .subAggregation(terms("terms"))) .execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/MinDocCountTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/MinDocCountTests.java new file mode 100644 index 0000000000000..5be2d97757e5f --- /dev/null +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/MinDocCountTests.java @@ -0,0 +1,298 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket; + +import com.carrotsearch.hppc.LongOpenHashSet; +import com.carrotsearch.hppc.LongSet; +import com.carrotsearch.randomizedtesting.generators.RandomStrings; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Before; + +import java.util.*; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; + +public class MinDocCountTests extends ElasticsearchIntegrationTest { + + private static final QueryBuilder QUERY = QueryBuilders.termQuery("match", true); + + @Override + public Settings indexSettings() { + return ImmutableSettings.builder() + .put("index.number_of_shards", between(1, 5)) + .put("index.number_of_replicas", between(0, 1)) + .build(); + } + + private int cardinality; + + @Before + public void indexData() throws Exception { + createIndex("idx"); + + cardinality = randomIntBetween(8, 30); + final List indexRequests = new ArrayList(); + final Set stringTerms = new HashSet(); + final LongSet longTerms = new LongOpenHashSet(); + for (int i = 0; i < cardinality; ++i) { + String stringTerm; + do { + stringTerm = RandomStrings.randomAsciiOfLength(getRandom(), 8); + } while (!stringTerms.add(stringTerm)); + long longTerm; + do { + longTerm = randomInt(cardinality * 2); + } while (!longTerms.add(longTerm)); + double doubleTerm = longTerm * Math.PI; + final int frequency = randomBoolean() ? 1 : randomIntBetween(2, 20); + for (int j = 0; j < frequency; ++j) { + indexRequests.add(client().prepareIndex("idx", "type").setSource(jsonBuilder().startObject().field("s", stringTerm).field("l", longTerm).field("d", doubleTerm).field("match", randomBoolean()).endObject())); + } + } + cardinality = stringTerms.size(); + + indexRandom(true, indexRequests); + ensureSearchable(); + } + + private enum Script { + NO { + @Override + TermsBuilder apply(TermsBuilder builder, String field) { + return builder.field(field); + } + }, + YES { + @Override + TermsBuilder apply(TermsBuilder builder, String field) { + return builder.script("doc['" + field + "'].values"); + } + }; + abstract TermsBuilder apply(TermsBuilder builder, String field); + } + + // check that terms2 is a subset of terms1 + private void assertSubset(Terms terms1, Terms terms2, long minDocCount, int size) { + final Iterator it1 = terms1.iterator(); + final Iterator it2 = terms2.iterator(); + int size2 = 0; + while (it1.hasNext()) { + final Terms.Bucket bucket1 = it1.next(); + if (bucket1.getDocCount() >= minDocCount) { + if (size2++ == size) { + break; + } + assertTrue(it2.hasNext()); + final Terms.Bucket bucket2 = it2.next(); + assertEquals(bucket1.getKey(), bucket2.getKey()); + assertEquals(bucket1.getDocCount(), bucket2.getDocCount()); + } + } + assertFalse(it2.hasNext()); + } + + private void assertSubset(Histogram histo1, Histogram histo2, long minDocCount) { + final Iterator it2 = histo2.iterator(); + for (Histogram.Bucket b1 : histo1) { + if (b1.getDocCount() >= minDocCount) { + final Histogram.Bucket b2 = it2.next(); + assertEquals(b1.getKey(), b2.getKey()); + assertEquals(b1.getDocCount(), b2.getDocCount()); + } + } + } + + public void testStringTermAsc() throws Exception { + testMinDocCountOnTerms("s", Script.NO, Terms.Order.term(true)); + } + + public void testStringScriptTermAsc() throws Exception { + testMinDocCountOnTerms("s", Script.YES, Terms.Order.term(true)); + } + + public void testStringTermDesc() throws Exception { + testMinDocCountOnTerms("s", Script.NO, Terms.Order.term(false)); + } + + public void testStringScriptTermDesc() throws Exception { + testMinDocCountOnTerms("s", Script.YES, Terms.Order.term(false)); + } + + public void testStringCountAsc() throws Exception { + testMinDocCountOnTerms("s", Script.NO, Terms.Order.count(true)); + } + + public void testStringScriptCountAsc() throws Exception { + testMinDocCountOnTerms("s", Script.YES, Terms.Order.count(true)); + } + + public void testStringCountDesc() throws Exception { + testMinDocCountOnTerms("s", Script.NO, Terms.Order.count(false)); + } + + public void testStringScriptCountDesc() throws Exception { + testMinDocCountOnTerms("s", Script.YES, Terms.Order.count(false)); + } + + public void testLongTermAsc() throws Exception { + testMinDocCountOnTerms("l", Script.NO, Terms.Order.term(true)); + } + + public void testLongScriptTermAsc() throws Exception { + testMinDocCountOnTerms("l", Script.YES, Terms.Order.term(true)); + } + + public void testLongTermDesc() throws Exception { + testMinDocCountOnTerms("l", Script.NO, Terms.Order.term(false)); + } + + public void testLongScriptTermDesc() throws Exception { + testMinDocCountOnTerms("l", Script.YES, Terms.Order.term(false)); + } + + public void testLongCountAsc() throws Exception { + testMinDocCountOnTerms("l", Script.NO, Terms.Order.count(true)); + } + + public void testLongScriptCountAsc() throws Exception { + testMinDocCountOnTerms("l", Script.YES, Terms.Order.count(true)); + } + + public void testLongCountDesc() throws Exception { + testMinDocCountOnTerms("l", Script.NO, Terms.Order.count(false)); + } + + public void testLongScriptCountDesc() throws Exception { + testMinDocCountOnTerms("l", Script.YES, Terms.Order.count(false)); + } + + public void testDoubleTermAsc() throws Exception { + testMinDocCountOnTerms("d", Script.NO, Terms.Order.term(true)); + } + + public void testDoubleScriptTermAsc() throws Exception { + testMinDocCountOnTerms("d", Script.YES, Terms.Order.term(true)); + } + + public void testDoubleTermDesc() throws Exception { + testMinDocCountOnTerms("d", Script.NO, Terms.Order.term(false)); + } + + public void testDoubleScriptTermDesc() throws Exception { + testMinDocCountOnTerms("d", Script.YES, Terms.Order.term(false)); + } + + public void testDoubleCountAsc() throws Exception { + testMinDocCountOnTerms("d", Script.NO, Terms.Order.count(true)); + } + + public void testDoubleScriptCountAsc() throws Exception { + testMinDocCountOnTerms("d", Script.YES, Terms.Order.count(true)); + } + + public void testDoubleCountDesc() throws Exception { + testMinDocCountOnTerms("d", Script.NO, Terms.Order.count(false)); + } + + public void testDoubleScriptCountDesc() throws Exception { + testMinDocCountOnTerms("d", Script.YES, Terms.Order.count(false)); + } + + public void testMinDocCountOnTerms(String field, Script script, Terms.Order order) throws Exception { + // all terms + final SearchResponse allTermsResponse = client().prepareSearch("idx").setTypes("type") + .setSearchType(SearchType.COUNT) + .setQuery(QUERY) + .addAggregation(script.apply(terms("terms"), field) + .executionHint(StringTermsTests.randomExecutionHint()) + .order(order) + .size(cardinality + randomInt(10)) + .minDocCount(0)) + .execute().actionGet(); + final Terms allTerms = allTermsResponse.getAggregations().get("terms"); + assertEquals(cardinality, allTerms.buckets().size()); + + for (long minDocCount = 0; minDocCount < 20; ++minDocCount) { + final int size = randomIntBetween(1, cardinality + 2); + final SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setSearchType(SearchType.COUNT) + .setQuery(QUERY) + .addAggregation(script.apply(terms("terms"), field) + .executionHint(StringTermsTests.randomExecutionHint()) + .order(order) + .size(size) + .shardSize(cardinality + randomInt(10)) + .minDocCount(minDocCount)) + .execute().actionGet(); + assertSubset(allTerms, (Terms) response.getAggregations().get("terms"), minDocCount, size); + } + + } + + public void testHistogramCountAsc() throws Exception { + testHistogram(Histogram.Order.COUNT_ASC); + } + + public void testHistogramCountDesc() throws Exception { + testHistogram(Histogram.Order.COUNT_DESC); + } + + public void testHistogramKeyAsc() throws Exception { + testHistogram(Histogram.Order.KEY_ASC); + } + + public void testHistogramKeyDesc() throws Exception { + testHistogram(Histogram.Order.KEY_DESC); + } + + public void testHistogram(Histogram.Order order) throws Exception { + final int interval = randomIntBetween(1, 3); + final SearchResponse allResponse = client().prepareSearch("idx").setTypes("type") + .setSearchType(SearchType.COUNT) + .setQuery(QUERY) + .addAggregation(histogram("histo").field("d").interval(interval).order(order).minDocCount(0)) + .execute().actionGet(); + + final Histogram allHisto = allResponse.getAggregations().get("histo"); + + for (long minDocCount = 0; minDocCount < 50; ++minDocCount) { + final SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setSearchType(SearchType.COUNT) + .setQuery(QUERY) + .addAggregation(histogram("histo").field("d").interval(interval).order(order).minDocCount(minDocCount)) + .execute().actionGet(); + assertSubset(allHisto, (Histogram) response.getAggregations().get("histo"), minDocCount); + } + + } + +} diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/MissingTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/MissingTests.java index 041653375d538..379ab228aca56 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/MissingTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/MissingTests.java @@ -203,7 +203,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) .subAggregation(missing("missing"))) .execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java index 92bec5abe5240..f1277fab0b959 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java @@ -252,7 +252,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) .subAggregation(nested("nested").path("nested"))) .execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeTests.java index da550f7190e21..667de35b8217e 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeTests.java @@ -922,7 +922,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) .subAggregation(range("range").addRange("0-2", 0.0, 2.0))) .execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java index 2984f4dc3bf1b..fd6346f738210 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java @@ -67,7 +67,7 @@ public Settings indexSettings() { .build(); } - private String randomExecutionHint() { + public static String randomExecutionHint() { return randomFrom(Arrays.asList(null, TermsAggregatorFactory.EXECUTION_HINT_VALUE_MAP, TermsAggregatorFactory.EXECUTION_HINT_VALUE_ORDINALS)); } @@ -759,7 +759,7 @@ public void emptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) .subAggregation(terms("terms"))) .execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/metrics/AvgTests.java b/src/test/java/org/elasticsearch/search/aggregations/metrics/AvgTests.java index 1d31003321eae..4f87a117a46e2 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/metrics/AvgTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/metrics/AvgTests.java @@ -40,7 +40,7 @@ public void testEmptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true).subAggregation(avg("avg"))) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0).subAggregation(avg("avg"))) .execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); diff --git a/src/test/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsTests.java b/src/test/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsTests.java index 853fc3ea79bcb..ec5df622fdbbd 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsTests.java @@ -54,7 +54,7 @@ public void testEmptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true).subAggregation(extendedStats("stats"))) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0).subAggregation(extendedStats("stats"))) .execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); diff --git a/src/test/java/org/elasticsearch/search/aggregations/metrics/MaxTests.java b/src/test/java/org/elasticsearch/search/aggregations/metrics/MaxTests.java index 251c1a2e4c427..346eff33cc372 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/metrics/MaxTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/metrics/MaxTests.java @@ -40,7 +40,7 @@ public void testEmptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true).subAggregation(max("max"))) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0).subAggregation(max("max"))) .execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); diff --git a/src/test/java/org/elasticsearch/search/aggregations/metrics/MinTests.java b/src/test/java/org/elasticsearch/search/aggregations/metrics/MinTests.java index a6eb24c0486c9..94a4e618dd050 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/metrics/MinTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/metrics/MinTests.java @@ -40,7 +40,7 @@ public void testEmptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true).subAggregation(min("min"))) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0).subAggregation(min("min"))) .execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); diff --git a/src/test/java/org/elasticsearch/search/aggregations/metrics/StatsTests.java b/src/test/java/org/elasticsearch/search/aggregations/metrics/StatsTests.java index 1ee02c6773f35..b5d8d35042c7e 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/metrics/StatsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/metrics/StatsTests.java @@ -41,7 +41,7 @@ public void testEmptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true).subAggregation(stats("stats"))) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0).subAggregation(stats("stats"))) .execute().actionGet(); assertShardExecutionState(searchResponse, 0); diff --git a/src/test/java/org/elasticsearch/search/aggregations/metrics/SumTests.java b/src/test/java/org/elasticsearch/search/aggregations/metrics/SumTests.java index 150a008a2c7dd..64063c761b6f3 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/metrics/SumTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/metrics/SumTests.java @@ -40,7 +40,7 @@ public void testEmptyAggregation() throws Exception { SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") .setQuery(matchAllQuery()) - .addAggregation(histogram("histo").field("value").interval(1l).emptyBuckets(true).subAggregation(sum("sum"))) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0).subAggregation(sum("sum"))) .execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l));