Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-36059][K8S][FOLLOWUP] Support spark.kubernetes.scheduler.name #35499

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 11 additions & 2 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -1332,21 +1332,30 @@ See the [configuration page](configuration.html) for information on Spark config
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.scheduler.name<code></td>
<td><code>spark.kubernetes.executor.scheduler.name</code></td>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

<td>(none)</td>
<td>
Specify the scheduler name for each executor pod.
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.scheduler.name<code></td>
<td><code>spark.kubernetes.driver.scheduler.name</code></td>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

<td>(none)</td>
<td>
Specify the scheduler name for driver pod.
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.scheduler.name</code></td>
<td>(none)</td>
<td>
Specify the scheduler name for driver and executor pods. If `spark.kubernetes.driver.scheduler.name` or
`spark.kubernetes.executor.scheduler.name` is set, will override this.
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.configMap.maxSize</code></td>
<td><code>1572864</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,15 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_SCHEDULER_NAME =
ConfigBuilder("spark.kubernetes.scheduler.name")
.doc("Specify the scheduler name for driver and executor pods. If " +
s"`${KUBERNETES_DRIVER_SCHEDULER_NAME.key}` or " +
s"`${KUBERNETES_EXECUTOR_SCHEDULER_NAME.key}` is set, will override this.")
.version("3.3.0")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_REQUEST_CORES =
ConfigBuilder("spark.kubernetes.executor.request.cores")
.doc("Specify the cpu request for each executor pod")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
def secretEnvNamesToKeyRefs: Map[String, String]
def secretNamesToMountPaths: Map[String, String]
def volumes: Seq[KubernetesVolumeSpec]
def schedulerName: String
def schedulerName: Option[String]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since schedulerName is added at 3.3.0, it looks like a safe change.

def appId: String

def appName: String = get("spark.app.name", "spark")
Expand Down Expand Up @@ -136,7 +136,9 @@ private[spark] class KubernetesDriverConf(
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX)
}

override def schedulerName: String = get(KUBERNETES_DRIVER_SCHEDULER_NAME).getOrElse("")
override def schedulerName: Option[String] = {
Option(get(KUBERNETES_DRIVER_SCHEDULER_NAME).getOrElse(get(KUBERNETES_SCHEDULER_NAME).orNull))
}
}

private[spark] class KubernetesExecutorConf(
Expand Down Expand Up @@ -195,7 +197,9 @@ private[spark] class KubernetesExecutorConf(
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
}

override def schedulerName: String = get(KUBERNETES_EXECUTOR_SCHEDULER_NAME).getOrElse("")
override def schedulerName: Option[String] = {
Option(get(KUBERNETES_EXECUTOR_SCHEDULER_NAME).getOrElse(get(KUBERNETES_SCHEDULER_NAME).orNull))
}

private def checkExecutorEnvKey(key: String): Boolean = {
// Pattern for matching an executorEnv key, which meets certain naming rules.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.endSpec()
.build()

conf.get(KUBERNETES_DRIVER_SCHEDULER_NAME)
conf.schedulerName
.foreach(driverPod.getSpec.setSchedulerName)

SparkPod(driverPod, driverContainer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private[spark] class BasicExecutorFeatureStep(
.endSpec()
.build()
}
kubernetesConf.get(KUBERNETES_EXECUTOR_SCHEDULER_NAME)
kubernetesConf.schedulerName
.foreach(executorPod.getSpec.setSchedulerName)

SparkPod(executorPod, containerWithLifecycle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,28 @@ class KubernetesConfSuite extends SparkFunSuite {
test("SPARK-36059: Set driver.scheduler and executor.scheduler") {
val sparkConf = new SparkConf(false)
val execUnsetConf = KubernetesTestConf.createExecutorConf(sparkConf)
val driverUnsetConf = KubernetesTestConf.createExecutorConf(sparkConf)
assert(execUnsetConf.schedulerName === "")
assert(driverUnsetConf.schedulerName === "")

val driverUnsetConf = KubernetesTestConf.createDriverConf(sparkConf)
assert(execUnsetConf.schedulerName === None)
assert(driverUnsetConf.schedulerName === None)

sparkConf.set(KUBERNETES_SCHEDULER_NAME, "sameScheduler")
// Use KUBERNETES_SCHEDULER_NAME when is NOT set
assert(KubernetesTestConf.createDriverConf(sparkConf).schedulerName === Some("sameScheduler"))
assert(KubernetesTestConf.createExecutorConf(sparkConf).schedulerName === Some("sameScheduler"))

// Override by driver/executor side scheduler when ""
sparkConf.set(KUBERNETES_DRIVER_SCHEDULER_NAME, "")
sparkConf.set(KUBERNETES_EXECUTOR_SCHEDULER_NAME, "")
assert(KubernetesTestConf.createDriverConf(sparkConf).schedulerName === Some(""))
assert(KubernetesTestConf.createExecutorConf(sparkConf).schedulerName === Some(""))

// Override by driver/executor side scheduler when set
sparkConf.set(KUBERNETES_DRIVER_SCHEDULER_NAME, "driverScheduler")
sparkConf.set(KUBERNETES_EXECUTOR_SCHEDULER_NAME, "executorScheduler")
val execConf = KubernetesTestConf.createExecutorConf(sparkConf)
assert(execConf.schedulerName === "executorScheduler")
assert(execConf.schedulerName === Some("executorScheduler"))
val driverConf = KubernetesTestConf.createDriverConf(sparkConf)
assert(driverConf.schedulerName === "driverScheduler")
assert(driverConf.schedulerName === Some("driverScheduler"))
}

test("SPARK-37735: access appId in KubernetesConf") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ abstract class PodBuilderSuite extends SparkFunSuite {

protected def templateFileConf: ConfigEntry[_]

protected def roleSpecificSchedulerNameConf: ConfigEntry[_]

protected def userFeatureStepsConf: ConfigEntry[_]

protected def userFeatureStepWithExpectedAnnotation: (String, String)
Expand All @@ -53,6 +55,20 @@ abstract class PodBuilderSuite extends SparkFunSuite {
verify(client, never()).pods()
}

test("SPARK-36059: set custom scheduler") {
val client = mockKubernetesClient()
val conf1 = baseConf.clone().set(templateFileConf.key, "template-file.yaml")
.set(Config.KUBERNETES_SCHEDULER_NAME.key, "custom")
val pod1 = buildPod(conf1, client)
assert(pod1.pod.getSpec.getSchedulerName === "custom")

val conf2 = baseConf.clone().set(templateFileConf.key, "template-file.yaml")
.set(Config.KUBERNETES_SCHEDULER_NAME.key, "custom")
.set(roleSpecificSchedulerNameConf.key, "rolescheduler")
val pod2 = buildPod(conf2, client)
assert(pod2.pod.getSpec.getSchedulerName === "rolescheduler")
}

test("load pod template if specified") {
val client = mockKubernetesClient()
val sparkConf = baseConf.clone().set(templateFileConf.key, "template-file.yaml")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class KubernetesDriverBuilderSuite extends PodBuilderSuite {
Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE
}

override protected def roleSpecificSchedulerNameConf: ConfigEntry[_] = {
Config.KUBERNETES_DRIVER_SCHEDULER_NAME
}

override protected def userFeatureStepsConf: ConfigEntry[_] = {
Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE
}

override protected def roleSpecificSchedulerNameConf: ConfigEntry[_] = {
Config.KUBERNETES_EXECUTOR_SCHEDULER_NAME
}

override protected def userFeatureStepsConf: ConfigEntry[_] = {
Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS
}
Expand Down