diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 4eb48e98b5fc5..581a56f3ccb09 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -899,10 +899,15 @@ public JobExecutionResult execute(String jobName) throws Exception { jobListener -> jobListener.onJobExecuted(lastJobExecutionResult, null)); } catch (Throwable t) { + // get() on the JobExecutionResult Future will throw an ExecutionException. This + // behaviour was largely not there in Flink versions before the PipelineExecutor + // refactoring so we should strip that exception. + Throwable strippedException = ExceptionUtils.stripExecutionException(t); + jobListeners.forEach(jobListener -> { - jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)); + jobListener.onJobExecuted(null, strippedException); }); - ExceptionUtils.rethrowException(t); + ExceptionUtils.rethrowException(strippedException); } return lastJobExecutionResult; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 78b3e54086e20..e559147825f09 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1712,10 +1712,15 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception { return jobExecutionResult; } catch (Throwable t) { + // get() on the JobExecutionResult Future will throw an ExecutionException. This + // behaviour was largely not there in Flink versions before the PipelineExecutor + // refactoring so we should strip that exception. + Throwable strippedException = ExceptionUtils.stripExecutionException(t); + jobListeners.forEach(jobListener -> { - jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)); + jobListener.onJobExecuted(null, strippedException); }); - ExceptionUtils.rethrowException(t); + ExceptionUtils.rethrowException(strippedException); // never reached, only make javac happy return null; diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index dea9f54e437f6..ee481ed80add8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -233,7 +233,7 @@ public Long map(Long value) throws Exception { Throwable error = errorRef[0]; assertNotNull("The program did not fail properly", error); - assertTrue(error.getCause() instanceof ProgramInvocationException); + assertTrue(error instanceof ProgramInvocationException); // all seems well :-) } catch (Exception e) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java new file mode 100644 index 0000000000000..7dd02e09c283c --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.api; + +import org.apache.flink.client.deployment.executors.RemoteExecutor; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Integration tests for {@link StreamExecutionEnvironment}. + */ +public class StreamExecutionEnvironmentITCase { + + // We use our own miniClusterResource because we wan't to connect to it using a remote executor. + @ClassRule + public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void executeThrowsProgramInvocationException() throws Exception { + UnmodifiableConfiguration clientConfiguration = miniClusterResource.getClientConfiguration(); + Configuration config = new Configuration(clientConfiguration); + config.set(DeploymentOptions.TARGET, RemoteExecutor.NAME); + config.setBoolean(DeploymentOptions.ATTACHED, true); + + // Create the execution environment explicitly from a Configuration so we know that we + // don't get some other subclass. If we just did + // StreamExecutionEnvironment.getExecutionEnvironment() we would get a + // TestStreamEnvironment that the MiniClusterResource created. We want to test the behaviour + // of the base environment, though. + StreamExecutionEnvironment env = + new StreamExecutionEnvironment(config); + + env + .fromElements("hello") + .map(in -> { + throw new RuntimeException("Failing"); + }) + .print(); + + thrown.expect(ProgramInvocationException.class); + env.execute(); + } +}