From 511d0b7893c83b94d151f9546165de7d1ce979b4 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 24 Aug 2017 16:49:46 +0200 Subject: [PATCH 1/2] [FLINK-7513][tests] remove TestBufferFactory#MOCK_BUFFER This static buffer did not allow proper reference counting and we should rather create test buffers in the tests which may also be released afterwards. --- .../consumer/RemoteInputChannelTest.java | 22 ++++++++++++++++--- .../io/network/util/TestBufferFactory.java | 6 ----- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 4a32d7380b342..9b36890085575 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.ProducerFailedException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -40,6 +41,8 @@ import java.util.concurrent.Future; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -55,12 +58,14 @@ public void testExceptionOnReordering() throws Exception { // Setup final SingleInputGate inputGate = mock(SingleInputGate.class); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + final Buffer buffer = TestBufferFactory.createBuffer(); + buffer.retain(); // used twice // The test - inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 0); + inputChannel.onBuffer(buffer, 0); // This does not yet throw the exception, but sets the error at the channel. - inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 29); + inputChannel.onBuffer(buffer, 29); try { inputChannel.getNextBuffer(); @@ -68,6 +73,10 @@ public void testExceptionOnReordering() throws Exception { fail("Did not throw expected exception after enqueuing an out-of-order buffer."); } catch (Exception expected) { + assertFalse(buffer.isRecycled()); + // free remaining buffer instances + inputChannel.releaseAllResources(); + assertTrue(buffer.isRecycled()); } // Need to notify the input gate for the out-of-order buffer as well. Otherwise the @@ -84,6 +93,7 @@ public void testConcurrentOnBufferAndRelease() throws Exception { // Setup final ExecutorService executor = Executors.newFixedThreadPool(2); + final Buffer buffer = TestBufferFactory.createBuffer(); try { // Test @@ -97,7 +107,10 @@ public void testConcurrentOnBufferAndRelease() throws Exception { public Void call() throws Exception { while (true) { for (int j = 0; j < 128; j++) { - inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), j); + // this is the same buffer over and over again which will be + // recycled by the RemoteInputChannel + buffer.retain(); + inputChannel.onBuffer(buffer, j); } if (inputChannel.isReleased()) { @@ -132,6 +145,9 @@ public Void call() throws Exception { } finally { executor.shutdown(); + assertFalse(buffer.isRecycled()); + buffer.recycle(); + assertTrue(buffer.isRecycled()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java index 89ee683c71a62..9856d2283f8db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java @@ -36,8 +36,6 @@ public class TestBufferFactory { private static final BufferRecycler RECYCLER = new DiscardingRecycler(); - private static final Buffer MOCK_BUFFER = createBuffer(); - private final int bufferSize; private final BufferRecycler bufferRecycler; @@ -89,8 +87,4 @@ public static Buffer createBuffer(int bufferSize) { return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), RECYCLER); } - - public static Buffer getMockBuffer() { - return MOCK_BUFFER; - } } From fe0eb65d6e3f09cd11aed6874ff6a503531af28f Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 21 Sep 2017 14:51:48 +0200 Subject: [PATCH 2/2] [FLINK-7513][tests] address PR comments * inline buffer.retain() into the buffer use instead of having it on a separate line --- .../network/partition/consumer/RemoteInputChannelTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 9b36890085575..08bf5eb47999f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -59,10 +59,9 @@ public void testExceptionOnReordering() throws Exception { final SingleInputGate inputGate = mock(SingleInputGate.class); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); final Buffer buffer = TestBufferFactory.createBuffer(); - buffer.retain(); // used twice // The test - inputChannel.onBuffer(buffer, 0); + inputChannel.onBuffer(buffer.retain(), 0); // This does not yet throw the exception, but sets the error at the channel. inputChannel.onBuffer(buffer, 29); @@ -109,8 +108,7 @@ public Void call() throws Exception { for (int j = 0; j < 128; j++) { // this is the same buffer over and over again which will be // recycled by the RemoteInputChannel - buffer.retain(); - inputChannel.onBuffer(buffer, j); + inputChannel.onBuffer(buffer.retain(), j); } if (inputChannel.isReleased()) {