Skip to content

Commit

Permalink
SPARK-1713. [YARN] Use a threadpool for launching executor containers
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Sep 10, 2014
1 parent 07ee4a2 commit 036550d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
7 changes: 7 additions & 0 deletions docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
the environment of the executor launcher.
</td>
</tr>
<tr>
<td><code>spark.yarn.containerLauncherMaxThreads</code></td>
<td>25</td>
<td>
The maximum number of threads to use in the application master for launching executor containers.
</td>
</tr>
</table>

# Launching Spark on YARN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.deploy.yarn

import java.util.{List => JList}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConversions._
Expand All @@ -32,6 +32,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

import com.google.common.util.concurrent.ThreadFactoryBuilder

object AllocationType extends Enumeration {
type AllocationType = Value
val HOST, RACK, ANY = Value
Expand Down Expand Up @@ -95,6 +97,14 @@ private[yarn] abstract class YarnAllocator(
protected val (preferredHostToCount, preferredRackToCount) =
generateNodeToWeight(conf, preferredNodes)

private val launcherPool = new ThreadPoolExecutor(
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
1, TimeUnit.MINUTES,
new LinkedBlockingQueue[Runnable](),
new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
launcherPool.allowCoreThreadTimeOut(true)

def getNumExecutorsRunning: Int = numExecutorsRunning.intValue

def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
Expand Down Expand Up @@ -283,7 +293,7 @@ private[yarn] abstract class YarnAllocator(
executorMemory,
executorCores,
securityMgr)
new Thread(executorRunnable).start()
launcherPool.execute(executorRunnable)
}
}
logDebug("""
Expand Down

0 comments on commit 036550d

Please sign in to comment.