Skip to content

Commit

Permalink
[SPARK-23020][CORE] Fix races in launcher code, test.
Browse files Browse the repository at this point in the history
The race in the code occurs because the handle might set
itself to the wrong state while the connection-handling
thread is still processing incoming data; the handle
therefore needs to wait for the connection to finish up
before checking the final state.

The race in the test is because when waiting for a handle
to reach a final state, the waitFor() method needs to wait
until all handle state is updated (which also includes
waiting for the connection thread above to finish).
Otherwise, waitFor() may return too early, which would cause
a bunch of different races (like the listener not being yet
notified of the state change, or being in the middle of
being notified, or the handle not being properly disposed
and causing postChecks() to assert).

On top of that I found, by code inspection, a couple of
potential races that could make a handle end up in the
wrong state when being killed.

The original version of this fix introduced the inverse
of the first race described above: the closing connection
could override the handle state before the handle had a
chance to do its cleanup. The fix for that is to have the
connection dispose of the handle only when there is an
error, and to let the handle dispose of itself in the
normal case.

The fix also caused a bug in YarnClusterSuite to be surfaced;
the code was checking for a file in the classpath that was
not expected to be there in client mode. Because of the above
issues, the error was not propagating correctly and the (buggy)
test was incorrectly passing.

Tested by running the existing unit tests a lot (and not
seeing the errors I was seeing before).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #20297 from vanzin/SPARK-23020.

(cherry picked from commit ec22897)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
Marcelo Vanzin authored and cloud-fan committed Jan 22, 2018
1 parent 36af73b commit 57c320a
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 81 deletions.
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.launcher;

import java.time.Duration;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -25,13 +26,13 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import static org.mockito.Mockito.*;

import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.internal.config.package$;
import org.apache.spark.util.Utils;

Expand Down Expand Up @@ -121,8 +122,7 @@ public void testChildProcLauncher() throws Exception {
assertEquals(0, app.waitFor());
}

// TODO: [SPARK-23020] Re-enable this
@Ignore
@Test
public void testInProcessLauncher() throws Exception {
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
// properties, and that can cause test failures down the test pipeline. So restore the original
Expand All @@ -139,7 +139,9 @@ public void testInProcessLauncher() throws Exception {
// Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
// Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
// See SPARK-23019 and SparkContext.stop() for details.
TimeUnit.MILLISECONDS.sleep(500);
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty());
});
}
}

Expand All @@ -148,26 +150,35 @@ private void inProcessLauncherTestImpl() throws Exception {
SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
doAnswer(invocation -> {
SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
transitions.add(h.getState());
synchronized (transitions) {
transitions.add(h.getState());
}
return null;
}).when(listener).stateChanged(any(SparkAppHandle.class));

SparkAppHandle handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());

// Matches the behavior of LocalSchedulerBackend.
List<SparkAppHandle.State> expected = Arrays.asList(
SparkAppHandle.State.CONNECTED,
SparkAppHandle.State.RUNNING,
SparkAppHandle.State.FINISHED);
assertEquals(expected, transitions);
SparkAppHandle handle = null;
try {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());

// Matches the behavior of LocalSchedulerBackend.
List<SparkAppHandle.State> expected = Arrays.asList(
SparkAppHandle.State.CONNECTED,
SparkAppHandle.State.RUNNING,
SparkAppHandle.State.FINISHED);
assertEquals(expected, transitions);
} finally {
if (handle != null) {
handle.kill();
}
}
}

public static class SparkLauncherTestApp {
Expand Down
Expand Up @@ -33,7 +33,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
private List<Listener> listeners;
private State state;
private String appId;
private boolean disposed;
private volatile boolean disposed;

protected AbstractAppHandle(LauncherServer server) {
this.server = server;
Expand Down Expand Up @@ -70,16 +70,15 @@ public void stop() {

@Override
public synchronized void disconnect() {
if (!disposed) {
disposed = true;
if (!isDisposed()) {
if (connection != null) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);
dispose();
}
}

Expand All @@ -95,6 +94,21 @@ boolean isDisposed() {
return disposed;
}

/**
 * Disposes of this handle. Does nothing if the handle is already disposed; otherwise
 * unregisters it from the server, moves it to LOST when its state is not final yet, and
 * finally flags it as disposed.
 */
synchronized void dispose() {
  if (isDisposed()) {
    return;
  }

  // Unregistering comes first so that the connection with the app is known to be really
  // terminated before the state is inspected.
  server.unregister(this);

  boolean reachedFinalState = getState().isFinal();
  if (!reachedFinalState) {
    setState(State.LOST);
  }
  this.disposed = true;
}

void setState(State s) {
setState(s, false);
}
Expand Down
Expand Up @@ -48,14 +48,16 @@ public synchronized void disconnect() {

@Override
public synchronized void kill() {
disconnect();
if (childProc != null) {
if (childProc.isAlive()) {
childProc.destroyForcibly();
if (!isDisposed()) {
setState(State.KILLED);
disconnect();
if (childProc != null) {
if (childProc.isAlive()) {
childProc.destroyForcibly();
}
childProc = null;
}
childProc = null;
}
setState(State.KILLED);
}

void setChildProc(Process childProc, String loggerName, InputStream logStream) {
Expand Down Expand Up @@ -94,8 +96,6 @@ void monitorChild() {
return;
}

disconnect();

int ec;
try {
ec = proc.exitValue();
Expand All @@ -118,6 +118,8 @@ void monitorChild() {
if (newState != null) {
setState(newState, true);
}

disconnect();
}
}

Expand Down
Expand Up @@ -39,15 +39,16 @@ class InProcessAppHandle extends AbstractAppHandle {

@Override
public synchronized void kill() {
LOG.warning("kill() may leave the underlying app running in in-process mode.");
disconnect();

// Interrupt the thread. This is not guaranteed to kill the app, though.
if (app != null) {
app.interrupt();
if (!isDisposed()) {
LOG.warning("kill() may leave the underlying app running in in-process mode.");
setState(State.KILLED);
disconnect();

// Interrupt the thread. This is not guaranteed to kill the app, though.
if (app != null) {
app.interrupt();
}
}

setState(State.KILLED);
}

synchronized void start(String appName, Method main, String[] args) {
Expand Down
Expand Up @@ -53,7 +53,7 @@ abstract class LauncherConnection implements Closeable, Runnable {
public void run() {
try {
FilteredObjectInputStream in = new FilteredObjectInputStream(socket.getInputStream());
while (!closed) {
while (isOpen()) {
Message msg = (Message) in.readObject();
handle(msg);
}
Expand Down Expand Up @@ -95,15 +95,15 @@ protected synchronized void send(Message msg) throws IOException {
}

@Override
public void close() throws IOException {
if (!closed) {
synchronized (this) {
if (!closed) {
closed = true;
socket.close();
}
}
public synchronized void close() throws IOException {
if (isOpen()) {
closed = true;
socket.close();
}
}

/** Returns {@code true} for as long as the {@code closed} flag has not been set. */
boolean isOpen() {
  if (closed) {
    return false;
  }
  return true;
}

}
Expand Up @@ -217,6 +217,33 @@ void unregister(AbstractAppHandle handle) {
break;
}
}

// If there is a live connection for this handle, we need to wait for it to finish before
// returning, otherwise there might be a race between the connection thread processing
// buffered data and the handle cleaning up after itself, leading to potentially the wrong
// state being reported for the handle.
ServerConnection conn = null;
synchronized (clients) {
for (ServerConnection c : clients) {
if (c.handle == handle) {
conn = c;
break;
}
}
}

if (conn != null) {
synchronized (conn) {
if (conn.isOpen()) {
try {
conn.wait();
} catch (InterruptedException ie) {
// Ignore.
}
}
}
}

unref();
}

Expand Down Expand Up @@ -288,7 +315,7 @@ private String createSecret() {
private class ServerConnection extends LauncherConnection {

private TimerTask timeout;
private AbstractAppHandle handle;
volatile AbstractAppHandle handle;

ServerConnection(Socket socket, TimerTask timeout) throws IOException {
super(socket);
Expand All @@ -313,7 +340,7 @@ protected void handle(Message msg) throws IOException {
} else {
if (handle == null) {
throw new IllegalArgumentException("Expected hello, got: " +
msg != null ? msg.getClass().getName() : null);
msg != null ? msg.getClass().getName() : null);
}
if (msg instanceof SetAppId) {
SetAppId set = (SetAppId) msg;
Expand All @@ -331,23 +358,27 @@ protected void handle(Message msg) throws IOException {
timeout.cancel();
}
close();
if (handle != null) {
handle.dispose();
}
} finally {
timeoutTimer.purge();
}
}

@Override
public void close() throws IOException {
if (!isOpen()) {
return;
}

synchronized (clients) {
clients.remove(this);
}
super.close();
if (handle != null) {
if (!handle.getState().isFinal()) {
LOG.log(Level.WARNING, "Lost connection to spark application.");
handle.setState(SparkAppHandle.State.LOST);
}
handle.disconnect();

synchronized (this) {
super.close();
notifyAll();
}
}

Expand Down
42 changes: 35 additions & 7 deletions launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.launcher;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.junit.After;
Expand Down Expand Up @@ -47,19 +48,46 @@ public void postChecks() {
assertNull(server);
}

protected void waitFor(SparkAppHandle handle) throws Exception {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
protected void waitFor(final SparkAppHandle handle) throws Exception {
try {
while (!handle.getState().isFinal()) {
assertTrue("Timed out waiting for handle to transition to final state.",
System.nanoTime() < deadline);
TimeUnit.MILLISECONDS.sleep(10);
}
eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
assertTrue("Handle is not in final state.", handle.getState().isFinal());
});
} finally {
if (!handle.getState().isFinal()) {
handle.kill();
}
}

// Wait until the handle has been marked as disposed, to make sure all cleanup tasks
// have been performed.
AbstractAppHandle ahandle = (AbstractAppHandle) handle;
eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
assertTrue("Handle is still not marked as disposed.", ahandle.isDisposed());
});
}

/**
 * Call a closure that performs a check every "period" until it succeeds, or the timeout
 * elapses.
 *
 * @param timeout maximum time to keep retrying; must be strictly larger than {@code period},
 *        otherwise the precondition assertion fails before the first attempt.
 * @param period pause between two consecutive attempts.
 * @param check closure to run; it reports failure by throwing (e.g. a failed assertion) and
 *        success by returning normally.
 * @throws IllegalStateException if the check is still failing once the timeout has elapsed;
 *         the last failure is attached as the cause.
 * @throws InterruptedException if the thread is interrupted while pausing between attempts.
 */
protected void eventually(Duration timeout, Duration period, Runnable check) throws Exception {
  assertTrue("Timeout needs to be larger than period.", timeout.compareTo(period) > 0);
  long deadline = System.nanoTime() + timeout.toNanos();
  int count = 0;
  while (true) {
    try {
      count++;
      check.run();
      return;
    } catch (Throwable t) {
      // Throwable (not Exception) is caught on purpose: failed assertions are Errors.
      // nanoTime() values must be compared through their difference to stay correct if the
      // counter wraps around.
      if (System.nanoTime() - deadline >= 0) {
        String msg = String.format("Failed check after %d tries: %s.", count, t.getMessage());
        throw new IllegalStateException(msg, t);
      }
      // Sleep with nanosecond granularity: period.toMillis() truncates sub-millisecond
      // periods to 0, which would turn this loop into a busy spin.
      TimeUnit.NANOSECONDS.sleep(period.toNanos());
    }
  }
}

}

0 comments on commit 57c320a

Please sign in to comment.