Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
import org.elasticsearch.xpack.esql.plan.logical.promql.AcrossSeriesAggregate;
import org.elasticsearch.xpack.esql.plan.logical.promql.PlaceholderRelation;
Expand Down Expand Up @@ -185,7 +186,26 @@ private static MapResult mapFunction(PromqlCommand promqlCommand, PromqlFunction

LogicalPlan p = childResult.plan;
p = new Eval(stepBucket.source(), p, List.of(stepBucket));
p = new TimeSeriesAggregate(acrossAggregate.source(), p, groupings, aggs, null);
TimeSeriesAggregate tsAggregate = new TimeSeriesAggregate(acrossAggregate.source(), p, groupings, aggs, null);
p = tsAggregate;
// ToDouble conversion of the metric using an eval to ensure a consistent output type
Alias convertedValue = new Alias(
acrossAggregate.source(),
acrossAggregate.sourceText(),
new ToDouble(acrossAggregate.source(), p.output().getFirst().toAttribute()),
acrossAggregate.valueId()
);
p = new Eval(acrossAggregate.source(), p, List.of(convertedValue));
// Project to maintain the correct output order, as declared in AcrossSeriesAggregate#output:
// [value, step, ...groupings]
List<NamedExpression> projections = new ArrayList<>();
projections.add(convertedValue.toAttribute());
List<Attribute> output = tsAggregate.output();
for (int i = 1; i < output.size(); i++) {
projections.add(output.get(i));
}
p = new Project(acrossAggregate.source(), p, projections);

result = new MapResult(p, extras);
} else {
throw new QlIllegalArgumentException("Unsupported PromQL function call: {}", functionCall);
Expand All @@ -205,13 +225,11 @@ private static void initAggregatesAndGroupings(
Function esqlFunction = PromqlFunctionRegistry.INSTANCE.buildEsqlFunction(
acrossAggregate.functionName(),
acrossAggregate.source(),
// to double conversion of the metric to ensure a consistent output type
// TODO it's probably more efficient to wrap the function in the ToDouble
// but for some reason this doesn't work if you have an inner and outer aggregation
List.of(new ToDouble(target.source(), target))
List.of(target)
);

aggs.add(new Alias(acrossAggregate.source(), acrossAggregate.sourceText(), esqlFunction, acrossAggregate.valueId()));
Alias value = new Alias(acrossAggregate.source(), acrossAggregate.sourceText(), esqlFunction);
aggs.add(value);

// timestamp/step
aggs.add(stepBucket);
Expand Down