[SPARK-15783][CORE] Fix Flakiness in BlacklistIntegrationSuite

## What changes were proposed in this pull request?

Five changes here -- the first two were causing failures with `BlacklistIntegrationSuite`:

1. The testing framework didn't include the reviveOffers thread, so the test that involves delay scheduling might never submit offers late enough for delay scheduling to kick in.  Added periodic revive offers to the mock backend, just like the real scheduler (see the `reviveThread` added to `MockBackend` in the diff below).

2. `assertDataStructuresEmpty` would occasionally fail because it appeared there was still an active job.  In `DAGScheduler`, the jobWaiter was notified of job completion before the data structures were cleaned up.  Most of the time the test code waiting on the jobWaiter doesn't wake up until after the data structures are cleared, but occasionally the race goes the other way and the assertions fail.  (A minimal sketch of this ordering hazard follows the list.)

3. `DAGSchedulerSuite` was not stopping all of the inner parts it set up, so each test leaked a number of threads.  We now stop those parts too.

4. It turns out that `assertMapOutputAvailable` is not terribly useful in this framework -- most of the places I tried to use it suffer from some race.

5. When there is an exception in the backend, try to improve the error message a little.  Previously the exception was printed to the console, but the test would fail with a timeout and the logs wouldn't show anything.
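
To make the race in item 2 concrete, here is a minimal, self-contained Scala sketch.  This is hypothetical code, not the actual `DAGScheduler`; `NotifyBeforeCleanupDemo`, `activeJobs`, and `jobDone` are illustrative stand-ins.  Releasing the waiter before clearing the bookkeeping lets the waiter's assertion occasionally observe stale state, which is exactly what the reordering in this commit avoids.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical illustration of the ordering hazard described in item 2 above.
object NotifyBeforeCleanupDemo {
  @volatile private var activeJobs = Set(1)   // stands in for the scheduler's bookkeeping
  private val jobDone = new CountDownLatch(1) // stands in for the JobWaiter

  def main(args: Array[String]): Unit = {
    val scheduler = new Thread(new Runnable {
      override def run(): Unit = {
        jobDone.countDown()    // old ordering: wake the waiter first ...
        activeJobs = Set.empty // ... and only clean up afterwards
      }
    })
    scheduler.start()

    jobDone.await()
    // Usually passes, but occasionally sees the job as still active -- the same
    // flakiness assertDataStructuresEmpty() hit.  Cleaning up before waking the
    // waiter (as this commit does in DAGScheduler) removes the race.
    assert(activeJobs.isEmpty, s"still active: $activeJobs")
  }
}
```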

## How was this patch tested?

I ran all the tests in `BlacklistIntegrationSuite` 5k times and everything in `DAGSchedulerSuite` 1k times on my laptop.  I also ran a full Jenkins build with `BlacklistIntegrationSuite` 500 times and `DAGSchedulerSuite` 50 times; see #13548.  (I tried more runs, but Jenkins timed out.)

To check for more leaked threads, I added some code to dump the list of all threads at the end of each test in `DAGSchedulerSuite`, which is how I discovered that the mapOutputTracker and eventLoop were leaking threads.  (I removed that code from the final PR; it was just part of the testing.)
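
For reference, a debugging hook of that kind might look like the following.  This is a hypothetical sketch of what was described above, not code from this PR; the trait name `ThreadDumpAfterEach` is made up, and it assumes the suite mixes in ScalaTest's `BeforeAndAfterEach` and Spark's `Logging` (as `SparkFunSuite` does).

```scala
import scala.collection.JavaConverters._

import org.scalatest.{BeforeAndAfterEach, Suite}

import org.apache.spark.internal.Logging

// Hypothetical debugging mix-in (removed before the final PR): dump every live
// thread after each test so leaked threads are easy to spot in the logs.
trait ThreadDumpAfterEach extends BeforeAndAfterEach with Logging { self: Suite =>
  override def afterEach(): Unit = {
    try {
      super.afterEach()
    } finally {
      val threads = Thread.getAllStackTraces.keySet.asScala.toSeq.sortBy(_.getName)
      logInfo(s"${threads.size} live threads after $suiteName:\n" +
        threads.map { t => s"  ${t.getName} (daemon=${t.isDaemon})" }.mkString("\n"))
    }
  }
}
```

Mixing something like this into `DAGSchedulerSuite` and comparing the output between tests is enough to spot leaks like the mapOutputTracker and eventLoop threads mentioned above.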

And I'll run Jenkins on this a couple of times to do one more check.

Author: Imran Rashid <irashid@cloudera.com>

Closes #13565 from squito/blacklist_extra_tests.
squito committed Jun 22, 2016
1 parent 01277d4 commit cf1995a
Showing 4 changed files with 76 additions and 26 deletions.
DAGScheduler.scala
@@ -1465,8 +1465,10 @@ class DAGScheduler(
}

if (ableToCancelStages) {
job.listener.jobFailed(error)
// SPARK-15783 important to cleanup state first, just for tests where we have some asserts
// against the state. Otherwise we have a *little* bit of flakiness in the tests.
cleanupStateForJobAndIndependentStages(job)
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}
BlacklistIntegrationSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark._
class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{

val badHost = "host-0"
val duration = Duration(10, SECONDS)

/**
* This backend just always fails if the task is executed on a bad host, but otherwise succeeds
@@ -41,20 +42,19 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM

// Test demonstrating the issue -- without a config change, the scheduler keeps scheduling
// according to locality preferences, and so the job fails
ignore("If preferred node is bad, without blacklist job will fail") {
testScheduler("If preferred node is bad, without blacklist job will fail") {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assertDataStructuresEmpty(noFailure = false)
}

// even with the blacklist turned on, if maxTaskFailures is not more than the number
// of executors on the bad node, then locality preferences will lead to us cycling through
// the executors on the bad node, and still failing the job
ignoreScheduler(
testScheduler(
"With blacklist on, job will still fail if there are too many bad executors on bad host",
extraConfs = Seq(
// just set this to something much longer than the test duration
@@ -64,15 +64,14 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(3, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assertDataStructuresEmpty(noFailure = false)
}

// Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually
// schedule on a good node and succeed the job
ignoreScheduler(
testScheduler(
"Bad node with multiple executors, job will still succeed with the right confs",
extraConfs = Seq(
// just set this to something much longer than the test duration
@@ -86,8 +85,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 10).map { _ -> 42 }.toMap)
assertDataStructuresEmpty(noFailure = true)
DAGSchedulerSuite.scala
@@ -214,7 +214,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
results.clear()
securityMgr = new SecurityManager(conf)
broadcastManager = new BroadcastManager(true, conf, securityMgr)
mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true)
mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
override def sendTracker(message: Any): Unit = {
// no-op, just so we can stop this to avoid leaking threads
}
}
scheduler = new DAGScheduler(
sc,
taskScheduler,
@@ -228,6 +232,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def afterEach(): Unit = {
try {
scheduler.stop()
dagEventProcessLoopTester.stop()
mapOutputTracker.stop()
broadcastManager.stop()
} finally {
super.afterEach()
}
SchedulerIntegrationSuite.scala
@@ -17,7 +17,8 @@
package org.apache.spark.scheduler

import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, Future}
@@ -32,7 +33,7 @@ import org.apache.spark._
import org.apache.spark.TaskState._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{CallSite, Utils}
import org.apache.spark.util.{CallSite, ThreadUtils, Utils}

/**
* Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets,
@@ -55,6 +56,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
results.clear()
failure = null
backendException.set(null)
super.beforeEach()
}

@@ -90,11 +92,6 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
}

// still a few races to work out in the blacklist tests, so ignore some tests
def ignoreScheduler(name: String, extraConfs: Seq[(String, String)])(testBody: => Unit): Unit = {
ignore(name)(testBody)
}

/**
* A map from partition -> results for all tasks of a job when you call this test framework's
* [[submit]] method. Two important considerations:
@@ -167,6 +164,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
assert(failure != null)
}
assert(scheduler.activeJobs.isEmpty)
assert(backendException.get() == null)
}

/**
@@ -204,6 +202,8 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
new MockRDD(sc, nParts, shuffleDeps)
}

val backendException = new AtomicReference[Exception](null)

/**
* Helper which makes it a little easier to setup a test, which starts a mock backend in another
* thread, responding to tasks with your custom function. You also supply the "body" of your
@@ -218,7 +218,17 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
override def run(): Unit = {
while (backendContinue.get()) {
if (backend.hasTasksWaitingToRun) {
backendFunc()
try {
backendFunc()
} catch {
case ex: Exception =>
// Try to do a little error handling around exceptions that might occur here --
// otherwise it can just look like a TimeoutException in the test itself.
logError("Exception in mock backend:", ex)
backendException.set(ex)
backendContinue.set(false)
throw ex
}
} else {
Thread.sleep(10)
}
@@ -234,6 +244,25 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
}

/**
* Helper to do a little extra error checking while waiting for the job to terminate. Primarily
* just does a little extra error handling if there is an exception from the backend.
*/
def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = {
try {
Await.ready(jobFuture, duration)
} catch {
case te: TimeoutException if backendException.get() != null =>
val msg = raw"""
| ----- Begin Backend Failure Msg -----
| ${Utils.exceptionString(backendException.get())}
| ----- End Backend Failure Msg ----
""".
stripMargin

fail(s"Future timed out after ${duration}, likely because of failure in backend: $msg")
}
}
}

/**
@@ -245,6 +274,17 @@ private[spark] abstract class MockBackend(
conf: SparkConf,
val taskScheduler: TaskSchedulerImpl) extends SchedulerBackend with Logging {

// Periodically revive offers to allow delay scheduling to work
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms")

reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
reviveOffers()
}
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)

/**
* Test backends should call this to get a task that has been assigned to them by the scheduler.
* Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
@@ -310,7 +350,9 @@ private[spark] abstract class MockBackend(

override def start(): Unit = {}

override def stop(): Unit = {}
override def stop(): Unit = {
reviveThread.shutdown()
}

val env = SparkEnv.get

@@ -334,8 +376,9 @@ private[spark] abstract class MockBackend(
}

/**
* This is called by the scheduler whenever it has tasks it would like to schedule. It gets
* called in the scheduling thread, not the backend thread.
* This is called by the scheduler whenever it has tasks it would like to schedule, when a tasks
* completes (which will be in a result-getter thread), and by the reviveOffers thread for delay
* scheduling.
*/
override def reviveOffers(): Unit = {
val offers: Seq[WorkerOffer] = generateOffers()
@@ -484,7 +527,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 10).map { _ -> 42 }.toMap)
assertDataStructuresEmpty()
@@ -536,7 +579,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(d, (0 until 30).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 30).map { idx => idx -> (4321 + idx) }.toMap)
assertDataStructuresEmpty()
@@ -576,7 +619,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(shuffledRdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 10).map { idx => idx -> (42 + idx) }.toMap)
assert(stageToAttempts === Map(0 -> Set(0, 1), 1 -> Set(0, 1)))
@@ -591,7 +634,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
failure.getMessage.contains("test task failure")
}
assertDataStructuresEmpty(noFailure = false)
