From 3ffa692f9b7e4fac8be729f7ed605a9568365f8b Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 13 Dec 2017 15:28:08 +0100 Subject: [PATCH 1/4] [hotfix][checkstyle] only ignore checkstyle in existing packages under runtime.io.network - ignore runtime.io.(async|disk) - ignore runtime.io.network.(api|buffer|netty|partition|serialization|util) -> everything else will be checked against the ruleset - fix checkstyle errors in classes directly under runtime.io.network --- .../apache/flink/runtime/io/network/ConnectionID.java | 2 +- .../flink/runtime/io/network/ConnectionManager.java | 2 +- .../runtime/io/network/DataExchangeModeTest.java | 1 + .../io/network/DefaultChannelSelectorTest.java | 8 ++++---- .../runtime/io/network/MockNetworkEnvironment.java | 4 +++- .../runtime/io/network/NetworkEnvironmentTest.java | 5 ++--- tools/maven/suppressions-runtime.xml | 11 +++++++++-- 7 files changed, 21 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java index cc2a19db00a41..b96fb571ef08c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java @@ -32,7 +32,7 @@ * a connection index. This allows multiple connections to the same task manager to be distinguished * by their connection index. * - *

The connection index is assigned by the {@link IntermediateResult} and ensures that it is + *

The connection index is assigned by the {@link IntermediateResult} and ensures that it is * safe to multiplex multiple data transfers over the same physical TCP connection. */ public class ConnectionID implements Serializable { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java index 1225230f25433..8e0f36c34fc10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java @@ -38,7 +38,7 @@ void start(ResultPartitionProvider partitionProvider, PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException; /** - * Closes opened ChannelConnections in case of a resource release + * Closes opened ChannelConnections in case of a resource release. */ void closeOpenChannelConnections(ConnectionID connectionId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java index cae80e864f024..14176c5ab76e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.api.common.ExecutionMode; + import org.junit.Test; import static org.junit.Assert.assertNotNull; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java index a54f5d90b913e..e090c7fd27f20 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java @@ -18,20 +18,20 @@ package org.apache.flink.runtime.io.network; -import static org.junit.Assert.assertEquals; - import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; import org.apache.flink.types.StringValue; + import org.junit.Test; +import static org.junit.Assert.assertEquals; + /** * This class checks the functionality of the {@link RoundRobinChannelSelector} class. - * */ public class DefaultChannelSelectorTest { /** - * This test checks the channel selection + * This test checks the channel selection. */ @Test public void channelSelect() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java index 3bbe6d57ecef0..7551545ad8275 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java @@ -21,11 +21,13 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +/** + * Wrapper class to get a mocked {@link NetworkEnvironment}. + */ public class MockNetworkEnvironment { private static NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index 4964be798a062..2a54769ba079e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartition; @@ -49,9 +48,9 @@ * Various tests for the {@link NetworkEnvironment} class. */ public class NetworkEnvironmentTest { - private final static int numBuffers = 1024; + private static final int numBuffers = 1024; - private final static int memorySegmentSize = 128; + private static final int memorySegmentSize = 128; /** * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml index 8f0162d8c9ee0..16f444cfa204b 100644 --- a/tools/maven/suppressions-runtime.xml +++ b/tools/maven/suppressions-runtime.xml @@ -80,11 +80,18 @@ under the License. files="(.*)test[/\\](.*)runtime[/\\]instance[/\\](.*)" checks="AvoidStarImport"/> + + + Date: Thu, 14 Dec 2017 17:30:19 +0100 Subject: [PATCH 2/4] [FLINK-8252][benchmarks] convert network benchmarks to streaming benchmarks This allows us to use the output flushing interval as a parameter to evaluate, too. --- .../NetworkBenchmarkEnvironment.java | 278 ------------------ .../io}/benchmark/LongRecordWriterThread.java | 2 +- .../runtime/io}/benchmark/ReceiverThread.java | 2 +- .../benchmark/SerializingLongReceiver.java | 2 +- .../StreamNetworkBenchmarkEnvironment.java | 257 +++++++++++++++- .../StreamNetworkPointToPointBenchmark.java | 3 +- .../StreamNetworkThroughputBenchmark.java | 12 +- ...StreamNetworkThroughputBenchmarkTests.java | 22 +- 8 files changed, 269 insertions(+), 309 deletions(-) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java rename {flink-runtime/src/test/java/org/apache/flink/runtime/io/network => flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io}/benchmark/LongRecordWriterThread.java (97%) rename {flink-runtime/src/test/java/org/apache/flink/runtime/io/network => flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io}/benchmark/ReceiverThread.java (98%) rename {flink-runtime/src/test/java/org/apache/flink/runtime/io/network => flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io}/benchmark/SerializingLongReceiver.java (97%) rename flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java => flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java (88%) rename flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java => flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java (68%) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java deleted file mode 100644 index ff06187fc7bf6..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.benchmark; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; -import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; -import org.apache.flink.runtime.deployment.ResultPartitionLocation; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.ConnectionID; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.buffer.BufferPool; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartition; -import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.taskmanager.TaskActions; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Arrays; - -import static org.apache.flink.util.ExceptionUtils.suppressExceptions; - -/** - * Context for network benchmarks executed by the external - * flink-benchmarks project. - */ -public class NetworkBenchmarkEnvironment { - - private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); - - private static final int NUM_SLOTS_AND_THREADS = 1; - - private static final InetAddress LOCAL_ADDRESS; - - static { - try { - LOCAL_ADDRESS = InetAddress.getLocalHost(); - } catch (UnknownHostException e) { - throw new Error(e); - } - } - - protected final JobID jobId = new JobID(); - protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID(); - protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(); - - protected NetworkEnvironment senderEnv; - protected NetworkEnvironment receiverEnv; - protected IOManager ioManager; - - protected int channels; - - protected ResultPartitionID[] partitionIds; - - public void setUp(int writers, int channels) throws Exception { - this.channels = channels; - this.partitionIds = new ResultPartitionID[writers]; - - int bufferPoolSize = Math.max(2048, writers * channels * 4); - senderEnv = createNettyNetworkEnvironment(bufferPoolSize); - receiverEnv = createNettyNetworkEnvironment(bufferPoolSize); - ioManager = new IOManagerAsync(); - - senderEnv.start(); - receiverEnv.start(); - - generatePartitionIds(); - } - - public void tearDown() { - suppressExceptions(senderEnv::shutdown); - suppressExceptions(receiverEnv::shutdown); - suppressExceptions(ioManager::shutdown); - } - - public SerializingLongReceiver createReceiver() throws Exception { - TaskManagerLocation senderLocation = new TaskManagerLocation( - ResourceID.generate(), - LOCAL_ADDRESS, - senderEnv.getConnectionManager().getDataPort()); - - InputGate receiverGate = createInputGate( - jobId, - dataSetID, - executionAttemptID, - senderLocation, - receiverEnv, - channels); - - SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length); - - receiver.start(); - return receiver; - } - - public RecordWriter createRecordWriter(int partitionIndex) throws Exception { - ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels); - return new RecordWriter<>(sender); - } - - private void generatePartitionIds() throws Exception { - for (int writer = 0; writer < partitionIds.length; writer++) { - partitionIds[writer] = new ResultPartitionID(); - } - } - - private NetworkEnvironment createNettyNetworkEnvironment( - @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception { - - final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE); - - final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager( - new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration())); - - return new NetworkEnvironment( - bufferPool, - nettyConnectionManager, - new ResultPartitionManager(), - new TaskEventDispatcher(), - new KvStateRegistry(), - null, - null, - IOMode.SYNC, - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(), - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue()); - } - - protected ResultPartitionWriter createResultPartition( - JobID jobId, - ResultPartitionID partitionId, - NetworkEnvironment environment, - int channels) throws Exception { - - ResultPartition resultPartition = new ResultPartition( - "sender task", - new NoOpTaskActions(), - jobId, - partitionId, - ResultPartitionType.PIPELINED_BOUNDED, - channels, - 1, - environment.getResultPartitionManager(), - new NoOpResultPartitionConsumableNotifier(), - ioManager, - false); - - // similar to NetworkEnvironment#registerTask() - int numBuffers = resultPartition.getNumberOfSubpartitions() * - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); - - BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers); - resultPartition.registerBufferPool(bufferPool); - - environment.getResultPartitionManager().registerResultPartition(resultPartition); - - return resultPartition; - } - - private InputGate createInputGate( - JobID jobId, - IntermediateDataSetID dataSetID, - ExecutionAttemptID executionAttemptID, - final TaskManagerLocation senderLocation, - NetworkEnvironment environment, - final int channels) throws IOException { - - InputGate[] gates = new InputGate[channels]; - for (int channel = 0; channel < channels; ++channel) { - int finalChannel = channel; - InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds) - .map(partitionId -> new InputChannelDeploymentDescriptor( - partitionId, - ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel)))) - .toArray(InputChannelDeploymentDescriptor[]::new); - - final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor( - dataSetID, - ResultPartitionType.PIPELINED_BOUNDED, - channel, - channelDescriptors); - - SingleInputGate gate = SingleInputGate.create( - "receiving task[" + channel + "]", - jobId, - executionAttemptID, - gateDescriptor, - environment, - new NoOpTaskActions(), - UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); - - // similar to NetworkEnvironment#registerTask() - int numBuffers = gate.getNumberOfInputChannels() * - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); - - BufferPool bufferPool = - environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers); - - gate.setBufferPool(bufferPool); - gates[channel] = gate; - } - - if (channels > 1) { - return new UnionInputGate(gates); - } else { - return gates[0]; - } - } - - // ------------------------------------------------------------------------ - // Mocks - // ------------------------------------------------------------------------ - - /** - * A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito - * to avoid using mockito in this benchmark class. - */ - private static final class NoOpTaskActions implements TaskActions { - - @Override - public void triggerPartitionProducerStateCheck( - JobID jobId, - IntermediateDataSetID intermediateDataSetId, - ResultPartitionID resultPartitionId) {} - - @Override - public void failExternally(Throwable cause) {} - } - - private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier { - - @Override - public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {} - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java similarity index 97% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java index 6018e5528a17b..e6cc2d5e97741 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.benchmark; +package org.apache.flink.streaming.runtime.io.benchmark; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java similarity index 98% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java index be1c80f36ceec..126efefcba6b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.benchmark; +package org.apache.flink.streaming.runtime.io.benchmark; import org.apache.flink.core.testutils.CheckedThread; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java similarity index 97% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java index 848c018a5b7f9..580612ca03bc7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.benchmark; +package org.apache.flink.streaming.runtime.io.benchmark; import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index acbbdf82ec70a..83508ea245f2d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -18,23 +18,262 @@ package org.apache.flink.streaming.runtime.io.benchmark; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionLocation; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; -import org.apache.flink.runtime.io.network.benchmark.NetworkBenchmarkEnvironment; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.taskmanager.TaskActions; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.streaming.runtime.io.StreamRecordWriter; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; + +import static org.apache.flink.util.ExceptionUtils.suppressExceptions; + /** - * Context for stream network benchmarks executed by the external + * Context for network benchmarks executed by the external * flink-benchmarks project. */ -public class StreamNetworkBenchmarkEnvironment extends NetworkBenchmarkEnvironment { +public class StreamNetworkBenchmarkEnvironment { + + private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); + + private static final int NUM_SLOTS_AND_THREADS = 1; + + private static final InetAddress LOCAL_ADDRESS; + + static { + try { + LOCAL_ADDRESS = InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new Error(e); + } + } + + protected final JobID jobId = new JobID(); + protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID(); + protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(); + + protected NetworkEnvironment senderEnv; + protected NetworkEnvironment receiverEnv; + protected IOManager ioManager; + + protected int channels; + + protected ResultPartitionID[] partitionIds; + + public void setUp(int writers, int channels) throws Exception { + this.channels = channels; + this.partitionIds = new ResultPartitionID[writers]; + + int bufferPoolSize = Math.max(2048, writers * channels * 4); + senderEnv = createNettyNetworkEnvironment(bufferPoolSize); + receiverEnv = createNettyNetworkEnvironment(bufferPoolSize); + ioManager = new IOManagerAsync(); + + senderEnv.start(); + receiverEnv.start(); + + generatePartitionIds(); + } + + public void tearDown() { + suppressExceptions(senderEnv::shutdown); + suppressExceptions(receiverEnv::shutdown); + suppressExceptions(ioManager::shutdown); + } + + public SerializingLongReceiver createReceiver() throws Exception { + TaskManagerLocation senderLocation = new TaskManagerLocation( + ResourceID.generate(), + LOCAL_ADDRESS, + senderEnv.getConnectionManager().getDataPort()); + + InputGate receiverGate = createInputGate( + jobId, + dataSetID, + executionAttemptID, + senderLocation, + receiverEnv, + channels); + + SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length); + + receiver.start(); + return receiver; + } + + public StreamRecordWriter createRecordWriter(int partitionIndex, long flushTimeout) throws Exception { + ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels); + return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector(), flushTimeout); + } + + private void generatePartitionIds() throws Exception { + for (int writer = 0; writer < partitionIds.length; writer++) { + partitionIds[writer] = new ResultPartitionID(); + } + } + + private NetworkEnvironment createNettyNetworkEnvironment( + @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception { + + final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE); + + final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager( + new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration())); + + return new NetworkEnvironment( + bufferPool, + nettyConnectionManager, + new ResultPartitionManager(), + new TaskEventDispatcher(), + new KvStateRegistry(), + null, + null, + IOMode.SYNC, + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(), + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue()); + } + + protected ResultPartitionWriter createResultPartition( + JobID jobId, + ResultPartitionID partitionId, + NetworkEnvironment environment, + int channels) throws Exception { + + ResultPartition resultPartition = new ResultPartition( + "sender task", + new NoOpTaskActions(), + jobId, + partitionId, + ResultPartitionType.PIPELINED_BOUNDED, + channels, + 1, + environment.getResultPartitionManager(), + new NoOpResultPartitionConsumableNotifier(), + ioManager, + false); + + // similar to NetworkEnvironment#registerTask() + int numBuffers = resultPartition.getNumberOfSubpartitions() * + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); + + BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers); + resultPartition.registerBufferPool(bufferPool); + + environment.getResultPartitionManager().registerResultPartition(resultPartition); + + return resultPartition; + } + + private InputGate createInputGate( + JobID jobId, + IntermediateDataSetID dataSetID, + ExecutionAttemptID executionAttemptID, + final TaskManagerLocation senderLocation, + NetworkEnvironment environment, + final int channels) throws IOException { + + InputGate[] gates = new InputGate[channels]; + for (int channel = 0; channel < channels; ++channel) { + int finalChannel = channel; + InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds) + .map(partitionId -> new InputChannelDeploymentDescriptor( + partitionId, + ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel)))) + .toArray(InputChannelDeploymentDescriptor[]::new); + + final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor( + dataSetID, + ResultPartitionType.PIPELINED_BOUNDED, + channel, + channelDescriptors); + + SingleInputGate gate = SingleInputGate.create( + "receiving task[" + channel + "]", + jobId, + executionAttemptID, + gateDescriptor, + environment, + new NoOpTaskActions(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); + + // similar to NetworkEnvironment#registerTask() + int numBuffers = gate.getNumberOfInputChannels() * + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); + + BufferPool bufferPool = + environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers); + + gate.setBufferPool(bufferPool); + gates[channel] = gate; + } + + if (channels > 1) { + return new UnionInputGate(gates); + } else { + return gates[0]; + } + } + + // ------------------------------------------------------------------------ + // Mocks + // ------------------------------------------------------------------------ + + /** + * A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito + * to avoid using mockito in this benchmark class. + */ + private static final class NoOpTaskActions implements TaskActions { + + @Override + public void triggerPartitionProducerStateCheck( + JobID jobId, + IntermediateDataSetID intermediateDataSetId, + ResultPartitionID resultPartitionId) {} + + @Override + public void failExternally(Throwable cause) {} + } + + private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier { - public RecordWriter createStreamRecordWriter(int partitionIndex, long flushTimeout) - throws Exception { - ResultPartitionWriter sender = - createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels); - return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector<>(), flushTimeout); + @Override + public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {} } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java index 92864854883f1..843d3e2e1f312 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.io.benchmark; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.io.network.benchmark.ReceiverThread; import org.apache.flink.types.LongValue; import java.util.concurrent.CompletableFuture; @@ -74,7 +73,7 @@ public void setUp(long flushTimeout) throws Exception { environment.setUp(1, 1); receiver = environment.createReceiver(); - recordWriter = environment.createStreamRecordWriter(0, flushTimeout); + recordWriter = environment.createRecordWriter(0, flushTimeout); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java similarity index 88% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java index 799b7c3093c56..3f41b0087f8d3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.benchmark; +package org.apache.flink.streaming.runtime.io.benchmark; import org.apache.flink.types.LongValue; @@ -27,10 +27,10 @@ * Network throughput benchmarks executed by the external * flink-benchmarks project. */ -public class NetworkThroughputBenchmark { +public class StreamNetworkThroughputBenchmark { private static final long RECEIVER_TIMEOUT = 30_000; - private NetworkBenchmarkEnvironment environment; + private StreamNetworkBenchmarkEnvironment environment; private ReceiverThread receiver; private LongRecordWriterThread[] writerThreads; @@ -63,13 +63,13 @@ public void executeBenchmark(long records) throws Exception { * @param channels * number of outgoing channels / receivers */ - public void setUp(int recordWriters, int channels) throws Exception { - environment = new NetworkBenchmarkEnvironment<>(); + public void setUp(int recordWriters, int channels, int flushTimeout) throws Exception { + environment = new StreamNetworkBenchmarkEnvironment<>(); environment.setUp(recordWriters, channels); receiver = environment.createReceiver(); writerThreads = new LongRecordWriterThread[recordWriters]; for (int writer = 0; writer < recordWriters; writer++) { - writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer)); + writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer, flushTimeout)); writerThreads[writer].start(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java similarity index 68% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java index c84743be843c9..8af8148bc65f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java @@ -16,18 +16,18 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.benchmark; +package org.apache.flink.streaming.runtime.io.benchmark; import org.junit.Test; /** - * Tests for various network benchmarks based on {@link NetworkThroughputBenchmark}. + * Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}. */ -public class NetworkThroughputBenchmarkTests { +public class StreamNetworkThroughputBenchmarkTests { @Test public void pointToPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(1, 1); + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(1, 1, 100); try { benchmark.executeBenchmark(1_000); } @@ -38,8 +38,8 @@ public void pointToPointBenchmark() throws Exception { @Test public void pointToMultiPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(1, 100); + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(1, 100, 100); try { benchmark.executeBenchmark(1_000); } @@ -50,8 +50,8 @@ public void pointToMultiPointBenchmark() throws Exception { @Test public void multiPointToPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(4, 1); + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(4, 1, 100); try { benchmark.executeBenchmark(1_000); } @@ -62,8 +62,8 @@ public void multiPointToPointBenchmark() throws Exception { @Test public void multiPointToMultiPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(4, 100); + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(4, 100, 100); try { benchmark.executeBenchmark(1_000); } From b2aec32745544eb5469ed2bbdb190f9f2fb5dd21 Mon Sep 17 00:00:00 2001 From: Zhijiang Date: Thu, 17 Aug 2017 19:38:45 +0800 Subject: [PATCH 3/4] [FLINK-7468][network] Implement sender backlog logic for credit-based --- .../netty/SequenceNumberingViewReader.java | 6 +- .../partition/PipelinedSubpartition.java | 54 ++++++++++- .../partition/PipelinedSubpartitionView.java | 6 +- .../network/partition/ResultSubpartition.java | 37 +++++++ .../partition/ResultSubpartitionView.java | 14 +-- .../partition/SpillableSubpartition.java | 47 ++++++++- .../partition/SpillableSubpartitionView.java | 8 +- .../partition/SpilledSubpartitionView.java | 12 ++- .../partition/consumer/InputChannel.java | 12 ++- .../partition/consumer/LocalInputChannel.java | 8 +- .../consumer/RemoteInputChannel.java | 2 +- .../netty/CancelPartitionRequestTest.java | 8 +- .../partition/PipelinedSubpartitionTest.java | 23 ++++- .../partition/SpillableSubpartitionTest.java | 96 +++++++++++++------ .../SpilledSubpartitionViewTest.java | 6 +- .../partition/SubpartitionTestBase.java | 20 +++- .../IteratorWrappingTestSingleInputGate.java | 4 +- .../consumer/SingleInputGateTest.java | 3 +- .../partition/consumer/TestInputChannel.java | 6 +- .../util/TestSubpartitionConsumer.java | 14 +-- .../consumer/StreamTestSingleInputGate.java | 6 +- 21 files changed, 309 insertions(+), 83 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java index 6d95ca5040364..fcbfb21f38407 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.io.network.netty; -import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; @@ -86,13 +86,13 @@ int getSequenceNumber() { } public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { - Buffer next = subpartitionView.getNextBuffer(); + BufferAndBacklog next = subpartitionView.getNextBuffer(); if (next != null) { long remaining = numBuffersAvailable.decrementAndGet(); sequenceNumber++; if (remaining >= 0) { - return new BufferAndAvailability(next, remaining > 0); + return new BufferAndAvailability(next.buffer(), remaining > 0, next.buffersInBacklog()); } else { throw new IllegalStateException("no buffer available"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index c1d6f133cff16..2d78bc338fdc1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -25,6 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.util.ArrayDeque; @@ -52,6 +55,10 @@ class PipelinedSubpartition extends ResultSubpartition { /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private int buffersInBacklog; + // ------------------------------------------------------------------------ PipelinedSubpartition(int index, ResultPartition parent) { @@ -75,6 +82,7 @@ public boolean add(Buffer buffer) throws IOException { buffers.add(buffer); reader = readView; updateStatistics(buffer); + increaseBuffersInBacklog(buffer); } // Notify the listener outside of the synchronized block @@ -144,9 +152,17 @@ public void release() { } } - Buffer pollBuffer() { + @Nullable + BufferAndBacklog pollBuffer() { synchronized (buffers) { - return buffers.pollFirst(); + Buffer buffer = buffers.pollFirst(); + decreaseBuffersInBacklog(buffer); + + if (buffer != null) { + return new BufferAndBacklog(buffer, buffersInBacklog); + } else { + return null; + } } } @@ -162,6 +178,36 @@ public boolean isReleased() { return isReleased; } + @Override + @VisibleForTesting + public int getBuffersInBacklog() { + return buffersInBacklog; + } + + /** + * Decreases the number of non-event buffers by one after fetching a non-event + * buffer from this subpartition. + */ + private void decreaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog--; + } + } + + /** + * Increases the number of non-event buffers by one after adding a non-event + * buffer into this subpartition. + */ + private void increaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog++; + } + } + @Override public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException { final int queueSize; @@ -206,8 +252,8 @@ public String toString() { } return String.format( - "PipelinedSubpartition [number of buffers: %d (%d bytes), finished? %s, read view? %s]", - numBuffers, numBytes, finished, hasReadView); + "PipelinedSubpartition [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]", + numBuffers, numBytes, buffersInBacklog, finished, hasReadView); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java index fda21355e4e0d..2aafd3f2c712d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java @@ -18,8 +18,9 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; +import javax.annotation.Nullable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -44,8 +45,9 @@ class PipelinedSubpartitionView implements ResultSubpartitionView { this.isReleased = new AtomicBoolean(); } + @Nullable @Override - public Buffer getNextBuffer() { + public BufferAndBacklog getNextBuffer() { return parent.pollBuffer(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index e73082a7d7288..e42bd90212794 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -18,10 +18,13 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.io.network.buffer.Buffer; import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A single subpartition of a {@link ResultPartition} instance. */ @@ -94,6 +97,15 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** + * Gets the number of non-event buffers in this subpartition. + * + *

Beware: This method should only be used in tests in non-concurrent access + * scenarios since it does not make any concurrency guarantees. + */ + @VisibleForTesting + abstract public int getBuffersInBacklog(); + /** * Makes a best effort to get the current size of the queue. * This method must not acquire locks or interfere with the task and network threads in @@ -101,4 +113,29 @@ protected Throwable getFailureCause() { */ abstract public int unsynchronizedGetNumberOfQueuedBuffers(); + // ------------------------------------------------------------------------ + + /** + * A combination of a {@link Buffer} and the backlog length indicating + * how many non-event buffers are available in the subpartition. + */ + public static final class BufferAndBacklog { + + private final Buffer buffer; + private final int buffersInBacklog; + + public BufferAndBacklog(Buffer buffer, int buffersInBacklog) { + this.buffer = checkNotNull(buffer); + this.buffersInBacklog = buffersInBacklog; + } + + public Buffer buffer() { + return buffer; + } + + public int buffersInBacklog() { + return buffersInBacklog; + } + } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java index 98be90fa840d0..fb315921caede 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java @@ -19,7 +19,9 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; +import javax.annotation.Nullable; import java.io.IOException; /** @@ -29,16 +31,17 @@ public interface ResultSubpartitionView { /** * Returns the next {@link Buffer} instance of this queue iterator. - *

- * If there is currently no instance available, it will return null. + * + *

If there is currently no instance available, it will return null. * This might happen for example when a pipelined queue producer is slower * than the consumer or a spilled queue needs to read in more data. - *

- * Important: The consumer has to make sure that each + * + *

Important: The consumer has to make sure that each * buffer instance will eventually be recycled with {@link Buffer#recycle()} * after it has been consumed. */ - Buffer getNextBuffer() throws IOException, InterruptedException; + @Nullable + BufferAndBacklog getNextBuffer() throws IOException, InterruptedException; void notifyBuffersAvailable(long buffers) throws IOException; @@ -49,5 +52,4 @@ public interface ResultSubpartitionView { boolean isReleased(); Throwable getFailureCause(); - } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 4a8e165d11230..e977f60bd1203 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -28,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.util.ArrayDeque; @@ -77,6 +79,10 @@ class SpillableSubpartition extends ResultSubpartition { /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private int buffersInBacklog; + /** The read view to consume this subpartition. */ private ResultSubpartitionView readView; @@ -102,6 +108,7 @@ public boolean add(Buffer buffer) throws IOException { // the read views. If you ever remove this line here, // make sure to still count the number of buffers. updateStatistics(buffer); + increaseBuffersInBacklog(buffer); return true; } @@ -114,6 +121,7 @@ public boolean add(Buffer buffer) throws IOException { synchronized (buffers) { // See the note above, but only do this if the buffer was correctly added! updateStatistics(buffer); + increaseBuffersInBacklog(buffer); } } finally { buffer.recycle(); @@ -246,6 +254,39 @@ public boolean isReleased() { return isReleased; } + @Override + @VisibleForTesting + public int getBuffersInBacklog() { + return buffersInBacklog; + } + + /** + * Decreases the number of non-event buffers by one after fetching a non-event + * buffer from this subpartition (for access by the subpartition views). + * + * @return backlog after the operation + */ + public int decreaseBuffersInBacklog(Buffer buffer) { + synchronized (buffers) { + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog--; + } + return buffersInBacklog; + } + } + + /** + * Increases the number of non-event buffers by one after adding a non-event + * buffer into this subpartition. + */ + private void increaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog++; + } + } + @Override public int unsynchronizedGetNumberOfQueuedBuffers() { // since we do not synchronize, the size may actually be lower than 0! @@ -255,9 +296,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() { @Override public String toString() { return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," + - "finished? %s, read view? %s, spilled? %s]", - getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null, - spillWriter != null); + "%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]", + getTotalNumberOfBuffers(), getTotalNumberOfBytes(), + buffersInBacklog, isFinished, readView != null, spillWriter != null); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 6781902ca9066..2527273b1b517 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -18,12 +18,14 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayDeque; import java.util.concurrent.atomic.AtomicBoolean; @@ -128,8 +130,9 @@ int releaseMemory() throws IOException { } } + @Nullable @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException { synchronized (buffers) { if (isReleased.get()) { return null; @@ -141,7 +144,8 @@ public Buffer getNextBuffer() throws IOException, InterruptedException { listener.notifyBuffersAvailable(1); } - return current; + int newBacklog = parent.decreaseBuffersInBacklog(current); + return new BufferAndBacklog(current, newBacklog); } } // else: spilled diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java index fec0f2ac1dce7..20e0406a443ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; @@ -29,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayDeque; import java.util.Queue; @@ -51,7 +53,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis private static final Logger LOG = LoggerFactory.getLogger(SpilledSubpartitionView.class); /** The subpartition this view belongs to. */ - private final ResultSubpartition parent; + private final SpillableSubpartition parent; /** Writer for spills. */ private final BufferFileWriter spillWriter; @@ -75,7 +77,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis private volatile boolean isSpillInProgress = true; SpilledSubpartitionView( - ResultSubpartition parent, + SpillableSubpartition parent, int memorySegmentSize, BufferFileWriter spillWriter, long numberOfSpilledBuffers, @@ -113,8 +115,9 @@ public void onNotification() { LOG.debug("Finished spilling. Notified about {} available buffers.", numberOfSpilledBuffers); } + @Nullable @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException { if (fileReader.hasReachedEndOfFile() || isSpillInProgress) { return null; } @@ -124,7 +127,8 @@ public Buffer getNextBuffer() throws IOException, InterruptedException { Buffer buffer = bufferPool.requestBufferBlocking(); fileReader.readInto(buffer); - return buffer; + int newBacklog = parent.decreaseBuffersInBacklog(buffer); + return new BufferAndBacklog(buffer, newBacklog); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index 68b05d45aa8a7..7b7edf7530ec7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java @@ -242,16 +242,20 @@ else if (currentBackoff < maxBackoff) { // ------------------------------------------------------------------------ /** - * A combination of a {@link Buffer} and a flag indicating availability of further buffers. + * A combination of a {@link Buffer} and a flag indicating availability of further buffers, + * and the backlog length indicating how many non-event buffers are available in the + * subpartition. */ public static final class BufferAndAvailability { private final Buffer buffer; private final boolean moreAvailable; + private final int buffersInBacklog; - public BufferAndAvailability(Buffer buffer, boolean moreAvailable) { + public BufferAndAvailability(Buffer buffer, boolean moreAvailable, int buffersInBacklog) { this.buffer = checkNotNull(buffer); this.moreAvailable = moreAvailable; + this.buffersInBacklog = buffersInBacklog; } public Buffer buffer() { @@ -261,5 +265,9 @@ public Buffer buffer() { public boolean moreAvailable() { return moreAvailable; } + + public int buffersInBacklog() { + return buffersInBacklog; + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 71b36532bc4e0..85056660c232e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -21,12 +21,12 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ProducerFailedException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.slf4j.Logger; @@ -179,7 +179,7 @@ BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { subpartitionView = checkAndWaitForSubpartitionView(); } - Buffer next = subpartitionView.getNextBuffer(); + BufferAndBacklog next = subpartitionView.getNextBuffer(); if (next == null) { if (subpartitionView.isReleased()) { @@ -195,8 +195,8 @@ BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { long remaining = numBuffersAvailable.decrementAndGet(); if (remaining >= 0) { - numBytesIn.inc(next.getSizeUnsafe()); - return new BufferAndAvailability(next, remaining > 0); + numBytesIn.inc(next.buffer().getSizeUnsafe()); + return new BufferAndAvailability(next.buffer(), remaining > 0, next.buffersInBacklog()); } else if (subpartitionView.isReleased()) { throw new ProducerFailedException(subpartitionView.getFailureCause()); } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 7605075c6f150..397f407bb63ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -196,7 +196,7 @@ BufferAndAvailability getNextBuffer() throws IOException { } numBytesIn.inc(next.getSizeUnsafe()); - return new BufferAndAvailability(next, remaining > 0); + return new BufferAndAvailability(next, remaining > 0, getSenderBacklog()); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index c9f063b6dfe3a..4acdb36943471 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; @@ -36,6 +36,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import javax.annotation.Nullable; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -185,9 +186,10 @@ public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sy this.sync = checkNotNull(sync); } + @Nullable @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { - return bufferProvider.requestBufferBlocking(); + public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException { + return new BufferAndBacklog(bufferProvider.requestBufferBlocking(), 0); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 6d36aa6b87599..9edba35acd641 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.util.TestConsumerCallback; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; @@ -106,18 +107,38 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception { assertEquals(1, subpartition.getTotalNumberOfBuffers()); assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(1, subpartition.getBuffersInBacklog()); + assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); + // ...should have resulted in a notification verify(listener, times(1)).notifyBuffersAvailable(eq(1L)); // ...and one available result - assertNotNull(view.getNextBuffer()); + BufferAndBacklog read = view.getNextBuffer(); + assertNotNull(read); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); assertNull(view.getNextBuffer()); + assertEquals(0, subpartition.getBuffersInBacklog()); // Add data to the queue... subpartition.add(createBuffer()); + assertEquals(2, subpartition.getTotalNumberOfBuffers()); + assertEquals(1, subpartition.getBuffersInBacklog()); assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); verify(listener, times(2)).notifyBuffersAvailable(eq(1L)); + + // Add event to the queue... + Buffer event = createBuffer(); + event.tagAsEvent(); + subpartition.add(event); + + assertEquals(3, subpartition.getTotalNumberOfBuffers()); + assertEquals(1, subpartition.getBuffersInBacklog()); + assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); + verify(listener, times(3)).notifyBuffersAvailable(eq(1L)); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index c50b36125cd31..57aa82fe2e27b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; @@ -203,6 +204,10 @@ public void testConsumeSpilledPartition() throws Exception { assertEquals(3, partition.getTotalNumberOfBuffers()); assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + assertFalse(buffer.isRecycled()); assertEquals(3, partition.releaseMemory()); // now the buffer may be freed, depending on the timing of the write operation @@ -211,44 +216,65 @@ public void testConsumeSpilledPartition() throws Exception { assertEquals(3, partition.getTotalNumberOfBuffers()); assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + // now the buffer may be freed, depending on the timing of the write operation + // -> let's do this check at the end of the test (to save some time) + // still same statistics + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + partition.finish(); // + one EndOfPartitionEvent assertEquals(4, partition.getTotalNumberOfBuffers()); assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); + // + one EndOfPartitionEvent + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); + BufferAvailabilityListener listener = spy(new AwaitableBufferAvailablityListener()); SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener); verify(listener, times(1)).notifyBuffersAvailable(eq(4L)); - Buffer read = reader.getNextBuffer(); + BufferAndBacklog read = reader.getNextBuffer(); assertNotNull(read); + assertEquals(2, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); assertNotSame(buffer, read); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); read = reader.getNextBuffer(); assertNotNull(read); + assertEquals(1, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); assertNotSame(buffer, read); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); read = reader.getNextBuffer(); assertNotNull(read); + assertEquals(0, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); assertNotSame(buffer, read); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); // End of partition read = reader.getNextBuffer(); assertNotNull(read); - assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass()); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertEquals(0, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); + assertEquals(EndOfPartitionEvent.class, + EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); // finally check that the buffer has been freed after a successful (or failed) write final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs @@ -277,6 +303,10 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception assertEquals(4, partition.getTotalNumberOfBuffers()); assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); + AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener(); SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener); @@ -284,9 +314,12 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception assertEquals(1, listener.getNumNotifiedBuffers()); assertFalse(buffer.isRecycled()); - Buffer read = reader.getNextBuffer(); - assertSame(buffer, read); - read.recycle(); + BufferAndBacklog read = reader.getNextBuffer(); + assertNotNull(read); + assertSame(buffer, read.buffer()); + assertEquals(2, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); + read.buffer().recycle(); assertEquals(2, listener.getNumNotifiedBuffers()); assertFalse(buffer.isRecycled()); @@ -295,31 +328,40 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception assertFalse(buffer.isRecycled()); // still one in the reader! // still same statistics: assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(2, partition.getBuffersInBacklog()); assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); listener.awaitNotifications(4, 30_000); assertEquals(4, listener.getNumNotifiedBuffers()); read = reader.getNextBuffer(); - assertSame(buffer, read); - read.recycle(); + assertNotNull(read); + assertEquals(1, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); + assertSame(buffer, read.buffer()); + read.buffer().recycle(); // now the buffer may be freed, depending on the timing of the write operation // -> let's do this check at the end of the test (to save some time) read = reader.getNextBuffer(); assertNotNull(read); - assertNotSame(buffer, read); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertEquals(0, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); + assertNotSame(buffer, read.buffer()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); // End of partition read = reader.getNextBuffer(); assertNotNull(read); - assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass()); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertEquals(0, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); + assertEquals(EndOfPartitionEvent.class, + EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); // finally check that the buffer has been freed after a successful (or failed) write final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java index 69d19fccc20f2..89a4e03b26158 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java @@ -73,7 +73,7 @@ public void testWriteConsume() throws Exception { false, new TestConsumerCallback.RecyclingCallback()); SpilledSubpartitionView view = new SpilledSubpartitionView( - mock(ResultSubpartition.class), + mock(SpillableSubpartition.class), viewBufferPool.getMemorySegmentSize(), writer, numberOfBuffersToWrite + 1, // +1 for end-of-partition @@ -99,7 +99,7 @@ public void testConsumeWithFewBuffers() throws Exception { false, new TestConsumerCallback.RecyclingCallback()); SpilledSubpartitionView view = new SpilledSubpartitionView( - mock(ResultSubpartition.class), + mock(SpillableSubpartition.class), 32 * 1024, writer, numberOfBuffersToWrite + 1, @@ -140,7 +140,7 @@ public void testReadMultipleFilesWithSingleBufferPool() throws Exception { BufferProvider inputBuffers = new TestPooledBufferProvider(2); - ResultSubpartition parent = mock(ResultSubpartition.class); + SpillableSubpartition parent = mock(SpillableSubpartition.class); // Wait for writers to finish for (BufferFileWriter writer : writers) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index d084f6212e0bf..2d56258ee6d2d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -18,7 +18,9 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.util.TestLogger; @@ -51,8 +53,15 @@ public void testAddAfterFinish() throws Exception { assertEquals(1, subpartition.getTotalNumberOfBuffers()); assertEquals(4, subpartition.getTotalNumberOfBytes()); - assertFalse(subpartition.add(mock(Buffer.class))); assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(4, subpartition.getTotalNumberOfBytes()); + + Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE); + + assertFalse(subpartition.add(buffer)); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); assertEquals(4, subpartition.getTotalNumberOfBytes()); } finally { if (subpartition != null) { @@ -70,8 +79,15 @@ public void testAddAfterRelease() throws Exception { assertEquals(0, subpartition.getTotalNumberOfBuffers()); assertEquals(0, subpartition.getTotalNumberOfBytes()); - assertFalse(subpartition.add(mock(Buffer.class))); assertEquals(0, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(0, subpartition.getTotalNumberOfBytes()); + + Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE); + + assertFalse(subpartition.add(buffer)); + assertEquals(0, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); assertEquals(0, subpartition.getTotalNumberOfBytes()); } finally { if (subpartition != null) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java index 16285b75d5f43..5fe835af78902 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java @@ -75,11 +75,11 @@ public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMo hasData = inputIterator.next(reuse) != null; // Call getCurrentBuffer to ensure size is set - return new InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true); + return new InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true, 0); } else { when(inputChannel.getInputChannel().isReleased()).thenReturn(true); - return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false); + return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0); } } }; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index da649cd735055..59fa7a3a41442 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -124,7 +125,7 @@ public void testBackwardsEventWithUninitializedChannel() throws Exception { final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class); when(iterator.getNextBuffer()).thenReturn( - new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class))); + new BufferAndBacklog(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)), 0)); final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); when(partitionManager.createSubpartitionView( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java index a6597a20f3f37..43ac7a1b2b000 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java @@ -55,9 +55,9 @@ public TestInputChannel(SingleInputGate inputGate, int channelIndex) { public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException { if (stubbing == null) { - stubbing = when(mock.getNextBuffer()).thenReturn(new InputChannel.BufferAndAvailability(buffer, true)); + stubbing = when(mock.getNextBuffer()).thenReturn(new InputChannel.BufferAndAvailability(buffer, true, 0)); } else { - stubbing = stubbing.thenReturn(new InputChannel.BufferAndAvailability(buffer, true)); + stubbing = stubbing.thenReturn(new InputChannel.BufferAndAvailability(buffer, true, 0)); } return this; @@ -77,7 +77,7 @@ public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMo // Return true after finishing when(mock.isReleased()).thenReturn(true); - return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false); + return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0); } }; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java index 676a304f7a20c..37137ffbed446 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java @@ -21,8 +21,8 @@ import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import java.util.Random; @@ -92,24 +92,24 @@ public Boolean call() throws Exception { } } - final Buffer buffer = subpartitionView.getNextBuffer(); + final BufferAndBacklog bufferAndBacklog = subpartitionView.getNextBuffer(); if (isSlowConsumer) { Thread.sleep(random.nextInt(MAX_SLEEP_TIME_MS + 1)); } - if (buffer != null) { + if (bufferAndBacklog != null) { numBuffersAvailable.decrementAndGet(); - if (buffer.isBuffer()) { - callback.onBuffer(buffer); + if (bufferAndBacklog.buffer().isBuffer()) { + callback.onBuffer(bufferAndBacklog.buffer()); } else { - final AbstractEvent event = EventSerializer.fromBuffer(buffer, + final AbstractEvent event = EventSerializer.fromBuffer(bufferAndBacklog.buffer(), getClass().getClassLoader()); callback.onEvent(event); - buffer.recycle(); + bufferAndBacklog.buffer().recycle(); if (event.getClass() == EndOfPartitionEvent.class) { subpartitionView.notifySubpartitionConsumed(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index f19d59d44a847..11d8f119755f0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -98,7 +98,7 @@ public BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Th if (input != null && input.isStreamEnd()) { when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn( true); - return new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false); + return new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0); } else if (input != null && input.isStreamRecord()) { Object inputElement = input.getStreamRecord(); @@ -107,10 +107,10 @@ public BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Th recordSerializer.addRecord(delegate); // Call getCurrentBuffer to ensure size is set - return new BufferAndAvailability(recordSerializer.getCurrentBuffer(), false); + return new BufferAndAvailability(recordSerializer.getCurrentBuffer(), false, 0); } else if (input != null && input.isEvent()) { AbstractEvent event = input.getEvent(); - return new BufferAndAvailability(EventSerializer.toBuffer(event), false); + return new BufferAndAvailability(EventSerializer.toBuffer(event), false, 0); } else { synchronized (inputQueues[channelIndex]) { inputQueues[channelIndex].wait(); From 31510a7eedf41a1283bd89d736095d60fdaee3c3 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Fri, 5 Jan 2018 15:28:40 +0100 Subject: [PATCH 4/4] [FLINK-8375][network] Remove unnecessary synchronization Synchronized blocks in ResultPartition could affect only: 1. totalNumberOfBuffers and totalNumberOfBytes counters 2. subpartition add(), finish() and release() calls. However: 1. counters were not used anywhere - they are removed by this commit 2a. add(), finish() and release() methods for PipelinedSubpartition were already threads safe 2b. add(), finish() and release() methods for SpillableSubpartition were made thread safe in this commit, by basically pushing synchronized section down one level. --- .../io/network/partition/ResultPartition.java | 42 ++----------------- .../partition/SpillableSubpartition.java | 12 ++++-- .../partition/ResultPartitionTest.java | 2 - 3 files changed, 12 insertions(+), 44 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index ea2cca521fc1a..01c8bfc2416f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -120,14 +120,6 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner { private volatile Throwable cause; - // - Statistics ---------------------------------------------------------- - - /** The total number of buffers (both data and event buffers) */ - private int totalNumberOfBuffers; - - /** The total number of bytes (both data and event buffers) */ - private long totalNumberOfBytes; - public ResultPartition( String owningTaskName, TaskActions taskActions, // actions on the owning task @@ -224,24 +216,6 @@ public BufferPool getBufferPool() { return bufferPool; } - /** - * Returns the total number of processed network buffers since initialization. - * - * @return overall number of processed network buffers - */ - public int getTotalNumberOfBuffers() { - return totalNumberOfBuffers; - } - - /** - * Returns the total size of processed network buffers since initialization. - * - * @return overall size of processed network buffers - */ - public long getTotalNumberOfBytes() { - return totalNumberOfBytes; - } - public int getNumberOfQueuedBuffers() { int totalBuffers = 0; @@ -275,13 +249,7 @@ public void writeBuffer(Buffer buffer, int subpartitionIndex) throws IOException // retain for buffer use after add() but also to have a simple path for recycle() buffer.retain(); - synchronized (subpartition) { - success = subpartition.add(buffer); - - // Update statistics - totalNumberOfBuffers++; - totalNumberOfBytes += buffer.getSize(); - } + success = subpartition.add(buffer); } finally { if (success) { notifyPipelinedConsumers(); @@ -304,9 +272,7 @@ public void finish() throws IOException { checkInProduceState(); for (ResultSubpartition subpartition : subpartitions) { - synchronized (subpartition) { - subpartition.finish(); - } + subpartition.finish(); } success = true; @@ -339,9 +305,7 @@ public void release(Throwable cause) { // Release all subpartitions for (ResultSubpartition subpartition : subpartitions) { try { - synchronized (subpartition) { - subpartition.release(); - } + subpartition.release(); } // Catch this in order to ensure that release is called on all subpartitions catch (Throwable t) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index e977f60bd1203..9047e5150f593 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -59,6 +59,12 @@ * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}, and * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, most spillable partitions * will be spilled for real-world data sets. + * + *

Note on thread safety. Synchronizing on {@code buffers} is used to synchronize + * writes and reads. Synchronizing on {@code this} is used against concurrent + * {@link #add(Buffer)} and clean ups {@link #release()} / {@link #finish()} which + * also are touching {@code spillWriter}. Since we do not want to block reads during + * spilling, we need those two synchronization. Probably this model could be simplified. */ class SpillableSubpartition extends ResultSubpartition { @@ -93,7 +99,7 @@ class SpillableSubpartition extends ResultSubpartition { } @Override - public boolean add(Buffer buffer) throws IOException { + public synchronized boolean add(Buffer buffer) throws IOException { checkNotNull(buffer); synchronized (buffers) { @@ -131,7 +137,7 @@ public boolean add(Buffer buffer) throws IOException { } @Override - public void finish() throws IOException { + public synchronized void finish() throws IOException { synchronized (buffers) { if (add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE))) { isFinished = true; @@ -145,7 +151,7 @@ public void finish() throws IOException { } @Override - public void release() throws IOException { + public synchronized void release() throws IOException { final ResultSubpartitionView view; synchronized (buffers) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 5d24b4aaeff87..4512625954523 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -192,8 +192,6 @@ public void testWriteBufferToAllSubpartitionsReferenceCounting() throws Exceptio partition.writeBufferToAllSubpartitions(buffer); - // Verify added to all queues, i.e. two buffers in total - assertEquals(2, partition.getTotalNumberOfBuffers()); // release the buffers in the partition partition.release();