Skip to content

Commit

Permalink
[FLINK-2994] [client] Report error cause when jobs switch to failing.
Browse files Browse the repository at this point in the history
For jobs that do not switch to FAILED, but rather RESTARTING, this now prints the error cause
as well. Also minor improvement to exception printing in CliFrontend.
  • Loading branch information
StephanEwen committed Nov 10, 2015
1 parent 0de13b5 commit 976bacc
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 15 deletions.
Expand Up @@ -125,8 +125,6 @@ public class CliFrontend {

private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);

private final File configDirectory;

private final Configuration config;

private final FiniteDuration askTimeout;
Expand Down Expand Up @@ -155,12 +153,12 @@ public CliFrontend() throws Exception {
public CliFrontend(String configDir) throws Exception {

// configure the config directory
this.configDirectory = new File(configDir);
LOG.info("Using configuration directory " + this.configDirectory.getAbsolutePath());
File configDirectory = new File(configDir);
LOG.info("Using configuration directory " + configDirectory.getAbsolutePath());

// load the configuration
LOG.info("Trying to load configuration file");
GlobalConfiguration.loadConfiguration(this.configDirectory.getAbsolutePath());
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
this.config = GlobalConfiguration.getConfiguration();

// load the YARN properties
Expand All @@ -175,13 +173,9 @@ public CliFrontend(String configDir) throws Exception {

Properties yarnProperties = new Properties();
try {
InputStream is = new FileInputStream(propertiesFile);
try {
try (InputStream is = new FileInputStream(propertiesFile)) {
yarnProperties.load(is);
}
finally {
is.close();
}
}
catch (IOException e) {
throw new Exception("Cannot read the YARN properties file", e);
Expand Down Expand Up @@ -915,9 +909,9 @@ private int handleError(Throwable t) {
}
LOG.error("Error while running the command.", t);

System.err.println("\n------------------------------------------------------------");
System.err.println(" The program finished with the following exception:\n");

This comment has been minimized.

Copy link
@mxm

mxm Nov 11, 2015

Contributor

\n is not compatible with a non Unix environment.

This comment has been minimized.

Copy link
@StephanEwen

StephanEwen Nov 11, 2015

Author Contributor

true, so this gives formatting errors on Windows

This comment has been minimized.

Copy link
@mxm

mxm Nov 13, 2015

Contributor

Adjusted in 9a911c8.

t.printStackTrace();
System.err.println();
System.err.println("The exception above occurred while trying to run your command.");
return 1;
}

Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
Expand Down Expand Up @@ -113,9 +114,9 @@ protected void handleMessage(Object message) {
// =========== State Change Messages ===============

if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
logAndPrintMessage(message);
logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged) message);
} else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
logAndPrintMessage(message);
logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged) message);
}

// ============ JobManager ActorRef resolution ===============
Expand Down Expand Up @@ -276,13 +277,30 @@ protected UUID getLeaderSessionID() {
return leaderSessionID;
}

private void logAndPrintMessage(Object message) {
private void logAndPrintMessage(ExecutionGraphMessages.ExecutionStateChanged message) {
LOG.info(message.toString());
if (sysoutUpdates) {
System.out.println(message.toString());
}
}

private void logAndPrintMessage(ExecutionGraphMessages.JobStatusChanged message) {
// by default, this only prints the status, and not any exception.
// in state FAILING, we report the exception in addition
if (message.newJobStatus() != JobStatus.FAILING || message.error() == null) {
LOG.info(message.toString());
if (sysoutUpdates) {
System.out.println(message.toString());
}
} else {
LOG.info(message.toString(), message.error());
if (sysoutUpdates) {
System.out.println(message.toString());
message.error().printStackTrace(System.out);
}
}
}

@Override
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
getSelf().tell(
Expand Down

0 comments on commit 976bacc

Please sign in to comment.