Skip to content

Commit

Permalink
[SPARK-15652][LAUNCHER] Added a new State (LOST) for the listeners of…
Browse files Browse the repository at this point in the history
… SparkLauncher

## What changes were proposed in this pull request?
This situation can happen when the LauncherConnection gets an exception while reading through the socket and terminating silently without notifying making the client/listener think that the job is still in previous state.
The fix force sends a notification to client that the job finished with unknown status and let client handle it accordingly.

## How was this patch tested?
Added a unit test.

Author: Subroto Sanyal <ssanyal@datameer.com>

Closes #13497 from subrotosanyal/SPARK-15652-handle-spark-submit-jvm-crash.
  • Loading branch information
Subroto Sanyal authored and Marcelo Vanzin committed Jun 6, 2016
1 parent 36d3dfa commit c409e23
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ public void close() throws IOException {
}
super.close();
if (handle != null) {
if (!handle.getState().isFinal()) {
LOG.log(Level.WARNING, "Lost connection to spark application.");
handle.setState(SparkAppHandle.State.LOST);
}
handle.disconnect();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ enum State {
/** The application finished with a failed status. */
FAILED(true),
/** The application was killed. */
KILLED(true);
KILLED(true),
/** The Spark Submit JVM exited with a unknown status. */
LOST(true);

private final boolean isFinal;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,37 @@ public void testTimeout() throws Exception {
}
}

@Test
public void testSparkSubmitVmShutsDown() throws Exception {
ChildProcAppHandle handle = LauncherServer.newAppHandle();
TestClient client = null;
final Semaphore semaphore = new Semaphore(0);
try {
Socket s = new Socket(InetAddress.getLoopbackAddress(),
LauncherServer.getServerInstance().getPort());
handle.addListener(new SparkAppHandle.Listener() {
public void stateChanged(SparkAppHandle handle) {
semaphore.release();
}
public void infoChanged(SparkAppHandle handle) {
semaphore.release();
}
});
client = new TestClient(s);
client.send(new Hello(handle.getSecret(), "1.4.0"));
assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
// Make sure the server matched the client to the handle.
assertNotNull(handle.getConnection());
close(client);
assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
assertEquals(SparkAppHandle.State.LOST, handle.getState());
} finally {
kill(handle);
close(client);
client.clientThread.join();
}
}

private void kill(SparkAppHandle handle) {
if (handle != null) {
handle.kill();
Expand Down

0 comments on commit c409e23

Please sign in to comment.