Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-36059][K8S][FOLLOWUP] Support spark.kubernetes.scheduler.name #35499

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,15 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_SCHEDULER_NAME =
ConfigBuilder("spark.kubernetes.scheduler.name")
.doc("Specify the scheduler name for driver and executor pod, if " +
Yikun marked this conversation as resolved.
Show resolved Hide resolved
s"`${KUBERNETES_DRIVER_SCHEDULER_NAME.key}` or `${KUBERNETES_DRIVER_SCHEDULER_NAME.key}` " +
Yikun marked this conversation as resolved.
Show resolved Hide resolved
"is set, will replace this.")
Yikun marked this conversation as resolved.
Show resolved Hide resolved
.version("3.3.0")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_REQUEST_CORES =
ConfigBuilder("spark.kubernetes.executor.request.cores")
.doc("Specify the cpu request for each executor pod")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ private[spark] class KubernetesDriverConf(
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX)
}

override def schedulerName: String = get(KUBERNETES_DRIVER_SCHEDULER_NAME).getOrElse("")
override def schedulerName: String = {
get(KUBERNETES_DRIVER_SCHEDULER_NAME).getOrElse(get(KUBERNETES_SCHEDULER_NAME).getOrElse(""))
}
}

private[spark] class KubernetesExecutorConf(
Expand Down Expand Up @@ -195,7 +197,9 @@ private[spark] class KubernetesExecutorConf(
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
}

override def schedulerName: String = get(KUBERNETES_EXECUTOR_SCHEDULER_NAME).getOrElse("")
override def schedulerName: String = {
get(KUBERNETES_EXECUTOR_SCHEDULER_NAME).getOrElse(get(KUBERNETES_SCHEDULER_NAME).getOrElse(""))
Yikun marked this conversation as resolved.
Show resolved Hide resolved
}

private def checkExecutorEnvKey(key: String): Boolean = {
// Pattern for matching an executorEnv key, which meets certain naming rules.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,17 @@ class KubernetesConfSuite extends SparkFunSuite {
test("SPARK-36059: Set driver.scheduler and executor.scheduler") {
val sparkConf = new SparkConf(false)
val execUnsetConf = KubernetesTestConf.createExecutorConf(sparkConf)
val driverUnsetConf = KubernetesTestConf.createExecutorConf(sparkConf)
val driverUnsetConf = KubernetesTestConf.createDriverConf(sparkConf)
assert(execUnsetConf.schedulerName === "")
assert(driverUnsetConf.schedulerName === "")

sparkConf.set(KUBERNETES_SCHEDULER_NAME, "sameScheduler")
val execCommonConf = KubernetesTestConf.createExecutorConf(sparkConf)
assert(execCommonConf.schedulerName === "sameScheduler")
val driverCommonConf = KubernetesTestConf.createDriverConf(sparkConf)
assert(driverCommonConf.schedulerName === "sameScheduler")

assert(sparkConf.get(KUBERNETES_SCHEDULER_NAME) === Some("sameScheduler"))
sparkConf.set(KUBERNETES_DRIVER_SCHEDULER_NAME, "driverScheduler")
sparkConf.set(KUBERNETES_EXECUTOR_SCHEDULER_NAME, "executorScheduler")
val execConf = KubernetesTestConf.createExecutorConf(sparkConf)
Expand Down