From 1c0e5aae05f3e4eaec986792b508a08ee242ac73 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 10 Aug 2017 16:58:19 +0200 Subject: [PATCH] [FLINK-7427][network] integrate PartitionRequestProtocol into NettyProtocol - removes one level of (unneeded) abstraction for clarity --- .../network/netty/NettyConnectionManager.java | 4 +- .../io/network/netty/NettyProtocol.java | 120 ++++++++++++++++- .../netty/PartitionRequestProtocol.java | 127 ------------------ .../netty/CancelPartitionRequestTest.java | 4 +- .../ClientTransportErrorHandlingTest.java | 26 ++-- .../netty/NettyClientServerSslTest.java | 27 ++-- .../NettyServerLowAndHighWatermarkTest.java | 2 +- .../PartitionRequestClientFactoryTest.java | 3 +- .../ServerTransportErrorHandlingTest.java | 8 +- 9 files changed, 149 insertions(+), 172 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java index fcf618a1982a1..1d9871587480a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java @@ -46,8 +46,8 @@ public NettyConnectionManager(NettyConfig nettyConfig) { @Override public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException { - PartitionRequestProtocol partitionRequestProtocol = - new PartitionRequestProtocol(partitionProvider, taskEventDispatcher); + NettyProtocol partitionRequestProtocol = + new NettyProtocol(partitionProvider, taskEventDispatcher); client.init(partitionRequestProtocol, bufferPool); server.init(partitionRequestProtocol, bufferPool); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java index bcfe55807d502..7de00e8d21f7a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java @@ -18,12 +18,126 @@ package org.apache.flink.runtime.io.network.netty; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; + import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -public interface NettyProtocol { +import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder; + +/** + * Defines the server and client channel handlers, i.e. the protocol, used by netty. + */ +public class NettyProtocol { + + private final NettyMessage.NettyMessageEncoder + messageEncoder = new NettyMessage.NettyMessageEncoder(); + + private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder(); + + private final ResultPartitionProvider partitionProvider; + private final TaskEventDispatcher taskEventDispatcher; + + NettyProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) { + this.partitionProvider = partitionProvider; + this.taskEventDispatcher = taskEventDispatcher; + } + + /** + * Returns the server channel handlers. + * + *
+	 * +-------------------------------------------------------------------+
+	 * |                        SERVER CHANNEL PIPELINE                    |
+	 * |                                                                   |
+	 * |    +----------+----------+ (3) write  +----------------------+    |
+	 * |    | Queue of queues     +----------->| Message encoder      |    |
+	 * |    +----------+----------+            +-----------+----------+    |
+	 * |              /|\                                 \|/              |
+	 * |               | (2) enqueue                       |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Request handler     |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Message decoder     |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Frame decoder       |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * +---------------+-----------------------------------+---------------+
+	 * |               | (1) client request               \|/
+	 * +---------------+-----------------------------------+---------------+
+	 * |               |                                   |               |
+	 * |       [ Socket.read() ]                    [ Socket.write() ]     |
+	 * |                                                                   |
+	 * |  Netty Internal I/O Threads (Transport Implementation)            |
+	 * +-------------------------------------------------------------------+
+	 * 
+ * + * @return channel handlers + */ + public ChannelHandler[] getServerChannelHandlers() { + PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue(); + PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler( + partitionProvider, taskEventDispatcher, queueOfPartitionQueues); - ChannelHandler[] getServerChannelHandlers(); + return new ChannelHandler[] { + messageEncoder, + createFrameLengthDecoder(), + messageDecoder, + serverHandler, + queueOfPartitionQueues + }; + } - ChannelHandler[] getClientChannelHandlers(); + /** + * Returns the client channel handlers. + * + *
+	 *     +-----------+----------+            +----------------------+
+	 *     | Remote input channel |            | request client       |
+	 *     +-----------+----------+            +-----------+----------+
+	 *                 |                                   | (1) write
+	 * +---------------+-----------------------------------+---------------+
+	 * |               |     CLIENT CHANNEL PIPELINE       |               |
+	 * |               |                                  \|/              |
+	 * |    +----------+----------+            +----------------------+    |
+	 * |    | Request handler     +            | Message encoder      |    |
+	 * |    +----------+----------+            +-----------+----------+    |
+	 * |              /|\                                 \|/              |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Message decoder     |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Frame decoder       |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * +---------------+-----------------------------------+---------------+
+	 * |               | (3) server response              \|/ (2) client request
+	 * +---------------+-----------------------------------+---------------+
+	 * |               |                                   |               |
+	 * |       [ Socket.read() ]                    [ Socket.write() ]     |
+	 * |                                                                   |
+	 * |  Netty Internal I/O Threads (Transport Implementation)            |
+	 * +-------------------------------------------------------------------+
+	 * 
+ * + * @return channel handlers + */ + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[] { + messageEncoder, + createFrameLengthDecoder(), + messageDecoder, + new PartitionRequestClientHandler()}; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java deleted file mode 100644 index b6614b6b789eb..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.netty; - -import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; - -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; - -import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder; -import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder; - -class PartitionRequestProtocol implements NettyProtocol { - - private final NettyMessageEncoder messageEncoder = new NettyMessageEncoder(); - - private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder(); - - private final ResultPartitionProvider partitionProvider; - private final TaskEventDispatcher taskEventDispatcher; - - PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) { - this.partitionProvider = partitionProvider; - this.taskEventDispatcher = taskEventDispatcher; - } - - // +-------------------------------------------------------------------+ - // | SERVER CHANNEL PIPELINE | - // | | - // | +----------+----------+ (3) write +----------------------+ | - // | | Queue of queues +----------->| Message encoder | | - // | +----------+----------+ +-----------+----------+ | - // | /|\ \|/ | - // | | (2) enqueue | | - // | +----------+----------+ | | - // | | Request handler | | | - // | +----------+----------+ | | - // | /|\ | | - // | | | | - // | +----------+----------+ | | - // | | Message decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // | | | | - // | +----------+----------+ | | - // | | Frame decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // +---------------+-----------------------------------+---------------+ - // | | (1) client request \|/ - // +---------------+-----------------------------------+---------------+ - // | | | | - // | [ Socket.read() ] [ Socket.write() ] | - // | | - // | Netty Internal I/O Threads (Transport Implementation) | - // +-------------------------------------------------------------------+ - - @Override - public ChannelHandler[] getServerChannelHandlers() { - PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue(); - PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler( - partitionProvider, taskEventDispatcher, queueOfPartitionQueues); - - return new ChannelHandler[] { - messageEncoder, - createFrameLengthDecoder(), - messageDecoder, - serverHandler, - queueOfPartitionQueues - }; - } - - // +-----------+----------+ +----------------------+ - // | Remote input channel | | request client | - // +-----------+----------+ +-----------+----------+ - // | | (1) write - // +---------------+-----------------------------------+---------------+ - // | | CLIENT CHANNEL PIPELINE | | - // | | \|/ | - // | +----------+----------+ +----------------------+ | - // | | Request handler + | Message encoder | | - // | +----------+----------+ +-----------+----------+ | - // | /|\ \|/ | - // | | | | - // | +----------+----------+ | | - // | | Message decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // | | | | - // | +----------+----------+ | | - // | | Frame decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // +---------------+-----------------------------------+---------------+ - // | | (3) server response \|/ (2) client request - // +---------------+-----------------------------------+---------------+ - // | | | | - // | [ Socket.read() ] [ Socket.write() ] | - // | | - // | Netty Internal I/O Threads (Transport Implementation) | - // +-------------------------------------------------------------------+ - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new ChannelHandler[] { - messageEncoder, - createFrameLengthDecoder(), - messageDecoder, - new PartitionRequestClientHandler()}; - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index 12f5064117c50..25b97d922fb2b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -89,7 +89,7 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T } }); - PartitionRequestProtocol protocol = new PartitionRequestProtocol( + NettyProtocol protocol = new NettyProtocol( partitions, mock(TaskEventDispatcher.class)); serverAndClient = initServerAndClient(protocol); @@ -140,7 +140,7 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T } }); - PartitionRequestProtocol protocol = new PartitionRequestProtocol( + NettyProtocol protocol = new NettyProtocol( partitions, mock(TaskEventDispatcher.class)); serverAndClient = initServerAndClient(protocol); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java index 5754e36fa393d..eebdc293b9455 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java @@ -76,18 +76,14 @@ public class ClientTransportErrorHandlingTest { @Test public void testExceptionOnWrite() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol( + mock(ResultPartitionProvider.class), + mock(TaskEventDispatcher.class)) { + @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new PartitionRequestProtocol( - mock(ResultPartitionProvider.class), - mock(TaskEventDispatcher.class)).getClientChannelHandlers(); - } }; // We need a real server and client in this test, because Netty's EmbeddedChannel is @@ -215,7 +211,10 @@ public void testWrappingOfRemoteErrorMessage() throws Exception { @Test public void testExceptionOnRemoteClose() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol( + mock(ResultPartitionProvider.class), + mock(TaskEventDispatcher.class)) { + @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[] { @@ -230,13 +229,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) } }; } - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new PartitionRequestProtocol( - mock(ResultPartitionProvider.class), - mock(TaskEventDispatcher.class)).getClientChannelHandlers(); - } }; NettyServerAndClient serverAndClient = initServerAndClient(protocol, createConfig()); @@ -380,7 +372,7 @@ public void testChannelClosedOnExceptionDuringErrorNotification() throws Excepti // --------------------------------------------------------------------------------------------- private EmbeddedChannel createEmbeddedChannel() { - PartitionRequestProtocol protocol = new PartitionRequestProtocol( + NettyProtocol protocol = new NettyProtocol( mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index 3f2d363f76f4e..20031b31c1201 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -38,19 +38,20 @@ public class NettyClientServerSslTest { /** - * Verify valid ssl configuration and connection - * + * Verify valid ssl configuration and connection. */ @Test public void testValidSslConnection() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } @Override - public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[0]; + } }; NettyConfig nettyConfig = new NettyConfig( @@ -72,19 +73,20 @@ public ChannelHandler[] getServerChannelHandlers() { } /** - * Verify failure on invalid ssl configuration - * + * Verify failure on invalid ssl configuration. */ @Test public void testInvalidSslConfiguration() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } @Override - public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[0]; + } }; Configuration config = createSslConfig(); @@ -110,19 +112,20 @@ public ChannelHandler[] getServerChannelHandlers() { } /** - * Verify SSL handshake error when untrusted server certificate is used - * + * Verify SSL handshake error when untrusted server certificate is used. */ @Test public void testSslHandshakeError() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } @Override - public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[0]; + } }; Configuration config = createSslConfig(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java index e0128e774ca65..5e427533deee7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java @@ -64,7 +64,7 @@ public void testLowAndHighWatermarks() throws Throwable { final int expectedHighWatermark = 2 * PageSize; final AtomicReference error = new AtomicReference(); - final NettyProtocol protocol = new NettyProtocol() { + final NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { // The channel handler implements the test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java index 91a052f2540c7..d9716343c1063 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java @@ -57,7 +57,8 @@ public void testResourceReleaseAfterInterruptedConnect() throws Exception { final CountDownLatch syncOnConnect = new CountDownLatch(1); final Tuple2 netty = createNettyServerAndClient( - new NettyProtocol() { + new NettyProtocol(null, null) { + @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java index 01a0b5fa31ee5..77824f57349b4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java @@ -73,13 +73,7 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T } }); - NettyProtocol protocol = new NettyProtocol() { - @Override - public ChannelHandler[] getServerChannelHandlers() { - return new PartitionRequestProtocol( - partitionManager, - mock(TaskEventDispatcher.class)).getServerChannelHandlers(); - } + NettyProtocol protocol = new NettyProtocol(partitionManager, mock(TaskEventDispatcher.class)) { @Override public ChannelHandler[] getClientChannelHandlers() {