From 7a43b004e35706397b696b6061307c30834783bb Mon Sep 17 00:00:00 2001 From: xuyang Date: Mon, 17 Jan 2022 12:27:02 +0800 Subject: [PATCH] [FLINK-25476][table-planner] support CHAR type in function MAX and MIN --- .../plan/utils/AggFunctionFactory.scala | 12 +++---- .../MaxWithRetractAggFunctionTest.java | 26 +++++++++++++++ .../MinWithRetractAggFunctionTest.java | 26 +++++++++++++++ .../runtime/stream/sql/AggregateITCase.scala | 33 +++++++++++++++++++ 4 files changed, 91 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index cbabddae03a44..861f537f6c24c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFu import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction import org.apache.flink.table.planner.functions.sql.{SqlFirstLastValueAggFunction, SqlListAggFunction} import org.apache.flink.table.planner.functions.utils.AggSqlFunction -import org.apache.flink.table.runtime.functions.aggregate.{BuiltInAggregateFunction, CollectAggFunction, FirstValueAggFunction, FirstValueWithRetractAggFunction, JsonArrayAggFunction, JsonObjectAggFunction, LagAggFunction, LastValueAggFunction, LastValueWithRetractAggFunction, ListAggWithRetractAggFunction, ListAggWsWithRetractAggFunction, MaxWithRetractAggFunction, MinWithRetractAggFunction} +import org.apache.flink.table.runtime.functions.aggregate._ import org.apache.flink.table.runtime.functions.aggregate.BatchApproxCountDistinctAggFunctions._ import org.apache.flink.table.types.logical._ import org.apache.flink.table.types.logical.LogicalTypeRoot._ @@ -273,8 +273,8 @@ class AggFunctionFactory( val valueType = argTypes(0) if (aggCallNeedRetractions(index)) { valueType.getTypeRoot match { - case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL | - TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE | + case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | CHAR | + DECIMAL | TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => new MinWithRetractAggFunction(argTypes(0)) case t => @@ -382,8 +382,8 @@ class AggFunctionFactory( val valueType = argTypes(0) if (aggCallNeedRetractions(index)) { valueType.getTypeRoot match { - case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL | - TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE | + case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | CHAR | + DECIMAL | TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => new MaxWithRetractAggFunction(argTypes(0)) case t => @@ -407,7 +407,7 @@ class AggFunctionFactory( new MaxAggFunction.DoubleMaxAggFunction case BOOLEAN => new MaxAggFunction.BooleanMaxAggFunction - case VARCHAR => + case VARCHAR | CHAR => new MaxAggFunction.StringMaxAggFunction case DATE => new MaxAggFunction.DateMaxAggFunction diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunctionTest.java index 06527f6f0d975..4f450308b823e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunctionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunctionTest.java @@ -29,6 +29,7 @@ import org.apache.flink.table.runtime.functions.aggregate.MaxWithRetractAggFunction.MaxWithRetractAccumulator; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DoubleType; @@ -317,6 +318,31 @@ protected List getExpectedResults() { } } + /** Test for {@link CharType}. */ + public static final class CharMaxWithRetractAggFunctionTest + extends MaxWithRetractAggFunctionTestBase { + + @Override + protected List> getInputValueSets() { + return Arrays.asList( + Arrays.asList('b', 'c', null, 'd', 'e', null), + Arrays.asList(null, null), + Arrays.asList(null, 'w'), + Arrays.asList('d', 'a')); + } + + @Override + protected List getExpectedResults() { + return Arrays.asList('e', null, 'w', 'd'); + } + + @Override + protected AggregateFunction> + getAggregator() { + return new MaxWithRetractAggFunction<>(DataTypes.CHAR(1).getLogicalType()); + } + } + /** Test for {@link TimestampType}. */ @Nested final class TimestampMaxWithRetractAggFunctionTest diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/MinWithRetractAggFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/MinWithRetractAggFunctionTest.java index 3efb6f95b7507..aff96f3fc979a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/MinWithRetractAggFunctionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/MinWithRetractAggFunctionTest.java @@ -29,6 +29,7 @@ import org.apache.flink.table.runtime.functions.aggregate.MinWithRetractAggFunction.MinWithRetractAccumulator; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DoubleType; @@ -317,6 +318,31 @@ protected List getExpectedResults() { } } + /** Test for {@link CharType}. */ + public static final class CharMinWithRetractAggFunctionTest + extends MinWithRetractAggFunctionTestBase { + + @Override + protected List> getInputValueSets() { + return Arrays.asList( + Arrays.asList('b', 'c', null, 'd', 'e', null), + Arrays.asList(null, null), + Arrays.asList(null, 'w'), + Arrays.asList('d', 'a')); + } + + @Override + protected List getExpectedResults() { + return Arrays.asList('b', null, 'w', 'a'); + } + + @Override + protected AggregateFunction> + getAggregator() { + return new MinWithRetractAggFunction<>(DataTypes.CHAR(1).getLogicalType()); + } + } + /** Test for {@link TimestampType}. */ @Nested final class TimestampMinWithRetractAggFunctionTest diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala index c2783437f069a..9927bcd794a64 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala @@ -1331,6 +1331,39 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) } + @TestTemplate + def testMinMaxWithChar(): Unit = { + val data = + List( + rowOf(1, "a"), + rowOf(1, "b"), + rowOf(2, "d"), + rowOf(2, "c") + ) + val dataId = TestValuesTableFactory.registerData(data) + tEnv.executeSql(s""" + |CREATE TABLE src( + | `id` INT, + | `char` CHAR(1) + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$dataId' + |) + |""".stripMargin) + + val sql = + """ + |select `id`, count(*), min(`char`), max(`char`) from src group by `id` + """.stripMargin + + val sink = new TestingRetractSink() + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink) + env.execute() + + val expected = List("1,2,a,b", "2,2,c,d") + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } + @TestTemplate def testCollectOnClusteredFields(): Unit = { val data = List(