Skip to content

Commit

Permalink
Move pipeline agg validation to coordinating node (backport of #53669) (
Browse files Browse the repository at this point in the history
#54019)

This moves the pipeline aggregation validation from the data node to the
coordinating node so that we, eventually, can stop sending pipeline
aggregations to the data nodes entirely. In fact, it moves it into the
"request validation" stage so multiple errors can be accumulated and
sent back to the requester for the entire request. We can't always take
advantage of that, but it'll be nice for folks not to have to play
whack-a-mole with validation.

This is implemented by replacing `PipelineAggretionBuilder#validate`
with:
```
protected abstract void validate(ValidationContext context);
```

The `ValidationContext` handles the accumulation of validation failures,
provides access to the aggregation's siblings, and implements a few
validation utility methods.
  • Loading branch information
nik9000 committed Mar 23, 2020
1 parent 43199a8 commit b9bfba2
Show file tree
Hide file tree
Showing 39 changed files with 608 additions and 602 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ public ActionRequestValidationException validate() {
addValidationError("[request_cache] cannot be used in a scroll context", validationException);
}
}
if (source != null) {
if (source.aggregations() != null) {
validationException = source.aggregations().validate(validationException);
}
}
return validationException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -283,8 +284,6 @@ public boolean mustVisitAllDocs() {
return false;
}



public Builder addAggregator(AggregationBuilder factory) {
if (!names.add(factory.name)) {
throw new IllegalArgumentException("Two sibling aggregations cannot have the same name: [" + factory.name + "]");
Expand All @@ -298,23 +297,59 @@ public Builder addPipelineAggregator(PipelineAggregationBuilder pipelineAggregat
return this;
}

/**
* Validate the root of the aggregation tree.
*/
public ActionRequestValidationException validate(ActionRequestValidationException e) {
PipelineAggregationBuilder.ValidationContext context =
PipelineAggregationBuilder.ValidationContext.forTreeRoot(aggregationBuilders, pipelineAggregatorBuilders, e);
validatePipelines(context);
return validateChildren(context.getValidationException());
}

/**
* Validate a the pipeline aggregations in this factory.
*/
private void validatePipelines(PipelineAggregationBuilder.ValidationContext context) {
List<PipelineAggregationBuilder> orderedPipelineAggregators;
try {
orderedPipelineAggregators = resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders);
} catch (IllegalArgumentException iae) {
context.addValidationError(iae.getMessage());
return;
}
for (PipelineAggregationBuilder builder : orderedPipelineAggregators) {
builder.validate(context);
}
}

/**
* Validate a the children of this factory.
*/
private ActionRequestValidationException validateChildren(ActionRequestValidationException e) {
for (AggregationBuilder agg : aggregationBuilders) {
PipelineAggregationBuilder.ValidationContext context =
PipelineAggregationBuilder.ValidationContext.forInsideTree(agg, e);
agg.factoriesBuilder.validatePipelines(context);
e = agg.factoriesBuilder.validateChildren(context.getValidationException());
}
return e;
}

public AggregatorFactories build(QueryShardContext queryShardContext, AggregatorFactory parent) throws IOException {
if (aggregationBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) {
return EMPTY;
}
List<PipelineAggregationBuilder> orderedpipelineAggregators = null;
orderedpipelineAggregators = resolvePipelineAggregatorOrder(this.pipelineAggregatorBuilders, this.aggregationBuilders);
for (PipelineAggregationBuilder builder : orderedpipelineAggregators) {
builder.validate(parent, aggregationBuilders, pipelineAggregatorBuilders);
}
List<PipelineAggregationBuilder> orderedPipelineAggregators =
resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders);
AggregatorFactory[] aggFactories = new AggregatorFactory[aggregationBuilders.size()];

int i = 0;
for (AggregationBuilder agg : aggregationBuilders) {
aggFactories[i] = agg.build(queryShardContext, parent);
++i;
}
return new AggregatorFactories(aggFactories, orderedpipelineAggregators);
return new AggregatorFactories(aggFactories, orderedPipelineAggregators);
}

private List<PipelineAggregationBuilder> resolvePipelineAggregatorOrder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;

/**
* A factory that knows how to create an {@link PipelineAggregator} of a
Expand Down Expand Up @@ -64,11 +70,145 @@ public final String[] getBucketsPaths() {
}

/**
* Internal: Validates the state of this factory (makes sure the factory is properly
* configured)
* Makes sure this builder is properly configured.
*/
protected abstract void validate(AggregatorFactory parent, Collection<AggregationBuilder> aggregationBuilders,
Collection<PipelineAggregationBuilder> pipelineAggregatorBuilders);
protected abstract void validate(ValidationContext context);
public abstract static class ValidationContext {
/**
* Build the context for the root of the aggregation tree.
*/
public static ValidationContext forTreeRoot(Collection<AggregationBuilder> siblingAggregations,
Collection<PipelineAggregationBuilder> siblingPipelineAggregations,
ActionRequestValidationException validationFailuresSoFar) {
return new ForTreeRoot(siblingAggregations, siblingPipelineAggregations, validationFailuresSoFar);
}

/**
* Build the context for a node inside the aggregation tree.
*/
public static ValidationContext forInsideTree(AggregationBuilder parent,
ActionRequestValidationException validationFailuresSoFar) {
return new ForInsideTree(parent, validationFailuresSoFar);
}


private ActionRequestValidationException e;

private ValidationContext(ActionRequestValidationException validationFailuresSoFar) {
this.e = validationFailuresSoFar;
}

private static class ForTreeRoot extends ValidationContext {
private final Collection<AggregationBuilder> siblingAggregations;
private final Collection<PipelineAggregationBuilder> siblingPipelineAggregations;

ForTreeRoot(Collection<AggregationBuilder> siblingAggregations,
Collection<PipelineAggregationBuilder> siblingPipelineAggregations,
ActionRequestValidationException validationFailuresSoFar) {
super(validationFailuresSoFar);
this.siblingAggregations = Objects.requireNonNull(siblingAggregations);
this.siblingPipelineAggregations = Objects.requireNonNull(siblingPipelineAggregations);
}

@Override
public Collection<AggregationBuilder> getSiblingAggregations() {
return siblingAggregations;
}

@Override
public Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations() {
return siblingPipelineAggregations;
}

@Override
public void validateParentAggSequentiallyOrdered(String type, String name) {
addValidationError(type + " aggregation [" + name
+ "] must have a histogram, date_histogram or auto_date_histogram as parent but doesn't have a parent");
}
}

private static class ForInsideTree extends ValidationContext {
private final AggregationBuilder parent;

ForInsideTree(AggregationBuilder parent, ActionRequestValidationException validationFailuresSoFar) {
super(validationFailuresSoFar);
this.parent = Objects.requireNonNull(parent);
}

@Override
public Collection<AggregationBuilder> getSiblingAggregations() {
return parent.getSubAggregations();
}

@Override
public Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations() {
return parent.getPipelineAggregations();
}

@Override
public void validateParentAggSequentiallyOrdered(String type, String name) {
if (parent instanceof HistogramAggregationBuilder) {
HistogramAggregationBuilder histoParent = (HistogramAggregationBuilder) parent;
if (histoParent.minDocCount() != 0) {
addValidationError(
"parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
}
} else if (parent instanceof DateHistogramAggregationBuilder) {
DateHistogramAggregationBuilder histoParent = (DateHistogramAggregationBuilder) parent;
if (histoParent.minDocCount() != 0) {
addValidationError(
"parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
}
} else if (parent instanceof AutoDateHistogramAggregationBuilder) {
// Nothing to check
} else {
addValidationError(
type + " aggregation [" + name + "] must have a histogram, date_histogram or auto_date_histogram as parent");
}
}
}

/**
* Aggregations that are siblings to the aggregation being validated.
*/
public abstract Collection<AggregationBuilder> getSiblingAggregations();

/**
* Pipeline aggregations that are siblings to the aggregation being validated.
*/
public abstract Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations();

/**
* Add a validation error to this context. All validation errors
* are accumulated in a list and, if there are any, the request
* is not executed and the entire list is returned as the error
* response.
*/
public void addValidationError(String error) {
e = ValidateActions.addValidationError(error, e);
}

/**
* Add a validation error about the {@code buckets_path}.
*/
public void addBucketPathValidationError(String error) {
addValidationError(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName() + ' ' + error);
}

/**
* Validates that the parent is sequentially ordered.
*/
public abstract void validateParentAggSequentiallyOrdered(String type, String name);

/**
* The validation exception, if there is one. It'll be {@code null}
* if the context wasn't provided with any exception on creation
* and none were added.
*/
public ActionRequestValidationException getValidationException() {
return e;
}
}

/**
* Creates the pipeline aggregator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,10 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -79,16 +73,6 @@ public String type() {
return type;
}

/**
* Validates the state of this factory (makes sure the factory is properly
* configured)
*/
@Override
public final void validate(AggregatorFactory parent, Collection<AggregationBuilder> factories,
Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
doValidate(parent, factories, pipelineAggregatorFactories);
}

protected abstract PipelineAggregator createInternal(Map<String, Object> metaData);

/**
Expand All @@ -102,32 +86,6 @@ public final PipelineAggregator create() {
return aggregator;
}

public void doValidate(AggregatorFactory parent, Collection<AggregationBuilder> factories,
Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
}

/**
* Validates pipeline aggregations that need sequentially ordered data.
*/
public static void validateSequentiallyOrderedParentAggs(AggregatorFactory parent, String type, String name) {
if ((parent instanceof HistogramAggregatorFactory || parent instanceof DateHistogramAggregatorFactory
|| parent instanceof AutoDateHistogramAggregatorFactory) == false) {
throw new IllegalStateException(
type + " aggregation [" + name + "] must have a histogram, date_histogram or auto_date_histogram as parent");
}
if (parent instanceof HistogramAggregatorFactory) {
HistogramAggregatorFactory histoParent = (HistogramAggregatorFactory) parent;
if (histoParent.minDocCount() != 0) {
throw new IllegalStateException("parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
}
} else if (parent instanceof DateHistogramAggregatorFactory) {
DateHistogramAggregatorFactory histoParent = (DateHistogramAggregatorFactory) parent;
if (histoParent.minDocCount() != 0) {
throw new IllegalStateException("parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
}
}
}

@SuppressWarnings("unchecked")
@Override
public PAB setMetaData(Map<String, Object> metaData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,10 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -107,28 +104,27 @@ public GapPolicy gapPolicy() {
protected abstract PipelineAggregator createInternal(Map<String, Object> metaData);

@Override
public void doValidate(AggregatorFactory parent, Collection<AggregationBuilder> aggBuilders,
Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
protected void validate(ValidationContext context) {
if (bucketsPaths.length != 1) {
throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " must contain a single entry for aggregation [" + name + "]");
context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
return;
}
// Need to find the first agg name in the buckets path to check its a
// multi bucket agg: aggs are split with '>' and can optionally have a
// metric name after them by using '.' so need to split on both to get
// just the agg name
final String firstAgg = bucketsPaths[0].split("[>\\.]")[0];
Optional<AggregationBuilder> aggBuilder = aggBuilders.stream().filter((builder) -> builder.getName().equals(firstAgg))
Optional<AggregationBuilder> aggBuilder = context.getSiblingAggregations().stream()
.filter(builder -> builder.getName().equals(firstAgg))
.findAny();
if (aggBuilder.isPresent()) {
if ((aggBuilder.get() instanceof MultiBucketAggregationBuilder) == false) {
throw new IllegalArgumentException("The first aggregation in " + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " must be a multi-bucket aggregation for aggregation [" + name + "] found :"
+ aggBuilder.get().getClass().getName() + " for buckets path: " + bucketsPaths[0]);
}
} else {
throw new IllegalArgumentException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " aggregation does not exist for aggregation [" + name + "]: " + bucketsPaths[0]);
if (false == aggBuilder.isPresent()) {
context.addBucketPathValidationError("aggregation does not exist for aggregation [" + name + "]: " + bucketsPaths[0]);
return;
}
if ((aggBuilder.get() instanceof MultiBucketAggregationBuilder) == false) {
context.addValidationError("The first aggregation in " + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " must be a multi-bucket aggregation for aggregation [" + name + "] found :"
+ aggBuilder.get().getClass().getName() + " for buckets path: " + bucketsPaths[0]);
}
}

Expand Down
Loading

0 comments on commit b9bfba2

Please sign in to comment.