[FLINK-13289][table-planner-blink] Blink-planner should setKeyFields to upsert table sink #9195
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit 1eeb005 (Tue Aug 06 15:51:04 UTC 2019)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
Thanks for the PR, @JingsongLi. Looks good to me overall; I just left a minor comment.
val uniqueKeys = fmq.getUniqueKeys(relNode)
if (uniqueKeys != null) {
  uniqueKeys.filter(_.nonEmpty).toList.map { uniqueKey =>
    val keys = new util.HashSet[String]()
Why is a HashSet needed? It seems that the indices returned by the ImmutableBitSet are strictly ascending. Why not just use uniqueKey.asList().toArray?
I'll remove the HashSet.
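To illustrate the ordering argument, here is a minimal sketch. It uses scala.collection.immutable.BitSet as a stand-in for Calcite's ImmutableBitSet (an assumption made so that the snippet is self-contained), and keyIndices is a hypothetical helper, not code from this PR. Because a bit set iterates its indices in ascending order, converting it straight to an array should be enough, with no intermediate set.

```scala
import scala.collection.immutable.BitSet

object KeyOrderSketch {
  // scala.collection.immutable.BitSet is a stand-in for Calcite's ImmutableBitSet;
  // the Scala BitSet is a sorted set, so its elements come out in ascending order.
  def keyIndices(uniqueKey: BitSet): Array[Int] = uniqueKey.toArray
}
```

The insertion order of the indices does not matter: a key built from 2, 0, 1 still yields the array 0, 1, 2.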
Looks good to me.
Thanks for the effort, @JingsongLi. I left some comments.
// Now we pick shortest one to sink
// TODO UpsertStreamTableSink setKeyFields interface should be Array[Array[String]]
val tableKeys: Option[Array[String]] =
  UpdatingPlanChecker.getUniqueKeys(getInput, planner).sortBy(_.length).headOption
There is a potential NPE at this line, because UpdatingPlanChecker.getUniqueKeys may return null.
What about returning Option[Array[String]], like the flink planner does? We can move the sortBy and headOption into it.
I think we can return Option[Array[Array[String]]], leave sortBy and headOption in the caller, and keep the TODO about UpsertStreamTableSink.setKeyFields.
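A minimal sketch of that shape, under stated assumptions: queryKeys is a hypothetical stand-in for the metadata query that may return null, and the field names are illustrative only. It shows the null result being wrapped in an Option, while the caller keeps the sortBy and headOption selection.

```scala
object UniqueKeySketch {
  // Hypothetical stand-in for a metadata query that returns null when no keys are known.
  def queryKeys(known: Boolean): Array[Array[String]] =
    if (known) Array(Array("wstart", "wend", "num"), Array("wend", "num")) else null

  // Option(...) turns null into None, so callers never dereference a null result.
  def getUniqueKeyFields(known: Boolean): Option[Array[Array[String]]] =
    Option(queryKeys(known))

  // The selection policy stays with the caller: pick the shortest key, if there is one.
  def shortestKey(known: Boolean): Option[Array[String]] =
    getUniqueKeyFields(known).flatMap(_.sortBy(_.length).headOption)
}
```

With this shape, a missing result and an empty key list both end up as None at the call site instead of failing inside sortBy.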
val uniqueKeys = fmq.getUniqueKeys(relNode)
if (uniqueKeys != null) {
  uniqueKeys.filter(_.nonEmpty).toList
    .map(_.asList().map(rowType.getFieldNames.get(_)).toArray).toArray
Can be simplified to uniqueKeys.filter(_.nonEmpty).map(_.toArray.map(fieldNames.get)).toArray
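As a self-contained sketch of the suggested one-liner, the following mirrors its filter-then-map structure. It assumes scala.collection.immutable.BitSet in place of Calcite's ImmutableBitSet and a plain IndexedSeq[String] in place of the row type's field-name list; keyFieldNames is a hypothetical name.

```scala
import scala.collection.immutable.BitSet

object KeyNamesSketch {
  // Hypothetical helper: BitSet stands in for Calcite's ImmutableBitSet and
  // fieldNames for the list of field names of the row type.
  def keyFieldNames(
      uniqueKeys: Seq[BitSet],
      fieldNames: IndexedSeq[String]): Array[Array[String]] =
    uniqueKeys
      .filter(_.nonEmpty)                                 // drop empty key sets
      .map(key => key.toArray.map(i => fieldNames(i)))    // column indices -> field names
      .toArray
}
```

For example, the key sets {1, 2} and {} over the fields id, wend, num would produce a single key made of wend and num.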
}

private[flink] class TestUpsertSink(
I would like to avoid introducing another set of testing sinks (incl. upsert sink, retract sink, append sink). We already have TestingRetractTableSink, TestingUpsertTableSink, and TestingAppendTableSink. Can we leverage them in this class? Maybe we need some changes to TestingUpsertTableSink.setKeyFields/setIsAppendOnly.
Yeah, we can make TestingUpsertTableSink support expectedKeys and expectedIsAppendOnly.
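One way this could look, as a rough sketch only: ExpectingUpsertSink below is a hypothetical, heavily simplified class and not Flink's TestingUpsertTableSink. It only shows the idea of passing the expectations in and checking them when the planner calls setKeyFields and setIsAppendOnly.

```scala
object ExpectingSinkSketch {
  // Hypothetical, simplified sink: it records nothing and only validates what it is told.
  class ExpectingUpsertSink(expectedKeys: Array[String], expectedIsAppendOnly: Boolean) {

    // Fails with IllegalArgumentException if the keys differ from the expected
    // ones; the comparison ignores order.
    def setKeyFields(keys: Array[String]): Unit = {
      val shown = if (keys == null) "null" else keys.mkString(",")
      require(
        keys != null && keys.sorted.sameElements(expectedKeys.sorted),
        "unexpected keys: " + shown)
    }

    // Fails if the append-only flag differs from the expected one.
    def setIsAppendOnly(isAppendOnly: Boolean): Unit =
      require(isAppendOnly == expectedIsAppendOnly, "unexpected isAppendOnly: " + isAppendOnly)
  }
}
```

A test would construct the sink with the keys it expects for the query and let the mismatch surface as a failure during planning.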
@wuchong Please take a look again.
Thanks @JingsongLi for the update. I left two thoughts. Overall, it looks good to me now.
  .assignAscendingTimestamps(_._1.toLong)
  .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)

val sink = new TestingUpsertTableSink(Array(0, 1, 2), TimeZone.getDefault)
The key array can be "0, 1" to align with "wend", "num".
// TODO UpsertStreamTableSink setKeyFields interface should be Array[Array[String]]
val tableKeys = {
  UpdatingPlanChecker.getUniqueKeyFields(getInput, planner) match {
    case Some(keys) => keys.sortBy(_.length).headOption
Should we return the longest key instead of the shortest one, so that the result matches the flink planner when both window start and window end are selected?
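To make the difference concrete, a small sketch with made-up candidates: the two keys below are illustrative assumptions for a windowed aggregate that selects both window start and window end, not the planner's actual output. Switching between the two policies is only a matter of headOption versus lastOption after sorting by length.

```scala
object KeyChoiceSketch {
  // Illustrative candidate keys; "wstart" is an assumed field name.
  val candidates: Array[Array[String]] =
    Array(Array("wend", "num"), Array("wstart", "wend", "num"))

  // Current behaviour in the diff: the key with the fewest fields.
  def shortest(keys: Array[Array[String]]): Option[Array[String]] =
    keys.sortBy(_.length).headOption

  // Alternative raised in the review: the key with the most fields.
  def longest(keys: Array[Array[String]]): Option[Array[String]] =
    keys.sortBy(_.length).lastOption
}
```

Both functions return None for an empty candidate list, so the caller's handling of the missing-key case stays the same under either policy.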
Hi @JingsongLi, I looked into FlinkRelMdUniqueKeys. Currently, it does not enumerate all the possible unique keys, and changing that may touch too many things. I think it's not a blocking problem; we can fix it later.
So I will merge this pull request.
…to upsert table sink This closes #9195
What is the purpose of the change
Support upsert table sink in blink-planner
Verifying this change
TableSinkITCase
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation