Skip to content

Commit

Permalink
[FLINK-10400] Fail JobResult if application finished in CANCELED or F…
Browse files Browse the repository at this point in the history
…AILED state

In case of the CANCELED state, the client will throw a JobCancellationException.
In case of the FAILED state, the client will throw a JobExecutionException.

This closes #6742.
  • Loading branch information
tillrohrmann committed Sep 27, 2018
1 parent ec73c60 commit 7606ccf
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
Expand Down Expand Up @@ -94,8 +95,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)

try {
return jobResult.toJobExecutionResult(classLoader);
} catch (JobResult.WrappedJobException e) {
throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e.getCause());
} catch (JobExecutionException e) {
throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
} catch (IOException | ClassNotFoundException e) {
throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
Expand Down Expand Up @@ -263,8 +264,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
try {
this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
return lastJobExecutionResult;
} catch (JobResult.WrappedJobException we) {
throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), we.getCause());
} catch (JobExecutionException e) {
throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
} catch (IOException | ClassNotFoundException e) {
throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
Expand Down Expand Up @@ -122,6 +123,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -229,6 +231,7 @@ public void testJobSubmitCancelStop() throws Exception {
TestJobExecutionResultHandler testJobExecutionResultHandler =
new TestJobExecutionResultHandler(
JobExecutionResultResponseBody.created(new JobResult.Builder()
.applicationStatus(ApplicationStatus.SUCCEEDED)
.jobId(jobId)
.netRuntime(Long.MAX_VALUE)
.build()));
Expand Down Expand Up @@ -351,11 +354,13 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
new RestHandlerException("should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
JobExecutionResultResponseBody.inProgress(),
JobExecutionResultResponseBody.created(new JobResult.Builder()
.applicationStatus(ApplicationStatus.SUCCEEDED)
.jobId(jobId)
.netRuntime(Long.MAX_VALUE)
.accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(OptionalFailure.of(1.0))))
.build()),
JobExecutionResultResponseBody.created(new JobResult.Builder()
.applicationStatus(ApplicationStatus.FAILED)
.jobId(jobId)
.netRuntime(Long.MAX_VALUE)
.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
Expand Down Expand Up @@ -385,8 +390,10 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
fail("Expected exception not thrown.");
} catch (final ProgramInvocationException e) {
assertThat(e.getCause(), instanceOf(RuntimeException.class));
assertThat(e.getCause().getMessage(), equalTo("expected"));
final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);

assertThat(cause.isPresent(), is(true));
assertThat(cause.get().getMessage(), equalTo("expected"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
Expand Down Expand Up @@ -54,6 +56,8 @@ public class JobResult implements Serializable {

private final JobID jobId;

private final ApplicationStatus applicationStatus;

private final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults;

private final long netRuntime;
Expand All @@ -64,13 +68,15 @@ public class JobResult implements Serializable {

private JobResult(
final JobID jobId,
final ApplicationStatus applicationStatus,
final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults,
final long netRuntime,
@Nullable final SerializedThrowable serializedThrowable) {

checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0");

this.jobId = requireNonNull(jobId);
this.applicationStatus = requireNonNull(applicationStatus);
this.accumulatorResults = requireNonNull(accumulatorResults);
this.netRuntime = netRuntime;
this.serializedThrowable = serializedThrowable;
Expand All @@ -80,13 +86,17 @@ private JobResult(
* Returns {@code true} if the job finished successfully.
*/
public boolean isSuccess() {
return serializedThrowable == null;
return applicationStatus == ApplicationStatus.SUCCEEDED || (applicationStatus == ApplicationStatus.UNKNOWN && serializedThrowable == null);
}

/**
 * Returns the {@link JobID} held by this result.
 *
 * @return the value of the {@code jobId} field
 */
public JobID getJobId() {
return jobId;
}

/**
 * Returns the {@link ApplicationStatus} stored in this result.
 *
 * @return the application status; not {@code null}, because the constructor
 *         passes the value through {@code requireNonNull}
 */
public ApplicationStatus getApplicationStatus() {
return applicationStatus;
}

/**
 * Returns the still-serialized accumulator results stored in this result.
 *
 * <p>The values are not deserialized here; callers receive the
 * {@link SerializedValue} wrappers as stored.
 *
 * @return map of serialized accumulator values; not {@code null}, because the
 *         constructor passes the value through {@code requireNonNull}.
 *         NOTE(review): the String key is presumably the accumulator name - confirm.
 */
public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorResults() {
return accumulatorResults;
}
Expand All @@ -108,22 +118,40 @@ public Optional<SerializedThrowable> getSerializedThrowable() {
*
* @param classLoader to use for deserialization
* @return JobExecutionResult
* @throws WrappedJobException if the JobResult contains a serialized exception
* @throws JobCancellationException if the job was cancelled
* @throws JobExecutionException if the job execution did not succeed
* @throws IOException if the accumulator could not be deserialized
* @throws ClassNotFoundException if the accumulator could not be deserialized
*/
public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws WrappedJobException, IOException, ClassNotFoundException {
if (serializedThrowable != null) {
final Throwable throwable = serializedThrowable.deserializeError(classLoader);
throw new WrappedJobException(throwable);
}
public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws JobExecutionException, IOException, ClassNotFoundException {
if (applicationStatus == ApplicationStatus.SUCCEEDED) {
return new JobExecutionResult(
jobId,
netRuntime,
AccumulatorHelper.deserializeAccumulators(
accumulatorResults,
classLoader));
} else {
final Throwable cause;

if (serializedThrowable == null) {
cause = null;
} else {
cause = serializedThrowable.deserializeError(classLoader);
}

final JobExecutionException exception;

if (applicationStatus == ApplicationStatus.FAILED) {
exception = new JobExecutionException(jobId, "Job execution failed.", cause);
} else if (applicationStatus == ApplicationStatus.CANCELED) {
exception = new JobCancellationException(jobId, "Job was cancelled.", cause);
} else {
exception = new JobExecutionException(jobId, "Job completed with illegal application status: " + applicationStatus + '.', cause);
}

return new JobExecutionResult(
jobId,
netRuntime,
AccumulatorHelper.deserializeAccumulators(
accumulatorResults,
classLoader));
throw exception;
}
}

/**
Expand All @@ -134,6 +162,8 @@ public static class Builder {

private JobID jobId;

private ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN;

private Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults;

private long netRuntime = -1;
Expand All @@ -145,6 +175,11 @@ public Builder jobId(final JobID jobId) {
return this;
}

/**
 * Sets the {@link ApplicationStatus} that the built {@link JobResult} will carry.
 *
 * <p>If this method is never called, the builder keeps its initial value
 * {@link ApplicationStatus#UNKNOWN}.
 *
 * @param applicationStatus status to store; a {@code null} value is not checked here
 *                          but is rejected by the {@code requireNonNull} in the
 *                          {@link JobResult} constructor when {@code build()} is called
 * @return this builder, to allow call chaining
 */
public Builder applicationStatus(final ApplicationStatus applicationStatus) {
this.applicationStatus = applicationStatus;
return this;
}

public Builder accumulatorResults(final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults) {
this.accumulatorResults = accumulatorResults;
return this;
Expand All @@ -163,6 +198,7 @@ public Builder serializedThrowable(final SerializedThrowable serializedThrowable
public JobResult build() {
return new JobResult(
jobId,
applicationStatus,
accumulatorResults == null ? Collections.emptyMap() : accumulatorResults,
netRuntime,
serializedThrowable);
Expand All @@ -188,6 +224,8 @@ public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
final JobResult.Builder builder = new JobResult.Builder();
builder.jobId(jobId);

builder.applicationStatus(ApplicationStatus.fromJobStatus(accessExecutionGraph.getState()));

final long netRuntime = accessExecutionGraph.getStatusTimestamp(jobStatus) - accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
// guard against clock changes
final long guardedNetRuntime = Math.max(netRuntime, 0L);
Expand All @@ -204,17 +242,4 @@ public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {

return builder.build();
}

/**
 * Exception which indicates that the job has finished with an {@link Exception}.
 *
 * <p>It only acts as a carrier: the actual failure is the wrapped cause.
 */
public static final class WrappedJobException extends FlinkException {

private static final long serialVersionUID = 6535061898650156019L;

/**
 * Creates the wrapper around the given failure.
 *
 * @param cause the throwable to wrap; forwarded to the {@link FlinkException} constructor
 */
public WrappedJobException(Throwable cause) {
super(cause);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,6 @@ public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionEx

try {
return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
} catch (JobResult.WrappedJobException e) {
throw new JobExecutionException(job.getJobID(), e.getCause());
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(job.getJobID(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.messages.json;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedThrowable;
Expand Down Expand Up @@ -68,6 +69,7 @@ public JobResultDeserializer() {
@Override
public JobResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException {
JobID jobId = null;
ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN;
long netRuntime = -1;
SerializedThrowable serializedThrowable = null;
Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults = null;
Expand All @@ -85,6 +87,10 @@ public JobResult deserialize(final JsonParser p, final DeserializationContext ct
assertNextToken(p, JsonToken.VALUE_STRING);
jobId = jobIdDeserializer.deserialize(p, ctxt);
break;
case JobResultSerializer.FIELD_NAME_APPLICATION_STATUS:
assertNextToken(p, JsonToken.VALUE_STRING);
applicationStatus = ApplicationStatus.valueOf(p.getValueAsString().toUpperCase());
break;
case JobResultSerializer.FIELD_NAME_NET_RUNTIME:
assertNextToken(p, JsonToken.VALUE_NUMBER_INT);
netRuntime = p.getLongValue();
Expand All @@ -105,6 +111,7 @@ public JobResult deserialize(final JsonParser p, final DeserializationContext ct
try {
return new JobResult.Builder()
.jobId(jobId)
.applicationStatus(applicationStatus)
.netRuntime(netRuntime)
.accumulatorResults(accumulatorResults)
.serializedThrowable(serializedThrowable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class JobResultSerializer extends StdSerializer<JobResult> {

static final String FIELD_NAME_JOB_ID = "id";

static final String FIELD_NAME_APPLICATION_STATUS = "application-status";

static final String FIELD_NAME_NET_RUNTIME = "net-runtime";

static final String FIELD_NAME_ACCUMULATOR_RESULTS = "accumulator-results";
Expand Down Expand Up @@ -76,6 +78,9 @@ public void serialize(
gen.writeFieldName(FIELD_NAME_JOB_ID);
jobIdSerializer.serialize(result.getJobId(), gen, provider);

gen.writeFieldName(FIELD_NAME_APPLICATION_STATUS);
gen.writeString(result.getApplicationStatus().name());

gen.writeFieldName(FIELD_NAME_ACCUMULATOR_RESULTS);
gen.writeStartObject();
final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults = result.getAccumulatorResults();
Expand Down
Loading

0 comments on commit 7606ccf

Please sign in to comment.