diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala
index a1844b0e4fd289..4d5b2c3d1226c3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -188,10 +188,16 @@ class TableConfig {
     this
   }
 
+  /**
+    * Returns the minimum time that state which was not updated is retained before
+    * it becomes eligible for cleanup.
+    */
   def getMinIdleStateRetentionTime: Long = {
     this.conf.getLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MS)
   }
 
+  /**
+    * Returns the maximum time that state which was not updated is retained before
+    * it is cleaned up.
+    */
   def getMaxIdleStateRetentionTime: Long = {
     // only min idle ttl provided.
     if (this.conf.contains(TableConfigOptions.SQL_EXEC_STATE_TTL_MS)
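Review note: the two getters form a pair — when only SQL_EXEC_STATE_TTL_MS is set, the maximum retention is derived from the minimum, and state idle for longer than the maximum becomes eligible for cleanup. A minimal Scala sketch of how the pair is typically exercised (the tEnv handle and the 12h value are illustrative, not part of this patch):

    // illustrative only: set the min TTL, then read the derived pair back
    val config = tEnv.getConfig
    config.getConf.setLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MS, 12 * 60 * 60 * 1000L)
    val minTtl = config.getMinIdleStateRetentionTime // 12h in milliseconds
    val maxTtl = config.getMaxIdleStateRetentionTime // derived when only the min TTL is given

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
index bacd3629acde5c..58d56b84ba4724 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
@@ -26,8 +26,7 @@ import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.plan.util.KeySelectorUtil
 import org.apache.flink.table.runtime.bundle.KeyedMapBundleOperator
 import org.apache.flink.table.runtime.bundle.trigger.CountBundleTrigger
-import org.apache.flink.table.runtime.deduplicate.{DeduplicateFunction,
-MiniBatchDeduplicateFunction}
+import org.apache.flink.table.runtime.deduplicate.{DeduplicateKeepFirstRowFunction, DeduplicateKeepLastRowFunction, MiniBatchDeduplicateKeepFirstRowFunction, MiniBatchDeduplicateKeepLastRowFunction}
 import org.apache.flink.table.plan.rules.physical.stream.StreamExecRetractionRules
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
 
@@ -95,10 +94,7 @@ class StreamExecDeduplicate(
     val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(getInput)
 
     if (inputIsAccRetract) {
-      throw new TableException(
-        "Deduplicate: Retraction on Deduplicate is not supported yet.\n" +
-        "please re-check sql grammar.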
\n" + - "Note: Deduplicate should not follow a non-windowed GroupBy aggregation.") + throw new TableException("Deduplicate doesn't support retraction input stream currently.") } val inputTransform = getInputNodes.get(0).translateToPlan(tableEnv) @@ -111,11 +107,12 @@ class StreamExecDeduplicate( TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY) > 0 val operator = if (isMiniBatchEnabled) { val exeConfig = tableEnv.execEnv.getConfig - val processFunction = new MiniBatchDeduplicateFunction( - rowTypeInfo, - generateRetraction, - rowTypeInfo.createSerializer(exeConfig), - keepLastRow) + val rowSerializer = rowTypeInfo.createSerializer(exeConfig) + val processFunction = if (keepLastRow) { + new MiniBatchDeduplicateKeepLastRowFunction(rowTypeInfo, generateRetraction, rowSerializer) + } else { + new MiniBatchDeduplicateKeepFirstRowFunction(rowSerializer) + } val trigger = new CountBundleTrigger[BaseRow]( tableConfig.getConf.getLong(TableConfigOptions.SQL_EXEC_MINIBATCH_SIZE)) new KeyedMapBundleOperator( @@ -124,12 +121,12 @@ class StreamExecDeduplicate( } else { val minRetentionTime = tableConfig.getMinIdleStateRetentionTime val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime - val processFunction = new DeduplicateFunction( - minRetentionTime, - maxRetentionTime, - rowTypeInfo, - generateRetraction, - keepLastRow) + val processFunction = if (keepLastRow) { + new DeduplicateKeepLastRowFunction(minRetentionTime, maxRetentionTime, rowTypeInfo, + generateRetraction) + } else { + new DeduplicateKeepFirstRowFunction(minRetentionTime, maxRetentionTime) + } new KeyedProcessOperator[BaseRow, BaseRow, BaseRow](processFunction) } val ret = new OneInputTransformation( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecRank.scala index ea2b5e48bd8c84..fba6bd9233e584 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecRank.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecRank.scala @@ -111,7 +111,6 @@ class StreamExecRank( .item("select", getRowType.getFieldNames.mkString(", ")) } - //~ ExecNode methods ----------------------------------------------------------- override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamOptimizer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamOptimizer.scala index 19fbd3df9021ac..822359c5ca5206 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamOptimizer.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamOptimizer.scala @@ -41,6 +41,7 @@ class StreamOptimizer(tEnv: StreamTableEnvironment) extends Optimizer { n.sink match { case _: RetractStreamTableSink[_] => true case s: DataStreamTableSink[_] => s.updatesAsRetraction + case _ => false } case o => o.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE).sendsUpdatesAsRetractions diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/RankITCase.scala 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/RankITCase.scala index 2e8225651a864b..ff046bc90a390f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/RankITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/RankITCase.scala @@ -125,9 +125,11 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode |WHERE rank_num <= 2 """.stripMargin - val sink = new TestingUpsertTableSink(Array(0, 3)) val table = tEnv.sqlQuery(sql) - writeToSink(table, sink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 3)). + configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val expected = List( @@ -186,9 +188,11 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode |WHERE rank_num <= 3 """.stripMargin - val sink = new TestingUpsertTableSink(Array(0, 3)) val table = tEnv.sqlQuery(sql) - writeToSink(table, sink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 3)). + configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val updatedExpected = List( @@ -248,7 +252,9 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode """.stripMargin val table = tEnv.sqlQuery(sql) - val sink = new TestingUpsertTableSink(Array(0, 3)) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 3)). + configure(schema.getFieldNames, schema.getFieldTypes) tEnv.writeToSink(table, sink) env.execute() @@ -292,8 +298,10 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode """.stripMargin val table = tEnv.sqlQuery(sql) - val sink = new TestingUpsertTableSink(Array(0, 3)) - writeToSink(table, sink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 3)). + configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val updatedExpected = List( @@ -344,8 +352,10 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode """.stripMargin val table = tEnv.sqlQuery(sql) - val sink = new TestingUpsertTableSink(Array(0, 3)) - writeToSink(table, sink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 3)). + configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val updatedExpected = List( @@ -387,15 +397,17 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode """.stripMargin val table = tEnv.sqlQuery(sql) - val tableSink = new TestingUpsertTableSink(Array(0, 3)) - writeToSink(table, tableSink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 3)). + configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val updatedExpected = List( "book,2,19,2", "fruit,5,34,2") - assertEquals(updatedExpected.sorted, tableSink.getUpsertResults.sorted) + assertEquals(updatedExpected.sorted, sink.getUpsertResults.sorted) } // FIXME @@ -524,8 +536,10 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode """.stripMargin val table = tEnv.sqlQuery(sql) - val tableSink = new TestingUpsertTableSink(Array(0, 1)) - writeToSink(table, tableSink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 1)). 
+ configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val expected = List( @@ -536,7 +550,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode "fruit,1,3,5", "fruit,2,2,4", "fruit,3,1,3") - assertEquals(expected.sorted, tableSink.getUpsertResults.sorted) + assertEquals(expected.sorted, sink.getUpsertResults.sorted) } // FIXME @@ -581,14 +595,16 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode """.stripMargin val table = tEnv.sqlQuery(sql) - val tableSink = new TestingUpsertTableSink(Array(0, 1)) - writeToSink(table, tableSink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 1)). + configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val expected = List( "book,3,2,2", "fruit,3,1,3") - assertEquals(expected.sorted, tableSink.getUpsertResults.sorted) + assertEquals(expected.sorted, sink.getUpsertResults.sorted) } // FIXME @@ -638,9 +654,11 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode |WHERE rank_num <= 4 """.stripMargin - val tableSink = new TestingUpsertTableSink(Array(0)) val table = tEnv.sqlQuery(sql2) - writeToSink(table, tableSink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0)). + configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val expected = List( @@ -657,10 +675,10 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode "(true,3,book,d,4,2)", "(true,4,book,b,3,2)", "(true,4,book,b,3,2)") - assertEquals(expected.mkString("\n"), tableSink.getRawResults.mkString("\n")) + assertEquals(expected.mkString("\n"), sink.getRawResults.mkString("\n")) val expected2 = List("1,fruit,b,6,1", "2,book,e,5,1", "3,book,d,4,2", "4,book,b,3,2") - assertEquals(expected2, tableSink.getUpsertResults.sorted) + assertEquals(expected2, sink.getUpsertResults.sorted) } // FIXME @@ -699,9 +717,12 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode """.stripMargin tEnv.getConfig.getConf.setLong(TableConfigOptions.SQL_EXEC_TOPN_CACHE_SIZE, 1) - val tableSink = new TestingUpsertTableSink(Array(0)) + val table = tEnv.sqlQuery(sql) - writeToSink(table, tableSink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0)). + configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val expected = List( @@ -724,7 +745,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode "(true,3,book,b,3,2)", "(true,4,book,a,1,2)") - assertEquals(expected, tableSink.getRawResults) + assertEquals(expected, sink.getRawResults) } // FIXME @@ -768,9 +789,11 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode |WHERE rank_num <= topSize """.stripMargin - val tableSink = new TestingUpsertTableSink(Array(0, 1)) val table = tEnv.sqlQuery(sql) - writeToSink(table, tableSink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 1)). 
+ configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val expected = List( @@ -780,7 +803,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode "book,4,1,3", "fruit,1,3,5", "fruit,2,2,4") - assertEquals(expected.sorted, tableSink.getUpsertResults.sorted) + assertEquals(expected.sorted, sink.getUpsertResults.sorted) } // FIXME @@ -829,9 +852,11 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode |WHERE rank_num <= 3 """.stripMargin - val tableSink = new TestingUpsertTableSink(Array(0, 3)) val table = tEnv.sqlQuery(sql) - writeToSink(table, tableSink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 3)). + configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val expected = List( @@ -862,14 +887,14 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode "(true,book,12,10,3)") - assertEquals(expected.mkString("\n"), tableSink.getRawResults.mkString("\n")) + assertEquals(expected.mkString("\n"), sink.getRawResults.mkString("\n")) val updatedExpected = List( "book,2,9,1", "book,7,10,2", "book,12,10,3") - assertEquals(updatedExpected.sorted, tableSink.getUpsertResults.sorted) + assertEquals(updatedExpected.sorted, sink.getUpsertResults.sorted) } // FIXME @@ -906,9 +931,11 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode |WHERE rank_num <= 3 """.stripMargin - val tableSink = new TestingUpsertTableSink(Array(0, 1)) val table = tEnv.sqlQuery(sql) - writeToSink(table, tableSink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 1)). + configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val expected = List( @@ -924,7 +951,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode "(true,book,1,225.0)", "(true,fruit,5,100.0)") - assertEquals(expected, tableSink.getRawResults) + assertEquals(expected, sink.getRawResults) val updatedExpected = List( "book,1,225.0", @@ -932,7 +959,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode "book,4,310.0", "fruit,5,100.0") - assertEquals(updatedExpected.sorted, tableSink.getUpsertResults.sorted) + assertEquals(updatedExpected.sorted, sink.getUpsertResults.sorted) } // FIXME @@ -977,9 +1004,11 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode |WHERE rank_num <= 3 """.stripMargin - val tableSink = new TestingUpsertTableSink(Array(0, 1)) val table = tEnv.sqlQuery(sql) - writeToSink(table, tableSink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 1)). 
+ configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val expected = List( @@ -1002,7 +1031,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode "(true,fruit,4,2)", "(true,fruit,5,2)", "(true,fruit,5,3)") - assertEquals(expected, tableSink.getRawResults) + assertEquals(expected, sink.getRawResults) val updatedExpected = List( "book,4,5", @@ -1011,7 +1040,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode "fruit,5,3", "fruit,4,2", "fruit,3,1") - assertEquals(updatedExpected.sorted, tableSink.getUpsertResults.sorted) + assertEquals(updatedExpected.sorted, sink.getUpsertResults.sorted) } @Test @@ -1040,9 +1069,11 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode |WHERE rank_num <= 2 """.stripMargin - val tableSink = new TestingUpsertTableSink(Array(0, 2)) val table = tEnv.sqlQuery(sql) - writeToSink(table, tableSink) + val schema = table.getSchema + val sink = new TestingUpsertTableSink(Array(0, 2)). + configure(schema.getFieldNames, schema.getFieldTypes) + tEnv.writeToSink(table, sink) env.execute() val expected = List( @@ -1054,14 +1085,14 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode "(true,fruit,44,3)", "(false,fruit,33,4)", "(true,fruit,40,1)") - assertEquals(expected, tableSink.getRawResults) + assertEquals(expected, sink.getRawResults) val updatedExpected = List( "book,19,2", "book,20,5", "fruit,40,1", "fruit,44,3") - assertEquals(updatedExpected.sorted, tableSink.getUpsertResults.sorted) + assertEquals(updatedExpected.sorted, sink.getUpsertResults.sorted) } // FIXME @@ -1113,7 +1144,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode assertEquals(expected1.sorted, sink1.getRetractResults.sorted) val sink2 = new TestingRetractSink - val table2 = tEnv.sqlQuery( + tEnv.sqlQuery( s""" |SELECT * |FROM ( @@ -1122,7 +1153,6 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode | FROM MyView) |WHERE rank_num <= 2 |""".stripMargin).toRetractStream[Row].addSink(sink2).setParallelism(1) - env.execute() val expected2 = List( @@ -1160,7 +1190,6 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode val t1 = tEnv.sqlQuery(subquery) tEnv.registerTable("MyView", t1) - val sink1 = new TestingUpsertTableSink(Array(0, 3)) val table1 = tEnv.sqlQuery( s""" |SELECT * @@ -1170,9 +1199,12 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode | FROM MyView) |WHERE rank_num <= 2 |""".stripMargin) - writeToSink(table1, sink1) + val schema1 = table1.getSchema + val sink1 = new TestingUpsertTableSink(Array(0, 3)). + configure(schema1.getFieldNames, schema1 + .getFieldTypes) + tEnv.writeToSink(table1, sink1) - val sink2 = new TestingUpsertTableSink(Array(0, 3)) val table2 = tEnv.sqlQuery( s""" |SELECT * @@ -1182,9 +1214,13 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode | FROM MyView) |WHERE rank_num <= 2 |""".stripMargin) + val schema2 = table2.getSchema + val sink2 = new TestingUpsertTableSink(Array(0, 3)). 
+      configure(schema2.getFieldNames, schema2
+        .getFieldTypes)
+    tEnv.writeToSink(table2, sink2)
 
-    writeToSink(table2, sink2)
-
+    env.execute()
 
     val expected1 = List(
       "book,1,25,1",
       "book,2,19,2",
@@ -1227,7 +1263,6 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     val t1 = tEnv.sqlQuery(subquery)
     tEnv.registerTable("MyView", t1)
 
-    val sink1 = new TestingRetractTableSink
     val table1 = tEnv.sqlQuery(
       s"""
          |SELECT *
@@ -1237,9 +1272,11 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
          | FROM MyView)
          |WHERE rank_num <= 2
          |""".stripMargin)
-    writeToSink(table1, sink1)
+    val schema1 = table1.getSchema
+    val sink1 = new TestingRetractTableSink().
+      configure(schema1.getFieldNames, schema1.getFieldTypes)
+    tEnv.writeToSink(table1, sink1)
 
-    val sink2 = new TestingRetractTableSink
     val table2 = tEnv.sqlQuery(
       s"""
          |SELECT *
@@ -1249,8 +1286,11 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
          | FROM MyView)
          |WHERE rank_num <= 2
          |""".stripMargin)
-
-    writeToSink(table2, sink2)
+    val schema2 = table2.getSchema
+    val sink2 = new TestingRetractTableSink().
+      configure(schema2.getFieldNames, schema2.getFieldTypes)
+    tEnv.writeToSink(table2, sink2)
+    env.execute()
 
     val expected1 = List(
       "book,1,2,1",
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
index e3fc360a90ddd5..e91a1523b3ac99 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -293,8 +293,7 @@ final class TestingUpsertTableSink(keys: Array[Int], tz: TimeZone)
 
   override def configure(
       fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]])
-    : TableSink[JTuple2[JBoolean, BaseRow]] = {
+      fieldTypes: Array[TypeInformation[_]]): TestingUpsertTableSink = {
     val copy = new TestingUpsertTableSink(keys, tz)
     copy.fNames = fieldNames
     copy.fTypes = fieldTypes
@@ -501,7 +500,7 @@ final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink
 
   override def configure(
       fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBoolean, Row]] = {
+      fieldTypes: Array[TypeInformation[_]]): TestingRetractTableSink = {
     val copy = new TestingRetractTableSink(tz)
     copy.fNames = fieldNames
     copy.fTypes = fieldTypes
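Review note: narrowing the return type of configure(...) from TableSink[...] to the concrete testing sink is what makes the rewritten tests above possible — the configured copy can be handed to writeToSink and still be queried for results. The pattern as the updated tests use it (table, tEnv and env assumed in scope):

    val schema = table.getSchema
    val sink = new TestingUpsertTableSink(Array(0, 3))
      .configure(schema.getFieldNames, schema.getFieldTypes)
    tEnv.writeToSink(table, sink)
    env.execute()
    val results = sink.getUpsertResults // would not type-check if configure still returned TableSink

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
index 94b814fe833a80..1335e897bda1b1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
@@ -20,9 +20,7 @@ package org.apache.flink.table.runtime.utils
 
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{Table, TableImpl}
 import org.apache.flink.table.api.scala.StreamTableEnvironment
-import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.test.util.AbstractTestBase
 import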
org.junit.rules.{ExpectedException, TemporaryFolder} @@ -55,8 +53,4 @@ class StreamingTestBase extends AbstractTestBase { this.tEnv = StreamTableEnvironment.create(env) } - def writeToSink(table: Table, sink: TableSink[_]): Unit = { - TableUtil.writeToSink(table.asInstanceOf[TableImpl], sink) - } - } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala index ae80d9c0525bfb..9a3836ba767034 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala @@ -109,32 +109,6 @@ class StreamingWithStateTestBase(state: StateBackendMode) extends StreamingTestB failingDataSource(result)(newTypeInfo.asInstanceOf[TypeInformation[BaseRow]]) } - /** - * Creates a DataStream from the given non-empty [[Seq]]. - */ - def retainStateDataSource[T: TypeInformation](data: Seq[T]): DataStream[T] = { - env.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE) - env.setRestartStrategy(RestartStrategies.noRestart()) - env.setParallelism(1) - // reset failedBefore flag to false - FailingCollectionSource.reset() - - require(data != null, "Data must not be null.") - val typeInfo = implicitly[TypeInformation[T]] - - val collection = scala.collection.JavaConversions.asJavaCollection(data) - // must not have null elements and mixed elements - FromElementsFunction.checkCollection(data, typeInfo.getTypeClass) - - val function = new FailingCollectionSource[T]( - typeInfo.createSerializer(env.getConfig), - collection, - data.length, // fail after half elements - true) - - env.addSource(function)(typeInfo).setMaxParallelism(1) - } - /** * Creates a DataStream from the given non-empty [[Seq]]. 
   */
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TableUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TableUtil.scala
index 1522bb63d38ec0..4595ac81841bd4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TableUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TableUtil.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.`type`.TypeConverters.createExternalTypeInfoFromInternalType
 import org.apache.flink.table.api.{BatchTableEnvironment, TableImpl}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.sinks.{CollectRowTableSink, CollectTableSink, TableSink}
+import org.apache.flink.table.sinks.{CollectRowTableSink, CollectTableSink}
 import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConversions._
@@ -61,14 +61,4 @@ object TableUtil {
       table, configuredSink.asInstanceOf[CollectTableSink[T]], jobName)
   }
 
-  def writeToSink(table: TableImpl, sink: TableSink[_]): Unit = {
-    // get schema information of table
-    val rowType = table.getRelNode.getRowType
-    val fieldNames = rowType.getFieldNames.asScala.toArray
-    val fieldTypes = rowType.getFieldList
-      .map(field => FlinkTypeFactory.toInternalType(field.getType)).toArray
-    val configuredSink = sink.configure(
-      fieldNames, fieldTypes.map(createExternalTypeInfoFromInternalType))
-    table.tableEnv.writeToSink(table, configuredSink)
-  }
 }
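Review note: the next hunk gives the helper exactly two state contracts — processLastRow keeps the complete previous row (only consulted when generateRetraction is set, so the old row can be retracted), while processFirstRow only ever needs a per-key existence flag. Reconstructed from the descriptors in this patch, the two layouts side by side (Scala sketch; rowTypeInfo is assumed to be in scope):

    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.table.dataformat.BaseRow

    // keep-last: the complete previous row, so it can be retracted later
    val preRowState = new ValueStateDescriptor[BaseRow]("preRowState", rowTypeInfo)
    // keep-first: a Boolean flag marking that the key was already seen
    val existsState = new ValueStateDescriptor[java.lang.Boolean]("existsState", Types.BOOLEAN)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionHelper.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionHelper.java
index 831a2cfbd63897..1c554bd5c1497d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionHelper.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionHelper.java
@@ -39,21 +39,18 @@ class DeduplicateFunctionHelper {
	 * @param out underlying collector
	 * @throws Exception
	 */
-	static void processLastRow(BaseRow currentRow, boolean generateRetraction, ValueState state,
+	static void processLastRow(BaseRow currentRow, boolean generateRetraction, ValueState<BaseRow> state,
			Collector out) throws Exception {
-		// should be accumulate msg
+		// check that the message is an accumulate message
		Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
		if (generateRetraction) {
			// state stores complete row if generateRetraction is true
-			BaseRow preRow = (BaseRow) state.value();
+			BaseRow preRow = state.value();
			state.update(currentRow);
			if (preRow != null) {
				preRow.setHeader(BaseRowUtil.RETRACT_MSG);
				out.collect(preRow);
			}
-		} else {
-			// state stores a flag to indicator whether pk appears before
-			state.update(true);
		}
		out.collect(currentRow);
	}
@@ -66,9 +63,9 @@ static void processLastRow(BaseRow currentRow, boolean generateRetraction, Value
	 * @param out underlying collector
	 * @throws Exception
	 */
-	static void processFirstRow(BaseRow currentRow, ValueState state, Collector out)
+	static void processFirstRow(BaseRow currentRow, ValueState<Boolean> state, Collector out)
			throws Exception {
-		// should be accumulate msg.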
+		// check that the message is an accumulate message
		Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
		// ignore record with timestamp bigger than preRow
		if (state.value() != null) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepFirstRowFunction.java
similarity index 58%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunction.java
rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepFirstRowFunction.java
index 1467b1c9f0e99a..14feaf981fd320 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepFirstRowFunction.java
@@ -24,49 +24,30 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
-import org.apache.flink.table.typeutils.BaseRowTypeInfo;
 import org.apache.flink.util.Collector;
 
 import static org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processFirstRow;
-import static org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processLastRow;
 
 /**
- * This function is used to deduplicate on keys and keeps only first row or last row.
+ * This function is used to deduplicate on keys and keeps only the first row.
 */
-public class DeduplicateFunction
+public class DeduplicateKeepFirstRowFunction
		extends KeyedProcessFunctionWithCleanupState<BaseRow, BaseRow, BaseRow> {
 
-	private static final long serialVersionUID = 4950071982706870944L;
+	private static final long serialVersionUID = 5865777137707602549L;
 
-	private final BaseRowTypeInfo rowTypeInfo;
-	private final boolean generateRetraction;
-	private final boolean keepLastRow;
+	// state stores a boolean flag to indicate whether key appears before.
+	private ValueState<Boolean> state;
 
-	// state stores complete row if keep last row and generate retraction is true,
-	// else stores a flag to indicate whether key appears before.
-	private ValueState state;
-
-	public DeduplicateFunction(long minRetentionTime, long maxRetentionTime, BaseRowTypeInfo rowTypeInfo,
-			boolean generateRetraction, boolean keepLastRow) {
+	public DeduplicateKeepFirstRowFunction(long minRetentionTime, long maxRetentionTime) {
		super(minRetentionTime, maxRetentionTime);
-		this.rowTypeInfo = rowTypeInfo;
-		this.generateRetraction = generateRetraction;
-		this.keepLastRow = keepLastRow;
	}
 
	@Override
	public void open(Configuration configure) throws Exception {
		super.open(configure);
-		String stateName = keepLastRow ? "DeduplicateFunctionKeepLastRow" : "DeduplicateFunctionKeepFirstRow";
-		initCleanupTimeState(stateName);
-		ValueStateDescriptor stateDesc = null;
-		if (keepLastRow && generateRetraction) {
-			// if need generate retraction and keep last row, stores complete row into state
-			stateDesc = new ValueStateDescriptor("deduplicateFunction", rowTypeInfo);
-		} else {
-			// else stores a flag to indicator whether pk appears before.
- stateDesc = new ValueStateDescriptor("fistValueState", Types.BOOLEAN); - } + initCleanupTimeState("DeduplicateFunctionKeepFirstRow"); + ValueStateDescriptor stateDesc = new ValueStateDescriptor<>("existsState", Types.BOOLEAN); state = getRuntimeContext().getState(stateDesc); } @@ -75,12 +56,7 @@ public void processElement(BaseRow input, Context ctx, Collector out) t long currentTime = ctx.timerService().currentProcessingTime(); // register state-cleanup timer registerProcessingCleanupTimer(ctx, currentTime); - - if (keepLastRow) { - processLastRow(input, generateRetraction, state, out); - } else { - processFirstRow(input, state, out); - } + processFirstRow(input, state, out); } @Override diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepLastRowFunction.java new file mode 100644 index 00000000000000..3dcde66eb5ea67 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepLastRowFunction.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.deduplicate; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.util.Collector; + +import static org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processLastRow; + +/** + * This function is used to deduplicate on keys and keeps only last row. + */ +public class DeduplicateKeepLastRowFunction + extends KeyedProcessFunctionWithCleanupState { + + private static final long serialVersionUID = -291348892087180350L; + private final BaseRowTypeInfo rowTypeInfo; + private final boolean generateRetraction; + + // state stores complete row. 
+ private ValueState state; + + public DeduplicateKeepLastRowFunction(long minRetentionTime, long maxRetentionTime, BaseRowTypeInfo rowTypeInfo, + boolean generateRetraction) { + super(minRetentionTime, maxRetentionTime); + this.rowTypeInfo = rowTypeInfo; + this.generateRetraction = generateRetraction; + } + + @Override + public void open(Configuration configure) throws Exception { + super.open(configure); + if (generateRetraction) { + // state stores complete row if need generate retraction, otherwise do not need a state + initCleanupTimeState("DeduplicateFunctionKeepLastRow"); + ValueStateDescriptor stateDesc = new ValueStateDescriptor<>("preRowState", rowTypeInfo); + state = getRuntimeContext().getState(stateDesc); + } + } + + @Override + public void processElement(BaseRow input, Context ctx, Collector out) throws Exception { + if (generateRetraction) { + long currentTime = ctx.timerService().currentProcessingTime(); + // register state-cleanup timer + registerProcessingCleanupTimer(ctx, currentTime); + } + processLastRow(input, generateRetraction, state, out); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { + if (stateCleaningEnabled) { + cleanupState(state); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepFirstRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepFirstRowFunction.java new file mode 100644 index 00000000000000..b97d1a670ac3bd --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepFirstRowFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.deduplicate; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.runtime.bundle.MapBundleFunction; +import org.apache.flink.table.runtime.context.ExecutionContext; +import org.apache.flink.util.Collector; + +import javax.annotation.Nullable; + +import java.util.Map; + +import static org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processFirstRow; + +/** + * This function is used to get the first row for every key partition in miniBatch mode. 
+ */
+public class MiniBatchDeduplicateKeepFirstRowFunction
+		extends MapBundleFunction<BaseRow, BaseRow, BaseRow, BaseRow> {
+
+	private static final long serialVersionUID = -7994602893547654994L;
+
+	private final TypeSerializer<BaseRow> typeSerializer;
+
+	// state stores a boolean flag to indicate whether key appears before.
+	private ValueState<Boolean> state;
+
+	public MiniBatchDeduplicateKeepFirstRowFunction(TypeSerializer<BaseRow> typeSerializer) {
+		this.typeSerializer = typeSerializer;
+	}
+
+	@Override
+	public void open(ExecutionContext ctx) throws Exception {
+		super.open(ctx);
+		ValueStateDescriptor<Boolean> stateDesc = new ValueStateDescriptor<>("existsState", Types.BOOLEAN);
+		state = ctx.getRuntimeContext().getState(stateDesc);
+	}
+
+	@Override
+	public BaseRow addInput(@Nullable BaseRow value, BaseRow input) {
+		if (value == null) {
+			// put the input into buffer
+			return typeSerializer.copy(input);
+		} else {
+			// the input is not first row, ignore it
+			return value;
+		}
+	}
+
+	@Override
+	public void finishBundle(
+			Map<BaseRow, BaseRow> buffer, Collector<BaseRow> out) throws Exception {
+		for (Map.Entry<BaseRow, BaseRow> entry : buffer.entrySet()) {
+			BaseRow currentKey = entry.getKey();
+			BaseRow currentRow = entry.getValue();
+			ctx.setCurrentKey(currentKey);
+			processFirstRow(currentRow, state, out);
+		}
+	}
+}
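Review note: both minibatch variants share the MapBundleFunction contract — addInput folds an incoming row into the per-key buffer entry, and finishBundle then touches keyed state once per key per bundle instead of once per record. A Scala sketch of how the planner wires the keep-first variant (it mirrors the StreamExecDeduplicate code earlier in this patch; exeConfig, tableConfig and rowTypeInfo are assumed in scope, and the two-argument KeyedMapBundleOperator constructor is an assumption, since the hunk above truncates its argument list):

    val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
    val processFunction = new MiniBatchDeduplicateKeepFirstRowFunction(rowSerializer)
    val trigger = new CountBundleTrigger[BaseRow](
      tableConfig.getConf.getLong(TableConfigOptions.SQL_EXEC_MINIBATCH_SIZE))
    val operator = new KeyedMapBundleOperator(processFunction, trigger)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepLastRowFunction.java
similarity index 57%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateFunction.java
rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepLastRowFunction.java
index 8b8f9832e1748c..c1f2ec40cf5b8b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepLastRowFunction.java
@@ -20,7 +20,6 @@
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.runtime.bundle.MapBundleFunction;
@@ -32,59 +31,41 @@
 import java.util.Map;
 
-import static org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processFirstRow;
 import static org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processLastRow;
 
 /**
- * This function is used to get the first row or last row for every key partition in miniBatch
- * mode.
+ * This function is used to get the last row for every key partition in miniBatch mode.
 */
-public class MiniBatchDeduplicateFunction
+public class MiniBatchDeduplicateKeepLastRowFunction
		extends MapBundleFunction<BaseRow, BaseRow, BaseRow, BaseRow> {
 
-	private BaseRowTypeInfo rowTypeInfo;
-	private boolean generateRetraction;
-	private boolean keepLastRow;
+	private static final long serialVersionUID = -8981813609115029119L;
 
-	// state stores complete row if keep last row and generate retraction is true,
-	// else stores a flag to indicate whether key appears before.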
-	private ValueState state;
-	private TypeSerializer ser;
+	private final BaseRowTypeInfo rowTypeInfo;
+	private final boolean generateRetraction;
+	private final TypeSerializer<BaseRow> typeSerializer;
 
-	public MiniBatchDeduplicateFunction(
-			BaseRowTypeInfo rowTypeInfo,
-			boolean generateRetraction,
-			TypeSerializer typeSerializer,
-			boolean keepLastRow) {
+	// state stores complete row.
+	private ValueState<BaseRow> state;
+
+	public MiniBatchDeduplicateKeepLastRowFunction(BaseRowTypeInfo rowTypeInfo, boolean generateRetraction,
+			TypeSerializer<BaseRow> typeSerializer) {
		this.rowTypeInfo = rowTypeInfo;
-		this.keepLastRow = keepLastRow;
		this.generateRetraction = generateRetraction;
-		ser = typeSerializer;
+		this.typeSerializer = typeSerializer;
	}
 
	@Override
	public void open(ExecutionContext ctx) throws Exception {
		super.open(ctx);
-		ValueStateDescriptor stateDesc = null;
-		if (keepLastRow && generateRetraction) {
-			// if need generate retraction and keep last row, stores complete row into state
-			stateDesc = new ValueStateDescriptor("deduplicateFunction", rowTypeInfo);
-		} else {
-			// else stores a flag to indicator whether pk appears before.
-			stateDesc = new ValueStateDescriptor("fistValueState", Types.BOOLEAN);
-		}
+		ValueStateDescriptor<BaseRow> stateDesc = new ValueStateDescriptor<>("preRowState", rowTypeInfo);
		state = ctx.getRuntimeContext().getState(stateDesc);
	}
 
	@Override
	public BaseRow addInput(@Nullable BaseRow value, BaseRow input) {
-		if (value == null || keepLastRow || (!keepLastRow && value == null)) {
-			// put the input into buffer
-			return ser.copy(input);
-		} else {
-			// the input is not last row, ignore it
-			return value;
-		}
+		// always put the input into buffer
+		return typeSerializer.copy(input);
	}
 
	@Override
@@ -94,12 +75,7 @@ public void finishBundle(
		BaseRow currentKey = entry.getKey();
		BaseRow currentRow = entry.getValue();
		ctx.setCurrentKey(currentKey);
-
-		if (keepLastRow) {
-			processLastRow(currentRow, generateRetraction, state, out);
-		} else {
-			processFirstRow(currentRow, state, out);
-		}
+		processLastRow(currentRow, generateRetraction, state, out);
	}
 }
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowKeySelector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowKeySelector.java
index 2b51e7a2db55a6..3d7887a57afcd7 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowKeySelector.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowKeySelector.java
@@ -25,7 +25,7 @@
 import org.apache.flink.table.typeutils.BaseRowTypeInfo;
 
 /**
- * A utility class which will extract key from BaseRow.
+ * A utility class which extracts the key from a BaseRow. The key type is BinaryRow.
*/ public class BinaryRowKeySelector implements BaseRowKeySelector { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/NullBinaryRowKeySelector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/NullBinaryRowKeySelector.java index ed83f9e2adfa59..795bdbf5a37968 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/NullBinaryRowKeySelector.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/NullBinaryRowKeySelector.java @@ -23,10 +23,12 @@ import org.apache.flink.table.typeutils.BaseRowTypeInfo; /** - * A utility class which key is always empty. + * A utility class which key is always empty no matter what the input row is. */ public class NullBinaryRowKeySelector implements BaseRowKeySelector { + private static final long serialVersionUID = -2079386198687082032L; + private final BaseRowTypeInfo returnType = new BaseRowTypeInfo(); @Override diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AbstractRankFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AbstractRankFunction.java index 431d25fb8f977f..5c44133ded5a6f 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AbstractRankFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AbstractRankFunction.java @@ -155,7 +155,7 @@ public void open(Configuration parameters) throws Exception { outputRow = new JoinedRow(); if (!isConstantRankEnd) { - ValueStateDescriptor rankStateDesc = new ValueStateDescriptor("rankEnd", Types.LONG); + ValueStateDescriptor rankStateDesc = new ValueStateDescriptor<>("rankEnd", Types.LONG); rankEndState = getRuntimeContext().getState(rankStateDesc); } // compile equaliser @@ -223,7 +223,7 @@ protected boolean checkSortKeyInBufferRange(BaseRow sortKey, TopNBuffer buffer) if (compare < 0) { return true; } else { - return buffer.getCurrentTopNum() < getMaxSizeOfBuffer(); + return buffer.getCurrentTopNum() < getDefaultTopNSize(); } } } @@ -231,24 +231,10 @@ protected boolean checkSortKeyInBufferRange(BaseRow sortKey, TopNBuffer buffer) protected void registerMetric(long heapSize) { getRuntimeContext().getMetricGroup().>gauge( "topn.cache.hitRate", - new Gauge() { - - @Override - public Double getValue() { - return requestCount == 0 ? 1.0 : - Long.valueOf(hitCount).doubleValue() / requestCount; - } - }); + () -> requestCount == 0 ? 1.0 : Long.valueOf(hitCount).doubleValue() / requestCount); getRuntimeContext().getMetricGroup().>gauge( - "topn.cache.size", - new Gauge() { - - @Override - public Long getValue() { - return heapSize; - } - }); + "topn.cache.size", () -> heapSize); } protected void collect(Collector out, BaseRow inputRow) { @@ -313,13 +299,6 @@ private BaseRow createOutputRow(BaseRow inputRow, long rank, byte header) { } } - /** - * Gets buffer size limit. Implementations may vary depending on each rank who has in-memory buffer. - * - * @return buffer size limit - */ - protected abstract long getMaxSizeOfBuffer(); - /** * Sets keyContext to RankFunction. 
* diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AppendRankFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AppendRankFunction.java index 6464637e2a8717..73e7edb631a3c9 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AppendRankFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AppendRankFunction.java @@ -40,7 +40,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.function.Supplier; /** * AppendRankFunction's input stream only contains append record. @@ -82,7 +81,7 @@ public void open(Configuration parameters) throws Exception { LOG.info("Top{} operator is using LRU caches key-size: {}", getDefaultTopNSize(), lruCacheSize); ListTypeInfo valueTypeInfo = new ListTypeInfo<>(inputRowType); - MapStateDescriptor> mapStateDescriptor = new MapStateDescriptor( + MapStateDescriptor> mapStateDescriptor = new MapStateDescriptor<>( "data-state-with-append", sortKeyType, valueTypeInfo); dataState = getRuntimeContext().getMapState(mapStateDescriptor); @@ -129,23 +128,12 @@ public void onTimer( } } - @Override - protected long getMaxSizeOfBuffer() { - return getDefaultTopNSize(); - } - private void initHeapStates() throws Exception { requestCount += 1; BaseRow currentKey = (BaseRow) keyContext.getCurrentKey(); buffer = kvSortedMap.get(currentKey); if (buffer == null) { - buffer = new TopNBuffer(sortKeyComparator, new Supplier>() { - - @Override - public Collection get() { - return new ArrayList<>(); - } - }); + buffer = new TopNBuffer(sortKeyComparator, ArrayList::new); kvSortedMap.put(currentKey, buffer); // restore buffer Iterator>> iter = dataState.iterator(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/RetractRankFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/RetractRankFunction.java index 21c3e934ab0041..eb0250d83de25b 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/RetractRankFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/RetractRankFunction.java @@ -81,13 +81,13 @@ public RetractRankFunction(long minRetentionTime, long maxRetentionTime, BaseRow public void open(Configuration parameters) throws Exception { super.open(parameters); ListTypeInfo valueTypeInfo = new ListTypeInfo<>(inputRowType); - MapStateDescriptor> mapStateDescriptor = new MapStateDescriptor( + MapStateDescriptor> mapStateDescriptor = new MapStateDescriptor<>( "data-state", sortKeyType, valueTypeInfo); dataState = getRuntimeContext().getMapState(mapStateDescriptor); - ValueStateDescriptor> valueStateDescriptor = new ValueStateDescriptor( + ValueStateDescriptor> valueStateDescriptor = new ValueStateDescriptor<>( "sorted-map", - new SortedMapTypeInfo(sortKeyType, BasicTypeInfo.LONG_TYPE_INFO, sortKeyComparator)); + new SortedMapTypeInfo<>(sortKeyType, BasicTypeInfo.LONG_TYPE_INFO, sortKeyComparator)); treeMap = getRuntimeContext().getState(valueStateDescriptor); } @@ -242,9 +242,4 @@ private void emitRecordsWithRowNumber( } } - @Override - protected long getMaxSizeOfBuffer() { - // just let it go, retract rank has no interest in this - return 0L; - } } diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/UpdateRankFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/UpdateRankFunction.java index 1965e52e2ea2c8..a78b00d5e11ead 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/UpdateRankFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/UpdateRankFunction.java @@ -49,7 +49,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.function.Supplier; /** * A fast version of rank process function which only hold top n data in state, and keep sorted map in heap. @@ -104,20 +103,20 @@ public UpdateRankFunction(long minRetentionTime, long maxRetentionTime, BaseRowT @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - int lruCacheSize = Math.max(1, (int) (cacheSize / getMaxSizeOfBuffer())); + int lruCacheSize = Math.max(1, (int) (cacheSize / getDefaultTopNSize())); // make sure the cached map is in a fixed size, avoid OOM kvSortedMap = new HashMap<>(lruCacheSize); kvRowKeyMap = new LRUMap<>(lruCacheSize, new CacheRemovalListener()); - LOG.info("Top{} operator is using LRU caches key-size: {}", getMaxSizeOfBuffer(), lruCacheSize); + LOG.info("Top{} operator is using LRU caches key-size: {}", getDefaultTopNSize(), lruCacheSize); TupleTypeInfo> valueTypeInfo = new TupleTypeInfo<>(inputRowType, Types.INT); - MapStateDescriptor> mapStateDescriptor = new MapStateDescriptor( + MapStateDescriptor> mapStateDescriptor = new MapStateDescriptor<>( "data-state-with-update", rowKeyType, valueTypeInfo); dataState = getRuntimeContext().getMapState(mapStateDescriptor); // metrics - registerMetric(kvSortedMap.size() * getMaxSizeOfBuffer()); + registerMetric(kvSortedMap.size() * getDefaultTopNSize()); } @Override @@ -157,11 +156,6 @@ public void processElement( } } - @Override - protected long getMaxSizeOfBuffer() { - return getDefaultTopNSize(); - } - @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { Iterator>> iter = kvRowKeyMap.entrySet().iterator(); @@ -170,7 +164,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { BaseRow partitionKey = entry.getKey(); Map currentRowKeyMap = entry.getValue(); keyContext.setCurrentKey(partitionKey); - synchronizeState(currentRowKeyMap); + flushBufferToState(currentRowKeyMap); } } @@ -180,13 +174,7 @@ private void initHeapStates() throws Exception { buffer = kvSortedMap.get(partitionKey); rowKeyMap = kvRowKeyMap.get(partitionKey); if (buffer == null) { - buffer = new TopNBuffer(sortKeyComparator, new Supplier>() { - - @Override - public Collection get() { - return new LinkedHashSet<>(); - } - }); + buffer = new TopNBuffer(sortKeyComparator, LinkedHashSet::new); rowKeyMap = new HashMap<>(); kvSortedMap.put(partitionKey, buffer); kvRowKeyMap.put(partitionKey, rowKeyMap); @@ -432,7 +420,7 @@ private void processElementWithoutRowNumber(BaseRow inputRow, Collector } } - private void synchronizeState(Map curRowKeyMap) throws Exception { + private void flushBufferToState(Map curRowKeyMap) throws Exception { Iterator> iter = curRowKeyMap.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = iter.next(); @@ -473,7 +461,7 @@ public void onRemoval(Map.Entry> eldest) { keyContext.setCurrentKey(partitionKey); kvSortedMap.remove(partitionKey); try { - synchronizeState(currentRowKeyMap); + 
flushBufferToState(currentRowKeyMap); } catch (Throwable e) { LOG.error("Fail to synchronize state!", e); throw new RuntimeException(e); diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepFirstRowFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepFirstRowFunctionTest.java new file mode 100644 index 00000000000000..f23acd727a00d2 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepFirstRowFunctionTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.deduplicate; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.runtime.util.BaseRowHarnessAssertor; +import org.apache.flink.table.runtime.util.BinaryRowKeySelector; +import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator; +import org.apache.flink.table.type.InternalTypes; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.record; + +/** + * Tests for {@link DeduplicateKeepFirstRowFunction}. 
+ */ +public class DeduplicateKeepFirstRowFunctionTest { + + private Time minTime = Time.milliseconds(10); + private Time maxTime = Time.milliseconds(20); + + private BaseRowTypeInfo inputRowType = new BaseRowTypeInfo(InternalTypes.STRING, InternalTypes.LONG, + InternalTypes.INT); + + private int rowKeyIdx = 1; + private BinaryRowKeySelector rowKeySelector = new BinaryRowKeySelector(new int[] { rowKeyIdx }, + inputRowType.getInternalTypes()); + + private BaseRowHarnessAssertor assertor = new BaseRowHarnessAssertor( + inputRowType.getFieldTypes(), + new GenericRowRecordSortComparator(rowKeyIdx, inputRowType.getInternalTypes()[rowKeyIdx])); + + private OneInputStreamOperatorTestHarness createTestHarness( + DeduplicateKeepFirstRowFunction func) + throws Exception { + KeyedProcessOperator operator = new KeyedProcessOperator<>(func); + return new KeyedOneInputStreamOperatorTestHarness<>(operator, rowKeySelector, rowKeySelector.getProducedType()); + } + + @Test + public void test() throws Exception { + DeduplicateKeepFirstRowFunction func = new DeduplicateKeepFirstRowFunction(minTime.toMilliseconds(), + maxTime.toMilliseconds()); + OneInputStreamOperatorTestHarness testHarness = createTestHarness(func); + testHarness.open(); + testHarness.processElement(record("book", 1L, 12)); + testHarness.processElement(record("book", 2L, 11)); + testHarness.processElement(record("book", 1L, 13)); + testHarness.close(); + + // Keep FirstRow in deduplicate will not send retraction + List expectedOutputOutput = new ArrayList<>(); + expectedOutputOutput.add(record("book", 1L, 12)); + expectedOutputOutput.add(record("book", 2L, 11)); + assertor.assertOutputEqualsSorted("output wrong.", expectedOutputOutput, testHarness.getOutput()); + } + +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepLastRowFunctionTest.java similarity index 62% rename from flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionTest.java rename to flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepLastRowFunctionTest.java index 6210f1945f83ca..f6c3419185f437 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepLastRowFunctionTest.java @@ -38,9 +38,9 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.retractRecord; /** - * Tests for {@link DeduplicateFunction}. + * Tests for {@link DeduplicateKeepLastRowFunction}. 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepLastRowFunctionTest.java
similarity index 62%
rename from flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionTest.java
rename to flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepLastRowFunctionTest.java
index 6210f1945f83ca..f6c3419185f437 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/DeduplicateKeepLastRowFunctionTest.java
@@ -38,9 +38,9 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.retractRecord;
 
 /**
- * Tests for {@link DeduplicateFunction}.
+ * Tests for {@link DeduplicateKeepLastRowFunction}.
  */
-public class DeduplicateFunctionTest {
+public class DeduplicateKeepLastRowFunctionTest {
 
 	private Time minTime = Time.milliseconds(10);
 	private Time maxTime = Time.milliseconds(20);
@@ -56,55 +56,21 @@ public class DeduplicateFunctionTest {
 			inputRowType.getFieldTypes(),
 			new GenericRowRecordSortComparator(rowKeyIdx, inputRowType.getInternalTypes()[rowKeyIdx]));
 
-	private DeduplicateFunction createFunction(boolean generateRetraction, boolean keepLastRow) {
-		DeduplicateFunction func = new DeduplicateFunction(minTime.toMilliseconds(), maxTime.toMilliseconds(),
-				inputRowType, generateRetraction, keepLastRow);
-		return func;
+	private DeduplicateKeepLastRowFunction createFunction(boolean generateRetraction) {
+		return new DeduplicateKeepLastRowFunction(minTime.toMilliseconds(), maxTime.toMilliseconds(), inputRowType,
+				generateRetraction);
 	}
 
 	private OneInputStreamOperatorTestHarness<BaseRow, BaseRow> createTestHarness(
-			DeduplicateFunction func)
+			DeduplicateKeepLastRowFunction func)
 			throws Exception {
-		KeyedProcessOperator operator = new KeyedProcessOperator(func);
-		return new KeyedOneInputStreamOperatorTestHarness(operator, rowKeySelector, rowKeySelector.getProducedType());
+		KeyedProcessOperator<BaseRow, BaseRow, BaseRow> operator = new KeyedProcessOperator<>(func);
+		return new KeyedOneInputStreamOperatorTestHarness<>(operator, rowKeySelector, rowKeySelector.getProducedType());
 	}
 
 	@Test
-	public void testKeepFirstRowWithoutGenerateRetraction() throws Exception {
-		DeduplicateFunction func = createFunction(false, false);
-		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
-		testHarness.open();
-		testHarness.processElement(record("book", 1L, 12));
-		testHarness.processElement(record("book", 2L, 11));
-		testHarness.processElement(record("book", 1L, 13));
-		testHarness.close();
-
-		List<Object> expectedOutputOutput = new ArrayList<>();
-		expectedOutputOutput.add(record("book", 1L, 12));
-		expectedOutputOutput.add(record("book", 2L, 11));
-		assertor.assertOutputEqualsSorted("output wrong.", expectedOutputOutput, testHarness.getOutput());
-	}
-
-	@Test
-	public void testKeepFirstRowWithGenerateRetraction() throws Exception {
-		DeduplicateFunction func = createFunction(true, false);
-		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
-		testHarness.open();
-		testHarness.processElement(record("book", 1L, 12));
-		testHarness.processElement(record("book", 2L, 11));
-		testHarness.processElement(record("book", 1L, 13));
-		testHarness.close();
-
-		// Keep FirstRow in deduplicate will not send retraction
-		List<Object> expectedOutputOutput = new ArrayList<>();
-		expectedOutputOutput.add(record("book", 1L, 12));
-		expectedOutputOutput.add(record("book", 2L, 11));
-		assertor.assertOutputEqualsSorted("output wrong.", expectedOutputOutput, testHarness.getOutput());
-	}
-
-	@Test
-	public void testKeepLastWithoutGenerateRetraction() throws Exception {
-		DeduplicateFunction func = createFunction(false, true);
+	public void testWithoutGenerateRetraction() throws Exception {
+		DeduplicateKeepLastRowFunction func = createFunction(false);
 		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
 		testHarness.open();
 		testHarness.processElement(record("book", 1L, 12));
@@ -120,8 +86,8 @@ public void testKeepLastWithoutGenerateRetraction() throws Exception {
 	}
 
 	@Test
-	public void testKeepLastRowWithGenerateRetraction() throws Exception {
-		DeduplicateFunction func = createFunction(true, true);
+	public void testWithGenerateRetraction() throws Exception {
+		DeduplicateKeepLastRowFunction func = createFunction(true);
 		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
 		testHarness.open();
 		testHarness.processElement(record("book", 1L, 12));
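The keep-last-row tests exercise the opposite contract: the newest record per key wins, and when generateRetraction is true the previously emitted row must be retracted first. That is why this variant, unlike keep-first-row, still takes the row type info in its constructor: it needs it to store the previous row. A hedged sketch of that shape (the actual DeduplicateKeepLastRowFunction flags retractions in the BaseRow header rather than collecting the old row as-is):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Minimal sketch of keep-last-row deduplication with optional retractions.
public class KeepLastRowSketch<K, T> extends KeyedProcessFunction<K, T, T> {

	private final boolean generateRetraction;
	private final TypeInformation<T> rowType;
	private transient ValueState<T> prevRow;

	public KeepLastRowSketch(boolean generateRetraction, TypeInformation<T> rowType) {
		this.generateRetraction = generateRetraction;
		this.rowType = rowType;
	}

	@Override
	public void open(Configuration parameters) {
		// state is only needed when the old row must be retracted downstream
		if (generateRetraction) {
			prevRow = getRuntimeContext().getState(new ValueStateDescriptor<>("prevRow", rowType));
		}
	}

	@Override
	public void processElement(T row, Context ctx, Collector<T> out) throws Exception {
		if (generateRetraction) {
			T prev = prevRow.value();
			if (prev != null) {
				out.collect(prev); // stand-in: the real function emits this as a retract message
			}
			prevRow.update(row);
		}
		out.collect(row); // the newest row per key always wins
	}
}
```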
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepFirstRowFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepFirstRowFunctionTest.java
new file mode 100644
index 00000000000000..af3dbde80c9ce9
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepFirstRowFunctionTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.deduplicate;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.runtime.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.bundle.trigger.CountBundleTrigger;
+import org.apache.flink.table.runtime.util.BaseRowHarnessAssertor;
+import org.apache.flink.table.runtime.util.BinaryRowKeySelector;
+import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
+import org.apache.flink.table.type.InternalTypes;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.record;
+
+/**
+ * Tests for {@link MiniBatchDeduplicateKeepFirstRowFunction}.
+ */
+public class MiniBatchDeduplicateKeepFirstRowFunctionTest {
+
+	private BaseRowTypeInfo inputRowType = new BaseRowTypeInfo(InternalTypes.STRING, InternalTypes.LONG,
+			InternalTypes.INT);
+
+	private int rowKeyIdx = 1;
+	private BinaryRowKeySelector rowKeySelector = new BinaryRowKeySelector(new int[] { rowKeyIdx },
+			inputRowType.getInternalTypes());
+
+	private BaseRowHarnessAssertor assertor = new BaseRowHarnessAssertor(
+			inputRowType.getFieldTypes(),
+			new GenericRowRecordSortComparator(rowKeyIdx, inputRowType.getInternalTypes()[rowKeyIdx]));
+
+	private TypeSerializer<BaseRow> typeSerializer = inputRowType.createSerializer(new ExecutionConfig());
+
+	private OneInputStreamOperatorTestHarness<BaseRow, BaseRow> createTestHarness(
+			MiniBatchDeduplicateKeepFirstRowFunction func)
+			throws Exception {
+		CountBundleTrigger<Tuple2<BaseRow, BaseRow>> trigger = new CountBundleTrigger<>(3);
+		KeyedMapBundleOperator op = new KeyedMapBundleOperator(func, trigger);
+		return new KeyedOneInputStreamOperatorTestHarness<>(op, rowKeySelector, rowKeySelector.getProducedType());
+	}
+
+	@Test
+	public void testKeepFirstRowWithGenerateRetraction() throws Exception {
+		MiniBatchDeduplicateKeepFirstRowFunction func = new MiniBatchDeduplicateKeepFirstRowFunction(typeSerializer);
+		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
+		testHarness.open();
+		testHarness.processElement(record("book", 1L, 12));
+		testHarness.processElement(record("book", 2L, 11));
+
+		// output is still empty because the bundle has not been triggered yet
+		Assert.assertTrue(testHarness.getOutput().isEmpty());
+
+		testHarness.processElement(record("book", 1L, 13));
+
+		// keep-first-row deduplication never sends retractions
+		List<Object> expectedOutputOutput = new ArrayList<>();
+		expectedOutputOutput.add(record("book", 1L, 12));
+		expectedOutputOutput.add(record("book", 2L, 11));
+		assertor.assertOutputEqualsSorted("output wrong.", expectedOutputOutput, testHarness.getOutput());
+		testHarness.close();
+	}
+
+}
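Both mini-batch tests depend on the same bundle mechanics: records are buffered per key and flushed only when the CountBundleTrigger fires, which is why the harness output stays empty after two elements and appears after the third. A hypothetical buffer illustrating that discipline for keep-first-row (this is not Flink's MapBundleFunction API, and the cross-bundle state lookup the real function performs on flush is omitted):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-bundle buffer: keep-first-row means putIfAbsent within the
// bundle; a real flush must still consult keyed state so that the first row
// also wins across bundles.
public final class KeepFirstRowBundle<K, T> {

	private final Map<K, T> buffer = new HashMap<>();
	private final int triggerCount;
	private int count;

	public KeepFirstRowBundle(int triggerCount) {
		this.triggerCount = triggerCount;
	}

	/** Adds a row; returns the flushed bundle once the count trigger fires, else null. */
	public Map<K, T> add(K key, T row) {
		buffer.putIfAbsent(key, row); // first row per key wins inside the bundle
		if (++count >= triggerCount) {
			Map<K, T> flushed = new HashMap<>(buffer);
			buffer.clear();
			count = 0;
			return flushed;
		}
		return null;
	}
}
```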
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepLastRowFunctionTest.java
similarity index 67%
rename from flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateFunctionTest.java
rename to flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepLastRowFunctionTest.java
index e47525db45e844..efbf33552da8fa 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateFunctionTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/deduplicate/MiniBatchDeduplicateKeepLastRowFunctionTest.java
@@ -42,9 +42,9 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.retractRecord;
 
 /**
- * Tests for {@link MiniBatchDeduplicateFunction}.
+ * Tests for {@link MiniBatchDeduplicateKeepLastRowFunction}.
  */
-public class MiniBatchDeduplicateFunctionTest {
+public class MiniBatchDeduplicateKeepLastRowFunctionTest {
 
 	private BaseRowTypeInfo inputRowType = new BaseRowTypeInfo(InternalTypes.STRING, InternalTypes.LONG,
 			InternalTypes.INT);
@@ -60,64 +60,21 @@ public class MiniBatchDeduplicateFunctionTest {
 	private TypeSerializer<BaseRow> typeSerializer = inputRowType.createSerializer(new ExecutionConfig());
 
-	private MiniBatchDeduplicateFunction createFunction(boolean generateRetraction, boolean keepLastRow) {
-		MiniBatchDeduplicateFunction func = new MiniBatchDeduplicateFunction(inputRowType, generateRetraction,
-				typeSerializer, keepLastRow);
-		return func;
+	private MiniBatchDeduplicateKeepLastRowFunction createFunction(boolean generateRetraction) {
+		return new MiniBatchDeduplicateKeepLastRowFunction(inputRowType, generateRetraction, typeSerializer);
 	}
 
 	private OneInputStreamOperatorTestHarness<BaseRow, BaseRow> createTestHarness(
-			MiniBatchDeduplicateFunction func)
+			MiniBatchDeduplicateKeepLastRowFunction func)
 			throws Exception {
 		CountBundleTrigger<Tuple2<BaseRow, BaseRow>> trigger = new CountBundleTrigger<>(3);
 		KeyedMapBundleOperator op = new KeyedMapBundleOperator(func, trigger);
-		return new KeyedOneInputStreamOperatorTestHarness(op, rowKeySelector, rowKeySelector.getProducedType());
+		return new KeyedOneInputStreamOperatorTestHarness<>(op, rowKeySelector, rowKeySelector.getProducedType());
 	}
 
 	@Test
-	public void testKeepFirstRowWithoutGenerateRetraction() throws Exception {
-		MiniBatchDeduplicateFunction func = createFunction(false, false);
-		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
-		testHarness.open();
-		testHarness.processElement(record("book", 1L, 12));
-		testHarness.processElement(record("book", 2L, 11));
-
-		// output is empty because bundle not trigger yet.
-		Assert.assertTrue(testHarness.getOutput().isEmpty());
-
-		testHarness.processElement(record("book", 1L, 13));
-		// output is not empty because bundle is trigger.
-		List<Object> expectedOutputOutput = new ArrayList<>();
-		expectedOutputOutput.add(record("book", 1L, 12));
-		expectedOutputOutput.add(record("book", 2L, 11));
-		assertor.assertOutputEqualsSorted("output wrong.", expectedOutputOutput, testHarness.getOutput());
-		testHarness.close();
-	}
-
-	@Test
-	public void testKeepFirstRowWithGenerateRetraction() throws Exception {
-		MiniBatchDeduplicateFunction func = createFunction(true, false);
-		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
-		testHarness.open();
-		testHarness.processElement(record("book", 1L, 12));
-		testHarness.processElement(record("book", 2L, 11));
-
-		// output is empty because bundle not trigger yet.
-		Assert.assertTrue(testHarness.getOutput().isEmpty());
-
-		testHarness.processElement(record("book", 1L, 13));
-
-		// Keep FirstRow in deduplicate will not send retraction
-		List<Object> expectedOutputOutput = new ArrayList<>();
-		expectedOutputOutput.add(record("book", 1L, 12));
-		expectedOutputOutput.add(record("book", 2L, 11));
-		assertor.assertOutputEqualsSorted("output wrong.", expectedOutputOutput, testHarness.getOutput());
-		testHarness.close();
-	}
-
-	@Test
-	public void testKeepLastWithoutGenerateRetraction() throws Exception {
-		MiniBatchDeduplicateFunction func = createFunction(false, true);
+	public void testWithoutGenerateRetraction() throws Exception {
+		MiniBatchDeduplicateKeepLastRowFunction func = createFunction(false);
 		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
 		testHarness.open();
 		testHarness.processElement(record("book", 1L, 10));
@@ -144,8 +101,8 @@ public void testKeepLastWithoutGenerateRetraction() throws Exception {
 	}
 
 	@Test
-	public void testKeepLastRowWithGenerateRetraction() throws Exception {
-		MiniBatchDeduplicateFunction func = createFunction(true, true);
+	public void testWithGenerateRetraction() throws Exception {
+		MiniBatchDeduplicateKeepLastRowFunction func = createFunction(true);
 		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
 		testHarness.open();
 		testHarness.processElement(record("book", 1L, 10));
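Spelled out, the renamed keep-last-row tests all follow one harness recipe. A hypothetical walkthrough, assuming it sits inside MiniBatchDeduplicateKeepLastRowFunctionTest and reuses its createFunction and createTestHarness helpers:

```java
@Test
public void testBundleFlushOnThirdElement() throws Exception {
	// hypothetical walkthrough, reusing the fields and helpers of the test class above
	MiniBatchDeduplicateKeepLastRowFunction func = createFunction(false);
	OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
	testHarness.open();
	testHarness.processElement(record("book", 1L, 10));
	testHarness.processElement(record("book", 2L, 11));
	// nothing emitted yet: CountBundleTrigger(3) has only counted two elements
	testHarness.processElement(record("book", 1L, 13));
	// the third element fires the trigger; the flush emits the last row per key,
	// here ("book", 1L, 13) and ("book", 2L, 11)
	testHarness.close();
}
```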
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/AppendRankFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/AppendRankFunctionTest.java
index 288be224693435..49b9366c71f515 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/AppendRankFunctionTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/AppendRankFunctionTest.java
@@ -37,10 +37,9 @@ public class AppendRankFunctionTest extends BaseRankFunctionTest {
 	@Override
 	protected AbstractRankFunction createRankFunction(RankType rankType, RankRange rankRange,
 			boolean generateRetraction, boolean outputRankNumber) {
-		AbstractRankFunction rankFunction = new AppendRankFunction(minTime.toMilliseconds(), maxTime.toMilliseconds(),
+		return new AppendRankFunction(minTime.toMilliseconds(), maxTime.toMilliseconds(),
 				inputRowType, sortKeyComparator, sortKeySelector, rankType, rankRange, generatedEqualiser,
 				generateRetraction, outputRankNumber, cacheSize);
-		return rankFunction;
 	}
 
 	@Test
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/BaseRankFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/BaseRankFunctionTest.java
index 0d766a546c686b..deff2ac2ef98db 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/BaseRankFunctionTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/BaseRankFunctionTest.java
@@ -50,9 +50,9 @@
  */
 abstract class BaseRankFunctionTest {
 
-	protected Time minTime = Time.milliseconds(10);
-	protected Time maxTime = Time.milliseconds(20);
-	protected long cacheSize = 10000L;
+	Time minTime = Time.milliseconds(10);
+	Time maxTime = Time.milliseconds(20);
+	long cacheSize = 10000L;
 
 	BaseRowTypeInfo inputRowType = new BaseRowTypeInfo(
 			InternalTypes.STRING,
@@ -323,12 +323,12 @@ public void testConstantRankRangeWithoutOffset() throws Exception {
 				.assertOutputEqualsSorted("output wrong.", expectedOutputOutput, testHarness.getOutput());
 	}
 
-	protected OneInputStreamOperatorTestHarness<BaseRow, BaseRow> createTestHarness(
+	OneInputStreamOperatorTestHarness<BaseRow, BaseRow> createTestHarness(
 			AbstractRankFunction rankFunction) throws Exception {
-		KeyedProcessOperator operator = new KeyedProcessOperator(rankFunction);
+		KeyedProcessOperator<BaseRow, BaseRow, BaseRow> operator = new KeyedProcessOperator<>(rankFunction);
 		rankFunction.setKeyContext(operator);
-		return new KeyedOneInputStreamOperatorTestHarness(operator, keySelector, keySelector.getProducedType());
+		return new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keySelector.getProducedType());
 	}
 
 	protected abstract AbstractRankFunction createRankFunction(RankType rankType, RankRange rankRange,
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/RetractRankFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/RetractRankFunctionTest.java
index c068412470a02a..ed38f631267ead 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/RetractRankFunctionTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/RetractRankFunctionTest.java
@@ -39,10 +39,9 @@ public class RetractRankFunctionTest extends BaseRankFunctionTest {
 	@Override
 	protected AbstractRankFunction createRankFunction(RankType rankType, RankRange rankRange,
 			boolean generateRetraction, boolean outputRankNumber) {
-		AbstractRankFunction rankFunction = new RetractRankFunction(minTime.toMilliseconds(), maxTime.toMilliseconds(),
+		return new RetractRankFunction(minTime.toMilliseconds(), maxTime.toMilliseconds(),
 				inputRowType, sortKeyComparator, sortKeySelector, rankType, rankRange, generatedEqualiser,
 				generateRetraction, outputRankNumber);
-		return rankFunction;
 	}
 
 	@Test
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/UpdateRankFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/UpdateRankFunctionTest.java
index b657dab26d85c8..92b76cb8eb9551 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/UpdateRankFunctionTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/rank/UpdateRankFunctionTest.java
@@ -38,11 +38,9 @@ public class UpdateRankFunctionTest extends BaseRankFunctionTest {
 	@Override
 	protected AbstractRankFunction createRankFunction(RankType rankType, RankRange rankRange,
 			boolean generateRetraction, boolean outputRankNumber) {
-
-		AbstractRankFunction rankFunction = new UpdateRankFunction(minTime.toMilliseconds(), maxTime.toMilliseconds(),
+		return new UpdateRankFunction(minTime.toMilliseconds(), maxTime.toMilliseconds(),
 				inputRowType, rowKeySelector, sortKeyComparator, sortKeySelector, rankType, rankRange,
 				generatedEqualiser, generateRetraction, outputRankNumber, cacheSize);
-		return rankFunction;
 	}
 
 	@Test
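A cleanup that recurs through these test diffs is replacing raw constructor calls such as new KeyedProcessOperator(rankFunction) with the diamond form. The change is more than cosmetic, since a raw type opts the whole expression out of generic type checking; a self-contained illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class DiamondExample {
	@SuppressWarnings("unchecked")
	public static void main(String[] args) {
		List<String> raw = new ArrayList(); // raw type: unchecked warning, no type checking
		List<String> inferred = new ArrayList<>(); // diamond: inferred as ArrayList<String>
		raw.add("a");
		inferred.add("b");
		System.out.println(raw + " " + inferred);
	}
}
```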
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java
index e697ed80c227a0..46475662a0b51a 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java
@@ -28,10 +28,12 @@ import org.apache.flink.table.typeutils.BaseRowTypeInfo;
 
 /**
- * A utility class which will extract key from BaseRow.
+ * A utility class which extracts the key from a BaseRow.
  */
 public class BinaryRowKeySelector implements BaseRowKeySelector {
 
+	private static final long serialVersionUID = -2327761762415377059L;
+
 	private final int[] keyFields;
 	private final InternalType[] inputFieldTypes;
 	private final InternalType[] keyFieldTypes;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/WindowOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/WindowOperatorTest.java
index c0338f1a4453a2..5f4deb0818d2e4 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/WindowOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/WindowOperatorTest.java
@@ -69,10 +69,10 @@ public class WindowOperatorTest {
 	// For counting if close() is called the correct number of times on the SumReducer
 	private static AtomicInteger closeCalled = new AtomicInteger(0);
 
-	private InternalType[] inputFieldTypes = new InternalType[] {
+	private InternalType[] inputFieldTypes = new InternalType[]{
 			InternalTypes.STRING,
 			InternalTypes.INT,
-			InternalTypes.LONG };
+			InternalTypes.LONG};
 
 	private BaseRowTypeInfo outputType = new BaseRowTypeInfo(
 			InternalTypes.STRING,