Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
Expand Down Expand Up @@ -236,7 +237,8 @@ private void notifyAllChannelsOfErrorAndClose(Throwable cause) {
/**
* Checks for an error and rethrows it if one was reported.
*/
private void checkError() throws IOException {
@VisibleForTesting
void checkError() throws IOException {
final Throwable t = channelError.get();

if (t != null) {
Expand Down Expand Up @@ -264,7 +266,12 @@ private void decodeMsg(Object msg) throws Throwable {
return;
}

decodeBufferOrEvent(inputChannel, bufferOrEvent);
try {
decodeBufferOrEvent(inputChannel, bufferOrEvent);
} catch (Throwable t) {
inputChannel.onError(t);
}


} else if (msgClazz == NettyMessage.ErrorResponse.class) {
// ---- Error ---------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
Expand Down Expand Up @@ -486,6 +488,47 @@ public void testReadBufferResponseAfterRemovingChannel() throws Exception {
testReadBufferResponseWithReleasingOrRemovingChannel(true, false);
}

@Test
public void testDoNotFailHandlerOnSingleChannelFailure() throws Exception {
// Setup
final int bufferSize = 1024;
final String expectedMessage = "test exception on buffer";
final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize, 2);
final SingleInputGate inputGate = createSingleInputGate(1, networkBufferPool);
final RemoteInputChannel inputChannel = new TestRemoteInputChannelForError(inputGate, expectedMessage);
final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();

try {
inputGate.setInputChannels(inputChannel);
inputGate.assignExclusiveSegments();
inputGate.requestPartitions();
handler.addInputChannel(inputChannel);

final BufferResponse bufferResponse = createBufferResponse(
TestBufferFactory.createBuffer(bufferSize),
0,
inputChannel.getInputChannelId(),
1,
new NetworkBufferAllocator(handler));

// It will trigger an expected exception from TestRemoteInputChannelForError#onBuffer
handler.channelRead(null, bufferResponse);

// The handler should not be tagged as error for above excepted exception
handler.checkError();

try {
// The input channel should be tagged as error and the respective exception is thrown via #getNext
inputGate.getNext();
} catch (IOException ignored) {
assertEquals(expectedMessage, ignored.getMessage());
}
} finally {
// Cleanup
releaseResource(inputGate, networkBufferPool);
}
}

private void testReadBufferResponseWithReleasingOrRemovingChannel(
boolean isRemoved,
boolean readBeforeReleasingOrRemoving) throws Exception {
Expand Down Expand Up @@ -575,4 +618,32 @@ private static BufferResponse createBufferResponse(
// Deserialize the bytes to construct the BufferResponse.
return BufferResponse.readFrom(serialized, allocator);
}

/**
* The test remote input channel to throw expected exception while calling
* {@link RemoteInputChannel#onBuffer(Buffer, int, int)}.
*/
private static class TestRemoteInputChannelForError extends RemoteInputChannel {
private final String expectedMessage;

TestRemoteInputChannelForError(SingleInputGate inputGate, String expectedMessage) {
super(
inputGate,
0,
new ResultPartitionID(),
InputChannelBuilder.STUB_CONNECTION_ID,
new TestingConnectionManager(),
0,
100,
new SimpleCounter(),
new SimpleCounter());
this.expectedMessage = expectedMessage;
}

@Override
public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
buffer.recycleBuffer();
throw new IOException(expectedMessage);
}
}
}