Skip range optimization if it'd be slower (#65097)
Don't run the `date_histogram` and `range` aggregations as a `filters`
aggregation if the cost of the filters is high. This should keep the
optimization from turning into a de-optimization when it bumps into
runtime fields, which don't have index structures to speed up their
queries. For runtime fields we're better off running `date_histogram`
and `range` with the native `range` aggregator. We detect this
situation using `cost` on the `BulkScorer` built from the queries,
which keeps the change general: it'll also catch other sorts of
queries that make the optimization a poor choice.
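
Condensed, the new decision looks like this (distilled from RangeAggregator#build in the diff below; the names match the real code, the surrounding plumbing is elided):

FromFilters<?> adapted = adaptIntoFiltersOrNull(/* name, factories, ranges, ... */);
if (adapted != null) {
    // Budget: visiting every document once is the worst case we'll accept.
    long maxEstimatedFiltersCost = context.searcher().getIndexReader().maxDoc();
    if (adapted.estimateCost(maxEstimatedFiltersCost) <= maxEstimatedFiltersCost) {
        return adapted; // filter-by-filter is cheap enough, use it
    }
}
// Too expensive (e.g. runtime fields): fall back to the native range aggregator.
return buildWithoutAttemptedToAdaptToFilters(/* name, factories, ranges, ... */);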
nik9000 committed Nov 25, 2020
1 parent 427930d commit 91cbb9d
Showing 6 changed files with 230 additions and 19 deletions.
@@ -50,6 +50,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.notNullValue;

@@ -626,7 +627,11 @@ public void testFilterByFilter() throws InterruptedException, IOException {
);
assertThat(((Number) delegate.get("ranges")).longValue(), equalTo(1L));
assertThat(delegate.get("delegate"), equalTo("FiltersAggregator.FilterByFilter"));
assertThat(delegate.get("delegate_debug"), equalTo(Map.of("segments_with_deleted_docs", 0)));
Map<?, ?> delegateDebug = (Map<?, ?>) delegate.get("delegate_debug");
assertThat(delegateDebug, hasEntry("segments_with_deleted_docs", 0));
assertThat(delegateDebug, hasEntry("max_cost", (long) RangeAggregator.DOCS_PER_RANGE_TO_USE_FILTERS * 2));
assertThat(delegateDebug, hasEntry("estimated_cost", (long) RangeAggregator.DOCS_PER_RANGE_TO_USE_FILTERS * 2));
assertThat((long) delegateDebug.get("estimate_cost_time"), greaterThanOrEqualTo(0L)); // ~1,276,734 nanos is normal
}
}
}
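
For orientation, the delegate_debug map asserted above comes out roughly like this (a hypothetical rendering; the two cost values follow from the test's setup and estimate_cost_time varies from run to run):

Map<String, Object> expectedDelegateDebug = Map.of(
    "segments_with_deleted_docs", 0,
    "max_cost", 2L * RangeAggregator.DOCS_PER_RANGE_TO_USE_FILTERS,
    "estimated_cost", 2L * RangeAggregator.DOCS_PER_RANGE_TO_USE_FILTERS,
    "estimate_cost_time", 1_276_734L // nanoseconds; any value >= 0 passes
);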
@@ -188,7 +188,7 @@ public static FiltersAggregator build(
* and we don't collect "other" buckets. Collecting {@link FilterByFilter}
* is generally going to be much faster than the {@link Compatible} aggregator.
*/
public static FiltersAggregator buildFilterOrderOrNull(
public static FilterByFilter buildFilterOrderOrNull(
String name,
AggregatorFactories factories,
String[] keys,
@@ -269,12 +269,25 @@ public InternalAggregation buildEmptyAggregation() {
* {@link Compatible} but doesn't support when there is a parent aggregator
* or any child aggregators.
*/
private static class FilterByFilter extends FiltersAggregator {
public static class FilterByFilter extends FiltersAggregator {
private final Query[] filters;
private Weight[] filterWeights;
private final boolean profiling;
private long estimatedCost = -1;
/**
* The maximum allowed estimated cost. Defaults to {@code -1} meaning no
* max but can be set. Used for emitting debug info.
*/
private long maxCost = -1;
private long estimateCostTime;
private Weight[] weights;
/**
* If {@link #estimateCost} was called then this'll contain a
* scorer per leaf per filter. If it wasn't then this'll be {@code null}.
*/
private BulkScorer[][] scorers;
private int segmentsWithDeletedDocs;

FilterByFilter(
private FilterByFilter(
String name,
String[] keys,
Query[] filters,
@@ -286,6 +299,57 @@ private static class FilterByFilter extends FiltersAggregator {
) throws IOException {
super(name, AggregatorFactories.EMPTY, keys, keyed, null, context, parent, cardinality, metadata);
this.filters = filters;
this.profiling = context.getProfilers() != null;
}

/**
* Estimate the number of documents that this aggregation must visit. We'll
* stop counting once we've passed {@code maxCost} if we aren't profiling.
*/
public long estimateCost(long maxCost) throws IOException {
this.maxCost = maxCost;
if (estimatedCost != -1) {
return estimatedCost;
}
long limit = profiling ? Long.MAX_VALUE : maxCost;
long start = profiling ? System.nanoTime() : 0;
estimatedCost = 0;
weights = buildWeights(topLevelQuery(), filters);
List<LeafReaderContext> leaves = searcher().getIndexReader().leaves();
/*
* It's important that we save a copy of the BulkScorer because for
* queries like PointInRangeQuery building the scorer can be a big
* chunk of the run time.
*/
scorers = new BulkScorer[leaves.size()][];
for (LeafReaderContext ctx : leaves) {
scorers[ctx.ord] = new BulkScorer[filters.length];
for (int f = 0; f < filters.length; f++) {
scorers[ctx.ord][f] = weights[f].bulkScorer(ctx);
if (scorers[ctx.ord][f] == null) {
// Doesn't find anything in this leaf
continue;
}
if (estimatedCost >= 0 && estimatedCost <= limit) {
// Only keep counting if we haven't overflowed and haven't passed the limit
estimatedCost += scorers[ctx.ord][f].cost();
}
}
}
if (profiling) {
estimateCostTime = System.nanoTime() - start;
}
// If we've overflowed use Long.MAX_VALUE
return estimatedCost < 0 ? Long.MAX_VALUE : estimatedCost;
}

/**
* Are the scorers cached?
* <p>
* Package private for testing.
*/
boolean scorersCached() {
return scorers != null;
}

/**
@@ -297,12 +361,19 @@ private static class FilterByFilter extends FiltersAggregator {
*/
@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
if (filterWeights == null) {
filterWeights = buildWeights(topLevelQuery(), filters);
if (weights == null) {
weights = buildWeights(topLevelQuery(), filters);
}
Bits live = ctx.reader().getLiveDocs();
for (int filterOrd = 0; filterOrd < filters.length; filterOrd++) {
BulkScorer scorer = filterWeights[filterOrd].bulkScorer(ctx);
BulkScorer scorer;
if (scorers == null) {
// No cached scorers
scorer = weights[filterOrd].bulkScorer(ctx);
} else {
// Scorers cached when calling estimateCost
scorer = scorers[ctx.ord][filterOrd];
}
if (scorer == null) {
// the filter doesn't match any docs
continue;
@@ -319,6 +390,12 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add);
add.accept("segments_with_deleted_docs", segmentsWithDeletedDocs);
if (estimatedCost != -1) {
// -1 means we didn't estimate it.
add.accept("estimated_cost", estimatedCost);
add.accept("max_cost", maxCost);
add.accept("estimate_cost_time", estimateCostTime);
}
}
}

@@ -402,7 +479,7 @@ protected Weight[] buildWeights(Query topLevelQuery, Query filters[]) throws IOE
* top level query on a date and a filter on a date. This kind of thing
* is very common when visualizing logs and metrics.
*/
private Query filterMatchingBoth(Query lhs, Query rhs) {
static Query filterMatchingBoth(Query lhs, Query rhs) {
if (lhs instanceof MatchAllDocsQuery) {
return rhs;
}
@@ -424,7 +501,7 @@ private Query filterMatchingBoth(Query lhs, Query rhs) {
return builder.build();
}

private Query unwrap(Query query) {
private static Query unwrap(Query query) {
if (query instanceof IndexSortSortedNumericDocValuesRangeQuery) {
query = ((IndexSortSortedNumericDocValuesRangeQuery) query).getFallbackQuery();
}
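The cost estimation above leans on a standard Lucene pattern: build a Weight per filter, ask each leaf for a BulkScorer, and sum the scorers' cost() values, each an upper bound on how many documents that scorer may match. A minimal self-contained sketch of the pattern (an illustration, not code from this commit):

import java.io.IOException;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;

final class CostEstimator {
    /** Sum the per-leaf BulkScorer costs for a single query. */
    static long estimate(IndexSearcher searcher, Query query) throws IOException {
        Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
        long cost = 0;
        for (LeafReaderContext ctx : searcher.getIndexReader().leaves()) {
            BulkScorer scorer = weight.bulkScorer(ctx); // null when nothing in this leaf can match
            if (scorer != null) {
                cost += scorer.cost(); // upper bound on docs this scorer may visit
            }
        }
        return cost;
    }
}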
@@ -75,6 +75,7 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
rangeFactory,
ranges,
averageDocsPerRange,
null, // null here because we didn't try filters at all
keyed,
context,
parent,
@@ -58,6 +58,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -288,7 +289,7 @@ public static Aggregator build(
* which is good enough because that is the most embarrassing scenario.
*/
double averageDocsPerRange = ((double) context.searcher().getIndexReader().maxDoc()) / ranges.length;
Aggregator adapted = adaptIntoFiltersOrNull(
FromFilters<?> adapted = adaptIntoFiltersOrNull(
name,
factories,
valuesSourceConfig,
@@ -301,8 +302,23 @@
cardinality,
metadata
);
Map<String, Object> filtersDebug = null;
if (adapted != null) {
return adapted;
long maxEstimatedFiltersCost = context.searcher().getIndexReader().maxDoc();
long estimatedFiltersCost = adapted.estimateCost(maxEstimatedFiltersCost);
if (estimatedFiltersCost <= maxEstimatedFiltersCost) {
return adapted;
}
/*
* Looks like it'd be more expensive to use the filter-by-filter
* aggregator. Oh well. Snapshot the filter-by-filter
* aggregator's debug information if we're profiling because it
* is useful even if the aggregator isn't.
*/
if (context.getProfilers() != null) {
filtersDebug = new HashMap<>();
adapted.delegate().collectDebugInfo(filtersDebug::put);
}
}
return buildWithoutAttemptedToAdaptToFilters(
name,
@@ -312,6 +328,7 @@
rangeFactory,
ranges,
averageDocsPerRange,
filtersDebug,
keyed,
context,
parent,
@@ -320,7 +337,7 @@
);
}

public static Aggregator adaptIntoFiltersOrNull(
public static FromFilters<?> adaptIntoFiltersOrNull(
String name,
AggregatorFactories factories,
ValuesSourceConfig valuesSourceConfig,
@@ -383,7 +400,7 @@ public static Aggregator adaptIntoFiltersOrNull(
context.getQueryShardContext()
);
}
FiltersAggregator delegate = FiltersAggregator.buildFilterOrderOrNull(
FiltersAggregator.FilterByFilter delegate = FiltersAggregator.buildFilterOrderOrNull(
name,
factories,
keys,
@@ -424,6 +441,7 @@ public static Aggregator buildWithoutAttemptedToAdaptToFilters(
InternalRange.Factory<?, ?> rangeFactory,
Range[] ranges,
double averageDocsPerRange,
Map<String, Object> filtersDebug,
boolean keyed,
SearchContext context,
Aggregator parent,
@@ -439,6 +457,7 @@ public static Aggregator buildWithoutAttemptedToAdaptToFilters(
rangeFactory,
ranges,
averageDocsPerRange,
filtersDebug,
keyed,
context,
parent,
@@ -454,6 +473,7 @@ public static Aggregator buildWithoutAttemptedToAdaptToFilters(
rangeFactory,
ranges,
averageDocsPerRange,
filtersDebug,
keyed,
context,
parent,
@@ -468,11 +488,23 @@
private final boolean keyed;
private final InternalRange.Factory rangeFactory;
private final double averageDocsPerRange;
private final Map<String, Object> filtersDebug;

private RangeAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
InternalRange.Factory rangeFactory, Range[] ranges, double averageDocsPerRange, boolean keyed, SearchContext context,
Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata) throws IOException {

private RangeAggregator(
String name,
AggregatorFactories factories,
ValuesSource.Numeric valuesSource,
DocValueFormat format,
InternalRange.Factory rangeFactory,
Range[] ranges,
double averageDocsPerRange,
Map<String, Object> filtersDebug,
boolean keyed,
SearchContext context,
Aggregator parent,
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, cardinality.multiply(ranges.length), metadata);
assert valuesSource != null;
this.valuesSource = valuesSource;
@@ -481,6 +513,7 @@ private RangeAggregator(String name, AggregatorFactories factories, ValuesSource
this.rangeFactory = rangeFactory;
this.ranges = ranges;
this.averageDocsPerRange = averageDocsPerRange;
this.filtersDebug = filtersDebug;
}

@Override
@@ -540,6 +573,9 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add);
add.accept("ranges", ranges.length);
add.accept("average_docs_per_range", averageDocsPerRange);
if (filtersDebug != null) {
add.accept("filters_debug", filtersDebug);
}
}

public static class Unmapped<R extends RangeAggregator.Range> extends NonCollectingAggregator {
@@ -591,6 +627,7 @@ private static class NoOverlap extends RangeAggregator {
Factory rangeFactory,
Range[] ranges,
double averageDocsPerRange,
Map<String, Object> filtersDebug,
boolean keyed,
SearchContext context,
Aggregator parent,
@@ -605,6 +642,7 @@ private static class NoOverlap extends RangeAggregator {
rangeFactory,
ranges,
averageDocsPerRange,
filtersDebug,
keyed,
context,
parent,
@@ -641,6 +679,7 @@ private static class Overlap extends RangeAggregator {
Factory rangeFactory,
Range[] ranges,
double averageDocsPerRange,
Map<String, Object> filtersDebug,
boolean keyed,
SearchContext context,
Aggregator parent,
@@ -655,6 +694,7 @@ private static class Overlap extends RangeAggregator {
rangeFactory,
ranges,
averageDocsPerRange,
filtersDebug,
keyed,
context,
parent,
@@ -747,6 +787,13 @@ private static class FromFilters<B extends InternalRange.Bucket> extends Adaptin
this.averageDocsPerRange = averageDocsPerRange;
}

/**
* Estimate the number of documents that this aggregation must visit.
*/
long estimateCost(long maxEstimatedCost) throws IOException {
return ((FiltersAggregator.FilterByFilter) delegate()).estimateCost(maxEstimatedCost);
}

@Override
protected InternalAggregation adapt(InternalAggregation delegateResult) {
InternalFilters filters = (InternalFilters) delegateResult;
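End to end: when the estimate comes in over budget and the request is being profiled, the native range aggregator reports its own debug info plus the rejected attempt under filters_debug. The debug map would look roughly like this (illustrative values only; the keys are the ones emitted by the two collectDebugInfo methods in this commit):

Map<String, Object> rangeDebug = Map.of(
    "ranges", 2,
    "average_docs_per_range", 50_000.0,
    "filters_debug", Map.of(
        "segments_with_deleted_docs", 0,
        "estimated_cost", 250_000L, // what filter-by-filter would have visited
        "max_cost", 100_000L,       // maxDoc, the budget it exceeded
        "estimate_cost_time", 1_276_734L
    )
);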
