[FLINK-8581] Improve performance for low latency network #5423
Conversation
Force-pushed from 3938b02 to 0896f88.
Overall, the changes look good and I had some comments (inlined), but nothing blocking.
 * {@link BufferBuilder} and there can be a different thread reading from it using {@link BufferConsumer}.
 */
@NotThreadSafe
public class BufferConsumer implements Closeable {
Just a thought about names: this is called BufferConsumer, but it does not "consume" buffers. It is coordinating the production of read slices from a shared buffer. BufferBuilder makes more sense than this. Even worse, this class has a build() : Buffer method :-(.
Yes, I know. Can you propose a different naming scheme? BufferWriter and BufferBuilder?
 * @return how much information was written to the target buffer and
 *         whether this buffer is full
 */
SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException;
One remark from reading the code: I found it a bit surprising that a method that looks like a setter will cause the write to continue. Maybe this is better called something like continueWritingWithNextBufferBuilder, or split the setter from a continueWrite method?
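For illustration, a minimal sketch of the split-API alternative (these method names are suggestions only, not the actual RecordSerializer API):

// Hypothetical split of the setter-with-side-effects into two explicit steps.
public interface RecordSerializer<T extends IOReadableWritable> {

    // Pure setter: only swaps in the new target buffer, nothing is written here.
    void setNextBufferBuilder(BufferBuilder bufferBuilder);

    // Explicitly continues copying the current record into the new buffer.
    SerializationResult continueWrite() throws IOException;
}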
bufferBuilders[targetChannel] = Optional.empty();

numBytesOut.inc(bufferBuilder.getWrittenBytes());
bufferBuilder.finish();
You could combine this into numBytesOut.inc(bufferBuilder.finish()), or maybe finish() should not need to have a return value?
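For example, a sketch of the combined form, assuming finish() were changed to return the written byte count:

// Hypothetical: finish() returns the number of written bytes, so the metric
// update and the finish collapse into a single statement.
numBytesOut.inc(bufferBuilder.finish());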
result = serializer.setNextBufferBuilder(bufferBuilder);
SerializationResult result = serializer.addRecord(record);

while (result.isFullBuffer()) {
I wonder if this loop could not be simplified to

while (!result.isFullRecord()) {
    tryFinishCurrentBufferBuilder(targetChannel, serializer);
    BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
    result = serializer.setNextBufferBuilder(bufferBuilder);
}
This would introduce a minor change in behaviour in cases where the end of a record falls exactly at the end of a buffer: with the change, the buffer is only finished by the next record rather than on the spot. However, this should not be a problem, because that outcome is what usually happens for almost every record besides those corner cases, and thus the code should already handle it well.
With this change, tryFinishCurrentBufferBuilder also no longer requires a return value.
As we discussed, I'm not entirely sure. This "minor change" can be a significant overhead in the case of many channels and large records. I don't want to risk increasing the scope of potential problems with this PR :(
👍 Can introduce this change later after some more extensive tests.
public static void waitForAll(long timeoutMillis, Collection<Future<?>> futures) throws Exception {
    long startMillis = System.currentTimeMillis();
    Set<Future<?>> futuresSet = new HashSet<>();
    for (Future<?> future : futures) {
Could be replaced with addAll() or even the constructor taking a collection.
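For example, a sketch of the suggested simplification:

// The copy constructor replaces the explicit loop over the futures.
Set<Future<?>> futuresSet = new HashSet<>(futures);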
Buffer buffer,
private boolean tryFinishCurrentBufferBuilder(
    int targetChannel,
    RecordSerializer<T> serializer) throws IOException {
This code no longer throws IOException.
    reader.setRegisteredAsAvailable(true);
}

private NetworkSequenceViewReader poolAvailableReader() {
This should probably be pollAvailableReader()
} else {
    // This channel was now removed from the available reader queue.
    // We re-add it into the queue if it is still available
    if (next.moreAvailable()) {
This looks like the most common case, and I wonder why we cannot just peek the queue and only remove the reader in the other cases?
This is not the most common case - except for super-low-latency cases, the network is much faster than our capability to produce data.
Secondly, there are three branches that we need to cover here. As it is now, we poll the reader once, and only re-enqueue it once (in this case that you commented on). With peek we would have to pop it in two places.
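For illustration, a rough sketch of the poll-once pattern being defended here (simplified, with approximated helper names, not the actual PartitionRequestQueue code):

// The reader is polled exactly once and re-enqueued in only one branch;
// with peek() the removal would have to be duplicated in the other branches.
NetworkSequenceViewReader reader = pollAvailableReader();
if (reader == null) {
    return;
}
BufferAndAvailability next = reader.getNextBuffer();
if (next == null) {
    // Spurious notification: nothing to send, the reader stays out of the queue.
    reader.setRegisteredAsAvailable(false);
} else {
    // ... write next.buffer() out ...
    if (next.moreAvailable()) {
        // Still has data: the single re-enqueue point.
        registerAvailableReader(reader);
    }
}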
👍
@Override
public synchronized ResultPartitionID getPartitionId() {
    return new ResultPartitionID();
What is the intended effect of having this synchronized? It looks like it does nothing.
@Override
public synchronized BufferProvider getBufferProvider() {
    return bufferProvider;
What does this synchronized help? The field is final, so I would assume this change is not required.
Force-pushed from 3c9450a to c1f90bf.
This simplifies an API a little bit
Deduplicated code was identical.
…dSerializationTest Deduplicated code was effectively identical, but implemented in a slightly different way.
BufferConsumer will be used in the future for reading partially written MemorySegments. On flushes, instead of requesting a new MemorySegment, the BufferConsumer code will allow continuing to write to a partially filled-up MemorySegment.
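A rough sketch of the intended split between the writing and the reading side (hypothetical, simplified API):

// Illustration only: one thread appends to the BufferBuilder while another
// thread reads the already-written part through the BufferConsumer, without
// waiting for the MemorySegment to fill up.
BufferBuilder bufferBuilder = requestNewBufferBuilder();
BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();

// Writer thread: keeps appending to the partially filled segment.
bufferBuilder.append(serializedRecord);

// Netty thread: on flush, slices out whatever has been written so far.
Buffer readableSlice = bufferConsumer.build();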
…Spillable subpartitions
notifyBuffersAvailable is a quick call that doesn't need to be executed outside of the lock
SpilledSubpartitionViewTest duplicates a lot of production logic (TestSubpartitionConsumer duplicates the logic of LocalInputChannel and mixes CreditBasedSequenceNumberingViewReader with PartitionRequestQueue). Also, it seems like most of the logic is covered by SpillableSubpartitionTest.
…nputGate and handle redundant data notifications
This is a preparation for changes in data notifications, which will not be as strict as they are now.
By introducing a #commit() method on the critical path we reduce the number of volatile writes from 2 down to 1. This improves network throughput by 20% and restores the original performance for high-latency cases.
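A rough sketch of the idea (field and method names are illustrative, not the actual BufferBuilder internals):

// Illustration only: the writer advances a plain, writer-local position on
// every append (no volatile traffic) and publishes it with a single volatile
// write in commit(), instead of two volatile writes per record.
private volatile int committedPosition;
private int cachedPosition; // accessed only by the writer thread

void append(int bytes) {
    cachedPosition += bytes; // plain write, not yet visible to the reader
}

void commit() {
    committedPosition = cachedPosition; // the one volatile write per record
}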
Force-pushed from c1f90bf to 72b287a.
I have rebased the PR and squashed the fixup commits.
Thanks for those very good improvements, I will merge this.
This big PR depends on #4552 and #5314. The main purpose of this change is to increase network throughput/performance in low-latency cases. On their own, #4552 and #5314 cause huge performance degradation for ~1ms flushing intervals (on top of Flink's already very poor performance in such cases). This PR fixes that, making throughput with a ~1ms flushing interval more or less similar to that with a ~100ms one.
Quick (noisy) benchmark results:
master branch:
master + credit based flow control:
credit based + low latency fixes (this PR):
Brief change log
This last one ([FLINK-8591]) is the one commit that actually improves the performance, by allowing the sender to append records to a memory segment while PartitionRequestQueue in Netty is busy handling/processing/flushing the previous memory segment, or while it is blocked waiting for a new credit to arrive.
Verifying this change
This change is a trivial rework ;)
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation