Skip to content

Commit

Permalink
[SPARK-30949][K8S][CORE] Decouple requests and parallelism on drivers…
Browse files Browse the repository at this point in the history
… in K8s

### What changes were proposed in this pull request?
`spark.driver.cores` configuration is used to set the amount of parallelism in kubernetes cluster mode drivers. Previously the amount of parallelism in the drivers were the number of cores in the host when running on JDK 8u120 or older, or the maximum of driver containers resource requests and limits when running on [JDK 8u121 or newer](https://bugs.openjdk.java.net/browse/JDK-8173345). This will enable users to specify `spark.driver.cores` to set parallelism, and specify `spark.kubernetes.driver.requests.cores` to limit the resource requests of the driver container, effectively decoupling the two

### Why are the changes needed?
Drivers submitted in kubernetes cluster mode set the parallelism of various components like `RpcEnv`, `MemoryManager`, `BlockManager` from inferring the number of available cores by calling `Runtime.getRuntime().availableProcessors()`. By using this, spark applications running on JDK 8u120 or older incorrectly get the total number of cores in the host, [ignoring the cgroup limits set by kubernetes](https://bugs.openjdk.java.net/browse/JDK-6515172). JDK 8u121 and newer runtimes do not have this problem.

Orthogonal to this, it is currently not possible to decouple resource limits on the driver container with the amount of parallelism of the various network and memory components listed above.

### Does this PR introduce any user-facing change?
Yes. Previously the amount of parallelism in kubernetes cluster mode submitted drivers were the number of cores in the host when running on JDK 8u120 or older, or the maximum of driver containers resource requests and limits when running on JDK 8u121 or newer. Now the value of `spark.driver.cores` is used.

### How was this patch tested?
happy to add tests if my proposal looks reasonable

Closes #27695 from onursatici/os/decouple-requests-and-parallelism.

Authored-by: Onur Satici <onursatici@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
onursatici authored and dongjoon-hyun committed Apr 21, 2020
1 parent ae29cf2 commit ad96510
Showing 1 changed file with 3 additions and 1 deletion.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Expand Up @@ -2740,7 +2740,7 @@ object SparkContext extends Logging {
case "local" => 1
case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case "yarn" =>
case "yarn" | SparkMasterRegex.KUBERNETES_REGEX(_) =>
if (conf != null && conf.get(SUBMIT_DEPLOY_MODE) == "cluster") {
conf.getInt(DRIVER_CORES.key, 0)
} else {
Expand Down Expand Up @@ -2885,6 +2885,8 @@ private object SparkMasterRegex {
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """spark://(.*)""".r
// Regular expression for connecting to kubernetes clusters
val KUBERNETES_REGEX = """k8s://(.*)""".r
}

/**
Expand Down

0 comments on commit ad96510

Please sign in to comment.