7 changes: 7 additions & 0 deletions docs/data/sql_functions.yml
@@ -1059,6 +1059,13 @@ aggregate:
Divides the rows for each window partition into `n` buckets ranging from 1 to at most `n`.
If the number of rows in the window partition doesn't divide evenly into the number of buckets, then the remainder values are distributed one per bucket, starting with the first bucket.
For example, with 6 rows and 4 buckets, the bucket values would be as follows: 1 1 2 2 3 4
- sql: ARRAY_AGG([ ALL | DISTINCT ] expression [ RESPECT NULLS | IGNORE NULLS ])
table: FIELD.arrayAgg
description: |
By default or with the keyword `ALL`, returns an array that concatenates the input rows,
and returns `NULL` if there are no input rows. Use `DISTINCT` for one unique instance of each value.
By default, `NULL` values are respected; use `IGNORE NULLS` to skip `NULL` values.
The `ORDER BY` clause is currently not supported.
- sql: JSON_OBJECTAGG([KEY] key VALUE value [ { NULL | ABSENT } ON NULL ])
table: jsonObjectAgg(JsonOnNull, keyExpression, valueExpression)
description: |
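As a rough illustration of the `ARRAY_AGG` description added in this hunk, the Python sketch below models the documented behavior on a plain list, with `None` standing in for SQL `NULL`. The function name `array_agg` and its keyword arguments are hypothetical and are not part of Flink or PyFlink. Returning `NULL` when `IGNORE NULLS` skips every input is an assumption taken from the `Row.of("D", null)` expectation in the integration test of this PR.

```python
from typing import Any, Iterable, List, Optional


def array_agg(values: Iterable[Any],
              distinct: bool = False,
              ignore_nulls: bool = False) -> Optional[List[Any]]:
    """Hypothetical model of the documented semantics; None plays SQL NULL."""
    result: List[Any] = []
    for value in values:
        if value is None and ignore_nulls:
            continue  # IGNORE NULLS: skip NULL inputs
        if distinct and value in result:
            continue  # DISTINCT: keep one instance of each value
        result.append(value)
    # No collected elements (no input rows, or all skipped): NULL, not [].
    return result if result else None


print(array_agg([3, None]))                      # RESPECT NULLS is the default
print(array_agg([2, 2, 3], distinct=True))
print(array_agg([None], ignore_nulls=True))
```

With the default arguments a `NULL` input is kept as an array element, which is the behavior the reviewers compare against Postgres and BigQuery later in the thread.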
6 changes: 6 additions & 0 deletions docs/data/sql_functions_zh.yml
@@ -1181,6 +1181,12 @@ aggregate:
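Divides all rows in the window partition, in order, into n groups and returns the group number assigned to each row (starting from 1, at most n).
If the rows cannot be divided evenly into n groups, the remaining values are distributed one per group, starting from the first group.
For example, if a window partition has 6 rows and is divided into 4 groups, the group numbers of the rows are: 1, 1, 2, 2, 3, 4.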
- sql: ARRAY_AGG([ ALL | DISTINCT ] expression [ RESPECT NULLS | IGNORE NULLS ])
table: FIELD.arrayAgg
description: |
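By default or with the keyword `ALL`, returns an array composed of the expression values from the input rows, and returns `NULL` if there are no input rows. Use `DISTINCT` to deduplicate all values before aggregating.
By default, `NULL` values are not ignored; use `IGNORE NULLS` to ignore `NULL` values.
The `ORDER BY` clause is not yet supported.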
- sql: JSON_OBJECTAGG([KEY] key VALUE value [ { NULL | ABSENT } ON NULL ])
table: jsonObjectAgg(JsonOnNull, keyExpression, valueExpression)
description: |
1 change: 1 addition & 0 deletions flink-python/docs/reference/pyflink.table/expressions.rst
@@ -138,6 +138,7 @@ arithmetic functions
Expression.var_pop
Expression.var_samp
Expression.collect
Expression.array_agg
Expression.alias
Expression.cast
Expression.try_cast
4 changes: 4 additions & 0 deletions flink-python/pyflink/table/expression.py
@@ -832,6 +832,10 @@ def var_samp(self) -> 'Expression':
def collect(self) -> 'Expression':
return _unary_op("collect")(self)

@property
def array_agg(self) -> 'Expression':
return _unary_op("arrayAgg")(self)

def alias(self, name: str, *extra_names: str) -> 'Expression[T]':
"""
Specifies a name for an expression i.e. a field.
1 change: 1 addition & 0 deletions flink-python/pyflink/table/tests/test_expression.py
@@ -114,6 +114,7 @@ def test_expression(self):
self.assertEqual('varPop(a)', str(expr1.var_pop))
self.assertEqual('varSamp(a)', str(expr1.var_samp))
self.assertEqual('collect(a)', str(expr1.collect))
self.assertEqual('ARRAY_AGG(a)', str(expr1.array_agg))
self.assertEqual("as(a, 'a', 'b', 'c')", str(expr1.alias('a', 'b', 'c')))
self.assertEqual('cast(a, INT)', str(expr1.cast(DataTypes.INT())))
self.assertEqual('asc(a)', str(expr1.asc))

@@ -201,6 +201,7 @@
"AFTER"
"ALWAYS"
"APPLY"
"ARRAY_AGG"
"ASC"
"ASSERTION"
"ASSIGNMENT"

@@ -63,12 +63,23 @@ void testDescribeCatalog() {
sql("desc catalog a").ok("DESCRIBE CATALOG `A`");
}

- // ignore test methods that we don't support
- // BEGIN
- // ARRAY_AGG
- @Disabled
@Test
- void testArrayAgg() {}
+ void testArrayAgg() {
+ sql("select\n"
+ + " array_agg(ename respect nulls order by deptno, ename) as c1,\n"
+ + " array_agg(ename order by deptno, ename desc) as c2,\n"
+ + " array_agg(distinct ename) as c3,\n"
+ + " array_agg(ename) as c4\n"
+ + "from emp group by gender")
+ .ok(
+ "SELECT"
+ + " ARRAY_AGG(`ENAME` ORDER BY `DEPTNO`, `ENAME`) RESPECT NULLS AS `C1`,"
+ + " ARRAY_AGG(`ENAME` ORDER BY `DEPTNO`, `ENAME` DESC) AS `C2`,"
+ + " ARRAY_AGG(DISTINCT `ENAME`) AS `C3`,"
+ + " ARRAY_AGG(`ENAME`) AS `C4`\n"
+ + "FROM `EMP`\n"
+ + "GROUP BY `GENDER`");
+ }

// DESCRIBE SCHEMA
@Disabled

@@ -53,6 +53,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ABS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ACOS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_AGG;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONCAT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONTAINS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_DISTINCT;
@@ -527,6 +528,11 @@ public OutType collect() {
return toApiSpecificExpression(unresolvedCall(COLLECT, toExpr()));
}

/** Returns array aggregate of a given expression. */
public OutType arrayAgg() {
return toApiSpecificExpression(unresolvedCall(ARRAY_AGG, toExpr()));
}

/**
* Returns a new value being cast to {@code toType}. A cast error throws an exception and fails
* the job. When performing a cast operation that may fail, like {@link DataTypes#STRING()} to

@@ -745,6 +745,13 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.outputTypeStrategy(argument(0))
.build();

public static final BuiltInFunctionDefinition ARRAY_AGG =
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_AGG")
.kind(AGGREGATE)
.outputTypeStrategy(nullableIfArgs(SpecificTypeStrategies.ARRAY))
.build();

// --------------------------------------------------------------------------------------------
// String functions
// --------------------------------------------------------------------------------------------

@@ -84,6 +84,8 @@ public class SqlAggFunctionVisitor extends ExpressionDefaultVisitor<SqlAggFuncti
BuiltInFunctionDefinitions.VAR_SAMP, FlinkSqlOperatorTable.VAR_SAMP);
AGG_DEF_SQL_OPERATOR_MAPPING.put(
BuiltInFunctionDefinitions.COLLECT, FlinkSqlOperatorTable.COLLECT);
AGG_DEF_SQL_OPERATOR_MAPPING.put(
BuiltInFunctionDefinitions.ARRAY_AGG, FlinkSqlOperatorTable.ARRAY_AGG);
AGG_DEF_SQL_OPERATOR_MAPPING.put(
BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL,
FlinkSqlOperatorTable.JSON_OBJECTAGG_NULL_ON_NULL);

@@ -37,6 +37,8 @@
import org.apache.calcite.sql.SqlPrefixOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.fun.SqlBasicAggFunction;
import org.apache.calcite.sql.fun.SqlLibraryOperators;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
@@ -1140,6 +1142,22 @@ public List<SqlGroupedWindowFunction> getAuxiliaryFunctions() {
public static final SqlAggFunction APPROX_COUNT_DISTINCT =
SqlStdOperatorTable.APPROX_COUNT_DISTINCT;

/**
* Use the definitions in Flink instead of {@link SqlLibraryOperators#ARRAY_AGG}, because we
* return nullable ARRAY type. Order by clause like <code>ARRAY_AGG(x ORDER BY x, y)</code> for
* aggregate function is not supported yet, because the row data cannot be obtained inside the
* aggregate function.
*/
public static final SqlAggFunction ARRAY_AGG =
SqlBasicAggFunction.create(
SqlKind.ARRAY_AGG,
ReturnTypes.cascade(
ReturnTypes.TO_ARRAY, SqlTypeTransforms.TO_NULLABLE),
OperandTypes.ANY)
.withFunctionType(SqlFunctionCategory.SYSTEM)
.withSyntax(SqlSyntax.FUNCTION)
.withAllowsNullTreatment(true);

// ARRAY OPERATORS
public static final SqlOperator ARRAY_VALUE_CONSTRUCTOR = new SqlArrayConstructor();
public static final SqlOperator ELEMENT = SqlStdOperatorTable.ELEMENT;
@@ -1154,6 +1172,8 @@ public List<SqlGroupedWindowFunction> getAuxiliaryFunctions() {
// SPECIAL OPERATORS
public static final SqlOperator MULTISET_VALUE = SqlStdOperatorTable.MULTISET_VALUE;
public static final SqlOperator ROW = SqlStdOperatorTable.ROW;
public static final SqlOperator IGNORE_NULLS = SqlStdOperatorTable.IGNORE_NULLS;
public static final SqlOperator RESPECT_NULLS = SqlStdOperatorTable.RESPECT_NULLS;
public static final SqlOperator OVERLAPS = SqlStdOperatorTable.OVERLAPS;
public static final SqlOperator LITERAL_CHAIN = SqlStdOperatorTable.LITERAL_CHAIN;
public static final SqlOperator BETWEEN = SqlStdOperatorTable.BETWEEN;

@@ -146,6 +146,9 @@ class AggFunctionFactory(
case a: SqlAggFunction if a.getKind == SqlKind.COLLECT =>
createCollectAggFunction(argTypes)

case a: SqlAggFunction if a.getKind == SqlKind.ARRAY_AGG =>
createArrayAggFunction(argTypes, call.ignoreNulls)

case fn: SqlAggFunction if fn.getKind == SqlKind.JSON_OBJECTAGG =>
val onNull = fn.asInstanceOf[SqlJsonObjectAggAggFunction].getNullClause
new JsonObjectAggFunction(argTypes, onNull == SqlJsonConstructorNullClause.ABSENT_ON_NULL)
@@ -620,4 +623,10 @@ class AggFunctionFactory(
private def createCollectAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = {
new CollectAggFunction(argTypes(0))
}

private def createArrayAggFunction(
types: Array[LogicalType],
ignoreNulls: Boolean): UserDefinedFunction = {
new ArrayAggFunction(types(0), ignoreNulls)
}
}

@@ -849,7 +849,7 @@ object AggregateUtil extends Enumeration {
call.getAggregation,
false,
false,
- false,
+ call.ignoreNulls,
call.getArgList,
-1, // remove filterArg
null,

@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.functions;

import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;

import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Stream;

import static org.apache.flink.table.api.DataTypes.ARRAY;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.types.RowKind.DELETE;
import static org.apache.flink.types.RowKind.INSERT;
import static org.apache.flink.types.RowKind.UPDATE_AFTER;
import static org.apache.flink.types.RowKind.UPDATE_BEFORE;

/** Tests for built-in ARRAY_AGG aggregation functions. */
class ArrayAggFunctionITCase extends BuiltInAggregateFunctionTestBase {

@Override
Stream<TestSpec> getTestCaseSpecs() {
return Stream.of(
TestSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_AGG)
.withDescription("ARRAY changelog stream aggregation")
.withSource(
ROW(STRING(), INT()),
Contributor @snuyanzin, Jan 16, 2024

One more thing: currently this only checks that ARRAY_AGG works with INT input.
It would be great to have tests for other types,
especially ROW, ARRAY, and MAP, where the expected output should be ARRAY<ROW>, ARRAY<ARRAY>, and ARRAY<MAP>.

Contributor Author

The test covering all types is in ArrayAggFunctionTest.

Arrays.asList(
Row.ofKind(INSERT, "A", 1),
Row.ofKind(INSERT, "A", 2),
Row.ofKind(INSERT, "B", 2),
Row.ofKind(INSERT, "B", 2),
Row.ofKind(INSERT, "B", 3),
Row.ofKind(INSERT, "C", 3),
Row.ofKind(INSERT, "C", null),
Row.ofKind(DELETE, "C", null),
Row.ofKind(INSERT, "D", null),
Row.ofKind(INSERT, "E", 4),
Row.ofKind(INSERT, "E", 5),
Row.ofKind(DELETE, "E", 5),
Row.ofKind(UPDATE_BEFORE, "E", 4),
Row.ofKind(UPDATE_AFTER, "E", 6)))
.testResult(
source ->
"SELECT f0, array_agg(f1) FROM " + source + " GROUP BY f0",
TableApiAggSpec.groupBySelect(
Collections.singletonList($("f0")),
$("f0"),
$("f1").arrayAgg()),
ROW(STRING(), ARRAY(INT())),
ROW(STRING(), ARRAY(INT())),
Arrays.asList(
Row.of("A", new Integer[] {1, 2}),
Row.of("B", new Integer[] {2, 2, 3}),
Row.of("C", new Integer[] {3}),
Contributor

I wonder why the expected value here is just

Row.of("C", new Integer[] {3}),

based on input I would expect

Row.of("C", new Integer[] {3, null}),

or did I miss anything?

Contributor

Also, a similar query in Postgres

with input(a, c) as (
select 'a', 1
union all 
select 'a', 2
union all 
select 'c', 3
union all 
select 'c', null
)
select a, array_agg(distinct c) from input group by a

gives

a|array_agg|
-+---------+
a|{1,2}    |
c|{3,NULL} |

Contributor Author

-- calcite
ARRAY_AGG([ ALL | DISTINCT ] value [ RESPECT NULLS | IGNORE NULLS ] [ ORDER BY orderItem [, orderItem ]* ] )
-- flink
ARRAY_AGG([ ALL | DISTINCT ] expression)

This function simplifies some aspects compared to Calcite.

  1. Currently, the Flink parser does not support parsing RESPECT NULLS | IGNORE NULLS, but it is still possible to make changes to support it.
  2. AggregateUtil#extractDistinctInformation (lines 848~858) ignores the ignoreNulls field.
    AggregateCall.create(
    call.getAggregation,
    false,
    false,
    false,
    call.getArgList,
    -1, // remove filterArg
    null,
    RelCollations.EMPTY,
    call.getType,
    call.getName)
  3. ListView does not support null values.
    * <p>Note: Elements of a {@link ListView} must not be null. For heap-based state backends, {@code
    * hashCode/equals} of the original (i.e. external) class are used. However, the serialization
    * format will use internal data structures.

For these reasons I made some simplifications. If we need full support, I can give it a try.

Contributor @snuyanzin, Jan 16, 2024

It's OK to support it only partially; however, my main concern here is that
I would expect a more consistent result,
since there is also the input Row.ofKind(INSERT, "D", null) and the expected value Row.of("D", null).
So I would expect both results either to contain nulls or not to contain nulls, but not a mix.

UPD: to be more clear:
After playing with Postgres and BigQuery, I noticed that both respect nulls by default, and in BigQuery ignoring nulls has to be requested explicitly. Is there a reason why Flink does it differently? It is probably better to have similar behaviour.

I'm not saying that we need to support the RESPECT NULLS | IGNORE NULLS syntax; however, we need to make RESPECT NULLS the default behaviour to be on the same page as other vendors.

Contributor Author

Thanks @snuyanzin, I'm trying to support RESPECT NULLS | IGNORE NULLS syntax.

Contributor Author

@snuyanzin
ListView<T> cannot hold null values, and since it is a generic list, there is no dedicated object that could stand in for a null value. Is there any good solution for this?

Contributor @dawidwys, Jan 16, 2024

I think it's a very good point. It would be nice to be compatible with SQL and other vendors.

One idea is we could keep the null indices and set those while retrieving the result. Happy to hear better solutions though.

Another is to wrap all values in a GenericRowData, but we'd need to iterate over the data to unwrap it.
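
The first idea above (keep the null indices and set them while retrieving the result) could look roughly like the following Python sketch. It is only an illustration under stated assumptions: the class `NullIndexAccumulator` and its fields are hypothetical, the plain list `values` stands in for a list state that must not contain nulls, and retraction is left out entirely.

```python
class NullIndexAccumulator:
    """Hypothetical accumulator: nulls are tracked by position, not stored."""

    def __init__(self):
        self.values = []          # non-null elements only
        self.null_positions = []  # output positions that should hold NULL
        self.size = 0             # total number of accumulated elements

    def accumulate(self, value):
        if value is None:
            self.null_positions.append(self.size)
        else:
            self.values.append(value)
        self.size += 1

    def get_value(self):
        if self.size == 0:
            return None  # no input rows: NULL rather than an empty array
        nulls = set(self.null_positions)
        remaining = iter(self.values)
        # Re-insert NULL at every recorded position, in arrival order.
        return [None if i in nulls else next(remaining)
                for i in range(self.size)]


acc = NullIndexAccumulator()
for v in (3, None, 4):
    acc.accumulate(v)
print(acc.get_value())
```

The stored list never contains `None`, at the cost of one extra pass when the result is materialized.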

Contributor

yes, I was also thinking about storing null indexes

Contributor Author @Jiabao-Sun, Jan 17, 2024

Hi @dawidwys, @snuyanzin,

Following LagAggFunction, I used a LinkedList to hold null values, and it works. Now RESPECT NULLS and IGNORE NULLS are supported.

public static class LagAcc<T> {
public int offset = 1;
public T defaultValue = null;
public LinkedList<T> buffer = new LinkedList<>();

Contributor

The downside of the solution is that the array must fit into memory at all times. The difference with the LagAggFunction is that LAG keeps at most n elements where n is controlled by the user.

Still, I am reasonably fine with the LinkedList approach, because the array has to fit into memory anyway when we emit it at the end as a single record. Writing this down for awareness.
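
To make the in-memory buffer approach concrete, here is a minimal Python sketch of an accumulator that keeps nulls in a plain list and handles retraction, replayed over the changelog rows from `ArrayAggFunctionITCase`. It is not the PR's `ArrayAggFunction`: the names `ArrayAggAccumulator`, `replay`, and `CHANGELOG` are hypothetical, and the retraction logic (remove the first matching element) is a simplifying assumption.

```python
class ArrayAggAccumulator:
    """Hypothetical buffer-based accumulator; None plays SQL NULL."""

    def __init__(self, ignore_nulls=False):
        self.ignore_nulls = ignore_nulls
        self.buffer = []  # whole array lives in memory and may hold None

    def accumulate(self, value):
        if value is None and self.ignore_nulls:
            return
        self.buffer.append(value)

    def retract(self, value):
        if value is None and self.ignore_nulls:
            return
        if value in self.buffer:
            self.buffer.remove(value)  # drops the first matching element

    def get_value(self):
        return list(self.buffer) if self.buffer else None


# Changelog rows copied from the integration test: (row kind, f0, f1).
CHANGELOG = [
    ("INSERT", "A", 1), ("INSERT", "A", 2),
    ("INSERT", "B", 2), ("INSERT", "B", 2), ("INSERT", "B", 3),
    ("INSERT", "C", 3), ("INSERT", "C", None), ("DELETE", "C", None),
    ("INSERT", "D", None),
    ("INSERT", "E", 4), ("INSERT", "E", 5), ("DELETE", "E", 5),
    ("UPDATE_BEFORE", "E", 4), ("UPDATE_AFTER", "E", 6),
]


def replay(changelog, ignore_nulls=False):
    """Group by f0 and apply each row as an accumulate or a retract."""
    groups = {}
    for kind, key, value in changelog:
        if key not in groups:
            groups[key] = ArrayAggAccumulator(ignore_nulls)
        if kind in ("INSERT", "UPDATE_AFTER"):
            groups[key].accumulate(value)
        else:  # DELETE and UPDATE_BEFORE retract a previously added value
            groups[key].retract(value)
    return {key: acc.get_value() for key, acc in groups.items()}


print(replay(CHANGELOG))
print(replay(CHANGELOG, ignore_nulls=True))
```

Under these assumptions group `C` ends up as `[3]` because the inserted null is later deleted, while group `D` yields `[None]` with the default and `None` with `ignore_nulls=True`, in line with the expected rows of the test.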

Row.of("D", new Integer[] {null}),
Row.of("E", new Integer[] {6})))
.testSqlResult(
source ->
"SELECT f0, array_agg(DISTINCT f1 IGNORE NULLS) FROM "
+ source
+ " GROUP BY f0",
ROW(STRING(), ARRAY(INT())),
Arrays.asList(
Row.of("A", new Integer[] {1, 2}),
Row.of("B", new Integer[] {2, 3}),
Row.of("C", new Integer[] {3}),
Contributor

same here
based on input I would expect

Row.of("C", new Integer[] {3, null}),

Row.of("D", null),
Row.of("E", new Integer[] {6}))));
}
}