-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-26516][streaming] Recover GlobalCommittables with Sink V1 GlobalCommittable serializer #18805
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 8ab3d80 (Wed Feb 16 14:32:33 UTC 2022) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
@gaoyunhaii I think this should fix the upgrade path from Sink V1 to Sink V2. |
@flinkbot run azure |
1 similar comment
@flinkbot run azure |
Thanks @fapaul for the PR! I'll have a look~ |
ab6319c
to
438259a
Compare
...java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
Outdated
Show resolved
Hide resolved
...java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
Show resolved
Hide resolved
@@ -447,5 +451,13 @@ public void commit(Collection<CommitRequest<CommT>> committables) | |||
committables.forEach(CommitRequest::retryLater); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here seems should be failures.forEach
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not easily possible because only committable are retriable at this point. I would need to add a special logic when this happens. I rely here on the fact that I assume all commit
calls are idempotent because it might happen that global committables are committed again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the clarification! I did not notice the difference of types. Might we add some comments for this behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, I add a comment.
Very thanks @fapaul for the PR! One main issue from my side is that it seems the behavior of the current |
I would have though To be honest I never really understood the idea behind exposing |
Hi Fabian~
It seems currently For |
I thought about this for a longer time and think the behaviour of I am not really what to do about that. Do you have an idea? |
Hi Fabian~ thanks for the clarification! For the For the If we want to call However, currently the sink implementation in the If so, perhaps we could create a new jira issue for state compatibility with the sink v1 and attach this PR to that issue, and dowgrade the FLINK-26173 and leaves it open? Since we have not fully fix the issues listed in the FLINK-26173~ [1] https://lists.apache.org/thread/3tqgnhclbr8ptb69b2ro74535dtbd608 |
…alCommittable serializer With SinkV2 the committer and global committer work very similar and they only write committables into state. SinkV1's GlobalCommitter on the other hand used to write GlobalCommittables into state so this commits adds an migration path.
Thanks for the suggestion. I created a new ticket https://issues.apache.org/jira/browse/FLINK-26516 to fix the state incompatibility and lowered the priority of the original ticket. |
I added a fix for the failed azure test: previously if there are no committables the combine and commit would be skipped, otherwise there might create an empty global committable. The test failed due to this reason. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @fapaul for the PR! LGTM
…alCommittable serializer With SinkV2 the committer and global committer work very similar and they only write committables into state. SinkV1's GlobalCommitter on the other hand used to write GlobalCommittables into state so this commits adds an migration path. This closes apache#18805.
…alCommittable serializer With SinkV2 the committer and global committer work very similar and they only write committables into state. SinkV1's GlobalCommitter on the other hand used to write GlobalCommittables into state so this commits adds an migration path. This closes apache#18805.
What is the purpose of the change
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation