Skip to content

Commit

Permalink
Don't block in BlockManagerMaster.removeExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
robbinspg committed May 31, 2016
1 parent 139e875 commit c991d70
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,8 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {

/** Thread pool used for dispatching messages. */
private val threadpool: ThreadPoolExecutor = {
// Use a minimum of 3 threads to avoid deadlocks
val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
math.max(3, Runtime.getRuntime.availableProcessors()))
math.max(2, Runtime.getRuntime.availableProcessors()))
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
for (i <- 0 until numThreads) {
pool.execute(new MessageLoop)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class BlockManagerMaster(

/** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
def removeExecutor(execId: String) {
tell(RemoveExecutor(execId))
// Avoid potential deadlocks by using non-blocking call
driverEndpoint.ask[Boolean](RemoveExecutor(execId))
logInfo("Removed " + execId + " successfully in removeExecutor")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark


class DistributedSuiteMinThreads extends DistributedSuite {

// This suite runs DistributeSuite with the number of dispatcher
// threads set to the minimum of 2 to help identify deadlocks

val numThreads = System.getProperty("spark.rpc.netty.dispatcher.numThreads")

override def beforeAll() {
super.beforeAll()
System.setProperty("spark.rpc.netty.dispatcher.numThreads", "2")
}

override def afterAll() {
if (numThreads == null) {
System.clearProperty("spark.rpc.netty.dispatcher.numThreads")
} else {
System.setProperty("spark.rpc.netty.dispatcher.numThreads", numThreads)
}
super.afterAll()
}

}

0 comments on commit c991d70

Please sign in to comment.