diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4191f9182ac17..ff57372dcb72d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -50,10 +50,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - val registeredRatio = conf.getDouble("spark.executor.registeredRatio", 0) - val maxRegisteredWaitingTime = conf.getInt("spark.executor.maxRegisteredWaitingTime", 10000) + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredRatio", 0) + val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredWaitingTime", 10000) val createTime = System.currentTimeMillis() - var ready = if(registeredRatio==0)true else false + var ready = if (minRegisteredRatio <= 0) true else false class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] @@ -88,7 +88,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A executorAddress(executorId) = sender.path.address addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) - if (executorActor.size >= totalExecutors.get() * registeredRatio) { + if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) { ready = true } makeOffers() diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 039cf4f276119..1e49a34a0b02f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -77,6 +77,12 @@ private[spark] class YarnClientSchedulerBackend( logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) + totalExecutors.set(args.numExecutors) + // reset default minRegisteredRatio for yarn mode + if (minRegisteredRatio == 0) { + minRegisteredRatio = 0.9 + ready = false + } client = new Client(args, conf) appId = client.runApp() waitForApp()