diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala index d3dbd058cd639..53e70a106104a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala @@ -1335,16 +1335,17 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State def testMinMaxWithChar(): Unit = { val data = List( - rowOf(1, "a"), - rowOf(1, "b"), - rowOf(2, "d"), - rowOf(2, "c") + rowOf(1, "a", "gg"), + rowOf(1, "b", "hh"), + rowOf(2, "d", "j"), + rowOf(2, "c", "i") ) val dataId = TestValuesTableFactory.registerData(data) tEnv.executeSql(s""" |CREATE TABLE src( | `id` INT, - | `char` CHAR(1) + | `char1` CHAR(1), + | `char2` CHAR(2) |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId' @@ -1353,14 +1354,14 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State val sql = """ - |select `id`, count(*), min(`char`), max(`char`) from src group by `id` + |select `id`, count(*), min(`char1`), max(`char1`), min(`char2`), max(`char2`) from src group by `id` """.stripMargin val sink = new TestingRetractSink() tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink) env.execute() - val expected = List("1,2,a,b", "2,2,c,d") + val expected = List("1,2,a,b,gg,hh", "2,2,c,d,i,j") assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) } @@ -1368,24 +1369,25 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State def testRetractMinMaxWithChar(): Unit = { val data = List( - changelogRow("+I", Int.box(1), "a"), - changelogRow("+I", Int.box(1), "b"), - changelogRow("+I", Int.box(1), "c"), - changelogRow("-D", Int.box(1), "c"), - changelogRow("-D", Int.box(1), "a"), - changelogRow("+I", Int.box(2), "a"), - changelogRow("+I", Int.box(2), "b"), - changelogRow("+I", Int.box(2), "c"), - changelogRow("-U", Int.box(2), "b"), - changelogRow("+U", Int.box(2), "d"), - changelogRow("-U", Int.box(2), "a"), - changelogRow("+U", Int.box(2), "b") + changelogRow("+I", Int.box(1), "a", "ee"), + changelogRow("+I", Int.box(1), "b", "ff"), + changelogRow("+I", Int.box(1), "c", "gg"), + changelogRow("-D", Int.box(1), "c", "gg"), + changelogRow("-D", Int.box(1), "a", "ee"), + changelogRow("+I", Int.box(2), "a", "e"), + changelogRow("+I", Int.box(2), "b", "f"), + changelogRow("+I", Int.box(2), "c", "g"), + changelogRow("-U", Int.box(2), "b", "f"), + changelogRow("+U", Int.box(2), "d", "h"), + changelogRow("-U", Int.box(2), "a", "e"), + changelogRow("+U", Int.box(2), "b", "f") ) val dataId = TestValuesTableFactory.registerData(data) tEnv.executeSql(s""" |CREATE TABLE src( | `id` INT, - | `char` CHAR(1) + | `char1` CHAR(1), + | `char2` CHAR(2) |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', @@ -1395,14 +1397,14 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State val sql = """ - |select `id`, count(*), min(`char`), max(`char`) from src group by `id` + |select `id`, count(*), min(`char1`), max(`char1`), min(`char2`), max(`char2`) from src group by `id` """.stripMargin val sink = new TestingRetractSink() tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink) env.execute() - val expected = List("1,1,b,b", "2,3,b,d") + val expected = List("1,1,b,b,ff,ff", "2,3,b,d,f,h") assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) }