Skip to content

Commit

Permalink
[FLINK-25476][table-planner] Support CHAR type in built-in function M…
Browse files Browse the repository at this point in the history
…AX and MIN

This closes #18375
  • Loading branch information
xuyangzhong authored and lsyldliu committed Dec 22, 2023
1 parent 7d8f62f commit 085859e
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFu
import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction
import org.apache.flink.table.planner.functions.sql.{SqlFirstLastValueAggFunction, SqlListAggFunction}
import org.apache.flink.table.planner.functions.utils.AggSqlFunction
import org.apache.flink.table.runtime.functions.aggregate.{BuiltInAggregateFunction, CollectAggFunction, FirstValueAggFunction, FirstValueWithRetractAggFunction, JsonArrayAggFunction, JsonObjectAggFunction, LagAggFunction, LastValueAggFunction, LastValueWithRetractAggFunction, ListAggWithRetractAggFunction, ListAggWsWithRetractAggFunction, MaxWithRetractAggFunction, MinWithRetractAggFunction}
import org.apache.flink.table.runtime.functions.aggregate._
import org.apache.flink.table.runtime.functions.aggregate.BatchApproxCountDistinctAggFunctions._
import org.apache.flink.table.types.logical._
import org.apache.flink.table.types.logical.LogicalTypeRoot._
Expand Down Expand Up @@ -273,8 +273,8 @@ class AggFunctionFactory(
val valueType = argTypes(0)
if (aggCallNeedRetractions(index)) {
valueType.getTypeRoot match {
case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL |
TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE |
case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | CHAR |
DECIMAL | TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
new MinWithRetractAggFunction(argTypes(0))
case t =>
Expand Down Expand Up @@ -382,8 +382,8 @@ class AggFunctionFactory(
val valueType = argTypes(0)
if (aggCallNeedRetractions(index)) {
valueType.getTypeRoot match {
case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL |
TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE |
case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | CHAR |
DECIMAL | TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
new MaxWithRetractAggFunction(argTypes(0))
case t =>
Expand All @@ -407,7 +407,7 @@ class AggFunctionFactory(
new MaxAggFunction.DoubleMaxAggFunction
case BOOLEAN =>
new MaxAggFunction.BooleanMaxAggFunction
case VARCHAR =>
case VARCHAR | CHAR =>
new MaxAggFunction.StringMaxAggFunction
case DATE =>
new MaxAggFunction.DateMaxAggFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,83 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State
assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}

@TestTemplate
def testMinMaxWithChar(): Unit = {
val data =
List(
rowOf(1, "a", "gg"),
rowOf(1, "b", "hh"),
rowOf(2, "d", "j"),
rowOf(2, "c", "i")
)
val dataId = TestValuesTableFactory.registerData(data)
tEnv.executeSql(s"""
|CREATE TABLE src(
| `id` INT,
| `char1` CHAR(1),
| `char2` CHAR(2)
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$dataId'
|)
|""".stripMargin)

val sql =
"""
|select `id`, count(*), min(`char1`), max(`char1`), min(`char2`), max(`char2`) from src group by `id`
""".stripMargin

val sink = new TestingRetractSink()
tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
env.execute()

val expected = List("1,2,a,b,gg,hh", "2,2,c,d,i,j")
assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}

@TestTemplate
def testRetractMinMaxWithChar(): Unit = {
val data =
List(
changelogRow("+I", Int.box(1), "a", "ee"),
changelogRow("+I", Int.box(1), "b", "ff"),
changelogRow("+I", Int.box(1), "c", "gg"),
changelogRow("-D", Int.box(1), "c", "gg"),
changelogRow("-D", Int.box(1), "a", "ee"),
changelogRow("+I", Int.box(2), "a", "e"),
changelogRow("+I", Int.box(2), "b", "f"),
changelogRow("+I", Int.box(2), "c", "g"),
changelogRow("-U", Int.box(2), "b", "f"),
changelogRow("+U", Int.box(2), "d", "h"),
changelogRow("-U", Int.box(2), "a", "e"),
changelogRow("+U", Int.box(2), "b", "f")
)
val dataId = TestValuesTableFactory.registerData(data)
tEnv.executeSql(s"""
|CREATE TABLE src(
| `id` INT,
| `char1` CHAR(1),
| `char2` CHAR(2)
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$dataId',
| 'changelog-mode' = 'I,UA,UB,D'
|)
|""".stripMargin)

val sql =
"""
|select `id`, count(*), min(`char1`), max(`char1`), min(`char2`), max(`char2`) from src group by `id`
""".stripMargin

val sink = new TestingRetractSink()
tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
env.execute()

val expected = List("1,1,b,b,ff,ff", "2,3,b,d,f,h")
assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}

@TestTemplate
def testCollectOnClusteredFields(): Unit = {
val data = List(
Expand Down

0 comments on commit 085859e

Please sign in to comment.