Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
5b93ed9
add capability and initial CSV test (WIP)
not-napoleon Aug 27, 2025
7e15b58
tests (that don't pass)
not-napoleon Aug 29, 2025
6ded62e
Merge branch 'main' into esql-group-by-all-over-time
not-napoleon Aug 29, 2025
90e8144
tests and tracing notes
not-napoleon Sep 3, 2025
a49979c
Merge branch 'main' into esql-group-by-all-over-time
not-napoleon Sep 5, 2025
60d8343
added rate agg to tests
not-napoleon Sep 8, 2025
611058f
add test gates
not-napoleon Sep 9, 2025
7504f02
detect when group by all should kick in
not-napoleon Sep 9, 2025
9517ba3
add outline for the rest of the steps
not-napoleon Sep 9, 2025
a953df9
why does this throw?
not-napoleon Sep 10, 2025
317da80
load dimension and metric data in the tests
not-napoleon Sep 10, 2025
b61fedb
build the values aggs for the dimensions
not-napoleon Sep 10, 2025
ffd140d
work around output changed verification
not-napoleon Sep 10, 2025
7733326
notes for future me, and others
not-napoleon Sep 10, 2025
2ddc444
draft of analyzer rule
not-napoleon Sep 16, 2025
9867913
wire up the rule
not-napoleon Sep 17, 2025
f8f523e
many tests passing
not-napoleon Sep 17, 2025
0a2104b
Merge branch 'main' into esql-group-by-all-over-time
not-napoleon Sep 17, 2025
0719784
failing for a new reason
not-napoleon Sep 17, 2025
80247dc
[CI] Auto commit changes from spotless
Sep 17, 2025
0dea349
misc cleanups
not-napoleon Sep 18, 2025
77213a4
Merge branch 'main' into esql-group-by-all-over-time
not-napoleon Sep 18, 2025
aabcf4c
more testing
not-napoleon Sep 18, 2025
1240a21
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-group-by…
not-napoleon Sep 18, 2025
09b8df8
forgot to add this test yesterday
not-napoleon Sep 19, 2025
bd38eea
[CI] Auto commit changes from spotless
Sep 19, 2025
22e0973
[CI] Update transport version definitions
Oct 2, 2025
1ea008e
Merge branch 'main' into esql-group-by-all-over-time
leontyevdv Oct 29, 2025
97c7f60
Merge branch 'main' into esql-group-by-all-over-time
leontyevdv Oct 30, 2025
5e5e915
Fix unit tests
leontyevdv Oct 30, 2025
39428b7
Merge branch 'main' into feature/esql-group-by-all
leontyevdv Oct 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -721,3 +721,24 @@ mx:integer | tbucket:datetime
;




Group by all for over time agg with no wrapper, no time bucket
required_capability: metrics_command
required_capability: metrics_group_by_all

TS k8s
| STATS count = count_over_time(network.cost)
;

count:long | cluster:keyword | pod:keyword
13 | staging | one
20 | staging | two
24 | staging | three
29 | qa | one
29 | qa | two
24 | qa | three
23 | prod | one
16 | prod | two
22 | prod | three
;
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,10 @@ public enum Cap {
*/
@Deprecated
METRICS_COMMAND(Build.current().isSnapshot()),
/**
* Enables automatically grouping by all dimension fields in TS mode queries
*/
METRICS_GROUP_BY_ALL(),

/**
* Are the {@code documents_found} and {@code values_loaded} fields available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
new ResolveLookupTables(),
new ResolveFunctions(),
new ResolveInference(),
new DateMillisToNanosInEsRelation()
new DateMillisToNanosInEsRelation(),
new TimeSeriesGroupByAll()
),
new Batch<>(
"Resolution",
Expand Down Expand Up @@ -1235,7 +1236,7 @@ private LogicalPlan resolveKeep(Project p, List<Attribute> childOutput) {
private LogicalPlan resolveDrop(Drop drop, List<Attribute> childOutput) {
List<NamedExpression> resolvedProjections = new ArrayList<>(childOutput);

for (var ne : drop.removals()) {
for (NamedExpression ne : drop.removals()) {
List<? extends NamedExpression> resolved;

if (ne instanceof UnresolvedNamePattern np) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.analysis;

import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
import org.elasticsearch.xpack.esql.rule.Rule;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * This rule implements the "group by all" logic for time series aggregations. It is intended to work in conjunction with
 * {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateTimeSeriesAggregate}, and should be run before that
 * rule. This rule adds output columns corresponding to the dimensions on the indices involved in the query, as discovered
 * by the {@link org.elasticsearch.xpack.esql.session.IndexResolver}. Despite the name, this does not actually group on the
 * dimension values, for efficiency reasons; it groups on the {@code _tsid} and collects each dimension through a
 * {@link Values} aggregation instead.
 * <p>
 * This rule will operate on "bare" over time aggregations, that is, a {@link TimeSeriesAggregateFunction} that is the direct
 * child of an aggregate's {@link Alias} rather than being wrapped in another aggregate function.
 */
public class TimeSeriesGroupByAll extends Rule<LogicalPlan, LogicalPlan> {
    @Override
    public LogicalPlan apply(LogicalPlan logicalPlan) {
        return logicalPlan.transformUp(node -> node instanceof TimeSeriesAggregate, this::rule);
    }

    /**
     * Rewrites a single {@link TimeSeriesAggregate} that contains at least one bare over time aggregation.
     * <p>
     * The rewritten node groups by {@code _tsid} followed by the original groupings, replaces every bare over time aggregation
     * with its {@link TimeSeriesAggregateFunction#perTimeSeriesAggregation()} and appends one {@link Values} aggregation per
     * dimension attribute found on the underlying {@link EsRelation}s, in the order those attributes are encountered.
     *
     * @param aggregate the time series aggregate to inspect
     * @return {@code aggregate} itself when it has no bare over time aggregation, otherwise the rewritten aggregate
     * @throws IllegalArgumentException if no {@code @timestamp} attribute is present on the underlying relations
     */
    public LogicalPlan rule(TimeSeriesAggregate aggregate) {
        // Flag to check if we should apply this rule.
        boolean hasTopLevelOverTimeAggs = false;
        // A Holder because the value is written from inside a lambda.
        Holder<Boolean> hasRateAggregates = new Holder<>(Boolean.FALSE);
        // the new `Value(dimension)` aggregation functions we intend to add to the query, along with the translated over time aggs
        List<NamedExpression> newAggregateFunctions = new ArrayList<>();
        for (NamedExpression agg : aggregate.aggregates()) {
            // We assume that all the aggregate functions in aggregates are wrapped as Aliases. I don't know why
            // this should be the case, but it is ubiquitous throughout the planner.
            if (agg instanceof Alias alias && alias.child() instanceof AggregateFunction af) {
                if (af instanceof TimeSeriesAggregateFunction tsAgg) {
                    hasTopLevelOverTimeAggs = true;
                    newAggregateFunctions.add(new Alias(alias.source(), alias.name(), tsAgg.perTimeSeriesAggregation()));
                }
                af.forEachDown(TimeSeriesAggregateFunction.class, tsAgg -> {
                    if (tsAgg instanceof Rate) {
                        hasRateAggregates.set(Boolean.TRUE);
                    }
                });
                // TODO: Deal with mixed top level and wrapped aggs case
                // NOTE(review): aggregates that are not a bare over time function are not copied into newAggregateFunctions,
                // so once the rule applies they are absent from the rewritten node.
            }
        }
        if (hasTopLevelOverTimeAggs == false) {
            // If there are no top level time series aggregations, there's no need for this rule to apply
            return aggregate;
        }

        Holder<Attribute> tsid = new Holder<>();
        Holder<Attribute> timestamp = new Holder<>();
        // A list, not a hash set: the iteration order decides the order of the dimension columns in the output, so it
        // has to be stable.
        List<Attribute> dimensions = new ArrayList<>();
        getTsFields(aggregate, tsid, timestamp, dimensions);
        if (tsid.get() == null) {
            tsid.set(new MetadataAttribute(aggregate.source(), MetadataAttribute.TSID_FIELD, DataType.KEYWORD, false));
        }
        if (timestamp.get() == null) {
            // Only the timestamp can be missing here; a missing _tsid has just been synthesized above.
            throw new IllegalArgumentException("@timestamp field is missing from the time-series source");
        }

        // NOCOMMIT - this is redundant with behavior in TranslateTSA
        // NOCOMMIT - This behavior is tangential to this rule; maybe we should break it out into a separate rule?
        // Add the _tsid to the EsRelation Leaf, if it's not there already
        LogicalPlan newChild = aggregate.child().transformUp(EsRelation.class, r -> {
            // The relation's own index mode is kept only when a rate aggregation is present; otherwise it becomes STANDARD.
            IndexMode indexMode = hasRateAggregates.get() ? r.indexMode() : IndexMode.STANDARD;
            List<Attribute> output = r.output().contains(tsid.get()) ? r.output() : CollectionUtils.combine(r.output(), tsid.get());
            return new EsRelation(r.source(), r.indexPattern(), indexMode, r.indexNameWithModes(), output);
        });

        // Grouping parameters for the new aggregation node.
        List<Expression> groupings = new ArrayList<>();
        // Group the new aggregations by tsid. This is equivalent to grouping by all dimensions.
        // NOTE - we do not add the tsid to the aggregates list because we do not want to include it in the final output
        groupings.add(tsid.get());
        // Add the time bucket grouping for the new agg
        // NOCOMMIT - validation rule for groupings?
        groupings.addAll(aggregate.groupings());
        for (Attribute dimension : dimensions) {
            // We add the dimensions as Values aggs here as an optimization. Grouping by the _tsid should already ensure
            // one row per unique combination of dimensions, and collecting those values in a Values aggregation is less
            // computation than hashing them for a grouping operation.
            // NOCOMMIT - This is also redundant. TranslateTSA already turns all the inner groupings into values aggs. Just do this there.
            newAggregateFunctions.add(new Alias(dimension.source(), dimension.name(), new Values(dimension.source(), dimension)));
        }
        // No time bucket is passed on, and the node is flagged as containing top level over time functions.
        return new TimeSeriesAggregate(aggregate.source(), newChild, groupings, newAggregateFunctions, null, true);
    }

    /**
     * Walks every {@link EsRelation} below {@code aggregate} and collects the time series related attributes from its output.
     * When several relations expose a {@code _tsid} or timestamp attribute, the last one visited wins.
     *
     * @param aggregate  the plan node whose relations are scanned
     * @param tsid       receives the attribute named {@link MetadataAttribute#TSID_FIELD}, if any
     * @param timestamp  receives the attribute named {@link MetadataAttribute#TIMESTAMP_FIELD}, if any
     * @param dimensions receives every dimension attribute once, in the order it is first encountered
     */
    private static void getTsFields(
        TimeSeriesAggregate aggregate,
        Holder<Attribute> tsid,
        Holder<Attribute> timestamp,
        List<Attribute> dimensions
    ) {
        // Used for de-duplication only; ordering is carried by the list.
        Set<Attribute> seenDimensions = new HashSet<>();
        aggregate.forEachDown(EsRelation.class, r -> {
            for (Attribute attr : r.output()) {
                if (attr.name().equals(MetadataAttribute.TSID_FIELD)) {
                    tsid.set(attr);
                }
                if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
                    timestamp.set(attr);
                }
                if (attr.isDimension() && seenDimensions.add(attr)) {
                    dimensions.add(attr);
                }
            }
        });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
import java.util.List;
import java.util.Objects;

/**
* Drop is an intermediary object used during the {@link org.elasticsearch.xpack.esql.analysis.Analyzer} phase of query planning.
* DROP commands are parsed into Drop objects, which the {@link org.elasticsearch.xpack.esql.analysis.Analyzer.ResolveRefs} rule then
* rewrites into {@link org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject} plans, along with other projection-like commands.
* As such, Drop is neither serializable nor able to be mapped to a corresponding physical plan.
*/
public class Drop extends UnaryPlan implements TelemetryAware, Streaming, SortAgnostic {
private final List<NamedExpression> removals;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,36 @@ public class TimeSeriesAggregate extends Aggregate {

private final Bucket timeBucket;

private final boolean hasTopLevelOverTimeFunctions;

/**
 * Creates a time series aggregate whose {@code hasTopLevelOverTimeFunctions} flag is {@code false}.
 * Delegates to the six-argument constructor.
 */
public TimeSeriesAggregate(
Source source,
LogicalPlan child,
List<Expression> groupings,
List<? extends NamedExpression> aggregates,
Bucket timeBucket
) {
this(source, child, groupings, aggregates, timeBucket, false);
}

/**
 * Creates a time series aggregate, storing the given time bucket and the
 * {@code hasTopLevelOverTimeFunctions} flag alongside the state handled by the superclass constructor.
 */
public TimeSeriesAggregate(
Source source,
LogicalPlan child,
List<Expression> groupings,
List<? extends NamedExpression> aggregates,
Bucket timeBucket,
boolean hasTopLevelOverTimeFunctions
) {
super(source, child, groupings, aggregates);
this.timeBucket = timeBucket;
this.hasTopLevelOverTimeFunctions = hasTopLevelOverTimeFunctions;
}

/**
 * Deserializing constructor. Reads the optional time bucket from the stream after the superclass state.
 * The {@code hasTopLevelOverTimeFunctions} flag is not read from the stream; an instance built this way
 * always reports {@code false} for it.
 */
public TimeSeriesAggregate(StreamInput in) throws IOException {
super(in);
this.timeBucket = in.readOptionalWriteable(inp -> (Bucket) Bucket.ENTRY.reader.read(inp));
// Shouldn't need to be serialized; at least not yet
this.hasTopLevelOverTimeFunctions = false;
}

@Override
Expand All @@ -67,24 +83,28 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(timeBucket);
}

/**
 * Returns the flag supplied at construction time; {@code false} for instances created through the
 * five-argument constructor or read from a stream.
 */
public boolean hasTopLevelOverTimeFunctions() {
return hasTopLevelOverTimeFunctions;
}

/** Returns the name of this class's {@code ENTRY}. */
@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
protected NodeInfo<Aggregate> info() {
return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket);
return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket, hasTopLevelOverTimeFunctions);
}

@Override
public TimeSeriesAggregate replaceChild(LogicalPlan newChild) {
return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket);
return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket, hasTopLevelOverTimeFunctions);
}

@Override
public TimeSeriesAggregate with(LogicalPlan child, List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket);
return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket, hasTopLevelOverTimeFunctions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
import org.elasticsearch.xpack.esql.session.Configuration;

import java.util.List;

import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;

/**
 * Physical planning tests for the "group by all" behaviour of bare over time aggregations in {@code TS} queries.
 */
public class TimeseriesGroupByAllPhysicalPlannerTests extends LocalPhysicalPlanOptimizerTests {
    public TimeseriesGroupByAllPhysicalPlannerTests(String name, Configuration config) {
        super(name, config);
    }

    /**
     * Plans a bare {@code count_over_time} and checks that every {@link TimeSeriesAggregateExec} in the resulting plan
     * carries three aggregates: a {@link Count} followed by two {@link Values}.
     */
    public void testSimple() {
        assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
        assumeTrue("requires metrics group by all", EsqlCapabilities.Cap.METRICS_GROUP_BY_ALL.isEnabled());

        String testQuery = """
            TS k8s
            | STATS count = count_over_time(network.cost)
            | LIMIT 10
            """;
        PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(testQuery);

        List<PhysicalPlan> aggs = actualPlan.collect(node -> node instanceof TimeSeriesAggregateExec);
        // Without this check the loop below would be skipped and the test would pass without verifying anything.
        assertFalse("expected at least one TimeSeriesAggregateExec in the plan", aggs.isEmpty());
        for (PhysicalPlan agg : aggs) {
            TimeSeriesAggregateExec tsAgg = as(agg, TimeSeriesAggregateExec.class);
            assertEquals(3, tsAgg.aggregates().size());
            // Check the first child to unwrap the alias; as(...) is called for its type assertion.
            as(tsAgg.aggregates().get(0).children().get(0), Count.class);
            // The two dimension columns; this test does not verify which dimension each one refers to.
            as(tsAgg.aggregates().get(1).children().get(0), Values.class);
            as(tsAgg.aggregates().get(2).children().get(0), Values.class);
        }
    }
}
Loading