From 1c0e5aae05f3e4eaec986792b508a08ee242ac73 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 10 Aug 2017 16:58:19 +0200 Subject: [PATCH 1/2] [FLINK-7427][network] integrate PartitionRequestProtocol into NettyProtocol - removes one level of (unneeded) abstraction for clarity --- .../network/netty/NettyConnectionManager.java | 4 +- .../io/network/netty/NettyProtocol.java | 120 ++++++++++++++++- .../netty/PartitionRequestProtocol.java | 127 ------------------ .../netty/CancelPartitionRequestTest.java | 4 +- .../ClientTransportErrorHandlingTest.java | 26 ++-- .../netty/NettyClientServerSslTest.java | 27 ++-- .../NettyServerLowAndHighWatermarkTest.java | 2 +- .../PartitionRequestClientFactoryTest.java | 3 +- .../ServerTransportErrorHandlingTest.java | 8 +- 9 files changed, 149 insertions(+), 172 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java index fcf618a1982a1..1d9871587480a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java @@ -46,8 +46,8 @@ public NettyConnectionManager(NettyConfig nettyConfig) { @Override public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException { - PartitionRequestProtocol partitionRequestProtocol = - new PartitionRequestProtocol(partitionProvider, taskEventDispatcher); + NettyProtocol partitionRequestProtocol = + new NettyProtocol(partitionProvider, taskEventDispatcher); client.init(partitionRequestProtocol, bufferPool); server.init(partitionRequestProtocol, bufferPool); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java index bcfe55807d502..7de00e8d21f7a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java @@ -18,12 +18,126 @@ package org.apache.flink.runtime.io.network.netty; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; + import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -public interface NettyProtocol { +import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder; + +/** + * Defines the server and client channel handlers, i.e. the protocol, used by netty. + */ +public class NettyProtocol { + + private final NettyMessage.NettyMessageEncoder + messageEncoder = new NettyMessage.NettyMessageEncoder(); + + private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder(); + + private final ResultPartitionProvider partitionProvider; + private final TaskEventDispatcher taskEventDispatcher; + + NettyProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) { + this.partitionProvider = partitionProvider; + this.taskEventDispatcher = taskEventDispatcher; + } + + /** + * Returns the server channel handlers. + * + *
+	 * +-------------------------------------------------------------------+
+	 * |                        SERVER CHANNEL PIPELINE                    |
+	 * |                                                                   |
+	 * |    +----------+----------+ (3) write  +----------------------+    |
+	 * |    | Queue of queues     +----------->| Message encoder      |    |
+	 * |    +----------+----------+            +-----------+----------+    |
+	 * |              /|\                                 \|/              |
+	 * |               | (2) enqueue                       |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Request handler     |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Message decoder     |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Frame decoder       |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * +---------------+-----------------------------------+---------------+
+	 * |               | (1) client request               \|/
+	 * +---------------+-----------------------------------+---------------+
+	 * |               |                                   |               |
+	 * |       [ Socket.read() ]                    [ Socket.write() ]     |
+	 * |                                                                   |
+	 * |  Netty Internal I/O Threads (Transport Implementation)            |
+	 * +-------------------------------------------------------------------+
+	 * 
+ * + * @return channel handlers + */ + public ChannelHandler[] getServerChannelHandlers() { + PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue(); + PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler( + partitionProvider, taskEventDispatcher, queueOfPartitionQueues); - ChannelHandler[] getServerChannelHandlers(); + return new ChannelHandler[] { + messageEncoder, + createFrameLengthDecoder(), + messageDecoder, + serverHandler, + queueOfPartitionQueues + }; + } - ChannelHandler[] getClientChannelHandlers(); + /** + * Returns the client channel handlers. + * + *
+	 *     +-----------+----------+            +----------------------+
+	 *     | Remote input channel |            | request client       |
+	 *     +-----------+----------+            +-----------+----------+
+	 *                 |                                   | (1) write
+	 * +---------------+-----------------------------------+---------------+
+	 * |               |     CLIENT CHANNEL PIPELINE       |               |
+	 * |               |                                  \|/              |
+	 * |    +----------+----------+            +----------------------+    |
+	 * |    | Request handler     +            | Message encoder      |    |
+	 * |    +----------+----------+            +-----------+----------+    |
+	 * |              /|\                                 \|/              |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Message decoder     |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Frame decoder       |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * +---------------+-----------------------------------+---------------+
+	 * |               | (3) server response              \|/ (2) client request
+	 * +---------------+-----------------------------------+---------------+
+	 * |               |                                   |               |
+	 * |       [ Socket.read() ]                    [ Socket.write() ]     |
+	 * |                                                                   |
+	 * |  Netty Internal I/O Threads (Transport Implementation)            |
+	 * +-------------------------------------------------------------------+
+	 * 
+ * + * @return channel handlers + */ + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[] { + messageEncoder, + createFrameLengthDecoder(), + messageDecoder, + new PartitionRequestClientHandler()}; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java deleted file mode 100644 index b6614b6b789eb..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.netty; - -import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; - -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; - -import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder; -import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder; - -class PartitionRequestProtocol implements NettyProtocol { - - private final NettyMessageEncoder messageEncoder = new NettyMessageEncoder(); - - private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder(); - - private final ResultPartitionProvider partitionProvider; - private final TaskEventDispatcher taskEventDispatcher; - - PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) { - this.partitionProvider = partitionProvider; - this.taskEventDispatcher = taskEventDispatcher; - } - - // +-------------------------------------------------------------------+ - // | SERVER CHANNEL PIPELINE | - // | | - // | +----------+----------+ (3) write +----------------------+ | - // | | Queue of queues +----------->| Message encoder | | - // | +----------+----------+ +-----------+----------+ | - // | /|\ \|/ | - // | | (2) enqueue | | - // | +----------+----------+ | | - // | | Request handler | | | - // | +----------+----------+ | | - // | /|\ | | - // | | | | - // | +----------+----------+ | | - // | | Message decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // | | | | - // | +----------+----------+ | | - // | | Frame decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // +---------------+-----------------------------------+---------------+ - // | | (1) client request \|/ - // +---------------+-----------------------------------+---------------+ - // | | | | - // | [ Socket.read() ] [ Socket.write() ] | - // | | - // | Netty Internal I/O Threads (Transport Implementation) | - // +-------------------------------------------------------------------+ - - @Override - public ChannelHandler[] getServerChannelHandlers() { - PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue(); - PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler( - partitionProvider, taskEventDispatcher, queueOfPartitionQueues); - - return new ChannelHandler[] { - messageEncoder, - createFrameLengthDecoder(), - messageDecoder, - serverHandler, - queueOfPartitionQueues - }; - } - - // +-----------+----------+ +----------------------+ - // | Remote input channel | | request client | - // +-----------+----------+ +-----------+----------+ - // | | (1) write - // +---------------+-----------------------------------+---------------+ - // | | CLIENT CHANNEL PIPELINE | | - // | | \|/ | - // | +----------+----------+ +----------------------+ | - // | | Request handler + | Message encoder | | - // | +----------+----------+ +-----------+----------+ | - // | /|\ \|/ | - // | | | | - // | +----------+----------+ | | - // | | Message decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // | | | | - // | +----------+----------+ | | - // | | Frame decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // +---------------+-----------------------------------+---------------+ - // | | (3) server response \|/ (2) client request - // +---------------+-----------------------------------+---------------+ - // | | | | - // | [ Socket.read() ] [ Socket.write() ] | - // | | - // | Netty Internal I/O Threads (Transport Implementation) | - // +-------------------------------------------------------------------+ - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new ChannelHandler[] { - messageEncoder, - createFrameLengthDecoder(), - messageDecoder, - new PartitionRequestClientHandler()}; - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index 12f5064117c50..25b97d922fb2b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -89,7 +89,7 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T } }); - PartitionRequestProtocol protocol = new PartitionRequestProtocol( + NettyProtocol protocol = new NettyProtocol( partitions, mock(TaskEventDispatcher.class)); serverAndClient = initServerAndClient(protocol); @@ -140,7 +140,7 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T } }); - PartitionRequestProtocol protocol = new PartitionRequestProtocol( + NettyProtocol protocol = new NettyProtocol( partitions, mock(TaskEventDispatcher.class)); serverAndClient = initServerAndClient(protocol); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java index 5754e36fa393d..eebdc293b9455 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java @@ -76,18 +76,14 @@ public class ClientTransportErrorHandlingTest { @Test public void testExceptionOnWrite() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol( + mock(ResultPartitionProvider.class), + mock(TaskEventDispatcher.class)) { + @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new PartitionRequestProtocol( - mock(ResultPartitionProvider.class), - mock(TaskEventDispatcher.class)).getClientChannelHandlers(); - } }; // We need a real server and client in this test, because Netty's EmbeddedChannel is @@ -215,7 +211,10 @@ public void testWrappingOfRemoteErrorMessage() throws Exception { @Test public void testExceptionOnRemoteClose() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol( + mock(ResultPartitionProvider.class), + mock(TaskEventDispatcher.class)) { + @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[] { @@ -230,13 +229,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) } }; } - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new PartitionRequestProtocol( - mock(ResultPartitionProvider.class), - mock(TaskEventDispatcher.class)).getClientChannelHandlers(); - } }; NettyServerAndClient serverAndClient = initServerAndClient(protocol, createConfig()); @@ -380,7 +372,7 @@ public void testChannelClosedOnExceptionDuringErrorNotification() throws Excepti // --------------------------------------------------------------------------------------------- private EmbeddedChannel createEmbeddedChannel() { - PartitionRequestProtocol protocol = new PartitionRequestProtocol( + NettyProtocol protocol = new NettyProtocol( mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index 3f2d363f76f4e..20031b31c1201 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -38,19 +38,20 @@ public class NettyClientServerSslTest { /** - * Verify valid ssl configuration and connection - * + * Verify valid ssl configuration and connection. */ @Test public void testValidSslConnection() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } @Override - public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[0]; + } }; NettyConfig nettyConfig = new NettyConfig( @@ -72,19 +73,20 @@ public ChannelHandler[] getServerChannelHandlers() { } /** - * Verify failure on invalid ssl configuration - * + * Verify failure on invalid ssl configuration. */ @Test public void testInvalidSslConfiguration() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } @Override - public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[0]; + } }; Configuration config = createSslConfig(); @@ -110,19 +112,20 @@ public ChannelHandler[] getServerChannelHandlers() { } /** - * Verify SSL handshake error when untrusted server certificate is used - * + * Verify SSL handshake error when untrusted server certificate is used. */ @Test public void testSslHandshakeError() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } @Override - public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[0]; + } }; Configuration config = createSslConfig(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java index e0128e774ca65..5e427533deee7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java @@ -64,7 +64,7 @@ public void testLowAndHighWatermarks() throws Throwable { final int expectedHighWatermark = 2 * PageSize; final AtomicReference error = new AtomicReference(); - final NettyProtocol protocol = new NettyProtocol() { + final NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { // The channel handler implements the test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java index 91a052f2540c7..d9716343c1063 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java @@ -57,7 +57,8 @@ public void testResourceReleaseAfterInterruptedConnect() throws Exception { final CountDownLatch syncOnConnect = new CountDownLatch(1); final Tuple2 netty = createNettyServerAndClient( - new NettyProtocol() { + new NettyProtocol(null, null) { + @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java index 01a0b5fa31ee5..77824f57349b4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java @@ -73,13 +73,7 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T } }); - NettyProtocol protocol = new NettyProtocol() { - @Override - public ChannelHandler[] getServerChannelHandlers() { - return new PartitionRequestProtocol( - partitionManager, - mock(TaskEventDispatcher.class)).getServerChannelHandlers(); - } + NettyProtocol protocol = new NettyProtocol(partitionManager, mock(TaskEventDispatcher.class)) { @Override public ChannelHandler[] getClientChannelHandlers() { From 6c584d5e175300e777b3a610ee196693ff32fc05 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 10 Aug 2017 18:23:18 +0200 Subject: [PATCH 2/2] [FLINK-7428][network] avoid buffer copies when receiving messages The LengthFieldBasedFrameDecoder used so far creates one additional copy in its `#extractFrame()` method which is avoidable in our case. We do so by letting `NettyMessageDecoder` inherit from it (instead of being an additional step in the pipeline) and overriding this similarly to the `ObjectDecoder` class provided by netty itself. --- .../io/network/netty/NettyMessage.java | 52 +++++++++++------ .../io/network/netty/NettyProtocol.java | 56 +++++++------------ .../netty/NettyMessageSerializationTest.java | 1 - 3 files changed, 54 insertions(+), 55 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java index de50f82a2fbb9..61f1a103f6372 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java @@ -36,13 +36,11 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise; import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; -import java.util.List; /** * A simple and generic interface to serialize messages to Netty's buffer space. @@ -106,22 +104,37 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ctx.write(msg, promise); } } - - // Create the frame length decoder here as it depends on the encoder - // - // +------------------+------------------+--------++----------------+ - // | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) || CUSTOM MESSAGE | - // +------------------+------------------+--------++----------------+ - static LengthFieldBasedFrameDecoder createFrameLengthDecoder() { - return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, -4, 4); - } } - @ChannelHandler.Sharable - static class NettyMessageDecoder extends MessageToMessageDecoder { + /** + * Message decoder based on netty's {@link LengthFieldBasedFrameDecoder} but avoiding the + * additional memory copy inside {@link #extractFrame(ChannelHandlerContext, ByteBuf, int, int)} + * since we completely decode the {@link ByteBuf} inside {@link #decode(ChannelHandlerContext, + * ByteBuf)} and will not re-use it afterwards. + * + *

The frame-length encoder will be based on this transmission scheme created by {@link NettyMessage#allocateBuffer(ByteBufAllocator, byte, int)}: + *

+	 * +------------------+------------------+--------++----------------+
+	 * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) || CUSTOM MESSAGE |
+	 * +------------------+------------------+--------++----------------+
+	 * 
+ */ + static class NettyMessageDecoder extends LengthFieldBasedFrameDecoder { + + /** + * Creates a new message decoded with the required frame properties. + */ + NettyMessageDecoder() { + super(Integer.MAX_VALUE, 0, 4, -4, 4); + } @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + ByteBuf msg = (ByteBuf) super.decode(ctx, in); + if (msg == null) { + return null; + } + int magicNumber = msg.readInt(); if (magicNumber != MAGIC_NUMBER) { @@ -154,10 +167,13 @@ else if (msgId == CloseRequest.ID) { throw new IllegalStateException("Received unknown message from producer: " + msg); } - if (decodedMsg != null) { - decodedMsg.readFrom(msg); - out.add(decodedMsg); - } + decodedMsg.readFrom(msg); + return decodedMsg; + } + + @Override + protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) { + return buffer.slice(index, length); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java index 7de00e8d21f7a..6671fc8d80502 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java @@ -23,8 +23,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder; - /** * Defines the server and client channel handlers, i.e. the protocol, used by netty. */ @@ -33,8 +31,6 @@ public class NettyProtocol { private final NettyMessage.NettyMessageEncoder messageEncoder = new NettyMessage.NettyMessageEncoder(); - private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder(); - private final ResultPartitionProvider partitionProvider; private final TaskEventDispatcher taskEventDispatcher; @@ -50,24 +46,19 @@ public class NettyProtocol { * +-------------------------------------------------------------------+ * | SERVER CHANNEL PIPELINE | * | | - * | +----------+----------+ (3) write +----------------------+ | - * | | Queue of queues +----------->| Message encoder | | - * | +----------+----------+ +-----------+----------+ | + * | +-----------+-----------+ (3) write +---------------------+ | + * | | Queue of queues +----------->| Message encoder | | + * | +-----------+-----------+ +----------+----------+ | * | /|\ \|/ | * | | (2) enqueue | | - * | +----------+----------+ | | - * | | Request handler | | | - * | +----------+----------+ | | - * | /|\ | | - * | | | | - * | +----------+----------+ | | - * | | Message decoder | | | - * | +----------+----------+ | | + * | +-----------+-----------+ | | + * | | Request handler | | | + * | +-----------+-----------+ | | * | /|\ | | * | | | | - * | +----------+----------+ | | - * | | Frame decoder | | | - * | +----------+----------+ | | + * | +-----------+-----------+ | | + * | | Message+Frame decoder | | | + * | +-----------+-----------+ | | * | /|\ | | * +---------------+-----------------------------------+---------------+ * | | (1) client request \|/ @@ -88,8 +79,7 @@ public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[] { messageEncoder, - createFrameLengthDecoder(), - messageDecoder, + new NettyMessage.NettyMessageDecoder(), serverHandler, queueOfPartitionQueues }; @@ -99,26 +89,21 @@ public ChannelHandler[] getServerChannelHandlers() { * Returns the client channel handlers. * *
-	 *     +-----------+----------+            +----------------------+
-	 *     | Remote input channel |            | request client       |
-	 *     +-----------+----------+            +-----------+----------+
+	 *     +-----------+------------+          +----------------------+
+	 *     | Remote input channel   |          | request client       |
+	 *     +-----------+------------+          +-----------+----------+
 	 *                 |                                   | (1) write
 	 * +---------------+-----------------------------------+---------------+
 	 * |               |     CLIENT CHANNEL PIPELINE       |               |
 	 * |               |                                  \|/              |
-	 * |    +----------+----------+            +----------------------+    |
-	 * |    | Request handler     +            | Message encoder      |    |
-	 * |    +----------+----------+            +-----------+----------+    |
+	 * |    +----------+------------+          +----------------------+    |
+	 * |    | Request handler       +          | Message encoder      |    |
+	 * |    +----------+------------+          +-----------+----------+    |
 	 * |              /|\                                 \|/              |
 	 * |               |                                   |               |
-	 * |    +----------+----------+                        |               |
-	 * |    | Message decoder     |                        |               |
-	 * |    +----------+----------+                        |               |
-	 * |              /|\                                  |               |
-	 * |               |                                   |               |
-	 * |    +----------+----------+                        |               |
-	 * |    | Frame decoder       |                        |               |
-	 * |    +----------+----------+                        |               |
+	 * |    +----------+------------+                      |               |
+	 * |    | Message+Frame decoder |                      |               |
+	 * |    +----------+------------+                      |               |
 	 * |              /|\                                  |               |
 	 * +---------------+-----------------------------------+---------------+
 	 * |               | (3) server response              \|/ (2) client request
@@ -135,8 +120,7 @@ public ChannelHandler[] getServerChannelHandlers() {
 	public ChannelHandler[] getClientChannelHandlers() {
 		return new ChannelHandler[] {
 			messageEncoder,
-			createFrameLengthDecoder(),
-			messageDecoder,
+			new NettyMessage.NettyMessageDecoder(),
 			new PartitionRequestClientHandler()};
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index 8200caa40e38d..2d8e0db9fd683 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -47,7 +47,6 @@ public class NettyMessageSerializationTest {
 
 	private final EmbeddedChannel channel = new EmbeddedChannel(
 			new NettyMessage.NettyMessageEncoder(), // outbound messages
-			NettyMessage.NettyMessageEncoder.createFrameLengthDecoder(), // inbound messages
 			new NettyMessage.NettyMessageDecoder()); // inbound messages
 
 	private final Random random = new Random();