Support for partitioning set of terms #21626

Closed
wants to merge 1 commit into
base: master
from

4 participants
@markharwood
Contributor

markharwood commented Nov 17, 2016

Filters terms into arbitrary sized partitions so that multiple requests can be done without trying to compute everything in one request.

First draft for review of approach - needs tests/docs etc

Closes #21487

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated

@Override
public boolean accept(long value) {
// Should we hash the value to keep even distributions?

@jpountz

jpountz Nov 17, 2016

Contributor

I think we should. For instance in the case of a date field that only stores dates with second precision, all values would be multiples of 1000.
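
Editor's sketch of the skew described above, not code from the PR: when every value is a multiple of 1000, a plain modulo puts all values into one partition, while mixing the bits first spreads them out. The class name, the `histogram` helper and the `mix64` function (the MurmurHash3 64-bit finalizer, used here only as a stand-in for whatever hash the PR ends up using) are assumptions made for illustration.

```java
import java.util.Arrays;

/** Sketch: why raw numeric values should be hashed before taking a modulo. */
final class PartitionSkewDemo {

    /** MurmurHash3 64-bit finalizer (fmix64); a stand-in mixer, not the PR's hash. */
    static long mix64(long z) {
        z ^= z >>> 33;
        z *= 0xff51afd7ed558ccdL;
        z ^= z >>> 33;
        z *= 0xc4ceb9fe1a85ec53L;
        z ^= z >>> 33;
        return z;
    }

    /** Counts how many values land in each partition, with or without mixing. */
    static int[] histogram(long[] values, int numPartitions, boolean mix) {
        int[] counts = new int[numPartitions];
        for (long v : values) {
            long key = mix ? mix64(v) : v;
            // floorMod keeps the partition index non-negative even for negative keys.
            counts[(int) Math.floorMod(key, (long) numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Second-precision dates stored as epoch millis: every value is a multiple of 1000.
        long[] timestamps = new long[10_000];
        for (int i = 0; i < timestamps.length; i++) {
            timestamps[i] = 1_479_340_800_000L + i * 1000L;
        }
        System.out.println("raw:   " + Arrays.toString(histogram(timestamps, 10, false)));
        System.out.println("mixed: " + Arrays.toString(histogram(timestamps, 10, true)));
    }
}
```

With 10 partitions, the unmixed histogram has every value in partition 0, because a multiple of 1000 is always a multiple of 10.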

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
if (i % incNumPartitions == zeroBasedPartition){
acceptedGlobalOrdinals.set(i);
}
}

@jpountz

jpountz Nov 17, 2016

Contributor

To me this is the part of the PR that needs attention. The way it is implemented here means that the same partition could match different terms across refreshes. I am unclear whether it would be an issue. Maybe it would not, like pagination of the search hits.

@markharwood

markharwood Nov 17, 2016

Author Contributor

There is an element of user control available: if the user chooses execution mode "map", then the values, not the ordinals, are used, in a way that is consistent across refreshes?

@jpountz

jpountz Nov 17, 2016

Contributor

This is true. The alternative I was thinking about would be to pull a terms enum from the sorted set doc values, and set bits in the bit set based on the hash of the values rather than their ordinal. This way, the partitioning would be based on the value, but the terms aggregation would still be able to leverage ordinals to do the bucketing. The drawback is that it requires computing a hash for every term of the field.

@jimczi

jimczi Nov 17, 2016

Member

+1 for the terms enum pulled from the sorted set. It should be fast to enumerate the terms, since it is a sequential read of the term dictionary of each segment. The hashing will be slow, though, but again it's great if we can have a way to exhaust a terms aggregation consistently, even if it's not as fast as we would like ;).

@markharwood

markharwood Nov 17, 2016

Author Contributor

On the 2.7m userID terms test I see ords taking 2.4s vs 2.8s using hashes of terms.

@jimczi

jimczi Nov 17, 2016

Member

What do you mean by "using hashes of terms"? The terms aggregation with execution hint set to "map"?

@markharwood

markharwood Nov 17, 2016

Author Contributor

No, sorry. I mean the proposed approach (now implemented here), where we take a TermsEnum and only set ord bits where the term's hash modulo the number of partitions equals the required partition.
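
Editor's sketch of the approach discussed in this thread, under simplifying assumptions: a sorted `String[]` stands in for the term dictionary, the array index stands in for the ordinal, and `Arrays.hashCode` over the UTF-8 bytes is a placeholder hash. None of these names come from the PR. The point it illustrates is that partition membership depends only on the term's bytes, so it stays the same when new terms shift the ordinals.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Set;
import java.util.TreeSet;

/** Sketch: accept ordinals by the hash of their term rather than by the ordinal itself. */
final class HashPartitionedOrdinals {

    /** Sets one bit per ordinal whose term hashes into the requested partition. */
    static BitSet acceptedOrdinals(String[] sortedTerms, int partition, int numPartitions) {
        BitSet accepted = new BitSet(sortedTerms.length);
        for (int ord = 0; ord < sortedTerms.length; ord++) {
            // Hash the term bytes, not the ordinal; Arrays.hashCode is a placeholder hash.
            int hash = Arrays.hashCode(sortedTerms[ord].getBytes(StandardCharsets.UTF_8));
            if (Math.floorMod(hash, numPartitions) == partition) {
                accepted.set(ord);
            }
        }
        return accepted;
    }

    /** Resolves the accepted ordinals back to their terms. */
    static Set<String> acceptedTerms(String[] sortedTerms, int partition, int numPartitions) {
        Set<String> terms = new TreeSet<>();
        BitSet accepted = acceptedOrdinals(sortedTerms, partition, numPartitions);
        for (int ord = accepted.nextSetBit(0); ord >= 0; ord = accepted.nextSetBit(ord + 1)) {
            terms.add(sortedTerms[ord]);
        }
        return terms;
    }

    public static void main(String[] args) {
        String[] before = {"alice", "carol", "dave"};
        String[] after = {"alice", "bob", "carol", "dave"}; // "bob" shifts the later ordinals
        for (int p = 0; p < 2; p++) {
            System.out.println(p + ": " + acceptedTerms(before, p, 2) + " -> " + acceptedTerms(after, p, 2));
        }
    }
}
```

Bucketing can still run on ordinals through the bit set; only building the bit set pays the per-term hashing cost mentioned above.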

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
final TermsEnum termEnum = globalOrdinals.termsEnum();

BytesRef term = termEnum.next();
int ord = 0;

@jpountz

jpountz Nov 17, 2016

Contributor

You need a long here, or this could overflow. Alternatively, don't store it at all and use termsEnum.ord() to get the ord of the current term.

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
int ord = 0;
while (term != null) {
int hash = Math.abs(term.hashCode());
if (hash % incNumPartitions == incZeroBasedPartition) {

@jpountz

jpountz Nov 17, 2016

Contributor

I would use Math.floorMod(term.hashCode(), incNumPartitions) to avoid issues in the case that the hashcode is Integer.MIN_VALUE
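
Editor's sketch of the edge case raised here: `Math.abs(Integer.MIN_VALUE)` overflows and returns `Integer.MIN_VALUE`, so the `%` result is negative and never equals a valid partition index, while `Math.floorMod` always returns a value in `[0, numPartitions)`. The class and method names are made up for the illustration.

```java
/** Sketch: Math.abs(hash) % n versus Math.floorMod(hash, n) for Integer.MIN_VALUE. */
final class FloorModDemo {

    /** The pattern under review: breaks when hash == Integer.MIN_VALUE. */
    static int partitionWithAbs(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    /** The suggested replacement: always non-negative for a positive divisor. */
    static int partitionWithFloorMod(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionWithAbs(Integer.MIN_VALUE, 3));      // prints -2
        System.out.println(partitionWithFloorMod(Integer.MIN_VALUE, 3)); // prints 1
    }
}
```

A term whose hash code happens to be `Integer.MIN_VALUE` would match no partition with the first variant, so it would silently go missing from every response.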

@jpountz
Contributor

jpountz left a comment

I left some comments but I like it in general. In addition to the inline comments, could you add docs and make the formatting more consistent with the rest of the code base? @jimczi Could you also have a look?

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
public boolean accept(long value) {
// hash the value to keep even distributions
buffer.putLong(0, value);
final int hashCode = StringHelper.murmurhash3_x86_32(buffer.array(), 0, Long.BYTES, StringHelper.GOOD_FAST_HASH_SEED);

@jpountz

jpountz Nov 18, 2016

Contributor

you could call BitMixer.mix64(value), which should have the same quality but might be a bit faster

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
@@ -283,6 +354,13 @@ public IncludeExclude(StreamInput in) throws IOException {
} else {
excludeValues = null;
}
if (in.getVersion().after(Version.V_5_0_0)) {

@jpountz

jpountz Nov 18, 2016

Contributor

I think the version should be 5_1_0

@jpountz

jpountz Nov 22, 2016

Contributor

you can have a look at SimpleQueryStringBuilder.VERSION_5_1_0_UNRELEASED to see how to do it

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
}
if (parseFieldMatcher.match(currentFieldName, PARTITION_FIELD)) {
otherOptions.put(INCLUDE_FIELD, parser.intValue());
}
}

@jpountz

jpountz Nov 18, 2016

Contributor

can you add an else block that throws an exception?

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
throw new IllegalArgumentException(PARTITION_FIELD.getPreferredName()+" must be between >1 and <="+pOf);
}
} else {
throw new IllegalArgumentException(OF_FIELD.getPreferredName()+" value must be an integer");

@jpountz

jpountz Nov 18, 2016

Contributor

it might make sense to throw a different exception message if pOf is null?

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
if(ofObject instanceof Integer){
pOf = (Integer) ofObject;
if (pOf < 2){
throw new IllegalArgumentException(PARTITION_FIELD.getPreferredName()+" must be >1");

@jpountz

jpountz Nov 18, 2016

Contributor

Should we make the partitions 0-based like the from parameter of the search API?

@markharwood

markharwood Nov 18, 2016

Author Contributor

If we do we should change the "of" parameter name. "partition 19 of 20" doesn't sound like the last partition any more.

@jimczi

jimczi Nov 21, 2016

Member

+1 for the 0-based partitions, this would be consistent with the from and the slice.

@markharwood

markharwood Nov 21, 2016

Author Contributor

OK - I'll change "of" to be "num_partitions", just because "19 of 20" seems misleading.

core/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsIT.java Outdated
assertFalse(foundTerms.contains(bucket.getKeyAsNumber()));
foundTerms.add(bucket.getKeyAsNumber());
}
}

@jpountz

jpountz Nov 18, 2016

Contributor

maybe we should also assert on the size of foundTerms to ensure we visited all of them?

@jimczi
Member

jimczi left a comment

I left some comments but I also like it. It needs to be documented, maybe with a full example on how to exhaust a term aggregation safely.

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
throw new IllegalArgumentException(PARTITION_FIELD.getPreferredName()+" must be >1");
}
if (partition <1 || partition > pOf){
throw new IllegalArgumentException(PARTITION_FIELD.getPreferredName()+" must be between >1 and <="+pOf);

@jimczi

jimczi Nov 21, 2016

Member

must be >=1 ?

core/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsIT.java Outdated
}

private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
final int numPartitions = randomIntBetween(1, 4);

@jimczi

jimczi Nov 21, 2016

Member

maybe start at 2, numPartitions=1 should fail, no ?

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
@@ -250,13 +307,27 @@ public IncludeExclude(long[] includeValues, long[] excludeValues) {
this(convertToBytesRefSet(includeValues), convertToBytesRefSet(excludeValues));
}

public IncludeExclude(int partition, int pOf) {
this.incZeroBasedPartition = partition - 1;

@jimczi

jimczi Nov 21, 2016

Member

Check that partition >= 1 && pof > partition ? It's only checked when we (de)serialize from json.

core/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsIT.java Outdated
}

private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
final int numPartitions = randomIntBetween(1, 4);

@jimczi

jimczi Nov 21, 2016

Member

same here, randomIntBetween(2, 4) ?

@markharwood

Contributor Author

markharwood commented Nov 21, 2016

Thanks for the review comments.
One thing left which is the 5.0.1 version check which I'll address when I rebase to latest master before the final push so I can see that constant.

@jpountz

Contributor

jpountz commented Nov 21, 2016

One thing left which is the 5.0.1 version check which I'll address when I rebase to latest master before the final push so I can see that constant.

You could add bw logic today already by following the instructions described at https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/Version.java#L101 I think?

@markharwood

Contributor Author

markharwood commented Nov 21, 2016

I'll rebase and force push here

@colings86

Member

colings86 commented Nov 21, 2016

My worry with this idea is that it adds more complexity to an already very complex aggregation. I like the idea but I wonder if there is a way to implement it that doesn't add more complexity

@markharwood

Contributor Author

markharwood commented Nov 21, 2016

it adds more complexity to an already very complex aggregation

Do you mean the end-user complexity or the internal impl?

Internally the aggs are untouched and it just adds another means of filtering to a class already designed to encapsulate the different means of value filtering.

@markharwood

Contributor Author

markharwood commented Nov 22, 2016

I wonder if there is a way to implement it that doesn't add more complexity

The problem is bucket explosions in a single request. The possible solutions are:

  1. Document partitioning - doesn't work when considering > 1 doc per term, e.g. finding first and last docs for a websessionID.
  2. Breadth first with client-supplied Term lists - clients could filter analysis using term-exclude clauses to signal what they have already seen in previous requests, but
    • breadth-first doesn't work if the top term-agg sorts terms by child aggs, e.g. finding last access dates for account IDs.
    • the exclude term lists can get prohibitively large.
  3. Server-maintained Term lists - similar to 2), but the server would have to maintain state in a session, similar to scan/scroll, to remember what had already been processed.
  4. Term partitioning - the basis of this PR.
  5. Map-reduce - don't use the aggs framework at all but use big sorts and merges to analyze all the data in a way that scales with data volumes.
  6. Entity-centric indexing - aggregate data using custom update scripts and periodic updates to maintain a position, e.g. each web session's duration or last access.
@jpountz

Contributor

jpountz commented Nov 22, 2016

As far as complexity is concerned, I agree aggs can't afford more complexity but I think this PR is not too bad since it is contained and focuses on an include impl, without adding more APIs or making existing APIs more complex.

@jpountz
Contributor

jpountz left a comment

I left a couple of minor comments. Since the promise of this change is to be able to process large amounts of data in smaller requests, I am wondering whether we should improve the heuristic that guesses the best execution mode so that it uses a sparse way to store counts (GLOBAL_ORDINALS_HASH) when an include is set. If I read the code correctly, we would otherwise still use the term ordinals as bucket ids, which is probably wasteful if a partition is set? In general the change looks good.

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
@@ -250,13 +304,30 @@ public IncludeExclude(long[] includeValues, long[] excludeValues) {
this(convertToBytesRefSet(includeValues), convertToBytesRefSet(excludeValues));
}

public IncludeExclude(int partition, int numPartitions) {
if (partition < 0 || partition >= numPartitions) {
throw new IllegalArgumentException(" Parition must be >=0 and < numPartition which is "+numPartitions);

@jpountz

jpountz Nov 22, 2016

Contributor

s/Parition/Partition/
Also the leading space in the error message looks unnecessary?

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
@@ -436,12 +518,24 @@ public boolean token(String currentFieldName, XContentParser.Token token, XConte
if (token == XContentParser.Token.START_OBJECT) {
if (parseFieldMatcher.match(currentFieldName, INCLUDE_FIELD)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {

// This "include":{"pattern":"foo.*"} syntax is undocumented since 2.0
// Regexes should be "include":"foo.*"
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (parseFieldMatcher.match(currentFieldName, PATTERN_FIELD)) {
otherOptions.put(INCLUDE_FIELD, parser.text());
}

@jpountz

jpountz Nov 22, 2016

Contributor

Can you throw an exception here too? I'm worried about users providing the partition numbers as strings, which would be ignored.

Terms terms = allResponse.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
int expectedCardinality = terms.getBuckets().size();

@jpountz

jpountz Nov 22, 2016

Contributor

can you set a very high size parameter on the terms aggregation to make sure we got all terms?

core/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsIT.java Outdated
assertThat(terms.getName(), equalTo("terms"));
for (Bucket bucket : terms.getBuckets()) {
assertFalse(foundTerms.contains(bucket.getKeyAsNumber()));
foundTerms.add(bucket.getKeyAsNumber());

@jpountz

jpountz Nov 22, 2016

Contributor

this can be collapsed into assertTrue(foundTerms.add(bucket.getKeyAsNumber()))
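
Editor's sketch of the test pattern discussed in these review comments, written without the Elasticsearch test framework: visit every partition once, rely on `Set.add` returning `false` to catch a term served by two partitions, and compare the final set size with the known cardinality. `fetchPartition` is a hypothetical stand-in for a partitioned terms request, and `Long.hashCode` is a placeholder for the real hash; it assumes the input terms are distinct.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch: partitions must be disjoint and together cover every term. */
final class ExhaustPartitionsSketch {

    /** Stand-in for one partitioned request: the terms whose hash falls in the partition. */
    static List<Long> fetchPartition(long[] allTerms, int partition, int numPartitions) {
        List<Long> bucketKeys = new ArrayList<>();
        for (long term : allTerms) {
            if (Math.floorMod(Long.hashCode(term), numPartitions) == partition) {
                bucketKeys.add(term);
            }
        }
        return bucketKeys;
    }

    /** Visits every partition once and fails if a term is returned twice. */
    static Set<Long> collectAll(long[] allTerms, int numPartitions) {
        Set<Long> foundTerms = new HashSet<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            for (Long key : fetchPartition(allTerms, partition, numPartitions)) {
                // Set.add returns false for a duplicate, so one call both checks and records.
                if (!foundTerms.add(key)) {
                    throw new IllegalStateException("term " + key + " returned by more than one partition");
                }
            }
        }
        return foundTerms;
    }

    public static void main(String[] args) {
        long[] terms = {1, 2, 3, 5, 8, 13, 21, 34};
        // The size check is what proves that no term was skipped.
        System.out.println(collectAll(terms, 3).size() == terms.length);
    }
}
```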

core/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsIT.java Outdated
}
}
}
}

@jpountz

jpountz Nov 22, 2016

Contributor

no multivalued test like numerics?

}
}
}
}

@jpountz

jpountz Nov 22, 2016

Contributor

can you use underscore case in this example to be more consistent with the rest of the docs?

@markharwood markharwood force-pushed the markharwood:fix/21487 branch Nov 23, 2016

@colings86

Member

colings86 commented Nov 24, 2016

I have had a chat with @markharwood and I am OK with this being merged, given the logic is almost exclusively contained within the IncludeExclude part of the terms agg.

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
@@ -517,6 +644,11 @@ public boolean isRegexBased() {
return include != null || exclude != null;
}

public boolean isPartitionFilter() {

@colings86

colings86 Nov 24, 2016

Member

nit: I wonder if we should name this isPartitionBased() to be consistent with the isRegexBased()?

@jimczi

jimczi approved these changes Nov 24, 2016

Member

jimczi left a comment

LGTM !

@markharwood

Contributor Author

markharwood commented Nov 24, 2016

Ok with this @jpountz ? If so which branches shall I target?

@jpountz
Contributor

jpountz left a comment

LGTM

// Additionally, if using partitioned terms the regular global
// ordinals would be sparse so we opt for hash
if (Aggregator.descendsFromBucketAggregator(parent) ||
(includeExclude != null && includeExclude.isPartitionBased())) {

@jpountz

jpountz Nov 24, 2016

Contributor

++

...src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java Outdated
if (numPartitionsObject instanceof Integer) {
numPartitions = (Integer) numPartitionsObject;
if (numPartitions < 2) {
throw new IllegalArgumentException(PARTITION_FIELD.getPreferredName() + " must be >1");

@jpountz

jpountz Nov 24, 2016

Contributor

s/PARTITION_FIELD/NUM_PARTITIONS_FIELD/

@jpountz

Contributor

jpountz commented Nov 24, 2016

If so which branches shall I target?

master and 5.x I would say, meaning we should update the version constant to be 5_2_0 since a branch for 5.1 has been cut

@jpountz

Contributor

jpountz commented Nov 24, 2016

You might want to have a look at the changes introduced to unreleased versions at #21760.

Aggregations - support for partitioning set of terms used in aggregations so that multiple requests can be done without trying to compute everything in one request.

Closes #21487

@markharwood markharwood force-pushed the markharwood:fix/21487 branch to ba08af5 Nov 24, 2016

@markharwood

Contributor Author

markharwood commented Nov 24, 2016

Rebased and squashed.
We now have a Version.V_5_2_0_UNRELEASED constant in master so using that.
Fixed the error message with the wrong field name too.
I think this is good to go now.

@jpountz

Contributor

jpountz commented Nov 24, 2016

+1

@markharwood

Contributor Author

markharwood commented Nov 24, 2016

Pushed to master aa60e5c and 5.x d78ae86

@clintongormley clintongormley changed the title Aggregations - support for partitioning set of terms Support for partitioning set of terms Jan 24, 2017
