diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index bdc2a1a156b0..ababe38e4e25 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1474,6 +1474,17 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>3.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.podDeletionCost</code></td>
+  <td>(none)</td>
+  <td>
+    Value to apply to the <code>controller.kubernetes.io/pod-deletion-cost</code> annotation
+    when Spark tells a deployment-based allocator to remove executor pods. Kubernetes removes
+    pods with lower cost first, so a value below the template default of <code>0</code> steers
+    Kubernetes to remove the same pods that Spark selected when the deployment scales down.
+  </td>
+  <td>4.2.0</td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.executor.scheduler.name</code></td>
   <td>(none)</td>
@@ -1654,10 +1665,10 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>spark.kubernetes.allocation.pods.allocator</code></td>
   <td><code>direct</code></td>
   <td>
-    Allocator to use for pods. Possible values are <code>direct</code> (the default)
-    and <code>statefulset</code>, or a full class name of a class implementing `AbstractPodsAllocator`.
-    Future version may add Job or replicaset. This is a developer API and may change
-    or be removed at anytime.
+    Allocator to use for pods. Possible values are <code>direct</code> (the default),
+    <code>statefulset</code>, <code>deployment</code>, or a full class name of a class
+    implementing <code>AbstractPodsAllocator</code>. Future versions may add Job or
+    ReplicaSet. This is a developer API and may change or be removed at any time.
   </td>
   <td>3.3.0</td>
 </tr>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index fafff5046b9d..1fb214c87c53 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.internal.config.{ConfigBuilder, DYN_ALLOCATION_ENABLED}
private[spark] object Config extends Logging {
@@ -462,14 +462,26 @@ private[spark] object Config extends Logging {
val KUBERNETES_ALLOCATION_PODS_ALLOCATOR =
ConfigBuilder("spark.kubernetes.allocation.pods.allocator")
- .doc("Allocator to use for pods. Possible values are direct (the default) and statefulset " +
- ", or a full class name of a class implementing AbstractPodsAllocator. " +
+ .doc("Allocator to use for pods. Possible values are direct (the default), statefulset," +
+ " deployment, or a full class name of a class implementing AbstractPodsAllocator. " +
"Future version may add Job or replicaset. This is a developer API and may change " +
"or be removed at anytime.")
.version("3.3.0")
.stringConf
.createWithDefault("direct")
+ val KUBERNETES_EXECUTOR_POD_DELETION_COST =
+ ConfigBuilder("spark.kubernetes.executor.podDeletionCost")
+ .doc("Value to set for the controller.kubernetes.io/pod-deletion-cost" +
+ " annotation when Spark asks a deployment-based allocator to remove executor pods. This " +
+ "helps Kubernetes pick the same pods Spark selected when the deployment scales down." +
+ s" This should only be enabled when both $KUBERNETES_ALLOCATION_PODS_ALLOCATOR is set to " +
+ s"deployment, and $DYN_ALLOCATION_ENABLED is enabled.")
+ .version("4.2.0")
+ .intConf
+ .createOptional
+
val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 13d1f1bc98a0..5f61c014127a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s.features
+import java.util.Locale
+
import scala.jdk.CollectionConverters._
import io.fabric8.kubernetes.api.model._
@@ -115,12 +117,17 @@ private[spark] class BasicExecutorFeatureStep(
// hostname must be no longer than `KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH`(63) characters,
// so take the last 63 characters of the pod name as the hostname.
// This preserves uniqueness since the end of name contains executorId
- val hostname = name.substring(Math.max(0, name.length - KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH))
+ var hostname = name.substring(Math.max(0, name.length - KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH))
// Remove non-word characters from the start of the hostname
.replaceAll("^[^\\w]+", "")
// Replace dangerous characters in the remaining string with a safe alternative.
.replaceAll("[^\\w-]+", "_")
+    // Deployment pod templates require an RFC 1123 DNS label (lowercase) hostname.
+    if (kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) == "deployment") {
+ hostname = hostname.toLowerCase(Locale.ROOT)
+ }
+
val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi")
val executorCpuQuantity = new Quantity(executorCoresRequest)
val executorResourceQuantities =
@@ -270,7 +277,9 @@ private[spark] class BasicExecutorFeatureStep(
}
val policy = kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
- case "statefulset" => "Always"
+ case "statefulset" | "deployment" => "Always"
case _ => "Never"
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala
new file mode 100644
index 000000000000..213d12301a9c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodSpecBuilder, PodTemplateSpec}
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.util.{Clock, Utils}
+
+/**
+ * A pods allocator backed by Kubernetes Deployments.
+ *
+ * The Deployment controller honors the `controller.kubernetes.io/pod-deletion-cost`
+ * annotation, so executors selected by Spark for removal can be prioritized when the
+ * deployment scales down. This provides predictable downscale behavior for dynamic
+ * allocation, which is not possible with StatefulSets that only remove pods in ordinal order.
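+ *
+ * An illustrative (not prescriptive) configuration for pairing this allocator with
+ * dynamic allocation; a deletion cost below the template default of "0" makes the
+ * controller remove the Spark-selected pods first:
+ * {{{
+ *   --conf spark.kubernetes.allocation.pods.allocator=deployment
+ *   --conf spark.dynamicAllocation.enabled=true
+ *   --conf spark.dynamicAllocation.shuffleTracking.enabled=true
+ *   --conf spark.kubernetes.executor.podDeletionCost=-1
+ * }}}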
+ */
+class DeploymentPodsAllocator(
+ conf: SparkConf,
+ secMgr: SecurityManager,
+ executorBuilder: KubernetesExecutorBuilder,
+ kubernetesClient: KubernetesClient,
+ snapshotsStore: ExecutorPodsSnapshotsStore,
+ clock: Clock) extends AbstractPodsAllocator() with Logging {
+
+ private val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]
+
+ private val driverPodReadinessTimeout = conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT)
+
+ private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
+ private val kubernetesDriverPodName = conf.get(KUBERNETES_DRIVER_POD_NAME)
+
+ val driverPod: Option[Pod] = kubernetesDriverPodName
+ .map(name => Option(kubernetesClient.pods()
+ .inNamespace(namespace)
+ .withName(name)
+ .get())
+ .getOrElse(throw new SparkException(
+ s"No pod was found named $name in the cluster in the " +
+ s"namespace $namespace (this was supposed to be the driver pod.).")))
+
+ private var appId: String = _
+
+ private val deploymentsCreated = new mutable.HashSet[Int]()
+
+ private val podDeletionCostAnnotation = "controller.kubernetes.io/pod-deletion-cost"
+
+ override def start(
+ applicationId: String,
+ schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+ appId = applicationId
+ driverPod.foreach { pod =>
+ Utils.tryLogNonFatalError {
+ kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withName(pod.getMetadata.getName)
+ .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
+ }
+ }
+ }
+
+ override def setTotalExpectedExecutors(
+ resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
+ if (appId == null) {
+ throw new SparkException("setTotalExpectedExecutors called before start of allocator.")
+ }
+ resourceProfileToTotalExecs.foreach { case (rp, numExecs) =>
+ rpIdToResourceProfile.getOrElseUpdate(rp.id, rp)
+ setTargetExecutorsDeployment(numExecs, appId, rp.id)
+ }
+ }
+
+ override def isDeleted(executorId: String): Boolean = false
+
+  private def deploymentName(applicationId: String, resourceProfileId: Int): String = {
+    s"spark-d-$applicationId-$resourceProfileId"
+  }
+
+ private def setTargetExecutorsDeployment(
+ expected: Int,
+ applicationId: String,
+ resourceProfileId: Int): Unit = {
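+    // The first request for a resource profile creates a Deployment sized to the
+    // target; subsequent requests only adjust the replica count via the scale API.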
+ if (deploymentsCreated.contains(resourceProfileId)) {
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(namespace)
+        .withName(deploymentName(applicationId, resourceProfileId))
+ .scale(expected)
+ } else {
+ val executorConf = KubernetesConf.createExecutorConf(
+ conf,
+ "EXECID",
+ applicationId,
+ driverPod,
+ resourceProfileId)
+ val resolvedExecutorSpec = executorBuilder.buildFromFeatures(
+ executorConf,
+ secMgr,
+ kubernetesClient,
+ rpIdToResourceProfile(resourceProfileId))
+ val executorPod = resolvedExecutorSpec.pod
+
+ val podSpecBuilder = executorPod.pod.getSpec match {
+ case null => new PodSpecBuilder()
+ case s => new PodSpecBuilder(s)
+ }
+ val podWithAttachedContainer: PodSpec = podSpecBuilder
+ .addToContainers(executorPod.container)
+ .build()
+
+ val meta = executorPod.pod.getMetadata
+ val resources = resolvedExecutorSpec.executorKubernetesResources
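+      // Every replica of a Deployment is stamped from one shared pod template, so
+      // the per-executor PersistentVolumeClaims that the direct allocator provisions
+      // have no equivalent here; reject dynamic and statically referenced claims.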
+ val failureMessage =
+ "PersistentVolumeClaims are not supported with the deployment allocator. " +
+ "Please remove PVC requirements or choose a different pods allocator."
+ val dynamicVolumeClaims = resources.filter(_.getKind == "PersistentVolumeClaim")
+ if (dynamicVolumeClaims.nonEmpty) {
+ throw new SparkException(failureMessage)
+ }
+ val staticVolumeClaims = Option(podWithAttachedContainer.getVolumes)
+ .map(_.asScala.filter(_.getPersistentVolumeClaim != null))
+ .getOrElse(Seq.empty)
+ if (staticVolumeClaims.nonEmpty) {
+ throw new SparkException(failureMessage)
+ }
+
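+      // Seed the template with a neutral deletion cost of "0" so that executors the
+      // scheduler backend later annotates with a lower configured cost are preferred
+      // for deletion on scale-down (KEP-2255 deletes pods in ascending cost order).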
+ val currentAnnotations = Option(meta.getAnnotations)
+ .map(_.asScala).getOrElse(Map.empty[String, String])
+ if (!currentAnnotations.contains(podDeletionCostAnnotation)) {
+ val newAnnotations = currentAnnotations.concat(Seq(podDeletionCostAnnotation -> "0"))
+ meta.setAnnotations(newAnnotations.asJava)
+ }
+
+ val podTemplateSpec = new PodTemplateSpec(meta, podWithAttachedContainer)
+
+ val deployment = new DeploymentBuilder()
+ .withNewMetadata()
+      .withName(deploymentName(applicationId, resourceProfileId))
+ .withNamespace(namespace)
+ .endMetadata()
+ .withNewSpec()
+ .withReplicas(expected)
+ .withNewSelector()
+ .addToMatchLabels(SPARK_APP_ID_LABEL, applicationId)
+ .addToMatchLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .addToMatchLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, resourceProfileId.toString)
+ .endSelector()
+ .withTemplate(podTemplateSpec)
+ .endSpec()
+ .build()
+
+    // The driver pod may be absent (e.g. client-mode setups); only attach an owner
+    // reference when it exists so executor deployments get garbage collected with it.
+    driverPod.foreach(addOwnerReference(_, Seq(deployment)))
+ kubernetesClient.apps().deployments().inNamespace(namespace).resource(deployment).create()
+ deploymentsCreated += resourceProfileId
+ }
+ }
+
+ override def stop(applicationId: String): Unit = {
+ deploymentsCreated.foreach { rpid =>
+ Utils.tryLogNonFatalError {
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(namespace)
+          .withName(deploymentName(applicationId, rpid))
+ .delete()
+ }
+ }
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 3fb1ed0c9c0f..15fcfe001a58 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,7 +21,7 @@ import java.io.File
import io.fabric8.kubernetes.client.Config
import io.fabric8.kubernetes.client.KubernetesClient
-import org.apache.spark.{SparkConf, SparkContext, SparkMasterRegex}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkMasterRegex}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
@@ -160,9 +160,22 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore) = {
- val executorPodsAllocatorName = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
+ val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
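+    // Fail fast: without a deletion cost the Deployment controller picks its own
+    // victims on scale-down, defeating Spark's executor selection under dynamic
+    // allocation (see KUBERNETES_EXECUTOR_POD_DELETION_COST).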
+ if (allocator == "deployment" && Utils.isDynamicAllocationEnabled(sc.conf) &&
+ sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) {
+ throw new SparkException(
+ s"Dynamic allocation with the deployment pods allocator requires " +
+ s"'${KUBERNETES_EXECUTOR_POD_DELETION_COST.key}' to be configured.")
+ }
+
+ val executorPodsAllocatorName = allocator match {
case "statefulset" =>
classOf[StatefulSetPodsAllocator].getName
+ case "deployment" =>
+ classOf[DeploymentPodsAllocator].getName
case "direct" =>
classOf[ExecutorPodsAllocator].getName
case fullClass =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index aacd8b84199e..a6d9d23fb8df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -73,6 +73,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val namespace = conf.get(KUBERNETES_NAMESPACE)
+  // KEP-2255: when a Deployment or ReplicaSet is scaled down, its pods are deleted
+  // in ascending order of this annotation's value.
+ private val podDeletionCostAnnotation = "controller.kubernetes.io/pod-deletion-cost"
+
// Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
removeExecutor(executorId, reason)
@@ -195,6 +199,35 @@
super.getExecutorIds()
}
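+  // Best-effort hint for the deployment-based allocator: annotating the executors
+  // Spark has chosen to remove lets the Deployment controller prefer exactly those
+  // pods when it later scales down. The pod edits run asynchronously on the executor
+  // service so a slow API server never blocks scheduling; failures are only logged.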
+ private def annotateExecutorDeletionCost(execIds: Seq[String]): Unit = {
+ conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).foreach { cost =>
+ logInfo(s"Annotating executor pod(s) ${execIds.mkString(",")} with deletion cost $cost")
+ val annotateTask = new Runnable() {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withLabel(SPARK_APP_ID_LABEL, applicationId())
+ .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
+ .resources()
+ .forEach { podResource =>
+ podResource.edit({ p: Pod =>
+ new PodBuilder(p).editOrNewMetadata()
+ .addToAnnotations(podDeletionCostAnnotation, cost.toString)
+ .endMetadata()
+ .build()})
+ }
+ }
+ }
+ executorService.execute(annotateTask)
+ }
+ }
+
private def labelDecommissioningExecs(execIds: Seq[String]) = {
// Only kick off the labeling task if we have a label.
conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label =>
@@ -228,6 +261,9 @@
// picked the pod to evict so we don't need to update the labels.
if (!triggeredByExecutor) {
labelDecommissioningExecs(executorsAndDecomInfo.map(_._1).toImmutableArraySeq)
+      if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) == "deployment") {
+ annotateExecutorDeletionCost(executorsAndDecomInfo.map(_._1).toImmutableArraySeq)
+ }
}
super.decommissionExecutors(executorsAndDecomInfo, adjustTargetNumExecutors,
triggeredByExecutor)
@@ -235,6 +271,9 @@
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
// If we've decided to remove some executors we should tell Kubernetes that we don't care.
+    if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) == "deployment") {
+ annotateExecutorDeletionCost(executorIds)
+ }
labelDecommissioningExecs(executorIds)
// Tell the executors to exit themselves.
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
index 1a4bc9781da2..86deb3e52bae 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
@@ -17,8 +17,7 @@
package org.apache.spark.deploy.k8s
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList}
-import io.fabric8.kubernetes.api.model.apps.StatefulSet
-import io.fabric8.kubernetes.api.model.apps.StatefulSetList
+import io.fabric8.kubernetes.api.model.apps.{Deployment, DeploymentList, StatefulSet, StatefulSetList}
import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NonNamespaceOperation, PodResource, Resource, RollableScalableResource}
object Fabric8Aliases {
@@ -38,6 +37,10 @@ object Fabric8Aliases {
type STATEFUL_SETS = MixedOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
type STATEFUL_SETS_NAMESPACED =
NonNamespaceOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
+ type DEPLOYMENT_RES = RollableScalableResource[Deployment]
+ type DEPLOYMENTS = MixedOperation[Deployment, DeploymentList, DEPLOYMENT_RES]
+ type DEPLOYMENTS_NAMESPACED =
+ NonNamespaceOperation[Deployment, DeploymentList, DEPLOYMENT_RES]
type PERSISTENT_VOLUME_CLAIMS = MixedOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
Resource[PersistentVolumeClaim]]
type PVC_WITH_NAMESPACE = NonNamespaceOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index ced1326e7938..b8b5da192a09 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s.features
+import java.util.Locale
+
import scala.jdk.CollectionConverters._
import com.google.common.net.InternetDomainName
@@ -26,6 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSui
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SecretVolumeUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
import org.apache.spark.internal.config
import org.apache.spark.internal.config._
@@ -266,6 +269,24 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
}
}
+ test("deployment allocator uses restartPolicy Always and lowercase hostnames") {
+ baseConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+ initDefaultProfile(baseConf)
+ val executorConf = KubernetesConf.createExecutorConf(
+ sparkConf = baseConf,
+ executorId = "EXECID",
+ appId = KubernetesTestConf.APP_ID,
+ driverPod = Some(DRIVER_POD))
+ val step = new BasicExecutorFeatureStep(executorConf, new SecurityManager(baseConf),
+ defaultProfile)
+ val executor = step.configurePod(SparkPod.initialPod())
+
+ val hostname = executor.pod.getSpec.getHostname
+ assert(hostname === hostname.toLowerCase(Locale.ROOT))
+ assert(InternetDomainName.isValid(hostname))
+ assert(executor.pod.getSpec.getRestartPolicy === "Always")
+ }
+
test("classpath and extra java options get translated into environment variables") {
baseConf.set(config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
baseConf.set(config.EXECUTOR_CLASS_PATH, "bar=baz")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala
new file mode 100644
index 000000000000..2166cef9d73c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.apps.Deployment
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{AppsAPIGroupDSL, PodResource}
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.ArgumentMatchers.{any, eq => meq}
+import org.mockito.Mockito.{never, times, verify, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.resource.{ResourceProfile, ResourceProfileBuilder}
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class DeploymentAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
+
+ private val driverPodName = "driver"
+
+ private val driverPod = new PodBuilder()
+ .withNewMetadata()
+ .withName(driverPodName)
+ .withUid("driver-pod-uid")
+ .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+ .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
+ .endMetadata()
+ .build()
+
+ private val conf = new SparkConf()
+ .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
+ .set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+
+ private val secMgr = new SecurityManager(conf)
+ private val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(conf)
+
+ @Mock private var kubernetesClient: KubernetesClient = _
+ @Mock private var appsClient: AppsAPIGroupDSL = _
+ @Mock private var deployments: DEPLOYMENTS = _
+ @Mock private var deploymentsNamespaced: DEPLOYMENTS_NAMESPACED = _
+ @Mock private var deploymentResource: DEPLOYMENT_RES = _
+ @Mock private var pods: PODS = _
+ @Mock private var podsNamespaced: PODS_WITH_NAMESPACE = _
+ @Mock private var driverPodResource: PodResource = _
+ @Mock private var executorPodResource: PodResource = _
+ @Mock private var executorBuilder: KubernetesExecutorBuilder = _
+ @Mock private var schedulerBackend: KubernetesClusterSchedulerBackend = _
+
+ private var allocator: DeploymentPodsAllocator = _
+ private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
+
+ before {
+ MockitoAnnotations.openMocks(this).close()
+ when(kubernetesClient.apps()).thenReturn(appsClient)
+ when(appsClient.deployments()).thenReturn(deployments)
+ when(deployments.inNamespace("default")).thenReturn(deploymentsNamespaced)
+ when(deploymentsNamespaced.resource(any(classOf[Deployment]))).thenReturn(deploymentResource)
+ when(deploymentsNamespaced.withName(any[String]())).thenReturn(deploymentResource)
+
+ when(kubernetesClient.pods()).thenReturn(pods)
+ when(pods.inNamespace("default")).thenReturn(podsNamespaced)
+ when(podsNamespaced.withName(driverPodName)).thenReturn(driverPodResource)
+ when(podsNamespaced.resource(any(classOf[Pod]))).thenReturn(executorPodResource)
+ when(driverPodResource.get).thenReturn(driverPod)
+ when(driverPodResource.waitUntilReady(any(), any())).thenReturn(driverPod)
+ when(executorBuilder.buildFromFeatures(
+ any(classOf[KubernetesExecutorConf]),
+ meq(secMgr),
+ meq(kubernetesClient),
+ any(classOf[ResourceProfile])))
+ .thenAnswer { invocation =>
+ val k8sConf = invocation.getArgument[KubernetesExecutorConf](0)
+ KubernetesExecutorSpec(
+ executorPodWithId(0, k8sConf.resourceProfileId),
+ Seq.empty)
+ }
+
+ snapshotsStore = new DeterministicExecutorPodsSnapshotsStore
+ allocator = new DeploymentPodsAllocator(
+ conf,
+ secMgr,
+ executorBuilder,
+ kubernetesClient,
+ snapshotsStore,
+ snapshotsStore.clock)
+
+ when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty)
+ allocator.start(TEST_SPARK_APP_ID, schedulerBackend)
+ }
+
+ after {
+ ResourceProfile.clearDefaultProfile()
+ }
+
+ test("creates deployments per resource profile and seeds deletion cost annotation") {
+ val rpBuilder = new ResourceProfileBuilder()
+ val secondProfile = rpBuilder.build()
+
+ allocator.setTotalExpectedExecutors(
+ Map(defaultProfile -> 3, secondProfile -> 2))
+
+ val captor = ArgumentCaptor.forClass(classOf[Deployment])
+ verify(deploymentsNamespaced, times(2)).resource(captor.capture())
+ verify(deploymentResource, times(2)).create()
+
+ val createdDeployments = captor.getAllValues.asScala
+ createdDeployments.foreach { deployment =>
+ assert(deployment.getMetadata.getNamespace === "default")
+ assert(deployment.getSpec.getTemplate.getMetadata
+ .getAnnotations.get("controller.kubernetes.io/pod-deletion-cost") === "0")
+ assert(deployment.getSpec.getTemplate.getSpec.getContainers.asScala.exists(
+ _.getName == "spark-executor"))
+ val selectorLabels = deployment.getSpec.getSelector.getMatchLabels.asScala
+ assert(selectorLabels(SPARK_APP_ID_LABEL) === TEST_SPARK_APP_ID)
+ assert(selectorLabels(SPARK_ROLE_LABEL) === SPARK_POD_EXECUTOR_ROLE)
+ }
+ }
+
+ test("scales existing deployment when replicas change") {
+ allocator.setTotalExpectedExecutors(Map(defaultProfile -> 5))
+ verify(deploymentResource, times(1)).create()
+
+ allocator.setTotalExpectedExecutors(Map(defaultProfile -> 7))
+ verify(deploymentResource).scale(7)
+ }
+
+ test("throws when executor template contributes dynamic PVCs") {
+ val pvc = persistentVolumeClaim("spark-pvc", "standard", "1Gi")
+ when(executorBuilder.buildFromFeatures(
+ any(classOf[KubernetesExecutorConf]),
+ meq(secMgr),
+ meq(kubernetesClient),
+ any(classOf[ResourceProfile])))
+ .thenReturn(KubernetesExecutorSpec(
+ executorPodWithId(0),
+ Seq(pvc)))
+
+ val error = intercept[SparkException] {
+ allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+ }
+ assert(error.getMessage.contains("PersistentVolumeClaims are not supported"))
+ verify(deploymentResource, never()).create()
+ }
+
+ test("throws when executor template includes static PVC references") {
+ when(executorBuilder.buildFromFeatures(
+ any(classOf[KubernetesExecutorConf]),
+ meq(secMgr),
+ meq(kubernetesClient),
+ any(classOf[ResourceProfile])))
+ .thenReturn(KubernetesExecutorSpec(
+ executorPodWithIdAndVolume(0),
+ Seq.empty))
+
+ val error = intercept[SparkException] {
+ allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+ }
+ assert(error.getMessage.contains("PersistentVolumeClaims are not supported"))
+ verify(deploymentResource, never()).create()
+ }
+
+ test("deletes deployments on stop") {
+ allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    allocator.stop(TEST_SPARK_APP_ID)
+ verify(deploymentResource).delete()
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
index 07410b6a7b71..78e0942cfb82 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
import org.apache.spark.scheduler.local.LocalSchedulerBackend
class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
@@ -39,27 +40,33 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
@Mock
private var env: SparkEnv = _
- @Mock
private var sparkConf: SparkConf = _
before {
MockitoAnnotations.openMocks(this).close()
+ sparkConf = new SparkConf(false)
+ .set("spark.app.id", TEST_SPARK_APP_ID)
+ .set("spark.master", "k8s://test")
when(sc.conf).thenReturn(sparkConf)
- when(sc.conf.get(KUBERNETES_DRIVER_POD_NAME)).thenReturn(None)
- when(sc.conf.get(EXECUTOR_INSTANCES)).thenReturn(None)
- when(sc.conf.get(MAX_EXECUTOR_FAILURES)).thenReturn(None)
- when(sc.conf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS)).thenReturn(None)
when(sc.env).thenReturn(env)
+ when(env.securityManager).thenReturn(new SecurityManager(sparkConf))
+ resetDynamicAllocatorConfig()
+ }
+
+ after {
+ resetDynamicAllocatorConfig()
}
test("constructing a AbstractPodsAllocator works") {
- val validConfigs = List("statefulset", "direct",
+ val validConfigs = List("statefulset", "deployment", "direct",
classOf[StatefulSetPodsAllocator].getName,
+ classOf[DeploymentPodsAllocator].getName,
classOf[ExecutorPodsAllocator].getName)
validConfigs.foreach { c =>
val manager = new KubernetesClusterManager()
- when(sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)).thenReturn(c)
+ sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, c)
manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
+ sparkConf.remove(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
}
}
@@ -80,4 +87,34 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
assert(backend2.isInstanceOf[LocalSchedulerBackend])
assert(backend2.applicationId() === "user-app-id")
}
+
+ test("deployment allocator with dynamic allocation requires deletion cost") {
+ val manager = new KubernetesClusterManager()
+ sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+ sparkConf.set(DYN_ALLOCATION_ENABLED.key, "true")
+ sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)
+ sparkConf.set("spark.shuffle.service.enabled", "true")
+
+ val e = intercept[SparkException] {
+ manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
+ }
+ assert(e.getMessage.contains(KUBERNETES_EXECUTOR_POD_DELETION_COST.key))
+ }
+
+ test("deployment allocator with dynamic allocation and deletion cost succeeds") {
+ val manager = new KubernetesClusterManager()
+ sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+ sparkConf.set(DYN_ALLOCATION_ENABLED.key, "true")
+ sparkConf.set(KUBERNETES_EXECUTOR_POD_DELETION_COST, 1)
+ sparkConf.set("spark.shuffle.service.enabled", "true")
+
+ manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
+ }
+
+ private def resetDynamicAllocatorConfig(): Unit = {
+ sparkConf.remove(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
+ sparkConf.remove(DYN_ALLOCATION_ENABLED.key)
+ sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)
+ sparkConf.remove("spark.shuffle.service.enabled")
+ }
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index b2e4a7182a77..76c2e16782c1 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -18,14 +18,17 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.Arrays
import java.util.concurrent.TimeUnit
+import java.util.function.UnaryOperator
-import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodList}
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodBuilder, PodList}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
import org.jmock.lib.concurrent.DeterministicScheduler
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.{any, eq => mockitoEq}
-import org.mockito.Mockito.{mock, never, spy, verify, when}
+import org.mockito.Mockito.{atLeastOnce, mock, never, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
@@ -205,17 +208,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
verify(driverEndpointRef).send(RemoveExecutor("2", ExecutorKilled))
verify(labeledPods, never()).delete()
- verify(pod1op, never()).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
- verify(pod2op, never()).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+ verify(pod1op, never()).edit(any(classOf[UnaryOperator[Pod]]))
+ verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]]))
schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
TimeUnit.MILLISECONDS)
verify(labeledPods, never()).delete()
- verify(pod1op, never()).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
- verify(pod2op, never()).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+ verify(pod1op, never()).edit(any(classOf[UnaryOperator[Pod]]))
+ verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]]))
when(labeledPods.resources()).thenReturn(Arrays.asList(pod1op).stream)
val podList = mock(classOf[PodList])
@@ -227,16 +226,64 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
verify(labeledPods, never()).delete()
schedulerExecutorService.runUntilIdle()
- verify(pod1op).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
- verify(pod2op, never()).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+ verify(pod1op).edit(any(classOf[UnaryOperator[Pod]]))
+ verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]]))
verify(labeledPods, never()).delete()
schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
TimeUnit.MILLISECONDS)
verify(labeledPods).delete()
}
+ test("Annotates executor pods with deletion cost when configured") {
+ sparkConf.set(KUBERNETES_EXECUTOR_POD_DELETION_COST, 7)
+ schedulerBackendUnderTest.start()
+
+ when(podsWithNamespace.withField(any(), any())).thenReturn(labeledPods)
+ when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+ when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+ when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+ when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3")).thenReturn(labeledPods)
+
+ val podResource = mock(classOf[PodResource])
+ val basePod = new PodBuilder()
+ .withNewMetadata()
+ .withName("exec-3")
+ .withNamespace("default")
+ .endMetadata()
+ .build()
+
+ val editCaptor = ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]])
+ when(podResource.edit(any(classOf[UnaryOperator[Pod]]))).thenAnswer { invocation =>
+ val fn = invocation.getArgument[UnaryOperator[Pod]](0)
+ fn.apply(basePod)
+ }
+
+ when(labeledPods.resources())
+ .thenAnswer(_ => java.util.stream.Stream.of[PodResource](podResource))
+
+ val method = classOf[KubernetesClusterSchedulerBackend]
+ .getDeclaredMethods
+ .find(_.getName == "annotateExecutorDeletionCost")
+ .get
+ method.setAccessible(true)
+ method.invoke(schedulerBackendUnderTest, Seq("3"))
+ schedulerExecutorService.runUntilIdle()
+
+ verify(podResource, atLeastOnce()).edit(editCaptor.capture())
+ val appliedPods = editCaptor.getAllValues.asScala
+ .scanLeft(basePod)((pod, fn) => fn.apply(pod))
+ .tail
+ val annotated = appliedPods
+ .find(_.getMetadata.getAnnotations.asScala
+ .contains("controller.kubernetes.io/pod-deletion-cost"))
+ assert(annotated.isDefined,
+ s"expected controller.kubernetes.io/pod-deletion-cost annotation, got annotations " +
+ s"${appliedPods.map(_.getMetadata.getAnnotations).asJava}")
+ val annotations = annotated.get.getMetadata.getAnnotations.asScala
+ assert(annotations("controller.kubernetes.io/pod-deletion-cost") === "7")
+ sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)
+ }
+
test("SPARK-34407: CoarseGrainedSchedulerBackend.stop may throw SparkException") {
schedulerBackendUnderTest.start()