Skip to content

Commit

Permalink
[SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable `executorIdToRunningTaskIds` map without proper synchronization. In addition, as markhamstra pointed out in #15986, the signature's use of parentheses is a little odd given that this is a pure getter method.

This patch fixes both issues.

## How was this patch tested?

Covered by existing tests.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety.

(cherry picked from commit c51c772)
Signed-off-by: Andrew Or <andrewor14@gmail.com>
  • Loading branch information
JoshRosen authored and andrewor14 committed Nov 30, 2016
1 parent 8b33aa0 commit 1b1c849
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 4 deletions.
Expand Up @@ -112,7 +112,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
*/
def getExecutorInfos: Array[SparkExecutorInfo] = {
val executorIdToRunningTasks: Map[String, Int] =
sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors

sc.getExecutorStorageStatus.map { status =>
val bmId = status.blockManagerId
Expand Down
Expand Up @@ -91,7 +91,7 @@ private[spark] class TaskSchedulerImpl(
// IDs of the tasks running on each executor
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

def runningTasksByExecutors(): Map[String, Int] = {
/**
 * Returns a snapshot of the number of running tasks per executor.
 *
 * Reads the mutable `executorIdToRunningTaskIds` map, so it must hold the
 * scheduler lock while the snapshot is built.
 */
def runningTasksByExecutors: Map[String, Int] = synchronized {
  // Materialize the sizes eagerly while the lock is held: in Scala 2.12,
  // `mapValues` returns a lazy view, so `_.size` would otherwise be evaluated
  // on the mutable HashSet values after the synchronized block has exited,
  // defeating the synchronization this method exists to provide.
  executorIdToRunningTaskIds.map { case (execId, taskIds) => (execId, taskIds.size) }.toMap
}

Expand Down
Expand Up @@ -304,7 +304,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
// Check that state associated with the lost task attempt is cleaned up:
assert(taskScheduler.taskIdToExecutorId.isEmpty)
assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)
assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
}

test("if a task finishes with TaskState.LOST its executor is marked as dead") {
Expand Down Expand Up @@ -335,7 +335,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
// Check that state associated with the lost task attempt is cleaned up:
assert(taskScheduler.taskIdToExecutorId.isEmpty)
assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)
assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)

// Check that the executor has been marked as dead
assert(!taskScheduler.isExecutorAlive("executor0"))
Expand Down

0 comments on commit 1b1c849

Please sign in to comment.