Add shard_min_doc_count parameter for significant terms, similar to `shard_size`

Significant terms internally maintain a priority queue per shard with a size potentially
lower than the number of terms. This queue uses the score as the criterion for deciding
whether a bucket is kept. If many terms with a low subsetDF score very high
but `min_doc_count` is set high, this might result in no terms being
returned, because the priority queue fills up with low-frequency terms that are all
sorted out in the end.

This can be avoided by increasing the `shard_size` parameter to a higher value.
However, it is not immediately clear what value this parameter must be set to,
because we cannot know how many low-frequency terms are scored higher than
the high-frequency terms that we are actually interested in.

On the other hand, if no routing of documents to shards is involved, we can perhaps
assume that the documents of each class, and also the terms therein, are distributed evenly
across shards. In that case it might be easier to not add terms with
subsetDF < `shard_min_doc_count` to the priority queue, where `shard_min_doc_count` can be
set to something like `min_doc_count`/number of shards, because we would then assume that
`min_doc_count` will not be reached even when summing the subsetDF across shards.
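
For example, with `min_doc_count` set to 10 on an index with 5 shards and evenly
distributed terms, a term that reaches the global threshold should appear in roughly
10/5 = 2 documents per shard, so a `shard_min_doc_count` of 2 would be a plausible choice.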

closes #5998
closes #6041
brwe committed May 7, 2014
1 parent f554178 commit 7944369
Showing 8 changed files with 185 additions and 25 deletions.
@@ -319,10 +319,23 @@ It is possible to only return terms that match more than a configured number of

The above aggregation would only return tags which have been found in 10 hits or more. The default value is `3`.




Terms that score highly will be collected on a shard level and merged with the terms collected from other shards in a second step. However, the shard does not have the global term frequencies available. The decision whether a term is added to a candidate list depends only on the score computed on the shard using local shard frequencies, not on the global frequencies of the word. The `min_doc_count` criterion is applied only after the local term statistics of all shards have been merged. In a way, the decision to add a term as a candidate is made without being very _certain_ that the term will actually reach the required `min_doc_count`. This might cause many (globally) high-frequency terms to be missing from the final result if low-frequency but high-scoring terms populated the candidate lists. To avoid this, the `shard_size` parameter can be increased to allow more candidate terms on the shards. However, this increases memory consumption and network traffic.

The parameter `shard_min_doc_count` regulates how _certain_ a shard is, with respect to `min_doc_count`, that a term should actually be added to the candidate list. Terms will only be considered if their local shard frequency within the set is at least `shard_min_doc_count`. If your dictionary contains many low-frequency words and you are not interested in these (for example misspellings), then you can set the `shard_min_doc_count` parameter to filter out candidate terms on a shard level that, with reasonable certainty, will not reach the required `min_doc_count` even after merging the local frequencies. `shard_min_doc_count` is set to `1` by default and has no effect unless you explicitly set it.
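
For illustration, a minimal sketch of setting both thresholds through the Java builder touched by this commit. The index name `reports`, the field `crime_type`, and the `client` setup are hypothetical; only the builder methods visible in this change (`minDocCount`, `shardMinDocCount`) plus the usual `field` setter are assumed:

[source,java]
----
// Hypothetical example: "reports" index and "crime_type" field are made up.
SignificantTermsBuilder agg = new SignificantTermsBuilder("significant_crime_types")
        .field("crime_type")      // assumed field setter on the builder
        .minDocCount(10)          // global threshold, applied after merging shard results
        .shardMinDocCount(2);     // per-shard pre-filter; keep well below min_doc_count/#shards

SearchResponse response = client.prepareSearch("reports")
        .addAggregation(agg)
        .execute()
        .actionGet();
----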




WARNING: Setting `min_doc_count` to `1` is generally not advised, as it tends to return terms that
are typos or other bizarre curiosities. Finding more than one instance of a term helps
reinforce that, while still rare, the term was not the result of a one-off accident. The
default value of 3 is used to provide a minimum weight-of-evidence.
Setting `shard_min_doc_count` too high will cause significant candidate terms to be filtered out on a shard level; this value should be set much lower than `min_doc_count/#shards`.




===== Filtering Values
@@ -42,15 +42,17 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri

protected long numCollectedDocs;
protected final SignificantTermsAggregatorFactory termsAggFactory;
protected long shardMinDocCount;

public GlobalOrdinalsSignificantTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource,
long estimatedBucketCount, long maxOrd, int requiredSize, int shardSize, long minDocCount,
long estimatedBucketCount, long maxOrd, int requiredSize, int shardSize, long minDocCount, long shardMinDocCount,
AggregationContext aggregationContext, Aggregator parent,
SignificantTermsAggregatorFactory termsAggFactory) {

super(name, factories, valuesSource, estimatedBucketCount, maxOrd, null, requiredSize, shardSize,
minDocCount, aggregationContext, parent);
this.termsAggFactory = termsAggFactory;
this.shardMinDocCount = shardMinDocCount;
}

@Override
@@ -99,8 +101,9 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) {
// Back at the central reducer these properties will be updated with
// global stats
spare.updateScore();

spare = (SignificantStringTerms.Bucket) ordered.insertWithOverflow(spare);
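// add the term as a shard-level candidate only if its local doc count (subsetDf) reaches shardMinDocCount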
if (spare.subsetDf >= shardMinDocCount) {
spare = (SignificantStringTerms.Bucket) ordered.insertWithOverflow(spare);
}
}

final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()];
@@ -133,8 +136,8 @@ public static class WithHash extends GlobalOrdinalsSignificantTermsAggregator {

private final LongHash bucketOrds;

public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
super(name, factories, valuesSource, estimatedBucketCount, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggFactory);
public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, long shardMinDocCount, AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
super(name, factories, valuesSource, estimatedBucketCount, estimatedBucketCount, requiredSize, shardSize, minDocCount, shardMinDocCount, aggregationContext, parent, termsAggFactory);
bucketOrds = new LongHash(estimatedBucketCount, aggregationContext.bigArrays());
}

@@ -39,15 +39,17 @@
public class SignificantLongTermsAggregator extends LongTermsAggregator {

public SignificantLongTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, @Nullable ValueFormat format,
long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount,
long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, long shardMinDocCount,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {

super(name, factories, valuesSource, format, estimatedBucketCount, null, requiredSize, shardSize, minDocCount, aggregationContext, parent);
this.termsAggFactory = termsAggFactory;
this.shardMinDocCount = shardMinDocCount;
}

protected long numCollectedDocs;
private final SignificantTermsAggregatorFactory termsAggFactory;
protected long shardMinDocCount;

@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
@@ -81,7 +83,9 @@ public SignificantLongTerms buildAggregation(long owningBucketOrdinal) {
spare.updateScore();

spare.bucketOrd = i;
spare = (SignificantLongTerms.Bucket) ordered.insertWithOverflow(spare);
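// add the term as a shard-level candidate only if its local doc count (subsetDf) reaches shardMinDocCount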
if (spare.subsetDf >= shardMinDocCount) {
spare = (SignificantLongTerms.Bucket) ordered.insertWithOverflow(spare);
}
}

final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()];
@@ -45,15 +45,17 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {

protected long numCollectedDocs;
protected final SignificantTermsAggregatorFactory termsAggFactory;
protected long shardMinDocCount;

public SignificantStringTermsAggregator(String name, AggregatorFactories factories, ValuesSource valuesSource,
long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount,
long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, long shardMinDocCount,
IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent,
SignificantTermsAggregatorFactory termsAggFactory) {

super(name, factories, valuesSource, estimatedBucketCount, null, requiredSize, shardSize,
minDocCount, includeExclude, aggregationContext, parent);
this.termsAggFactory = termsAggFactory;
this.shardMinDocCount = shardMinDocCount;
}

@Override
@@ -90,7 +92,9 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) {
spare.updateScore();

spare.bucketOrd = i;
spare = (SignificantStringTerms.Bucket) ordered.insertWithOverflow(spare);
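// add the term as a shard-level candidate only if its local doc count (subsetDf) reaches shardMinDocCount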
if (spare.subsetDf >= shardMinDocCount) {
spare = (SignificantStringTerms.Bucket) ordered.insertWithOverflow(spare);
}
}

final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()];
@@ -130,9 +134,9 @@ public static class WithOrdinals extends SignificantStringTermsAggregator {
private LongArray ordinalToBucket;

public WithOrdinals(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals valuesSource,
long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext,
long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, long shardMinDocCount, AggregationContext aggregationContext,
Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
super(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, null, aggregationContext, parent, termsAggFactory);
super(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, shardMinDocCount, null, aggregationContext, parent, termsAggFactory);
this.valuesSource = valuesSource;
}

@@ -52,9 +52,9 @@ public enum ExecutionMode {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
int requiredSize, int shardSize, long minDocCount, long shardMinDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) {
return new SignificantStringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent, termsAggregatorFactory);
return new SignificantStringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, shardMinDocCount, includeExclude, aggregationContext, parent, termsAggregatorFactory);
}

@Override
@@ -67,12 +67,12 @@ boolean needsGlobalOrdinals() {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
int requiredSize, int shardSize, long minDocCount, long shardMinDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) {
if (includeExclude != null) {
throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms.");
}
return new SignificantStringTermsAggregator.WithOrdinals(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggregatorFactory);
return new SignificantStringTermsAggregator.WithOrdinals(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, shardMinDocCount, aggregationContext, parent, termsAggregatorFactory);
}

@Override
@@ -85,15 +85,15 @@ boolean needsGlobalOrdinals() {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
int requiredSize, int shardSize, long minDocCount, long shardMinDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) {
if (includeExclude != null) {
throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms.");
}
ValuesSource.Bytes.WithOrdinals valueSourceWithOrdinals = (ValuesSource.Bytes.WithOrdinals) valuesSource;
IndexSearcher indexSearcher = aggregationContext.searchContext().searcher();
long maxOrd = valueSourceWithOrdinals.globalMaxOrd(indexSearcher);
return new GlobalOrdinalsSignificantTermsAggregator(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, maxOrd, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggregatorFactory);
return new GlobalOrdinalsSignificantTermsAggregator(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, maxOrd, requiredSize, shardSize, minDocCount, shardMinDocCount, aggregationContext, parent, termsAggregatorFactory);
}

@Override
@@ -106,12 +106,12 @@ boolean needsGlobalOrdinals() {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
int requiredSize, int shardSize, long minDocCount, long shardMinDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) {
if (includeExclude != null) {
throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms.");
}
return new GlobalOrdinalsSignificantTermsAggregator.WithHash(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggregatorFactory);
return new GlobalOrdinalsSignificantTermsAggregator.WithHash(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, shardMinDocCount, aggregationContext, parent, termsAggregatorFactory);
}

@Override
@@ -136,7 +136,7 @@ public static ExecutionMode fromString(String value) {
}

abstract Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
int requiredSize, int shardSize, long minDocCount, long shardMinDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory);

abstract boolean needsGlobalOrdinals();
@@ -150,6 +150,7 @@ public String toString() {
private final int requiredSize;
private final int shardSize;
private final long minDocCount;
private final long shardMinDocCount;
private final IncludeExclude includeExclude;
private final String executionHint;
private String indexedFieldName;
@@ -159,13 +160,14 @@ public String toString() {
private Filter filter;

public SignificantTermsAggregatorFactory(String name, ValuesSourceConfig valueSourceConfig, int requiredSize,
int shardSize, long minDocCount, IncludeExclude includeExclude,
int shardSize, long minDocCount, long shardMinDocCount, IncludeExclude includeExclude,
String executionHint, Filter filter) {

super(name, SignificantStringTerms.TYPE.name(), valueSourceConfig);
this.requiredSize = requiredSize;
this.shardSize = shardSize;
this.minDocCount = minDocCount;
this.shardMinDocCount = shardMinDocCount;
this.includeExclude = includeExclude;
this.executionHint = executionHint;
if (!valueSourceConfig.unmapped()) {
@@ -211,7 +213,7 @@ protected Aggregator create(ValuesSource valuesSource, long expectedBucketsCount
}
assert execution != null;
valuesSource.setNeedsGlobalOrdinals(execution.needsGlobalOrdinals());
return execution.create(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent, this);
return execution.create(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, shardMinDocCount, includeExclude, aggregationContext, parent, this);
}

if (includeExclude != null) {
@@ -224,7 +226,7 @@ protected Aggregator create(ValuesSource valuesSource, long expectedBucketsCount
if (((ValuesSource.Numeric) valuesSource).isFloatingPoint()) {
throw new UnsupportedOperationException("No support for examining floating point numerics");
}
return new SignificantLongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, config.format(), estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, this);
return new SignificantLongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, config.format(), estimatedBucketCount, requiredSize, shardSize, minDocCount, shardMinDocCount, aggregationContext, parent, this);
}

throw new AggregationExecutionException("significant_terms aggregation cannot be applied to field [" + config.fieldContext().field() +
@@ -36,6 +36,7 @@ public class SignificantTermsBuilder extends AggregationBuilder<SignificantTerms
private int requiredSize = SignificantTermsParser.DEFAULT_REQUIRED_SIZE;
private int shardSize = SignificantTermsParser.DEFAULT_SHARD_SIZE;
private int minDocCount = SignificantTermsParser.DEFAULT_MIN_DOC_COUNT;
private int shardMinDocCount = SignificantTermsParser.DEFAULT_SHARD_MIN_DOC_COUNT;
private String executionHint;

public SignificantTermsBuilder(String name) {
@@ -62,6 +63,11 @@ public SignificantTermsBuilder minDocCount(int minDocCount) {
return this;
}

public SignificantTermsBuilder shardMinDocCount(int shardMinDocCount) {
this.shardMinDocCount = shardMinDocCount;
return this;
}

public SignificantTermsBuilder executionHint(String executionHint) {
this.executionHint = executionHint;
return this;
@@ -76,6 +82,9 @@ protected XContentBuilder internalXContent(XContentBuilder builder, Params param
if (minDocCount != SignificantTermsParser.DEFAULT_MIN_DOC_COUNT) {
builder.field("minDocCount", minDocCount);
}
if (shardMinDocCount != SignificantTermsParser.DEFAULT_SHARD_MIN_DOC_COUNT) {
builder.field("shardMinDocCount", shardMinDocCount);
}
if (requiredSize != SignificantTermsParser.DEFAULT_REQUIRED_SIZE) {
builder.field("size", requiredSize);
}
