[SPARK-19522] Fix executor memory in local-cluster mode #16975

Closed

Changes from all commits
33 changes: 20 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -470,12 +470,25 @@ class SparkContext(config: SparkConf) extends Logging {
      files.foreach(addFile)
    }

-    _executorMemory = _conf.getOption("spark.executor.memory")
-      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
-      .orElse(Option(System.getenv("SPARK_MEM"))
-      .map(warnSparkMem))
-      .map(Utils.memoryStringToMb)
-      .getOrElse(1024)
+    _executorMemory = {
+      val defaultMemory = 1024
+      val configuredMemory = _conf.getOption("spark.executor.memory")
+        .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
+        .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
+        .map(Utils.memoryStringToMb)
+      // In local-cluster mode, always use the slave memory specified in the master string
+      // In other modes, use the configured memory if it exists
+      master match {
+        case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, em) =>
+          if (configuredMemory.isDefined) {
Contributor:

Could you at least change this so that spark.executor.memory takes precedence if it's set? Then both use cases are possible. (Maybe someone is crazy enough to be trying dynamic allocation in local-cluster mode, or something else...)

Contributor Author:

sure

logWarning(s"Ignoring explicit setting of executor" +
s"memory $configuredMemory in local-cluster mode")
}
em.toInt
case _ =>
configuredMemory.getOrElse(defaultMemory)
}
}

    // Convert java options to env vars as a work around
    // since we can't set env vars directly in sbt.
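
For comparison, here is a minimal sketch of the precedence the reviewer asked for above (hypothetical: the diff as shown still lets the master string win over an explicit setting). It reuses the configuredMemory, defaultMemory, and em names from the patch:

    master match {
      case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, em) =>
        // An explicit spark.executor.memory would take precedence here;
        // the master string's per-slave memory becomes the fallback.
        configuredMemory.getOrElse(em.toInt)
      case _ =>
        configuredMemory.getOrElse(defaultMemory)
    }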
@@ -2707,14 +2720,8 @@ object SparkContext extends Logging {
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-       // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
-       if (sc.executorMemory > memoryPerSlaveInt) {
-         throw new SparkException(
-           "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
-             memoryPerSlaveInt, sc.executorMemory))
-       }
-
+       // In local-cluster mode, _executorMemory is now always taken from the
+       // master string, so the two values can no longer disagree.
+       assert(sc.executorMemory == memoryPerSlaveInt)
        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
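For reference, a self-contained sketch of how a local-cluster master string is parsed into its three fields; the regex below mirrors what SparkMasterRegex.LOCAL_CLUSTER_REGEX is assumed to look like, and the object name is mine:

    object LocalClusterMasterSketch {
      // Assumed shape of SparkMasterRegex.LOCAL_CLUSTER_REGEX:
      // local-cluster[numSlaves, coresPerSlave, memoryPerSlaveMB]
      private val LocalCluster =
        """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r

      def main(args: Array[String]): Unit = {
        "local-cluster[2,1,1024]" match {
          case LocalCluster(numSlaves, coresPerSlave, memoryPerSlave) =>
            // With this patch, sc.executorMemory is taken from the third field (MB).
            println(s"slaves=$numSlaves cores=$coresPerSlave memoryMb=${memoryPerSlave.toInt}")
          case other =>
            println(s"not a local-cluster master: $other")
        }
      }
    }
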
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

@@ -466,7 +466,7 @@ object SparkSubmit extends CommandLineUtils {
      // Other options
      OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
        sysProp = "spark.executor.cores"),
-     OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
+     OptionAssigner(args.executorMemory, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
Contributor:

Is the change in SparkContext needed? Seems like this should be all that's needed.

As far as I understand, the last value in the local-cluster master is the amount of memory the worker has available; you may, for whatever reason, want to run executors with less than that, which your change doesn't seem to allow.

Contributor Author:

If this were the only change then specifying local-cluster[2,1,2048] doesn't actually do anything because we're not setting spark.executor.memory=2048mb anywhere. You could do --master local-cluster[2,1,2048] --conf spark.executor.memory=2048mb but that's cumbersome and now there are two ways to set the executor memory.
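
The redundancy being described, as a hypothetical snippet (the SparkConf calls are real, the values are illustrative):

    // Without the SparkContext change, both knobs have to be set and kept in sync:
    val conf = new org.apache.spark.SparkConf()
      .setMaster("local-cluster[2,1,2048]")   // per-worker memory in the master string
      .set("spark.executor.memory", "2048m")  // the same value again, set explicitly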

Contributor Author:

> You may, for whatever reason, want to run executors with less than that, which your change doesn't seem to allow.

Yeah, I thought about this long and hard but I just couldn't come up with a case where you would possibly want the worker size to be different from executor size in local-cluster mode. If you want to launch 5 workers (2GB), each with 2 executors (1GB), then you might as well just launch 10 executors (1GB) or run real standalone mode locally. I think it's better to fix the out-of-the-box case than to try to cover all potentially non-existent corner cases.

Contributor:

Well, it would make local-cluster[] work like any other master, where you have to explicitly set the executor memory. I understand the desire to simplify things, but this is doing it at the cost of being inconsistent with other cluster managers.

(e.g. the same command line with a different master would behave differently - you'd fall back to having 1g of memory for executors instead of whatever was defined in the local-cluster string.)

Contributor:

(Anyway, either way is probably fine, so go with your judgement. It just seems like a lot of code in SparkContext just to support that use case.)

Contributor Author:

The inconsistency is already inherent with the parameters in local-cluster[], so I'm not introducing it here with this change. I personally think it's a really bad interface to force the user set executor memory in two different places and require that these two values match.

Contributor Author:

Also, we're talking about a net addition of 7 LOC in SparkContext.scala, about half of which are comments and warning logs. It's really not that much code.

sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
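To make the one-line change concrete, here is a runnable sketch of the mechanism: each OptionAssigner routes a command-line value to a system property when the active cluster manager matches its bitmask. This is an assumed, simplified shape of SparkSubmit's internals, not the real signatures; the bitmask values are also assumptions.

    object OptionAssignerSketch {
      // Assumed bitmask values; Spark defines similar flags internally.
      val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8
      val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL

      case class OptionAssigner(value: String, clusterManager: Int, sysProp: String)

      // Collect sysProps whose assigner matches the active cluster manager.
      def assign(active: Int, assigners: Seq[OptionAssigner]): Map[String, String] =
        assigners.collect {
          case OptionAssigner(v, mgrs, prop) if v != null && (mgrs & active) != 0 => prop -> v
        }.toMap

      def main(args: Array[String]): Unit = {
        val before = OptionAssigner("2048m", STANDALONE | MESOS | YARN, "spark.executor.memory")
        val after  = OptionAssigner("2048m", ALL_CLUSTER_MGRS, "spark.executor.memory")
        // With a local or local-cluster master, the old mask does not match:
        println(assign(LOCAL, Seq(before))) // Map()
        println(assign(LOCAL, Seq(after)))  // Map(spark.executor.memory -> 2048m)
      }
    }
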
31 changes: 31 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -388,6 +388,22 @@ class SparkSubmitSuite
    runSparkSubmit(args)
  }

test("executor memory in local-cluster mode") {
val executorMemoryMb = 1888
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
"--class", LocalClusterExecutorMemoryTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", s"local-cluster[2,1,$executorMemoryMb]",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
"--conf", s"spark.executor.memory=${executorMemoryMb * 2}", // not used
"--conf", "spark.testing.reservedMemory=0", // needed to avoid SPARK-12759
unusedJar.toString,
executorMemoryMb.toString)
runSparkSubmit(args)
}
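
One detail worth noting in the test above: a Scala object's runtime class name carries a trailing $, which is why the --class argument is built with stripSuffix("$"). A small illustration (Demo is a throwaway example object):

    object Demo {
      def main(args: Array[String]): Unit = println("hi")
    }
    // Demo.getClass.getName == "Demo$" (the object's own class), while
    // spark-submit needs "Demo", the class carrying the static main() forwarder:
    // Demo.getClass.getName.stripSuffix("$") == "Demo"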

test("includes jars passed in through --jars") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
@@ -720,6 +736,21 @@ object JarCreationTest extends Logging {
  }
}

+object LocalClusterExecutorMemoryTest {
+  def main(args: Array[String]): Unit = {
+    Utils.configTestLog4j("INFO")
+    if (args.length != 1) {
+      throw new IllegalArgumentException("Expected exactly 1 argument, got " + args.length)
+    }
+    val sc = new SparkContext
+    val executorMemory = args.head.toInt
+    if (sc.executorMemory != executorMemory) {
+      throw new SparkException(
+        "Expected executor memory to be %s, was %s".format(executorMemory, sc.executorMemory))
+    }
+    sc.stop()
+  }
+}

object SimpleApplicationTest {
  def main(args: Array[String]) {
    Utils.configTestLog4j("INFO")