Skip to content

Commit

Permalink
[FLINK-19694][table-runtime-blink] Support upsert ChangelogMode for S…
Browse files Browse the repository at this point in the history
…canTableSource in runtime

This closes #13721
  • Loading branch information
wuchong committed Oct 27, 2020
1 parent 92a2648 commit 436a4c2
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 112 deletions.
Expand Up @@ -117,6 +117,7 @@ class StreamExecDeduplicate(
rowTypeInfo,
generateUpdateBefore,
generateInsert,
true,
rowSerializer,
minRetentionTime)
} else {
Expand All @@ -134,7 +135,8 @@ class StreamExecDeduplicate(
minRetentionTime,
rowTypeInfo,
generateUpdateBefore,
generateInsert)
generateInsert,
true)
} else {
new DeduplicateKeepFirstRowFunction(minRetentionTime)
}
Expand Down Expand Up @@ -163,12 +165,14 @@ object StreamExecDeduplicate {

@Experimental
val TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE: ConfigOption[JBoolean] =
key("table.exec.insert-and-updateafter-sensitive")
.defaultValue(JBoolean.valueOf(true))
.withDescription("Set whether the job (especially the sinks) is sensitive to " +
"INSERT messages and UPDATE_AFTER messages. " +
"If false, Flink may send UPDATE_AFTER instead of INSERT for the first row " +
"at some times (e.g. deduplication for last row). " +
"If true, Flink will guarantee to send INSERT for the first row. " +
"Default is true.")
key("table.exec.insert-and-updateafter-sensitive")
.booleanType()
.defaultValue(JBoolean.valueOf(true))
.withDescription("Set whether the job (especially the sinks) is sensitive to " +
"INSERT messages and UPDATE_AFTER messages. " +
"If false, Flink may send UPDATE_AFTER instead of INSERT for the first row " +
"at some times (e.g. deduplication for last row). " +
"If true, Flink will guarantee to send INSERT for the first row, " +
"but there will be additional overhead." +
"Default is true.")
}

0 comments on commit 436a4c2

Please sign in to comment.