From 0b623b66399915d43f29245da148fed63bf940bf Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 5 Jul 2018 15:49:15 +0200 Subject: [PATCH] [FLINK-9766][network][tests] fix cleanup in RemoteInputChannelTest If an assertion in the test fails and as a result the cleanup fails, in most tests the original assertion was swallowed making it hard to debug. Furthermore, #testConcurrentRecycleAndRelease2() does even not clean up at all if successful. --- .../consumer/RemoteInputChannelTest.java | 136 +++++++++--------- 1 file changed, 70 insertions(+), 66 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 408010696d6e8..6c6fd9680fc16 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -39,6 +39,8 @@ import org.junit.Test; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -53,7 +55,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -329,6 +330,7 @@ public void testAvailableBuffersLessThanRequiredBuffers() throws Exception { final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + Throwable thrown = null; try { final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); @@ -447,13 +449,10 @@ public void testAvailableBuffersLessThanRequiredBuffers() throws Exception { assertEquals("There should be 0 buffers available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); assertTrue(inputChannel.isWaitingForFloatingBuffers()); - + } catch (Throwable t) { + thrown = t; } finally { - // Release all the buffer resources - inputChannel.releaseAllResources(); - - networkBufferPool.destroyAllBufferPools(); - networkBufferPool.destroy(); + cleanup(networkBufferPool, null, thrown, inputChannel); } } @@ -471,6 +470,7 @@ public void testAvailableBuffersEqualToRequiredBuffers() throws Exception { final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + Throwable thrown = null; try { final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); @@ -525,13 +525,10 @@ public void testAvailableBuffersEqualToRequiredBuffers() throws Exception { 14, inputChannel.getNumberOfRequiredBuffers()); assertEquals("There should be 2 buffers available in local pool", 2, bufferPool.getNumberOfAvailableMemorySegments()); - + } catch (Throwable t) { + thrown = t; } finally { - // Release all the buffer resources - inputChannel.releaseAllResources(); - - networkBufferPool.destroyAllBufferPools(); - networkBufferPool.destroy(); + cleanup(networkBufferPool, null, thrown, inputChannel); } } @@ -549,6 +546,7 @@ public void testAvailableBuffersMoreThanRequiredBuffers() throws Exception { final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + Throwable thrown = null; try { final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); @@ -617,13 +615,10 @@ public void testAvailableBuffersMoreThanRequiredBuffers() throws Exception { 12, inputChannel.getNumberOfRequiredBuffers()); assertEquals("There should be 2 buffers available in local pool", 2, bufferPool.getNumberOfAvailableMemorySegments()); - + } catch (Throwable t) { + thrown = t; } finally { - // Release all the buffer resources - inputChannel.releaseAllResources(); - - networkBufferPool.destroyAllBufferPools(); - networkBufferPool.destroy(); + cleanup(networkBufferPool, null, thrown, inputChannel); } } @@ -645,6 +640,7 @@ public void testFairDistributionFloatingBuffers() throws Exception { inputGate.setInputChannel(channel1.partitionId.getPartitionId(), channel1); inputGate.setInputChannel(channel2.partitionId.getPartitionId(), channel2); inputGate.setInputChannel(channel3.partitionId.getPartitionId(), channel3); + Throwable thrown = null; try { final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); @@ -688,15 +684,10 @@ public void testFairDistributionFloatingBuffers() throws Exception { assertEquals("There should be 3 buffers available in the channel", 3, channel1.getNumberOfAvailableBuffers()); assertEquals("There should be 3 buffers available in the channel", 3, channel2.getNumberOfAvailableBuffers()); assertEquals("There should be 3 buffers available in the channel", 3, channel3.getNumberOfAvailableBuffers()); - + } catch (Throwable t) { + thrown = t; } finally { - // Release all the buffer resources - channel1.releaseAllResources(); - channel2.releaseAllResources(); - channel3.releaseAllResources(); - - networkBufferPool.destroyAllBufferPools(); - networkBufferPool.destroy(); + cleanup(networkBufferPool, null, thrown, channel1, channel2, channel3); } } @@ -717,6 +708,7 @@ public void testConcurrentOnSenderBacklogAndRelease() throws Exception { final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + Throwable thrown = null; try { final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers); inputGate.setBufferPool(bufferPool); @@ -754,17 +746,10 @@ public Void call() throws Exception { 0, inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be 130 buffers available in local pool.", 130, bufferPool.getNumberOfAvailableMemorySegments() + networkBufferPool.getNumberOfAvailableMemorySegments()); - + } catch (Throwable t) { + thrown = t; } finally { - // Release all the buffer resources once exception - if (!inputChannel.isReleased()) { - inputChannel.releaseAllResources(); - } - - networkBufferPool.destroyAllBufferPools(); - networkBufferPool.destroy(); - - executor.shutdown(); + cleanup(networkBufferPool, executor, thrown, inputChannel); } } @@ -786,6 +771,7 @@ public void testConcurrentOnSenderBacklogAndRecycle() throws Exception { final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + Throwable thrown = null; try { final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers); inputGate.setBufferPool(bufferPool); @@ -813,15 +799,10 @@ public Void call() throws Exception { inputChannel.getNumberOfRequiredBuffers(), inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be no buffers available in local pool.", 0, bufferPool.getNumberOfAvailableMemorySegments()); - + } catch (Throwable t) { + thrown = t; } finally { - // Release all the buffer resources - inputChannel.releaseAllResources(); - - networkBufferPool.destroyAllBufferPools(); - networkBufferPool.destroy(); - - executor.shutdown(); + cleanup(networkBufferPool, executor, thrown, inputChannel); } } @@ -842,6 +823,7 @@ public void testConcurrentRecycleAndRelease() throws Exception { final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + Throwable thrown = null; try { final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers); inputGate.setBufferPool(bufferPool); @@ -869,17 +851,10 @@ public Void call() throws Exception { numFloatingBuffers, bufferPool.getNumberOfAvailableMemorySegments()); assertEquals("There should be " + numExclusiveSegments + " buffers available in global pool.", numExclusiveSegments, networkBufferPool.getNumberOfAvailableMemorySegments()); - + } catch (Throwable t) { + thrown = t; } finally { - // Release all the buffer resources once exception - if (!inputChannel.isReleased()) { - inputChannel.releaseAllResources(); - } - - networkBufferPool.destroyAllBufferPools(); - networkBufferPool.destroy(); - - executor.shutdown(); + cleanup(networkBufferPool, executor, thrown, inputChannel); } } @@ -903,6 +878,7 @@ public void testConcurrentRecycleAndRelease2() throws Exception { final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + Throwable thrown = null; try { final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers); inputGate.setBufferPool(bufferPool); @@ -958,17 +934,9 @@ public void testConcurrentRecycleAndRelease2() throws Exception { submitTasksAndWaitForResults(executor, new Callable[] {bufferPoolInteractionsTask, channelInteractionsTask}); } catch (Throwable t) { - inputChannel.releaseAllResources(); - - try { - networkBufferPool.destroyAllBufferPools(); - } catch (Throwable tInner) { - t.addSuppressed(tInner); - } - - networkBufferPool.destroy(); - executor.shutdown(); - ExceptionUtils.rethrowException(t); + thrown = t; + } finally { + cleanup(networkBufferPool, executor, thrown, inputChannel); } } @@ -1089,4 +1057,40 @@ private void submitTasksAndWaitForResults(ExecutorService executor, Callable[] t result.get(); } } + + /** + * Helper code to ease cleanup handling with suppressed exceptions. + */ + private void cleanup( + NetworkBufferPool networkBufferPool, + @Nullable ExecutorService executor, + @Nullable Throwable throwable, + InputChannel... inputChannels) throws Exception { + for (InputChannel inputChannel : inputChannels) { + try { + inputChannel.releaseAllResources(); + } catch (Throwable tInner) { + throwable = ExceptionUtils.firstOrSuppressed(tInner, throwable); + } + } + + try { + networkBufferPool.destroyAllBufferPools(); + } catch (Throwable tInner) { + throwable = ExceptionUtils.firstOrSuppressed(tInner, throwable); + } + + try { + networkBufferPool.destroy(); + } catch (Throwable tInner) { + throwable = ExceptionUtils.firstOrSuppressed(tInner, throwable); + } + + if (executor != null) { + executor.shutdown(); + } + if (throwable != null) { + ExceptionUtils.rethrowException(throwable); + } + } }