From 9b921d27a7f5a6a00cd61577ed9610a69d638a0d Mon Sep 17 00:00:00 2001 From: tsreaper Date: Wed, 28 Jul 2021 14:30:46 +0800 Subject: [PATCH] [FLINK-19739][table-runtime-blink] Fix compile exception for window aggregate in batch jobs --- .../agg/batch/HashWindowCodeGenerator.scala | 11 +++++- .../agg/batch/WindowCodeGenerator.scala | 10 +---- .../batch/sql/agg/WindowAggregateITCase.scala | 37 +++++++++++++++++++ 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala index cebfc738f7b05..a08610636c39d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala @@ -23,8 +23,9 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo import org.apache.flink.runtime.operators.sort.QuickSort import org.apache.flink.streaming.api.operators.OneInputStreamOperator import org.apache.flink.table.api.Types -import org.apache.flink.table.data.RowData +import org.apache.flink.table.data.{GenericRowData, RowData} import org.apache.flink.table.data.binary.BinaryRowData +import org.apache.flink.table.data.utils.JoinedRowData import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_ROW, newName} import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect @@ -611,6 +612,14 @@ class HashWindowCodeGenerator( """.stripMargin } + private def getOutputRowClass: Class[_ <: RowData] = { + if (namedProperties.isEmpty && grouping.isEmpty && isFinal) { + classOf[GenericRowData] + } else { + classOf[JoinedRowData] + } + } + private def genOutputDirectly( windowSize: Long, inputTerm: String, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala index b28c8130d1084..59ccf2cd80bd7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala @@ -26,7 +26,7 @@ import org.apache.commons.math3.util.ArithmeticUtils import org.apache.flink.table.api.DataTypes import org.apache.flink.table.data.binary.BinaryRowData import org.apache.flink.table.data.utils.JoinedRowData -import org.apache.flink.table.data.{GenericRowData, RowData} +import org.apache.flink.table.data.GenericRowData import org.apache.flink.table.expressions.ExpressionUtils.extractValue import org.apache.flink.table.expressions.{Expression, ValueLiteralExpression} import org.apache.flink.table.functions.AggregateFunction @@ -97,14 +97,6 @@ abstract class WindowCodeGenerator( (groupKeyRowType.getChildren :+ timestampInternalType).toArray, (groupKeyRowType.getFieldNames :+ "assignedTs$").toArray) - def getOutputRowClass: Class[_ <: RowData] = { - if (namedProperties.isEmpty && grouping.isEmpty) { - classOf[GenericRowData] - } else { - classOf[JoinedRowData] - } - } - private[flink] def getWindowsGroupingElementInfo( enablePreAccumulate: Boolean = true): RowType = { if (enablePreAccumulate) { diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala index 2e013e8da84f7..11f245ddc7d54 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala @@ -22,14 +22,18 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_T import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo.LOCAL_DATE_TIME import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ +import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime import org.apache.flink.table.planner.utils.{CountAggFunction, IntAvgAggFunction, IntSumAggFunction} +import org.apache.flink.types.Row import org.junit.{Before, Test} +import java.time.LocalDateTime + class WindowAggregateITCase extends BatchTestBase { @Before @@ -641,4 +645,37 @@ class WindowAggregateITCase extends BatchTestBase { checkResult(sqlQuery, Seq()) } + + @Test + def testLocalGlobalWindowAggregateWithoutGroupingAndNamedProperties(): Unit = { + val data: Seq[Row] = Seq( + row(1, LocalDateTime.of(2021, 7, 26, 0, 0, 0)), + row(2, LocalDateTime.of(2021, 7, 26, 0, 0, 3)), + row(3, LocalDateTime.of(2021, 7, 26, 0, 0, 6)), + row(4, LocalDateTime.of(2021, 7, 26, 0, 0, 4)), + row(5, LocalDateTime.of(2021, 7, 26, 0, 0, 5)), + row(6, LocalDateTime.of(2021, 7, 26, 0, 0, 8)), + row(7, LocalDateTime.of(2021, 7, 26, 0, 0, 10))) + val dataId = TestValuesTableFactory.registerData(data) + val ddl = + s""" + |CREATE TABLE MyTable ( + | a INT, + | ts TIMESTAMP + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$dataId', + | 'bounded' = 'true' + |) + |""".stripMargin + + tEnv.executeSql(ddl) + checkResult( + """ + |SELECT sum(a) FROM MyTable + |GROUP BY + |TUMBLE(ts, interval '5' seconds) + |""".stripMargin, + Seq(row(14), row(7), row(7))) + } }