From 70e0c8d343d7a813a79bd230b42702c1b15f1abc Mon Sep 17 00:00:00 2001 From: jinxing64 Date: Fri, 18 Jun 2021 15:34:49 +0800 Subject: [PATCH] [FLINK-23030][network] PartitionRequestClientFactory#createPartitionRequestClient should throw when network failure --- .../netty/PartitionRequestClientFactory.java | 2 +- .../netty/NeverCompletingChannelFuture.java | 6 +- .../PartitionRequestClientFactoryTest.java | 88 ++++--------------- 3 files changed, 23 insertions(+), 73 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java index 591e9725678d2..ff4c19c36c367 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java @@ -131,7 +131,7 @@ private NettyPartitionRequestClient connectWithRetries(ConnectionID connectionId private NettyPartitionRequestClient connect(ConnectionID connectionId) throws RemoteTransportException, InterruptedException { try { - Channel channel = nettyClient.connect(connectionId.getAddress()).await().channel(); + Channel channel = nettyClient.connect(connectionId.getAddress()).sync().channel(); NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class); return new NettyPartitionRequestClient(channel, clientHandler, connectionId, this); } catch (InterruptedException e) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java index 1d7c81e55e9a4..ad6772c8cf85a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java @@ -74,8 +74,10 @@ public ChannelFuture removeListeners( } @Override - public ChannelFuture sync() { - throw new UnsupportedOperationException(); + public ChannelFuture sync() throws InterruptedException { + while (true) { + Thread.sleep(50); + } } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java index 595b3e5d4a5cb..55423085fc0df 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.network.netty; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.NetworkClientHandler; import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; @@ -28,21 +26,15 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise; import org.junit.Test; import java.io.IOException; -import java.lang.Thread.UncaughtExceptionHandler; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -217,6 +209,24 @@ public void testNettyClientConnectRetryMultipleThread() throws Exception { serverAndClient.server().shutdown(); } + @Test(expected = RemoteTransportException.class) + public void testThrowsWhenNetworkFailure() throws Exception { + NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient(); + try { + NettyClient client = nettyServerAndClient.client(); + PartitionRequestClientFactory factory = new PartitionRequestClientFactory(client, 0); + + // Connect to a wrong address + InetSocketAddress addr = + new InetSocketAddress(InetAddress.getLocalHost(), NetUtils.getAvailablePort()); + ConnectionID connectionID = new ConnectionID(addr, 0); + factory.createPartitionRequestClient(connectionID); + } finally { + nettyServerAndClient.client().shutdown(); + nettyServerAndClient.server().shutdown(); + } + } + private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception { return NettyTestUtil.initServerAndClient( new NettyProtocol(null, null) { @@ -289,66 +299,4 @@ ChannelFuture connect(InetSocketAddress serverSocketAddress) { } } } - - private static class CountDownLatchOnConnectHandler extends ChannelOutboundHandlerAdapter { - - private final CountDownLatch syncOnConnect; - - public CountDownLatchOnConnectHandler(CountDownLatch syncOnConnect) { - this.syncOnConnect = syncOnConnect; - } - - @Override - public void connect( - ChannelHandlerContext ctx, - SocketAddress remoteAddress, - SocketAddress localAddress, - ChannelPromise promise) { - syncOnConnect.countDown(); - } - } - - private static class UncaughtTestExceptionHandler implements UncaughtExceptionHandler { - - private final List errors = new ArrayList<>(1); - - @Override - public void uncaughtException(Thread t, Throwable e) { - errors.add(e); - } - - private List getErrors() { - return errors; - } - } - - // ------------------------------------------------------------------------ - - private static Tuple2 createNettyServerAndClient( - NettyProtocol protocol) throws IOException { - final NettyConfig config = - new NettyConfig( - InetAddress.getLocalHost(), SERVER_PORT, 32 * 1024, 1, new Configuration()); - - final NettyServer server = new NettyServer(config); - final NettyClient client = new NettyClient(config); - - boolean success = false; - - try { - NettyBufferPool bufferPool = new NettyBufferPool(1); - - server.init(protocol, bufferPool); - client.init(protocol, bufferPool); - - success = true; - } finally { - if (!success) { - server.shutdown(); - client.shutdown(); - } - } - - return new Tuple2<>(server, client); - } }