From 0ee2f1fe487e4f7defb7a4bc53ab3d69d16c9173 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 6 Jun 2016 08:26:34 -0500 Subject: [PATCH 1/9] increase test timeouts --- .../apache/spark/scheduler/BlacklistIntegrationSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 3a4b7af71b1f3..0ce228fc5e907 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark._ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{ val badHost = "host-0" + val duration = Duration(10, SECONDS) /** * This backend just always fails if the task is executed on a bad host, but otherwise succeeds @@ -45,7 +46,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) withBackend(badHostBackend _) { val jobFuture = submit(rdd, (0 until 10).toArray) - val duration = Duration(1, SECONDS) Await.ready(jobFuture, duration) } assertDataStructuresEmpty(noFailure = false) @@ -64,7 +64,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) withBackend(badHostBackend _) { val jobFuture = submit(rdd, (0 until 10).toArray) - val duration = Duration(3, SECONDS) Await.ready(jobFuture, duration) } assertDataStructuresEmpty(noFailure = false) @@ -86,7 +85,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) withBackend(badHostBackend _) { val jobFuture = submit(rdd, (0 until 10).toArray) - val duration = Duration(1, SECONDS) Await.ready(jobFuture, duration) } assert(results === (0 until 10).map { _ -> 42 }.toMap) From 270a038a20d8f1e2604636f00498fc4dcacc178a Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 6 Jun 2016 09:40:54 -0500 Subject: [PATCH 2/9] for delay scheduling to work, the mock backend has to periodically revive all offers --- .../scheduler/SchedulerIntegrationSuite.scala | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 92bd76548e82f..97cc30b45e560 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler import java.util.Properties +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} @@ -31,7 +32,7 @@ import org.apache.spark._ import org.apache.spark.TaskState._ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.util.{CallSite, Utils} +import org.apache.spark.util.{CallSite, ThreadUtils, Utils} /** * Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets, @@ -239,6 +240,17 @@ private[spark] abstract class MockBackend( conf: SparkConf, val taskScheduler: TaskSchedulerImpl) extends SchedulerBackend with Logging { + // 
Periodically revive offers to allow delay scheduling to work + private val reviveThread = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread") + private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms") + + reviveThread.scheduleAtFixedRate(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + reviveOffers() + } + }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS) + /** * Test backends should call this to get a task that has been assigned to them by the scheduler. * Each task should be responded to with either [[taskSuccess]] or [[taskFailed]]. @@ -304,7 +316,9 @@ private[spark] abstract class MockBackend( override def start(): Unit = {} - override def stop(): Unit = {} + override def stop(): Unit = { + reviveThread.shutdown() + } val env = SparkEnv.get @@ -328,8 +342,9 @@ private[spark] abstract class MockBackend( } /** - * This is called by the scheduler whenever it has tasks it would like to schedule. It gets - * called in the scheduling thread, not the backend thread. + * This is called by the scheduler whenever it has tasks it would like to schedule, when a tasks + * completes (which will be in a result-getter thread), and by the reviveOffers thread for delay + * scheduling. */ override def reviveOffers(): Unit = { val offers: Seq[WorkerOffer] = generateOffers() From 4dc8711993c69fd852da92597473b6852eaa2e21 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 6 Jun 2016 09:51:37 -0500 Subject: [PATCH 3/9] cleanup state before notifying job waiter; stop things to clean up a bunch of threads --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 +++- .../spark/scheduler/BlacklistIntegrationSuite.scala | 5 ++--- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 9 ++++++++- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a2eadbcbd660a..eb1eb8c728772 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1471,8 +1471,10 @@ class DAGScheduler( } if (ableToCancelStages) { - job.listener.jobFailed(error) + // SPARK-15783 important to cleanup state first, just for tests where we have some asserts + // against the state. Otherwise we have a *little* bit of flakiness in the tests. 
cleanupStateForJobAndIndependentStages(job) + job.listener.jobFailed(error) listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error))) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 0ce228fc5e907..12edaac159f5c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -55,7 +55,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM // of executors on the bad node, then locality preferences will lead to us cycling through // the executors on the bad node, and still failing the job testScheduler( - "With blacklist on, job will still fail if there are too many bad executors on bad host", + "With blacklist on, job will still fail if there are too many bad execs on bad host", extraConfs = Seq( // just set this to something much longer than the test duration ("spark.scheduler.executorTaskBlacklistTime", "10000000") @@ -71,8 +71,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM // Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually // schedule on a good node and succeed the job - testScheduler( - "Bad node with multiple executors, job will still succeed with the right confs", + testScheduler("Bad node with multiple executors, job will still succeed with the right confs", extraConfs = Seq( // just set this to something much longer than the test duration ("spark.scheduler.executorTaskBlacklistTime", "10000000"), diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b6765f064524f..27977ba9adee7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -214,7 +214,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou results.clear() securityMgr = new SecurityManager(conf) broadcastManager = new BroadcastManager(true, conf, securityMgr) - mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) + mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) { + override def sendTracker(message: Any): Unit = { + // no-op, just so we can stop this to avoid leaking threads + } + } scheduler = new DAGScheduler( sc, taskScheduler, @@ -228,6 +232,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def afterEach(): Unit = { try { scheduler.stop() + dagEventProcessLoopTester.stop() + mapOutputTracker.stop() + broadcastManager.stop() } finally { super.afterEach() } From 7f4e9eb41e3276e4e91f8f262b4e3e25a28e8e7c Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 7 Jun 2016 15:14:13 -0700 Subject: [PATCH 4/9] repeat tests a lot to check for flakiness --- .../spark/scheduler/BlacklistIntegrationSuite.scala | 9 +++++++++ .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 12edaac159f5c..4647c18901af4 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -40,6 +40,15 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM } } + override def test(name: String, testTags: org.scalatest.Tag*)(body: => Unit): Unit = { + var lastThreads = Set[Long]() + (0 until 5000).foreach { idx => + super.test(s"$name: $idx", testTags: _*) { + body + } + } + } + // Test demonstrating the issue -- without a config change, the scheduler keeps scheduling // according to locality preferences, and so the job fails testScheduler("If preferred node is bad, without blacklist job will fail") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 27977ba9adee7..4ef499877292a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -326,6 +326,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou runEvent(JobCancelled(jobId)) } + override def test(name: String, testTags: org.scalatest.Tag*)(body: => Unit): Unit = { + var lastThreads = Set[Long]() + (0 until 1000).foreach { idx => + super.test(s"$name: $idx", testTags: _*) { + body + } + } + } + test("[SPARK-3353] parent stage should have lower stage id") { sparkListener.stageByOrderOfExecution.clear() sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count() From 5bc48f23324a754e695535e036cf3759c0dfb040 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 7 Jun 2016 15:23:39 -0700 Subject: [PATCH 5/9] Revert "[SPARK-15783][CORE] still some flakiness in these blacklist tests so ignore for now" This reverts commit 36d3dfa59a1ec0af6118e0667b80e9b7628e2cb6. 
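[Editor's note, illustrative only] These patches toggle the blacklist tests between ignore, a local ignoreScheduler shim, and testScheduler, but testScheduler itself lives elsewhere in SchedulerIntegrationSuite and never appears in this series. For orientation, below is a minimal sketch of the shape such a helper takes; the names and body are assumptions, not the actual Spark implementation, which also rebuilds the SparkContext and mock backend with the extra confs applied before running the test body.

import org.scalatest.FunSuite

// Illustrative sketch: a testScheduler-style helper that records per-test extra confs
// before delegating to ScalaTest's ordinary test registration.
abstract class SchedulerSuiteSketch extends FunSuite {
  // confs the test fixture is assumed to apply when it builds the scheduler under test
  protected var extraConfs: Seq[(String, String)] = Nil

  def testScheduler(name: String)(testBody: => Unit): Unit = {
    testScheduler(name, Seq())(testBody)
  }

  def testScheduler(name: String, extraConfs: Seq[(String, String)])(testBody: => Unit): Unit = {
    test(name) {
      this.extraConfs = extraConfs
      testBody
    }
  }
}
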
--- .../apache/spark/scheduler/BlacklistIntegrationSuite.scala | 6 +++--- .../apache/spark/scheduler/SchedulerIntegrationSuite.scala | 5 ----- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index ae1d99eec216c..1b80f6211cf1f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -51,7 +51,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM // Test demonstrating the issue -- without a config change, the scheduler keeps scheduling // according to locality preferences, and so the job fails - ignore("If preferred node is bad, without blacklist job will fail") { + testScheduler("If preferred node is bad, without blacklist job will fail") { val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) withBackend(badHostBackend _) { val jobFuture = submit(rdd, (0 until 10).toArray) @@ -63,7 +63,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM // even with the blacklist turned on, if maxTaskFailures is not more than the number // of executors on the bad node, then locality preferences will lead to us cycling through // the executors on the bad node, and still failing the job - ignoreScheduler( + testScheduler( "With blacklist on, job will still fail if there are too many bad executors on bad host", extraConfs = Seq( // just set this to something much longer than the test duration @@ -80,7 +80,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM // Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually // schedule on a good node and succeed the job - ignoreScheduler( + testScheduler( "Bad node with multiple executors, job will still succeed with the right confs", extraConfs = Seq( // just set this to something much longer than the test duration diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index cb331c4269af9..97cc30b45e560 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -90,11 +90,6 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa } } - // still a few races to work out in the blacklist tests, so ignore some tests - def ignoreScheduler(name: String, extraConfs: Seq[(String, String)])(testBody: => Unit): Unit = { - ignore(name)(testBody) - } - /** * A map from partition -> results for all tasks of a job when you call this test framework's * [[submit]] method. 
Two important considerations: From 41b7b79b366aa3ebbd5e7796e0d3f703250e51cf Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 7 Jun 2016 22:25:56 -0700 Subject: [PATCH 6/9] tone it down a bit --- .../org/apache/spark/scheduler/BlacklistIntegrationSuite.scala | 2 +- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 1b80f6211cf1f..1100156bb5cc3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -42,7 +42,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM override def test(name: String, testTags: org.scalatest.Tag*)(body: => Unit): Unit = { var lastThreads = Set[Long]() - (0 until 5000).foreach { idx => + (0 until 500).foreach { idx => super.test(s"$name: $idx", testTags: _*) { body } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 04b84f27727eb..050808171fdce 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -328,7 +328,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def test(name: String, testTags: org.scalatest.Tag*)(body: => Unit): Unit = { var lastThreads = Set[Long]() - (0 until 1000).foreach { idx => + (0 until 50).foreach { idx => super.test(s"$name: $idx", testTags: _*) { body } From 174d0704eb1bf01df6834ca5f437518a2131d45a Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 8 Jun 2016 11:48:19 -0700 Subject: [PATCH 7/9] Go back to running tests once --- .../spark/scheduler/BlacklistIntegrationSuite.scala | 9 --------- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 9 --------- 2 files changed, 18 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 1100156bb5cc3..0ce228fc5e907 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -40,15 +40,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM } } - override def test(name: String, testTags: org.scalatest.Tag*)(body: => Unit): Unit = { - var lastThreads = Set[Long]() - (0 until 500).foreach { idx => - super.test(s"$name: $idx", testTags: _*) { - body - } - } - } - // Test demonstrating the issue -- without a config change, the scheduler keeps scheduling // according to locality preferences, and so the job fails testScheduler("If preferred node is bad, without blacklist job will fail") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 050808171fdce..f991c8b9d8044 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -326,15 +326,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou 
runEvent(JobCancelled(jobId)) } - override def test(name: String, testTags: org.scalatest.Tag*)(body: => Unit): Unit = { - var lastThreads = Set[Long]() - (0 until 50).foreach { idx => - super.test(s"$name: $idx", testTags: _*) { - body - } - } - } - test("[SPARK-3353] parent stage should have lower stage id") { sparkListener.stageByOrderOfExecution.clear() sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count() From 91ea3df33aaebb20b2df0cbbe56dbc99975200ea Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 14 Jun 2016 22:03:15 -0500 Subject: [PATCH 8/9] dont check if map output is available in tasks, b/c of some small races --- .../scheduler/SchedulerIntegrationSuite.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 97cc30b45e560..fc40e68e80ff3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -527,10 +527,11 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor // make sure the required map output is available task.stageId match { - case 1 => assertMapOutputAvailable(b) - case 3 => assertMapOutputAvailable(c) case 4 => assertMapOutputAvailable(d) - case _ => // no shuffle map input, nothing to check + case _ => + // we can't check for the output for the two intermediate stages, unfortunately, + // b/c the stage numbering is non-deterministic, so stage number alone doesn't tell + // us what to check } (task.stageId, task.stageAttemptId, task.partitionId) match { @@ -566,11 +567,9 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor val (taskDescription, task) = backend.beginTask() stageToAttempts.getOrElseUpdate(task.stageId, new HashSet()) += task.stageAttemptId - // make sure the required map output is available - task.stageId match { - case 1 => assertMapOutputAvailable(shuffledRdd) - case _ => // no shuffle map input, nothing to check - } + // We cannot check if shuffle output is available, because the failed fetch will clear the + // shuffle output. Then we'd have a race, between the already-started task from the first + // attempt, and when the failure clears out the map output status. 
(task.stageId, task.stageAttemptId, task.partitionId) match { case (0, _, _) => From e35a7d3d0696fd0c864ee8fa9c9ded95abffba2c Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 14 Jun 2016 22:35:04 -0500 Subject: [PATCH 9/9] better error msgs when there is an exception in a backend --- .../scheduler/BlacklistIntegrationSuite.scala | 6 +-- .../scheduler/SchedulerIntegrationSuite.scala | 47 ++++++++++++++++--- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 0ce228fc5e907..8ba2697dd9d6b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -46,7 +46,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) withBackend(badHostBackend _) { val jobFuture = submit(rdd, (0 until 10).toArray) - Await.ready(jobFuture, duration) + awaitJobTermination(jobFuture, duration) } assertDataStructuresEmpty(noFailure = false) } @@ -64,7 +64,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) withBackend(badHostBackend _) { val jobFuture = submit(rdd, (0 until 10).toArray) - Await.ready(jobFuture, duration) + awaitJobTermination(jobFuture, duration) } assertDataStructuresEmpty(noFailure = false) } @@ -85,7 +85,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) withBackend(badHostBackend _) { val jobFuture = submit(rdd, (0 until 10).toArray) - Await.ready(jobFuture, duration) + awaitJobTermination(jobFuture, duration) } assert(results === (0 until 10).map { _ -> 42 }.toMap) assertDataStructuresEmpty(noFailure = true) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index fc40e68e80ff3..a08f0875b8a92 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.scheduler import java.util.Properties -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.{TimeoutException, TimeUnit} +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.{Await, Future} @@ -55,6 +55,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa } results.clear() failure = null + backendException.set(null) super.beforeEach() } @@ -162,6 +163,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa assert(failure != null) } assert(scheduler.activeJobs.isEmpty) + assert(backendException.get() == null) } /** @@ -199,6 +201,8 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa new MockRDD(sc, nParts, shuffleDeps) } + val backendException = new AtomicReference[Exception](null) + /** * Helper which makes it a little easier to setup a test, which starts a mock backend in another * thread, responding to tasks with your custom function. 
You also supply the "body" of your @@ -213,7 +217,17 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa override def run(): Unit = { while (backendContinue.get()) { if (backend.hasTasksWaitingToRun) { - backendFunc() + try { + backendFunc() + } catch { + case ex: Exception => + // Try to do a little error handling around exceptions that might occur here -- + // otherwise it can just look like a TimeoutException in the test itself. + logError("Exception in mock backend:", ex) + backendException.set(ex) + backendContinue.set(false) + throw ex + } } else { Thread.sleep(10) } @@ -229,6 +243,25 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa } } + /** + * Helper to do a little extra error checking while waiting for the job to terminate. Primarily + * just does a little extra error handling if there is an exception from the backend. + */ + def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = { + try { + Await.ready(jobFuture, duration) + } catch { + case te: TimeoutException if backendException.get() != null => + val msg = raw""" + | ----- Begin Backend Failure Msg ----- + | ${Utils.exceptionString(backendException.get())} + | ----- End Backend Failure Msg ---- + """. + stripMargin + + fail(s"Future timed out after ${duration}, likely because of failure in backend: $msg") + } + } } /** @@ -493,7 +526,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor withBackend(runBackend _) { val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray) val duration = Duration(1, SECONDS) - Await.ready(jobFuture, duration) + awaitJobTermination(jobFuture, duration) } assert(results === (0 until 10).map { _ -> 42 }.toMap) assertDataStructuresEmpty() @@ -545,7 +578,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor withBackend(runBackend _) { val jobFuture = submit(d, (0 until 30).toArray) val duration = Duration(1, SECONDS) - Await.ready(jobFuture, duration) + awaitJobTermination(jobFuture, duration) } assert(results === (0 until 30).map { idx => idx -> (4321 + idx) }.toMap) assertDataStructuresEmpty() @@ -585,7 +618,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor withBackend(runBackend _) { val jobFuture = submit(shuffledRdd, (0 until 10).toArray) val duration = Duration(1, SECONDS) - Await.ready(jobFuture, duration) + awaitJobTermination(jobFuture, duration) } assert(results === (0 until 10).map { idx => idx -> (42 + idx) }.toMap) assert(stageToAttempts === Map(0 -> Set(0, 1), 1 -> Set(0, 1))) @@ -600,7 +633,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor withBackend(runBackend _) { val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray) val duration = Duration(1, SECONDS) - Await.ready(jobFuture, duration) + awaitJobTermination(jobFuture, duration) failure.getMessage.contains("test task failure") } assertDataStructuresEmpty(noFailure = false)
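
[Editor's note, illustrative only] The core pattern of the final patch -- the mock backend thread parks any exception in an AtomicReference and stops, and awaitJobTermination turns an otherwise opaque TimeoutException into a failure that carries that exception -- can be distilled into the standalone sketch below. It is plain Scala with no Spark or ScalaTest dependencies and is not the patched code itself: the real awaitJobTermination calls ScalaTest's fail() with Utils.exceptionString instead of rethrowing.

import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object BackendFailureSketch {
  // Slot where the background "backend" thread parks a fatal exception before it stops.
  val backendException = new AtomicReference[Exception](null)

  // Wait for the job future; if the wait times out and the backend already failed,
  // surface the backend's exception as the likely root cause instead of a bare timeout.
  def awaitJobTermination(jobFuture: Future[_], timeout: Duration): Unit = {
    try {
      Await.ready(jobFuture, timeout)
    } catch {
      case _: TimeoutException if backendException.get() != null =>
        throw new RuntimeException(
          s"Future timed out after $timeout, likely because of failure in backend",
          backendException.get())
    }
  }

  def main(args: Array[String]): Unit = {
    val jobFuture = Promise[Unit]().future  // a "job" that never completes
    backendException.set(new IllegalStateException("simulated backend failure"))
    try {
      awaitJobTermination(jobFuture, 1.second)
    } catch {
      case e: RuntimeException =>
        println(s"${e.getMessage}; cause: ${e.getCause}")
    }
  }
}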