From 20ec7dc7f8c4b780329bd55ec1e58bfcbad057d7 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Fri, 22 May 2026 16:10:56 +0800 Subject: [PATCH] [feature](be/fe) Add exponential_moving_average aggregate function (#63499) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problem Summary: Doris lacked an exponential moving average aggregate function. This implements `exponential_moving_average(half_decay, value, time)` ported from ClickHouse's ExponentialMovingAverage aggregate. Algorithm: - State stores (value, time, half_decay) as doubles. - On add(): scale existing value to the new timestamp via `2^(-dt/half_decay)`, then accumulate. Store half_decay in state so it is available during merge(). - On merge(): advance both states to the later timestamp, then sum. This is commutative and associative, so row order doesn't matter. - Result: `value * (1 - 2^(-1/half_decay))` (normalised by sum of weights). Changes: - BE: `aggregate_function_ema.h/cpp` — state struct + function class, registered in `aggregate_function_simple_factory.cpp`. - FE: `ExponentialMovingAverage.java` — 3-arg DOUBLE signature, visitor method added to `AggregateFunctionVisitor`, registered in `BuiltinAggregateFunctions`. - Regression test: `query_p0/aggregate/exponential_moving_average/`. ### Release note New aggregate function `exponential_moving_average(half_decay, value, time)` computes the exponential moving average over a stream of (value, time) pairs with the given half-decay parameter. ### Check List (For Author) - Test: Regression test added (query_p0/aggregate/exponential_moving_average) - Behavior changed: No (new function) - Does this need documentation: [docs](https://github.com/apache/doris-website/pull/3726) --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../aggregate/aggregate_function_ema.cpp | 31 ++++ .../exprs/aggregate/aggregate_function_ema.h | 173 ++++++++++++++++++ .../aggregate_function_simple_factory.cpp | 2 + .../catalog/BuiltinAggregateFunctions.java | 2 + .../agg/ExponentialMovingAverage.java | 122 ++++++++++++ .../visitor/AggregateFunctionVisitor.java | 5 + .../exponential_moving_average.out | 31 ++++ .../exponential_moving_average.groovy | 158 ++++++++++++++++ 8 files changed, 524 insertions(+) create mode 100644 be/src/exprs/aggregate/aggregate_function_ema.cpp create mode 100644 be/src/exprs/aggregate/aggregate_function_ema.h create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ExponentialMovingAverage.java create mode 100644 regression-test/data/query_p0/aggregate/exponential_moving_average/exponential_moving_average.out create mode 100644 regression-test/suites/query_p0/aggregate/exponential_moving_average/exponential_moving_average.groovy diff --git a/be/src/exprs/aggregate/aggregate_function_ema.cpp b/be/src/exprs/aggregate/aggregate_function_ema.cpp new file mode 100644 index 00000000000000..b3dde6c06bb426 --- /dev/null +++ b/be/src/exprs/aggregate/aggregate_function_ema.cpp @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/aggregate/aggregate_function_ema.h" + +#include "exprs/aggregate/aggregate_function_simple_factory.h" +#include "exprs/aggregate/helpers.h" + +namespace doris { + +void register_aggregate_function_ema(AggregateFunctionSimpleFactory& factory) { + factory.register_function_both( + "exponential_moving_average", + creator_without_type::creator); +} + +} // namespace doris diff --git a/be/src/exprs/aggregate/aggregate_function_ema.h b/be/src/exprs/aggregate/aggregate_function_ema.h new file mode 100644 index 00000000000000..edafb973bb01d1 --- /dev/null +++ b/be/src/exprs/aggregate/aggregate_function_ema.h @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This file is adapted from +// https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/AggregateFunctionExponentialMovingAverage.cpp + +#pragma once + +#include +#include + +#include "core/assert_cast.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_number.h" +#include "core/types.h" +#include "exprs/aggregate/aggregate_function.h" + +namespace doris { +class Arena; +class BufferReadable; +class BufferWritable; +class IColumn; + +/** + * Exponentially smoothed moving average over time. + * + * Each value corresponds to a timeunit index. The half_decay parameter is the + * time lag at which exponential weights decay by one-half. + * + * State is a (value, time) pair representing the exponentially accumulated sum + * at a reference time. To get the average, divide by sumWeights(half_decay). + * + * Formula: + * scale(dt, x) = 2^(-dt/x) + * sumWeights(x) = 1 / (1 - 2^(-1/x)) + * add(v, t): merge current state with point (v, t) + * merge(a, b): move both to the later time, then sum values + * get(): value / sumWeights(half_decay) + * + * Usage: exponential_moving_average(half_decay, value, timeunit) + * - half_decay: constant double, the half-life period in timeunit units + * - value: numeric column to average + * - timeunit: numeric time index (not raw timestamp; use intDiv if needed) + * Returns DOUBLE. + */ +struct ExponentialMovingAverageData { + double value = 0.0; + double time = 0.0; + double half_decay = 0.0; + + static double scale(double time_passed, double hd) { return std::exp2(-time_passed / hd); } + + static double sum_weights(double hd) { return 1.0 / (1.0 - std::exp2(-1.0 / hd)); } + + void add(double new_value, double current_time, double hd) { + half_decay = hd; + ExponentialMovingAverageData other; + other.value = new_value; + other.time = current_time; + merge_point(other, hd); + } + + void merge_point(const ExponentialMovingAverageData& other, double hd) { + if (time > other.time) { + value = value + other.value * scale(time - other.time, hd); + } else if (time < other.time) { + value = other.value + value * scale(other.time - time, hd); + time = other.time; + } else { + value = value + other.value; + } + } + + void merge(const ExponentialMovingAverageData& rhs) { + double hd = half_decay != 0.0 ? half_decay : rhs.half_decay; + if (hd == 0.0) { + return; + } + half_decay = hd; + merge_point(rhs, hd); + } + + double get() const { + if (half_decay == 0.0) { + return 0.0; + } + return value / sum_weights(half_decay); + } + + void write(BufferWritable& buf) const { + buf.write_binary(value); + buf.write_binary(time); + buf.write_binary(half_decay); + } + + void read(BufferReadable& buf) { + buf.read_binary(value); + buf.read_binary(time); + buf.read_binary(half_decay); + } + + void reset() { + value = 0.0; + time = 0.0; + half_decay = 0.0; + } +}; + +class AggregateFunctionExponentialMovingAverage final + : public IAggregateFunctionDataHelper, + MultiExpression, + NullableAggregateFunction { +public: + AggregateFunctionExponentialMovingAverage(const DataTypes& argument_types_) + : IAggregateFunctionDataHelper( + argument_types_) {} + + String get_name() const override { return "exponential_moving_average"; } + + DataTypePtr get_return_type() const override { return std::make_shared(); } + + void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); } + + void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, + Arena&) const override { + const double half_decay = + assert_cast(*columns[0]) + .get_data()[row_num]; + const double new_value = + assert_cast(*columns[1]) + .get_data()[row_num]; + const double current_time = + assert_cast(*columns[2]) + .get_data()[row_num]; + this->data(place).add(new_value, current_time, half_decay); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, + Arena&) const override { + this->data(place).merge(this->data(rhs)); + } + + void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { + this->data(place).write(buf); + } + + void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, + Arena&) const override { + this->data(place).read(buf); + } + + void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { + assert_cast(to).get_data().push_back(this->data(place).get()); + } +}; + +} // namespace doris diff --git a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp index c2f4c6295a18c0..9fc311cdf08380 100644 --- a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp +++ b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp @@ -80,6 +80,7 @@ void register_aggregate_function_percentile_reservoir(AggregateFunctionSimpleFac void register_aggregate_function_ai_agg(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_bool_union(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_sem(AggregateFunctionSimpleFactory& factory); +void register_aggregate_function_ema(AggregateFunctionSimpleFactory& factory); AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() { static std::once_flag oc; @@ -137,6 +138,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() { register_aggregate_function_ai_agg(instance); register_aggregate_function_bool_union(instance); register_aggregate_function_sem(instance); + register_aggregate_function_ema(instance); // Register foreach and foreachv2 functions register_aggregate_function_combinator_foreach(instance); register_aggregate_function_combinator_foreachv2(instance); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java index f98c3e14df8f26..66140acc790d91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java @@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum; import org.apache.doris.nereids.trees.expressions.functions.agg.Covar; import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp; +import org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage; import org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect; import org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion; import org.apache.doris.nereids.trees.expressions.functions.agg.GroupBitAnd; @@ -132,6 +133,7 @@ private BuiltinAggregateFunctions() { agg(CollectSet.class, "collect_set", "group_uniq_array"), agg(Corr.class, "corr"), agg(CorrWelford.class, "corr_welford"), + agg(ExponentialMovingAverage.class, "exponential_moving_average"), agg(Count.class, "count"), agg(CountByEnum.class, "count_by_enum"), agg(Covar.class, "covar", "covar_pop"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ExponentialMovingAverage.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ExponentialMovingAverage.java new file mode 100644 index 00000000000000..caeb3b82dc7daf --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ExponentialMovingAverage.java @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.agg; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.DoubleType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * Exponential Moving Average aggregate function. + * + *

Computes the exponentially smoothed moving average over time-indexed values. + * The half_decay parameter controls the half-life period: the time after which the + * exponential weight of a past value decays by a factor of 1/2. + * + *

Signature: {@code exponential_moving_average(half_decay DOUBLE, value DOUBLE, + * timeunit DOUBLE) -> DOUBLE} + * + *

The timeunit argument is a numeric time index, not a raw timestamp. For + * timestamp columns use {@code intDiv(toUnixTimestamp(ts), interval_seconds)}. + */ +public class ExponentialMovingAverage extends NullableAggregateFunction + implements ExplicitlyCastableSignature { + + public static final List SIGNATURES = ImmutableList.of( + FunctionSignature.ret(DoubleType.INSTANCE) + .args(DoubleType.INSTANCE, DoubleType.INSTANCE, DoubleType.INSTANCE) + ); + + /** + * Constructor with 3 arguments: (half_decay, value, timeunit). + */ + public ExponentialMovingAverage(Expression halfDecay, Expression value, Expression timeunit) { + this(false, halfDecay, value, timeunit); + } + + /** + * Constructor with distinct flag and 3 arguments. + */ + public ExponentialMovingAverage(boolean distinct, Expression halfDecay, + Expression value, Expression timeunit) { + this(distinct, false, halfDecay, value, timeunit); + } + + /** + * Full constructor. + */ + public ExponentialMovingAverage(boolean distinct, boolean alwaysNullable, + Expression halfDecay, Expression value, Expression timeunit) { + super("exponential_moving_average", distinct, alwaysNullable, halfDecay, value, timeunit); + } + + /** Constructor for withChildren and reuse signature. */ + private ExponentialMovingAverage(NullableAggregateFunctionParams functionParams) { + super(functionParams); + } + + @Override + public void checkLegalityBeforeTypeCoercion() { + if (!getArgument(0).isConstant()) { + throw new AnalysisException("The half_decay argument of " + + getName() + " must be a constant"); + } + if (!getArgumentType(0).isNumericType()) { + throw new AnalysisException("The half_decay argument of " + + getName() + " must be numeric"); + } + if (!getArgumentType(1).isNumericType()) { + throw new AnalysisException("The value argument of " + + getName() + " must be numeric"); + } + if (!getArgumentType(2).isNumericType()) { + throw new AnalysisException("The timeunit argument of " + + getName() + " must be numeric"); + } + } + + @Override + public ExponentialMovingAverage withDistinctAndChildren(boolean distinct, + List children) { + Preconditions.checkArgument(children.size() == 3); + return new ExponentialMovingAverage(getFunctionParams(distinct, children)); + } + + @Override + public ExponentialMovingAverage withAlwaysNullable(boolean alwaysNullable) { + return new ExponentialMovingAverage(getAlwaysNullableFunctionParams(alwaysNullable)); + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitExponentialMovingAverage(this, context); + } + + @Override + public List getSignatures() { + return SIGNATURES; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java index e8b3ae193db48d..251012c16fec5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java @@ -39,6 +39,7 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum; import org.apache.doris.nereids.trees.expressions.functions.agg.Covar; import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp; +import org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage; import org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect; import org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion; import org.apache.doris.nereids.trees.expressions.functions.agg.GroupBitAnd; @@ -125,6 +126,10 @@ default R visitBitmapAgg(BitmapAgg bitmapAgg, C context) { return visitAggregateFunction(bitmapAgg, context); } + default R visitExponentialMovingAverage(ExponentialMovingAverage ema, C context) { + return visitNullableAggregateFunction(ema, context); + } + default R visitBitmapIntersect(BitmapIntersect bitmapIntersect, C context) { return visitAggregateFunction(bitmapIntersect, context); } diff --git a/regression-test/data/query_p0/aggregate/exponential_moving_average/exponential_moving_average.out b/regression-test/data/query_p0/aggregate/exponential_moving_average/exponential_moving_average.out new file mode 100644 index 00000000000000..2ca841b794fb3c --- /dev/null +++ b/regression-test/data/query_p0/aggregate/exponential_moving_average/exponential_moving_average.out @@ -0,0 +1,31 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !basic -- +92.25779635374204 + +-- !single_row -- +47.5 + +-- !group_by -- +1 11.25 +2 50 + +-- !half_decay_zero -- +0 + +-- !negative -- +2.196699141100893 + +-- !null -- +11.25 + +-- !empty -- +\N + +-- !dup_time -- +15.46875 + +-- !window -- +0 5 +1 7.5 +2 8.75 + diff --git a/regression-test/suites/query_p0/aggregate/exponential_moving_average/exponential_moving_average.groovy b/regression-test/suites/query_p0/aggregate/exponential_moving_average/exponential_moving_average.groovy new file mode 100644 index 00000000000000..18fb38bd2ec8fa --- /dev/null +++ b/regression-test/suites/query_p0/aggregate/exponential_moving_average/exponential_moving_average.groovy @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("exponential_moving_average") { + // Prepare main test table (dropped first to keep the environment for debugging) + sql "drop table if exists ema_test;" + sql """ + create table ema_test ( + id int, + v double, + t double + ) + duplicate key (id) + distributed by hash(id) buckets 1 + properties("replication_num" = "1"); + """ + sql """ + insert into ema_test values + (1, 95, 1), (2, 95, 2), (3, 95, 3), (4, 96, 4), (5, 96, 5), + (6, 96, 6), (7, 96, 7), (8, 97, 8), (9, 97, 9), (10, 97, 10), + (11, 97, 11), (12, 98, 12), (13, 98, 13), (14, 98, 14), (15, 98, 15), + (16, 99, 16), (17, 99, 17), (18, 99, 18), (19, 100, 19), (20, 100, 20); + """ + + // Basic aggregate: result matches ClickHouse's exponentialMovingAverage(5)(v, t) + qt_basic """ + select exponential_moving_average(5.0, v, t) from ema_test; + """ + + // Single-row: EMA of one value (v=95, t=1) with half_decay=1 + // state={95,1}, sum_weights(1)=1/(1-0.5)=2, result=95/2=47.5 + qt_single_row """ + select exponential_moving_average(1.0, v, t) from ema_test where id = 1; + """ + + // GROUP BY: two groups, group 1 has two rows, group 2 has one row + sql "drop table if exists ema_group_test;" + sql """ + create table ema_group_test ( + grp int, + v double, + t double + ) + duplicate key (grp) + distributed by hash(grp) buckets 1 + properties("replication_num" = "1"); + """ + sql """ + insert into ema_group_test values + (1, 10, 1), (1, 20, 3), (2, 100, 1); + """ + order_qt_group_by """ + select grp, exponential_moving_average(1.0, v, t) + from ema_group_test group by grp order by grp; + """ + + // half_decay = 0 edge case + qt_half_decay_zero """ + select exponential_moving_average(0.0, v, t) from ema_test where id = 1; + """ + + // Negative values + sql "drop table if exists ema_neg_test;" + sql """ + create table ema_neg_test ( + id int, + v double, + t double + ) + duplicate key (id) + distributed by hash(id) buckets 1 + properties("replication_num" = "1"); + """ + sql """ + insert into ema_neg_test values (1, -10, 1), (2, 10, 5); + """ + qt_negative """ + select exponential_moving_average(2.0, v, t) from ema_neg_test; + """ + + // NULL handling + sql "drop table if exists ema_null_test;" + sql """ + create table ema_null_test ( + id int, + v double null, + t double null + ) + duplicate key (id) + distributed by hash(id) buckets 1 + properties("replication_num" = "1"); + """ + sql """ + insert into ema_null_test values (1, 10, 1), (2, null, 2), (3, 20, 3); + """ + qt_null """ + select exponential_moving_average(1.0, v, t) from ema_null_test; + """ + + // Empty result set + qt_empty """ + select exponential_moving_average(1.0, v, t) from ema_null_test where v > 100; + """ + + // Duplicate times: same time values are summed directly + sql "drop table if exists ema_dup_time_test;" + sql """ + create table ema_dup_time_test ( + id int, + v double, + t double + ) + duplicate key (id) + distributed by hash(id) buckets 1 + properties("replication_num" = "1"); + """ + sql """ + insert into ema_dup_time_test values (1, 10, 5), (2, 20, 5), (3, 30, 10); + """ + qt_dup_time """ + select exponential_moving_average(1.0, v, t) from ema_dup_time_test; + """ + + // Cumulative window function + sql "drop table if exists ema_window_test;" + sql """ + create table ema_window_test ( + id int, + t double, + v double + ) + duplicate key (id) + distributed by hash(id) buckets 1 + properties("replication_num" = "1"); + """ + sql """ + insert into ema_window_test values (1, 0, 10), (2, 1, 10), (3, 2, 10); + """ + order_qt_window """ + select t, exponential_moving_average(1.0, v, t) + over (order by t rows between unbounded preceding and current row) as ema + from ema_window_test order by t; + """ +}