-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-12777][network] Support CheckpointBarrierHandler in StreamTwoInputSelectableProcessor #8811
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
|
@zhijiangW, @sunhaibotb and @Myasuka: you might be interested in this one. |
|
Thanks for the great work @pnowojski . |
sunhaibotb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have left two small comments about the "Rename BufferBlocker to BufferStorage" commit.
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
Outdated
Show resolved
Hide resolved
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
Outdated
Show resolved
Hide resolved
|
Thanks for notifying me about this PR @pnowojski ! Actually I am interested in it, but there are many review works in hands for release-1.9 atm. I am not sure whether have time for reviewing it in time before feature freeze, and I will try best. :) |
|
Thanks @sunhaibotb for the review. I have addressed the comments and resolved the conflicts. |
sunhaibotb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't find questions about the correctness, and only left some comments on other issues. I'll take the time to look at the key parts again. @pnowojski
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorageImpl.java
Outdated
Show resolved
Hide resolved
...-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
Outdated
Show resolved
Hide resolved
...-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
Outdated
Show resolved
Hide resolved
...treaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
Outdated
Show resolved
Hide resolved
...treaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
Outdated
Show resolved
Hide resolved
...treaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
Outdated
Show resolved
Hide resolved
...-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
Show resolved
Hide resolved
...-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
Show resolved
Hide resolved
| public void close() { | ||
| public void close() throws IOException { | ||
| BufferOrEvent boe; | ||
| while ((boe = currentBuffers.poll()) != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: If you can add a few comments like "this part should not ever fail", I don't need to find out why there's no try... finally here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if that's the contract here. I guess this is on a best effort basis clean up.
Regardless of that, this is a separate issue independent of my change. If you think that something is not correct here, can you create a new issue and investigate it there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, the reason why it don't need to add try...finally like the following code is that the while {...} parts will never fail, and the code is clearer if we can add some comments to express this point.
public void close() throws IOException {
try {
BufferOrEvent boe;
while ((boe = currentBuffers.poll()) != null) {
if (boe.isBuffer()) {
boe.getBuffer().recycleBuffer();
}
}
} finally {
super.close();
}
}
...-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
Outdated
Show resolved
Hide resolved
…ufferStorage This makes BufferStorage contract more complete. Now it takes care of the whole process of storing and returning the data with simpler interface (single #rollOver method vs two different as it was before).
…Base#createBuffer
… cancellation marker was lost
pnowojski
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review @sunhaibotb. I've addressed and/or responded to your comments.
...-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
Outdated
Show resolved
Hide resolved
| public void close() { | ||
| public void close() throws IOException { | ||
| BufferOrEvent boe; | ||
| while ((boe = currentBuffers.poll()) != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if that's the contract here. I guess this is on a best effort basis clean up.
Regardless of that, this is a separate issue independent of my change. If you think that something is not correct here, can you create a new issue and investigate it there?
...treaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
Outdated
Show resolved
Hide resolved
...treaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
Outdated
Show resolved
Hide resolved
...-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
Show resolved
Hide resolved
...-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
Outdated
Show resolved
Hide resolved
StephanEwen
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This generally looks like a very good implementation.
One thing that strikes me as slightly odd is the LinkedBufferStorage.
- Is this very much tailored towards two inputs, when it would be good to keep it generic enough for N inputs (think side inputs in the future).
- if the linking is done in two places that need to be kept carefully in sync (
BarrierHandlerandLinkedBufferStorage), rather than in one place.
Could the barrier handler (or the specific barrier aligner implementation) not trigger the roll over of the buffer storage? That would eliminate the many if clauses (and rollover logic) out of the CheckpointedInputGate and abstract the entire linking between multiple input gates behind the fact that there is a shared barrier handler.
There are some minor issues in the code, which might be artifacts of prior work:
InputGate.pollNext()throws anInterruptedExceptioneven though it is a non-blocking operation.- creating a
BufferStorageforAT_LEAST_ONCEmode creates also aCachingBufferStorage. While this is never used (no buffer is stored in that way), should this maybe be an implementation that throws unsupported operation exceptions, to fail fast in case a buggy code path actually tries to store a buffer there.
|
Thanks for the review @StephanEwen.
I was considering making it more general, but currently we don't have general
Because of those reasons I thought that it's better to keep it specialised now and maybe revisit this in the future.
That I was also considering. The problem is that buffer storage can not be hidden from All in all, I don't like the current setup too much, but I think it's a lesser evil and at least
I think I will add a java doc above this method: @StephanEwen + @StefanRRichter + @NicoK + @pnowojski = 4 :) The catch is that this method might block if there is some data, but the buffers pool is empty and we are waiting for some buffer to be recycled (via
Done |
|
Re Actually, with your @StephanEwen new |
|
Forwarding the results of an offline discussion between @pnowojski and me: We sketched an option where However, there are a lot of open questions about that approach as well. For example, the Given the uncertainty about the shortcomings or design complexity of the alternative implementation, I concur with Piotr that this implementation is the better approach for the time being, especially when avoiding the risk of regression of the current TwoInput* code paths. For the introduction of the N input operators and side inputs, we should revisit this design. |
|
With decision mentioned in the previous comment, I have no more concerns about this PR. +1 to merge from my side. |
|
Thanks for the review @StephanEwen and +1 for what you have written. PR has passed tests on my private travis instance and previous version (without the |
…tructure as BarrierBuffer
…ith the refactor 1. Rename BarrierBuffer to CheckpointedInputGate CheckpointedInputGate was an interface, while BarrierBuffer was it's implementation. This rename means that we are dropping the interface and keeping only the concrete class. 2. Rename BarrierBuffer and BarrierTracker tests to match this rename and previous refactorings.
This reference was introducing unwanted dependency between CachedBufferStorage and a class that was using it.
This PR consists of series of changes that:
CheckpointBarrierHandlerto prepare a ground for using them inStreamTwoInputSelectableProcessorCheckpointBarrierHandlerinStreamTwoInputSelectableProcessorNext step would be to replace
StreamTwoInputProcessorwithStreamTwoInputSelectableProcessor.For detailed changelog, please check individual commits.
There are quite a lot of refactoring and renaming commits there, I tried hard not to mix them with functional changes.
Because of the sheer amount of renames and refactorings and my attempt to keep them small, commits split might not be perfect :(
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation