From fca2dca12efdb86517c12209b07e03811e5d0c30 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 22 Feb 2018 14:11:13 +0100 Subject: [PATCH 1/6] [hotfix][network] rename RecordWriter#closeBufferConsumer() to closeBufferBuilder() --- .../flink/runtime/io/network/api/writer/RecordWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 4ec28631e07f4..c35c7f3e8aa90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -175,7 +175,7 @@ public void flushAll() { public void clearBuffers() { for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { RecordSerializer serializer = serializers[targetChannel]; - closeBufferConsumer(targetChannel); + closeBufferBuilder(targetChannel); serializer.clear(); } } @@ -213,7 +213,7 @@ private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOExcept return bufferBuilder; } - private void closeBufferConsumer(int targetChannel) { + private void closeBufferBuilder(int targetChannel) { if (bufferBuilders[targetChannel].isPresent()) { bufferBuilders[targetChannel].get().finish(); bufferBuilders[targetChannel] = Optional.empty(); From 979b3e74370f050457af305c3a8a4818483c2763 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 22 Feb 2018 14:17:06 +0100 Subject: [PATCH 2/6] [hotfix][network] various minor improvements --- .../netty/CreditBasedSequenceNumberingViewReader.java | 3 ++- .../runtime/io/network/netty/PartitionRequestQueue.java | 5 ++++- .../io/network/netty/SequenceNumberingViewReader.java | 1 + .../runtime/io/network/partition/SubpartitionTestBase.java | 2 +- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java index 9acbbacf2735b..8fc7ef4842b48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java @@ -114,7 +114,7 @@ public boolean isRegisteredAsAvailable() { */ @Override public boolean isAvailable() { - // BEWARE: this must be in sync with #isAvailable()! + // BEWARE: this must be in sync with #isAvailable(BufferAndBacklog)! return hasBuffersAvailable() && (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent()); } @@ -130,6 +130,7 @@ public boolean isAvailable() { * current buffer and backlog including information about the next buffer */ private boolean isAvailable(BufferAndBacklog bufferAndBacklog) { + // BEWARE: this must be in sync with #isAvailable()! return bufferAndBacklog.isMoreAvailable() && (numCreditsAvailable > 0 || bufferAndBacklog.nextBufferIsEvent()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java index d63a88e718276..8c05b8208f90f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java @@ -38,6 +38,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayDeque; import java.util.Set; @@ -52,7 +54,7 @@ */ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { - private final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class); + private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class); private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener(); @@ -278,6 +280,7 @@ private void registerAvailableReader(NetworkSequenceViewReader reader) { reader.setRegisteredAsAvailable(true); } + @Nullable private NetworkSequenceViewReader pollAvailableReader() { NetworkSequenceViewReader reader = availableReaders.poll(); if (reader != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java index 6a83af1383782..054046f086a75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java @@ -148,6 +148,7 @@ public String toString() { "requestLock=" + requestLock + ", receiverId=" + receiverId + ", sequenceNumber=" + sequenceNumber + + ", isRegisteredAsAvailable=" + isRegisteredAvailable + '}'; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index 215726b3b5ad1..a3f18f6c18cea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -134,7 +134,7 @@ private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) assertTrue(view.isReleased()); } - protected void assertNextBuffer( + static void assertNextBuffer( ResultSubpartitionView readView, int expectedReadableBufferSize, boolean expectedIsMoreAvailable, From 140a88593e37bf9c8807ccb46c4cce44d2becbef Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 23 Feb 2018 10:35:41 +0100 Subject: [PATCH 3/6] [hotfix][network][tests] make AwaitableBufferAvailablityListener thread-safe This is called asynchronously by the spill writer and thus may need synchronization on incrementing the counter but definately had visibility issues with the counter. Using an AtomicLong fixes that. --- .../AwaitableBufferAvailablityListener.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java index 2b6b834c0b560..6cf9d64f1b3bc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java @@ -18,29 +18,31 @@ package org.apache.flink.runtime.io.network.partition; +import java.util.concurrent.atomic.AtomicLong; + /** * Test implementation of {@link BufferAvailabilityListener}. */ class AwaitableBufferAvailablityListener implements BufferAvailabilityListener { - private long numNotifications; + private final AtomicLong numNotifications = new AtomicLong(); @Override public void notifyDataAvailable() { - ++numNotifications; + numNotifications.getAndIncrement(); } public long getNumNotifications() { - return numNotifications; + return numNotifications.get(); } public void resetNotificationCounters() { - numNotifications = 0; + numNotifications.set(0L); } void awaitNotifications(long awaitedNumNotifications, long timeoutMillis) throws InterruptedException { long deadline = System.currentTimeMillis() + timeoutMillis; - while (numNotifications < awaitedNumNotifications && System.currentTimeMillis() < deadline) { + while (numNotifications.get() < awaitedNumNotifications && System.currentTimeMillis() < deadline) { Thread.sleep(1); } } From e12ea80ecf246ecf91075d1752849a62eb9823b9 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 23 Feb 2018 10:19:58 +0100 Subject: [PATCH 4/6] [FLINK-8755][network] fix SpilledSubpartitionView relying on the backlog for determining whether more data is available Fix SpilledSubpartitionView#getNextBuffer() to not only rely on the backlog: instead it is sufficient to also return true if the next buffer is an event since either there is a real buffer enqueued (reflected by the backlog) or at least one event. --- .../runtime/io/network/partition/SpilledSubpartitionView.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java index 378b0867d6fee..2a6a71f05d667 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java @@ -148,7 +148,7 @@ public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException } int newBacklog = parent.decreaseBuffersInBacklog(current); - return new BufferAndBacklog(current, newBacklog > 0, newBacklog, nextBufferIsEvent); + return new BufferAndBacklog(current, newBacklog > 0 || nextBufferIsEvent, newBacklog, nextBufferIsEvent); } @Nullable From a3f9ce62cd01dc06eb3079e8dcecddebcac1597d Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Mon, 26 Feb 2018 16:27:44 +0100 Subject: [PATCH 5/6] [FLINK-8786][network] fix SpillableSubpartitionView#getNextBuffer returning wrong isMoreAvailable when processing last in-memory buffer When processing the last in-memory buffer in SpillableSubpartitionView#getNextBuffer while the rest of the buffers are spilled, need to rely on the spilled view's isAvailable instead of always setting the isMoreAvailable flag of the returned BufferAndBacklog to false. --- .../runtime/io/network/partition/SpillableSubpartitionView.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 3c73e43d8cb9f..0f51bc8b03e37 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -177,7 +177,7 @@ public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException SpilledSubpartitionView spilled = spilledView; if (spilled != null) { if (current != null) { - return new BufferAndBacklog(current, isMoreAvailable, newBacklog, spilled.nextBufferIsEvent()); + return new BufferAndBacklog(current, spilled.isAvailable(), newBacklog, spilled.nextBufferIsEvent()); } else { return spilled.getNextBuffer(); } From a5a17145f5b763a994332d6effcf4582cb20079c Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 23 Feb 2018 12:13:20 +0100 Subject: [PATCH 6/6] [FLINK-8755][FLINK-8786][network] add and improve subpartition tests + also improve the subpartition tests in general to reduce some duplication --- .../partition/SpillableSubpartitionView.java | 2 +- .../partition/PipelinedSubpartitionTest.java | 11 +- .../partition/SpillableSubpartitionTest.java | 130 ++++++------------ .../partition/SubpartitionTestBase.java | 78 ++++++++++- 4 files changed, 121 insertions(+), 100 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 0f51bc8b03e37..65790d79df28d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -167,7 +167,7 @@ public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException parent.updateStatistics(current); // if we are spilled (but still process a non-spilled nextBuffer), we don't know the - // state of nextBufferIsEvent... + // state of nextBufferIsEvent or whether more buffers are available if (spilledView == null) { return new BufferAndBacklog(current, isMoreAvailable, newBacklog, nextBufferIsEvent); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index ee678abc4ccc4..bc66c9d292dc7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -135,7 +135,8 @@ public void testAddNonEmptyNotFinishedBuffer() throws Exception { bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024)); subpartition.add(bufferBuilder.createBufferConsumer()); - assertNextBuffer(readView, 1024, false, 1); + // note that since the buffer builder is not finished, there is still a retained instance! + assertNextBuffer(readView, 1024, false, 1, false, false); assertEquals(1, subpartition.getBuffersInBacklog()); } finally { readView.releaseAllResources(); @@ -157,7 +158,7 @@ public void testUnfinishedBufferBehindFinished() throws Exception { subpartition.add(createFilledBufferConsumer(1025)); // finished subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished - assertNextBuffer(readView, 1025, false, 1); + assertNextBuffer(readView, 1025, false, 1, false, true); } finally { subpartition.release(); } @@ -178,8 +179,8 @@ public void testFlushWithUnfinishedBufferBehindFinished() throws Exception { subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished subpartition.flush(); - assertNextBuffer(readView, 1025, true, 1); - assertNextBuffer(readView, 1024, false, 1); + assertNextBuffer(readView, 1025, true, 1, false, true); + assertNextBuffer(readView, 1024, false, 1, false, false); } finally { subpartition.release(); } @@ -208,7 +209,7 @@ public void testMultipleEmptyBuffers() throws Exception { subpartition.add(createFilledBufferConsumer(1024)); assertEquals(2, availablityListener.getNumNotifications()); - assertNextBuffer(readView, 1024, false, 0); + assertNextBuffer(readView, 1024, false, 0, false, true); } finally { readView.releaseAllResources(); subpartition.release(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index e41a85c5207b4..840669e7c3fcc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -24,13 +24,13 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.junit.AfterClass; import org.junit.Assert; @@ -52,7 +52,6 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -190,10 +189,13 @@ public void testConsumeSpilledPartition() throws Exception { SpillableSubpartition partition = createSubpartition(); BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE); + BufferConsumer eventBufferConsumer = + EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1)); + final int eventSize = eventBufferConsumer.getWrittenBytes(); partition.add(bufferConsumer.copy()); partition.add(bufferConsumer.copy()); - partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE)); + partition.add(eventBufferConsumer); partition.add(bufferConsumer); assertEquals(4, partition.getTotalNumberOfBuffers()); @@ -207,13 +209,13 @@ public void testConsumeSpilledPartition() throws Exception { // still same statistics assertEquals(4, partition.getTotalNumberOfBuffers()); assertEquals(3, partition.getBuffersInBacklog()); - assertEquals(BUFFER_DATA_SIZE * 4, partition.getTotalNumberOfBytes()); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize, partition.getTotalNumberOfBytes()); partition.finish(); // + one EndOfPartitionEvent assertEquals(5, partition.getTotalNumberOfBuffers()); assertEquals(3, partition.getBuffersInBacklog()); - assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener(); SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener); @@ -221,59 +223,24 @@ public void testConsumeSpilledPartition() throws Exception { assertEquals(1, listener.getNumNotifications()); assertFalse(reader.nextBufferIsEvent()); // buffer - BufferAndBacklog read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true); assertEquals(2, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertFalse(read.nextBufferIsEvent()); assertFalse(reader.nextBufferIsEvent()); // buffer - read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true); assertEquals(1, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertTrue(read.nextBufferIsEvent()); assertTrue(reader.nextBufferIsEvent()); // event - read = reader.getNextBuffer(); - assertNotNull(read); - assertFalse(read.buffer().isBuffer()); + assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true); assertEquals(1, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - read.buffer().recycleBuffer(); - assertFalse(read.nextBufferIsEvent()); assertFalse(reader.nextBufferIsEvent()); // buffer - read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true); assertEquals(0, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertTrue(read.nextBufferIsEvent()); assertTrue(reader.nextBufferIsEvent()); // end of partition event - read = reader.getNextBuffer(); - assertNotNull(read); - assertFalse(read.buffer().isBuffer()); + assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true); assertEquals(0, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertEquals(EndOfPartitionEvent.class, - EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertFalse(read.nextBufferIsEvent()); // finally check that the bufferConsumer has been freed after a successful (or failed) write final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs @@ -292,10 +259,13 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception SpillableSubpartition partition = createSubpartition(); BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE); + BufferConsumer eventBufferConsumer = + EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1)); + final int eventSize = eventBufferConsumer.getWrittenBytes(); partition.add(bufferConsumer.copy()); partition.add(bufferConsumer.copy()); - partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE)); + partition.add(eventBufferConsumer); partition.add(bufferConsumer); partition.finish(); @@ -311,17 +281,12 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception assertFalse(bufferConsumer.isRecycled()); assertFalse(reader.nextBufferIsEvent()); - BufferAndBacklog read = reader.getNextBuffer(); // first buffer (non-spilled) - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + // first buffer (non-spilled) + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, false); assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers assertEquals(2, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - read.buffer().recycleBuffer(); - assertTrue(read.isMoreAvailable()); assertEquals(1, listener.getNumNotifications()); // since isMoreAvailable is set to true, no need for notification assertFalse(bufferConsumer.isRecycled()); - assertFalse(read.nextBufferIsEvent()); // Spill now assertEquals(3, partition.releaseMemory()); @@ -330,59 +295,44 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception assertEquals(5, partition.getTotalNumberOfBuffers()); assertEquals(2, partition.getBuffersInBacklog()); // only updated when getting/spilling the buffers but without the nextBuffer (kept in memory) - assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes()); + assertEquals(BUFFER_DATA_SIZE * 2 + eventSize + 4, partition.getTotalNumberOfBytes()); + // wait for successfully spilling all buffers (before that we may not access any spilled buffer and cannot rely on isMoreAvailable!) listener.awaitNotifications(2, 30_000); // Spiller finished assertEquals(2, listener.getNumNotifications()); + // after consuming and releasing the next buffer, the bufferConsumer may be freed, + // depending on the timing of the last write operation + // -> retain once so that we can check below + Buffer buffer = bufferConsumer.build(); + buffer.retainBuffer(); + assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer) - read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); - assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics assertEquals(1, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - read.buffer().recycleBuffer(); - // now the bufferConsumer may be freed, depending on the timing of the write operation - // -> let's do this check at the end of the test (to save some time) - assertTrue(read.nextBufferIsEvent()); + + bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!) assertTrue(reader.nextBufferIsEvent()); // the event (spilled) - read = reader.getNextBuffer(); - assertNotNull(read); - assertFalse(read.buffer().isBuffer()); - assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling + assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(1, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - read.buffer().recycleBuffer(); - assertFalse(read.nextBufferIsEvent()); assertFalse(reader.nextBufferIsEvent()); // last buffer (spilled) - read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); - assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(0, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertTrue(read.nextBufferIsEvent()); + + buffer.recycleBuffer(); + assertTrue(buffer.isRecycled()); // End of partition assertTrue(reader.nextBufferIsEvent()); - read = reader.getNextBuffer(); - assertNotNull(read); - assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling + assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(0, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertEquals(EndOfPartitionEvent.class, - EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertFalse(read.nextBufferIsEvent()); // finally check that the bufferConsumer has been freed after a successful (or failed) write final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index a3f18f6c18cea..8c902157da977 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -18,19 +18,26 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.util.TestLogger; import org.junit.Test; +import javax.annotation.Nullable; + import java.io.IOException; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -138,11 +145,74 @@ static void assertNextBuffer( ResultSubpartitionView readView, int expectedReadableBufferSize, boolean expectedIsMoreAvailable, - int expectedBuffersInBacklog) throws IOException, InterruptedException { + int expectedBuffersInBacklog, + boolean expectedNextBufferIsEvent, + boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException { + assertNextBufferOrEvent( + readView, + expectedReadableBufferSize, + true, + null, + expectedIsMoreAvailable, + expectedBuffersInBacklog, + expectedNextBufferIsEvent, + expectedRecycledAfterRecycle); + } + + static void assertNextEvent( + ResultSubpartitionView readView, + int expectedReadableBufferSize, + Class expectedEventClass, + boolean expectedIsMoreAvailable, + int expectedBuffersInBacklog, + boolean expectedNextBufferIsEvent, + boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException { + assertNextBufferOrEvent( + readView, + expectedReadableBufferSize, + false, + expectedEventClass, + expectedIsMoreAvailable, + expectedBuffersInBacklog, + expectedNextBufferIsEvent, + expectedRecycledAfterRecycle); + } + + private static void assertNextBufferOrEvent( + ResultSubpartitionView readView, + int expectedReadableBufferSize, + boolean expectedIsBuffer, + @Nullable Class expectedEventClass, + boolean expectedIsMoreAvailable, + int expectedBuffersInBacklog, + boolean expectedNextBufferIsEvent, + boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException { + checkArgument(expectedEventClass == null || !expectedIsBuffer); + ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer(); - assertEquals(expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes()); - assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable()); - assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog()); + assertNotNull(bufferAndBacklog); + try { + assertEquals("buffer size", expectedReadableBufferSize, + bufferAndBacklog.buffer().readableBytes()); + assertEquals("buffer or event", expectedIsBuffer, + bufferAndBacklog.buffer().isBuffer()); + if (expectedEventClass != null) { + assertThat(EventSerializer + .fromBuffer(bufferAndBacklog.buffer(), ClassLoader.getSystemClassLoader()), + instanceOf(expectedEventClass)); + } + assertEquals("more available", expectedIsMoreAvailable, + bufferAndBacklog.isMoreAvailable()); + assertEquals("more available", expectedIsMoreAvailable, readView.isAvailable()); + assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog()); + assertEquals("next is event", expectedNextBufferIsEvent, + bufferAndBacklog.nextBufferIsEvent()); + + assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled()); + } finally { + bufferAndBacklog.buffer().recycleBuffer(); + } + assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled()); } protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {