New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-11250][runtime] Added method init for RecordWriter for initialization resources(OutputFlusher) outside of constructor #17187
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit d7bfc75 (Tue Sep 07 17:10:00 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
Hey @akalash , thanks for the fix. LGTM mostly. Why the init method has to be pulled out and called explicitly here |
@@ -647,6 +647,8 @@ void restoreInternal() throws Exception { | |||
closedOperators = false; | |||
LOG.debug("Initializing {}.", getName()); | |||
|
|||
recordWriter.init(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why called here separately?
What's the problem of starting the thread in constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We somehow should guarantee that all resources will be closed in case of an error. Before my changes, if the StreamTask fails with the error right after recordWriter would be created, nobody closes it which leads to the leak. So semantic should be as following:
allocateResources;
try {
// do something
} catch(Throwable ex) {
releaseResources;
}
But it is difficult to follow this rule with the current implementation. For example, look at StreamTask#createRecordWriters, if creating the second record writer fails we lost the link to the first record writer, so it would be impossible to close it.
So in my opinion, we have two choices here: to have the init method which would be invoked under try-catch block(it is exactly what I did). or rewrite a code in such a way that exception in any constructor(ex. StreamTask) guarantee releasing the earlier allocated resources in this constructor.
Yes, you are almost right. This commit resolves the problem related to the exception after StreamTask was created but this commit doesn't resolve the problem if the exception happens inside of StreamTask constructor right after recordWriter is created. You can check my test |
…in StreamConfigChainer
8542b78
to
9bda457
Compare
if (channelIOExecutor != null) { | ||
suppressedException = | ||
runAndSuppressThrowable(channelIOExecutor::shutdown, suppressedException); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there are other resources that might be null as well. Not just
channelIOExecutor
and mailboxProcessor
Let's take releaseOutputResources
for example,
both operatorChain and recordWriter can possibly be null if thrown exception from the constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, of course, there are more possible NPE but it is not problem since all of that NPE could happen only inside runAndSuppressThrowable which helps to handle it correctly. at least, all variables will checked
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
er... I think runAndSuppressThrowable
is not designed for this purpose, but coincidentally yes ^-^.
Hey @akalash , thanks for updating the PR. Please take a look at my comments. Besides of this, I think this try-catch solution + adding non-null check is not a clean way of solving the problem. In the end, the solution may as complicated as extract all resource initialization outside of the constructor (you have to check each of the member variables whether they are null or not). I think the right way is to extract the resource initialization out of streamtask. But I do understand that may need careful walk through the code and clean-up. So I am also fine to take this bug fix as two steps:
|
@curcur, Do I understand correctly that you propose to do nothing in the constructor in this PR but leave only changes related to transition state? |
9bda457
to
257c3b6
Compare
@curcur , I left in PR only changes about transition state. And I also have created the extra ticket - https://issues.apache.org/jira/browse/FLINK-24294 |
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
Show resolved
Hide resolved
257c3b6
to
9295f1f
Compare
Thanks @akalash for fixing merged. |
…ization resources(OutputFlusher) outside of constructor (apache#17187) * [refactor][streaming] Ability to change bufferTimeout for StreamEdge in StreamConfigChainer * [FLINK-11250][streaming] Correctly clean up stream task on every place it uses
What is the purpose of the change
This PR resolves the problem with closing the OutputFlusher when an exception happens before the task restore.
Brief change log
init
for RecordWriter createdVerifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation