Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-17742][core] Handle child process exit in SparkLauncher. #18877

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -116,11 +116,11 @@ public void testChildProcLauncher() throws Exception {
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
.redirectError()
.addAppArgs("proc");
final Process app = launcher.launch();

new OutputRedirector(app.getInputStream(), TF);
new OutputRedirector(app.getErrorStream(), TF);
new OutputRedirector(app.getInputStream(), getClass().getName() + ".child", TF);
assertEquals(0, app.waitFor());
}

Expand Down
Expand Up @@ -34,7 +34,7 @@ class ChildProcAppHandle implements SparkAppHandle {
private final String secret;
private final LauncherServer server;

private Process childProc;
private volatile Process childProc;
private boolean disposed;
private LauncherConnection connection;
private List<Listener> listeners;
Expand Down Expand Up @@ -96,9 +96,7 @@ public synchronized void disconnect() {

@Override
public synchronized void kill() {
if (!disposed) {
disconnect();
}
disconnect();
if (childProc != null) {
try {
childProc.exitValue();
Expand All @@ -118,14 +116,40 @@ void setChildProc(Process childProc, String loggerName, InputStream logStream) {
this.childProc = childProc;
if (logStream != null) {
this.redirector = new OutputRedirector(logStream, loggerName,
SparkLauncher.REDIRECTOR_FACTORY);
SparkLauncher.REDIRECTOR_FACTORY, this);
} else {
// If there is no log redirection, spawn a thread that will wait for the child process
// to finish.
Thread waiter = SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild);
waiter.setDaemon(true);
waiter.start();
}
}

void setConnection(LauncherConnection connection) {
this.connection = connection;
}

/**
* Callback for when the child process exits. Forcefully put the application in a final state,
* overwriting the current final state unless it is already FAILED.
*/
synchronized void childProcessExited() {
disconnect();

int ec;
try {
ec = childProc.exitValue();
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might want to log the exception here

ec = 1;
}

if (!state.isFinal() || (ec != 0 && state != State.FAILED)) {
state = State.LOST;
fireEvent(false);
}
}

LauncherServer getServer() {
return server;
}
Expand All @@ -134,7 +158,7 @@ LauncherConnection getConnection() {
return connection;
}

void setState(State s) {
synchronized void setState(State s) {
if (!state.isFinal()) {
state = s;
fireEvent(false);
Expand All @@ -144,17 +168,12 @@ void setState(State s) {
}
}

void setAppId(String appId) {
synchronized void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}

// Visible for testing.
boolean isRunning() {
return childProc == null || childProc.isAlive() || (redirector != null && redirector.isAlive());
}

private synchronized void fireEvent(boolean isInfoChanged) {
private void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
if (isInfoChanged) {
Expand All @@ -166,4 +185,15 @@ private synchronized void fireEvent(boolean isInfoChanged) {
}
}

private void monitorChild() {
while (childProc.isAlive()) {
try {
childProc.waitFor();
} catch (Exception e) {
// Try again.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log exception here?

}
}
childProcessExited();
}

}
Expand Up @@ -34,18 +34,24 @@ class OutputRedirector {
private final BufferedReader reader;
private final Logger sink;
private final Thread thread;
private final ChildProcAppHandle callback;

private volatile boolean active;

OutputRedirector(InputStream in, ThreadFactory tf) {
this(in, OutputRedirector.class.getName(), tf);
OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
this(in, loggerName, tf, null);
}

OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
OutputRedirector(
InputStream in,
String loggerName,
ThreadFactory tf,
ChildProcAppHandle callback) {
this.active = true;
this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
this.thread = tf.newThread(this::redirect);
this.sink = Logger.getLogger(loggerName);
this.callback = callback;
thread.start();
}

Expand All @@ -59,6 +65,10 @@ private void redirect() {
}
} catch (IOException e) {
sink.log(Level.FINE, "Error reading child process output.", e);
} finally {
if (callback != null) {
callback.childProcessExited();
}
}
}

Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.nio.file.attribute.PosixFilePermission.*;

Expand All @@ -39,7 +40,7 @@

import static org.apache.spark.launcher.CommandBuilderUtils.*;

public class OutputRedirectionSuite extends BaseSuite {
public class ChildProcAppHandleSuite extends BaseSuite {

private static final List<String> MESSAGES = new ArrayList<>();

Expand Down Expand Up @@ -99,7 +100,8 @@ public void testRedirectLastWins() throws Exception {
public void testRedirectToLog() throws Exception {
assumeFalse(isWindows());

ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher().startApplication();
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.startApplication();
waitFor(handle);

assertTrue(MESSAGES.contains("output"));
Expand All @@ -112,7 +114,7 @@ public void testRedirectErrorToLog() throws Exception {

Path err = Files.createTempFile("stderr", "txt");

ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.redirectError(err.toFile())
.startApplication();
waitFor(handle);
Expand All @@ -127,7 +129,7 @@ public void testRedirectOutputToLog() throws Exception {

Path out = Files.createTempFile("stdout", "txt");

ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.redirectOutput(out.toFile())
.startApplication();
waitFor(handle);
Expand Down Expand Up @@ -173,17 +175,37 @@ public void testRedirectErrorTwiceFails() throws Exception {
.waitFor();
}

private void waitFor(ChildProcAppHandle handle) throws Exception {
@Test
public void testProcMonitorWithOutputRedirection() throws Exception {
File err = Files.createTempFile("out", "txt").toFile();
SparkAppHandle handle = new TestSparkLauncher()
.redirectError()
.redirectOutput(err)
.startApplication();
waitFor(handle);
assertEquals(SparkAppHandle.State.LOST, handle.getState());
}

@Test
public void testProcMonitorWithLogRedirection() throws Exception {
SparkAppHandle handle = new TestSparkLauncher()
.redirectToLog(getClass().getName())
.startApplication();
waitFor(handle);
assertEquals(SparkAppHandle.State.LOST, handle.getState());
}

private void waitFor(SparkAppHandle handle) throws Exception {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
try {
while (handle.isRunning()) {
Thread.sleep(10);
while (!handle.getState().isFinal()) {
assertTrue("Timed out waiting for handle to transition to final state.",
System.nanoTime() < deadline);
TimeUnit.MILLISECONDS.sleep(10);
}
} finally {
// Explicit unregister from server since the handle doesn't yet do that when the
// process finishes by itself.
LauncherServer server = LauncherServer.getServerInstance();
if (server != null) {
server.unregister(handle);
if (!handle.getState().isFinal()) {
handle.kill();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion launcher/src/test/resources/log4j.properties
Expand Up @@ -29,7 +29,7 @@ log4j.appender.childproc.target=System.err
log4j.appender.childproc.layout=org.apache.log4j.PatternLayout
log4j.appender.childproc.layout.ConversionPattern=%t: %m%n

log4j.appender.outputredirtest=org.apache.spark.launcher.OutputRedirectionSuite$LogAppender
log4j.appender.outputredirtest=org.apache.spark.launcher.ChildProcAppHandleSuite$LogAppender
log4j.logger.org.apache.spark.launcher.app.outputredirtest=INFO, outputredirtest
log4j.logger.org.apache.spark.launcher.app.outputredirtest.additivity=false

Expand Down