[SPARK-29236][CORE] Access 'executorDataMap' out of 'DriverEndpoint' should be protected by lock

### What changes were proposed in this pull request?

Protect `executorDataMap` with a lock when it is accessed outside of `DriverEndpoint`'s methods.

### Why are the changes needed?

As the existing code comment states:

> // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
> // protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
> // must be protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should
> // only be modified in `DriverEndpoint.receive/receiveAndReply` with protection by
> // `CoarseGrainedSchedulerBackend.this`.

`executorDataMap` is not thread-safe, so any access to it from outside `DriverEndpoint` should be protected by the lock.
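
The locking rule can be illustrated with a minimal, self-contained sketch. It is not the Spark implementation: `Backend`, `ExecutorInfo`, `register`, and `cpusPerTask` are hypothetical stand-ins, and only the pattern (a plain mutable `HashMap` whose readers and writers all synchronize on the enclosing object) mirrors this change.

```scala
import scala.collection.mutable.HashMap

// Hypothetical stand-in for ExecutorData; only totalCores is modeled.
final case class ExecutorInfo(totalCores: Int)

// Hypothetical stand-in for CoarseGrainedSchedulerBackend.
class Backend(cpusPerTask: Int) {
  // A plain mutable HashMap is not thread-safe, so every access below
  // takes the lock on `this`.
  private val executorDataMap = new HashMap[String, ExecutorInfo]

  // Writer path (in Spark, modification happens inside the endpoint's handlers).
  def register(id: String, cores: Int): Unit = synchronized {
    executorDataMap.put(id, ExecutorInfo(cores))
  }

  // Reader paths that may be called from other threads.
  def numExistingExecutors: Int = synchronized { executorDataMap.size }

  def maxNumConcurrentTasks: Int = synchronized {
    executorDataMap.values.map(_.totalCores / cpusPerTask).sum
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val backend = new Backend(cpusPerTask = 2)
    // One thread mutates the map while another iterates over it.
    val writer = new Thread(() => (0 until 1000).foreach(i => backend.register(s"exec-$i", 4)))
    val reader = new Thread(() => (0 until 1000).foreach(_ => backend.maxNumConcurrentTasks))
    writer.start(); reader.start()
    writer.join(); reader.join()
    println(backend.numExistingExecutors)
  }
}
```

Without `synchronized` on the reader methods, iterating over `values` while another thread inserts entries could observe the map in an inconsistent state, which is the hazard this change is meant to rule out.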

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #25922 from ConeyLiu/executorDataMap.

Authored-by: Xianyang Liu <xianyang.liu@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ConeyLiu authored and cloud-fan committed Sep 25, 2019
1 parent 58989cd commit e07cbbe
Showing 1 changed file with 9 additions and 8 deletions.
@@ -68,10 +68,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME))
   private val createTimeNs = System.nanoTime()

-  // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
-  // protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
-  // must be protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should
-  // only be modified in `DriverEndpoint.receive/receiveAndReply` with protection by
+  // Accessing `executorDataMap` in the inherited methods from ThreadSafeRpcEndpoint doesn't need
+  // any protection. But accessing `executorDataMap` out of the inherited methods must be
+  // protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should only
+  // be modified in the inherited methods from ThreadSafeRpcEndpoint with protection by
   // `CoarseGrainedSchedulerBackend.this`.
   private val executorDataMap = new HashMap[String, ExecutorData]

@@ -535,24 +535,25 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Return the number of executors currently registered with this backend.
    */
-  private def numExistingExecutors: Int = executorDataMap.size
+  private def numExistingExecutors: Int = synchronized { executorDataMap.size }

-  override def getExecutorIds(): Seq[String] = {
+  override def getExecutorIds(): Seq[String] = synchronized {
     executorDataMap.keySet.toSeq
   }

   override def isExecutorActive(id: String): Boolean = synchronized {
     executorDataMap.contains(id) && !executorsPendingToRemove.contains(id)
   }

-  override def maxNumConcurrentTasks(): Int = {
+  override def maxNumConcurrentTasks(): Int = synchronized {
     executorDataMap.values.map { executor =>
       executor.totalCores / scheduler.CPUS_PER_TASK
     }.sum
   }

   // this function is for testing only
-  def getExecutorAvailableResources(executorId: String): Map[String, ExecutorResourceInfo] = {
+  def getExecutorAvailableResources(
+      executorId: String): Map[String, ExecutorResourceInfo] = synchronized {
     executorDataMap.get(executorId).map(_.resourcesInfo).getOrElse(Map.empty)
   }
