Skip to content

Commit

Permalink
[FLINK-7699][core] Define the BufferListener interface to replace Eve…
Browse files Browse the repository at this point in the history
…ntlListener in BufferProvider

This closes #4735.
  • Loading branch information
zhijiangW authored and zentol committed Oct 10, 2017
1 parent d3cbba5 commit 8706c6f
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 86 deletions.
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.buffer;

/**
* Interface of the availability of buffers. Listeners can opt for a one-time only
* notification or to be notified repeatedly.
*/
public interface BufferListener {

/**
* Notification callback if a buffer is recycled and becomes available in buffer pool.
*
* @param buffer buffer that becomes available in buffer pool.
* @return true if the listener wants to be notified next time.
*/
boolean notifyBufferAvailable(Buffer buffer);

/**
* Notification callback if the buffer provider is destroyed.
*/
void notifyBufferDestroyed();
}
Expand Up @@ -18,43 +18,38 @@

package org.apache.flink.runtime.io.network.buffer;

import org.apache.flink.runtime.util.event.EventListener;

import java.io.IOException;

/**
* A buffer provider to request buffers from in a synchronous or asynchronous fashion.
*
* <p> The data producing side (result partition writers) request buffers in a synchronous fashion,
* <p>The data producing side (result partition writers) request buffers in a synchronous fashion,
* whereas the input side requests asynchronously.
*/
public interface BufferProvider {

/**
* Returns a {@link Buffer} instance from the buffer provider, if one is available.
* <p>
* Returns <code>null</code> if no buffer is available or the buffer provider has been destroyed.
*
* <p>Returns <code>null</code> if no buffer is available or the buffer provider has been destroyed.
*/
Buffer requestBuffer() throws IOException;

/**
* Returns a {@link Buffer} instance from the buffer provider.
* <p>
* If there is no buffer available, the call will block until one becomes available again or the
*
* <p>If there is no buffer available, the call will block until one becomes available again or the
* buffer provider has been destroyed.
*/
Buffer requestBufferBlocking() throws IOException, InterruptedException;

/**
* Adds a buffer availability listener to the buffer provider.
* <p>
* The operation fails with return value <code>false</code>, when there is a buffer available or
*
* <p>The operation fails with return value <code>false</code>, when there is a buffer available or
* the buffer provider has been destroyed.
* <p>
* If the buffer provider gets destroyed while the listener is registered the listener will be
* notified with a <code>null</code> value.
*/
boolean addListener(EventListener<Buffer> listener);
boolean addBufferListener(BufferListener listener);

/**
* Returns whether the buffer provider has been destroyed.
Expand Down
Expand Up @@ -19,13 +19,11 @@
package org.apache.flink.runtime.io.network.buffer;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.util.event.EventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -64,7 +62,7 @@ class LocalBufferPool implements BufferPool {
* Buffer availability listeners, which need to be notified when a Buffer becomes available.
* Listeners can only be registered at a time/state where no Buffer instance was available.
*/
private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>();
private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();

/** Maximum number of network buffers to allocate. */
private final int maxNumberOfMemorySegments;
Expand Down Expand Up @@ -239,15 +237,18 @@ public void recycle(MemorySegment segment) {
returnMemorySegment(segment);
}
else {
EventListener<Buffer> listener = registeredListeners.poll();
BufferListener listener = registeredListeners.poll();

if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
}
else {
try {
listener.onEvent(new Buffer(segment, this));
boolean needMoreBuffers = listener.notifyBufferAvailable(new Buffer(segment, this));
if (needMoreBuffers) {
registeredListeners.add(listener);
}
}
catch (Throwable ignored) {
availableMemorySegments.add(segment);
Expand All @@ -270,9 +271,9 @@ public void lazyDestroy() {
returnMemorySegment(segment);
}

EventListener<Buffer> listener;
BufferListener listener;
while ((listener = registeredListeners.poll()) != null) {
listener.onEvent(null);
listener.notifyBufferDestroyed();
}

isDestroyed = true;
Expand All @@ -283,7 +284,7 @@ public void lazyDestroy() {
}

@Override
public boolean addListener(EventListener<Buffer> listener) {
public boolean addBufferListener(BufferListener listener) {
synchronized (availableMemorySegments) {
if (!availableMemorySegments.isEmpty() || isDestroyed) {
return false;
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
Expand All @@ -29,7 +30,6 @@
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.util.event.EventListener;

import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -347,15 +347,15 @@ public void run() {
/**
* A buffer availability listener, which subscribes/unsubscribes the NIO
* read event.
* <p>
* If no buffer is available, the channel read event will be unsubscribed
*
* <p>If no buffer is available, the channel read event will be unsubscribed
* until one becomes available again.
* <p>
* After a buffer becomes available again, the buffer is handed over by
* the thread calling {@link #onEvent(Buffer)} to the network I/O
*
* <p>After a buffer becomes available again, the buffer is handed over by
* the thread calling {@link #notifyBufferAvailable(Buffer)} to the network I/O
* thread, which then continues the processing of the staged buffer.
*/
private class BufferListenerTask implements EventListener<Buffer>, Runnable {
private class BufferListenerTask implements BufferListener, Runnable {

private final AtomicReference<Buffer> availableBuffer = new AtomicReference<Buffer>();

Expand All @@ -365,7 +365,7 @@ private boolean waitForBuffer(BufferProvider bufferProvider, NettyMessage.Buffer

stagedBufferResponse = bufferResponse;

if (bufferProvider.addListener(this)) {
if (bufferProvider.addBufferListener(this)) {
if (ctx.channel().config().isAutoRead()) {
ctx.channel().config().setAutoRead(false);
}
Expand All @@ -383,34 +383,33 @@ private boolean hasStagedBufferOrEvent() {
return stagedBufferResponse != null;
}

public void notifyBufferDestroyed() {
// The buffer pool has been destroyed
stagedBufferResponse = null;

if (stagedMessages.isEmpty()) {
ctx.channel().config().setAutoRead(true);
ctx.channel().read();
}
else {
ctx.channel().eventLoop().execute(stagedMessagesHandler);
}
}

// Called by the recycling thread (not network I/O thread)
@Override
public void onEvent(Buffer buffer) {
public boolean notifyBufferAvailable(Buffer buffer) {
boolean success = false;

try {
if (buffer != null) {
if (availableBuffer.compareAndSet(null, buffer)) {
ctx.channel().eventLoop().execute(this);
if (availableBuffer.compareAndSet(null, buffer)) {
ctx.channel().eventLoop().execute(this);

success = true;
}
else {
throw new IllegalStateException("Received a buffer notification, " +
" but the previous one has not been handled yet.");
}
success = true;
}
else {
// The buffer pool has been destroyed
stagedBufferResponse = null;

if (stagedMessages.isEmpty()) {
ctx.channel().config().setAutoRead(true);
ctx.channel().read();
}
else {
ctx.channel().eventLoop().execute(stagedMessagesHandler);
}
throw new IllegalStateException("Received a buffer notification, " +
" but the previous one has not been handled yet.");
}
}
catch (Throwable t) {
Expand All @@ -423,12 +422,14 @@ public void onEvent(Buffer buffer) {
}
}
}

return false;
}

/**
* Continues the decoding of a staged buffer after a buffer has become available again.
* <p>
* This task is executed by the network I/O thread.
*
* <p>This task is executed by the network I/O thread.
*/
@Override
public void run() {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
Expand All @@ -46,7 +47,7 @@
/**
* An input channel, which requests a remote partition queue.
*/
public class RemoteInputChannel extends InputChannel implements BufferRecycler {
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {

/** ID to distinguish this channel from other channels sharing the same TCP connection. */
private final InputChannelID id = new InputChannelID();
Expand Down Expand Up @@ -87,6 +88,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler {
/** The number of available buffers that have not been announced to the producer yet. */
private final AtomicInteger unannouncedCredit = new AtomicInteger(0);

/** The number of unsent buffers in the producer's sub partition. */
private final AtomicInteger senderBacklog = new AtomicInteger(0);

/** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */
private final AtomicBoolean isWaitingForFloatingBuffers = new AtomicBoolean(false);

public RemoteInputChannel(
SingleInputGate inputGate,
int channelIndex,
Expand Down Expand Up @@ -313,6 +320,50 @@ public int getNumberOfAvailableBuffers() {
}
}

/**
* The Buffer pool notifies this channel of an available floating buffer. If the channel is released or
* currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise,
* the buffer will be added into the <tt>availableBuffers</tt> queue and the unannounced credit is
* increased by one.
*
* @param buffer Buffer that becomes available in buffer pool.
* @return True when this channel is waiting for more floating buffers, otherwise false.
*/
@Override
public boolean notifyBufferAvailable(Buffer buffer) {
checkState(isWaitingForFloatingBuffers.get(), "This channel should be waiting for floating buffers.");

synchronized (availableBuffers) {
// Important: the isReleased check should be inside the synchronized block.
if (isReleased.get() || availableBuffers.size() >= senderBacklog.get()) {
isWaitingForFloatingBuffers.set(false);
buffer.recycle();

return false;
}

availableBuffers.add(buffer);

if (unannouncedCredit.getAndAdd(1) == 0) {
notifyCreditAvailable();
}

if (availableBuffers.size() >= senderBacklog.get()) {
isWaitingForFloatingBuffers.set(false);
return false;
} else {
return true;
}
}
}

@Override
public void notifyBufferDestroyed() {
if (!isWaitingForFloatingBuffers.compareAndSet(true, false)) {
throw new IllegalStateException("This channel should be waiting for floating buffers currently.");
}
}

// ------------------------------------------------------------------------
// Network I/O notifications (called by network I/O thread)
// ------------------------------------------------------------------------
Expand Down

0 comments on commit 8706c6f

Please sign in to comment.