Commit

1. Update Deduplicate Function state.
2. Other minor updates.
beyond1920 committed Apr 15, 2019
1 parent b3237f6 commit f2568cd
Showing 15 changed files with 137 additions and 263 deletions.
@@ -21,20 +21,15 @@ package org.apache.flink.table.plan.nodes.physical.stream
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation}
import org.apache.flink.table.api.{StreamTableEnvironment, TableConfigOptions, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.EqualiserCodeGenerator
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.generated.GeneratedRecordEqualiser
import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.plan.util.KeySelectorUtil
import org.apache.flink.table.runtime.bundle.KeyedMapBundleOperator
import org.apache.flink.table.runtime.bundle.trigger.CountBundleTrigger
import org.apache.flink.table.runtime.deduplicate.{DeduplicateFunction,
MiniBatchDeduplicateFunction}
import org.apache.flink.table.`type`.TypeConverters
import org.apache.flink.table.plan.rules.physical.stream.StreamExecRetractionRules
import org.apache.flink.table.typeutils.BaseRowTypeInfo
import org.apache.flink.table.typeutils.TypeCheckUtils.isRowTime

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
@@ -48,18 +43,18 @@ import scala.collection.JavaConversions._
* Stream physical RelNode which deduplicates on keys and keeps only the first row or the last row.
* This node is an optimization of [[StreamExecRank]] for some special cases.
* Compared to [[StreamExecRank]], this node could use mini-batch and access less state.
* <p>NOTES: only supports sort on proctime now.
* <p>NOTES: only supports sort on proctime now; sort on rowtime will not be translated into
* a StreamExecDeduplicate node.
*/
class StreamExecDeduplicate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
uniqueKeys: Array[Int],
isRowtime: Boolean,
keepLastRow: Boolean)
extends SingleRel(cluster, traitSet, inputRel)
with StreamPhysicalRel
with StreamExecNode[BaseRow] {
with StreamPhysicalRel
with StreamExecNode[BaseRow] {

def getUniqueKeys: Array[Int] = uniqueKeys

@@ -81,17 +76,15 @@ class StreamExecDeduplicate(
traitSet,
inputs.get(0),
uniqueKeys,
isRowtime,
keepLastRow)
}

override def explainTerms(pw: RelWriter): RelWriter = {
val fieldNames = getRowType.getFieldNames
val orderString = if (isRowtime) "ROWTIME" else "PROCTIME"
super.explainTerms(pw)
.item("keepLastRow", keepLastRow)
.item("key", uniqueKeys.map(fieldNames.get).mkString(", "))
.item("order", orderString)
.item("order", "PROCTIME")
}

//~ ExecNode methods -----------------------------------------------------------
@@ -112,31 +105,17 @@ class StreamExecDeduplicate(
.asInstanceOf[StreamTransformation[BaseRow]]

val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]

val generateRetraction = StreamExecRetractionRules.isAccRetract(this)

val inputRowType = FlinkTypeFactory.toInternalRowType(getInput.getRowType)
val rowTimeFieldIndex = inputRowType.getFieldTypes.zipWithIndex
.filter(e => isRowTime(e._1))
.map(_._2)
if (rowTimeFieldIndex.size > 1) {
throw new RuntimeException("More than one row time field. Currently this is not supported!")
}
if (rowTimeFieldIndex.nonEmpty) {
throw new TableException("Currently not support Deduplicate on rowtime.")
}
val tableConfig = tableEnv.getConfig
val isMiniBatchEnabled = tableConfig.getConf.getLong(
TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY) > 0
val generatedRecordEqualiser = generateRecordEqualiser(rowTypeInfo)
val operator = if (isMiniBatchEnabled) {
val exeConfig = tableEnv.execEnv.getConfig
val processFunction = new MiniBatchDeduplicateFunction(
rowTypeInfo,
generateRetraction,
rowTypeInfo.createSerializer(exeConfig),
keepLastRow,
generatedRecordEqualiser)
keepLastRow)
val trigger = new CountBundleTrigger[BaseRow](
tableConfig.getConf.getLong(TableConfigOptions.SQL_EXEC_MINIBATCH_SIZE))
new KeyedMapBundleOperator(
@@ -150,8 +129,7 @@ class StreamExecDeduplicate(
maxRetentionTime,
rowTypeInfo,
generateRetraction,
keepLastRow,
generatedRecordEqualiser)
keepLastRow)
new KeyedProcessOperator[BaseRow, BaseRow, BaseRow](processFunction)
}
val ret = new OneInputTransformation(
@@ -169,16 +147,8 @@ class StreamExecDeduplicate(
private def getOperatorName: String = {
val fieldNames = getRowType.getFieldNames
val keyNames = uniqueKeys.map(fieldNames.get).mkString(", ")
val orderString = if (isRowtime) "ROWTIME" else "PROCTIME"
s"${if (keepLastRow) "keepLastRow" else "KeepFirstRow"}" +
s": (key: ($keyNames), select: (${fieldNames.mkString(", ")}), order: ($orderString))"
}

private def generateRecordEqualiser(rowTypeInfo: BaseRowTypeInfo): GeneratedRecordEqualiser = {
val generator = new EqualiserCodeGenerator(
rowTypeInfo.getFieldTypes.map(TypeConverters.createInternalTypeFromTypeInfo))
val equaliserName = s"${if (keepLastRow) "LastRow" else "FirstRow"}ValueEqualiser"
generator.generateRecordEqualiser(equaliserName)
s": (key: ($keyNames), select: (${fieldNames.mkString(", ")}), order: (PROCTIME)"
}

}
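A note on the operator choice in the translation code above: a positive SQL_EXEC_MINIBATCH_ALLOW_LATENCY selects MiniBatchDeduplicateFunction wrapped in a KeyedMapBundleOperator (with a CountBundleTrigger sized by SQL_EXEC_MINIBATCH_SIZE), otherwise the per-record DeduplicateFunction runs in a KeyedProcessOperator. A minimal sketch of how a job could opt into the mini-batch path, assuming a blink StreamTableEnvironment is available; the option names are taken from this file, the values are only illustrative:

object MiniBatchDeduplicationConfig {
  import org.apache.flink.table.api.{StreamTableEnvironment, TableConfigOptions}

  // Illustrative values only: allow up to 2 seconds of buffering and flush bundles
  // of at most 5000 rows. A positive allow-latency is what switches the plan onto
  // the MiniBatchDeduplicateFunction / KeyedMapBundleOperator path.
  def enableMiniBatchDeduplication(tEnv: StreamTableEnvironment): Unit = {
    val conf = tEnv.getConfig.getConf
    conf.setLong(TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 2000L)
    conf.setLong(TableConfigOptions.SQL_EXEC_MINIBATCH_SIZE, 5000L)
  }
}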
@@ -18,6 +18,7 @@

package org.apache.flink.table.plan.nodes.physical.stream

import org.apache.flink.runtime.state.KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM
import org.apache.flink.table.plan.nodes.common.CommonPhysicalExchange
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
@@ -44,10 +45,8 @@ class StreamExecExchange(
relNode: RelNode,
relDistribution: RelDistribution)
extends CommonPhysicalExchange(cluster, traitSet, relNode, relDistribution)
with StreamPhysicalRel
with StreamExecNode[BaseRow] {

private val DEFAULT_MAX_PARALLELISM = 1 << 7
with StreamPhysicalRel
with StreamExecNode[BaseRow] {

override def producesUpdates: Boolean = false

@@ -88,9 +87,11 @@ class StreamExecExchange(
transformation
case RelDistribution.Type.HASH_DISTRIBUTED =>
// TODO Eliminate duplicate keys

val selector = KeySelectorUtil.getBaseRowSelector(
relDistribution.getKeys.map(_.toInt).toArray, inputTypeInfo)
val partitioner = new KeyGroupStreamPartitioner(selector, DEFAULT_MAX_PARALLELISM)
val partitioner = new KeyGroupStreamPartitioner(selector,
DEFAULT_LOWER_BOUND_MAX_PARALLELISM)
val transformation = new PartitionTransformation(
inputTransform,
partitioner.asInstanceOf[StreamPartitioner[BaseRow]])
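The only functional change here is replacing the locally defined DEFAULT_MAX_PARALLELISM (1 << 7) with KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, so the hash exchange shares the runtime's lower bound for max parallelism instead of hard-coding the same value. A rough sketch of the key-group routing that the KeyGroupStreamPartitioner relies on, using KeyGroupRangeAssignment utilities; the sample key and parallelism are arbitrary:

object KeyGroupRoutingSketch {
  import org.apache.flink.runtime.state.KeyGroupRangeAssignment

  // A record's key is first hashed into one of `maxParallelism` key groups, then the
  // key group is mapped to one of `parallelism` operator subtasks.
  val maxParallelism: Int = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM // 1 << 7 = 128
  val parallelism: Int = 4
  val keyGroup: Int = KeyGroupRangeAssignment.assignToKeyGroup("someKey", maxParallelism)
  val subtaskIndex: Int =
    KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup)
}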
@@ -68,25 +68,21 @@ class StreamExecDeduplicateRule

override def convert(rel: RelNode): RelNode = {
val rank = rel.asInstanceOf[FlinkLogicalRank]
val fieldCollation = rank.orderKey.getFieldCollations.get(0)
val fieldType = rank.getInput.getRowType.getFieldList.get(fieldCollation.getFieldIndex).getType
val isRowtime = FlinkTypeFactory.isRowtimeIndicatorType(fieldType)

val requiredDistribution = FlinkRelDistribution.hash(rank.partitionKey.toList)
val requiredTraitSet = rel.getCluster.getPlanner.emptyTraitSet()
.replace(FlinkConventions.STREAM_PHYSICAL)
.replace(requiredDistribution)
val convInput: RelNode = RelOptRule.convert(rank.getInput, requiredTraitSet)

// order by timeIndicator desc ==> lastRow, otherwise is firstRow
val fieldCollation = rank.orderKey.getFieldCollations.get(0)
val isLastRow = fieldCollation.direction.isDescending
val providedTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
new StreamExecDeduplicate(
rel.getCluster,
providedTraitSet,
convInput,
rank.partitionKey.toArray,
isRowtime,
isLastRow)
}
}
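As the comment in convert() notes, the direction of the time-indicator collation decides the variant: ascending order keeps the first row per key, descending order keeps the last. For reference, a sketch of the two proctime query shapes this rule is meant to match; the table and field names are made up, and the source table is assumed to expose a proctime attribute named `proctime`:

object ProctimeDeduplicationQueries {
  // keep the first row per key: ORDER BY proctime ascending
  val firstRowSql: String =
    """
      |SELECT key, str
      |FROM (
      |  SELECT *,
      |    ROW_NUMBER() OVER (PARTITION BY key ORDER BY proctime) AS rowNum
      |  FROM T
      |)
      |WHERE rowNum = 1
    """.stripMargin

  // keep the last row per key: ORDER BY proctime descending
  val lastRowSql: String =
    """
      |SELECT key, str
      |FROM (
      |  SELECT *,
      |    ROW_NUMBER() OVER (PARTITION BY key ORDER BY proctime DESC) AS rowNum
      |  FROM T
      |)
      |WHERE rowNum = 1
    """.stripMargin
}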
@@ -87,86 +87,4 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}

// TODO Deduplicate does not support sort on rowtime now, so it is translated to Rank currently
@Test
def testFirstRowOnRowtime(): Unit = {
val data = List(
(3L, 2L, "Hello world", 3),
(2L, 2L, "Hello", 2),
(6L, 3L, "Luke Skywalker", 6),
(5L, 3L, "I am fine.", 5),
(7L, 4L, "Comment#1", 7),
(9L, 4L, "Comment#3", 9),
(10L, 4L, "Comment#4", 10),
(8L, 4L, "Comment#2", 8),
(1L, 1L, "Hi", 1),
(4L, 3L, "Helloworld, how are you?", 4))

val t = failingDataSource(data)
.assignTimestampsAndWatermarks(
new TimestampAndWatermarkWithOffset[(Long, Long, String, Int)](10L))
.toTable(tEnv, 'rowtime, 'key, 'str, 'int)
tEnv.registerTable("T", t)

val sql =
"""
|SELECT key, str, `int`
|FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY key ORDER BY rowtime) as rowNum
| FROM T
|)
|WHERE rowNum = 1
""".stripMargin

val sink = new TestingUpsertTableSink(Array(1))
val table = tEnv.sqlQuery(sql)
writeToSink(table, sink)

env.execute()
val expected = List("1,Hi,1", "2,Hello,2", "3,Helloworld, how are you?,4", "4,Comment#1,7")
assertEquals(expected.sorted, sink.getUpsertResults.sorted)
}

// TODO Deduplicate does not support sort on rowtime now, so it is translated to Rank currently
@Test
def testLastRowOnRowtime(): Unit = {
val data = List(
(3L, 2L, "Hello world", 3),
(2L, 2L, "Hello", 2),
(6L, 3L, "Luke Skywalker", 6),
(5L, 3L, "I am fine.", 5),
(7L, 4L, "Comment#1", 7),
(9L, 4L, "Comment#3", 9),
(10L, 4L, "Comment#4", 10),
(8L, 4L, "Comment#2", 8),
(1L, 1L, "Hi", 1),
(4L, 3L, "Helloworld, how are you?", 4))

val t = failingDataSource(data)
.assignTimestampsAndWatermarks(
new TimestampAndWatermarkWithOffset[(Long, Long, String, Int)](10L))
.toTable(tEnv, 'rowtime, 'key, 'str, 'int)
tEnv.registerTable("T", t)

val sql =
"""
|SELECT key, str, `int`
|FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY key ORDER BY rowtime DESC) as rowNum
| FROM T
|)
|WHERE rowNum = 1
""".stripMargin

val sink = new TestingUpsertTableSink(Array(1))
val table = tEnv.sqlQuery(sql)
writeToSink(table, sink)

env.execute()
val expected = List("1,Hi,1", "2,Hello world,3", "3,Luke Skywalker,6", "4,Comment#4,10")
assertEquals(expected.sorted, sink.getUpsertResults.sorted)
}

}
@@ -62,8 +62,8 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
env.execute()
val expected = List(
"book,1,12,2",
"book,2,19,1",
"book,1,12,2",
"fruit,3,44,1",
"fruit,4,33,2")
assertEquals(expected.sorted, sink.getRetractResults.sorted)
@@ -20,10 +20,9 @@

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.generated.RecordEqualiser;
import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.util.Collector;
@@ -42,30 +41,33 @@ public class DeduplicateFunction
private final BaseRowTypeInfo rowTypeInfo;
private final boolean generateRetraction;
private final boolean keepLastRow;
private ValueState<BaseRow> pkRow;
private GeneratedRecordEqualiser generatedEqualiser;
private transient RecordEqualiser equaliser;

// state stores complete row if keep last row and generate retraction is true,
// else stores a flag to indicate whether key appears before.
private ValueState state;

public DeduplicateFunction(long minRetentionTime, long maxRetentionTime, BaseRowTypeInfo rowTypeInfo,
boolean generateRetraction, boolean keepLastRow, GeneratedRecordEqualiser generatedEqualiser) {
boolean generateRetraction, boolean keepLastRow) {
super(minRetentionTime, maxRetentionTime);
this.rowTypeInfo = rowTypeInfo;
this.generateRetraction = generateRetraction;
this.keepLastRow = keepLastRow;
this.generatedEqualiser = generatedEqualiser;
}

@Override
public void open(Configuration configure) throws Exception {
super.open(configure);
String stateName = keepLastRow ? "DeduplicateFunctionCleanupTime" : "DeduplicateFunctionCleanupTime";
String stateName = keepLastRow ? "DeduplicateFunctionKeepLastRow" : "DeduplicateFunctionKeepFirstRow";
initCleanupTimeState(stateName);
ValueStateDescriptor rowStateDesc = new ValueStateDescriptor("rowState", rowTypeInfo);
pkRow = getRuntimeContext().getState(rowStateDesc);

// compile equaliser
equaliser = generatedEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
generatedEqualiser = null;
ValueStateDescriptor stateDesc = null;
if (keepLastRow && generateRetraction) {
// if generating retraction and keeping last row, store the complete row in state
stateDesc = new ValueStateDescriptor("deduplicateFunction", rowTypeInfo);
} else {
// else stores a flag to indicate whether the pk appeared before.
stateDesc = new ValueStateDescriptor("fistValueState", Types.BOOLEAN);
}
state = getRuntimeContext().getState(stateDesc);
}

@Override
@@ -74,26 +76,17 @@ public void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) t
// register state-cleanup timer
registerProcessingCleanupTimer(ctx, currentTime);

BaseRow preRow = pkRow.value();
if (keepLastRow) {
processLastRow(preRow, input, generateRetraction, stateCleaningEnabled, pkRow, equaliser, out);
processLastRow(input, generateRetraction, state, out);
} else {
processFirstRow(preRow, input, pkRow, out);
processFirstRow(input, state, out);
}
}

@Override
public void close() throws Exception {
super.close();
}

@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<BaseRow> out) throws Exception {
public void onTimer(long timestamp, OnTimerContext ctx, Collector<BaseRow> out) throws Exception {
if (stateCleaningEnabled) {
cleanupState(pkRow);
cleanupState(state);
}
}
}
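The comments in open() describe the state layout this commit introduces: only the keepLastRow && generateRetraction case has to keep the previous row (so it can be retracted before the new row is emitted); every other case only needs to know whether the key has been seen. A rough, hypothetical sketch of the per-record behaviour that layout supports; the real logic lives in processFirstRow/processLastRow, which are not shown in this hunk, and markAsRetraction stands in for however the runtime flags a retraction message:

object DeduplicateStateSketch {
  import org.apache.flink.api.common.state.ValueState
  import org.apache.flink.table.dataformat.BaseRow
  import org.apache.flink.util.Collector

  // Keep-first-row: a boolean flag is enough, the stored row is never needed again.
  def firstRowSketch(input: BaseRow, seen: ValueState[java.lang.Boolean], out: Collector[BaseRow]): Unit = {
    if (seen.value() == null) {
      seen.update(true)   // remember that this key has appeared
      out.collect(input)  // emit only the first row per key
    }                     // later rows for the key are simply dropped
  }

  // Keep-last-row with retractions: the previous row is stored so it can be retracted.
  def lastRowSketch(
      input: BaseRow,
      generateRetraction: Boolean,
      prevRow: ValueState[BaseRow],
      out: Collector[BaseRow],
      markAsRetraction: BaseRow => BaseRow): Unit = {
    val prev = prevRow.value()
    if (generateRetraction && prev != null) {
      out.collect(markAsRetraction(prev))  // retract the row emitted for this key last time
    }
    prevRow.update(input)
    out.collect(input)                     // emit the new last row
  }
}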
