diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index 1c8fb21acf5e9..4af6f0988eba7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -152,8 +153,11 @@ protected void stopClusterComponents(boolean cleanupHaData) throws Exception { } } - private void shutDownAndTerminate(boolean cleanupHaData) { + private void shutDownAndTerminate(boolean cleanupHaData, ApplicationStatus status, String optionalDiagnostics) { try { + if (resourceManager != null) { + resourceManager.shutDownCluster(status, optionalDiagnostics); + } shutDown(cleanupHaData); } catch (Throwable t) { LOG.error("Could not properly shut down cluster entrypoint.", t); @@ -185,21 +189,21 @@ private TerminatingOnCompleteActions(JobID jobId) { public void jobFinished(JobExecutionResult result) { LOG.info("Job({}) finished.", jobId); - shutDownAndTerminate(true); + shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, null); } @Override public void jobFailed(Throwable cause) { LOG.info("Job({}) failed.", jobId, cause); - shutDownAndTerminate(false); + shutDownAndTerminate(false, ApplicationStatus.FAILED, cause.getMessage()); } @Override public void jobFinishedByOther() { LOG.info("Job({}) was finished by another JobManager.", jobId); - shutDownAndTerminate(false); + shutDownAndTerminate(false, ApplicationStatus.UNKNOWN, "Job was finished by another master"); } } }