[FLINK-23457][network] The upstream sends the buffer of the right size for broadcast case #17024
Conversation
cc @pnowojski
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. Automated Checks: last check on commit e2ca022 (Fri Aug 27 12:05:36 UTC 2021).
Mention the bot in a comment to re-run the automated checks.
Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: the @flinkbot bot supports commands such as @flinkbot run azure (re-run the Azure builds).
@flinkbot run azure
I've updated, please have a look again. Thanks @pnowojski
@lometheus, thanks for your changes. I left a couple of comments in the PR. Also, a reminder that the commit message should use the same format as the PR title ([FLINK-task][component] Comment), so don't forget to update it.
for (ResultSubpartition subpartition : subpartitions) {
    subpartition.add(consumer.copy(), partialRecordBytes);
    int subPartitionBufferSize = subpartition.add(consumer.copy(), partialRecordBytes);
If subpartition#add fails, it returns a negative value (-1), so we should take this case into account and, for example, ignore such a value.
Thank you very much for the review. I added special treatment for negative values.
I believe it is more correct to ignore the value in the calculation of the desirable buffer size rather than to ignore the total value. I mean:
if (subPartitionBufferSize > 0) {
    desirableBufferSize = Math.min(desirableBufferSize, subPartitionBufferSize);
}
I think this is better because if one of the subpartitions fails, we are still able to send to the other ones. As I understand it, they will all eventually be closed if at least one was closed, so maybe it is not so important, but it is still better to follow the current semantics, which do not forbid sending data to subpartitions even if one of them is closed.
// then: The buffer is less than or equal to the configured size.
assertEquals(5, subpartition0.pollBuffer().buffer().getSize());
assertEquals(5, subpartition1.pollBuffer().buffer().getSize());
Please take a more careful look at this test (testDifferentBufferSizeForSubpartitions): several scenarios are tested there (send a buffer less than/greater than/equal to the buffer size, change the buffer size and send again), but you have only one test case, which is not enough. So please increase your test coverage.
Also, I suggest extracting your test into a separate method like testDynamicBufferSizeForBroadcast or something similar.
I added testDynamicBufferSizeForBroadcast for the less/greater/equal cases.
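For readers following along, a minimal sketch of what such a broadcast test could look like, written against the assertions quoted above. The helpers createBroadcastPartition and pollSingleBuffer are hypothetical stand-ins for the real fixtures in BufferWritingResultPartitionTest, and the exact buffer sizes are assumptions, not values from the PR:

@Test
public void testDynamicBufferSizeForBroadcast() throws IOException {
    final int configuredBufferSize = 5;
    // createBroadcastPartition is a hypothetical setup helper standing in for the real fixture.
    BufferWritingResultPartition partition = createBroadcastPartition(2, configuredBufferSize);

    // when: a record smaller than the configured buffer size is broadcast to all subpartitions
    partition.broadcastRecord(ByteBuffer.allocate(3));
    partition.finish();

    // then: every subpartition receives a buffer that is less than or equal to the configured size
    for (int subpartitionIndex = 0; subpartitionIndex < 2; subpartitionIndex++) {
        // pollSingleBuffer is a hypothetical helper that polls the next data buffer of a subpartition.
        assertTrue(pollSingleBuffer(partition, subpartitionIndex).getSize() <= configuredBufferSize);
    }
    // The equal-to and greater-than cases requested in the review would be added analogously.
}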
Force-pushed d94a61c to e83ed6a
@lometheus, I left one more comment. You can check it and then squash your commits into one for the final review and test.
Wonderful idea, I just fixed it by selecting the desirable buffer size.
If I understood your question right: if one record allocates more than one buffer, the first buffer would be trimmed but the second one will be equal to the size of the rest of the record. In an ideal world we should trim the second buffer too, but in reality that requires serious changes in the code, which doesn't really make sense because it is a big mistake to configure a buffer size smaller than one record. P.S. Please squash your commits into one (and you can rebase onto the fresh master).
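To make the behaviour described above concrete, a tiny illustration with made-up numbers (not taken from the PR):

// Suppose the desirable (configured) buffer size is 8 and a broadcast record is 20 bytes long.
int desirableBufferSize = 8;
int recordLength = 20;
// The first buffer is trimmed to the desirable size ...
int firstBufferSize = desirableBufferSize;                   // 8
// ... while the continuation buffer simply holds the rest of the record and is not trimmed further.
int continuationBufferSize = recordLength - firstBufferSize; // 12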
        ? Math.min(desirableBufferSize, subPartitionBufferSize)
        : desirableBufferSize;
}
if (desirableBufferSize != Integer.MAX_VALUE) {
Just a minor comment: it is up to you, but this condition is not really needed here, because buffer.trim(MAX_VALUE) is not a problem; it just sets the buffer size to the maximum possible value, which is ok. But it is not a mistake if you leave this condition here.
You are correct, the code should be as concise as possible; I dropped this condition.
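Putting the review comments together, a hedged reconstruction of how the final createBroadcastBufferConsumers method might read. This is assembled purely from the snippets quoted in this thread (the method signature, the subpartitions loop, and buffer.trim); it is a sketch, not the authoritative merged code:

private void createBroadcastBufferConsumers(BufferBuilder buffer, int partialRecordBytes)
        throws IOException {
    try (final BufferConsumer consumer = buffer.createBufferConsumerFromBeginning()) {
        int desirableBufferSize = Integer.MAX_VALUE;
        for (ResultSubpartition subpartition : subpartitions) {
            int subPartitionBufferSize = subpartition.add(consumer.copy(), partialRecordBytes);
            // A failed add returns -1, so only positive sizes take part in the calculation.
            if (subPartitionBufferSize > 0) {
                desirableBufferSize = Math.min(desirableBufferSize, subPartitionBufferSize);
            }
        }
        // Trimming unconditionally is fine: trim(Integer.MAX_VALUE) just keeps the maximum size.
        buffer.trim(desirableBufferSize);
    }
}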
Force-pushed fcfa7e6 to a941706
I squashed all my commits into one. Thanks for your review and for patiently answering again.
LGTM. @pnowojski, can you help with the merge, please?
LGTM, there is an unrelated test failure that has been fixed in FLINK-24036.
@flinkbot run azure
@lometheus, could you rebase this PR on top of the latest master to pull in the fix for this failure, so that we can have a green build?
Force-pushed bbdb8d5 to 71836bf
restored dev_20210827
@flinkbot run azure
@flinkbot run azure
Force-pushed d7e21cf to bfeab81
@flinkbot run azure
@pnowojski, flinkbot doesn't seem to work anymore; how should I deal with this situation?
It takes some time for the flinkbot to pick up the PR. It looks like it did just that, and the most recent version of this PR seems to have some failures?
@@ -330,9 +330,16 @@ private BufferBuilder appendBroadcastDataForRecordContinuation(
private void createBroadcastBufferConsumers(BufferBuilder buffer, int partialRecordBytes)
        throws IOException {
    try (final BufferConsumer consumer = buffer.createBufferConsumerFromBeginning()) {
        int desirableBufferSize = Integer.MAX_VALUE;
        for (ResultSubpartition subpartition : subpartitions) {
            subpartition.add(consumer.copy(), partialRecordBytes);
You have duplicated:
subpartition.add(consumer.copy(), partialRecordBytes);
which is causing Azure failures.
(edit: I have already fixed it)
Azure was green, merged to master manually after squashing the commits. Thanks @lometheus for your contribution :)
What is the purpose of the change
This PR applies the new buffer size (NewBufferSize) in the broadcast case as well.
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation