Skip to content

Commit

Permalink
[FLINK-1954] [FLINK-1957] [runtime] Improve error handling of transpo…
Browse files Browse the repository at this point in the history
…rt failures

Problem: Failures in the network stack were not properly handled and correctly
attributed.

Solution: Failures are always attributeed to the client (consumer). This change
introduces TransportException, which indicates whether the problem ocurred
locally or remotely. This makes it easy to reason about the source of a problem.

This closes #713.
  • Loading branch information
uce committed May 26, 2015
1 parent fdac963 commit 2a65b62
Show file tree
Hide file tree
Showing 21 changed files with 1,218 additions and 231 deletions.
Expand Up @@ -107,14 +107,18 @@ void init(final NettyProtocol protocol) throws IOException {
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
protocol.setClientChannelPipeline(channel.pipeline());
channel.pipeline().addLast(protocol.getClientChannelHandlers());
}
});

long end = System.currentTimeMillis();
LOG.info("Successful initialization (took {} ms).", (end - start));
}

NettyConfig getConfig() {
return config;
}

void shutdown() {
long start = System.currentTimeMillis();

Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

Expand Down Expand Up @@ -89,12 +90,20 @@ static class NettyMessageEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof NettyMessage) {

ByteBuf serialized = null;

try {
ctx.write(((NettyMessage) msg).write(ctx.alloc()), promise);
serialized = ((NettyMessage) msg).write(ctx.alloc());
}
catch (Throwable t) {
throw new IOException("Error while serializing message: " + msg, t);
}
finally {
if (serialized != null) {
ctx.write(serialized, promise);
}
}
}
else {
ctx.write(msg, promise);
Expand Down Expand Up @@ -311,6 +320,7 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {

return result;
}

@Override
void readFrom(ByteBuf buffer) throws Exception {
DataInputView inputView = new ByteBufDataInputView(buffer);
Expand Down Expand Up @@ -463,6 +473,48 @@ public void readFrom(ByteBuf buffer) {
}
}

/**
* Cancels the partition request of the {@link InputChannel} identified by
* {@link InputChannelID}.
*
* <p> There is a 1:1 mapping between the input channel and partition per physical channel.
* Therefore, the {@link InputChannelID} instance is enough to identify which request to cancel.
*/
static class CancelPartitionRequest extends NettyMessage {

final static byte ID = 4;

InputChannelID receiverId;

public CancelPartitionRequest(InputChannelID receiverId) {
this.receiverId = receiverId;
}

@Override
ByteBuf write(ByteBufAllocator allocator) throws Exception {
ByteBuf result = null;

try {
result = allocateBuffer(allocator, ID);
receiverId.writeTo(result);
}
catch (Throwable t) {
if (result != null) {
result.release();
}

throw new IOException(t);
}

return result;
}

@Override
void readFrom(ByteBuf buffer) throws Exception {
receiverId = InputChannelID.fromByteBuf(buffer);
}
}

// ------------------------------------------------------------------------

private static class ByteBufDataInputView implements DataInputView {
Expand Down
Expand Up @@ -18,12 +18,12 @@

package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelHandler;

public interface NettyProtocol {

void setServerChannelPipeline(ChannelPipeline channelPipeline);
ChannelHandler[] getServerChannelHandlers();

void setClientChannelPipeline(ChannelPipeline channelPipeline);
ChannelHandler[] getClientChannelHandlers();

}
Expand Up @@ -119,7 +119,7 @@ void init(final NettyProtocol protocol) throws IOException {
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
protocol.setServerChannelPipeline(channel.pipeline());
channel.pipeline().addLast(protocol.getServerChannelHandlers());
}
});

Expand All @@ -130,7 +130,11 @@ public void initChannel(SocketChannel channel) throws Exception {
bindFuture = bootstrap.bind().syncUninterruptibly();

long end = System.currentTimeMillis();
LOG.info("Successful initialization (took {} ms). Listening on SocketAddress {}.", (end - start), bindFuture.channel().localAddress().toString());
LOG.info("Successful initialization (took {} ms). Listening on SocketAddress {}.", (end - start), bindFuture.channel().localAddress().toString());
}

NettyConfig getConfig() {
return config;
}

void shutdown() {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import io.netty.channel.ChannelFutureListener;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.util.AtomicDisposableReferenceCounter;
Expand Down Expand Up @@ -84,7 +85,7 @@ boolean incrementReferenceCounter() {
* The request goes to the remote producer, for which this partition
* request client instance has been created.
*/
public void requestSubpartition(
public ChannelFuture requestSubpartition(
final ResultPartitionID partitionId,
final int subpartitionIndex,
final RemoteInputChannel inputChannel,
Expand All @@ -103,21 +104,31 @@ public void requestSubpartition(
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
partitionRequestHandler.removeInputChannel(inputChannel);
inputChannel.onError(future.cause());
inputChannel.onError(
new LocalTransportException(
"Sending the partition request failed.",
future.channel().localAddress(), future.cause()
));
}
}
};

if (delayMs == 0) {
tcpChannel.writeAndFlush(request).addListener(listener);
ChannelFuture f = tcpChannel.writeAndFlush(request);
f.addListener(listener);
return f;
}
else {
final ChannelFuture[] f = new ChannelFuture[1];
tcpChannel.eventLoop().schedule(new Runnable() {
@Override
public void run() {
tcpChannel.writeAndFlush(request).addListener(listener);
f[0] = tcpChannel.writeAndFlush(request);
f[0].addListener(listener);
}
}, delayMs, TimeUnit.MILLISECONDS);

return f[0];
}
}

Expand All @@ -137,7 +148,10 @@ public void sendTaskEvent(ResultPartitionID partitionId, TaskEvent event, final
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
inputChannel.onError(future.cause());
inputChannel.onError(new LocalTransportException(
"Sending the task event failed.",
future.channel().localAddress(), future.cause()
));
}
}
});
Expand Down
Expand Up @@ -22,6 +22,8 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;

import java.io.IOException;
Expand Down Expand Up @@ -161,10 +163,11 @@ private boolean dispose() {
private void handInChannel(Channel channel) {
synchronized (connectLock) {
try {
PartitionRequestClientHandler requestHandler =
(PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
PartitionRequestClientHandler requestHandler = channel.pipeline()
.get(PartitionRequestClientHandler.class);

partitionRequestClient = new PartitionRequestClient(channel, requestHandler, connectionId, clientFactory);
partitionRequestClient = new PartitionRequestClient(
channel, requestHandler, connectionId, clientFactory);

if (disposeRequestClient) {
partitionRequestClient.disposeIfNotUsed();
Expand Down Expand Up @@ -209,10 +212,16 @@ public void operationComplete(ChannelFuture future) throws Exception {
handInChannel(future.channel());
}
else if (future.cause() != null) {
notifyOfError(future.cause());
notifyOfError(new RemoteTransportException(
"Connecting to remote task manager + '" + connectionId.getAddress() +
"' has failed. This might indicate that the remote task " +
"manager has been lost.",
connectionId.getAddress(), future.cause()));
}
else {
notifyOfError(new IllegalStateException("Connecting the channel has been cancelled."));
notifyOfError(new LocalTransportException(
"Connecting to remote task manager + '" + connectionId.getAddress() +
"' has been cancelled.", null));
}
}
}
Expand Down

0 comments on commit 2a65b62

Please sign in to comment.