From 73998ba1328d4bf61ee979ed327b0a684ed03aa7 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Mon, 24 Jul 2017 18:47:23 +0200 Subject: [PATCH] [FLINK-7258] [network] Fix watermark configuration order When configuring larger memory segment sizes, configuring the low watermark before the high watermark may lead to an IllegalArgumentException, because the low watermark will temporarily be higher than the high watermark. It's necessary to configure the high watermark before the low watermark. For the queryable state server in KvStateServer I didn't add an extra test as the watermarks cannot be configured there. --- .../runtime/io/network/netty/NettyServer.java | 2 +- .../runtime/query/netty/KvStateServer.java | 2 +- .../NettyServerLowAndHighWatermarkTest.java | 24 ++++++++++++++++--- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java index 3cf14b8b15569..ee3e9234d9f8e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java @@ -121,8 +121,8 @@ void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws } // Low and high water marks for flow control - bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1); bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize()); + bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1); // SSL related configuration try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java index 925a7757f6003..c6f46d1552075 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java @@ -141,8 +141,8 @@ public KvStateServer( .option(ChannelOption.ALLOCATOR, bufferPool) // Child channel options .childOption(ChannelOption.ALLOCATOR, bufferPool) - .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK) .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK) + .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK) // See initializer for pipeline details .childHandler(new KvStateServerChannelInitializer(serverHandler)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java index 0038640b3e93c..e8b655062272f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java @@ -34,12 +34,17 @@ import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class NettyServerLowAndHighWatermarkTest { - private final static int PageSize = 1024; + /** + * Pick a larger memory segment size here in order to trigger + * FLINK-7258. + */ + private final static int PageSize = 65536; /** * Verifies that the high and low watermark are set in relation to the page size. @@ -54,12 +59,16 @@ public class NettyServerLowAndHighWatermarkTest { */ @Test public void testLowAndHighWatermarks() throws Throwable { + final int expectedLowWatermark = PageSize + 1; + final int expectedHighWatermark = 2 * PageSize; + final AtomicReference error = new AtomicReference(); final NettyProtocol protocol = new NettyProtocol() { @Override public ChannelHandler[] getServerChannelHandlers() { // The channel handler implements the test - return new ChannelHandler[] {new TestLowAndHighWatermarkHandler(error)}; + return new ChannelHandler[] {new TestLowAndHighWatermarkHandler( + expectedLowWatermark, expectedHighWatermark, error)}; } @Override @@ -97,11 +106,17 @@ public ChannelHandler[] getClientChannelHandlers() { */ private static class TestLowAndHighWatermarkHandler extends ChannelInboundHandlerAdapter { + private final int expectedLowWatermark; + + private final int expectedHighWatermark; + private final AtomicReference error; private boolean hasFlushed; - public TestLowAndHighWatermarkHandler(AtomicReference error) { + public TestLowAndHighWatermarkHandler(int expectedLowWatermark, int expectedHighWatermark, AtomicReference error) { + this.expectedLowWatermark = expectedLowWatermark; + this.expectedHighWatermark = expectedHighWatermark; this.error = error; } @@ -109,6 +124,9 @@ public TestLowAndHighWatermarkHandler(AtomicReference error) { public void channelActive(ChannelHandlerContext ctx) throws Exception { final Channel ch = ctx.channel(); + assertEquals("Low watermark", expectedLowWatermark, ch.config().getWriteBufferLowWaterMark()); + assertEquals("High watermark", expectedHighWatermark, ch.config().getWriteBufferHighWaterMark()); + // Start with a writable channel assertTrue(ch.isWritable());