Skip to content

Commit

Permalink
[SPARK-24243][CORE] Expose exceptions from InProcessAppHandle
Browse files Browse the repository at this point in the history
Adds a new method, getError, to SparkAppHandle; it returns the exception
(if present) that caused the underlying Spark application to fail.

New tests added to SparkLauncherSuite for the new method.

Closes #21849

Closes #23221 from vanzin/SPARK-24243.

Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
  • Loading branch information
Sahil Takiar authored and Marcelo Vanzin committed Dec 7, 2018
1 parent 1ab3d3e commit 543577a
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 12 deletions.
102 changes: 91 additions & 11 deletions core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
Expand Up @@ -41,6 +41,8 @@
public class SparkLauncherSuite extends BaseSuite {

private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
private static final String EXCEPTION_MESSAGE = "dummy-exception";
private static final RuntimeException DUMMY_EXCEPTION = new RuntimeException(EXCEPTION_MESSAGE);

private final SparkLauncher launcher = new SparkLauncher();

Expand Down Expand Up @@ -130,17 +132,8 @@ public void testInProcessLauncher() throws Exception {
try {
inProcessLauncherTestImpl();
} finally {
Properties p = new Properties();
for (Map.Entry<Object, Object> e : properties.entrySet()) {
p.put(e.getKey(), e.getValue());
}
System.setProperties(p);
// Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
// Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
// See SPARK-23019 and SparkContext.stop() for details.
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty());
});
restoreSystemProperties(properties);
waitForSparkContextShutdown();
}
}

Expand Down Expand Up @@ -227,6 +220,82 @@ public void testInProcessLauncherDoesNotKillJvm() throws Exception {
assertEquals(SparkAppHandle.State.LOST, handle.getState());
}

@Test
public void testInProcessLauncherGetError() throws Exception {
  // Because this test runs SparkLauncher in process and in client mode, it pollutes the system
  // properties, and that can cause test failures down the test pipeline. So restore the original
  // system properties after this test runs.
  Map<Object, Object> properties = new HashMap<>(System.getProperties());

  SparkAppHandle handle = null;
  try {
    handle = new InProcessLauncher()
      .setMaster("local")
      .setAppResource(SparkLauncher.NO_RESOURCE)
      .setMainClass(ErrorInProcessTestApp.class.getName())
      .addAppArgs("hello")
      .startApplication();

    // The lambda below needs an effectively-final reference to the handle.
    final SparkAppHandle _handle = handle;
    eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> {
      assertEquals(SparkAppHandle.State.FAILED, _handle.getState());
    });

    assertNotNull(handle.getError());
    assertTrue(handle.getError().isPresent());
    // In-process launch keeps the original Throwable instance, so getError() must
    // surface the exact same object. JUnit's assertSame takes (expected, actual);
    // the expected value goes first so failure messages read correctly.
    assertSame(DUMMY_EXCEPTION, handle.getError().get());
  } finally {
    if (handle != null) {
      handle.kill();
    }
    restoreSystemProperties(properties);
    waitForSparkContextShutdown();
  }
}

@Test
public void testSparkLauncherGetError() throws Exception {
  SparkAppHandle handle = null;
  try {
    handle = new SparkLauncher()
      .setMaster("local")
      .setAppResource(SparkLauncher.NO_RESOURCE)
      .setMainClass(ErrorInProcessTestApp.class.getName())
      .addAppArgs("hello")
      .startApplication();

    // Effectively-final alias so the polling lambda can read the handle.
    final SparkAppHandle running = handle;
    eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> {
      assertEquals(SparkAppHandle.State.FAILED, running.getState());
    });

    // The child-process handle reconstructs the error from spark-submit's logs, so
    // only the message is comparable — not the exception instance itself.
    assertNotNull(handle.getError());
    assertTrue(handle.getError().isPresent());
    Throwable parsed = handle.getError().get();
    assertTrue(parsed.getMessage().contains(EXCEPTION_MESSAGE));
  } finally {
    if (handle != null) {
      handle.kill();
    }
  }
}

/**
 * Replaces the JVM-wide system properties with a previously captured snapshot.
 *
 * @param properties snapshot taken via {@code new HashMap<>(System.getProperties())}
 *                   before the test polluted the global state
 */
private void restoreSystemProperties(Map<Object, Object> properties) {
  Properties p = new Properties();
  // Properties extends Hashtable<Object, Object>, so putAll copies every entry
  // directly — no need for a manual entry-by-entry loop.
  p.putAll(properties);
  System.setProperties(p);
}

/**
 * Polls until no active SparkContext remains in this JVM, failing after a bounded wait.
 */
private void waitForSparkContextShutdown() throws Exception {
  // Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
  // Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
  // See SPARK-23019 and SparkContext.stop() for details.
  final Duration timeout = Duration.ofSeconds(5);
  final Duration interval = Duration.ofMillis(10);
  eventually(timeout, interval, () ->
    assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty()));
}

public static class SparkLauncherTestApp {

public static void main(String[] args) throws Exception {
Expand Down Expand Up @@ -264,4 +333,15 @@ public static void main(String[] args) throws Exception {

}

/**
 * Similar to {@link InProcessTestApp} except it throws an exception after validating its
 * arguments, so tests can verify that the failure is surfaced through
 * {@code SparkAppHandle#getError()}.
 */
public static class ErrorInProcessTestApp {

  public static void main(String[] args) {
    assertNotEquals(0, args.length);
    // JUnit's assertEquals takes (expected, actual); the literal goes first so a
    // mismatch reports "expected:<hello>" rather than the reverse.
    assertEquals("hello", args[0]);
    throw DUMMY_EXCEPTION;
  }
}
}
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.launcher;

import java.io.InputStream;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -29,7 +30,7 @@ class ChildProcAppHandle extends AbstractAppHandle {
private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());

private volatile Process childProc;
private OutputRedirector redirector;
private volatile OutputRedirector redirector;

ChildProcAppHandle(LauncherServer server) {
super(server);
Expand All @@ -46,6 +47,23 @@ public synchronized void disconnect() {
}
}

/**
 * Parses the logs of {@code spark-submit} and returns the last exception thrown.
 * <p>
 * Since {@link SparkLauncher} runs {@code spark-submit} in a sub-process, it's difficult to
 * accurately retrieve the full {@link Throwable} from the {@code spark-submit} process.
 * This method parses the logs of the sub-process and provides a best-effort attempt at
 * returning the last exception thrown by the {@code spark-submit} process. Only the exception
 * message is parsed; the associated stacktrace is meaningless.
 *
 * @return an {@link Optional} containing a {@link RuntimeException} with the parsed
 *         exception message, otherwise an empty {@link Optional} (from
 *         {@link Optional#empty()}; note {@code Optional.EMPTY} is a private field and
 *         not a valid link target)
 */
@Override
public Optional<Throwable> getError() {
return redirector != null ? Optional.ofNullable(redirector.getError()) : Optional.empty();
}

@Override
public synchronized void kill() {
if (!isDisposed()) {
Expand Down
Expand Up @@ -17,7 +17,9 @@

package org.apache.spark.launcher;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -31,6 +33,8 @@ class InProcessAppHandle extends AbstractAppHandle {
// Avoid really long thread names.
private static final int MAX_APP_NAME_LEN = 16;

private volatile Throwable error;

private Thread app;

InProcessAppHandle(LauncherServer server) {
Expand All @@ -51,6 +55,11 @@ public synchronized void kill() {
}
}

/**
 * Returns the {@link Throwable} that made the in-process application fail, if any.
 * The value is recorded by the application thread when invoking the app's main method
 * throws, just before the handle transitions to {@code State.FAILED}; it is empty while
 * the app is running or if it completed without throwing.
 */
@Override
public Optional<Throwable> getError() {
return Optional.ofNullable(error);
}

synchronized void start(String appName, Method main, String[] args) {
CommandBuilderUtils.checkState(app == null, "Handle already started.");

Expand All @@ -62,7 +71,11 @@ synchronized void start(String appName, Method main, String[] args) {
try {
main.invoke(null, (Object) args);
} catch (Throwable t) {
if (t instanceof InvocationTargetException) {
t = t.getCause();
}
LOG.log(Level.WARNING, "Application failed with exception.", t);
error = t;
setState(State.FAILED);
}

Expand Down
Expand Up @@ -37,6 +37,7 @@ class OutputRedirector {
private final ChildProcAppHandle callback;

private volatile boolean active;
private volatile Throwable error;

OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
this(in, loggerName, tf, null);
Expand All @@ -61,6 +62,10 @@ private void redirect() {
while ((line = reader.readLine()) != null) {
if (active) {
sink.info(line.replaceFirst("\\s*$", ""));
if ((containsIgnoreCase(line, "Error") || containsIgnoreCase(line, "Exception")) &&
!line.contains("at ")) {
error = new RuntimeException(line);
}
}
}
} catch (IOException e) {
Expand All @@ -85,4 +90,24 @@ boolean isAlive() {
return thread.isAlive();
}

/**
 * Returns the best-effort error extracted from the redirected output: a
 * {@link RuntimeException} wrapping the last log line that contained "Error" or
 * "Exception" (excluding stack-frame lines containing "at "), or {@code null} if no
 * such line has been seen.
 */
Throwable getError() {
return error;
}

/**
 * Case-insensitive substring test, copied from Apache Commons Lang
 * {@code StringUtils#containsIgnoreCase(String, String)} to avoid adding a dependency.
 *
 * @param str the string to scan; may be null
 * @param searchStr the substring to look for; may be null
 * @return true if {@code searchStr} occurs within {@code str} ignoring case;
 *         false if either argument is null
 */
private static boolean containsIgnoreCase(String str, String searchStr) {
  if (str == null || searchStr == null) {
    return false;
  }
  final int needleLen = searchStr.length();
  final int lastStart = str.length() - needleLen;
  int offset = 0;
  // regionMatches with ignoreCase=true avoids allocating lowercased copies.
  while (offset <= lastStart) {
    if (str.regionMatches(true, offset, searchStr, 0, needleLen)) {
      return true;
    }
    offset++;
  }
  return false;
}
}
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.launcher;

import java.util.Optional;

/**
* A handle to a running Spark application.
* <p>
Expand Down Expand Up @@ -100,6 +102,12 @@ public boolean isFinal() {
*/
void disconnect();

/**
 * If the application failed due to an error, return the underlying error. If the app
 * succeeded, this method returns an empty {@link Optional}.
 * <p>
 * NOTE(review): for apps launched as a child process the error is reconstructed from the
 * sub-process logs on a best-effort basis, so the Optional may be empty even for a failed
 * app and the returned Throwable's stacktrace is not meaningful — see the implementation
 * notes on the concrete handle classes.
 */
Optional<Throwable> getError();

/**
* Listener for updates to a handle's state. The callbacks do not receive information about
* what exactly has changed, just that an update has occurred.
Expand Down
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
Expand Up @@ -36,6 +36,9 @@ object MimaExcludes {

// Exclude rules for 3.0.x
lazy val v30excludes = v24excludes ++ Seq(
// [SPARK-24243][CORE] Expose exceptions from InProcessAppHandle
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.launcher.SparkAppHandle.getError"),

// [SPARK-25867] Remove KMeans computeCost
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.KMeansModel.computeCost"),

Expand Down

0 comments on commit 543577a

Please sign in to comment.