diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index 64cdae20e5635..6b172faa5d56c 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -720,4 +720,14 @@ mx:integer | tbucket:datetime 1716 | 2024-05-10T00:00:00.000Z ; - +GroupByAll +required_capability: metrics_command +required_capability: metrics_group_by_all + +TS k8s +| STATS count = count_over_time(network.cost) +; + +count:long | _tsid:string +1 | KEgPmt8JTKe7WA6iB8FLYKbvei62ccUzlcoJwx6Ltf34ddg5heNyQIA +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 13b337a668ac1..c7a57f5e87e47 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1033,6 +1033,10 @@ public enum Cap { */ @Deprecated METRICS_COMMAND(Build.current().isSnapshot()), + /** + * Enables automatically grouping by all dimension fields in TS mode queries + */ + METRICS_GROUP_BY_ALL(), /** * Are the {@code documents_found} and {@code values_loaded} fields available diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 9909a7bc1e9ef..55df9bcdc4d9c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -526,20 +526,59 @@ private Aggregate resolveAggregate(Aggregate aggregate, List children Holder changed = new Holder<>(false); List groupings = aggregate.groupings(); List aggregates = aggregate.aggregates(); + + Holder tsidAttribute = new Holder<>(); + LogicalPlan newChild = aggregate.child(); + if (aggregate instanceof TimeSeriesAggregate) { + for (Expression g : groupings) { + if (g instanceof UnresolvedAttribute ua && ua.name().equals(MetadataAttribute.TSID_FIELD)) { + boolean foundInOutput = childrenOutput.stream().anyMatch(attr -> attr.name().equals(MetadataAttribute.TSID_FIELD)); + if (foundInOutput == false) { + MetadataAttribute tsid = MetadataAttribute.create(g.source(), MetadataAttribute.TSID_FIELD); + if (tsid != null) { + tsidAttribute.set(tsid); + newChild = aggregate.child().transformUp(EsRelation.class, r -> { + if (r.indexMode() == IndexMode.TIME_SERIES && r.outputSet().contains(tsid) == false) { + return new EsRelation( + r.source(), + r.indexPattern(), + r.indexMode(), + r.indexNameWithModes(), + CollectionUtils.combine(r.output(), tsid) + ); + } + return r; + }); + childrenOutput.add(tsid); + } + } + break; + } + } + } + // first resolve groupings since the aggs might refer to them // trying to globally resolve unresolved attributes will lead to some being marked as unresolvable - if (Resolvables.resolved(groupings) == false) { + if (Resolvables.resolved(groupings) == false || newChild != aggregate.child()) { List newGroupings = new ArrayList<>(groupings.size()); for (Expression g : groupings) { - Expression resolved = g.transformUp(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput)); - if (resolved != g) { + Expression resolved; + if (g instanceof UnresolvedAttribute ua + && ua.name().equals(MetadataAttribute.TSID_FIELD) + && tsidAttribute.get() != null) { + resolved = tsidAttribute.get(); changed.set(true); + } else { + resolved = g.transformUp(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput)); + if (resolved != g) { + changed.set(true); + } } newGroupings.add(resolved); } groupings = newGroupings; - if (changed.get()) { - aggregate = aggregate.with(aggregate.child(), newGroupings, aggregate.aggregates()); + if (changed.get() || newChild != aggregate.child()) { + aggregate = aggregate.with(newChild, newGroupings, aggregate.aggregates()); changed.set(false); } } @@ -1235,7 +1274,7 @@ private LogicalPlan resolveKeep(Project p, List childOutput) { private LogicalPlan resolveDrop(Drop drop, List childOutput) { List resolvedProjections = new ArrayList<>(childOutput); - for (var ne : drop.removals()) { + for (NamedExpression ne : drop.removals()) { List resolved; if (ne instanceof UnresolvedNamePattern np) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java index c35d77204b130..d7c7bd226decd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.function.aggregate.Values; import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns; @@ -85,7 +86,19 @@ private static void verifyOutputNotChanged(QueryPlan optimizedPlan, List a instanceof TimeSeriesAggregate ts && ts.aggregates().stream().anyMatch(g -> Alias.unwrap(g) instanceof Values v && v.field().dataType() == DataType.TEXT) ); - boolean ignoreError = hasProjectAwayColumns || hasLookupJoinExec || hasTextGroupingInTimeSeries; + // TranslateTimeSeriesAggregate creates a Project with an Eval child that contains additional attributes (like _tsid) + // that weren't in the original query. + int optimizedOutputSize = optimizedPlan.output().size(); + int expectedOutputSize = expectedOutputAttributes.size(); + + boolean hasTranslatedTimeSeriesAggregate = optimizedOutputSize > expectedOutputSize + && optimizedOutputSize - expectedOutputSize == 1 + && optimizedPlan.output().stream().anyMatch(a -> a.name().equals(MetadataAttribute.TSID_FIELD)); + + boolean ignoreError = hasProjectAwayColumns + || hasLookupJoinExec + || hasTextGroupingInTimeSeries + || hasTranslatedTimeSeriesAggregate; if (ignoreError == false) { failures.add( fail( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java index dbdaa2aab2df7..298b4a33147a6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java @@ -212,7 +212,7 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex return newAgg.toAttribute(); }); if (changed.get()) { - secondPassAggs.add(new Alias(alias.source(), alias.name(), outerAgg, agg.id())); + secondPassAggs.add(new Alias(alias.source(), alias.name(), new Values(outerAgg.source(), outerAgg), agg.id())); } else { // TODO: reject over_time_aggregation only final Expression aggField = af.field(); @@ -240,9 +240,7 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex } } } - // time-series aggregates must be grouped by _tsid (and time-bucket) first and re-group by users key - List firstPassGroupings = new ArrayList<>(); - firstPassGroupings.add(tsid.get()); + List packDimensions = new ArrayList<>(); List secondPassGroupings = new ArrayList<>(); List unpackDimensions = new ArrayList<>(); @@ -264,12 +262,39 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex } }); NamedExpression timeBucket = timeBucketRef.get(); - for (var group : aggregate.groupings()) { + + boolean tsidInGroupings = aggregate.groupings().stream().anyMatch(expr -> { + Attribute attr = Expressions.attribute(expr); + return attr != null && attr.name().equals(MetadataAttribute.TSID_FIELD); + }); + + Expression tsidExpression = aggregate.groupings() + .stream() + .filter(g -> g instanceof Attribute attr && attr.name().equals(MetadataAttribute.TSID_FIELD)) + .findFirst() + .orElse(tsid.get()); + + Attribute tsidAttribute = Expressions.attribute(tsidExpression); + + List aggGroupings = new ArrayList<>(); + if (tsidInGroupings == false) { + aggGroupings.add(tsidExpression); + } + aggGroupings.addAll(aggregate.groupings()); + + // time-series aggregates must be grouped by _tsid (and time-bucket) first and re-group by users key + List firstPassGroupings = new ArrayList<>(); + for (var group : aggGroupings) { if (group instanceof Attribute == false) { throw new EsqlIllegalArgumentException("expected named expression for grouping; got " + group); } final Attribute g = (Attribute) group; - if (timeBucket != null && g.id().equals(timeBucket.id())) { + + if (tsidAttribute != null && g.id().equals(tsidAttribute.id())) { + var newFinalGroup = tsidAttribute.toAttribute(); + firstPassGroupings.add(newFinalGroup); + secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id())); + } else if (timeBucket != null && g.id().equals(timeBucket.id())) { var newFinalGroup = timeBucket.toAttribute(); firstPassGroupings.add(newFinalGroup); secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id())); @@ -330,6 +355,7 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex mergeExpressions(secondPassAggs, secondPassGroupings) ); Eval unpackValues = new Eval(secondPhase.source(), secondPhase, unpackDimensions); + List projects = new ArrayList<>(); for (NamedExpression agg : secondPassAggs) { projects.add(Expressions.attribute(agg)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java index b40b690514b92..4ff8d893986de 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java @@ -17,6 +17,12 @@ import java.util.List; import java.util.Objects; +/** + * Drop is an intermediary object used during the {@link org.elasticsearch.xpack.esql.analysis.Analyzer} phase of query planning. + * DROP commands are parsed into Drop objects, which the {@link org.elasticsearch.xpack.esql.analysis.Analyzer.ResolveRefs} rule then + * rewrites into {@link org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject} plans, along with other projection-like commands. + * As such, Drop is neither serializable nor able to be mapped to a corresponding physical plan. + */ public class Drop extends UnaryPlan implements TelemetryAware, Streaming, SortAgnostic { private final List removals; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java index d7a5772fe11ec..8e1f342456794 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java @@ -233,54 +233,30 @@ protected void checkTimeSeriesAggregates(Failures failures) { ); } } - if (outer instanceof TimeSeriesAggregateFunction ts) { - outer.field() + outer.field().forEachDown(AggregateFunction.class, nested -> { + if (nested instanceof TimeSeriesAggregateFunction == false) { + fail( + this, + "cannot use aggregate function [{}] inside aggregation function [{}];" + + "only time-series aggregation function can be used inside another aggregation function", + nested.sourceText(), + outer.sourceText() + ); + } + nested.field() .forEachDown( AggregateFunction.class, - nested -> failures.add( + nested2 -> failures.add( fail( this, - "cannot use aggregate function [{}] inside time-series aggregation function [{}]", + "cannot use aggregate function [{}] inside over-time aggregation function [{}]", nested.sourceText(), - outer.sourceText() + nested2.sourceText() ) ) ); - // reject `TS metrics | STATS rate(requests)` - // TODO: support this - failures.add( - fail( - ts, - "time-series aggregate function [{}] can only be used with the TS command " - + "and inside another aggregate function", - ts.sourceText() - ) - ); - } else { - outer.field().forEachDown(AggregateFunction.class, nested -> { - if (nested instanceof TimeSeriesAggregateFunction == false) { - fail( - this, - "cannot use aggregate function [{}] inside aggregation function [{}];" - + "only time-series aggregation function can be used inside another aggregation function", - nested.sourceText(), - outer.sourceText() - ); - } - nested.field() - .forEachDown( - AggregateFunction.class, - nested2 -> failures.add( - fail( - this, - "cannot use aggregate function [{}] inside over-time aggregation function [{}]", - nested.sourceText(), - nested2.sourceText() - ) - ) - ); - }); - } + }); + // } } } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TimeSeriesAggregationsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TimeSeriesAggregationsTests.java new file mode 100644 index 0000000000000..f282904f9c9e8 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TimeSeriesAggregationsTests.java @@ -0,0 +1,215 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.action.EsqlCapabilities; +import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.analysis.AnalyzerContext; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.core.util.Holder; +import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; +import org.elasticsearch.xpack.esql.index.EsIndex; +import org.elasticsearch.xpack.esql.index.IndexResolution; +import org.elasticsearch.xpack.esql.optimizer.AbstractLogicalPlanOptimizerTests; +import org.elasticsearch.xpack.esql.plan.IndexPattern; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate; +import org.junit.BeforeClass; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +public class TimeSeriesAggregationsTests extends AbstractLogicalPlanOptimizerTests { + + private static Map mappingK8s; + private static Analyzer k8sAnalyzer; + + @BeforeClass + public static void initK8s() { + mappingK8s = loadMapping("k8s-mappings.json"); + EsIndex k8sIndex = new EsIndex("k8s", mappingK8s, Map.of("k8s", IndexMode.TIME_SERIES)); + + IndexResolution indexResolution = IndexResolution.valid(k8sIndex); + + Map resolutions = new HashMap<>(); + resolutions.put(new IndexPattern(Source.EMPTY, indexResolution.get().name()), indexResolution); + + k8sAnalyzer = new Analyzer( + new AnalyzerContext( + EsqlTestUtils.TEST_CFG, + new EsqlFunctionRegistry(), + resolutions, + defaultLookupResolution(), + enrichResolution, + emptyInferenceResolution(), + TransportVersion.minimumCompatible() + ), + TEST_VERIFIER + ); + } + + protected LogicalPlan planK8s(String query) { + LogicalPlan analyzed = k8sAnalyzer.analyze(parser.createStatement(query)); + return logicalOptimizer.optimize(analyzed); + } + + /** + * Translation: TS k8s | STATS avg_over_time(field) → TS k8s | STATS VALUES(avg_over_time(field)) BY _tsid + *
+ * AVG_OVER_TIME translates into [Eval[[SUMOVERTIME(network.cost{f}#22,true[BOOLEAN]) / COUNTOVERTIME(network.cost{f}#22,true[BOOLEAN]) + * AS avg_over_time(network.cost)#4]]] + */ + public void testBareAvgOverTime() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_GROUP_BY_ALL.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | STATS avg_over_time(network.cost) + """); + + TimeSeriesAggregate tsa = findTimeSeriesAggregate(plan); + assertThat("Should have TimeSeriesAggregate", tsa, is(instanceOf(TimeSeriesAggregate.class))); + + List groupings = tsa.groupings().stream().filter(g -> g instanceof Attribute).map(g -> (Attribute) g).toList(); + boolean hasTsid = groupings.stream().anyMatch(g -> g.name().equals(MetadataAttribute.TSID_FIELD)); + assertThat("Should group by _tsid", hasTsid, is(true)); + + var aggregates = tsa.aggregates(); + assertThat("Should have aggregates", aggregates.isEmpty(), is(false)); + } + + /** + * Translation: TS k8s | STATS sum_over_time(field) → TS k8s | STATS VALUES(sum_over_time(field)) BY _tsid + */ + public void testBareSumOverTime() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_GROUP_BY_ALL.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | STATS sum_over_time(network.cost) + """); + + TimeSeriesAggregate tsa = findTimeSeriesAggregate(plan); + assertThat("Should have TimeSeriesAggregate", tsa, is(instanceOf(TimeSeriesAggregate.class))); + + List groupings = tsa.groupings().stream().filter(g -> g instanceof Attribute).map(g -> (Attribute) g).toList(); + boolean hasTsid = groupings.stream().anyMatch(g -> g.name().equals(MetadataAttribute.TSID_FIELD)); + assertThat("Should group by _tsid", hasTsid, is(true)); + + var aggregates = tsa.aggregates(); + assertThat("Should have aggregates", aggregates.isEmpty(), is(false)); + } + + /** + * Translation: TS k8s | STATS sum_over_time(field) BY TBUCKET(1h) → TS k8s | STATS VALUES(sum_over_time(field)) BY _tsid, TBUCKET(1h) + */ + public void testSumOverTimeWithTBucket() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_GROUP_BY_ALL.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | STATS sum_over_time(network.cost) BY TBUCKET(1 hour) + """); + + TimeSeriesAggregate tsa = findTimeSeriesAggregate(plan); + assertThat("Should have TimeSeriesAggregate", tsa, is(instanceOf(TimeSeriesAggregate.class))); + + List groupings = tsa.groupings().stream().filter(g -> g instanceof Attribute).map(g -> (Attribute) g).toList(); + assertThat(groupings.size(), is(2)); + + boolean hasTsid = groupings.stream().anyMatch(g -> g.name().equals(MetadataAttribute.TSID_FIELD)); + assertThat("Should group by _tsid", hasTsid, is(true)); + + boolean hasBucket = groupings.stream().anyMatch(g -> g.name().equalsIgnoreCase("BUCKET")); + assertThat("Should group by bucket", hasBucket, is(true)); + } + + /** + * Translation: TS k8s | STATS rate(field) BY TBUCKET(1h) → TS k8s | STATS VALUES(rate(field)) BY _tsid, TBUCKET(1h) + */ + public void testRateWithTBucket() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_GROUP_BY_ALL.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | STATS rate(network.total_bytes_out) BY TBUCKET(1 hour) + """); + + TimeSeriesAggregate tsa = findTimeSeriesAggregate(plan); + assertThat("Should have TimeSeriesAggregate", tsa, is(instanceOf(TimeSeriesAggregate.class))); + + List groupings = tsa.groupings().stream().filter(g -> g instanceof Attribute).map(g -> (Attribute) g).toList(); + assertThat(groupings.size(), is(2)); + + boolean hasTsid = groupings.stream().anyMatch(g -> g.name().equals(MetadataAttribute.TSID_FIELD)); + assertThat("Should group by _tsid", hasTsid, is(true)); + + boolean hasBucket = groupings.stream().anyMatch(g -> g.name().equalsIgnoreCase("BUCKET")); + assertThat("Should group by bucket", hasBucket, is(true)); + } + + /** + * Wrapped _OVER_TIME functions are not translated. + */ + public void testAlreadyWrappedAggregateNotModified() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_GROUP_BY_ALL.isEnabled()); + LogicalPlan planBefore = planK8s(""" + TS k8s + | STATS MAX(rate(network.total_bytes_out)) + """); + + assertThat("Plan should be valid", planBefore, is(instanceOf(LogicalPlan.class))); + } + + /** + * Support of explicit BY _tsid. + */ + public void testExistingTsidGroupingPreserved() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_GROUP_BY_ALL.isEnabled()); + LogicalPlan plan = planK8s("TS k8s | STATS sum_over_time(network.cost) BY _tsid"); + + TimeSeriesAggregate tsa = findTimeSeriesAggregate(plan); + assertThat("Should have TimeSeriesAggregate", tsa, is(instanceOf(TimeSeriesAggregate.class))); + + List groupings = tsa.groupings().stream().filter(g -> g instanceof Attribute).map(g -> (Attribute) g).toList(); + assertThat(groupings.size(), is(1)); + + boolean hasTsid = groupings.stream().anyMatch(g -> g.name().equals(MetadataAttribute.TSID_FIELD)); + assertThat("Should still group by _tsid", hasTsid, is(true)); + } + + public void testCountOverTime() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_GROUP_BY_ALL.isEnabled()); + LogicalPlan plan = planK8s("TS k8s | STATS count = count_over_time(network.cost)"); + + TimeSeriesAggregate tsa = findTimeSeriesAggregate(plan); + assertThat("Should have TimeSeriesAggregate", tsa, is(instanceOf(TimeSeriesAggregate.class))); + + List groupings = tsa.groupings().stream().filter(g -> g instanceof Attribute).map(g -> (Attribute) g).toList(); + assertThat(groupings.size(), is(1)); + + boolean hasTsid = groupings.stream().anyMatch(g -> g.name().equals(MetadataAttribute.TSID_FIELD)); + assertThat("Should still group by _tsid", hasTsid, is(true)); + } + + private TimeSeriesAggregate findTimeSeriesAggregate(LogicalPlan plan) { + Holder tsAggregateHolder = new Holder<>(); + plan.forEachDown(TimeSeriesAggregate.class, tsAggregateHolder::set); + return tsAggregateHolder.get(); + } +}