Skip to content

Commit

Permalink
support yarn mode(percentage style)
Browse files Browse the repository at this point in the history
  • Loading branch information
li-zhihui committed Jun 19, 2014
1 parent 3f8c941 commit 37f7dc2
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
Expand Up @@ -50,10 +50,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
val registeredRatio = conf.getDouble("spark.executor.registeredRatio", 0)
val maxRegisteredWaitingTime = conf.getInt("spark.executor.maxRegisteredWaitingTime", 10000)
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredRatio", 0)
val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredWaitingTime", 10000)
val createTime = System.currentTimeMillis()
var ready = if(registeredRatio==0)true else false
var ready = if (minRegisteredRatio <= 0) true else false

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
Expand Down Expand Up @@ -88,7 +88,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
if (executorActor.size >= totalExecutors.get() * registeredRatio) {
if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) {
ready = true
}
makeOffers()
Expand Down
Expand Up @@ -77,6 +77,12 @@ private[spark] class YarnClientSchedulerBackend(

logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
totalExecutors.set(args.numExecutors)
// reset default minRegisteredRatio for yarn mode
if (minRegisteredRatio == 0) {
minRegisteredRatio = 0.9
ready = false
}
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
Expand Down

0 comments on commit 37f7dc2

Please sign in to comment.