Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move pipeline agg validation to coordinating node #53669

Merged
merged 5 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ public ActionRequestValidationException validate() {
addValidationError("[request_cache] cannot be used in a scroll context", validationException);
}
}
if (source != null) {
if (source.aggregations() != null) {
validationException = source.aggregations().validate(validationException);
}
}
return validationException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -283,8 +284,6 @@ public boolean mustVisitAllDocs() {
return false;
}



public Builder addAggregator(AggregationBuilder factory) {
if (!names.add(factory.name)) {
throw new IllegalArgumentException("Two sibling aggregations cannot have the same name: [" + factory.name + "]");
Expand All @@ -298,23 +297,59 @@ public Builder addPipelineAggregator(PipelineAggregationBuilder pipelineAggregat
return this;
}

/**
 * Validate the root of the aggregation tree: first checks the pipeline
 * aggregations declared at the top level, then recurses into every
 * sub-aggregation via {@link #validateChildren}.
 *
 * @param e validation failures collected so far, or {@code null} if there are none yet
 * @return the accumulated validation failures, or {@code null} if none were found
 */
public ActionRequestValidationException validate(ActionRequestValidationException e) {
    // The root has no parent aggregation, so its siblings are the top level aggs.
    PipelineAggregationBuilder.ValidationContext rootContext =
        PipelineAggregationBuilder.ValidationContext.forTreeRoot(aggregationBuilders, pipelineAggregatorBuilders, e);
    validatePipelines(rootContext);
    return validateChildren(rootContext.getValidationException());
}

/**
* Validate the pipeline aggregations in this factory.
nik9000 marked this conversation as resolved.
Show resolved Hide resolved
*/
private void validatePipelines(PipelineAggregationBuilder.ValidationContext context) {
List<PipelineAggregationBuilder> orderedPipelineAggregators;
try {
orderedPipelineAggregators = resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders);
} catch (IllegalArgumentException iae) {
context.addValidationError(iae.getMessage());
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we allow the validations to keep running down the tree, so we can tell the user all the problems at once?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was tempted but I think the tree is pretty borked at this point and you'll end up with duplicate error messages all about the same thing. And I figured we were just returning a single error message right now so it probably isn't worse than it was before and we could do it later if we wanted it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me 👍

}
for (PipelineAggregationBuilder builder : orderedPipelineAggregators) {
builder.validate(context);
}
}

/**
 * Validate the children of this factory, recursing down the entire
 * aggregation tree. Pipeline aggregations inside the tree are validated
 * with a context built around their parent aggregation.
 *
 * @param e validation failures collected so far, or {@code null} if there are none yet
 * @return the accumulated validation failures, or {@code null} if none were found
 */
private ActionRequestValidationException validateChildren(ActionRequestValidationException e) {
for (AggregationBuilder agg : aggregationBuilders) {
// Each child gets its own context so pipelines resolve siblings relative to it.
PipelineAggregationBuilder.ValidationContext context =
PipelineAggregationBuilder.ValidationContext.forInsideTree(agg, e);
agg.factoriesBuilder.validatePipelines(context);
// Carry the (possibly grown) exception forward into the next subtree.
e = agg.factoriesBuilder.validateChildren(context.getValidationException());
}
return e;
}

public AggregatorFactories build(QueryShardContext queryShardContext, AggregatorFactory parent) throws IOException {
if (aggregationBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) {
return EMPTY;
}
List<PipelineAggregationBuilder> orderedpipelineAggregators = null;
orderedpipelineAggregators = resolvePipelineAggregatorOrder(this.pipelineAggregatorBuilders, this.aggregationBuilders);
for (PipelineAggregationBuilder builder : orderedpipelineAggregators) {
builder.validate(parent, aggregationBuilders, pipelineAggregatorBuilders);
}
List<PipelineAggregationBuilder> orderedPipelineAggregators =
resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders);
AggregatorFactory[] aggFactories = new AggregatorFactory[aggregationBuilders.size()];

int i = 0;
for (AggregationBuilder agg : aggregationBuilders) {
aggFactories[i] = agg.build(queryShardContext, parent);
++i;
}
return new AggregatorFactories(aggFactories, orderedpipelineAggregators);
return new AggregatorFactories(aggFactories, orderedPipelineAggregators);
}

private List<PipelineAggregationBuilder> resolvePipelineAggregatorOrder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;

/**
* A factory that knows how to create an {@link PipelineAggregator} of a
Expand Down Expand Up @@ -64,11 +70,145 @@ public final String[] getBucketsPaths() {
}

/**
* Internal: Validates the state of this factory (makes sure the factory is properly
* configured)
* Makes sure this builder is properly configured.
*/
protected abstract void validate(AggregatorFactory parent, Collection<AggregationBuilder> aggregationBuilders,
Collection<PipelineAggregationBuilder> pipelineAggregatorBuilders);
protected abstract void validate(ValidationContext context);
public abstract static class ValidationContext {
/**
 * Build the context for the root of the aggregation tree.
 *
 * @param siblingAggregations aggregations declared at the top level of the tree
 * @param siblingPipelineAggregations pipeline aggregations declared at the top level of the tree
 * @param validationFailuresSoFar failures collected so far, or {@code null} if there are none yet
 */
public static ValidationContext forTreeRoot(Collection<AggregationBuilder> siblingAggregations,
Collection<PipelineAggregationBuilder> siblingPipelineAggregations,
ActionRequestValidationException validationFailuresSoFar) {
return new ForTreeRoot(siblingAggregations, siblingPipelineAggregations, validationFailuresSoFar);
}

/**
 * Build the context for a node inside the aggregation tree.
 *
 * @param parent the aggregation that contains the pipeline being validated;
 *        siblings are resolved from its sub-aggregations
 * @param validationFailuresSoFar failures collected so far, or {@code null} if there are none yet
 */
public static ValidationContext forInsideTree(AggregationBuilder parent,
ActionRequestValidationException validationFailuresSoFar) {
return new ForInsideTree(parent, validationFailuresSoFar);
}


private ActionRequestValidationException e;

private ValidationContext(ActionRequestValidationException validationFailuresSoFar) {
this.e = validationFailuresSoFar;
}

/**
 * Validation context for pipeline aggregations declared at the root of
 * the aggregation tree. The root has no parent aggregation, so siblings
 * are the explicitly supplied top level aggregations.
 */
private static class ForTreeRoot extends ValidationContext {
    private final Collection<AggregationBuilder> aggs;
    private final Collection<PipelineAggregationBuilder> pipelineAggs;

    ForTreeRoot(Collection<AggregationBuilder> siblingAggregations,
            Collection<PipelineAggregationBuilder> siblingPipelineAggregations,
            ActionRequestValidationException validationFailuresSoFar) {
        super(validationFailuresSoFar);
        this.aggs = Objects.requireNonNull(siblingAggregations);
        this.pipelineAggs = Objects.requireNonNull(siblingPipelineAggregations);
    }

    @Override
    public Collection<AggregationBuilder> getSiblingAggregations() {
        return aggs;
    }

    @Override
    public Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations() {
        return pipelineAggs;
    }

    @Override
    public void validateParentAggSequentiallyOrdered(String type, String name) {
        // There is no parent at the tree root, so any pipeline requiring a
        // sequentially ordered parent is invalid by definition.
        addValidationError(type + " aggregation [" + name
            + "] must have a histogram, date_histogram or auto_date_histogram as parent but doesn't have a parent");
    }
}

private static class ForInsideTree extends ValidationContext {
private final AggregationBuilder parent;

ForInsideTree(AggregationBuilder parent, ActionRequestValidationException validationFailuresSoFar) {
super(validationFailuresSoFar);
this.parent = Objects.requireNonNull(parent);
}

@Override
public Collection<AggregationBuilder> getSiblingAggregations() {
return parent.getSubAggregations();
}

@Override
public Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations() {
return parent.getPipelineAggregations();
}

@Override
public void validateParentAggSequentiallyOrdered(String type, String name) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh yes, this thing. Would be nice someday if we could get rid of these instanceofs with some kind of isSequential() method on the agg.

Battle for another day :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+++++++++++++

if (parent instanceof HistogramAggregationBuilder) {
HistogramAggregationBuilder histoParent = (HistogramAggregationBuilder) parent;
if (histoParent.minDocCount() != 0) {
addValidationError(
"parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
}
} else if (parent instanceof DateHistogramAggregationBuilder) {
DateHistogramAggregationBuilder histoParent = (DateHistogramAggregationBuilder) parent;
if (histoParent.minDocCount() != 0) {
addValidationError(
"parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
}
} else if (parent instanceof AutoDateHistogramAggregationBuilder) {
// Nothing to check
} else {
addValidationError(
type + " aggregation [" + name + "] must have a histogram, date_histogram or auto_date_histogram as parent");
}
}
}

/**
* Aggregations that are siblings to the aggregation being validated.
*/
public abstract Collection<AggregationBuilder> getSiblingAggregations();

/**
* Pipeline aggregations that are siblings to the aggregation being validated.
*/
public abstract Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations();

/**
 * Add a validation error to this context. All validation errors
 * are accumulated in a list and, if there are any, the request
 * is not executed and the entire list is returned as the error
 * response.
 */
public void addValidationError(String error) {
// ValidateActions lazily creates the exception on the first error and
// appends to it afterwards, so e stays null until something fails.
e = ValidateActions.addValidationError(error, e);
}

/**
 * Add a validation error about the {@code buckets_path}, prefixing the
 * message with the preferred name of the {@code buckets_path} field.
 */
public void addBucketPathValidationError(String error) {
    String bucketsPathField = PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName();
    addValidationError(bucketsPathField + ' ' + error);
}

/**
* Validates that the parent is sequentially ordered.
*/
public abstract void validateParentAggSequentiallyOrdered(String type, String name);

/**
 * The accumulated validation exception, if there is one. It'll be
 * {@code null} if the context wasn't provided with any exception on
 * creation and no errors were added afterwards.
 */
public ActionRequestValidationException getValidationException() {
return e;
}
}

/**
* Creates the pipeline aggregator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,10 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -79,16 +73,6 @@ public String type() {
return type;
}

/**
* Validates the state of this factory (makes sure the factory is properly
* configured)
*/
@Override
public final void validate(AggregatorFactory parent, Collection<AggregationBuilder> factories,
Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
doValidate(parent, factories, pipelineAggregatorFactories);
}

protected abstract PipelineAggregator createInternal(Map<String, Object> metaData);

/**
Expand All @@ -102,32 +86,6 @@ public final PipelineAggregator create() {
return aggregator;
}

// Default validation hook: intentionally a no-op. Subclasses override this
// to check their configuration against the parent and sibling builders.
public void doValidate(AggregatorFactory parent, Collection<AggregationBuilder> factories,
Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
}

/**
 * Validates pipeline aggregations that need sequentially ordered data:
 * the parent must be a histogram, date_histogram or auto_date_histogram,
 * and (for the first two) must be configured with {@code min_doc_count: 0}
 * so the buckets form a gap-free sequence.
 *
 * @throws IllegalStateException if the parent is missing or misconfigured
 */
public static void validateSequentiallyOrderedParentAggs(AggregatorFactory parent, String type, String name) {
    boolean sequentiallyOrdered = parent instanceof HistogramAggregatorFactory
        || parent instanceof DateHistogramAggregatorFactory
        || parent instanceof AutoDateHistogramAggregatorFactory;
    if (sequentiallyOrdered == false) {
        throw new IllegalStateException(
            type + " aggregation [" + name + "] must have a histogram, date_histogram or auto_date_histogram as parent");
    }
    if (parent instanceof HistogramAggregatorFactory) {
        if (((HistogramAggregatorFactory) parent).minDocCount() != 0) {
            throw new IllegalStateException("parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
        }
    } else if (parent instanceof DateHistogramAggregatorFactory) {
        if (((DateHistogramAggregatorFactory) parent).minDocCount() != 0) {
            throw new IllegalStateException("parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
        }
    }
    // AutoDateHistogram has no min_doc_count to check.
}

@SuppressWarnings("unchecked")
@Override
public PAB setMetaData(Map<String, Object> metaData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,10 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -107,28 +104,27 @@ public GapPolicy gapPolicy() {
protected abstract PipelineAggregator createInternal(Map<String, Object> metaData);

@Override
public void doValidate(AggregatorFactory parent, Collection<AggregationBuilder> aggBuilders,
Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
protected void validate(ValidationContext context) {
if (bucketsPaths.length != 1) {
throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " must contain a single entry for aggregation [" + name + "]");
context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
return;
}
// Need to find the first agg name in the buckets path to check its a
// multi bucket agg: aggs are split with '>' and can optionally have a
// metric name after them by using '.' so need to split on both to get
// just the agg name
final String firstAgg = bucketsPaths[0].split("[>\\.]")[0];
Optional<AggregationBuilder> aggBuilder = aggBuilders.stream().filter((builder) -> builder.getName().equals(firstAgg))
Optional<AggregationBuilder> aggBuilder = context.getSiblingAggregations().stream()
.filter(builder -> builder.getName().equals(firstAgg))
.findAny();
if (aggBuilder.isPresent()) {
if ((aggBuilder.get() instanceof MultiBucketAggregationBuilder) == false) {
throw new IllegalArgumentException("The first aggregation in " + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " must be a multi-bucket aggregation for aggregation [" + name + "] found :"
+ aggBuilder.get().getClass().getName() + " for buckets path: " + bucketsPaths[0]);
}
} else {
throw new IllegalArgumentException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " aggregation does not exist for aggregation [" + name + "]: " + bucketsPaths[0]);
if (aggBuilder.isEmpty()) {
context.addBucketPathValidationError("aggregation does not exist for aggregation [" + name + "]: " + bucketsPaths[0]);
return;
}
if ((aggBuilder.get() instanceof MultiBucketAggregationBuilder) == false) {
context.addValidationError("The first aggregation in " + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " must be a multi-bucket aggregation for aggregation [" + name + "] found :"
+ aggBuilder.get().getClass().getName() + " for buckets path: " + bucketsPaths[0]);
}
}

Expand Down
Loading