Skip to content

Commit

Permalink
[FLINK-4566] [network runtime] Properly preserve exception causes for…
Browse files Browse the repository at this point in the history
… ProducerFailedException
  • Loading branch information
StephanEwen committed Sep 2, 2016
1 parent 761d0a0 commit e227b10
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 20 deletions.
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.runtime.util.SerializedThrowable;

/**
* Network-stack level Exception to notify remote receiver about a failed
Expand All @@ -29,23 +29,14 @@ public class ProducerFailedException extends CancelTaskException {

private static final long serialVersionUID = -1555492656299526395L;

private final String causeAsString;

/**
* The cause of the producer failure.
*
* Note: The cause will be stringified, because it might be an instance of
* a user level Exception, which can not be deserialized by the remote
* receiver's system class loader.
* <p>The cause will be stored as a {@link SerializedThrowable}, because it might
* be an instance of a user level Exception, which may not be possible to deserialize
* by the remote receiver's system class loader.
*/
public ProducerFailedException(Throwable cause) {
this.causeAsString = cause != null ? ExceptionUtils.stringifyException(cause) : null;
}

/**
* Returns the stringified cause of the producer failure.
*/
public String getCauseAsString() {
return causeAsString;
super(new SerializedThrowable(cause));
}
}
Expand Up @@ -19,27 +19,27 @@
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.util.SerializedThrowable;

import org.junit.Test;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

public class ProducerFailedExceptionTest {

@Test
public void testInstanceOfCancelTaskException() throws Exception {
ProducerFailedException e = new ProducerFailedException(new Exception());
assertTrue(e instanceof CancelTaskException);
assertTrue(CancelTaskException.class.isAssignableFrom(ProducerFailedException.class));
}

@Test
public void testCauseIsStringified() throws Exception {
public void testCauseIsSerialized() throws Exception {
// Tests that the cause is stringified, because it might be an instance
// of a user level Exception, which can not be deserialized by the
// remote receiver's system class loader.
ProducerFailedException e = new ProducerFailedException(new Exception());
assertNull(e.getCause());
assertNotNull(e.getCauseAsString());
assertNotNull(e.getCause());
assertTrue(e.getCause() instanceof SerializedThrowable);
}
}

0 comments on commit e227b10

Please sign in to comment.