From bb0032ff74aa71b3e2f8be9bc201a00e28551280 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 19 Jan 2018 18:27:22 +0100 Subject: [PATCH] [FLINK-8463] [rest] Remove blocking of IO executor in RestClient#submitRequest Instead of waiting on the ChannelFuture we register a ChannelFutureListener which is notified when the channel has been established. This unblocks IO executor threads in the RestClient. --- .../apache/flink/runtime/rest/RestClient.java | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 71891de9bf3b5..5af50b2d5ead3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.runtime.rest.util.RestConstants; import org.apache.flink.runtime.rest.util.RestMapperUtils; -import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException; @@ -42,6 +41,7 @@ import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; @@ -181,24 +181,31 @@ public , U extends MessageParameters, R extend } private

CompletableFuture

submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class

responseClass) { - return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor) - .thenApply((channel) -> { - try { - return channel.sync(); - } catch (InterruptedException e) { - throw new FlinkRuntimeException(e); + final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort); + + final CompletableFuture channelFuture = new CompletableFuture<>(); + + connectFuture.addListener( + (ChannelFuture future) -> { + if (future.isSuccess()) { + channelFuture.complete(future.channel()); + } else { + channelFuture.completeExceptionally(future.cause()); } - }) - .thenApply((ChannelFuture::channel)) - .thenCompose(channel -> { - ClientHandler handler = channel.pipeline().get(ClientHandler.class); - CompletableFuture future = handler.getJsonFuture(); - channel.writeAndFlush(httpRequest); - return future; - }).thenComposeAsync( + }); + + return channelFuture + .thenComposeAsync( + channel -> { + ClientHandler handler = channel.pipeline().get(ClientHandler.class); + CompletableFuture future = handler.getJsonFuture(); + channel.writeAndFlush(httpRequest); + return future; + }, + executor) + .thenComposeAsync( (JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass), - executor - ); + executor); } private static

CompletableFuture

parseResponse(JsonResponse rawResponse, Class

responseClass) {