From a722d7cd4c4218543c87c2e8a3b3bbc708bddf55 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Mon, 13 Feb 2017 16:30:59 +0100 Subject: [PATCH] [FLINK-5553] keep the original throwable in PartitionRequestClientHandler This way, when checking for a previous error in any input channel, we can throw a meaningful exception instead of the inspecific IllegalStateException("There has been an error in the channel.") before. Note that the original throwable (from an existing channel) may or may not(!) have been printed by the InputGate yet. Any new input channel, however, did not get the Throwable and must fail through the (now enhanced) fallback mechanism. --- .../netty/PartitionRequestClientHandler.java | 27 ++++++++++++++----- .../ClientTransportErrorHandlingTest.java | 3 ++- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java index 52775d4dd757c..9f80abc177b43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java @@ -42,18 +42,15 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.flink.util.Preconditions.checkState; - class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientHandler.class); private final ConcurrentMap inputChannels = new ConcurrentHashMap(); - private final AtomicBoolean channelError = new AtomicBoolean(false); + private final AtomicReference channelError = new AtomicReference(); private final BufferListenerTask bufferListener = new BufferListenerTask(); @@ -73,8 +70,8 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { // Input channel/receiver registration // ------------------------------------------------------------------------ - void addInputChannel(RemoteInputChannel listener) { - checkState(!channelError.get(), "There has been an error in the channel."); + void addInputChannel(RemoteInputChannel listener) throws IOException { + checkError(); if (!inputChannels.containsKey(listener.getInputChannelId())) { inputChannels.put(listener.getInputChannelId(), listener); @@ -172,7 +169,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } private void notifyAllChannelsOfErrorAndClose(Throwable cause) { - if (channelError.compareAndSet(false, true)) { + if (channelError.compareAndSet(null, cause)) { try { for (RemoteInputChannel inputChannel : inputChannels.values()) { inputChannel.onError(cause); @@ -195,6 +192,22 @@ private void notifyAllChannelsOfErrorAndClose(Throwable cause) { // ------------------------------------------------------------------------ + /** + * Checks for an error and rethrows it if one was reported. + */ + private void checkError() throws IOException { + final Throwable t = channelError.get(); + + if (t != null) { + if (t instanceof IOException) { + throw (IOException) t; + } + else { + throw new IOException("There has been an error in the channel.", t); + } + } + } + @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java index ab96d4a0ab28f..22e7754fcb0b0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java @@ -389,7 +389,8 @@ private EmbeddedChannel createEmbeddedChannel() { return new EmbeddedChannel(protocol.getClientChannelHandlers()); } - private RemoteInputChannel addInputChannel(PartitionRequestClientHandler clientHandler) { + private RemoteInputChannel addInputChannel(PartitionRequestClientHandler clientHandler) + throws IOException { RemoteInputChannel rich = createRemoteInputChannel(); clientHandler.addInputChannel(rich);