Skip to content

Commit

Permalink
Bug fix: numExecutors wasn't set & use constant DEFAULT_NUMBER_EXECUTORS
Browse files Browse the repository at this point in the history
  • Loading branch information
li-zhihui committed Jun 27, 2014
1 parent 4d6d847 commit c6f0522
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
Expand Up @@ -51,8 +51,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
// Submit tasks only after (registered executors / total expected executors)
// is equal to at least this value.
// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
if (minRegisteredRatio > 1) minRegisteredRatio = 1
// Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
val maxRegisteredWaitingTime =
conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
Expand Down
Expand Up @@ -26,7 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024
var executorCores = 1
var numExecutors = 2
var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS

parseArgs(args.toList)

Expand Down Expand Up @@ -93,3 +93,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
System.exit(exitCode)
}
}

object ApplicationMasterArguments {
val DEFAULT_NUMBER_EXECUTORS = 2
}
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkContext
import org.apache.spark.deploy.yarn.ApplicationMasterArguments
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.IntParam

Expand All @@ -28,12 +29,12 @@ private[spark] class YarnClusterSchedulerBackend(

override def start() {
super.start()
var numExecutors = 2
if (sc.getConf.contains("spark.executor.instances")) {
numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
} else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
}
// System property can override environment variable.
numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
totalExpectedExecutors.set(numExecutors)
}
}

0 comments on commit c6f0522

Please sign in to comment.