From 33b37391c9ea33cc587f48ae7a2e123941b18b7f Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 1 Jun 2016 16:32:14 -0500 Subject: [PATCH 1/4] do not check if results is empty on a failure -- instead just make sure we got a failure exception --- .../apache/spark/scheduler/BlacklistIntegrationSuite.scala | 2 -- .../apache/spark/scheduler/SchedulerIntegrationSuite.scala | 6 +++++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 6c9d4fb6f3bcc..bd8c3f5e3cea2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -48,7 +48,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM val duration = Duration(1, SECONDS) Await.ready(jobFuture, duration) } - assert(results.isEmpty) assertDataStructuresEmpty(noFailure = false) } @@ -68,7 +67,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM val duration = Duration(3, SECONDS) Await.ready(jobFuture, duration) } - assert(results.isEmpty) assertDataStructuresEmpty(noFailure = false) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 02aa5caa731ff..a634c01838acc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -133,6 +133,11 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa // when the job succeeds assert(taskScheduler.runningTaskSets.isEmpty) assert(!backend.hasTasks) + } else { + // Note that we CANNOT check for empty results on a failure -- the resultHandler will + // record results from successful tasks, even if the job fails overall. We just check + // that we got a failure. + assert(failure != null) } assert(scheduler.activeJobs.isEmpty) } @@ -558,7 +563,6 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor Await.ready(jobFuture, duration) failure.getMessage.contains("test task failure") } - assert(results.isEmpty) assertDataStructuresEmpty(noFailure = false) } } From ccb6c2090567e863bf572a5cfbe55a8d3a16d225 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 1 Jun 2016 16:51:12 -0500 Subject: [PATCH 2/4] taskScheduler.taskIdToTaskSetManager must be protected by taskScheduler; cleanup runningTasks --- .../spark/scheduler/TaskSchedulerImpl.scala | 1 + .../scheduler/BlacklistIntegrationSuite.scala | 8 ++-- .../scheduler/SchedulerIntegrationSuite.scala | 41 +++++++++++-------- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 5cb1af9db0cbd..c3adc286851e5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -83,6 +83,7 @@ private[spark] class TaskSchedulerImpl( // on this class. private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] + // Protected by `this` private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager] val taskIdToExecutorId = new HashMap[Long, String] diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index bd8c3f5e3cea2..681700fe77951 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -30,12 +30,12 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM * all tasks. */ def badHostBackend(): Unit = { - val task = backend.beginTask() - val host = backend.executorIdToExecutor(task.executorId).host + val (taskDescription, task) = backend.beginTask() + val host = backend.executorIdToExecutor(taskDescription.executorId).host if (host == badHost) { - backend.taskFailed(task, new RuntimeException("I'm a bad host!")) + backend.taskFailed(taskDescription, new RuntimeException("I'm a bad host!")) } else { - backend.taskSuccess(task, 42) + backend.taskSuccess(taskDescription, 42) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index a634c01838acc..2ce881386b59d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -222,10 +222,10 @@ private[spark] abstract class MockBackend( * Test backends should call this to get a task that has been assigned to them by the scheduler. * Each task should be responded to with either [[taskSuccess]] or [[taskFailed]]. */ - def beginTask(): TaskDescription = { + def beginTask(): (TaskDescription, Task[_]) = { synchronized { val toRun = assignedTasksWaitingToRun.remove(assignedTasksWaitingToRun.size - 1) - runningTasks += toRun + runningTasks += toRun._1.taskId toRun } } @@ -260,7 +260,7 @@ private[spark] abstract class MockBackend( taskScheduler.statusUpdate(task.taskId, state, resultBytes) if (TaskState.isFinished(state)) { synchronized { - runningTasks -= task + runningTasks -= task.taskId executorIdToExecutor(task.executorId).freeCores += taskScheduler.CPUS_PER_TASK freeCores += taskScheduler.CPUS_PER_TASK } @@ -269,9 +269,9 @@ private[spark] abstract class MockBackend( } // protected by this - private val assignedTasksWaitingToRun = new ArrayBuffer[TaskDescription](10000) + private val assignedTasksWaitingToRun = new ArrayBuffer[(TaskDescription, Task[_])](10000) // protected by this - private val runningTasks = ArrayBuffer[TaskDescription]() + private val runningTasks = HashSet[Long]() def hasTasks: Boolean = synchronized { assignedTasksWaitingToRun.nonEmpty || runningTasks.nonEmpty @@ -312,10 +312,19 @@ private[spark] abstract class MockBackend( */ override def reviveOffers(): Unit = { val offers: Seq[WorkerOffer] = generateOffers() - val newTasks = taskScheduler.resourceOffers(offers).flatten + val newTaskDescriptions = taskScheduler.resourceOffers(offers).flatten + // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual + // tests for introducing a race if they need it + val newTasks = taskScheduler.synchronized { + newTaskDescriptions.map { taskDescription => + val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet + val task = taskSet.tasks(taskDescription.index) + (taskDescription, task) + } + } synchronized { - newTasks.foreach { task => - executorIdToExecutor(task.executorId).freeCores -= taskScheduler.CPUS_PER_TASK + newTasks.foreach { case (taskDescription, task) => + executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK } freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK assignedTasksWaitingToRun ++= newTasks @@ -442,8 +451,8 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor */ testScheduler("super simple job") { def runBackend(): Unit = { - val task = backend.beginTask() - backend.taskSuccess(task, 42) + val (taskDescripition, task) = backend.beginTask() + backend.taskSuccess(taskDescripition, 42) } withBackend(runBackend _) { val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray) @@ -478,9 +487,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor val d = join(30, b, c) def runBackend(): Unit = { - val taskDescription = backend.beginTask() - val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet - val task = taskSet.tasks(taskDescription.index) + val (taskDescription, task) = backend.beginTask() // make sure the required map output is available task.stageId match { @@ -520,9 +527,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor val stageToAttempts = new HashMap[Int, HashSet[Int]]() def runBackend(): Unit = { - val taskDescription = backend.beginTask() - val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet - val task = taskSet.tasks(taskDescription.index) + val (taskDescription, task) = backend.beginTask() stageToAttempts.getOrElseUpdate(task.stageId, new HashSet()) += task.stageAttemptId // make sure the required map output is available @@ -554,8 +559,8 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor testScheduler("job failure after 4 attempts") { def runBackend(): Unit = { - val task = backend.beginTask() - backend.taskFailed(task, new RuntimeException("test task failure")) + val (taskDescription, task) = backend.beginTask() + backend.taskFailed(taskDescription, new RuntimeException("test task failure")) } withBackend(runBackend _) { val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray) From 6ef06c4ad80388a3a11e06197b1917e0fec42c62 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 3 Jun 2016 09:32:36 -0500 Subject: [PATCH 3/4] review feedback --- .../scheduler/BlacklistIntegrationSuite.scala | 2 +- .../scheduler/SchedulerIntegrationSuite.scala | 35 +++++++++++++++---- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 681700fe77951..3a4b7af71b1f3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -30,7 +30,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM * all tasks. */ def badHostBackend(): Unit = { - val (taskDescription, task) = backend.beginTask() + val (taskDescription, _) = backend.beginTask() val host = backend.executorIdToExecutor(taskDescription.executorId).host if (host == badHost) { backend.taskFailed(taskDescription, new RuntimeException("I'm a bad host!")) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 2ce881386b59d..7fb9d4821def2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -89,7 +89,26 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa } } + /** + * A map from partition -> results for all tasks of a job when you call this test framework's + * [[submit]] method. Two important considerations: + * + * 1. If there is a job failure, results may or may not be empty. If any tasks succeed before + * the job has failed, they will get included in `results`. Instead, check for job failure by + * checking [[failure]]. (Also see [[assertDataStructuresEmpty()]]) + * + * 2. The only gets cleared between tests. So it won't make much sense if you submit more + * than one job inside one test (or at least, you need to be sure to do your own handling for it) + */ val results = new HashMap[Int, Any]() + + /** + * If a call to [[submit]] results in a job failure, this will hold the exception, else it will + * be null. + * + * As with [[results]], this only gets cleared between tests, so care must be taken if you are + * submitting more than one job in one test. + */ var failure: Throwable = _ /** @@ -113,6 +132,11 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa } } + /** + * Helper to run a few common asserts after a job has completed, in particular some internal + * datastructures for bookkeeping. This only does a very minimal check for whether the job + * failed or succeeded -- often you will want extra asserts on [[results]] or [[failure]]. + */ protected def assertDataStructuresEmpty(noFailure: Boolean = true): Unit = { if (noFailure) { if (failure != null) { @@ -134,9 +158,6 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa assert(taskScheduler.runningTaskSets.isEmpty) assert(!backend.hasTasks) } else { - // Note that we CANNOT check for empty results on a failure -- the resultHandler will - // record results from successful tasks, even if the job fails overall. We just check - // that we got a failure. assert(failure != null) } assert(scheduler.activeJobs.isEmpty) @@ -314,7 +335,7 @@ private[spark] abstract class MockBackend( val offers: Seq[WorkerOffer] = generateOffers() val newTaskDescriptions = taskScheduler.resourceOffers(offers).flatten // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual - // tests for introducing a race if they need it + // tests from introducing a race if they need it val newTasks = taskScheduler.synchronized { newTaskDescriptions.map { taskDescription => val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet @@ -323,7 +344,7 @@ private[spark] abstract class MockBackend( } } synchronized { - newTasks.foreach { case (taskDescription, task) => + newTasks.foreach { case (taskDescription, _) => executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK } freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK @@ -451,7 +472,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor */ testScheduler("super simple job") { def runBackend(): Unit = { - val (taskDescripition, task) = backend.beginTask() + val (taskDescripition, _) = backend.beginTask() backend.taskSuccess(taskDescripition, 42) } withBackend(runBackend _) { @@ -559,7 +580,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor testScheduler("job failure after 4 attempts") { def runBackend(): Unit = { - val (taskDescription, task) = backend.beginTask() + val (taskDescription, _) = backend.beginTask() backend.taskFailed(taskDescription, new RuntimeException("test task failure")) } withBackend(runBackend _) { From 08ab5d5616dbf501bf8d14126a4aa865ba552ba1 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 3 Jun 2016 09:37:06 -0500 Subject: [PATCH 4/4] reword --- .../apache/spark/scheduler/SchedulerIntegrationSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 7fb9d4821def2..92bd76548e82f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -97,8 +97,8 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa * the job has failed, they will get included in `results`. Instead, check for job failure by * checking [[failure]]. (Also see [[assertDataStructuresEmpty()]]) * - * 2. The only gets cleared between tests. So it won't make much sense if you submit more - * than one job inside one test (or at least, you need to be sure to do your own handling for it) + * 2. This only gets cleared between tests. So you'll need to do special handling if you submit + * more than one job in one test. */ val results = new HashMap[Int, Any]()