Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private NettyPartitionRequestClient connectWithRetries(ConnectionID connectionId
private NettyPartitionRequestClient connect(ConnectionID connectionId)
throws RemoteTransportException, InterruptedException {
try {
Channel channel = nettyClient.connect(connectionId.getAddress()).await().channel();
Channel channel = nettyClient.connect(connectionId.getAddress()).sync().channel();
NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
return new NettyPartitionRequestClient(channel, clientHandler, connectionId, this);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ public ChannelFuture removeListeners(
}

@Override
public ChannelFuture sync() {
throw new UnsupportedOperationException();
public ChannelFuture sync() throws InterruptedException {
while (true) {
Thread.sleep(50);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
Expand All @@ -28,21 +26,15 @@
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;

import org.junit.Test;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -217,6 +209,24 @@ public void testNettyClientConnectRetryMultipleThread() throws Exception {
serverAndClient.server().shutdown();
}

@Test(expected = RemoteTransportException.class)
public void testThrowsWhenNetworkFailure() throws Exception {
NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient();
try {
NettyClient client = nettyServerAndClient.client();
PartitionRequestClientFactory factory = new PartitionRequestClientFactory(client, 0);

// Connect to a wrong address
InetSocketAddress addr =
new InetSocketAddress(InetAddress.getLocalHost(), NetUtils.getAvailablePort());
ConnectionID connectionID = new ConnectionID(addr, 0);
factory.createPartitionRequestClient(connectionID);
} finally {
nettyServerAndClient.client().shutdown();
nettyServerAndClient.server().shutdown();
}
}

private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {
return NettyTestUtil.initServerAndClient(
new NettyProtocol(null, null) {
Expand Down Expand Up @@ -289,66 +299,4 @@ ChannelFuture connect(InetSocketAddress serverSocketAddress) {
}
}
}

private static class CountDownLatchOnConnectHandler extends ChannelOutboundHandlerAdapter {

private final CountDownLatch syncOnConnect;

public CountDownLatchOnConnectHandler(CountDownLatch syncOnConnect) {
this.syncOnConnect = syncOnConnect;
}

@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress,
SocketAddress localAddress,
ChannelPromise promise) {
syncOnConnect.countDown();
}
}

private static class UncaughtTestExceptionHandler implements UncaughtExceptionHandler {

private final List<Throwable> errors = new ArrayList<>(1);

@Override
public void uncaughtException(Thread t, Throwable e) {
errors.add(e);
}

private List<Throwable> getErrors() {
return errors;
}
}

// ------------------------------------------------------------------------

private static Tuple2<NettyServer, NettyClient> createNettyServerAndClient(
NettyProtocol protocol) throws IOException {
final NettyConfig config =
new NettyConfig(
InetAddress.getLocalHost(), SERVER_PORT, 32 * 1024, 1, new Configuration());

final NettyServer server = new NettyServer(config);
final NettyClient client = new NettyClient(config);

boolean success = false;

try {
NettyBufferPool bufferPool = new NettyBufferPool(1);

server.init(protocol, bufferPool);
client.init(protocol, bufferPool);

success = true;
} finally {
if (!success) {
server.shutdown();
client.shutdown();
}
}

return new Tuple2<>(server, client);
}
}