Skip to content

Commit

Permalink
[FLINK-2226][YARN] fail application on failed single-job cluster job
Browse files Browse the repository at this point in the history
Failing jobs executed in the YARN cluster mode leave the application
container in the "SUCCEEDED" final state. This is fine for long-running
Flink YARN clusters where multiple jobs are run, but for single-job
clusters it is appropriate to mark the application as failed.

This closes #838.
  • Loading branch information
mxm committed Jun 16, 2015
1 parent 1bd0af7 commit 46ad405
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 26 deletions.
Expand Up @@ -265,6 +265,7 @@ protected int run(String[] args) {
return handleError(t);
}

int exitCode = 1;
try {
int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
Expand All @@ -276,15 +277,14 @@ protected int run(String[] args) {
"To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
}
int exitCode = 0;

// check if detached per job yarn cluster is used to start flink
if(yarnCluster != null && yarnCluster.isDetached()) {
logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
executeProgram(program, client, userParallelism, false);
exitCode = executeProgram(program, client, userParallelism, false);
} else {
// regular (blocking) execution.
exitCode = executeProgram(program, client, userParallelism, true);
Expand Down Expand Up @@ -314,7 +314,7 @@ protected int run(String[] args) {
finally {
if (yarnCluster != null && !yarnCluster.isDetached()) {
logAndSysout("Shutting down YARN cluster");
yarnCluster.shutdown();
yarnCluster.shutdown(exitCode != 0);
}
if (program != null) {
program.deleteExtractedLibraries();
Expand Down
Expand Up @@ -302,7 +302,7 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {

if (yarnCluster.hasFailed()) {
System.err.println("The YARN cluster has failed");
yarnCluster.shutdown();
yarnCluster.shutdown(true);
}

// wait until CLIENT_POLLING_INTERVALL is over or the user entered something.
Expand Down Expand Up @@ -439,7 +439,7 @@ public int run(String[] args) {

if (!yarnCluster.hasBeenStopped()) {
LOG.info("Command Line Interface requested session shutdown");
yarnCluster.shutdown();
yarnCluster.shutdown(false);
}

try {
Expand All @@ -458,7 +458,7 @@ public int run(String[] args) {
public void stop() {
if (yarnCluster != null) {
LOG.info("Command line interface is shutting down the yarnCluster");
yarnCluster.shutdown();
yarnCluster.shutdown(false);
}
}
}
Expand Up @@ -318,6 +318,7 @@ else if (prog.isUsingInteractiveMode()) {
ContextEnvironment.enableLocalExecution(true);
}

// Job id has been set in the Client passed to the ContextEnvironment
return new JobSubmissionResult(lastJobId);
}
else {
Expand Down
Expand Up @@ -51,7 +51,7 @@ public enum JobStatus {

private final boolean terminalState;

private JobStatus(boolean terminalState) {
JobStatus(boolean terminalState) {
this.terminalState = terminalState;
}

Expand Down
Expand Up @@ -30,7 +30,7 @@ public abstract class AbstractFlinkYarnCluster {

public abstract String getWebInterfaceURL();

public abstract void shutdown();
public abstract void shutdown(boolean failApplication);

public abstract boolean hasBeenStopped();

Expand Down
Expand Up @@ -647,7 +647,7 @@ public void testJavaAPI() {

LOG.info("Shutting down cluster. All tests passed");
// shutdown cluster
yarnCluster.shutdown();
yarnCluster.shutdown(false);
LOG.info("Finished testJavaAPI()");
}
}
Expand Up @@ -435,11 +435,11 @@ protected AbstractFlinkYarnCluster deployInternal(String clusterName) throws Exc
+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);

}
if( taskManagerMemoryMb > freeClusterMem.containerLimit) {
if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
LOG.warn("The requested amount of memory for the TaskManagers ("+taskManagerMemoryMb+"MB) is more than "
+ "the largest possible YARN container: "+freeClusterMem.containerLimit + NOTE_RSC);
}
if( jobManagerMemoryMb > freeClusterMem.containerLimit) {
if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
}
Expand Down
Expand Up @@ -297,12 +297,12 @@ public boolean hasFailed() {
}
ApplicationReport lastReport = pollingRunner.getLastReport();
if(lastReport == null) {
LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster. that didn't receive a status so far." +
LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster that didn't receive a status so far." +
"The system might be in an erroneous state");
return false;
} else {
YarnApplicationState appState = lastReport.getYarnApplicationState();
boolean status= (appState == YarnApplicationState.FAILED ||
boolean status = (appState == YarnApplicationState.FAILED ||
appState == YarnApplicationState.KILLED);
if(status) {
LOG.warn("YARN reported application state {}", appState);
Expand Down Expand Up @@ -381,29 +381,39 @@ public List<String> getNewMessages() {
// -------------------------- Shutdown handling ------------------------

private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
@Override
public void shutdown() {
shutdownInternal(true);
}

private void shutdownInternal(boolean removeShutdownHook) {
/**
* Shutdown the YARN cluster.
* @param failApplication whether we should fail the YARN application (in case of errors in Flink)
*/
@Override
public void shutdown(boolean failApplication) {
if(!isConnected) {
throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
}

if(hasBeenShutDown.getAndSet(true)) {
return;
}
// the session is being stopped explicitly.
if(removeShutdownHook) {

try {
Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
} catch (IllegalStateException e) {
// we are already in the shutdown hook
}

if(actorSystem != null){
LOG.info("Sending shutdown request to the Application Master");
if(applicationClient != ActorRef.noSender()) {
try {
FinalApplicationStatus finalStatus;
if (failApplication) {
finalStatus = FinalApplicationStatus.FAILED;
} else {
finalStatus = FinalApplicationStatus.SUCCEEDED;
}
Future<Object> response = Patterns.ask(applicationClient,
new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED,
new Messages.StopYarnSession(finalStatus,
"Flink YARN Client requested shutdown"),
new Timeout(akkaDuration));

Expand Down Expand Up @@ -457,7 +467,7 @@ public class ClientShutdownHook extends Thread {
@Override
public void run() {
LOG.info("Shutting down FlinkYarnCluster from the client shutdown hook");
shutdownInternal(false);
shutdown(true);
}
}

Expand Down
Expand Up @@ -137,7 +137,7 @@ class ApplicationClient(flinkConfig: Configuration)
case LocalGetYarnClusterStatus =>
sender() ! latestClusterStatus

// Forward message to Application Master
// Forward message to Application Master
case msg: StopAMAfterJob =>
yarnJobManager foreach {
_ forward msg
Expand Down
Expand Up @@ -26,6 +26,7 @@ import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.ConfigConstants
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus}
import org.apache.flink.runtime.messages.Messages.Acknowledge
Expand Down Expand Up @@ -171,8 +172,13 @@ trait ApplicationMasterActor extends ActorLogMessages {
if(jobStatus.status.isTerminalState) {
log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
s"Shutting down YARN session")
self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
s"The monitored job with ID ${jobStatus.jobID} has finished.")
if (jobStatus.status == JobStatus.FINISHED) {
self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
s"The monitored job with ID ${jobStatus.jobID} has finished.")
} else {
self ! StopYarnSession(FinalApplicationStatus.FAILED,
s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
}
} else {
log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
}
Expand Down

0 comments on commit 46ad405

Please sign in to comment.