diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java index 2b2acabf278db..934234da897ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.execution.CancelTaskException; -import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.runtime.util.SerializedThrowable; /** * Network-stack level Exception to notify remote receiver about a failed @@ -29,23 +29,14 @@ public class ProducerFailedException extends CancelTaskException { private static final long serialVersionUID = -1555492656299526395L; - private final String causeAsString; - /** * The cause of the producer failure. * - * Note: The cause will be stringified, because it might be an instance of - * a user level Exception, which can not be deserialized by the remote - * receiver's system class loader. + *

The cause will be stored as a {@link SerializedThrowable}, because it might + * be an instance of a user level Exception, which may not be possible to deserialize + * by the remote receiver's system class loader. */ public ProducerFailedException(Throwable cause) { - this.causeAsString = cause != null ? ExceptionUtils.stringifyException(cause) : null; - } - - /** - * Returns the stringified cause of the producer failure. - */ - public String getCauseAsString() { - return causeAsString; + super(new SerializedThrowable(cause)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java index 042c136dfde9a..ca2de0ca83e40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java @@ -19,27 +19,27 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.execution.CancelTaskException; +import org.apache.flink.runtime.util.SerializedThrowable; + import org.junit.Test; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class ProducerFailedExceptionTest { @Test public void testInstanceOfCancelTaskException() throws Exception { - ProducerFailedException e = new ProducerFailedException(new Exception()); - assertTrue(e instanceof CancelTaskException); + assertTrue(CancelTaskException.class.isAssignableFrom(ProducerFailedException.class)); } @Test - public void testCauseIsStringified() throws Exception { + public void testCauseIsSerialized() throws Exception { // Tests that the cause is stringified, because it might be an instance // of a user level Exception, which can not be deserialized by the // remote receiver's system class loader. ProducerFailedException e = new ProducerFailedException(new Exception()); - assertNull(e.getCause()); - assertNotNull(e.getCauseAsString()); + assertNotNull(e.getCause()); + assertTrue(e.getCause() instanceof SerializedThrowable); } }