Skip to content

Commit

Permalink
Create new remonveExecutorAsync method
Browse files Browse the repository at this point in the history
  • Loading branch information
robbinspg committed Jun 1, 2016
1 parent c991d70 commit 6a95c50
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// manager to reregister itself. If that happens, the block manager master will know
// about the executor, but the scheduler will not. Therefore, we should remove the
// executor from the block manager when we hit this case.
scheduler.sc.env.blockManager.master.removeExecutor(executorId)
scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
logInfo(s"Asked to remove non-existent executor $executorId")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@ class BlockManagerMaster(

/** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
def removeExecutor(execId: String) {
// Avoid potential deadlocks by using non-blocking call
driverEndpoint.ask[Boolean](RemoveExecutor(execId))
tell(RemoveExecutor(execId))
logInfo("Removed " + execId + " successfully in removeExecutor")
}

/** Request removal of a dead executor from the driver endpoint.
* This is only called on the driver side. Non-blocking
*/
def removeExecutorAsync(execId: String) {
driverEndpoint.ask[Boolean](RemoveExecutor(execId))
logInfo("Removal of executor " + execId + " requested")
}

/** Register the BlockManager's id with the driver. */
def registerBlockManager(
blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class DistributedSuiteMinThreads extends DistributedSuite {

// This suite runs DistributeSuite with the number of dispatcher
// threads set to the minimum of 2 to help identify deadlocks

val numThreads = System.getProperty("spark.rpc.netty.dispatcher.numThreads")

override def beforeAll() {
Expand Down

0 comments on commit 6a95c50

Please sign in to comment.