From 2233204079f107d1abbe9b3237074c07da0227cb Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 5 Jul 2018 00:48:33 +0200 Subject: [PATCH] [FLINK-9755][network] forward exceptions in RemoteInputChannel#notifyBufferAvailable() to the responsible thread This mainly involves state checks but previously these have only been swallowed without re-registration or any other logging/handling. This may have led to some thread stalling while waiting for the notification that never came. --- .../io/network/buffer/BufferListener.java | 9 ++ .../io/network/buffer/LocalBufferPool.java | 36 ++------ .../consumer/RemoteInputChannel.java | 52 ++++++----- .../consumer/RemoteInputChannelTest.java | 90 +++++++++++++++++-- 4 files changed, 131 insertions(+), 56 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java index 05b415625fc28..4cc32c0a66147 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java @@ -27,6 +27,15 @@ public interface BufferListener { /** * Notification callback if a buffer is recycled and becomes available in buffer pool. * + *

Note: responsibility for recycling the given buffer is transferred to this implementation, + * including any errors that lead to exceptions being thrown! + * + *

BEWARE: since this may be called from outside the thread that relies on + * the listener's logic, any exception that occurs with this handler should be forwarded to the + * responsible thread for handling and otherwise ignored in the processing of this method. The + * buffer pool forwards any {@link Throwable} from here upwards to a potentially unrelated call + * stack! + * * @param buffer buffer that becomes available in buffer pool. * @return true if the listener wants to be notified next time. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index e874723755d9e..1596fded6f3f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -262,8 +262,7 @@ public void recycle(MemorySegment segment) { if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) { returnMemorySegment(segment); return; - } - else { + } else { listener = registeredListeners.poll(); if (listener == null) { @@ -277,37 +276,18 @@ public void recycle(MemorySegment segment) { // We do not know which locks have been acquired before the recycle() or are needed in the // notification and which other threads also access them. // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676) - boolean success = false; - boolean needMoreBuffers = false; - try { - needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this)); - success = true; - } catch (Throwable ignored) { - // handled below, under the lock - } + // Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer + // (either directly or later during error handling) and therefore eventually end up in this + // method again. 
+ boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this)); - if (!success || needMoreBuffers) { + if (needMoreBuffers) { synchronized (availableMemorySegments) { if (isDestroyed) { // cleanup tasks how they would have been done if we only had one synchronized block - if (needMoreBuffers) { - listener.notifyBufferDestroyed(); - } - if (!success) { - returnMemorySegment(segment); - } + listener.notifyBufferDestroyed(); } else { - if (needMoreBuffers) { - registeredListeners.add(listener); - } - if (!success) { - if (numberOfRequestedMemorySegments > currentPoolSize) { - returnMemorySegment(segment); - } else { - availableMemorySegments.add(segment); - availableMemorySegments.notify(); - } - } + registeredListeners.add(listener); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 0f70d448020a3..b94f48a3a79f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -360,32 +360,44 @@ public boolean notifyBufferAvailable(Buffer buffer) { return false; } - boolean needMoreBuffers = false; - synchronized (bufferQueue) { - checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers."); + boolean recycleBuffer = true; + try { + boolean needMoreBuffers = false; + synchronized (bufferQueue) { + checkState(isWaitingForFloatingBuffers, + "This channel should be waiting for floating buffers."); + + // Important: double check the isReleased state inside synchronized block, so there is no + // race condition when notifyBufferAvailable and releaseAllResources running in parallel. 
+ if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) { + isWaitingForFloatingBuffers = false; + recycleBuffer = false; // just in case + buffer.recycleBuffer(); + return false; + } - // Important: double check the isReleased state inside synchronized block, so there is no - // race condition when notifyBufferAvailable and releaseAllResources running in parallel. - if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) { - isWaitingForFloatingBuffers = false; - buffer.recycleBuffer(); - return false; - } + recycleBuffer = false; + bufferQueue.addFloatingBuffer(buffer); - bufferQueue.addFloatingBuffer(buffer); + if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) { + isWaitingForFloatingBuffers = false; + } else { + needMoreBuffers = true; + } - if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) { - isWaitingForFloatingBuffers = false; - } else { - needMoreBuffers = true; + if (unannouncedCredit.getAndAdd(1) == 0) { + notifyCreditAvailable(); + } } - } - if (unannouncedCredit.getAndAdd(1) == 0) { - notifyCreditAvailable(); + return needMoreBuffers; + } catch (Throwable t) { + if (recycleBuffer) { + buffer.recycleBuffer(); + } + setError(t); + return false; } - - return needMoreBuffers; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 6c6fd9680fc16..63054923498ce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -52,9 +52,13 @@ import scala.Tuple2; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasProperty; +import static 
org.hamcrest.Matchers.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -452,7 +456,7 @@ public void testAvailableBuffersLessThanRequiredBuffers() throws Exception { } catch (Throwable t) { thrown = t; } finally { - cleanup(networkBufferPool, null, thrown, inputChannel); + cleanup(networkBufferPool, null, null, thrown, inputChannel); } } @@ -528,7 +532,7 @@ public void testAvailableBuffersEqualToRequiredBuffers() throws Exception { } catch (Throwable t) { thrown = t; } finally { - cleanup(networkBufferPool, null, thrown, inputChannel); + cleanup(networkBufferPool, null, null, thrown, inputChannel); } } @@ -618,7 +622,7 @@ public void testAvailableBuffersMoreThanRequiredBuffers() throws Exception { } catch (Throwable t) { thrown = t; } finally { - cleanup(networkBufferPool, null, thrown, inputChannel); + cleanup(networkBufferPool, null, null, thrown, inputChannel); } } @@ -687,7 +691,72 @@ public void testFairDistributionFloatingBuffers() throws Exception { } catch (Throwable t) { thrown = t; } finally { - cleanup(networkBufferPool, null, thrown, channel1, channel2, channel3); + cleanup(networkBufferPool, null, null, thrown, channel1, channel2, channel3); + } + } + + /** + * Tests that failures are propagated correctly if + * {@link RemoteInputChannel#notifyBufferAvailable(Buffer)} throws an exception. Also tests that + * a second listener will be notified in this case. 
+ */ + @Test + public void testFailureInNotifyBufferAvailable() throws Exception { + // Setup + final int numExclusiveBuffers = 0; + final int numFloatingBuffers = 1; + final int numTotalBuffers = numExclusiveBuffers + numFloatingBuffers; + final NetworkBufferPool networkBufferPool = new NetworkBufferPool( + numTotalBuffers, 32); + + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel successfulRemoteIC = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(successfulRemoteIC.partitionId.getPartitionId(), successfulRemoteIC); + + successfulRemoteIC.requestSubpartition(0); + + // late creation -> no exclusive buffers, also no requested subpartition in successfulRemoteIC + // (to trigger a failure in RemoteInputChannel#notifyBufferAvailable()) + final RemoteInputChannel failingRemoteIC = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(failingRemoteIC.partitionId.getPartitionId(), failingRemoteIC); + + Buffer buffer = null; + Throwable thrown = null; + try { + final BufferPool bufferPool = + networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers); + inputGate.setBufferPool(bufferPool); + + buffer = bufferPool.requestBufferBlocking(); + + // trigger subscription to buffer pool + failingRemoteIC.onSenderBacklog(1); + successfulRemoteIC.onSenderBacklog(numExclusiveBuffers + 1); + // recycling will call RemoteInputChannel#notifyBufferAvailable() which will fail and + // this exception will be swallowed and set as an error in failingRemoteIC + buffer.recycleBuffer(); + buffer = null; + try { + failingRemoteIC.checkError(); + fail("The input channel should have an error based on the failure in RemoteInputChannel#notifyBufferAvailable()"); + } catch (IOException e) { + assertThat(e, hasProperty("cause", isA(IllegalStateException.class))); + } + // currently, the buffer is still enqueued in the bufferQueue of failingRemoteIC + assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments()); + 
buffer = successfulRemoteIC.requestBuffer(); + assertNull("buffer should still remain in failingRemoteIC", buffer); + + // releasing resources in failingRemoteIC should free the buffer again and immediately + // recycle it into successfulRemoteIC + failingRemoteIC.releaseAllResources(); + assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments()); + buffer = successfulRemoteIC.requestBuffer(); + assertNotNull("no buffer given to successfulRemoteIC", buffer); + } catch (Throwable t) { + thrown = t; + } finally { + cleanup(networkBufferPool, null, buffer, thrown, failingRemoteIC, successfulRemoteIC); } } @@ -749,7 +818,7 @@ public Void call() throws Exception { } catch (Throwable t) { thrown = t; } finally { - cleanup(networkBufferPool, executor, thrown, inputChannel); + cleanup(networkBufferPool, executor, null, thrown, inputChannel); } } @@ -802,7 +871,7 @@ public Void call() throws Exception { } catch (Throwable t) { thrown = t; } finally { - cleanup(networkBufferPool, executor, thrown, inputChannel); + cleanup(networkBufferPool, executor, null, thrown, inputChannel); } } @@ -854,7 +923,7 @@ public Void call() throws Exception { } catch (Throwable t) { thrown = t; } finally { - cleanup(networkBufferPool, executor, thrown, inputChannel); + cleanup(networkBufferPool, executor, null, thrown, inputChannel); } } @@ -936,7 +1005,7 @@ public void testConcurrentRecycleAndRelease2() throws Exception { } catch (Throwable t) { thrown = t; } finally { - cleanup(networkBufferPool, executor, thrown, inputChannel); + cleanup(networkBufferPool, executor, null, thrown, inputChannel); } } @@ -1064,6 +1133,7 @@ private void submitTasksAndWaitForResults(ExecutorService executor, Callable[] t private void cleanup( NetworkBufferPool networkBufferPool, @Nullable ExecutorService executor, + @Nullable Buffer buffer, @Nullable Throwable throwable, InputChannel... 
inputChannels) throws Exception { for (InputChannel inputChannel : inputChannels) { @@ -1074,6 +1144,10 @@ private void cleanup( } } + if (buffer != null && !buffer.isRecycled()) { + buffer.recycleBuffer(); + } + try { networkBufferPool.destroyAllBufferPools(); } catch (Throwable tInner) {