From 05ce9446d386b2ad7827c2ca9f1cb38bf343e9a7 Mon Sep 17 00:00:00 2001
From: jinxing
Date: Tue, 7 Feb 2017 19:25:31 +0800
Subject: [PATCH 1/2] [SPARK-19263] fix race in SchedulerIntegrationSuite.

---
 .../scheduler/SchedulerIntegrationSuite.scala | 23 ++++++++++---------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 2ba63da881be3..6ebfe46bff415 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -391,17 +391,18 @@ private[spark] abstract class MockBackend(
    * scheduling.
    */
   override def reviveOffers(): Unit = {
-    val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
-    // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
-    // tests from introducing a race if they need it
-    val newTasks = taskScheduler.synchronized {
-      newTaskDescriptions.map { taskDescription =>
-        val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
-        val task = taskSet.tasks(taskDescription.index)
-        (taskDescription, task)
-      }
-    }
-    synchronized {
+    // Need a lock on the entire scheduler to protect freeCores -- otherwise, multiple threads
+    // may make offers at the same time, though they are using the same set of freeCores.
+    taskScheduler.synchronized {
+      val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
+      // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
+      // tests from introducing a race if they need it.
+      val newTasks =
+        newTaskDescriptions.map { taskDescription =>
+          val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
+          val task = taskSet.tasks(taskDescription.index)
+          (taskDescription, task)
+        }
       newTasks.foreach { case (taskDescription, _) =>
         executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
       }

From 67fe5dfe9d00c628c15078d8d99c5b0de3962946 Mon Sep 17 00:00:00 2001
From: jinxing
Date: Wed, 8 Feb 2017 10:08:12 +0800
Subject: [PATCH 2/2] fix spacing

---
 .../spark/scheduler/SchedulerIntegrationSuite.scala | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 6ebfe46bff415..398ac3d6202db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -397,12 +397,11 @@ private[spark] abstract class MockBackend(
       val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
       // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
       // tests from introducing a race if they need it.
-      val newTasks =
-        newTaskDescriptions.map { taskDescription =>
-          val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
-          val task = taskSet.tasks(taskDescription.index)
-          (taskDescription, task)
-        }
+      val newTasks = newTaskDescriptions.map { taskDescription =>
+        val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
+        val task = taskSet.tasks(taskDescription.index)
+        (taskDescription, task)
+      }
       newTasks.foreach { case (taskDescription, _) =>
         executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
       }
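The sketch below is not part of the patch and is not Spark code; it is a minimal, standalone illustration of the race the first commit guards against, under the assumption that several threads may call reviveOffers concurrently while sharing one mutable freeCores counter. All names (Scheduler, offerAndClaim, FreeCoresRaceSketch) are illustrative, with a plain object standing in for TaskSchedulerImpl; the point is only that the check-and-decrement of freeCores happens under a single shared lock, as the patch does with taskScheduler.synchronized.

// Minimal sketch of the freeCores race, with illustrative names only.
object FreeCoresRaceSketch {

  final class Scheduler(totalCores: Int, cpusPerTask: Int) {
    // Mutable shared state, analogous to the suite's per-executor freeCores.
    private var freeCores: Int = totalCores

    // Check-and-decrement under one lock, mirroring how the patch wraps
    // resourceOffers plus the freeCores update in taskScheduler.synchronized.
    def offerAndClaim(): Boolean = synchronized {
      if (freeCores >= cpusPerTask) {
        freeCores -= cpusPerTask
        true
      } else {
        false
      }
    }

    def remainingCores: Int = synchronized { freeCores }
  }

  def main(args: Array[String]): Unit = {
    val scheduler = new Scheduler(totalCores = 4, cpusPerTask = 1)
    val claimed = new java.util.concurrent.atomic.AtomicInteger(0)

    // Eight concurrent "offers" compete for four cores.
    val threads = (1 to 8).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = {
          if (scheduler.offerAndClaim()) {
            claimed.incrementAndGet()
          }
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())

    // With the shared lock, exactly four claims succeed and freeCores never
    // goes negative; without it, both invariants can be violated.
    println(s"claimed=${claimed.get()}, remainingCores=${scheduler.remainingCores}")
  }
}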