From 46ad40588538b4e5d9f2e50fe65af5a83f9c4797 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 15 Jun 2015 16:37:35 +0200 Subject: [PATCH] [FLINK-2226][YARN] fail application on failed single-job cluster job Failing jobs executed in the YARN cluster mode leave the application container in the "SUCCEEDED" final state. While for long-running Flink YARN clusters where multiple jobs are run, this is fine, for single jobs it is appropriate to mark the application as failed. This closes #838. --- .../org/apache/flink/client/CliFrontend.java | 8 ++--- .../flink/client/FlinkYarnSessionCli.java | 6 ++-- .../apache/flink/client/program/Client.java | 1 + .../flink/runtime/jobgraph/JobStatus.java | 2 +- .../yarn/AbstractFlinkYarnCluster.java | 2 +- .../flink/yarn/YARNSessionFIFOITCase.java | 2 +- .../apache/flink/yarn/FlinkYarnClient.java | 4 +-- .../apache/flink/yarn/FlinkYarnCluster.java | 32 ++++++++++++------- .../apache/flink/yarn/ApplicationClient.scala | 2 +- .../flink/yarn/ApplicationMasterActor.scala | 10 ++++-- 10 files changed, 43 insertions(+), 26 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 3e61a3b3b7644..e1bacdefb2b20 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -265,6 +265,7 @@ protected int run(String[] args) { return handleError(t); } + int exitCode = 1; try { int userParallelism = options.getParallelism(); LOG.debug("User parallelism is set to {}", userParallelism); @@ -276,15 +277,14 @@ protected int run(String[] args) { "To use another parallelism, set it at the ./bin/flink client."); userParallelism = client.getMaxSlots(); } - int exitCode = 0; // check if detached per job yarn cluster is used to start flink if(yarnCluster != null && yarnCluster.isDetached()) { logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill "+yarnCluster.getApplicationId()+"\n" + + "yarn application -kill " + yarnCluster.getApplicationId() + "\n" + "Please also note that the temporary files of the YARN session in the home directoy will not be removed."); - executeProgram(program, client, userParallelism, false); + exitCode = executeProgram(program, client, userParallelism, false); } else { // regular (blocking) execution. exitCode = executeProgram(program, client, userParallelism, true); @@ -314,7 +314,7 @@ protected int run(String[] args) { finally { if (yarnCluster != null && !yarnCluster.isDetached()) { logAndSysout("Shutting down YARN cluster"); - yarnCluster.shutdown(); + yarnCluster.shutdown(exitCode != 0); } if (program != null) { program.deleteExtractedLibraries(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java index 0fa7173081ff8..c11edc7ef4b48 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -302,7 +302,7 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) { if (yarnCluster.hasFailed()) { System.err.println("The YARN cluster has failed"); - yarnCluster.shutdown(); + yarnCluster.shutdown(true); } // wait until CLIENT_POLLING_INTERVALL is over or the user entered something. @@ -439,7 +439,7 @@ public int run(String[] args) { if (!yarnCluster.hasBeenStopped()) { LOG.info("Command Line Interface requested session shutdown"); - yarnCluster.shutdown(); + yarnCluster.shutdown(false); } try { @@ -458,7 +458,7 @@ public int run(String[] args) { public void stop() { if (yarnCluster != null) { LOG.info("Command line interface is shutting down the yarnCluster"); - yarnCluster.shutdown(); + yarnCluster.shutdown(false); } } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index e219a38d6b668..c544e8d553045 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -318,6 +318,7 @@ else if (prog.isUsingInteractiveMode()) { ContextEnvironment.enableLocalExecution(true); } + // Job id has been set in the Client passed to the ContextEnvironment return new JobSubmissionResult(lastJobId); } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java index 667a68e261cd6..eb7d017c49a97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java @@ -51,7 +51,7 @@ public enum JobStatus { private final boolean terminalState; - private JobStatus(boolean terminalState) { + JobStatus(boolean terminalState) { this.terminalState = terminalState; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java index 398709e26920f..c2e897f9a0fa6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java @@ -30,7 +30,7 @@ public abstract class AbstractFlinkYarnCluster { public abstract String getWebInterfaceURL(); - public abstract void shutdown(); + public abstract void shutdown(boolean failApplication); public abstract boolean hasBeenStopped(); diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index ccde5d859e034..dd32b0d49ab38 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -647,7 +647,7 @@ public void testJavaAPI() { LOG.info("Shutting down cluster. All tests passed"); // shutdown cluster - yarnCluster.shutdown(); + yarnCluster.shutdown(false); LOG.info("Finished testJavaAPI()"); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java index 118f4ad288c17..f82f013d9d8dc 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java @@ -435,11 +435,11 @@ protected AbstractFlinkYarnCluster deployInternal(String clusterName) throws Exc + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC); } - if( taskManagerMemoryMb > freeClusterMem.containerLimit) { + if(taskManagerMemoryMb > freeClusterMem.containerLimit) { LOG.warn("The requested amount of memory for the TaskManagers ("+taskManagerMemoryMb+"MB) is more than " + "the largest possible YARN container: "+freeClusterMem.containerLimit + NOTE_RSC); } - if( jobManagerMemoryMb > freeClusterMem.containerLimit) { + if(jobManagerMemoryMb > freeClusterMem.containerLimit) { LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than " + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java index 6dd84d654a6a6..e408edb897e75 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java @@ -297,12 +297,12 @@ public boolean hasFailed() { } ApplicationReport lastReport = pollingRunner.getLastReport(); if(lastReport == null) { - LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster. that didn't receive a status so far." + + LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster that didn't receive a status so far." + "The system might be in an erroneous state"); return false; } else { YarnApplicationState appState = lastReport.getYarnApplicationState(); - boolean status= (appState == YarnApplicationState.FAILED || + boolean status = (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED); if(status) { LOG.warn("YARN reported application state {}", appState); @@ -381,12 +381,13 @@ public List getNewMessages() { // -------------------------- Shutdown handling ------------------------ private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false); - @Override - public void shutdown() { - shutdownInternal(true); - } - private void shutdownInternal(boolean removeShutdownHook) { + /** + * Shutdown the YARN cluster. + * @param failApplication whether we should fail the YARN application (in case of errors in Flink) + */ + @Override + public void shutdown(boolean failApplication) { if(!isConnected) { throw new IllegalStateException("The cluster has been connected to the ApplicationMaster."); } @@ -394,16 +395,25 @@ private void shutdownInternal(boolean removeShutdownHook) { if(hasBeenShutDown.getAndSet(true)) { return; } - // the session is being stopped explicitly. - if(removeShutdownHook) { + + try { Runtime.getRuntime().removeShutdownHook(clientShutdownHook); + } catch (IllegalStateException e) { + // we are already in the shutdown hook } + if(actorSystem != null){ LOG.info("Sending shutdown request to the Application Master"); if(applicationClient != ActorRef.noSender()) { try { + FinalApplicationStatus finalStatus; + if (failApplication) { + finalStatus = FinalApplicationStatus.FAILED; + } else { + finalStatus = FinalApplicationStatus.SUCCEEDED; + } Future response = Patterns.ask(applicationClient, - new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED, + new Messages.StopYarnSession(finalStatus, "Flink YARN Client requested shutdown"), new Timeout(akkaDuration)); @@ -457,7 +467,7 @@ public class ClientShutdownHook extends Thread { @Override public void run() { LOG.info("Shutting down FlinkYarnCluster from the client shutdown hook"); - shutdownInternal(false); + shutdown(true); } } diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala index d6760ecf2f24c..ec980d0bc2bf1 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -137,7 +137,7 @@ class ApplicationClient(flinkConfig: Configuration) case LocalGetYarnClusterStatus => sender() ! latestClusterStatus - // Forward message to Application Master + // Forward message to Application Master case msg: StopAMAfterJob => yarnJobManager foreach { _ forward msg diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala index 999610fd40623..411808b998ed3 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala @@ -26,6 +26,7 @@ import akka.actor.ActorRef import org.apache.flink.api.common.JobID import org.apache.flink.configuration.ConfigConstants import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus} import org.apache.flink.runtime.messages.Messages.Acknowledge @@ -171,8 +172,13 @@ trait ApplicationMasterActor extends ActorLogMessages { if(jobStatus.status.isTerminalState) { log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " + s"Shutting down YARN session") - self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED, - s"The monitored job with ID ${jobStatus.jobID} has finished.") + if (jobStatus.status == JobStatus.FINISHED) { + self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED, + s"The monitored job with ID ${jobStatus.jobID} has finished.") + } else { + self ! StopYarnSession(FinalApplicationStatus.FAILED, + s"The monitored job with ID ${jobStatus.jobID} has failed to complete.") + } } else { log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}") }