Skip to content

Commit

Permalink
[SPARK-17742][core] Handle child process exit in SparkLauncher.
Browse files Browse the repository at this point in the history
Currently the launcher handle does not monitor the child spark-submit
process it launches; this means that if the child exits with an error,
the handle's state will never change, and an application will not know
that the application has failed.

This change adds code to monitor the child process, and changes the
handle state appropriately when the child process exits.

Tested with added unit tests.
  • Loading branch information
Marcelo Vanzin committed Aug 7, 2017
1 parent baf5cac commit 15e1a73
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 31 deletions.
Expand Up @@ -116,11 +116,11 @@ public void testChildProcLauncher() throws Exception {
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
.redirectError()
.addAppArgs("proc");
final Process app = launcher.launch();

new OutputRedirector(app.getInputStream(), TF);
new OutputRedirector(app.getErrorStream(), TF);
new OutputRedirector(app.getInputStream(), getClass().getName() + ".child", TF);
assertEquals(0, app.waitFor());
}

Expand Down
Expand Up @@ -34,7 +34,7 @@ class ChildProcAppHandle implements SparkAppHandle {
private final String secret;
private final LauncherServer server;

private Process childProc;
private volatile Process childProc;
private boolean disposed;
private LauncherConnection connection;
private List<Listener> listeners;
Expand Down Expand Up @@ -96,9 +96,7 @@ public synchronized void disconnect() {

@Override
public synchronized void kill() {
if (!disposed) {
disconnect();
}
disconnect();
if (childProc != null) {
try {
childProc.exitValue();
Expand All @@ -118,14 +116,40 @@ void setChildProc(Process childProc, String loggerName, InputStream logStream) {
this.childProc = childProc;
if (logStream != null) {
this.redirector = new OutputRedirector(logStream, loggerName,
SparkLauncher.REDIRECTOR_FACTORY);
SparkLauncher.REDIRECTOR_FACTORY, this);
} else {
// If there is no log redirection, spawn a thread that will wait for the child process
// to finish.
Thread waiter = SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild);
waiter.setDaemon(true);
waiter.start();
}
}

void setConnection(LauncherConnection connection) {
this.connection = connection;
}

/**
* Callback for when the child process exits. Forcefully put the application in a final state,
* overwriting the current final state unless it is already FAILED.
*/
synchronized void childProcessExited() {
disconnect();

int ec;
try {
ec = childProc.exitValue();
} catch (Exception e) {
ec = 1;
}

if (!state.isFinal() || (ec != 0 && state != State.FAILED)) {
state = State.LOST;
fireEvent(false);
}
}

LauncherServer getServer() {
return server;
}
Expand All @@ -134,7 +158,7 @@ LauncherConnection getConnection() {
return connection;
}

void setState(State s) {
synchronized void setState(State s) {
if (!state.isFinal()) {
state = s;
fireEvent(false);
Expand All @@ -144,17 +168,12 @@ void setState(State s) {
}
}

void setAppId(String appId) {
synchronized void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}

// Visible for testing.
boolean isRunning() {
return childProc == null || childProc.isAlive() || (redirector != null && redirector.isAlive());
}

private synchronized void fireEvent(boolean isInfoChanged) {
private void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
if (isInfoChanged) {
Expand All @@ -166,4 +185,15 @@ private synchronized void fireEvent(boolean isInfoChanged) {
}
}

private void monitorChild() {
while (childProc.isAlive()) {
try {
childProc.waitFor();
} catch (Exception e) {
// Try again.
}
}
childProcessExited();
}

}
Expand Up @@ -34,18 +34,24 @@ class OutputRedirector {
private final BufferedReader reader;
private final Logger sink;
private final Thread thread;
private final ChildProcAppHandle callback;

private volatile boolean active;

OutputRedirector(InputStream in, ThreadFactory tf) {
this(in, OutputRedirector.class.getName(), tf);
OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
this(in, loggerName, tf, null);
}

OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
OutputRedirector(
InputStream in,
String loggerName,
ThreadFactory tf,
ChildProcAppHandle callback) {
this.active = true;
this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
this.thread = tf.newThread(this::redirect);
this.sink = Logger.getLogger(loggerName);
this.callback = callback;
thread.start();
}

Expand All @@ -59,6 +65,10 @@ private void redirect() {
}
} catch (IOException e) {
sink.log(Level.FINE, "Error reading child process output.", e);
} finally {
if (callback != null) {
callback.childProcessExited();
}
}
}

Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.nio.file.attribute.PosixFilePermission.*;

Expand All @@ -39,7 +40,7 @@

import static org.apache.spark.launcher.CommandBuilderUtils.*;

public class OutputRedirectionSuite extends BaseSuite {
public class ChildProcAppHandleSuite extends BaseSuite {

private static final List<String> MESSAGES = new ArrayList<>();

Expand Down Expand Up @@ -99,7 +100,8 @@ public void testRedirectLastWins() throws Exception {
public void testRedirectToLog() throws Exception {
assumeFalse(isWindows());

ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher().startApplication();
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.startApplication();
waitFor(handle);

assertTrue(MESSAGES.contains("output"));
Expand All @@ -112,7 +114,7 @@ public void testRedirectErrorToLog() throws Exception {

Path err = Files.createTempFile("stderr", "txt");

ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.redirectError(err.toFile())
.startApplication();
waitFor(handle);
Expand All @@ -127,7 +129,7 @@ public void testRedirectOutputToLog() throws Exception {

Path out = Files.createTempFile("stdout", "txt");

ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.redirectOutput(out.toFile())
.startApplication();
waitFor(handle);
Expand Down Expand Up @@ -173,17 +175,37 @@ public void testRedirectErrorTwiceFails() throws Exception {
.waitFor();
}

private void waitFor(ChildProcAppHandle handle) throws Exception {
@Test
public void testProcMonitorWithOutputRedirection() throws Exception {
File err = Files.createTempFile("out", "txt").toFile();
SparkAppHandle handle = new TestSparkLauncher()
.redirectError()
.redirectOutput(err)
.startApplication();
waitFor(handle);
assertEquals(SparkAppHandle.State.LOST, handle.getState());
}

@Test
public void testProcMonitorWithLogRedirection() throws Exception {
SparkAppHandle handle = new TestSparkLauncher()
.redirectToLog(getClass().getName())
.startApplication();
waitFor(handle);
assertEquals(SparkAppHandle.State.LOST, handle.getState());
}

private void waitFor(SparkAppHandle handle) throws Exception {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
try {
while (handle.isRunning()) {
Thread.sleep(10);
while (!handle.getState().isFinal()) {
assertTrue("Timed out waiting for handle to transition to final state.",
System.nanoTime() < deadline);
TimeUnit.MILLISECONDS.sleep(10);
}
} finally {
// Explicit unregister from server since the handle doesn't yet do that when the
// process finishes by itself.
LauncherServer server = LauncherServer.getServerInstance();
if (server != null) {
server.unregister(handle);
if (!handle.getState().isFinal()) {
handle.kill();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion launcher/src/test/resources/log4j.properties
Expand Up @@ -29,7 +29,7 @@ log4j.appender.childproc.target=System.err
log4j.appender.childproc.layout=org.apache.log4j.PatternLayout
log4j.appender.childproc.layout.ConversionPattern=%t: %m%n

log4j.appender.outputredirtest=org.apache.spark.launcher.OutputRedirectionSuite$LogAppender
log4j.appender.outputredirtest=org.apache.spark.launcher.ChildProcAppHandleSuite$LogAppender
log4j.logger.org.apache.spark.launcher.app.outputredirtest=INFO, outputredirtest
log4j.logger.org.apache.spark.launcher.app.outputredirtest.additivity=false

Expand Down

0 comments on commit 15e1a73

Please sign in to comment.