From 5dcb2bfa527f7ef084e3cd3ad02f08198c247261 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 9 Sep 2025 16:39:43 -0700 Subject: [PATCH 1/3] Validate time-series aggregation --- .../src/main/resources/tsdb-mapping.json | 4 + .../xpack/esql/plan/logical/Aggregate.java | 34 ++--- .../xpack/esql/plan/logical/InlineStats.java | 28 +++- .../plan/logical/TimeSeriesAggregate.java | 134 ++++++++++++++++++ .../xpack/esql/analysis/VerifierTests.java | 83 +++++++++-- 5 files changed, 245 insertions(+), 38 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tsdb-mapping.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tsdb-mapping.json index 39b1b10edd916..74c8a7ad5e6d8 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tsdb-mapping.json +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tsdb-mapping.json @@ -10,6 +10,10 @@ "name": { "type": "keyword" }, + "host": { + "type": "keyword", + "time_series_dimension": true + }, "network": { "properties": { "connections": { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java index 5e265c30c57ba..1063fe63d48ba 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java @@ -10,7 +10,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.IndexMode; import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware; import org.elasticsearch.xpack.esql.capabilities.TelemetryAware; import org.elasticsearch.xpack.esql.common.Failures; @@ -29,7 +28,6 @@ import org.elasticsearch.xpack.esql.core.util.Holder; import 
org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; import org.elasticsearch.xpack.esql.expression.function.aggregate.FilteredExpression; -import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate; import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction; import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction; import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize; @@ -229,18 +227,18 @@ public void postAnalysisVerification(Failures failures) { // traverse the tree to find invalid matches checkInvalidNamedExpressionUsage(exp, groupings, groupRefs, failures, 0); }); - if (anyMatch(l -> l instanceof EsRelation relation && relation.indexMode() == IndexMode.TIME_SERIES)) { - aggregates.forEach(a -> checkRateAggregates(a, 0, failures)); - } else { - forEachExpression( - TimeSeriesAggregateFunction.class, - r -> failures.add(fail(r, "time_series aggregate[{}] can only be used with the TS command", r.sourceText())) - ); - } + checkTimeSeriesAggregates(failures); checkCategorizeGrouping(failures); checkMultipleScoreAggregations(failures); } + protected void checkTimeSeriesAggregates(Failures failures) { + forEachExpression( + TimeSeriesAggregateFunction.class, + r -> failures.add(fail(r, "time_series aggregate[{}] can only be used with the TS command", r.sourceText())) + ); + } + private void checkMultipleScoreAggregations(Failures failures) { Holder hasScoringAggs = new Holder<>(); forEachExpression(FilteredExpression.class, fe -> { @@ -339,22 +337,6 @@ private void checkCategorizeGrouping(Failures failures) { }))); } - private static void checkRateAggregates(Expression expr, int nestedLevel, Failures failures) { - if (expr instanceof AggregateFunction) { - nestedLevel++; - } - if (expr instanceof Rate r) { - if (nestedLevel != 2) { - failures.add( - fail(expr, "the rate aggregate [{}] can only be used with the TS command and inside another 
aggregate", r.sourceText()) - ); - } - } - for (Expression child : expr.children()) { - checkRateAggregates(child, nestedLevel, failures); - } - } - // traverse the expression and look either for an agg function or a grouping match // stop either when no children are left, the leafs are literals or a reference attribute is given private static void checkInvalidNamedExpressionUsage( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java index be34631ec8149..50a916132f9a9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java @@ -11,12 +11,16 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware; import org.elasticsearch.xpack.esql.capabilities.TelemetryAware; +import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; import org.elasticsearch.xpack.esql.plan.logical.join.Join; @@ -29,6 +33,7 @@ import java.util.Objects; import static java.util.Collections.emptyList; +import static org.elasticsearch.xpack.esql.common.Failure.fail; import static 
org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; /** @@ -38,7 +43,13 @@ * underlying aggregate. *

*/ -public class InlineStats extends UnaryPlan implements NamedWriteable, SurrogateLogicalPlan, TelemetryAware, SortAgnostic { +public class InlineStats extends UnaryPlan + implements + NamedWriteable, + SurrogateLogicalPlan, + TelemetryAware, + SortAgnostic, + PostAnalysisVerificationAware { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( LogicalPlan.class, "InlineStats", @@ -147,4 +158,19 @@ public boolean equals(Object obj) { InlineStats other = (InlineStats) obj; return Objects.equals(aggregate, other.aggregate); } + + @Override + public void postAnalysisVerification(Failures failures) { + Holder seenAggregations = new Holder<>(Boolean.FALSE); + child().forEachDown(c -> { + if (c instanceof TimeSeriesAggregate) { + seenAggregations.set(Boolean.TRUE); + } + if (c instanceof EsRelation r && r.indexMode() == IndexMode.TIME_SERIES) { + if (seenAggregations.get() == false) { + failures.add(fail(this, "InlineStats [{}] in time-series is only allowed after an aggregation", this.sourceText())); + } + } + }); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java index 7e6a3f5caec25..3ca580011ec89 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java @@ -10,15 +10,28 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; +import org.elasticsearch.xpack.esql.common.Failures; +import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; +import 
org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.tree.Node; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.util.Holder; +import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; +import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction; import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket; +import org.elasticsearch.xpack.esql.expression.function.grouping.TBucket; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.esql.common.Failure.fail; /** * An extension of {@link Aggregate} to perform time-series aggregation per time-series, such as rate or _over_time. @@ -106,4 +119,125 @@ public boolean equals(Object obj) { && Objects.equals(child(), other.child()) && Objects.equals(timeBucket, other.timeBucket); } + + @Override + public void postAnalysisVerification(Failures failures) { + super.postAnalysisVerification(failures); + child().forEachDown(p -> { + // reject `TS metrics | SORT BY ... | STATS ...` + if (p instanceof OrderBy orderBy) { + failures.add( + fail( + orderBy, + "sorting [{}] between the time-series source and the first aggregation [{}] is not allowed", + orderBy.sourceText(), + this.sourceText() + ) + ); + } + // reject `TS metrics | LIMIT ... 
| STATS ...` + if (p instanceof Limit limit) { + failures.add( + fail( + limit, + "limiting [{}] the time-series source before the first aggregation [{}] is not allowed; " + + "filter data with a WHERE command instead", + limit.sourceText(), + this.sourceText() + ) + ); + } + }); + } + + @Override + protected void checkTimeSeriesAggregates(Failures failures) { + List overTimeFunctions = new ArrayList<>(); + for (NamedExpression aggregate : aggregates) { + if (aggregate instanceof Alias alias && Alias.unwrap(alias) instanceof AggregateFunction outer) { + if (outer instanceof TimeSeriesAggregateFunction ts) { + outer.field() + .forEachDown( + AggregateFunction.class, + nested -> failures.add( + fail( + this, + "cannot use aggregate function [{}] inside over-time aggregation function [{}]", + nested.sourceText(), + outer.sourceText() + ) + ) + ); + // reject `TS metrics | STATS rate(requests)` + // TODO: support this + failures.add( + fail( + ts, + "over-time aggregate function [{}] can only be used with the TS command and inside another aggregate function", + ts.sourceText() + ) + ); + overTimeFunctions.add(ts); + } else { + outer.field().forEachDown(AggregateFunction.class, nested -> { + if (nested instanceof TimeSeriesAggregateFunction == false) { + fail( + this, + "cannot use aggregate function [{}] inside aggregation function [{}];" + + "only over-time aggregation function can be used inside another aggregation function", + nested.sourceText(), + outer.sourceText() + ); + } + nested.field() + .forEachDown( + AggregateFunction.class, + nested2 -> failures.add( + fail( + this, + "cannot use aggregate function [{}] inside over-time aggregation function [{}]", + nested.sourceText(), + nested2.sourceText() + ) + ) + ); + }); + } + } + } + // reject `TS metrics | STATS rate(requests) BY host, TBUCKET(1 hour)` + if (overTimeFunctions.isEmpty() == false) { + Holder timestamp = new Holder<>(); + forEachDown(EsRelation.class, r -> { + for (Attribute attr : r.output()) { + 
if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) { + timestamp.set(attr); + } + } + }); + List nonTimeBucketGroupings = new ArrayList<>(); + for (Expression g : groupings) { + boolean timeBucket = g.anyMatch( + c -> (c instanceof Bucket b && b.field().equals(timestamp.get()) + || (c instanceof TBucket tb && tb.field().equals(timestamp.get()))) + ); + if (timeBucket == false) { + nonTimeBucketGroupings.add(g); + } + } + if (nonTimeBucketGroupings.isEmpty() == false) { + for (TimeSeriesAggregateFunction af : overTimeFunctions) { + failures.add( + fail( + this, + "cannot use over-time aggregate function [{}] with groupings [{}] other than the time bucket; " + + "drop the groupings or provide an outer aggregation", + af.sourceText(), + nonTimeBucketGroupings.stream().map(Node::sourceText).collect(Collectors.joining(", ")) + ) + ); + } + } + } + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java index 3b884921d05be..c7977b01fba4e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java @@ -1192,26 +1192,51 @@ public void testNotAllowRateOutsideMetrics() { ); } - public void testRateNotEnclosedInAggregate() { + public void testOverTimeAggregate() { assumeTrue("requires metric command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); assertThat( error("TS tests | STATS rate(network.bytes_in)", tsdb), - equalTo("1:18: the rate aggregate [rate(network.bytes_in)] can only be used with the TS command and inside another aggregate") + equalTo( + "1:18: over-time aggregate function [rate(network.bytes_in)] " + + "can only be used with the TS command and inside another aggregate function" + ) + ); + assertThat( + error("TS tests | STATS avg_over_time(network.connections)", tsdb), + 
equalTo( + "1:18: over-time aggregate function [avg_over_time(network.connections)] " + + "can only be used with the TS command and inside another aggregate function" + ) ); assertThat( error("TS tests | STATS avg(rate(network.bytes_in)), rate(network.bytes_in)", tsdb), - equalTo("1:47: the rate aggregate [rate(network.bytes_in)] can only be used with the TS command and inside another aggregate") + equalTo( + "1:47: over-time aggregate function [rate(network.bytes_in)] " + + "can only be used with the TS command and inside another aggregate function" + ) ); + assertThat(error("TS tests | STATS max(avg(rate(network.bytes_in)))", tsdb), equalTo(""" - 1:22: nested aggregations [avg(rate(network.bytes_in))] not allowed inside other aggregations\ - [max(avg(rate(network.bytes_in)))] - line 1:26: the rate aggregate [rate(network.bytes_in)] can only be used with the TS command\ - and inside another aggregate""")); + 1:22: nested aggregations [avg(rate(network.bytes_in))] \ + not allowed inside other aggregations [max(avg(rate(network.bytes_in)))] + line 1:12: cannot use aggregate function [avg(rate(network.bytes_in))] \ + inside over-time aggregation function [rate(network.bytes_in)]""")); + assertThat(error("TS tests | STATS max(avg(rate(network.bytes_in)))", tsdb), equalTo(""" - 1:22: nested aggregations [avg(rate(network.bytes_in))] not allowed inside other aggregations\ - [max(avg(rate(network.bytes_in)))] - line 1:26: the rate aggregate [rate(network.bytes_in)] can only be used with the TS command\ - and inside another aggregate""")); + 1:22: nested aggregations [avg(rate(network.bytes_in))] \ + not allowed inside other aggregations [max(avg(rate(network.bytes_in)))] + line 1:12: cannot use aggregate function [avg(rate(network.bytes_in))] \ + inside over-time aggregation function [rate(network.bytes_in)]""")); + + assertThat(error("TS tests | STATS rate(network.bytes_in) BY host", tsdb), equalTo(""" + 1:18: over-time aggregate function [rate(network.bytes_in)] \ + 
can only be used with the TS command and inside another aggregate function + line 1:12: cannot use over-time aggregate function [rate(network.bytes_in)] \ + with groupings [host] other than the time bucket; drop the groupings or provide an outer aggregation""")); + + assertThat(error("TS tests | STATS rate(network.bytes_in) BY bucket(@timestamp, 1 hour)", tsdb), equalTo(""" + 1:18: over-time aggregate function [rate(network.bytes_in)] \ + can only be used with the TS command and inside another aggregate function""")); } public void testWeightedAvg() { @@ -2504,6 +2529,42 @@ public void testInvalidTBucketCalls() { } } + public void testSortInTimeSeries() { + assumeTrue("requires TS", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); + assertThat( + error("TS test | SORT host | STATS avg(last_over_time(network.connections))", tsdb), + equalTo( + "1:11: sorting [SORT host] between the time-series source " + + "and the first aggregation [STATS avg(last_over_time(network.connections))] is not allowed" + ) + ); + assertThat( + error("TS test | LIMIT 10 | STATS avg(network.connections)", tsdb), + equalTo( + "1:11: limiting [LIMIT 10] the time-series source before the first aggregation " + + "[STATS avg(network.connections)] is not allowed; filter data with a WHERE command instead" + ) + ); + assertThat(error("TS test | SORT host | LIMIT 10 | STATS avg(network.connections)", tsdb), equalTo(""" + 1:23: limiting [LIMIT 10] the time-series source \ + before the first aggregation [STATS avg(network.connections)] is not allowed; filter data with a WHERE command instead + line 1:11: sorting [SORT host] between the time-series source \ + and the first aggregation [STATS avg(network.connections)] is not allowed""")); + } + + public void testInlineStatsInTimeSeries() { + assumeTrue("requires TS", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); + assumeTrue("requires inline stats", EsqlCapabilities.Cap.INLINESTATS_V11.isEnabled()); + assertThat( + error("TS test | INLINESTATS 
avg(network.connections)", tsdb), + equalTo("1:11: InlineStats [INLINESTATS avg(network.connections)] in time-series is only allowed after an aggregation") + ); + assertThat( + error("TS test | INLINESTATS v = avg(network.connections) | STATS max(v)", tsdb), + equalTo("1:11: InlineStats [INLINESTATS v = avg(network.connections)] in time-series is only allowed after an aggregation") + ); + } + private void checkVectorFunctionsNullArgs(String functionInvocation) throws Exception { query("from test | eval similarity = " + functionInvocation, fullTextAnalyzer); } From 927722da5c630d33f85e38cd3974b33f432dfdce Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 17 Sep 2025 21:55:55 -0700 Subject: [PATCH 2/3] More fix --- .../xpack/esql/plan/logical/InlineStats.java | 30 +------ .../plan/logical/TimeSeriesAggregate.java | 90 +++++++++---------- .../xpack/esql/analysis/VerifierTests.java | 50 ++++------- 3 files changed, 64 insertions(+), 106 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java index 50a916132f9a9..889c544599185 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java @@ -11,16 +11,12 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.IndexMode; -import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware; import org.elasticsearch.xpack.esql.capabilities.TelemetryAware; -import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import 
org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; import org.elasticsearch.xpack.esql.plan.logical.join.Join; @@ -33,7 +29,6 @@ import java.util.Objects; import static java.util.Collections.emptyList; -import static org.elasticsearch.xpack.esql.common.Failure.fail; import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; /** @@ -43,13 +38,7 @@ * underlying aggregate. *

*/ -public class InlineStats extends UnaryPlan - implements - NamedWriteable, - SurrogateLogicalPlan, - TelemetryAware, - SortAgnostic, - PostAnalysisVerificationAware { +public class InlineStats extends UnaryPlan implements NamedWriteable, SurrogateLogicalPlan, TelemetryAware, SortAgnostic { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( LogicalPlan.class, "InlineStats", @@ -129,7 +118,7 @@ private JoinConfig joinConfig() { } } } - return new JoinConfig(JoinTypes.LEFT, namedGroupings, leftFields, rightFields); + return new JoinConfig(JoinTypes.LEFT, leftFields, rightFields, null); } @Override @@ -158,19 +147,4 @@ public boolean equals(Object obj) { InlineStats other = (InlineStats) obj; return Objects.equals(aggregate, other.aggregate); } - - @Override - public void postAnalysisVerification(Failures failures) { - Holder seenAggregations = new Holder<>(Boolean.FALSE); - child().forEachDown(c -> { - if (c instanceof TimeSeriesAggregate) { - seenAggregations.set(Boolean.TRUE); - } - if (c instanceof EsRelation r && r.indexMode() == IndexMode.TIME_SERIES) { - if (seenAggregations.get() == false) { - failures.add(fail(this, "InlineStats [{}] in time-series is only allowed after an aggregation", this.sourceText())); - } - } - }); - } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java index 3ca580011ec89..9026129ff084f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java @@ -12,24 +12,19 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Alias; -import org.elasticsearch.xpack.esql.core.expression.Attribute; 
import org.elasticsearch.xpack.esql.core.expression.Expression; -import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; -import org.elasticsearch.xpack.esql.core.tree.Node; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction; import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket; -import org.elasticsearch.xpack.esql.expression.function.grouping.TBucket; +import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; import static org.elasticsearch.xpack.esql.common.Failure.fail; @@ -147,14 +142,52 @@ public void postAnalysisVerification(Failures failures) { ) ); } + // reject LOOKUP_JOIN + if (p instanceof LookupJoin lookupJoin) { + failures.add( + fail( + lookupJoin, + "lookup join [{}] in the time-series source before the first aggregation [{}] is not allowed", + lookupJoin.sourceText(), + this.sourceText() + ) + ); + } + // reject ENRICH + if (p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) { + failures.add( + fail( + enrich, + "coordinator enrich [{}] in the time-series source before the first aggregation [{}] is not allowed", + enrich.sourceText(), + this.sourceText() + ) + ); + } + // reject CHANGE_POINT + if (p instanceof ChangePoint changePoint) { + failures.add( + fail( + changePoint, + "change_point [{}] in the time-series source before the first aggregation [{}] is not allowed", + changePoint.sourceText(), + this.sourceText() + 
) + ); + } }); } @Override protected void checkTimeSeriesAggregates(Failures failures) { - List overTimeFunctions = new ArrayList<>(); for (NamedExpression aggregate : aggregates) { if (aggregate instanceof Alias alias && Alias.unwrap(alias) instanceof AggregateFunction outer) { + if (outer instanceof Count count && count.field().foldable()) { + // COUNT(*) + failures.add( + fail(count, "count_star [{}] can't be used with TS command; use count on a field instead", outer.sourceText()) + ); + } if (outer instanceof TimeSeriesAggregateFunction ts) { outer.field() .forEachDown( @@ -162,7 +195,7 @@ protected void checkTimeSeriesAggregates(Failures failures) { nested -> failures.add( fail( this, - "cannot use aggregate function [{}] inside over-time aggregation function [{}]", + "cannot use aggregate function [{}] inside time-series aggregation function [{}]", nested.sourceText(), outer.sourceText() ) @@ -173,18 +206,17 @@ protected void checkTimeSeriesAggregates(Failures failures) { failures.add( fail( ts, - "over-time aggregate function [{}] can only be used with the TS command and inside another aggregate function", + "time-series aggregate function [{}] can only be used with the TS command and inside another aggregate function", ts.sourceText() ) ); - overTimeFunctions.add(ts); } else { outer.field().forEachDown(AggregateFunction.class, nested -> { if (nested instanceof TimeSeriesAggregateFunction == false) { fail( this, "cannot use aggregate function [{}] inside aggregation function [{}];" - + "only over-time aggregation function can be used inside another aggregation function", + + "only time-series aggregation function can be used inside another aggregation function", nested.sourceText(), outer.sourceText() ); @@ -205,39 +237,5 @@ protected void checkTimeSeriesAggregates(Failures failures) { } } } - // reject `TS metrics | STATS rate(requests) BY host, TBUCKET(1 hour)` - if (overTimeFunctions.isEmpty() == false) { - Holder timestamp = new Holder<>(); - 
forEachDown(EsRelation.class, r -> { - for (Attribute attr : r.output()) { - if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) { - timestamp.set(attr); - } - } - }); - List nonTimeBucketGroupings = new ArrayList<>(); - for (Expression g : groupings) { - boolean timeBucket = g.anyMatch( - c -> (c instanceof Bucket b && b.field().equals(timestamp.get()) - || (c instanceof TBucket tb && tb.field().equals(timestamp.get()))) - ); - if (timeBucket == false) { - nonTimeBucketGroupings.add(g); - } - } - if (nonTimeBucketGroupings.isEmpty() == false) { - for (TimeSeriesAggregateFunction af : overTimeFunctions) { - failures.add( - fail( - this, - "cannot use over-time aggregate function [{}] with groupings [{}] other than the time bucket; " - + "drop the groupings or provide an outer aggregation", - af.sourceText(), - nonTimeBucketGroupings.stream().map(Node::sourceText).collect(Collectors.joining(", ")) - ) - ); - } - } - } } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java index c7977b01fba4e..b55fe8d7df53e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java @@ -1173,7 +1173,6 @@ public void testAggsResolutionWithUnresolvedGroupings() { } public void testNotAllowRateOutsideMetrics() { - assumeTrue("requires metric command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); assertThat( error("FROM tests | STATS avg(rate(network.bytes_in))", tsdb), equalTo("1:24: time_series aggregate[rate(network.bytes_in)] can only be used with the TS command") @@ -1192,27 +1191,26 @@ public void testNotAllowRateOutsideMetrics() { ); } - public void testOverTimeAggregate() { - assumeTrue("requires metric command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); + public void 
testTimeseriesAggregate() { assertThat( error("TS tests | STATS rate(network.bytes_in)", tsdb), equalTo( - "1:18: over-time aggregate function [rate(network.bytes_in)] " - + "can only be used with the TS command and inside another aggregate function" + "1:18: time-series aggregate function [rate(network.bytes_in)] can only be used with the TS command " + + "and inside another aggregate function" ) ); assertThat( error("TS tests | STATS avg_over_time(network.connections)", tsdb), equalTo( - "1:18: over-time aggregate function [avg_over_time(network.connections)] " - + "can only be used with the TS command and inside another aggregate function" + "1:18: time-series aggregate function [avg_over_time(network.connections)] can only be used " + + "with the TS command and inside another aggregate function" ) ); assertThat( error("TS tests | STATS avg(rate(network.bytes_in)), rate(network.bytes_in)", tsdb), equalTo( - "1:47: over-time aggregate function [rate(network.bytes_in)] " - + "can only be used with the TS command and inside another aggregate function" + "1:47: time-series aggregate function [rate(network.bytes_in)] can only be used " + + "with the TS command and inside another aggregate function" ) ); @@ -1228,15 +1226,17 @@ public void testOverTimeAggregate() { line 1:12: cannot use aggregate function [avg(rate(network.bytes_in))] \ inside over-time aggregation function [rate(network.bytes_in)]""")); - assertThat(error("TS tests | STATS rate(network.bytes_in) BY host", tsdb), equalTo(""" - 1:18: over-time aggregate function [rate(network.bytes_in)] \ - can only be used with the TS command and inside another aggregate function - line 1:12: cannot use over-time aggregate function [rate(network.bytes_in)] \ - with groupings [host] other than the time bucket; drop the groupings or provide an outer aggregation""")); - - assertThat(error("TS tests | STATS rate(network.bytes_in) BY bucket(@timestamp, 1 hour)", tsdb), equalTo(""" - 1:18: over-time aggregate function 
[rate(network.bytes_in)] \ - can only be used with the TS command and inside another aggregate function""")); + assertThat( + error("TS tests | STATS rate(network.bytes_in) BY bucket(@timestamp, 1 hour)", tsdb), + equalTo( + "1:18: time-series aggregate function [rate(network.bytes_in)] can only be used " + + "with the TS command and inside another aggregate function" + ) + ); + assertThat( + error("TS tests | STATS COUNT(*)", tsdb), + equalTo("1:18: count_star [COUNT(*)] can't be used with TS command; use count on a field instead") + ); } public void testWeightedAvg() { @@ -2530,7 +2530,6 @@ public void testInvalidTBucketCalls() { } public void testSortInTimeSeries() { - assumeTrue("requires TS", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); assertThat( error("TS test | SORT host | STATS avg(last_over_time(network.connections))", tsdb), equalTo( @@ -2552,19 +2551,6 @@ public void testSortInTimeSeries() { and the first aggregation [STATS avg(network.connections)] is not allowed""")); } - public void testInlineStatsInTimeSeries() { - assumeTrue("requires TS", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); - assumeTrue("requires inline stats", EsqlCapabilities.Cap.INLINESTATS_V11.isEnabled()); - assertThat( - error("TS test | INLINESTATS avg(network.connections)", tsdb), - equalTo("1:11: InlineStats [INLINESTATS avg(network.connections)] in time-series is only allowed after an aggregation") - ); - assertThat( - error("TS test | INLINESTATS v = avg(network.connections) | STATS max(v)", tsdb), - equalTo("1:11: InlineStats [INLINESTATS v = avg(network.connections)] in time-series is only allowed after an aggregation") - ); - } - private void checkVectorFunctionsNullArgs(String functionInvocation) throws Exception { query("from test | eval similarity = " + functionInvocation, fullTextAnalyzer); } From 69b95e47c4d65a8c6903b6f532b08d03d5406360 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 17 Sep 2025 22:09:40 -0700 Subject: [PATCH 3/3] More fix --- 
.../plan/logical/TimeSeriesAggregate.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java index 9026129ff084f..a1ec5b2b171d4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java @@ -142,34 +142,34 @@ public void postAnalysisVerification(Failures failures) { ) ); } - // reject LOOKUP_JOIN + // reject `TS metrics | LOOKUP JOIN ... | STATS ...` if (p instanceof LookupJoin lookupJoin) { failures.add( fail( lookupJoin, - "lookup join [{}] in the time-series source before the first aggregation [{}] is not allowed", + "lookup join [{}] in the time-series before the first aggregation [{}] is not allowed", lookupJoin.sourceText(), this.sourceText() ) ); } - // reject ENRICH - if (p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) { + // reject `TS metrics | ENRICH ... | STATS ...` + if (p instanceof Enrich enrich) { failures.add( fail( enrich, - "coordinator enrich [{}] in the time-series source before the first aggregation [{}] is not allowed", + "enrich [{}] in the time-series before the first aggregation [{}] is not allowed", enrich.sourceText(), this.sourceText() ) ); } - // reject CHANGE_POINT + // reject `TS metrics | CHANGE POINT ... 
| STATS ...` if (p instanceof ChangePoint changePoint) { failures.add( fail( changePoint, - "change_point [{}] in the time-series source before the first aggregation [{}] is not allowed", + "change_point [{}] in the time-series before the first aggregation [{}] is not allowed", changePoint.sourceText(), this.sourceText() ) @@ -183,7 +183,7 @@ protected void checkTimeSeriesAggregates(Failures failures) { for (NamedExpression aggregate : aggregates) { if (aggregate instanceof Alias alias && Alias.unwrap(alias) instanceof AggregateFunction outer) { if (outer instanceof Count count && count.field().foldable()) { - // COUNT(*) + // reject `TS metrics | STATS COUNT(*)` failures.add( fail(count, "count_star [{}] can't be used with TS command; use count on a field instead", outer.sourceText()) ); @@ -206,7 +206,8 @@ protected void checkTimeSeriesAggregates(Failures failures) { failures.add( fail( ts, - "time-series aggregate function [{}] can only be used with the TS command and inside another aggregate function", + "time-series aggregate function [{}] can only be used with the TS command " + + "and inside another aggregate function", ts.sourceText() ) );