-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-8694][runtime] Fix notifyDataAvailable race condition #5572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
e70cd04 to
57f83c7
Compare
NicoK
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall good changes and thanks for fixing the issues.
Some smaller comments inline I'd like addressed before merging.
| assertEquals(channelIndex, bufferOrEvent.get().getChannelIndex()); | ||
| assertEquals(expectedIsBuffer, bufferOrEvent.get().isBuffer()); | ||
| assertEquals(expectedChannelIndex, bufferOrEvent.get().getChannelIndex()); | ||
| assertEquals(expectedMoreAvailable, bufferOrEvent.get().moreAvailable()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also verify assertFalse(inputGate.pollNextBufferOrEvent().isPresent()); for SingleInputGate instances here? (ideally also for any other input gate, once UnionInputGate implements this.
| /** Flag indicating whether the subpartition has been finished. */ | ||
| private boolean isFinished; | ||
|
|
||
| private boolean flushRequested; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add @GuardedBy("buffers")?
| readView.notifyDataAvailable(); | ||
| } | ||
| flushRequested = !buffers.isEmpty(); | ||
| notifyDataAvailable(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe don't even flush at all if buffers is empty
| availabilityListener); | ||
| } | ||
|
|
||
| readView.notifyDataAvailable(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that both read views` implementation of this function are no-ops, but both constructors do their own notification handling already, so this should probably be removed again
|
|
||
| @Override | ||
| public void notifyDataAvailable() { | ||
| // We do the availability listener notification one by one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, this is still true for the SpilledSubpartitionView and without the (potentially extended) comment here, a reader might wonder why we don't do anything here, not even forwarding it to the spilled view.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was the remanent of a different approach to fix the problem. Reverted.
| } | ||
| } // else: spilled | ||
|
|
||
| checkState(spilledView != null, "No in-memory buffers available, but also nothing spilled."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I see something wrong here, but isn't spilledView always non-null here? In that case, please remove the check.
| for (RecordWriter<?> eventualOutput : this.eventualOutputs) { | ||
| eventualOutput.broadcastEvent(EndOfSuperstepEvent.INSTANCE); | ||
| eventualOutput.clearBuffers(); | ||
| eventualOutput.flushAll(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove those two - they should be unnecessary
| assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); | ||
| read.buffer().recycleBuffer(); | ||
| assertEquals(2, listener.getNumNotifications()); | ||
| assertEquals(1, listener.getNumNotifications()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add a small comment that we don't get a new notification (anymore!) since the buffer is already available at the getNextBuffer() call?
| listener.awaitNotifications(3, 30_000); | ||
| assertEquals(3, listener.getNumNotifications()); | ||
| listener.awaitNotifications(2, 30_000); | ||
| assertEquals(2, listener.getNumNotifications()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note that we will be notified once the spilled writer completes
| env.setUp(1, 1, 0, false); | ||
| env.executeBenchmark(1_000_000); | ||
| env.tearDown(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this not tested via your non-IT tests now?
(we should be careful about adding integration/full stack tests because of their added time)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is smaller scope test compared to ITCase and it covers higher load. It's a very good stress test for investigating and detecting deadlocks/race conditions in the network stack.
| "requestLock=" + requestLock + | ||
| ", receiverId=" + receiverId + | ||
| ", buffersAvailable=" + buffersAvailable.get() + | ||
| ", buffersAvailable=" + hasBuffersAvailable() + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this line should now be removed (the property is not part of this object anymore and going through a synchronized block which may be nasty during debugging sessions)
| if (nextBuffer != null) { | ||
| return true; | ||
| } | ||
| return fileReader.hasReachedEndOfFile(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be !fileReader.hasReachedEndOfFile(); - I will add tests in a later PR for this
57f83c7 to
e80460d
Compare
pnowojski
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed comments.
|
|
||
| @Override | ||
| public void notifyDataAvailable() { | ||
| // We do the availability listener notification one by one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was the remanent of a different approach to fix the problem. Reverted.
| env.setUp(1, 1, 0, false); | ||
| env.executeBenchmark(1_000_000); | ||
| env.tearDown(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is smaller scope test compared to ITCase and it covers higher load. It's a very good stress test for investigating and detecting deadlocks/race conditions in the network stack.
e80460d to
81d8d38
Compare
NicoK
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have expected fixup commits, but I'll trust you with the changes. After fixing the (simple) merge conflict, we're good to go.
…SingleInputGate Previously if we SingleInputGate was re-eqnqueuing an input channel, isMoreAvailable might incorrectly return false. This might caused some dead locks.
…BuffersFailureITCase
Before there was a race condition that might resulted in igonoring some notifyDataAvailable calls. This fixes the problem by moving buffersAvailable handling to Supartitions and adds stress test for flushAlways (without this fix this test is dead locking).
81d8d38 to
4a16fb4
Compare
|
Rebased. Lets merge after green travis. |
|
Thanks for merging :) |
This fixes two bugs in network stack:
https://issues.apache.org/jira/browse/FLINK-8760
https://issues.apache.org/jira/browse/FLINK-8694
Brief change log
Please check individual commit messages.
Verifying this change
This PR adds new tests covering the previously bugged cases.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation