Skip to content

Commit

Permalink
1. split DeduplicateFunction into DeduplicateKeepFirstRowFunction and…
Browse files Browse the repository at this point in the history
… DeduplicateKeepLastRowFunction

2. Other minor updates.
  • Loading branch information
beyond1920 committed Apr 15, 2019
1 parent f2568cd commit 1d79a64
Show file tree
Hide file tree
Showing 30 changed files with 540 additions and 385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,16 @@ class TableConfig {
this
}

/**
 * Returns the minimum idle state retention time, i.e. the minimum time for which
 * state that has not been updated is retained.
 *
 * The value is read from the underlying configuration under the
 * `TableConfigOptions.SQL_EXEC_STATE_TTL_MS` option.
 * NOTE(review): the option name suggests the unit is milliseconds; confirm against
 * the option's definition.
 *
 * @return the configured value of `SQL_EXEC_STATE_TTL_MS`
 */
def getMinIdleStateRetentionTime: Long = {
  val minIdleStateTtlOption = TableConfigOptions.SQL_EXEC_STATE_TTL_MS
  conf.getLong(minIdleStateTtlOption)
}

/**
* Returns the maximum time for which state that has not been updated will be retained.
*/
def getMaxIdleStateRetentionTime: Long = {
// only min idle ttl provided.
if (this.conf.contains(TableConfigOptions.SQL_EXEC_STATE_TTL_MS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.plan.util.KeySelectorUtil
import org.apache.flink.table.runtime.bundle.KeyedMapBundleOperator
import org.apache.flink.table.runtime.bundle.trigger.CountBundleTrigger
import org.apache.flink.table.runtime.deduplicate.{DeduplicateFunction,
MiniBatchDeduplicateFunction}
import org.apache.flink.table.runtime.deduplicate.{DeduplicateKeepFirstRowFunction, DeduplicateKeepLastRowFunction, MiniBatchDeduplicateKeepFirstRowFunction, MiniBatchDeduplicateKeepLastRowFunction}
import org.apache.flink.table.plan.rules.physical.stream.StreamExecRetractionRules
import org.apache.flink.table.typeutils.BaseRowTypeInfo

Expand Down Expand Up @@ -95,10 +94,7 @@ class StreamExecDeduplicate(
val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(getInput)

if (inputIsAccRetract) {
throw new TableException(
"Deduplicate: Retraction on Deduplicate is not supported yet.\n" +
"please re-check sql grammar. \n" +
"Note: Deduplicate should not follow a non-windowed GroupBy aggregation.")
throw new TableException("Deduplicate doesn't support retraction input stream currently.")
}

val inputTransform = getInputNodes.get(0).translateToPlan(tableEnv)
Expand All @@ -111,11 +107,12 @@ class StreamExecDeduplicate(
TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY) > 0
val operator = if (isMiniBatchEnabled) {
val exeConfig = tableEnv.execEnv.getConfig
val processFunction = new MiniBatchDeduplicateFunction(
rowTypeInfo,
generateRetraction,
rowTypeInfo.createSerializer(exeConfig),
keepLastRow)
val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
val processFunction = if (keepLastRow) {
new MiniBatchDeduplicateKeepLastRowFunction(rowTypeInfo, generateRetraction, rowSerializer)
} else {
new MiniBatchDeduplicateKeepFirstRowFunction(rowSerializer)
}
val trigger = new CountBundleTrigger[BaseRow](
tableConfig.getConf.getLong(TableConfigOptions.SQL_EXEC_MINIBATCH_SIZE))
new KeyedMapBundleOperator(
Expand All @@ -124,12 +121,12 @@ class StreamExecDeduplicate(
} else {
val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
val processFunction = new DeduplicateFunction(
minRetentionTime,
maxRetentionTime,
rowTypeInfo,
generateRetraction,
keepLastRow)
val processFunction = if (keepLastRow) {
new DeduplicateKeepLastRowFunction(minRetentionTime, maxRetentionTime, rowTypeInfo,
generateRetraction)
} else {
new DeduplicateKeepFirstRowFunction(minRetentionTime, maxRetentionTime)
}
new KeyedProcessOperator[BaseRow, BaseRow, BaseRow](processFunction)
}
val ret = new OneInputTransformation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ class StreamExecRank(
.item("select", getRowType.getFieldNames.mkString(", "))
}


//~ ExecNode methods -----------------------------------------------------------

override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class StreamOptimizer(tEnv: StreamTableEnvironment) extends Optimizer {
n.sink match {
case _: RetractStreamTableSink[_] => true
case s: DataStreamTableSink[_] => s.updatesAsRetraction
case _ => false
}
case o =>
o.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE).sendsUpdatesAsRetractions
Expand Down

0 comments on commit 1d79a64

Please sign in to comment.