[SPARK-29236][CORE] Access 'executorDataMap' out of 'DriverEndpoint' should be protected by lock #25922

Closed
@@ -68,10 +68,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME))
private val createTimeNs = System.nanoTime()

- // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
- // protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
- // must be protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should
- // only be modified in `DriverEndpoint.receive/receiveAndReply` with protection by
+ // Accessing `executorDataMap` in the inherited methods from ThreadSafeRpcEndpoint doesn't need
+ // any protection. But accessing `executorDataMap` out of the inherited methods must be
+ // protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should only
+ // be modified in the inherited methods from ThreadSafeRpcEndpoint with protection by
// `CoarseGrainedSchedulerBackend.this`.
Contributor Author

Accessing `executorDataMap` in the methods inherited from `ThreadSafeRpcEndpoint` should also be OK.

private val executorDataMap = new HashMap[String, ExecutorData]
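
The locking rule described in the comment above can be illustrated with a minimal, self-contained sketch. It is not the Spark implementation: `Backend`, `Endpoint`, and `ExecutorInfo` are hypothetical stand-ins, and the sketch assumes the endpoint handles one message at a time, which is the guarantee the comment attributes to `ThreadSafeRpcEndpoint`.

```scala
import scala.collection.mutable.HashMap

// Hypothetical, simplified stand-in for Spark's ExecutorData.
final case class ExecutorInfo(totalCores: Int)

class Backend {
  // Guarded by `Backend.this` for every access made outside the endpoint's handlers.
  private val executorDataMap = new HashMap[String, ExecutorInfo]

  // Stand-in for DriverEndpoint; assumed to process messages one at a time.
  class Endpoint {
    // A read inside a handler takes no lock: handlers are the only writers
    // and are assumed never to run concurrently with each other.
    def knows(id: String): Boolean = executorDataMap.contains(id)

    // Every modification happens in a handler AND holds the backend lock,
    // so readers on other threads that hold the same lock see a consistent map.
    def register(id: String, cores: Int): Unit = Backend.this.synchronized {
      executorDataMap(id) = ExecutorInfo(cores)
    }

    def remove(id: String): Unit = Backend.this.synchronized {
      executorDataMap.remove(id)
      ()
    }
  }

  val endpoint = new Endpoint

  // Reads outside the endpoint must hold the backend lock.
  def numExistingExecutors: Int = synchronized { executorDataMap.size }

  // `toList` copies the keys while the lock is held, so the result is a strict snapshot.
  def getExecutorIds(): Seq[String] = synchronized { executorDataMap.keySet.toList }
}
```

In this sketch, calling `endpoint.register("exec-1", 4)` and then `numExistingExecutors` from another thread is safe because both paths synchronize on the same `Backend` instance.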

@@ -535,24 +535,25 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Return the number of executors currently registered with this backend.
*/
- private def numExistingExecutors: Int = executorDataMap.size
+ private def numExistingExecutors: Int = synchronized { executorDataMap.size }

- override def getExecutorIds(): Seq[String] = {
+ override def getExecutorIds(): Seq[String] = synchronized {
executorDataMap.keySet.toSeq
}

override def isExecutorActive(id: String): Boolean = synchronized {
executorDataMap.contains(id) && !executorsPendingToRemove.contains(id)
}

- override def maxNumConcurrentTasks(): Int = {
+ override def maxNumConcurrentTasks(): Int = synchronized {
executorDataMap.values.map { executor =>
executor.totalCores / scheduler.CPUS_PER_TASK
}.sum
}

// this function is for testing only
- def getExecutorAvailableResources(executorId: String): Map[String, ExecutorResourceInfo] = {
+ def getExecutorAvailableResources(
+ executorId: String): Map[String, ExecutorResourceInfo] = synchronized {
Member

@ConeyLiu So, are these all of the places that need `synchronized`?

Member

BTW, this method is testing-only. We may not need `synchronized` here.

Contributor Author

Hi @dongjoon-hyun, thanks for reviewing. Yes, only the methods that access `executorDataMap` outside of `DriverEndpoint` need it. I suggest synchronizing this method too, because it will not add much overhead for the test.

Member

Yep. No problem.

executorDataMap.get(executorId).map(_.resourcesInfo).getOrElse(Map.empty)
}
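
As a rough illustration of what the two synchronized readers above compute, here is a small sketch with simplified types. `Executor`, `ReadSide`, `add`, and the `cpusPerTask` constructor parameter are hypothetical names introduced for the example (the real code reads `scheduler.CPUS_PER_TASK` and returns `ExecutorResourceInfo` values), so treat it as an approximation rather than the actual backend.

```scala
import scala.collection.mutable.HashMap

// Hypothetical, simplified stand-in for Spark's ExecutorData; resources are
// modeled as plain address lists instead of ExecutorResourceInfo.
final case class Executor(totalCores: Int, resourcesInfo: Map[String, Seq[String]])

class ReadSide(cpusPerTask: Int) {
  private val executorDataMap = new HashMap[String, Executor]

  // Hypothetical helper so the example can populate the map.
  def add(id: String, e: Executor): Unit = synchronized { executorDataMap(id) = e }

  // Integer division is applied per executor before summing, so cores left
  // over on one executor are not combined with cores on another.
  def maxNumConcurrentTasks(): Int = synchronized {
    executorDataMap.values.map(_.totalCores / cpusPerTask).sum
  }

  // An unknown executor id yields an empty map rather than an exception.
  def getExecutorAvailableResources(id: String): Map[String, Seq[String]] = synchronized {
    executorDataMap.get(id).map(_.resourcesInfo).getOrElse(Map.empty)
  }
}
```

With two executors of 3 and 5 cores and `cpusPerTask = 2`, `maxNumConcurrentTasks()` returns 1 + 2 = 3, not 8 / 2 = 4, because the division happens per executor.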
