diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala index 52c4656c271bc..22a553e68439a 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala @@ -112,7 +112,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) { */ def getExecutorInfos: Array[SparkExecutorInfo] = { val executorIdToRunningTasks: Map[String, Int] = - sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors() + sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors sc.getExecutorStorageStatus.map { status => val bmId = status.blockManagerId diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index b2ef41e126871..feab4be31e40b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -91,7 +91,7 @@ private[spark] class TaskSchedulerImpl( // IDs of the tasks running on each executor private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] - def runningTasksByExecutors(): Map[String, Int] = { + def runningTasksByExecutors: Map[String, Int] = synchronized { executorIdToRunningTaskIds.toMap.mapValues(_.size) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 19b6fec9aec83..46c6a935f92a2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -304,7 +304,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L // Check that state associated with the lost task attempt is cleaned up: assert(taskScheduler.taskIdToExecutorId.isEmpty) assert(taskScheduler.taskIdToTaskSetManager.isEmpty) - assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty) + assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty) } test("if a task finishes with TaskState.LOST its executor is marked as dead") { @@ -335,7 +335,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L // Check that state associated with the lost task attempt is cleaned up: assert(taskScheduler.taskIdToExecutorId.isEmpty) assert(taskScheduler.taskIdToTaskSetManager.isEmpty) - assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty) + assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty) // Check that the executor has been marked as dead assert(!taskScheduler.isExecutorAlive("executor0"))