[FLINK-10331][network] reduce unnecessary flushing #6692
Conversation
Mostly LGTM
```java
}
if (!flushRequested) {
    flushRequested = true; // set this before the notification!
    if (buffers.size() == 1) {
```
This `if` check deserves an explanation, either here or in the JavaDoc above.
If there is more than one buffer, then we already notified the reader when adding the second one.
Sure - I think I explained the flushing behaviour in the class's JavaDoc:
Whenever {@link #add(BufferConsumer)} adds a finished {@link BufferConsumer} or a second {@link BufferConsumer} (in which case we will assume the first one finished), we will {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via {@link #createReadView(BufferAvailabilityListener)} of new data availability. Except by calling {@link #flush()} explicitly, we always only notify when the first finished buffer turns up and then, the reader has to drain the buffers via {@link #pollBuffer()} until its return value shows no more buffers being available.
But it doesn't hurt to have something small / more explicit here as well
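To make the rule from the JavaDoc concrete, here is a simplified, self-contained model of when the reader gets notified. This is a hypothetical sketch, not Flink's actual `PipelinedSubpartition` (which also involves locking, read views, and backlog accounting); only the notification conditions are modelled:

```java
import java.util.ArrayDeque;

/**
 * Hypothetical model of the notification rule described above: notify the
 * reader when the first finished buffer turns up, or when a second buffer
 * is added (the first one is then implicitly assumed finished). An explicit
 * flush() only notifies for a single, not-yet-announced buffer.
 */
public class NotificationModel {
    private final ArrayDeque<Boolean> buffers = new ArrayDeque<>(); // true = finished
    private boolean flushRequested;

    /** Adds a buffer; returns true if the reader should be notified. */
    public boolean add(boolean finished) {
        buffers.add(finished);
        if (buffers.size() == 1) {
            // first buffer: only notify if it is already finished
            return finished;
        }
        // a second buffer implies the first one is finished
        return buffers.size() == 2;
    }

    /** Explicit flush; returns true if the reader should be notified. */
    public boolean flush() {
        if (!flushRequested && buffers.size() == 1) {
            flushRequested = true; // set this before the notification!
            // single unfinished buffer: the reader does not know about it yet;
            // with more than one buffer, it was already notified on add()
            return true;
        }
        return false;
    }

    /** Polls one buffer; clears the flush request once the queue drains. */
    public Boolean poll() {
        Boolean buf = buffers.poll();
        if (buffers.isEmpty()) {
            flushRequested = false;
        }
        return buf;
    }
}
```

Under this model, the `buffers.size() == 1` check in `flush()` is exactly the "don't notify twice" guard the comment thread is about: for a longer queue, the add of the second buffer already woke the reader up.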
```java
 */
@Test
public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
    final ResultSubpartition subpartition = createSubpartition();
```
Extract those initialisations to setup/teardown/@Rule. This code block is duplicated a couple of times:
```java
final ResultSubpartition subpartition = createSubpartition();
AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
availablityListener.resetNotificationCounters();
(...)
} finally {
    subpartition.release();
}
```
unfortunately, not every unit test works on the same setup - are you proposing to
- instantiate these nonetheless and let those be unused in some tests, or
- split the unit test into one with and one without this initialization?
Or maybe I'm not aware of some trick that solves this...
ok, I forked off the methods with read view and availability listener to PipelinedSubpartitionWithReadViewTest
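The extracted fixture in the forked test could look roughly like this. This is a hypothetical sketch with Flink's types stubbed out so it stands alone; the real `PipelinedSubpartitionWithReadViewTest` uses JUnit's `@Before`/`@After` annotations and the actual Flink classes:

```java
/**
 * Hypothetical sketch of the shared test fixture. The nested classes are
 * minimal stand-ins for the Flink types from the duplicated block; in
 * JUnit, setUp/tearDown would carry @Before/@After annotations.
 */
public class FixtureSketch {
    static class Subpartition {
        boolean released;
        void release() { released = true; }
    }

    static class Listener {
        int notifications;
        void resetNotificationCounters() { notifications = 0; }
    }

    private Subpartition subpartition;
    private Listener listener;

    void setUp() { // @Before in JUnit: shared initialisation for every test
        subpartition = new Subpartition();
        listener = new Listener();
        listener.resetNotificationCounters();
    }

    void tearDown() { // @After in JUnit: always release, even on failure
        subpartition.release();
    }

    boolean runOneTest() {
        setUp();
        try {
            return !subpartition.released; // the actual test body goes here
        } finally {
            tearDown();
        }
    }
}
```

This is exactly the trade-off discussed above: tests that need a different setup stay in the original test class, while every test in the forked class gets the subpartition, listener, and read view for free.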
```java
@@ -198,6 +199,8 @@ public void testConsumptionWithMixedChannels() throws Exception {

    private abstract static class Source {

        abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;

        abstract void flush();
```
Why did you need to add this `flush`? What was wrong?
Depending on the implementation in `PipelinedSubpartition`, i.e. `if (buffers.size() == 1 && buffers.peekLast().isFinished())` or whatever we change it to (we don't make guarantees here!), the producer thread may not have flushed its last record after finishing, and the source would wait forever (there is no output flusher in that test)
-> we need to flush all channels before leaving the producer
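The hazard can be sketched like this (a hypothetical, simplified model of the test setup, not Flink's actual code; all names are made up): without a final flush, a trailing unfinished buffer never triggers a notification, so a consumer waiting on one would block forever:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Hypothetical sketch of the deadlock risk described above: the test has
 * no periodic output flusher, so if the producer ends on an unfinished
 * buffer without flushing, the consumer is never notified.
 */
public class ProducerFlushSketch {
    private final Queue<Boolean> buffers = new ArrayDeque<>(); // true = finished
    private int notifications;

    void addBufferConsumer(boolean finished) {
        buffers.add(finished);
        // mirrors the subpartition rule: notify on a finished first buffer
        // or when a second buffer arrives
        if ((buffers.size() == 1 && finished) || buffers.size() == 2) {
            notifications++;
        }
    }

    void flush() {
        if (!buffers.isEmpty()) {
            notifications++; // wake the consumer up for the trailing data
        }
    }

    int notifications() {
        return notifications;
    }
}
```

A producer that calls `flush()` on all channels before terminating guarantees the consumer sees the last record even when it sits in an unfinished buffer.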
Thanks for the review - I integrated the changes.
Do not flush (again) if
- a previous flush request has not been completely handled yet and/or is still enqueued, or
- the network stack is still polling from this subpartition and doesn't need a new notification

This closes apache#6692.
Thanks for the changes. LGTM :)
…e record length

Once we know the record length and if we are not spilling, we should size the buffer immediately to the expected record size, and not incrementally for each received buffer chunk.
…rBuilder and BufferConsumer
…guarantees
- producers should flush after writing to make sure all data has been sent
- we can only check `bufferConsumer.isFinished()` after building a `Buffer`
- producer/consumer threads should be named
…nitialization
- add PipelinedSubpartitionWithReadViewTest which always creates a subpartition, an availability listener, and a read view before each test and cleans up after each test
- remove mockito use from testBasicPipelinedProduceConsumeLogic()
What is the purpose of the change
With the re-design of the record writer interaction with the result(sub)partitions, flush requests can currently pile up in these scenarios:
These lead to increased notifications in low latency settings (low output flusher intervals) which can be avoided.
Brief change log
- avoid redundant flush notifications based on `flushRequested` and the `buffer` queue size
- size buffers to the expected record length in `SpillingAdaptiveSpanningRecordDeserializer`
Verifying this change

This change is already covered by existing tests plus a few new tests in `PipelinedSubpartitionTest`.

Does this pull request potentially affect one of the following parts:
- `@Public(Evolving)`: no

Documentation