From d3d5624f61cf2ad6d949604e08b6090c69c11193 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 5 Aug 2016 14:04:29 -0700
Subject: [PATCH 1/2] Add failing regression test for SPARK-16925

---
 .../scala/org/apache/spark/DistributedSuite.scala | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 0515e6e3a6319..6beae842b04d1 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -134,6 +134,21 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     }
   }
 
+  test("repeatedly failing task that crashes JVM with a zero exit code (SPARK-16925)") {
+    // Ensures that a task which causes the JVM to exit with a zero exit code will cause the
+    // Spark job to eventually fail.
+    sc = new SparkContext(clusterUrl, "test")
+    failAfter(Span(100000, Millis)) {
+      val thrown = intercept[SparkException] {
+        sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
+      }
+      assert(thrown.getClass === classOf[SparkException])
+      assert(thrown.getMessage.contains("failed 4 times"))
+    }
+    // Check that the cluster is still usable:
+    sc.parallelize(1 to 10).count()
+  }
+
   test("caching") {
     sc = new SparkContext(clusterUrl, "test")
     val data = sc.parallelize(1 to 1000, 10).cache()

From c567a7e0f612a31df9197a07206a5dea8f45d1f5 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 5 Aug 2016 14:07:20 -0700
Subject: [PATCH 2/2] Fix bug by always calling schedule().

---
 .../org/apache/spark/deploy/master/Master.scala | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f8aac3008cefa..fded8475a0916 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -265,19 +265,16 @@ private[deploy] class Master(
 
             val normalExit = exitStatus == Some(0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (!normalExit) {
-              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
-                schedule()
-              } else {
-                val execs = appInfo.executors.values
-                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
-                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
-                    s"${appInfo.retryCount} times; removing it")
-                  removeApplication(appInfo, ApplicationState.FAILED)
-                }
+            if (!normalExit && appInfo.incrementRetryCount() >= ApplicationState.MAX_NUM_RETRY) {
+              val execs = appInfo.executors.values
+              if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+                logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+                  s"${appInfo.retryCount} times; removing it")
+                removeApplication(appInfo, ApplicationState.FAILED)
               }
             }
           }
+          schedule()
         case None =>
           logWarning(s"Got status update for unknown executor $appId/$execId")
       }
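
For illustration only, a minimal, self-contained Scala sketch of the control-flow change in Master.scala above. The names SchedulePlacementSketch, AppState, MaxNumRetry, oldHandleExecutorExit, and newHandleExecutorExit are hypothetical simplifications, not Spark's real Master/ApplicationInfo API; the sketch only models why hoisting schedule() out of the `if (!normalExit)` branch matters. Under the old logic an executor that exits cleanly (exit code 0, e.g. a task calling System.exit(0) as in the new regression test) never triggers rescheduling, so the application is starved of executors; under the new logic schedule() runs after every executor-exit event.

object SchedulePlacementSketch {
  // Hypothetical, simplified stand-ins for ApplicationInfo and ApplicationState.MAX_NUM_RETRY.
  final case class AppState(retryCount: Int, scheduledReplacements: Int)
  val MaxNumRetry = 10

  // Old logic: schedule() only ran for abnormal exits below the retry limit.
  def oldHandleExecutorExit(app: AppState, exitStatus: Option[Int]): AppState = {
    val normalExit = exitStatus.contains(0)
    if (!normalExit) {
      val retried = app.copy(retryCount = app.retryCount + 1)
      if (retried.retryCount < MaxNumRetry) {
        retried.copy(scheduledReplacements = retried.scheduledReplacements + 1) // schedule()
      } else {
        retried // the application would be removed here
      }
    } else {
      app // clean exit: no schedule(), so no replacement executor is ever launched
    }
  }

  // New logic: schedule() runs after every executor-exit event, normal or not.
  def newHandleExecutorExit(app: AppState, exitStatus: Option[Int]): AppState = {
    val normalExit = exitStatus.contains(0)
    val afterRetry = if (!normalExit) app.copy(retryCount = app.retryCount + 1) else app
    // (removal of the application once retryCount >= MaxNumRetry is omitted for brevity)
    afterRetry.copy(scheduledReplacements = afterRetry.scheduledReplacements + 1) // schedule()
  }

  def main(args: Array[String]): Unit = {
    val start = AppState(retryCount = 0, scheduledReplacements = 0)
    // An executor killed by System.exit(0), as in the regression test above:
    println(oldHandleExecutorExit(start, exitStatus = Some(0))) // AppState(0,0): nothing rescheduled
    println(newHandleExecutorExit(start, exitStatus = Some(0))) // AppState(0,1): replacement scheduled
  }
}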