Skip to content

Commit

Permalink
Continue realizing sorting by aggregations (backport of #52298) (#52667)
Browse files Browse the repository at this point in the history
This drops more of the `instanceof`s from `AggregationPath`. There are
still a couple in `AggregationPath`. And I ended up moving two into
`BucketsAggregator`, but I think this is still an improvement!
  • Loading branch information
nik9000 committed Feb 23, 2020
1 parent a0aa808 commit d26d772
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Iterator;

/**
* An Aggregator.
Expand Down Expand Up @@ -91,6 +93,53 @@ public static boolean descendsFromBucketAggregator(Aggregator parent) {
*/
public abstract Aggregator subAggregator(String name);

/**
* Resolve the next step of the sort path as though this aggregation
* supported sorting. This is usually the "first step" when resolving
* a sort path because most aggs that support sorting their buckets
* aren't valid in the middle of a sort path.
* <p>
* For example, the {@code terms} aggs supports sorting its buckets, but
* that sort path itself can't contain a different {@code terms}
* aggregation.
*/
public final Aggregator resolveSortPathOnValidAgg(AggregationPath.PathElement next, Iterator<AggregationPath.PathElement> path) {
Aggregator n = subAggregator(next.name);
if (n == null) {
throw new IllegalArgumentException("The provided aggregation [" + next + "] either does not exist, or is "
+ "a pipeline aggregation and cannot be used to sort the buckets.");
}
if (false == path.hasNext()) {
return n;
}
if (next.key != null) {
throw new IllegalArgumentException("Key only allowed on last aggregation path element but got [" + next + "]");
}
return n.resolveSortPath(path.next(), path);
}

/**
* Resolve a sort path to the target.
* <p>
* The default implementation throws an exception but we override it on aggregations that support sorting.
*/
public Aggregator resolveSortPath(AggregationPath.PathElement next, Iterator<AggregationPath.PathElement> path) {
throw new IllegalArgumentException("Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a final " +
"single-bucket or a metrics aggregation at the path end. [" + name() + "] is not single-bucket.");
}

/**
* Validates the "key" portion of a sort on this aggregation.
* <p>
* The default implementation throws an exception but we override it on aggregations that support sorting.
*/
public void validateSortPathKey(String key) {
throw new IllegalArgumentException("Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a final " +
"single-bucket or a metrics aggregation at the path end.");
}

/**
* Build an aggregation for data that has been collected into {@code bucket}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.IntConsumer;
Expand Down Expand Up @@ -163,4 +165,24 @@ public final void close() {
}
}

@Override
public Aggregator resolveSortPath(AggregationPath.PathElement next, Iterator<AggregationPath.PathElement> path) {
if (this instanceof SingleBucketAggregator) {
return resolveSortPathOnValidAgg(next, path);
}
return super.resolveSortPath(next, path);
}

@Override
public void validateSortPathKey(String key) {
if (false == this instanceof SingleBucketAggregator) {
super.validateSortPathKey(key);
return;
}
if (key != null && false == "doc_count".equals(key)) {
throw new IllegalArgumentException("Ordering on a single-bucket aggregation can only be done on its doc_count. " +
"Either drop the key (a la \"" + name() + "\") or change it to \"doc_count\" (a la \"" + name() +
".doc_count\")");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Iterator;

/**
* A {@link BucketCollector} that records collected doc IDs and buckets and
Expand Down Expand Up @@ -120,6 +122,15 @@ public void postCollection() throws IOException {
"Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
}

@Override
public Aggregator resolveSortPath(PathElement next, Iterator<PathElement> path) {
return in.resolveSortPath(next, path);
}

@Override
public void validateSortPathKey(String key) {
in.validateSortPathKey(key);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.aggregation.ProfilingAggregator;

import java.io.IOException;
import java.util.Comparator;
Expand Down Expand Up @@ -249,7 +250,12 @@ private boolean subAggsNeedScore() {
*/
public Comparator<Bucket> bucketComparator(AggregationPath path, boolean asc) {

final Aggregator aggregator = path.resolveAggregator(this);
Aggregator agg = path.resolveAggregator(this);
// TODO Move this method into Aggregator or AggregationPath.
if (agg instanceof ProfilingAggregator) {
agg = ProfilingAggregator.unwrap(agg);
}
final Aggregator aggregator = agg;
final String key = path.lastPathElement().key;

if (aggregator instanceof SingleBucketAggregator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Object getProperty(List<String> path) {
@Override
public final double sortValue(String key) {
if (key == null) {
throw new IllegalArgumentException("Missing value key in [" + key+ "] which refers to a multi-value metric aggregation");
throw new IllegalArgumentException("Missing value key in [" + key + "] which refers to a multi-value metric aggregation");
}
return value(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ protected SingleValue(String name, SearchContext context, Aggregator parent, Lis
}

public abstract double metric(long owningBucketOrd);

@Override
public void validateSortPathKey(String key) {
if (key != null && false == "value".equals(key)) {
throw new IllegalArgumentException("Ordering on a single-value metrics aggregation can only be done on its value. " +
"Either drop the key (a la \"" + name() + "\") or change it to \"value\" (a la \"" + name() + ".value\")");
}
}
}

public abstract static class MultiValue extends NumericMetricsAggregator {
Expand All @@ -53,5 +61,16 @@ protected MultiValue(String name, SearchContext context, Aggregator parent, List
public abstract boolean hasMetric(String name);

public abstract double metric(String name, long owningBucketOrd);

@Override
public void validateSortPathKey(String key) {
if (key == null) {
throw new IllegalArgumentException("When ordering on a multi-value metrics aggregation a metric name must be specified.");
}
if (false == hasMetric(key)) {
throw new IllegalArgumentException(
"Unknown metric name [" + key + "] on multi-value metrics aggregation [" + name() + "]");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public List<String> getPathElementsAsStringList() {
return stringPathElements;
}

public AggregationPath subPath(int offset, int length) {
private AggregationPath subPath(int offset, int length) {
List<PathElement> subTokens = new ArrayList<>(pathElements.subList(offset, offset + length));
return new AggregationPath(subTokens);
}
Expand All @@ -196,38 +196,29 @@ public double resolveValue(InternalAggregations aggregations) {
assert path.hasNext();
return aggregations.sortValue(path.next(), path);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid order path [" + this + "]. " + e.getMessage(), e);
throw new IllegalArgumentException("Invalid aggregation order path [" + this + "]. " + e.getMessage(), e);
}
}

/**
* Resolves the aggregator pointed by this path using the given root as a point of reference.
*
* @param root The point of reference of this path
* @return The aggregator pointed by this path starting from the given aggregator as a point of reference
* Resolves the {@linkplain Aggregator} pointed to by this path against
* the given root {@linkplain Aggregator}.
*/
public Aggregator resolveAggregator(Aggregator root) {
Aggregator aggregator = root;
for (int i = 0; i < pathElements.size(); i++) {
AggregationPath.PathElement token = pathElements.get(i);
aggregator = ProfilingAggregator.unwrap(aggregator.subAggregator(token.name));
assert (aggregator instanceof SingleBucketAggregator && i <= pathElements.size() - 1)
|| (aggregator instanceof NumericMetricsAggregator && i == pathElements.size() - 1) :
"this should be picked up before aggregation execution - on validate";
}
return aggregator;
Iterator<PathElement> path = pathElements.iterator();
assert path.hasNext();
return root.resolveSortPathOnValidAgg(path.next(), path);
}

/**
* Resolves the topmost aggregator pointed by this path using the given root as a point of reference.
*
* @param root The point of reference of this path
* @return The first child aggregator of the root pointed by this path
* Resolves the {@linkplain Aggregator} pointed to by the first element
* of this path against the given root {@linkplain Aggregator}.
*/
public Aggregator resolveTopmostAggregator(Aggregator root) {
AggregationPath.PathElement token = pathElements.get(0);
// TODO both unwrap and subAggregator are only used here!
Aggregator aggregator = ProfilingAggregator.unwrap(root.subAggregator(token.name));
assert (aggregator instanceof SingleBucketAggregator )
assert (aggregator instanceof SingleBucketAggregator)
|| (aggregator instanceof NumericMetricsAggregator) : "this should be picked up before aggregation execution - on validate";
return aggregator;
}
Expand All @@ -239,76 +230,10 @@ public Aggregator resolveTopmostAggregator(Aggregator root) {
* @throws AggregationExecutionException on validation error
*/
public void validate(Aggregator root) throws AggregationExecutionException {
Aggregator aggregator = root;
for (int i = 0; i < pathElements.size(); i++) {
String name = pathElements.get(i).name;
aggregator = ProfilingAggregator.unwrap(aggregator.subAggregator(name));
if (aggregator == null) {
throw new AggregationExecutionException("Invalid aggregator order path [" + this + "]. The " +
"provided aggregation [" + name + "] either does not exist, or is a pipeline aggregation " +
"and cannot be used to sort the buckets.");
}

if (i < pathElements.size() - 1) {

// we're in the middle of the path, so the aggregator can only be a single-bucket aggregator

if (!(aggregator instanceof SingleBucketAggregator)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a final " +
"single-bucket or a metrics aggregation at the path end. Sub-path [" +
subPath(0, i + 1) + "] points to non single-bucket aggregation");
}

if (pathElements.get(i).key != null) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a " +
"final single-bucket or a metrics aggregation at the path end. Sub-path [" +
subPath(0, i + 1) + "] points to non single-bucket aggregation");
}
}
}
boolean singleBucket = aggregator instanceof SingleBucketAggregator;
if (!singleBucket && !(aggregator instanceof NumericMetricsAggregator)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a final " +
"single-bucket or a metrics aggregation at the path end.");
}

AggregationPath.PathElement lastToken = lastPathElement();

if (singleBucket) {
if (lastToken.key != null && !"doc_count".equals(lastToken.key)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Ordering on a single-bucket aggregation can only be done on its doc_count. " +
"Either drop the key (a la \"" + lastToken.name + "\") or change it to \"doc_count\" (a la \"" + lastToken.name +
".doc_count\")");
}
return; // perfectly valid to sort on single-bucket aggregation (will be sored on its doc_count)
}

if (aggregator instanceof NumericMetricsAggregator.SingleValue) {
if (lastToken.key != null && !"value".equals(lastToken.key)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Ordering on a single-value metrics aggregation can only be done on its value. " +
"Either drop the key (a la \"" + lastToken.name + "\") or change it to \"value\" (a la \"" + lastToken.name +
".value\")");
}
return; // perfectly valid to sort on single metric aggregation (will be sorted on its associated value)
}

// the aggregator must be of a multi-value metrics type
if (lastToken.key == null) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. When ordering on a multi-value metrics aggregation a metric name must be specified");
}

if (!((NumericMetricsAggregator.MultiValue) aggregator).hasMetric(lastToken.key)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Unknown metric name [" + lastToken.key + "] on multi-value metrics aggregation [" + lastToken.name + "]");
try {
resolveAggregator(root).validateSortPathKey(lastPathElement().key);
} catch (IllegalArgumentException e) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this + "]. " + e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.Timer;

import java.io.IOException;
import java.util.Iterator;

public class ProfilingAggregator extends Aggregator {

Expand Down Expand Up @@ -70,6 +72,16 @@ public Aggregator subAggregator(String name) {
return delegate.subAggregator(name);
}

@Override
public Aggregator resolveSortPath(PathElement next, Iterator<PathElement> path) {
return delegate.resolveSortPath(next, path);
}

@Override
public void validateSortPathKey(String key) {
delegate.validateSortPathKey(key);
}

@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
Timer timer = profileBreakdown.getTimer(AggregationTimingType.BUILD_AGGREGATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1237,7 +1237,7 @@ public void testOrderByPipelineAggregation() throws Exception {

AggregationExecutionException e = expectThrows(AggregationExecutionException.class,
() -> createAggregator(termsAgg, indexSearcher, fieldType));
assertEquals("Invalid aggregator order path [script]. The provided aggregation [script] " +
assertEquals("Invalid aggregation order path [script]. The provided aggregation [script] " +
"either does not exist, or is a pipeline aggregation and cannot be used to sort the buckets.",
e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@
import org.elasticsearch.search.profile.ProfileResult;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
import static org.elasticsearch.search.aggregations.AggregationBuilders.diversifiedSampler;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.max;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
Expand Down

0 comments on commit d26d772

Please sign in to comment.