From 3ffa692f9b7e4fac8be729f7ed605a9568365f8b Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 13 Dec 2017 15:28:08 +0100 Subject: [PATCH 1/2] [hotfix][checkstyle] only ignore checkstyle in existing packages under runtime.io.network - ignore runtime.io.(async|disk) - ignore runtime.io.network.(api|buffer|netty|partition|serialization|util) -> everything else will be checked against the ruleset - fix checkstyle errors in classes directly under runtime.io.network --- .../apache/flink/runtime/io/network/ConnectionID.java | 2 +- .../flink/runtime/io/network/ConnectionManager.java | 2 +- .../runtime/io/network/DataExchangeModeTest.java | 1 + .../io/network/DefaultChannelSelectorTest.java | 8 ++++---- .../runtime/io/network/MockNetworkEnvironment.java | 4 +++- .../runtime/io/network/NetworkEnvironmentTest.java | 5 ++--- tools/maven/suppressions-runtime.xml | 11 +++++++++-- 7 files changed, 21 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java index cc2a19db00a41..b96fb571ef08c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java @@ -32,7 +32,7 @@ * a connection index. This allows multiple connections to the same task manager to be distinguished * by their connection index. * - *

The connection index is assigned by the {@link IntermediateResult} and ensures that it is + *

The connection index is assigned by the {@link IntermediateResult} and ensures that it is * safe to multiplex multiple data transfers over the same physical TCP connection. */ public class ConnectionID implements Serializable { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java index 1225230f25433..8e0f36c34fc10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java @@ -38,7 +38,7 @@ void start(ResultPartitionProvider partitionProvider, PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException; /** - * Closes opened ChannelConnections in case of a resource release + * Closes opened ChannelConnections in case of a resource release. */ void closeOpenChannelConnections(ConnectionID connectionId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java index cae80e864f024..14176c5ab76e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.api.common.ExecutionMode; + import org.junit.Test; import static org.junit.Assert.assertNotNull; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java index a54f5d90b913e..e090c7fd27f20 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java @@ -18,20 +18,20 @@ package org.apache.flink.runtime.io.network; -import static org.junit.Assert.assertEquals; - import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; import org.apache.flink.types.StringValue; + import org.junit.Test; +import static org.junit.Assert.assertEquals; + /** * This class checks the functionality of the {@link RoundRobinChannelSelector} class. - * */ public class DefaultChannelSelectorTest { /** - * This test checks the channel selection + * This test checks the channel selection. */ @Test public void channelSelect() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java index 3bbe6d57ecef0..7551545ad8275 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java @@ -21,11 +21,13 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +/** + * Wrapper class to get a mocked {@link NetworkEnvironment}. + */ public class MockNetworkEnvironment { private static NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index 4964be798a062..2a54769ba079e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartition; @@ -49,9 +48,9 @@ * Various tests for the {@link NetworkEnvironment} class. */ public class NetworkEnvironmentTest { - private final static int numBuffers = 1024; + private static final int numBuffers = 1024; - private final static int memorySegmentSize = 128; + private static final int memorySegmentSize = 128; /** * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml index 8f0162d8c9ee0..16f444cfa204b 100644 --- a/tools/maven/suppressions-runtime.xml +++ b/tools/maven/suppressions-runtime.xml @@ -80,11 +80,18 @@ under the License. files="(.*)test[/\\](.*)runtime[/\\]instance[/\\](.*)" checks="AvoidStarImport"/> + + + Date: Thu, 14 Dec 2017 17:30:19 +0100 Subject: [PATCH 2/2] [FLINK-8252][benchmarks] convert network benchmarks to streaming benchmarks This allows us to use the output flushing interval as a parameter to evaluate, too. --- .../NetworkBenchmarkEnvironment.java | 278 ------------------ .../io}/benchmark/LongRecordWriterThread.java | 2 +- .../runtime/io}/benchmark/ReceiverThread.java | 2 +- .../benchmark/SerializingLongReceiver.java | 2 +- .../StreamNetworkBenchmarkEnvironment.java | 257 +++++++++++++++- .../StreamNetworkPointToPointBenchmark.java | 3 +- .../StreamNetworkThroughputBenchmark.java | 12 +- ...StreamNetworkThroughputBenchmarkTests.java | 22 +- 8 files changed, 269 insertions(+), 309 deletions(-) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java rename {flink-runtime/src/test/java/org/apache/flink/runtime/io/network => flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io}/benchmark/LongRecordWriterThread.java (97%) rename {flink-runtime/src/test/java/org/apache/flink/runtime/io/network => flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io}/benchmark/ReceiverThread.java (98%) rename {flink-runtime/src/test/java/org/apache/flink/runtime/io/network => flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io}/benchmark/SerializingLongReceiver.java (97%) rename flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java => flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java (88%) rename flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java => flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java (68%) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java deleted file mode 100644 index ff06187fc7bf6..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.benchmark; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; -import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; -import org.apache.flink.runtime.deployment.ResultPartitionLocation; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.ConnectionID; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.buffer.BufferPool; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartition; -import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.taskmanager.TaskActions; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Arrays; - -import static org.apache.flink.util.ExceptionUtils.suppressExceptions; - -/** - * Context for network benchmarks executed by the external - * flink-benchmarks project. - */ -public class NetworkBenchmarkEnvironment { - - private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); - - private static final int NUM_SLOTS_AND_THREADS = 1; - - private static final InetAddress LOCAL_ADDRESS; - - static { - try { - LOCAL_ADDRESS = InetAddress.getLocalHost(); - } catch (UnknownHostException e) { - throw new Error(e); - } - } - - protected final JobID jobId = new JobID(); - protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID(); - protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(); - - protected NetworkEnvironment senderEnv; - protected NetworkEnvironment receiverEnv; - protected IOManager ioManager; - - protected int channels; - - protected ResultPartitionID[] partitionIds; - - public void setUp(int writers, int channels) throws Exception { - this.channels = channels; - this.partitionIds = new ResultPartitionID[writers]; - - int bufferPoolSize = Math.max(2048, writers * channels * 4); - senderEnv = createNettyNetworkEnvironment(bufferPoolSize); - receiverEnv = createNettyNetworkEnvironment(bufferPoolSize); - ioManager = new IOManagerAsync(); - - senderEnv.start(); - receiverEnv.start(); - - generatePartitionIds(); - } - - public void tearDown() { - suppressExceptions(senderEnv::shutdown); - suppressExceptions(receiverEnv::shutdown); - suppressExceptions(ioManager::shutdown); - } - - public SerializingLongReceiver createReceiver() throws Exception { - TaskManagerLocation senderLocation = new TaskManagerLocation( - ResourceID.generate(), - LOCAL_ADDRESS, - senderEnv.getConnectionManager().getDataPort()); - - InputGate receiverGate = createInputGate( - jobId, - dataSetID, - executionAttemptID, - senderLocation, - receiverEnv, - channels); - - SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length); - - receiver.start(); - return receiver; - } - - public RecordWriter createRecordWriter(int partitionIndex) throws Exception { - ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels); - return new RecordWriter<>(sender); - } - - private void generatePartitionIds() throws Exception { - for (int writer = 0; writer < partitionIds.length; writer++) { - partitionIds[writer] = new ResultPartitionID(); - } - } - - private NetworkEnvironment createNettyNetworkEnvironment( - @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception { - - final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE); - - final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager( - new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration())); - - return new NetworkEnvironment( - bufferPool, - nettyConnectionManager, - new ResultPartitionManager(), - new TaskEventDispatcher(), - new KvStateRegistry(), - null, - null, - IOMode.SYNC, - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(), - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue()); - } - - protected ResultPartitionWriter createResultPartition( - JobID jobId, - ResultPartitionID partitionId, - NetworkEnvironment environment, - int channels) throws Exception { - - ResultPartition resultPartition = new ResultPartition( - "sender task", - new NoOpTaskActions(), - jobId, - partitionId, - ResultPartitionType.PIPELINED_BOUNDED, - channels, - 1, - environment.getResultPartitionManager(), - new NoOpResultPartitionConsumableNotifier(), - ioManager, - false); - - // similar to NetworkEnvironment#registerTask() - int numBuffers = resultPartition.getNumberOfSubpartitions() * - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); - - BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers); - resultPartition.registerBufferPool(bufferPool); - - environment.getResultPartitionManager().registerResultPartition(resultPartition); - - return resultPartition; - } - - private InputGate createInputGate( - JobID jobId, - IntermediateDataSetID dataSetID, - ExecutionAttemptID executionAttemptID, - final TaskManagerLocation senderLocation, - NetworkEnvironment environment, - final int channels) throws IOException { - - InputGate[] gates = new InputGate[channels]; - for (int channel = 0; channel < channels; ++channel) { - int finalChannel = channel; - InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds) - .map(partitionId -> new InputChannelDeploymentDescriptor( - partitionId, - ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel)))) - .toArray(InputChannelDeploymentDescriptor[]::new); - - final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor( - dataSetID, - ResultPartitionType.PIPELINED_BOUNDED, - channel, - channelDescriptors); - - SingleInputGate gate = SingleInputGate.create( - "receiving task[" + channel + "]", - jobId, - executionAttemptID, - gateDescriptor, - environment, - new NoOpTaskActions(), - UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); - - // similar to NetworkEnvironment#registerTask() - int numBuffers = gate.getNumberOfInputChannels() * - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); - - BufferPool bufferPool = - environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers); - - gate.setBufferPool(bufferPool); - gates[channel] = gate; - } - - if (channels > 1) { - return new UnionInputGate(gates); - } else { - return gates[0]; - } - } - - // ------------------------------------------------------------------------ - // Mocks - // ------------------------------------------------------------------------ - - /** - * A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito - * to avoid using mockito in this benchmark class. - */ - private static final class NoOpTaskActions implements TaskActions { - - @Override - public void triggerPartitionProducerStateCheck( - JobID jobId, - IntermediateDataSetID intermediateDataSetId, - ResultPartitionID resultPartitionId) {} - - @Override - public void failExternally(Throwable cause) {} - } - - private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier { - - @Override - public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {} - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java similarity index 97% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java index 6018e5528a17b..e6cc2d5e97741 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.benchmark; +package org.apache.flink.streaming.runtime.io.benchmark; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java similarity index 98% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java index be1c80f36ceec..126efefcba6b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.benchmark; +package org.apache.flink.streaming.runtime.io.benchmark; import org.apache.flink.core.testutils.CheckedThread; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java similarity index 97% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java index 848c018a5b7f9..580612ca03bc7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.benchmark; +package org.apache.flink.streaming.runtime.io.benchmark; import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index acbbdf82ec70a..83508ea245f2d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -18,23 +18,262 @@ package org.apache.flink.streaming.runtime.io.benchmark; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionLocation; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; -import org.apache.flink.runtime.io.network.benchmark.NetworkBenchmarkEnvironment; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.taskmanager.TaskActions; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.streaming.runtime.io.StreamRecordWriter; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; + +import static org.apache.flink.util.ExceptionUtils.suppressExceptions; + /** - * Context for stream network benchmarks executed by the external + * Context for network benchmarks executed by the external * flink-benchmarks project. */ -public class StreamNetworkBenchmarkEnvironment extends NetworkBenchmarkEnvironment { +public class StreamNetworkBenchmarkEnvironment { + + private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); + + private static final int NUM_SLOTS_AND_THREADS = 1; + + private static final InetAddress LOCAL_ADDRESS; + + static { + try { + LOCAL_ADDRESS = InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new Error(e); + } + } + + protected final JobID jobId = new JobID(); + protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID(); + protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(); + + protected NetworkEnvironment senderEnv; + protected NetworkEnvironment receiverEnv; + protected IOManager ioManager; + + protected int channels; + + protected ResultPartitionID[] partitionIds; + + public void setUp(int writers, int channels) throws Exception { + this.channels = channels; + this.partitionIds = new ResultPartitionID[writers]; + + int bufferPoolSize = Math.max(2048, writers * channels * 4); + senderEnv = createNettyNetworkEnvironment(bufferPoolSize); + receiverEnv = createNettyNetworkEnvironment(bufferPoolSize); + ioManager = new IOManagerAsync(); + + senderEnv.start(); + receiverEnv.start(); + + generatePartitionIds(); + } + + public void tearDown() { + suppressExceptions(senderEnv::shutdown); + suppressExceptions(receiverEnv::shutdown); + suppressExceptions(ioManager::shutdown); + } + + public SerializingLongReceiver createReceiver() throws Exception { + TaskManagerLocation senderLocation = new TaskManagerLocation( + ResourceID.generate(), + LOCAL_ADDRESS, + senderEnv.getConnectionManager().getDataPort()); + + InputGate receiverGate = createInputGate( + jobId, + dataSetID, + executionAttemptID, + senderLocation, + receiverEnv, + channels); + + SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length); + + receiver.start(); + return receiver; + } + + public StreamRecordWriter createRecordWriter(int partitionIndex, long flushTimeout) throws Exception { + ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels); + return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector(), flushTimeout); + } + + private void generatePartitionIds() throws Exception { + for (int writer = 0; writer < partitionIds.length; writer++) { + partitionIds[writer] = new ResultPartitionID(); + } + } + + private NetworkEnvironment createNettyNetworkEnvironment( + @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception { + + final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE); + + final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager( + new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration())); + + return new NetworkEnvironment( + bufferPool, + nettyConnectionManager, + new ResultPartitionManager(), + new TaskEventDispatcher(), + new KvStateRegistry(), + null, + null, + IOMode.SYNC, + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(), + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue()); + } + + protected ResultPartitionWriter createResultPartition( + JobID jobId, + ResultPartitionID partitionId, + NetworkEnvironment environment, + int channels) throws Exception { + + ResultPartition resultPartition = new ResultPartition( + "sender task", + new NoOpTaskActions(), + jobId, + partitionId, + ResultPartitionType.PIPELINED_BOUNDED, + channels, + 1, + environment.getResultPartitionManager(), + new NoOpResultPartitionConsumableNotifier(), + ioManager, + false); + + // similar to NetworkEnvironment#registerTask() + int numBuffers = resultPartition.getNumberOfSubpartitions() * + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); + + BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers); + resultPartition.registerBufferPool(bufferPool); + + environment.getResultPartitionManager().registerResultPartition(resultPartition); + + return resultPartition; + } + + private InputGate createInputGate( + JobID jobId, + IntermediateDataSetID dataSetID, + ExecutionAttemptID executionAttemptID, + final TaskManagerLocation senderLocation, + NetworkEnvironment environment, + final int channels) throws IOException { + + InputGate[] gates = new InputGate[channels]; + for (int channel = 0; channel < channels; ++channel) { + int finalChannel = channel; + InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds) + .map(partitionId -> new InputChannelDeploymentDescriptor( + partitionId, + ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel)))) + .toArray(InputChannelDeploymentDescriptor[]::new); + + final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor( + dataSetID, + ResultPartitionType.PIPELINED_BOUNDED, + channel, + channelDescriptors); + + SingleInputGate gate = SingleInputGate.create( + "receiving task[" + channel + "]", + jobId, + executionAttemptID, + gateDescriptor, + environment, + new NoOpTaskActions(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); + + // similar to NetworkEnvironment#registerTask() + int numBuffers = gate.getNumberOfInputChannels() * + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); + + BufferPool bufferPool = + environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers); + + gate.setBufferPool(bufferPool); + gates[channel] = gate; + } + + if (channels > 1) { + return new UnionInputGate(gates); + } else { + return gates[0]; + } + } + + // ------------------------------------------------------------------------ + // Mocks + // ------------------------------------------------------------------------ + + /** + * A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito + * to avoid using mockito in this benchmark class. + */ + private static final class NoOpTaskActions implements TaskActions { + + @Override + public void triggerPartitionProducerStateCheck( + JobID jobId, + IntermediateDataSetID intermediateDataSetId, + ResultPartitionID resultPartitionId) {} + + @Override + public void failExternally(Throwable cause) {} + } + + private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier { - public RecordWriter createStreamRecordWriter(int partitionIndex, long flushTimeout) - throws Exception { - ResultPartitionWriter sender = - createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels); - return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector<>(), flushTimeout); + @Override + public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {} } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java index 92864854883f1..843d3e2e1f312 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.io.benchmark; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.io.network.benchmark.ReceiverThread; import org.apache.flink.types.LongValue; import java.util.concurrent.CompletableFuture; @@ -74,7 +73,7 @@ public void setUp(long flushTimeout) throws Exception { environment.setUp(1, 1); receiver = environment.createReceiver(); - recordWriter = environment.createStreamRecordWriter(0, flushTimeout); + recordWriter = environment.createRecordWriter(0, flushTimeout); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java similarity index 88% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java index 799b7c3093c56..3f41b0087f8d3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.benchmark; +package org.apache.flink.streaming.runtime.io.benchmark; import org.apache.flink.types.LongValue; @@ -27,10 +27,10 @@ * Network throughput benchmarks executed by the external * flink-benchmarks project. */ -public class NetworkThroughputBenchmark { +public class StreamNetworkThroughputBenchmark { private static final long RECEIVER_TIMEOUT = 30_000; - private NetworkBenchmarkEnvironment environment; + private StreamNetworkBenchmarkEnvironment environment; private ReceiverThread receiver; private LongRecordWriterThread[] writerThreads; @@ -63,13 +63,13 @@ public void executeBenchmark(long records) throws Exception { * @param channels * number of outgoing channels / receivers */ - public void setUp(int recordWriters, int channels) throws Exception { - environment = new NetworkBenchmarkEnvironment<>(); + public void setUp(int recordWriters, int channels, int flushTimeout) throws Exception { + environment = new StreamNetworkBenchmarkEnvironment<>(); environment.setUp(recordWriters, channels); receiver = environment.createReceiver(); writerThreads = new LongRecordWriterThread[recordWriters]; for (int writer = 0; writer < recordWriters; writer++) { - writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer)); + writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer, flushTimeout)); writerThreads[writer].start(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java similarity index 68% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java index c84743be843c9..8af8148bc65f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java @@ -16,18 +16,18 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.benchmark; +package org.apache.flink.streaming.runtime.io.benchmark; import org.junit.Test; /** - * Tests for various network benchmarks based on {@link NetworkThroughputBenchmark}. + * Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}. */ -public class NetworkThroughputBenchmarkTests { +public class StreamNetworkThroughputBenchmarkTests { @Test public void pointToPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(1, 1); + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(1, 1, 100); try { benchmark.executeBenchmark(1_000); } @@ -38,8 +38,8 @@ public void pointToPointBenchmark() throws Exception { @Test public void pointToMultiPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(1, 100); + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(1, 100, 100); try { benchmark.executeBenchmark(1_000); } @@ -50,8 +50,8 @@ public void pointToMultiPointBenchmark() throws Exception { @Test public void multiPointToPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(4, 1); + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(4, 1, 100); try { benchmark.executeBenchmark(1_000); } @@ -62,8 +62,8 @@ public void multiPointToPointBenchmark() throws Exception { @Test public void multiPointToMultiPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(4, 100); + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(4, 100, 100); try { benchmark.executeBenchmark(1_000); }