Add new option min_doc_count to terms and histogram aggregations.
`min_doc_count` is the minimum number of hits that a term or histogram key
should match in order to appear in the response.

`min_doc_count=0` replaces `compute_empty_buckets` for histograms and will
behave exactly like facets' `all_terms=true` for terms aggregations.

Close elastic#4662
jpountz committed Jan 10, 2014
1 parent fcdafad commit da9063d
Showing 42 changed files with 822 additions and 136 deletions.
@@ -159,6 +159,63 @@ If the histogram aggregation has a direct metrics sub-aggregation, the latter ca

<2> There is no need to configure the `price` field for the `price_stats` aggregation as it will inherit it by default from its parent histogram aggregation.

==== Minimum document count

It is possible to only return buckets that have a document count greater than or equal to a configured limit by using the `min_doc_count` option.

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "prices" : {
            "histogram" : {
                "field" : "price",
                "interval" : 50,
                "min_doc_count": 10
            }
        }
    }
}
--------------------------------------------------

The above aggregation would only return buckets that contain 10 documents or more. The default value is `1`.

NOTE: The special value `0` can be used to add empty buckets to the response between the minimum and the maximum buckets.
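
A minimal sketch of such a request, reusing the `price` field and interval from the example above:

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "prices" : {
            "histogram" : {
                "field" : "price",
                "interval" : 50,
                "min_doc_count": 0
            }
        }
    }
}
--------------------------------------------------

Here is an example of what the response could look like: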

[source,js]
--------------------------------------------------
{
    "aggregations": {
        "prices": {
            "0": {
                "key": 0,
                "doc_count": 2
            },
            "50": {
                "key": 50,
                "doc_count": 0
            },
            "100": {
                "key": 100,
                "doc_count": 0
            },
            "150": {
                "key": 150,
                "doc_count": 3
            },
            "200": {
                "key": 200,
                "doc_count": 0
            },
            "250": {
                "key": 250,
                "doc_count": 0
            },
            "300": {
                "key": 300,
                "doc_count": 1
            }
        }
    }
}
--------------------------------------------------

==== Response Format

By default, the buckets are returned as an ordered array. It is also possible to request the response as a hash keyed by the bucket keys instead:
@@ -142,6 +142,34 @@ Ordering the buckets by multi value metrics sub-aggregation (identified by the a
}
--------------------------------------------------

==== Minimum document count

It is possible to only return terms that match at least a configured number of hits by using the `min_doc_count` option:

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "tags" : {
            "terms" : {
                "field" : "tag",
                "min_doc_count": 10
            }
        }
    }
}
--------------------------------------------------

The above aggregation would only return tags which have been found in 10 hits or more. The default value is `1`.

NOTE: Setting `min_doc_count` to `0` will also return buckets for terms that didn't match any hit. However, some of
the returned terms which have a document count of zero might only belong to deleted documents, so there is
no guarantee that a `match_all` query would find a positive document count for those terms.
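
For reference, a minimal sketch of a request that keeps every term, including those with a document count of zero (the aggregation-level equivalent of facets' `all_terms=true` mentioned in the commit message):

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "tags" : {
            "terms" : {
                "field" : "tag",
                "min_doc_count": 0
            }
        }
    }
}
--------------------------------------------------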

WARNING: When NOT sorting on `doc_count` descending, high values of `min_doc_count` may return a number of buckets
which is less than `size` because not enough data was gathered from the shards. Missing buckets can be
brought back by increasing `shard_size`.
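
For instance, a minimal sketch of a request that sorts on the term itself rather than on `doc_count` and raises `shard_size` to make missing buckets less likely (the field name and sizes are illustrative):

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "tags" : {
            "terms" : {
                "field" : "tag",
                "size" : 10,
                "shard_size" : 100,
                "min_doc_count" : 10,
                "order" : { "_term" : "asc" }
            }
        }
    }
}
--------------------------------------------------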

==== Script

Generating the terms using a script:
64 changes: 64 additions & 0 deletions src/main/java/org/elasticsearch/common/collect/Iterators2.java
@@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.collect;

import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.UnmodifiableIterator;

import java.util.Comparator;
import java.util.Iterator;

public enum Iterators2 {
;

/** Remove duplicated elements from an iterator over sorted content. */
public static <T> Iterator<T> deduplicateSorted(Iterator<? extends T> iterator, final Comparator<? super T> comparator) {
final PeekingIterator<T> it = Iterators.peekingIterator(iterator);
return new UnmodifiableIterator<T>() {

@Override
public boolean hasNext() {
return it.hasNext();
}

@Override
public T next() {
final T ret = it.next();
while (it.hasNext() && comparator.compare(ret, it.peek()) == 0) {
it.next();
}
assert !it.hasNext() || comparator.compare(ret, it.peek()) < 0 : "iterator is not sorted: " + ret + " > " + it.peek();
return ret;
}

};
}

/** Return a merged view over several iterators, optionally deduplicating equivalent entries. */
public static <T> Iterator<T> mergeSorted(Iterable<Iterator<? extends T>> iterators, Comparator<? super T> comparator, boolean deduplicate) {
Iterator<T> it = Iterators.mergeSorted(iterators, comparator);
if (deduplicate) {
it = deduplicateSorted(it, comparator);
}
return it;
}

}
@@ -118,7 +118,7 @@ public static interface Factory<B extends HistogramBase.Bucket> {

String type();

AbstractHistogramBase create(String name, List<B> buckets, InternalOrder order, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed);
AbstractHistogramBase create(String name, List<B> buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed);

Bucket createBucket(long key, long docCount, InternalAggregations aggregations);

@@ -129,14 +129,17 @@ public static interface Factory<B extends HistogramBase.Bucket> {
private InternalOrder order;
private ValueFormatter formatter;
private boolean keyed;
private long minDocCount;
private EmptyBucketInfo emptyBucketInfo;

protected AbstractHistogramBase() {} // for serialization

protected AbstractHistogramBase(String name, List<B> buckets, InternalOrder order, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) {
protected AbstractHistogramBase(String name, List<B> buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed) {
super(name);
this.buckets = buckets;
this.order = order;
assert (minDocCount == 0) == (emptyBucketInfo != null);
this.minDocCount = minDocCount;
this.emptyBucketInfo = emptyBucketInfo;
this.formatter = formatter;
this.keyed = keyed;
@@ -170,28 +173,36 @@ public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {

if (emptyBucketInfo == null) {
if (minDocCount == 1) {
return aggregations.get(0);
}

// we need to fill the gaps with empty buckets
AbstractHistogramBase histo = (AbstractHistogramBase) aggregations.get(0);
CollectionUtil.introSort(histo.buckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator());
List<HistogramBase.Bucket> list = order.asc ? histo.buckets : Lists.reverse(histo.buckets);
HistogramBase.Bucket prevBucket = null;
ListIterator<HistogramBase.Bucket> iter = list.listIterator();
while (iter.hasNext()) {
// look ahead on the next bucket without advancing the iter
// so we'll be able to insert elements at the right position
HistogramBase.Bucket nextBucket = list.get(iter.nextIndex());
if (prevBucket != null) {
long key = emptyBucketInfo.rounding.nextRoundingValue(prevBucket.getKey());
while (key != nextBucket.getKey()) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
if (minDocCount == 0) {
// we need to fill the gaps with empty buckets
while (iter.hasNext()) {
// look ahead on the next bucket without advancing the iter
// so we'll be able to insert elements at the right position
HistogramBase.Bucket nextBucket = list.get(iter.nextIndex());
if (prevBucket != null) {
long key = emptyBucketInfo.rounding.nextRoundingValue(prevBucket.getKey());
while (key != nextBucket.getKey()) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
prevBucket = iter.next();
}
} else {
while (iter.hasNext()) {
if (iter.next().getDocCount() < minDocCount) {
iter.remove();
}
}
prevBucket = iter.next();
}

if (order != InternalOrder.KEY_ASC && order != InternalOrder.KEY_DESC) {
@@ -223,15 +234,17 @@ public InternalAggregation reduce(ReduceContext reduceContext) {
for (int i = 0; i < allocated.length; i++) {
if (allocated[i]) {
Bucket bucket = ((List<Bucket>) buckets[i]).get(0).reduce(((List<Bucket>) buckets[i]), reduceContext.cacheRecycler());
reducedBuckets.add(bucket);
if (bucket.getDocCount() >= minDocCount) {
reducedBuckets.add(bucket);
}
}
}
bucketsByKey.release();



// adding empty buckets if needed
if (emptyBucketInfo != null) {
if (minDocCount == 0) {
CollectionUtil.introSort(reducedBuckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator());
List<HistogramBase.Bucket> list = order.asc ? reducedBuckets : Lists.reverse(reducedBuckets);
HistogramBase.Bucket prevBucket = null;
@@ -269,7 +282,8 @@ protected B createBucket(long key, long docCount, InternalAggregations aggregati
public void readFrom(StreamInput in) throws IOException {
name = in.readString();
order = InternalOrder.Streams.readOrder(in);
if (in.readBoolean()) {
minDocCount = in.readVLong();
if (minDocCount == 0) {
emptyBucketInfo = EmptyBucketInfo.readFrom(in);
}
formatter = ValueFormatterStreams.readOptional(in);
@@ -287,10 +301,8 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
if (emptyBucketInfo == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeVLong(minDocCount);
if (minDocCount == 0) {
EmptyBucketInfo.writeTo(emptyBucketInfo, out);
}
ValueFormatterStreams.writeOptional(formatter, out);
@@ -87,7 +87,7 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se
String scriptLang = null;
Map<String, Object> scriptParams = null;
boolean keyed = false;
boolean computeEmptyBuckets = false;
long minDocCount = 1;
InternalOrder order = (InternalOrder) Histogram.Order.KEY_ASC;
String interval = null;
boolean preZoneAdjustLargeInterval = false;
@@ -132,13 +132,17 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se
} else if (token == XContentParser.Token.VALUE_BOOLEAN) {
if ("keyed".equals(currentFieldName)) {
keyed = parser.booleanValue();
} else if ("compute_empty_buckets".equals(currentFieldName) || "computeEmptyBuckets".equals(currentFieldName)) {
computeEmptyBuckets = parser.booleanValue();
} else if ("script_values_sorted".equals(currentFieldName)) {
assumeSorted = parser.booleanValue();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if ("min_doc_count".equals(currentFieldName) || "minDocCount".equals(currentFieldName)) {
minDocCount = parser.longValue();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if ("params".equals(currentFieldName)) {
scriptParams = parser.map();
@@ -200,17 +204,17 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se
if (searchScript != null) {
ValueParser valueParser = new ValueParser.DateMath(new DateMathParser(DateFieldMapper.Defaults.DATE_TIME_FORMATTER, DateFieldMapper.Defaults.TIME_UNIT));
config.parser(valueParser);
return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, computeEmptyBuckets, InternalDateHistogram.FACTORY);
return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, minDocCount, InternalDateHistogram.FACTORY);
}

// falling back on the get field data context
return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, computeEmptyBuckets, InternalDateHistogram.FACTORY);
return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, minDocCount, InternalDateHistogram.FACTORY);
}

FieldMapper<?> mapper = context.smartNameFieldMapper(field);
if (mapper == null) {
config.unmapped(true);
return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, computeEmptyBuckets, InternalDateHistogram.FACTORY);
return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, minDocCount, InternalDateHistogram.FACTORY);
}

if (!(mapper instanceof DateFieldMapper)) {
@@ -219,7 +223,7 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se

IndexFieldData<?> indexFieldData = context.fieldData().getForField(mapper);
config.fieldContext(new FieldContext(field, indexFieldData));
return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, computeEmptyBuckets, InternalDateHistogram.FACTORY);
return new HistogramAggregator.Factory(aggregationName, config, rounding, order, keyed, minDocCount, InternalDateHistogram.FACTORY);
}

private static InternalOrder resolveOrder(String key, boolean asc) {

1 comment on commit da9063d

@uboness

LGTM
