[SPARK-23285][K8S] Allow fractional values for spark.executor.cores #20460

Closed
wants to merge 4 commits into from

Conversation

@liyinan926 (Contributor) commented Jan 31, 2018

What changes were proposed in this pull request?

K8s treats CPU as a "compressible resource" and can assign fractions of a CPU (millicpus) to individual containers, e.g., 0.1 (100 millicpus). In https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala#L94, we already parse spark.executor.cores as a double value. This PR enables the use of fractional values for spark.executor.cores, and correspondingly allows fractional values for spark.task.cpus. The various places where spark.executor.cores and spark.task.cpus are used are updated to convert from double to integer values; the conversion avoids introducing a significant behavioral change to how the other backends deal with executor cores. The maximum number of tasks per executor is now calculated as spark.executor.cores / spark.task.cpus, rounded down to an integer (FLOOR rounding mode).
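
As an illustration of the calculation described above, here is a minimal Scala sketch (the names and values are hypothetical, not taken from this PR):

    // Hypothetical values; spark.executor.cores and spark.task.cpus are read as doubles.
    val executorCores = 0.5   // spark.executor.cores
    val taskCpus = 0.2        // spark.task.cpus
    // Maximum concurrent tasks per executor, rounded down (FLOOR).
    val maxTasksPerExecutor = math.floor(executorCores / taskCpus).toInt  // 2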

How was this patch tested?

Manual tests.
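
For context, the kind of configuration this change enables might look like the following sketch (the master URL and property values are placeholders, not taken from this PR):

    // Hypothetical configuration sketch; master URL and values are illustrative only.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("k8s://https://<api-server-host>:<port>")
      .set("spark.executor.cores", "0.5")  // fractional executor cores (Kubernetes mode)
      .set("spark.task.cpus", "0.5")       // fractional CPUs per task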

@foxish @vanzin

K8s treats CPU as a "compressible resource" and can actually assign millicpus to individual containers, e.g., 0.1 or 100m. In https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala#L94, we already parse `spark.executor.cores` as a double value. This PR simply bypasses the check for integral values for the property in Kubernetes mode.
    // the Kubernetes mode.
    if (!master.startsWith("k8s")
        && executorCores != null
        && Try(executorCores.toInt).getOrElse(-1) <= 0) {
Contributor:

Should this be nested if (...) statements?
That's more readable IMO, but I'm not sure what's considered more idiomatic here.

Member:

If the intent here is just to check that it's positive, then just try to get it as a double and check it. This doesn't require making the logic dependent on a particular resource manager. Of course it relaxes the condition from what it is now, which would reject input like "1.5". But it would reject it not with a graceful exit but an exception, so I kind of think it's not what this code intends to catch anyway right now. (It would cause an error pretty quickly in YARN et al anyway)

Contributor (Author):

Makes sense, done.

@foxish (Contributor) left a comment:

lgtm modulo one question

@SparkQA commented Jan 31, 2018

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/442/

@SparkQA commented Jan 31, 2018

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/442/

      standalone and Mesos coarse-grained modes.
    </td>
    <td>
      The number of cores to use on each executor.

      In standalone and Mesos coarse-grained modes, for more detail, see
-     <a href="spark-standalone.html#Executors Scheduling">this description</a>.
+     <a href="spark-standalone.html#Executors Scheduling">this description</a>. In Kubernetes mode,
+     a fractional value can be used, e.g., 0.1 or 100m.
Member:

100m isn't fractional though?

Contributor (Author):

Oh, right. Fixed.

@SparkQA commented Jan 31, 2018

Test build #86894 has finished for PR 20460 at commit 712a5ff.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 1, 2018

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/446/

@SparkQA commented Feb 1, 2018

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/446/

@SparkQA commented Feb 1, 2018

Test build #86890 has finished for PR 20460 at commit a971870.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 1, 2018

Test build #86895 has finished for PR 20460 at commit d9805c3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung (Member):

Jenkins, retest this please

@SparkQA commented Feb 1, 2018

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/462/

@SparkQA commented Feb 1, 2018

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/462/

@SparkQA commented Feb 1, 2018

Test build #86915 has finished for PR 20460 at commit d9805c3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -267,7 +267,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         && Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
       SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive number")
     }
-    if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
+    if (executorCores != null && Try(executorCores.toDouble).getOrElse(-1d) <= 0d) {
Member:

Although I think "0.0" is more readable than "0d", don't bother changing it here. Just an aside

@SparkQA commented Feb 1, 2018

Test build #4088 has finished for PR 20460 at commit d9805c3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987 (Contributor):

Do we also want to update the comment for SPARK_EXECUTOR_CORES in spark-env.sh?

@jiangxb1987 (Contributor) commented Feb 1, 2018

Actually, I think this may fail some checks (though it may not throw exceptions), for instance this one:

    if (contains("spark.cores.max") && contains("spark.executor.cores")) {
      val totalCores = getInt("spark.cores.max", 1)
      val executorCores = getInt("spark.executor.cores", 1)
      val leftCores = totalCores % executorCores
      if (leftCores != 0) {
        logWarning(s"Total executor cores: ${totalCores} is not " +
          s"divisible by cores per executor: ${executorCores}, " +
          s"the left cores: ${leftCores} will not be allocated")
      }
    }
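
For illustration only, a minimal sketch of how such a check could be made tolerant of fractional values (this is hypothetical and not the actual change made later in this PR):

    // Hypothetical sketch: read both settings as doubles so fractional executor cores
    // do not break the divisibility warning; println stands in for logWarning to keep
    // the snippet self-contained.
    import org.apache.spark.SparkConf

    def warnOnLeftoverCores(conf: SparkConf): Unit = {
      if (conf.contains("spark.cores.max") && conf.contains("spark.executor.cores")) {
        val totalCores = conf.getDouble("spark.cores.max", 1)
        val executorCores = conf.getDouble("spark.executor.cores", 1)
        val leftCores = totalCores % executorCores
        if (leftCores != 0) {
          println(s"Total executor cores: $totalCores is not divisible by " +
            s"cores per executor: $executorCores, the left cores: $leftCores will not be allocated")
        }
      }
    }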

@liyinan926 (Contributor, Author):

@jiangxb1987 it seems spark.cores.max only applies to standalone and Mesos coarse-grained modes. In k8s mode, spark.cores.max is not applicable. I think the check really should not be here, but rather pushed to specific modes where this applies.

@jiangxb1987 (Contributor):

Yeah, we should either move the check elsewhere or modify it, to avoid potential failures.

@liyinan926 (Contributor, Author):

@jiangxb1987 fixed the check in 44e489e.

@SparkQA commented Feb 1, 2018

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/488/

@SparkQA commented Feb 1, 2018

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/488/

@jerryshao (Contributor):

I think here (…) should also be fixed.

Besides, I think "spark.task.cpus" should also be fixed, right? BTW, it looks like dynamic allocation cannot be well supported if cores are fractional.

@SparkQA commented Feb 2, 2018

Test build #86949 has finished for PR 20460 at commit 44e489e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao (Contributor):

> I'd target this 2.3 & master. Waiting for tests

@felixcheung isn't it too risky to target 2.3? This is a fundamental behavior change. We should make sure K8s can use fractional cores well, and in the meantime we should also guard the other cluster managers against using them. I think we need more tests on it.

@jiangxb1987 (Contributor):

This may require more changes than it appears to, and I agree we need more tests to ensure it doesn't break anything, so +1 on targeting 2.4 instead of 2.3.

@liyinan926 (Contributor, Author):

Agreed. It seems much more complicated than expected, considering the interaction between spark.executor.cores and spark.task.cpus and the implications for each scheduler backend. I will do a more thorough investigation.

@felixcheung (Member):

ah, sounds like this is more impactful than we thought

@liyinan926 (Contributor, Author) commented Feb 2, 2018

@jiangxb1987 @felixcheung @jerryshao @srowen this PR has been updated to allow both spark.executor.cores and spark.task.cpus to have fractional values. Please take a look at the updated PR description. Thanks!

@SparkQA commented Feb 2, 2018

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/527/

@SparkQA commented Feb 2, 2018

Test build #86992 has finished for PR 20460 at commit dc7483a.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WorkerOffer(executorId: String, host: String, cores: Double)

@SparkQA commented Feb 2, 2018

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/527/

@SparkQA commented Feb 2, 2018

Test build #86993 has finished for PR 20460 at commit 2d61782.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 2, 2018

Test build #86995 has finished for PR 20460 at commit 476221d.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 2, 2018

Test build #86998 has finished for PR 20460 at commit b21e864.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liyinan926 (Contributor, Author):

Just realized that the update I did still won't work when dynamic resource allocation is enabled. So please ignore the update. This is definitely much more impactful than I thought.

@jerryshao (Contributor):

I would suggest bringing up a discussion, or even a design doc, on the dev mailing list before making such a fundamental change. This may affect not only dynamic allocation but also the scheduler. It is better to collect all the feedback first (especially from those who work on the scheduler side).

@liyinan926 (Contributor, Author):

Agreed. This is a fundamental change to the way Spark handles task scheduling, task parallelism, dynamic resource allocation, etc., and it impacts every scheduler backend. I'm closing this PR for now. Thanks for reviewing and giving feedback!

@zenglian commented Sep 7, 2018

I suggest introducing a new property, spark.kubernetes.coreUnit = cpu | mcpu (cpu by default), which would affect K8s only and keep the change minimal.
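
If such a property existed (it is only a proposal in this comment, not an actual Spark setting), usage might look like this sketch:

    // Purely illustrative: spark.kubernetes.coreUnit is a proposed, hypothetical property.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.kubernetes.coreUnit", "mcpu")  // interpret core counts as millicpus
      .set("spark.executor.cores", "500")        // 500 millicpus == 0.5 CPU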
