New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-7699][core] Define the BufferListener interface to replace EventlListener in BufferProvider #4735
Conversation
…pe for the floating buffers
…ntlListener in BufferProvider
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution.
I really like the new interface, but we should also add a test for the new possibility to re-add the listener.
The sender backlog changes inside RemoteInputChannel
probably slipped through and should go into #4509 instead. Please remove them from this PR. I'll re-evaluate there.
*/ | ||
@Override | ||
public boolean notifyBufferAvailable(Buffer buffer) { | ||
checkState(isWaitingFloatingBuffers.get(), "This channel should be waiting for floating buffers currently."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"This channel should be waiting for floating buffers."
buffer.recycle(); | ||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you also add a test which verifies that requests for multiple buffers, i.e. returning true
here, works?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I also consider the issue of verifying notification multi times.
And I guess you may point out it. :)
It can not work to return true for that, and I will think of proper way of verifying it.
/** | ||
* Buffer pool notifies this channel of an available floating buffer. If the channel is released or not | ||
* needing extra buffers currently, the buffer should be recycled to buffer pool. Otherwise, the buffer | ||
* will be added into the available queue and the unannounced credit is increased by one. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Buffer pool notifies this channel of an available floating buffer. If the channel is released or currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise, the buffer will be added into the <tt>availableBuffers</tt> queue and the unannounced credit is increased by one.
private final AtomicInteger senderBacklog = new AtomicInteger(0); | ||
|
||
/** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */ | ||
private final AtomicBoolean isWaitingFloatingBuffers = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better: isWaitingForFloatingBuffers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but in general, do we need this field? For now, I only see this being used to verify correct use and I'm not sure whether the costs verify this. Also, nothing in this PR sets it to true
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two fields are currently used in notifyBufferAvailable()
logic, so we have to define them in this PR.
In next PR #4509 , isWaitingForFloatingBuffers
field will be set true by two conditions:
- The number of current available buffers is less than sender backlog
- There are no available floating buffers in
BufferProvider
And this field also used to avoid register listener inBufferProvider
multi times.
It may confuse you to only see this PR change, sorry for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, fine - so I'll see more details in the next PR and we get this in here
087b6b4
to
616f81b
Compare
…ntlListener in BufferProvider
616f81b
to
88bb089
Compare
@NicoK , I have submitted the updates based on your comments! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice new test - I only have minor comments to address.
|
||
assertTrue(localBufferPool.addBufferListener(listener)); | ||
// Recycle the first buffer to notify both of the above listeners and the | ||
// <<tt>twoTimesListener</tt> will be added into the <<tt>registeredListeners</tt> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you don't need the HTML tags here (I suggested them only for the Javadoc - some use {@code }
instead), please remove them here
// Recycle the first buffer to notify both of the above listeners and the | ||
// <<tt>twoTimesListener</tt> will be added into the <<tt>registeredListeners</tt> | ||
// queue of buffer pool again | ||
available1.recycle(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you also verify the notification right after this recycle()
call? By adding:
verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class));
@Override | ||
public boolean notifyBufferAvailable(Buffer buffer) { | ||
times++; | ||
buffer.recycle(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice trick with the recycle()
call here leading to another invocation or the release of the buffer
…ntlListener in BufferProvider
Already removed the tags and added the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
merging. |
…ntlListener in BufferProvider This closes apache#4735.
…ntlListener in BufferProvider This closes apache#4735.
What is the purpose of the change
Currently the EventListener is used in BufferProvider to be notified of buffer available or destroyed pool.
To be semantic clearly, we define a new BufferListener interface which can opt for a one-time only notification or to be notified repeatedly. And we can also notify the pool destroyed via explicitly method notifyBufferDestroyed.
This RP is based on #4499 whose commit is also included for passing travis. Review the third commit for this PR change.
Brief change log
RemoteInputChannel
will implement theBufferListener
to wait for floating buffers fromBufferProvider
.Verifying this change
This change is already covered by existing tests, such as LocalBufferPoolTest.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation