From b047b2a50791f4eeeb4c3a984d060ffdbf57ea26 Mon Sep 17 00:00:00 2001 From: "shuai.xus" Date: Fri, 8 Dec 2017 18:02:42 +0800 Subject: [PATCH 1/3] [FLINK-8224] [runtime] shutdown application when job terminated in job mode --- .../runtime/entrypoint/JobClusterEntrypoint.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index 1c8fb21acf5e9..fef524212b210 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -152,8 +153,11 @@ protected void stopClusterComponents(boolean cleanupHaData) throws Exception { } } - private void shutDownAndTerminate(boolean cleanupHaData) { + private void shutDownAndTerminate(boolean cleanupHaData, ApplicationStatus status, String optionalDiagnostics) { try { + if (resourceManager != null) { + resourceManager.shutDownCluster(status, optionalDiagnostics); + } shutDown(cleanupHaData); } catch (Throwable t) { LOG.error("Could not properly shut down cluster entrypoint.", t); @@ -185,21 +189,21 @@ private TerminatingOnCompleteActions(JobID jobId) { public void jobFinished(JobExecutionResult result) { LOG.info("Job({}) finished.", jobId); - shutDownAndTerminate(true); + shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, null); } @Override public void jobFailed(Throwable cause) { LOG.info("Job({}) failed.", jobId, cause); - shutDownAndTerminate(false); + shutDownAndTerminate(false, ApplicationStatus.FAILED, cause.getMessage()); } @Override public void jobFinishedByOther() { LOG.info("Job({}) was finished by another JobManager.", jobId); - shutDownAndTerminate(false); + shutDownAndTerminate(false, ApplicationStatus.UNKNOWN, "Job finish by other another master"); } } } From ba3609a319e0a0891b798091278b6ca2fc5530f9 Mon Sep 17 00:00:00 2001 From: "shuai.xus" Date: Fri, 8 Dec 2017 18:05:33 +0800 Subject: [PATCH 2/3] corrent the typepo --- .../apache/flink/runtime/entrypoint/JobClusterEntrypoint.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index fef524212b210..d4e5eb4b49685 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -203,7 +203,7 @@ public void jobFailed(Throwable cause) { public void jobFinishedByOther() { LOG.info("Job({}) was finished by another JobManager.", jobId); - shutDownAndTerminate(false, ApplicationStatus.UNKNOWN, "Job finish by other another master"); + shutDownAndTerminate(false, ApplicationStatus.UNKNOWN, "Job was finished by other another master"); } } } From d3024fb3841d9e0a5377ccbecb04aa36c5f8de0c Mon Sep 17 00:00:00 2001 From: "shuai.xus" Date: Fri, 8 Dec 2017 18:12:08 +0800 Subject: [PATCH 3/3] corrent the typepo --- .../apache/flink/runtime/entrypoint/JobClusterEntrypoint.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index d4e5eb4b49685..4af6f0988eba7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -203,7 +203,7 @@ public void jobFailed(Throwable cause) { public void jobFinishedByOther() { LOG.info("Job({}) was finished by another JobManager.", jobId); - shutDownAndTerminate(false, ApplicationStatus.UNKNOWN, "Job was finished by other another master"); + shutDownAndTerminate(false, ApplicationStatus.UNKNOWN, "Job was finished by another master"); } } }