From d26d7721ea2ca14c578c55a769f496917b7006d6 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Sun, 23 Feb 2020 17:13:55 -0500 Subject: [PATCH] Continue realizing sorting by aggregations (backport of #52298) (#52667) This drops more of the `instanceof`s from `AggregationPath`. There are still a couple in `AggregationPath`. And I ended up moving two into `BucketsAggregator`, but I think this is still an improvement! --- .../search/aggregations/Aggregator.java | 49 ++++++++ .../bucket/BucketsAggregator.java | 22 ++++ .../bucket/DeferringBucketCollector.java | 11 ++ .../bucket/terms/TermsAggregator.java | 8 +- .../InternalNumericMetricsAggregation.java | 2 +- .../metrics/NumericMetricsAggregator.java | 19 ++++ .../aggregations/support/AggregationPath.java | 105 +++--------------- .../aggregation/ProfilingAggregator.java | 12 ++ .../bucket/terms/TermsAggregatorTests.java | 2 +- .../aggregation/AggregationProfilerIT.java | 5 +- 10 files changed, 140 insertions(+), 95 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java index 16f8aaf8f52de..69381b93e49be 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java @@ -28,9 +28,11 @@ import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; +import java.util.Iterator; /** * An Aggregator. 
@@ -91,6 +93,53 @@ public static boolean descendsFromBucketAggregator(Aggregator parent) { */ public abstract Aggregator subAggregator(String name); + /** + * Resolve the next step of the sort path as though this aggregation + * supported sorting. This is usually the "first step" when resolving + * a sort path because most aggs that support sorting their buckets + * aren't valid in the middle of a sort path. + *
<p>
+ * For example, the {@code terms} agg supports sorting its buckets, but + * that sort path itself can't contain a different {@code terms} + * aggregation. + */ + public final Aggregator resolveSortPathOnValidAgg(AggregationPath.PathElement next, Iterator path) { + Aggregator n = subAggregator(next.name); + if (n == null) { + throw new IllegalArgumentException("The provided aggregation [" + next + "] either does not exist, or is " + + "a pipeline aggregation and cannot be used to sort the buckets."); + } + if (false == path.hasNext()) { + return n; + } + if (next.key != null) { + throw new IllegalArgumentException("Key only allowed on last aggregation path element but got [" + next + "]"); + } + return n.resolveSortPath(path.next(), path); + } + + /** + * Resolve a sort path to the target. + *
<p>
+ * The default implementation throws an exception but we override it on aggregations that support sorting. + */ + public Aggregator resolveSortPath(AggregationPath.PathElement next, Iterator path) { + throw new IllegalArgumentException("Buckets can only be sorted on a sub-aggregator path " + + "that is built out of zero or more single-bucket aggregations within the path and a final " + + "single-bucket or a metrics aggregation at the path end. [" + name() + "] is not single-bucket."); + } + + /** + * Validates the "key" portion of a sort on this aggregation. + *
<p>
+ * The default implementation throws an exception but we override it on aggregations that support sorting. + */ + public void validateSortPathKey(String key) { + throw new IllegalArgumentException("Buckets can only be sorted on a sub-aggregator path " + + "that is built out of zero or more single-bucket aggregations within the path and a final " + + "single-bucket or a metrics aggregation at the path end."); + } + /** * Build an aggregation for data that has been collected into {@code bucket}. */ diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index a4ef4286447c1..e53e2d7885af2 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -28,10 +28,12 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.function.IntConsumer; @@ -163,4 +165,24 @@ public final void close() { } } + @Override + public Aggregator resolveSortPath(AggregationPath.PathElement next, Iterator path) { + if (this instanceof SingleBucketAggregator) { + return resolveSortPathOnValidAgg(next, path); + } + return super.resolveSortPath(next, path); + } + + @Override + public void validateSortPathKey(String key) { + if (false == this instanceof SingleBucketAggregator) { + super.validateSortPathKey(key); + return; + } + if (key != null && false == "doc_count".equals(key)) { + throw new 
IllegalArgumentException("Ordering on a single-bucket aggregation can only be done on its doc_count. " + + "Either drop the key (a la \"" + name() + "\") or change it to \"doc_count\" (a la \"" + name() + + ".doc_count\")"); + } + } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java index 7151a6f33d9fe..e26037771d059 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java @@ -25,9 +25,11 @@ import org.elasticsearch.search.aggregations.BucketCollector; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; +import java.util.Iterator; /** * A {@link BucketCollector} that records collected doc IDs and buckets and @@ -120,6 +122,15 @@ public void postCollection() throws IOException { "Deferred collectors cannot be collected directly. 
They must be collected through the recording wrapper."); } + @Override + public Aggregator resolveSortPath(PathElement next, Iterator path) { + return in.resolveSortPath(next, path); + } + + @Override + public void validateSortPathKey(String key) { + in.validateSortPathKey(key); + } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java index 189c2ee796eb4..abdd36b3d27a6 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -43,6 +43,7 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.profile.aggregation.ProfilingAggregator; import java.io.IOException; import java.util.Comparator; @@ -249,7 +250,12 @@ private boolean subAggsNeedScore() { */ public Comparator bucketComparator(AggregationPath path, boolean asc) { - final Aggregator aggregator = path.resolveAggregator(this); + Aggregator agg = path.resolveAggregator(this); + // TODO Move this method into Aggregator or AggregationPath. 
+ if (agg instanceof ProfilingAggregator) { + agg = ProfilingAggregator.unwrap(agg); + } + final Aggregator aggregator = agg; final String key = path.lastPathElement().key; if (aggregator instanceof SingleBucketAggregator) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java index 05b72dff319c5..ceb78667e50ac 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java @@ -107,7 +107,7 @@ public Object getProperty(List path) { @Override public final double sortValue(String key) { if (key == null) { - throw new IllegalArgumentException("Missing value key in [" + key+ "] which refers to a multi-value metric aggregation"); + throw new IllegalArgumentException("Missing value key in [" + key + "] which refers to a multi-value metric aggregation"); } return value(key); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java index 9e49a65cc06d8..24e95ea20d966 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java @@ -41,6 +41,14 @@ protected SingleValue(String name, SearchContext context, Aggregator parent, Lis } public abstract double metric(long owningBucketOrd); + + @Override + public void validateSortPathKey(String key) { + if (key != null && false == "value".equals(key)) { + throw new IllegalArgumentException("Ordering on a single-value metrics aggregation can only be done on its value. 
" + + "Either drop the key (a la \"" + name() + "\") or change it to \"value\" (a la \"" + name() + ".value\")"); + } + } } public abstract static class MultiValue extends NumericMetricsAggregator { @@ -53,5 +61,16 @@ protected MultiValue(String name, SearchContext context, Aggregator parent, List public abstract boolean hasMetric(String name); public abstract double metric(String name, long owningBucketOrd); + + @Override + public void validateSortPathKey(String key) { + if (key == null) { + throw new IllegalArgumentException("When ordering on a multi-value metrics aggregation a metric name must be specified."); + } + if (false == hasMetric(key)) { + throw new IllegalArgumentException( + "Unknown metric name [" + key + "] on multi-value metrics aggregation [" + name() + "]"); + } + } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java index 2e14e308a4a89..2453a17a36747 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java @@ -182,7 +182,7 @@ public List getPathElementsAsStringList() { return stringPathElements; } - public AggregationPath subPath(int offset, int length) { + private AggregationPath subPath(int offset, int length) { List subTokens = new ArrayList<>(pathElements.subList(offset, offset + length)); return new AggregationPath(subTokens); } @@ -196,38 +196,29 @@ public double resolveValue(InternalAggregations aggregations) { assert path.hasNext(); return aggregations.sortValue(path.next(), path); } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Invalid order path [" + this + "]. " + e.getMessage(), e); + throw new IllegalArgumentException("Invalid aggregation order path [" + this + "]. 
" + e.getMessage(), e); } } /** - * Resolves the aggregator pointed by this path using the given root as a point of reference. - * - * @param root The point of reference of this path - * @return The aggregator pointed by this path starting from the given aggregator as a point of reference + * Resolves the {@linkplain Aggregator} pointed to by this path against + * the given root {@linkplain Aggregator}. */ public Aggregator resolveAggregator(Aggregator root) { - Aggregator aggregator = root; - for (int i = 0; i < pathElements.size(); i++) { - AggregationPath.PathElement token = pathElements.get(i); - aggregator = ProfilingAggregator.unwrap(aggregator.subAggregator(token.name)); - assert (aggregator instanceof SingleBucketAggregator && i <= pathElements.size() - 1) - || (aggregator instanceof NumericMetricsAggregator && i == pathElements.size() - 1) : - "this should be picked up before aggregation execution - on validate"; - } - return aggregator; + Iterator path = pathElements.iterator(); + assert path.hasNext(); + return root.resolveSortPathOnValidAgg(path.next(), path); } /** - * Resolves the topmost aggregator pointed by this path using the given root as a point of reference. - * - * @param root The point of reference of this path - * @return The first child aggregator of the root pointed by this path + * Resolves the {@linkplain Aggregator} pointed to by the first element + * of this path against the given root {@linkplain Aggregator}. */ public Aggregator resolveTopmostAggregator(Aggregator root) { AggregationPath.PathElement token = pathElements.get(0); + // TODO both unwrap and subAggregator are only used here! 
Aggregator aggregator = ProfilingAggregator.unwrap(root.subAggregator(token.name)); - assert (aggregator instanceof SingleBucketAggregator ) + assert (aggregator instanceof SingleBucketAggregator) || (aggregator instanceof NumericMetricsAggregator) : "this should be picked up before aggregation execution - on validate"; return aggregator; } @@ -239,76 +230,10 @@ public Aggregator resolveTopmostAggregator(Aggregator root) { * @throws AggregationExecutionException on validation error */ public void validate(Aggregator root) throws AggregationExecutionException { - Aggregator aggregator = root; - for (int i = 0; i < pathElements.size(); i++) { - String name = pathElements.get(i).name; - aggregator = ProfilingAggregator.unwrap(aggregator.subAggregator(name)); - if (aggregator == null) { - throw new AggregationExecutionException("Invalid aggregator order path [" + this + "]. The " + - "provided aggregation [" + name + "] either does not exist, or is a pipeline aggregation " + - "and cannot be used to sort the buckets."); - } - - if (i < pathElements.size() - 1) { - - // we're in the middle of the path, so the aggregator can only be a single-bucket aggregator - - if (!(aggregator instanceof SingleBucketAggregator)) { - throw new AggregationExecutionException("Invalid aggregation order path [" + this + - "]. Buckets can only be sorted on a sub-aggregator path " + - "that is built out of zero or more single-bucket aggregations within the path and a final " + - "single-bucket or a metrics aggregation at the path end. Sub-path [" + - subPath(0, i + 1) + "] points to non single-bucket aggregation"); - } - - if (pathElements.get(i).key != null) { - throw new AggregationExecutionException("Invalid aggregation order path [" + this + - "]. Buckets can only be sorted on a sub-aggregator path " + - "that is built out of zero or more single-bucket aggregations within the path and a " + - "final single-bucket or a metrics aggregation at the path end. 
Sub-path [" + - subPath(0, i + 1) + "] points to non single-bucket aggregation"); - } - } - } - boolean singleBucket = aggregator instanceof SingleBucketAggregator; - if (!singleBucket && !(aggregator instanceof NumericMetricsAggregator)) { - throw new AggregationExecutionException("Invalid aggregation order path [" + this + - "]. Buckets can only be sorted on a sub-aggregator path " + - "that is built out of zero or more single-bucket aggregations within the path and a final " + - "single-bucket or a metrics aggregation at the path end."); - } - - AggregationPath.PathElement lastToken = lastPathElement(); - - if (singleBucket) { - if (lastToken.key != null && !"doc_count".equals(lastToken.key)) { - throw new AggregationExecutionException("Invalid aggregation order path [" + this + - "]. Ordering on a single-bucket aggregation can only be done on its doc_count. " + - "Either drop the key (a la \"" + lastToken.name + "\") or change it to \"doc_count\" (a la \"" + lastToken.name + - ".doc_count\")"); - } - return; // perfectly valid to sort on single-bucket aggregation (will be sored on its doc_count) - } - - if (aggregator instanceof NumericMetricsAggregator.SingleValue) { - if (lastToken.key != null && !"value".equals(lastToken.key)) { - throw new AggregationExecutionException("Invalid aggregation order path [" + this + - "]. Ordering on a single-value metrics aggregation can only be done on its value. " + - "Either drop the key (a la \"" + lastToken.name + "\") or change it to \"value\" (a la \"" + lastToken.name + - ".value\")"); - } - return; // perfectly valid to sort on single metric aggregation (will be sorted on its associated value) - } - - // the aggregator must be of a multi-value metrics type - if (lastToken.key == null) { - throw new AggregationExecutionException("Invalid aggregation order path [" + this + - "]. 
When ordering on a multi-value metrics aggregation a metric name must be specified"); - } - - if (!((NumericMetricsAggregator.MultiValue) aggregator).hasMetric(lastToken.key)) { - throw new AggregationExecutionException("Invalid aggregation order path [" + this + - "]. Unknown metric name [" + lastToken.key + "] on multi-value metrics aggregation [" + lastToken.name + "]"); + try { + resolveAggregator(root).validateSortPathKey(lastPathElement().key); + } catch (IllegalArgumentException e) { + throw new AggregationExecutionException("Invalid aggregation order path [" + this + "]. " + e.getMessage(), e); } } diff --git a/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java b/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java index 16388fa789aff..b7fbf9a077284 100644 --- a/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java @@ -24,10 +24,12 @@ import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.profile.Timer; import java.io.IOException; +import java.util.Iterator; public class ProfilingAggregator extends Aggregator { @@ -70,6 +72,16 @@ public Aggregator subAggregator(String name) { return delegate.subAggregator(name); } + @Override + public Aggregator resolveSortPath(PathElement next, Iterator path) { + return delegate.resolveSortPath(next, path); + } + + @Override + public void validateSortPathKey(String key) { + delegate.validateSortPathKey(key); + } + @Override public InternalAggregation buildAggregation(long bucket) throws IOException { Timer timer = 
profileBreakdown.getTimer(AggregationTimingType.BUILD_AGGREGATION); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java index 611e7d916c9c9..7ef271fbe5055 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java @@ -1237,7 +1237,7 @@ public void testOrderByPipelineAggregation() throws Exception { AggregationExecutionException e = expectThrows(AggregationExecutionException.class, () -> createAggregator(termsAgg, indexSearcher, fieldType)); - assertEquals("Invalid aggregator order path [script]. The provided aggregation [script] " + + assertEquals("Invalid aggregation order path [script]. The provided aggregation [script] " + "either does not exist, or is a pipeline aggregation and cannot be used to sort the buckets.", e.getMessage()); } diff --git a/server/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java b/server/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java index 197e82ea3a47b..9c72fbe0fcc6f 100644 --- a/server/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java +++ b/server/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java @@ -28,19 +28,20 @@ import org.elasticsearch.search.profile.ProfileResult; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.test.ESIntegTestCase; + import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static 
org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.elasticsearch.search.aggregations.AggregationBuilders.avg; import static org.elasticsearch.search.aggregations.AggregationBuilders.diversifiedSampler; import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; import static org.elasticsearch.search.aggregations.AggregationBuilders.max; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.notNullValue;