Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve terms aggregation to perform the segment ordinal to global ordinal lookup post segment collection #5895

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -126,7 +126,7 @@ public interface OrdinalMappingSource {

}

private static abstract class GlobalOrdinalMapping implements Ordinals.Docs {
public static abstract class GlobalOrdinalMapping implements Ordinals.Docs {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to have it public anymore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need access to this when mapping the collected segment ordinals to global ordinals. Or can that be done via another way?


protected final Ordinals.Docs segmentOrdinals;
private final long memorySizeInBytes;
Expand Down Expand Up @@ -172,7 +172,7 @@ public final long nextOrd() {
return currentGlobalOrd = getGlobalOrd(segmentOrd);
}

protected abstract long getGlobalOrd(long segmentOrd);
public abstract long getGlobalOrd(long segmentOrd);

}

Expand Down Expand Up @@ -288,7 +288,7 @@ private GlobalOrdinalsDocs(Ordinals.Docs segmentOrdinals, MonotonicAppendingLong
}

@Override
protected long getGlobalOrd(long segmentOrd) {
public long getGlobalOrd(long segmentOrd) {
return segmentOrd + segmentOrdToGlobalOrdLookup.get(segmentOrd);
}
}
Expand Down Expand Up @@ -322,7 +322,7 @@ private GlobalOrdinalsDocs(Ordinals.Docs segmentOrdinals, long memorySizeInBytes
}

@Override
protected long getGlobalOrd(long segmentOrd) {
public long getGlobalOrd(long segmentOrd) {
return segmentOrd + segmentOrdToGlobalOrdLookup.get((int) segmentOrd);
}
}
Expand Down
Expand Up @@ -78,11 +78,8 @@ protected final void collectExistingBucket(int doc, long bucketOrd) throws IOExc
}
}

/**
* Initializes the docCounts to the specified size.
*/
public void initializeDocCounts(long maxOrd) {
docCounts = bigArrays.grow(docCounts, maxOrd);
public LongArray getDocCounts() {
return docCounts;
}

/**
Expand All @@ -97,7 +94,7 @@ protected final void collectBucketNoCounts(int doc, long bucketOrd) throws IOExc
/**
* Utility method to increment the doc counts of the given bucket (identified by the bucket ordinal)
*/
protected final void incrementBucketDocCount(int inc, long bucketOrd) throws IOException {
protected final void incrementBucketDocCount(long inc, long bucketOrd) throws IOException {
docCounts = bigArrays.grow(docCounts, bucketOrd + 1);
docCounts.increment(bucketOrd, inc);
}
Expand Down
Expand Up @@ -23,7 +23,9 @@
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.index.fielddata.BytesValues;
import org.elasticsearch.index.fielddata.ordinals.Ordinals;
Expand All @@ -37,6 +39,8 @@
import java.io.IOException;
import java.util.Arrays;

import static org.elasticsearch.index.fielddata.ordinals.InternalGlobalOrdinalsBuilder.GlobalOrdinalMapping;

/**
* An aggregator of string values that relies on global ordinals in order to build buckets.
*/
Expand All @@ -47,8 +51,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
protected Ordinals.Docs globalOrdinals;

public GlobalOrdinalsStringTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, long estimatedBucketCount,
InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent) {
super(name, factories, estimatedBucketCount, aggregationContext, parent, order, requiredSize, shardSize, minDocCount);
long maxOrd, InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent) {
super(name, factories, maxOrd, aggregationContext, parent, order, requiredSize, shardSize, minDocCount);
this.valuesSource = valuesSource;
}

Expand All @@ -65,7 +69,6 @@ public boolean shouldCollect() {
public void setNextReader(AtomicReaderContext reader) {
globalValues = valuesSource.globalBytesValues();
globalOrdinals = globalValues.ordinals();
initializeDocCounts(globalOrdinals.getMaxOrd());
}

@Override
Expand Down Expand Up @@ -135,9 +138,10 @@ public static class WithHash extends GlobalOrdinalsStringTermsAggregator {
private final LongHash bucketOrds;

public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, long estimatedBucketCount,
InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext,
long maxOrd, InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext,
Aggregator parent) {
super(name, factories, valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, aggregationContext, parent);
// Set maxOrd to estimatedBucketCount! To be conservative with memory.
super(name, factories, valuesSource, estimatedBucketCount, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, aggregationContext, parent);
bucketOrds = new LongHash(estimatedBucketCount, aggregationContext.bigArrays());
}

Expand Down Expand Up @@ -172,4 +176,79 @@ protected void doClose() {

}

/**
* Variant of {@link GlobalOrdinalsStringTermsAggregator} that resolves global ordinals post segment collection
* instead of on the fly for each match.This is beneficial for low cardinality fields, because it can reduce
* the amount of look-ups significantly.
*/
public static class LowCardinality extends GlobalOrdinalsStringTermsAggregator {

private final LongArray segmentDocCounts;

private Ordinals.Docs segmentOrdinals;
private LongArray current;

public LowCardinality(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, long estimatedBucketCount,
long maxOrd, InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent) {
super(name, factories, valuesSource, estimatedBucketCount, maxOrd, order, requiredSize, shardSize, minDocCount, aggregationContext, parent);
this.segmentDocCounts = bigArrays.newLongArray(maxOrd, true);
}

@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
final int numOrds = segmentOrdinals.setDocument(doc);
for (int i = 0; i < numOrds; i++) {
final long segmentOrd = segmentOrdinals.nextOrd();
current.increment(segmentOrd, 1);
}
}

@Override
public void setNextReader(AtomicReaderContext reader) {
if (segmentOrdinals != null && segmentOrdinals.getMaxOrd() != globalOrdinals.getMaxOrd()) {
mapSegmentCountsToGlobalCounts();
}

super.setNextReader(reader);
BytesValues.WithOrdinals bytesValues = valuesSource.bytesValues();
segmentOrdinals = bytesValues.ordinals();
if (segmentOrdinals.getMaxOrd() != globalOrdinals.getMaxOrd()) {
current = segmentDocCounts;
} else {
current = getDocCounts();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice way to avoid the condition in collect

}

@Override
protected void doPostCollection() {
if (segmentOrdinals.getMaxOrd() != globalOrdinals.getMaxOrd()) {
mapSegmentCountsToGlobalCounts();
}
}

@Override
protected void doClose() {
Releasables.close(segmentDocCounts);
}

private void mapSegmentCountsToGlobalCounts() {
// There is no public method in Ordinals.Docs that allows for this mapping...
// This is the cleanest way I can think of so far
GlobalOrdinalMapping mapping = (GlobalOrdinalMapping) globalOrdinals;
for (int i = 0; i < segmentDocCounts.size(); i++) {
final long inc = segmentDocCounts.get(i);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can do inc = segmentDocCounts.set(i, 0) to get & reset at the same time

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ow nice, I didn't know that

if (inc == 0) {
continue;
}
final long globalOrd = mapping.getGlobalOrd(i);
try {
incrementBucketDocCount(inc, globalOrd);
segmentDocCounts.set(i, 0); // reset for next segment
} catch (IOException e) {
throw ExceptionsHelper.convertToElastic(e);
}
}
}
}

}
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket.terms;

import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.search.aggregations.*;
Expand All @@ -39,8 +40,8 @@ public enum ExecutionMode {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
InternalOrder order, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent) {
long maxOrd, InternalOrder order, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent) {
return new StringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent);
}

Expand All @@ -54,8 +55,8 @@ boolean needsGlobalOrdinals() {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
InternalOrder order, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent) {
long maxOrd, InternalOrder order, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent) {
if (includeExclude != null) {
throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms.");
}
Expand All @@ -72,12 +73,12 @@ boolean needsGlobalOrdinals() {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
InternalOrder order, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
long maxOrd, InternalOrder order, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent) {
if (includeExclude != null) {
throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms.");
}
return new GlobalOrdinalsStringTermsAggregator(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, aggregationContext, parent);
return new GlobalOrdinalsStringTermsAggregator(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, maxOrd, order, requiredSize, shardSize, minDocCount, aggregationContext, parent);
}

@Override
Expand All @@ -90,12 +91,32 @@ boolean needsGlobalOrdinals() {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
InternalOrder order, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
long maxOrd, InternalOrder order, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent) {
if (includeExclude != null) {
throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms.");
}
return new GlobalOrdinalsStringTermsAggregator.WithHash(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, maxOrd, order, requiredSize, shardSize, minDocCount, aggregationContext, parent);
}

@Override
boolean needsGlobalOrdinals() {
return true;
}
},
GLOBAL_ORDINALS_LOW_CARDINALITY(new ParseField("global_ordinals_low_cardinality")) {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
long maxOrd, InternalOrder order, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent) {
if (includeExclude != null) {
throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms.");
}
return new GlobalOrdinalsStringTermsAggregator.WithHash(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, aggregationContext, parent);
if (factories != AggregatorFactories.EMPTY) {
throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode can only be used as a leaf aggregation");
}
return new GlobalOrdinalsStringTermsAggregator.LowCardinality(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, maxOrd, order, requiredSize, shardSize, minDocCount, aggregationContext, parent);
}

@Override
Expand All @@ -120,8 +141,8 @@ public static ExecutionMode fromString(String value) {
}

abstract Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
InternalOrder order, int requiredSize, int shardSize, long minDocCount,
IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent);
long maxOrd, InternalOrder order, int requiredSize, int shardSize, long minDocCount,
IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent);

abstract boolean needsGlobalOrdinals();

Expand Down Expand Up @@ -200,6 +221,18 @@ protected Aggregator create(ValuesSource valuesSource, long expectedBucketsCount
execution = ExecutionMode.MAP;
}

final long maxOrd;
final double ratio;
if (execution == null || execution.needsGlobalOrdinals()) {
ValuesSource.Bytes.WithOrdinals valueSourceWithOrdinals = (ValuesSource.Bytes.WithOrdinals) valuesSource;
IndexSearcher indexSearcher = aggregationContext.searchContext().searcher();
maxOrd = valueSourceWithOrdinals.maxOrd(indexSearcher);
ratio = maxOrd / ((double) indexSearcher.getIndexReader().numDocs());
} else {
maxOrd = -1;
ratio = -1;
}

// Let's try to use a good default
if (execution == null) {
// if there is a parent bucket aggregator the number of instances of this aggregator is going
Expand All @@ -208,12 +241,23 @@ protected Aggregator create(ValuesSource valuesSource, long expectedBucketsCount
if (hasParentBucketAggregator(parent)) {
execution = ExecutionMode.GLOBAL_ORDINALS_HASH;
} else {
execution = ExecutionMode.GLOBAL_ORDINALS;
if (factories == AggregatorFactories.EMPTY) {
if (ratio <= 0.5 && maxOrd <= 2048) {
// 0.5: At least we need reduce the number of global ordinals look-ups by half
// 2048: GLOBAL_ORDINALS_LOW_CARDINALITY has additional memory usage, which directly linked to maxOrd, so we need to limit.
execution = ExecutionMode.GLOBAL_ORDINALS_LOW_CARDINALITY;
} else {
execution = ExecutionMode.GLOBAL_ORDINALS;
}
} else {
execution = ExecutionMode.GLOBAL_ORDINALS;
}
}
}

assert execution != null;
valuesSource.setNeedsGlobalOrdinals(execution.needsGlobalOrdinals());
return execution.create(name, factories, valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent);
return execution.create(name, factories, valuesSource, estimatedBucketCount, maxOrd, order, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent);
}

if (includeExclude != null) {
Expand Down
Expand Up @@ -20,6 +20,7 @@

import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.fielddata.AtomicFieldData.Order;
import org.elasticsearch.index.fielddata.ordinals.Ordinals;
import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.aggregations.support.ValuesSource.Bytes.SortedAndUnique.SortedUniqueBytesValues;
import org.elasticsearch.search.aggregations.support.values.ScriptBytesValues;
Expand Down Expand Up @@ -159,6 +161,8 @@ public static abstract class WithOrdinals extends Bytes implements TopReaderCont

public abstract BytesValues.WithOrdinals globalBytesValues();

public abstract long maxOrd(IndexSearcher indexSearcher);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be called globalMaxOrd?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should, I'll change that.


public static class FieldData extends WithOrdinals implements ReaderContextAware {

protected boolean needsHashes;
Expand Down Expand Up @@ -229,6 +233,17 @@ public BytesValues.WithOrdinals globalBytesValues() {
}
return globalBytesValues;
}

@Override
public long maxOrd(IndexSearcher indexSearcher) {
IndexReaderContext topReaderContext = indexSearcher.getTopReaderContext();
AtomicReaderContext atomicReaderContext = indexSearcher.getIndexReader().leaves().get(0);
IndexFieldData.WithOrdinals<?> globalFieldData = indexFieldData.loadGlobal(topReaderContext.reader());
AtomicFieldData.WithOrdinals afd = globalFieldData.load(atomicReaderContext);
BytesValues.WithOrdinals values = afd.getBytesValues(false);
Ordinals.Docs ordinals = values.ordinals();
return ordinals.getMaxOrd();
}
}

}
Expand Down