Terms aggregations: make size=0 return all terms.
Terms aggregations return up to `size` terms, so until now the only way to get all
matching terms back was to set `size` to an arbitrarily high number that was larger
than the number of unique terms.

Terms aggregators already take care not to allocate memory based on the `size`
parameter, so this commit mostly consists of making `0` an alias for the maximum
integer value in the TermsParser.

Close #4837
jpountz committed Jan 22, 2014
1 parent 21897fd commit 8b0a863
Showing 10 changed files with 97 additions and 12 deletions.
@@ -65,6 +65,8 @@ the client.
NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense). When it is, elasticsearch will
override it and reset it to be equal to `size`.

added[1.1.0] It is possible to not limit the number of terms that are returned by setting `size` to `0`. Don't use this
on high-cardinality fields, as it will kill both your CPU, since terms need to be returned sorted, and your network.
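For illustration, a minimal sketch of asking for every term through the Java API, using the same builder calls as the tests in this commit; the client setup, index name "products", field name "category", and aggregation name are assumptions made up for the example:

import org.elasticsearch.action.search.SearchResponse;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;

// a sketch, assuming a reachable `client` and an index "products" with a "category" field
SearchResponse response = client.prepareSearch("products")
        .addAggregation(terms("all_categories")
                .field("category")
                .size(0)) // added[1.1.0]: 0 means "return all terms"
        .execute().actionGet();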

==== Order

@@ -167,7 +167,7 @@ public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
this.valueFormatter = ValueFormatterStreams.readOptional(in);
this.requiredSize = in.readVInt();
this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<InternalTerms.Bucket>(size);
@@ -183,7 +183,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
ValueFormatterStreams.writeOptional(valueFormatter, out);
out.writeVInt(requiredSize);
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
@@ -21,6 +21,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.ToXContent;
@@ -29,6 +31,7 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;

import java.io.IOException;
import java.util.*;

/**
@@ -185,4 +188,17 @@ final void trimExcessEntries() {
buckets = newBuckets;
}

// 0 actually means unlimited
protected static int readSize(StreamInput in) throws IOException {
final int size = in.readVInt();
return size == 0 ? Integer.MAX_VALUE : size;
}

protected static void writeSize(int size, StreamOutput out) throws IOException {
if (size == Integer.MAX_VALUE) {
size = 0;
}
out.writeVInt(size);
}

}
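The readSize/writeSize helpers keep the wire format compact and symmetric: "unlimited" travels as the single vint byte 0 and is restored to Integer.MAX_VALUE on read. A standalone sketch of that round-trip invariant, using plain ints instead of Elasticsearch's stream types:

// sketch of the 0 <-> Integer.MAX_VALUE aliasing performed by readSize/writeSize
static int encodeSize(int size) {
    return size == Integer.MAX_VALUE ? 0 : size; // "unlimited" goes over the wire as 0
}

static int decodeSize(int encoded) {
    return encoded == 0 ? Integer.MAX_VALUE : encoded; // 0 on the wire means unlimited
}

// invariant: decodeSize(encodeSize(s)) == s for every size this code serializes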
@@ -164,7 +164,7 @@ public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
this.valueFormatter = ValueFormatterStreams.readOptional(in);
this.requiredSize = in.readVInt();
this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<InternalTerms.Bucket>(size);
@@ -180,7 +180,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
ValueFormatterStreams.writeOptional(valueFormatter, out);
out.writeVInt(requiredSize);
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
@@ -96,7 +96,7 @@ public Type type() {
public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
this.requiredSize = in.readVInt();
this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<InternalTerms.Bucket>(size);
@@ -111,7 +111,7 @@ public void writeTo(StreamOutput out) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
out.writeVInt(requiredSize);
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
@@ -176,6 +176,14 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se
}
}

if (shardSize == 0) {
shardSize = Integer.MAX_VALUE;
}

if (requiredSize == 0) {
requiredSize = Integer.MAX_VALUE;
}
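// note: the 0 -> Integer.MAX_VALUE aliasing above runs before the clamp below,
// so `size: 0` also raises `shard_size` to Integer.MAX_VALUE and every shard
// returns all of its terms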

// shard_size cannot be smaller than size as we need to at least fetch <size> entries from every shards in order to return <size>
if (shardSize < requiredSize) {
shardSize = requiredSize;
@@ -66,7 +66,7 @@ public Type type() {
public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
this.requiredSize = in.readVInt();
this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
this.buckets = BUCKETS;
this.bucketMap = BUCKETS_MAP;
@@ -76,7 +76,7 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
out.writeVInt(requiredSize);
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
}

@@ -64,15 +64,15 @@ public Settings indexSettings() {
@Before
public void init() throws Exception {
createIndex("idx");

IndexRequestBuilder[] lowcardBuilders = new IndexRequestBuilder[NUM_DOCS];
for (int i = 0; i < lowcardBuilders.length; i++) {
lowcardBuilders[i] = client().prepareIndex("idx", "type").setSource(jsonBuilder()
.startObject()
.field("value", (double) i)
.startArray("values").value((double)i).value(i + 1d).endArray()
.endObject());

}
indexRandom(randomBoolean(), lowcardBuilders);
IndexRequestBuilder[] highCardBuilders = new IndexRequestBuilder[100]; // TODO: randomize the size?
@@ -89,6 +89,24 @@ public void init() throws Exception {
ensureSearchable();
}

@Test
// the main purpose of this test is to make sure we're not allocating 2GB of memory per shard
public void sizeIsZero() {
SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type")
.addAggregation(terms("terms")
.field("value")
.minDocCount(randomInt(1))
.size(0))
.execute().actionGet();

assertSearchResponse(response);

Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
assertThat(terms.buckets().size(), equalTo(100));
}

@Test
public void singleValueField() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
@@ -550,7 +568,8 @@ public void script_MultiValued_WithAggregatorInherited_WithExplicitType() throws
public void unmapped() throws Exception {
SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type")
.addAggregation(terms("terms")
.field("value"))
.field("value")
.size(randomInt(5)))
.execute().actionGet();

assertSearchResponse(response);
@@ -87,6 +87,24 @@ public void init() throws Exception {
ensureSearchable();
}

@Test
// the main purpose of this test is to make sure we're not allocating 2GB of memory per shard
public void sizeIsZero() {
SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type")
.addAggregation(terms("terms")
.field("value")
.minDocCount(randomInt(1))
.size(0))
.execute().actionGet();

assertSearchResponse(response);

Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
assertThat(terms.buckets().size(), equalTo(100));
}

@Test
public void singleValueField() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
@@ -544,7 +562,8 @@ public void script_MultiValued_WithAggregatorInherited_WithExplicitType() throws
public void unmapped() throws Exception {
SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type")
.addAggregation(terms("terms")
.field("value"))
.field("value")
.size(randomInt(5)))
.execute().actionGet();

assertSearchResponse(response);
@@ -97,6 +97,26 @@ public void init() throws Exception {
ensureSearchable();
}

@Test
// the main purpose of this test is to make sure we're not allocating 2GB of memory per shard
public void sizeIsZero() {
final int minDocCount = randomInt(1);
SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type")
.addAggregation(terms("terms")
.executionHint(randomExecutionHint())
.field("value")
.minDocCount(minDocCount)
.size(0))
.execute().actionGet();

assertSearchResponse(response);

Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
assertThat(terms.buckets().size(), equalTo(minDocCount == 0 ? 105 : 100)); // 105 because of the other type
}

@Test
public void singleValueField() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
@@ -686,6 +706,7 @@ public void unmapped() throws Exception {
SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type")
.addAggregation(terms("terms")
.executionHint(randomExecutionHint())
.size(randomInt(5))
.field("value"))
.execute().actionGet();

