@@ -131,7 +131,10 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
// to be unbounded and most instances may only aggregate few
// documents, so use hashed based
// global ordinals to keep the bucket ords dense.
if (Aggregator.descendsFromBucketAggregator(parent)) {
// Additionally, if partitioned terms are used, the regular global
// ordinals would be sparse, so we opt for hashed ordinals
if (Aggregator.descendsFromBucketAggregator(parent) ||
(includeExclude != null && includeExclude.isPartitionBased())) {
Contributor review comment: ++

execution = ExecutionMode.GLOBAL_ORDINALS_HASH;
} else {
if (factories == AggregatorFactories.EMPTY) {
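The change above routes partition-filtered terms aggregations to GLOBAL_ORDINALS_HASH. A back-of-the-envelope sketch of the reasoning, with all figures assumed purely for illustration: plain GLOBAL_ORDINALS allocates bucket ords keyed by global ordinal, so with partition filtering only about 1/num_partitions of those slots would ever be touched, while the hashed variant sizes itself by the terms it actually sees.

public class OrdinalsWasteSketch {
public static void main(String[] args) {
long distinctTerms = 1_000_000; // assumed: distinct terms (global ordinals) on a shard
int numPartitions = 20; // assumed: the request asks for one of 20 partitions
long denseSlots = distinctTerms; // GLOBAL_ORDINALS: one bucket-ord slot per ordinal
long acceptedSlots = distinctTerms / numPartitions; // only ~5% would ever be used
System.out.printf("dense: %d slots, ~%d used (%.0f%% wasted)%n",
denseSlots, acceptedSlots, 100.0 * (denseSlots - acceptedSlots) / denseSlots);
}
}
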
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket.terms.support;

import com.carrotsearch.hppc.BitMixer;
import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongSet;

@@ -35,6 +36,7 @@
import org.apache.lucene.util.automaton.Operations;
import org.apache.lucene.util.automaton.RegExp;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -46,6 +48,7 @@
import org.elasticsearch.search.DocValueFormat;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
@@ -61,15 +64,34 @@ public class IncludeExclude implements Writeable, ToXContent {
private static final ParseField INCLUDE_FIELD = new ParseField("include");
private static final ParseField EXCLUDE_FIELD = new ParseField("exclude");
private static final ParseField PATTERN_FIELD = new ParseField("pattern");
private static final ParseField PARTITION_FIELD = new ParseField("partition");
private static final ParseField NUM_PARTITIONS_FIELD = new ParseField("num_partitions");

// The includeValue and excludeValue BytesRefs which are the result of the parsing
// process are converted into a LongFilter when used on numeric fields
// in the index.
public static class LongFilter {
public abstract static class LongFilter {
public abstract boolean accept(long value);

}

public class PartitionedLongFilter extends LongFilter {
@Override
public boolean accept(long value) {
// hash the value to keep even distributions
final long hashCode = BitMixer.mix64(value);
return Math.floorMod(hashCode, incNumPartitions) == incZeroBasedPartition;
}
}

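A standalone sketch of the routing rule used by PartitionedLongFilter above (the class name and sample values here are illustrative, not from the PR). Math.floorMod is the detail to notice: a mixed 64-bit hash can be negative, and in Java (-7 % 3) == -1 while Math.floorMod(-7, 3) == 2, so floorMod always yields a valid zero-based partition. BitMixer.mix64 avalanches the input bits so that runs of consecutive values spread across partitions rather than landing in lockstep.

import com.carrotsearch.hppc.BitMixer;

public class PartitionRoutingSketch {
static int partitionOf(long value, int numPartitions) {
final long hash = BitMixer.mix64(value); // avalanche the bits first
return (int) Math.floorMod(hash, (long) numPartitions); // result is never negative
}

public static void main(String[] args) {
for (long v = 1; v <= 5; v++) {
System.out.println(v + " -> partition " + partitionOf(v, 3));
}
}
}
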
public static class SetBackedLongFilter extends LongFilter {
private LongSet valids;
private LongSet invalids;

private LongFilter(int numValids, int numInvalids) {
private SetBackedLongFilter(int numValids, int numInvalids) {
if (numValids > 0) {
valids = new LongHashSet(numValids);
}
@@ -96,6 +118,13 @@ public abstract static class StringFilter {
public abstract boolean accept(BytesRef value);
}

class PartitionedStringFilter extends StringFilter {
@Override
public boolean accept(BytesRef value) {
return Math.floorMod(value.hashCode(), incNumPartitions) == incZeroBasedPartition;
}
}

static class AutomatonBackedStringFilter extends StringFilter {

private final ByteRunAutomaton runAutomaton;
@@ -138,6 +167,25 @@ public abstract static class OrdinalsFilter {

}

class PartitionedOrdinalsFilter extends OrdinalsFilter {

@Override
public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals) throws IOException {
final long numOrds = globalOrdinals.getValueCount();
final LongBitSet acceptedGlobalOrdinals = new LongBitSet(numOrds);
final TermsEnum termEnum = globalOrdinals.termsEnum();

BytesRef term = termEnum.next();
while (term != null) {
if (Math.floorMod(term.hashCode(), incNumPartitions) == incZeroBasedPartition) {
acceptedGlobalOrdinals.set(termEnum.ord());
}
term = termEnum.next();
}
return acceptedGlobalOrdinals;
}
}
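Note the cost profile: unlike PartitionedStringFilter and PartitionedLongFilter, which test each value lazily, this filter materializes acceptance up front, walking every term on the shard once and setting a bit for the roughly 1/num_partitions ordinals whose hash falls in the requested partition.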

static class AutomatonBackedOrdinalsFilter extends OrdinalsFilter {

private final CompiledAutomaton compiled;
@@ -205,6 +253,8 @@ public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals) throws

private final RegExp include, exclude;
private final SortedSet<BytesRef> includeValues, excludeValues;
private final int incZeroBasedPartition;
private final int incNumPartitions;

/**
* @param include The regular expression pattern for the terms to be included
@@ -218,6 +268,8 @@ public IncludeExclude(RegExp include, RegExp exclude) {
this.exclude = exclude;
this.includeValues = null;
this.excludeValues = null;
this.incZeroBasedPartition = 0;
this.incNumPartitions = 0;
}

public IncludeExclude(String include, String exclude) {
@@ -234,6 +286,8 @@ public IncludeExclude(SortedSet<BytesRef> includeValues, SortedSet<BytesRef> exc
}
this.include = null;
this.exclude = null;
this.incZeroBasedPartition = 0;
this.incNumPartitions = 0;
this.includeValues = includeValues;
this.excludeValues = excludeValues;
}
@@ -250,13 +304,30 @@ public IncludeExclude(long[] includeValues, long[] excludeValues) {
this(convertToBytesRefSet(includeValues), convertToBytesRefSet(excludeValues));
}

public IncludeExclude(int partition, int numPartitions) {
if (partition < 0 || partition >= numPartitions) {
throw new IllegalArgumentException("Partition must be >= 0 and < numPartitions, which is " + numPartitions);
}
this.incZeroBasedPartition = partition;
this.incNumPartitions = numPartitions;
this.include = null;
this.exclude = null;
this.includeValues = null;
this.excludeValues = null;
}

/**
* Read from a stream.
*/
public IncludeExclude(StreamInput in) throws IOException {
if (in.readBoolean()) {
includeValues = null;
excludeValues = null;
incZeroBasedPartition = 0;
incNumPartitions = 0;
String includeString = in.readOptionalString();
include = includeString == null ? null : new RegExp(includeString);
String excludeString = in.readOptionalString();
@@ -283,6 +354,13 @@ public IncludeExclude(StreamInput in) throws IOException {
} else {
excludeValues = null;
}
if (in.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
incNumPartitions = in.readVInt();
incZeroBasedPartition = in.readVInt();
} else {
incNumPartitions = 0;
incZeroBasedPartition = 0;
}
}

@Override
@@ -309,6 +387,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBytesRef(value);
}
}
if (out.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
out.writeVInt(incNumPartitions);
out.writeVInt(incZeroBasedPartition);
}
}
}
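Both ends gate on the same version constant: a 5.2+ node writes the two extra vints only to peers that can read them back, and the reader substitutes zero (meaning "not partitioned") when talking to an older node. If either side skipped its check, a mixed-version cluster would read the stream out of alignment and fail to deserialize everything that follows.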

@@ -436,11 +518,26 @@ public boolean token(String currentFieldName, XContentParser.Token token, XConte
if (token == XContentParser.Token.START_OBJECT) {
if (parseFieldMatcher.match(currentFieldName, INCLUDE_FIELD)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {

// This "include":{"pattern":"foo.*"} syntax has been undocumented since 2.0
// Regexes should be expressed as "include":"foo.*"
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (parseFieldMatcher.match(currentFieldName, PATTERN_FIELD)) {
otherOptions.put(INCLUDE_FIELD, parser.text());
} else {
throw new ElasticsearchParseException(
"Unknown string parameter in Include/Exclude clause: " + currentFieldName);
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (parseFieldMatcher.match(currentFieldName, NUM_PARTITIONS_FIELD)) {
otherOptions.put(NUM_PARTITIONS_FIELD, parser.intValue());
} else if (parseFieldMatcher.match(currentFieldName, PARTITION_FIELD)) {
otherOptions.put(INCLUDE_FIELD, parser.intValue());
} else {
throw new ElasticsearchParseException(
"Unknown numeric parameter in Include/Exclude clause: " + currentFieldName);
}
}
}
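Net effect of the numeric branch above: the new form "include": { "partition": 3, "num_partitions": 20 } is accepted, with num_partitions stored under its own key and the partition number stored under INCLUDE_FIELD. createIncludeExclude below relies on that: an Integer include value is what signals a partition-based filter.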
@@ -480,15 +577,43 @@ private Set<BytesRef> parseArrayToSet(XContentParser parser) throws IOException
public IncludeExclude createIncludeExclude(Map<ParseField, Object> otherOptions) {
Object includeObject = otherOptions.get(INCLUDE_FIELD);
String include = null;
int partition = -1;
int numPartitions = -1;
SortedSet<BytesRef> includeValues = null;
if (includeObject != null) {
if (includeObject instanceof String) {
include = (String) includeObject;
} else if (includeObject instanceof SortedSet) {
includeValues = (SortedSet<BytesRef>) includeObject;
} else if (includeObject instanceof Integer) {
partition = (Integer) includeObject;
Object numPartitionsObject = otherOptions.get(NUM_PARTITIONS_FIELD);
if (numPartitionsObject instanceof Integer) {
numPartitions = (Integer) numPartitionsObject;
if (numPartitions < 2) {
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " must be >1");
}
if (partition < 0 || partition >= numPartitions) {
throw new IllegalArgumentException(
PARTITION_FIELD.getPreferredName() + " must be >=0 and <" + numPartitions);
}
} else {
if (numPartitionsObject == null) {
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " parameter is missing");
}
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " value must be an integer");
}
}
}
Object excludeObject = otherOptions.get(EXCLUDE_FIELD);
if (numPartitions > 0) {
if (excludeObject != null) {
throw new IllegalArgumentException("Partitioned Include cannot be used in combination with excludes");
}
return new IncludeExclude(partition, numPartitions);
}

String exclude = null;
SortedSet<BytesRef> excludeValues = null;
if (excludeObject != null) {
@@ -517,6 +642,10 @@ public boolean isRegexBased() {
return include != null || exclude != null;
}

public boolean isPartitionBased() {
return incNumPartitions > 0;
}
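For orientation, a hedged caller-side sketch, assuming the ESIntegTestCase-style context this PR's tests run in (client() comes from the test base class; "idx", "account_id", and the size are placeholder assumptions): walking every term of a high-cardinality field one partition at a time.

// Hypothetical usage inside an integration test.
final int numPartitions = 20;
for (int partition = 0; partition < numPartitions; partition++) {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(AggregationBuilders.terms("terms")
.field("account_id")
.includeExclude(new IncludeExclude(partition, numPartitions))
.size(10_000)) // large enough to hold one partition's terms
.get();
// consume response.getAggregations().get("terms"); across all
// partitions, each term is returned exactly once.
}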

private Automaton toAutomaton() {
Automaton a = null;
if (include != null) {
@@ -538,6 +667,9 @@ public StringFilter convertToStringFilter(DocValueFormat format) {
if (isRegexBased()) {
return new AutomatonBackedStringFilter(toAutomaton());
}
if (isPartitionBased()) {
return new PartitionedStringFilter();
}
return new TermListBackedStringFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format));
}

@@ -559,13 +691,22 @@ public OrdinalsFilter convertToOrdinalsFilter(DocValueFormat format) {
if (isRegexBased()) {
return new AutomatonBackedOrdinalsFilter(toAutomaton());
}
if (isPartitionBased()) {
return new PartitionedOrdinalsFilter();
}

return new TermListBackedOrdinalsFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format));
}

public LongFilter convertToLongFilter(DocValueFormat format) {
if (isPartitionBased()) {
return new PartitionedLongFilter();
}

int numValids = includeValues == null ? 0 : includeValues.size();
int numInvalids = excludeValues == null ? 0 : excludeValues.size();
LongFilter result = new LongFilter(numValids, numInvalids);
SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids);
if (includeValues != null) {
for (BytesRef val : includeValues) {
result.addAccept(format.parseLong(val.utf8ToString(), false, null));
Expand All @@ -580,9 +721,13 @@ public LongFilter convertToLongFilter(DocValueFormat format) {
}

public LongFilter convertToDoubleFilter() {
if (isPartitionBased()) {
return new PartitionedLongFilter();
}

int numValids = includeValues == null ? 0 : includeValues.size();
int numInvalids = excludeValues == null ? 0 : excludeValues.size();
LongFilter result = new LongFilter(numValids, numInvalids);
SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids);
if (includeValues != null) {
for (BytesRef val : includeValues) {
double dval = Double.parseDouble(val.utf8ToString());
@@ -33,6 +33,7 @@
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.aggregations.metrics.max.Max;
@@ -48,10 +49,12 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -359,6 +362,43 @@ private void testIncludeExcludeResults(double[] includes, double[] excludes, dou
assertThat(bucket.getDocCount(), equalTo(1L));
}
}

public void testSingleValueFieldWithPartitionedFiltering() throws Exception {
runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME);
}

public void testMultiValueFieldWithPartitionedFiltering() throws Exception {
runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME);
}

private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
// Find total number of unique terms
SearchResponse allResponse = client().prepareSearch("idx").setTypes("type")
.addAggregation(terms("terms").field(field).size(10000).collectMode(randomFrom(SubAggCollectionMode.values())))
.execute().actionGet();
assertSearchResponse(allResponse);
Terms terms = allResponse.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
int expectedCardinality = terms.getBuckets().size();
Contributor review comment: can you set a very high size parameter on the terms aggregation to make sure we got all terms?

// Gather terms using partitioned aggregations
final int numPartitions = randomIntBetween(2, 4);
Set<Number> foundTerms = new HashSet<>();
for (int partition = 0; partition < numPartitions; partition++) {
SearchResponse response = client().prepareSearch("idx").setTypes("type").addAggregation(terms("terms").field(field)
.includeExclude(new IncludeExclude(partition, numPartitions)).collectMode(randomFrom(SubAggCollectionMode.values())))
.execute().actionGet();
assertSearchResponse(response);
terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
for (Bucket bucket : terms.getBuckets()) {
assertTrue(foundTerms.add(bucket.getKeyAsNumber()));
}
}
assertEquals(expectedCardinality, foundTerms.size());
}
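Two invariants are checked at once here: assertTrue(foundTerms.add(...)) fails if any term shows up in more than one partition (disjointness), and the final assertEquals fails if any term is missed (exhaustiveness); together they are exactly the contract the partitioning hash must honor.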

public void testSingleValueFieldOrderedByTermAsc() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")