Skip to content

Commit

Permalink
[FLINK-19739][table-runtime-blink] Fix compile exception for window a…
Browse files Browse the repository at this point in the history
…ggregate in batch jobs
  • Loading branch information
tsreaper committed Jul 28, 2021
1 parent f07bfa6 commit 9b921d2
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 10 deletions.
Expand Up @@ -23,8 +23,9 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.runtime.operators.sort.QuickSort
import org.apache.flink.streaming.api.operators.OneInputStreamOperator
import org.apache.flink.table.api.Types
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.{GenericRowData, RowData}
import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.data.utils.JoinedRowData
import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_ROW, newName}
import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
Expand Down Expand Up @@ -611,6 +612,14 @@ class HashWindowCodeGenerator(
""".stripMargin
}

private def getOutputRowClass: Class[_ <: RowData] = {
if (namedProperties.isEmpty && grouping.isEmpty && isFinal) {
classOf[GenericRowData]
} else {
classOf[JoinedRowData]
}
}

private def genOutputDirectly(
windowSize: Long,
inputTerm: String,
Expand Down
Expand Up @@ -26,7 +26,7 @@ import org.apache.commons.math3.util.ArithmeticUtils
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.data.utils.JoinedRowData
import org.apache.flink.table.data.{GenericRowData, RowData}
import org.apache.flink.table.data.GenericRowData
import org.apache.flink.table.expressions.ExpressionUtils.extractValue
import org.apache.flink.table.expressions.{Expression, ValueLiteralExpression}
import org.apache.flink.table.functions.AggregateFunction
Expand Down Expand Up @@ -97,14 +97,6 @@ abstract class WindowCodeGenerator(
(groupKeyRowType.getChildren :+ timestampInternalType).toArray,
(groupKeyRowType.getFieldNames :+ "assignedTs$").toArray)

def getOutputRowClass: Class[_ <: RowData] = {
if (namedProperties.isEmpty && grouping.isEmpty) {
classOf[GenericRowData]
} else {
classOf[JoinedRowData]
}
}

private[flink] def getWindowsGroupingElementInfo(
enablePreAccumulate: Boolean = true): RowType = {
if (enablePreAccumulate) {
Expand Down
Expand Up @@ -22,14 +22,18 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_T
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo.LOCAL_DATE_TIME
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.TestData._
import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime
import org.apache.flink.table.planner.utils.{CountAggFunction, IntAvgAggFunction, IntSumAggFunction}
import org.apache.flink.types.Row

import org.junit.{Before, Test}

import java.time.LocalDateTime

class WindowAggregateITCase extends BatchTestBase {

@Before
Expand Down Expand Up @@ -641,4 +645,37 @@ class WindowAggregateITCase extends BatchTestBase {

checkResult(sqlQuery, Seq())
}

@Test
def testLocalGlobalWindowAggregateWithoutGroupingAndNamedProperties(): Unit = {
val data: Seq[Row] = Seq(
row(1, LocalDateTime.of(2021, 7, 26, 0, 0, 0)),
row(2, LocalDateTime.of(2021, 7, 26, 0, 0, 3)),
row(3, LocalDateTime.of(2021, 7, 26, 0, 0, 6)),
row(4, LocalDateTime.of(2021, 7, 26, 0, 0, 4)),
row(5, LocalDateTime.of(2021, 7, 26, 0, 0, 5)),
row(6, LocalDateTime.of(2021, 7, 26, 0, 0, 8)),
row(7, LocalDateTime.of(2021, 7, 26, 0, 0, 10)))
val dataId = TestValuesTableFactory.registerData(data)
val ddl =
s"""
|CREATE TABLE MyTable (
| a INT,
| ts TIMESTAMP
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$dataId',
| 'bounded' = 'true'
|)
|""".stripMargin

tEnv.executeSql(ddl)
checkResult(
"""
|SELECT sum(a) FROM MyTable
|GROUP BY
|TUMBLE(ts, interval '5' seconds)
|""".stripMargin,
Seq(row(14), row(7), row(7)))
}
}

0 comments on commit 9b921d2

Please sign in to comment.