From 749e75df8bc5fba3ce4e9f7c3e07eaf827c20398 Mon Sep 17 00:00:00 2001 From: victors-oai Date: Mon, 3 Nov 2025 13:03:46 -0800 Subject: [PATCH 1/9] Implement a DeploymentPodsAllocator --- docs/running-on-kubernetes.md | 18 +- .../org/apache/spark/deploy/k8s/Config.scala | 13 +- .../features/BasicExecutorFeatureStep.scala | 11 +- .../cluster/k8s/DeploymentPodsAllocator.scala | 199 ++++++++++++++++++ .../k8s/KubernetesClusterManager.scala | 15 +- .../KubernetesClusterSchedulerBackend.scala | 35 +++ .../spark/deploy/k8s/Fabric8Aliases.scala | 7 +- .../BasicExecutorFeatureStepSuite.scala | 21 ++ .../k8s/DeploymentAllocatorSuite.scala | 190 +++++++++++++++++ .../k8s/KubernetesClusterManagerSuite.scala | 51 ++++- ...bernetesClusterSchedulerBackendSuite.scala | 75 +++++-- 11 files changed, 602 insertions(+), 33 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 68cd9a78d0f3..c4033913442d 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -1485,6 +1485,16 @@ See the [configuration page](configuration.html) for information on Spark config 3.3.0 + + spark.kubernetes.executor.podDeletionCost + (none) + + Value to apply to the controller.kubernetes.io/pod-deletion-cost annotation + when Spark tells a deployment-based allocator to remove executor pods. Set this to steer + Kubernetes to remove the same pods that Spark selected when the deployment scales down. + + 3.5.2 + spark.kubernetes.executor.scheduler.name (none) @@ -1665,10 +1675,10 @@ See the [configuration page](configuration.html) for information on Spark config spark.kubernetes.allocation.pods.allocator direct - Allocator to use for pods. Possible values are direct (the default) - and statefulset, or a full class name of a class implementing `AbstractPodsAllocator`. - Future version may add Job or replicaset. This is a developer API and may change - or be removed at anytime. + Allocator to use for pods. Possible values are direct (the default), + statefulset, deployment, or a full class name of a class + implementing `AbstractPodsAllocator`. Future version may add Job or replicaset. + This is a developer API and may change or be removed at anytime. 3.3.0 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index f4d708f30b43..1d1d08d39a28 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -460,10 +460,19 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_EXECUTOR_POD_DELETION_COST = + ConfigBuilder("spark.kubernetes.executor.podDeletionCost") + .doc("Value to set for the controller.kubernetes.io/pod-deletion-cost" + + " annotation when Spark asks a deployment-based allocator to remove executor pods. 
This " + + "helps Kubernetes pick the same pods Spark selected when the deployment scales down.") + .version("3.5.2") + .intConf + .createOptional + val KUBERNETES_ALLOCATION_PODS_ALLOCATOR = ConfigBuilder("spark.kubernetes.allocation.pods.allocator") - .doc("Allocator to use for pods. Possible values are direct (the default) and statefulset " + - ", or a full class name of a class implementing AbstractPodsAllocator. " + + .doc("Allocator to use for pods. Possible values are direct (the default), statefulset," + + " deployment, or a full class name of a class implementing AbstractPodsAllocator. " + "Future version may add Job or replicaset. This is a developer API and may change " + "or be removed at anytime.") .version("3.3.0") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 13d1f1bc98a0..5f61c014127a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.deploy.k8s.features +import java.util.Locale + import scala.jdk.CollectionConverters._ import io.fabric8.kubernetes.api.model._ @@ -115,12 +117,17 @@ private[spark] class BasicExecutorFeatureStep( // hostname must be no longer than `KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH`(63) characters, // so take the last 63 characters of the pod name as the hostname. // This preserves uniqueness since the end of name contains executorId - val hostname = name.substring(Math.max(0, name.length - KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH)) + var hostname = name.substring(Math.max(0, name.length - KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH)) // Remove non-word characters from the start of the hostname .replaceAll("^[^\\w]+", "") // Replace dangerous characters in the remaining string with a safe alternative. .replaceAll("[^\\w-]+", "_") + // Deployment resource does not support capital characters in the hostname + if (kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) { + hostname = hostname.toLowerCase(Locale.ROOT) + } + val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi") val executorCpuQuantity = new Quantity(executorCoresRequest) val executorResourceQuantities = @@ -270,7 +277,7 @@ private[spark] class BasicExecutorFeatureStep( } val policy = kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match { - case "statefulset" => "Always" + case "statefulset" | "deployment" => "Always" case _ => "Never" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala new file mode 100644 index 000000000000..a27722b45020 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodSpecBuilder, PodTemplateSpec} +import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.{SecurityManager, SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference +import org.apache.spark.internal.Logging +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.util.{Clock, Utils} + +/** + * A pods allocator backed by Kubernetes Deployments. + * + * The Deployment controller honours the `controller.kubernetes.io/pod-deletion-cost` + * annotation, so executors selected by Spark for removal can be prioritised when the + * deployment scales down. This provides predictable downscale behaviour for dynamic + * allocation that is not possible with StatefulSets which only remove pods in ordinal order. 
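+ * Note: the pod-deletion-cost annotation comes from KEP-2255; it is beta and honoured by
+ * the kube-controller-manager from Kubernetes 1.22 onwards. On older clusters the
+ * annotation is ignored and scale-down falls back to the default victim selection.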
+ */ +class DeploymentPodsAllocator( + conf: SparkConf, + secMgr: SecurityManager, + executorBuilder: KubernetesExecutorBuilder, + kubernetesClient: KubernetesClient, + snapshotsStore: ExecutorPodsSnapshotsStore, + clock: Clock) extends AbstractPodsAllocator() with Logging { + + private val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile] + + private val driverPodReadinessTimeout = conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT) + + private val namespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf.get(KUBERNETES_DRIVER_POD_NAME) + + val driverPod: Option[Pod] = kubernetesDriverPodName + .map(name => Option(kubernetesClient.pods() + .inNamespace(namespace) + .withName(name) + .get()) + .getOrElse(throw new SparkException( + s"No pod was found named $name in the cluster in the " + + s"namespace $namespace (this was supposed to be the driver pod.)."))) + + private var appId: String = _ + + private val deploymentsCreated = new mutable.HashSet[Int]() + + private val podDeletionCostAnnotation = "controller.kubernetes.io/pod-deletion-cost" + + override def start( + applicationId: String, + schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { + appId = applicationId + driverPod.foreach { pod => + Utils.tryLogNonFatalError { + kubernetesClient + .pods() + .inNamespace(namespace) + .withName(pod.getMetadata.getName) + .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS) + } + } + } + + override def setTotalExpectedExecutors( + resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = { + if (appId == null) { + throw new SparkException("setTotalExpectedExecutors called before start of allocator.") + } + resourceProfileToTotalExecs.foreach { case (rp, numExecs) => + rpIdToResourceProfile.getOrElseUpdate(rp.id, rp) + setTargetExecutorsDeployment(numExecs, appId, rp.id) + } + } + + override def isDeleted(executorId: String): Boolean = false + + private def setName(applicationId: String, resourceProfileId: Int): String = { + s"spark-d-$applicationId-$resourceProfileId" + } + + private def setTargetExecutorsDeployment( + expected: Int, + applicationId: String, + resourceProfileId: Int): Unit = { + if (deploymentsCreated.contains(resourceProfileId)) { + kubernetesClient + .apps() + .deployments() + .inNamespace(namespace) + .withName(setName(applicationId, resourceProfileId)) + .scale(expected) + } else { + val executorConf = KubernetesConf.createExecutorConf( + conf, + "EXECID", + applicationId, + driverPod, + resourceProfileId) + val resolvedExecutorSpec = executorBuilder.buildFromFeatures( + executorConf, + secMgr, + kubernetesClient, + rpIdToResourceProfile(resourceProfileId)) + val executorPod = resolvedExecutorSpec.pod + + val podSpecBuilder = executorPod.pod.getSpec match { + case null => new PodSpecBuilder() + case s => new PodSpecBuilder(s) + } + val podWithAttachedContainer: PodSpec = podSpecBuilder + .addToContainers(executorPod.container) + .build() + + val meta = executorPod.pod.getMetadata + val resources = resolvedExecutorSpec.executorKubernetesResources + val failureMessage = + "PersistentVolumeClaims are not supported with the deployment allocator. " + + "Please remove PVC requirements or choose a different pods allocator." 
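+      // A Deployment treats its replicas as interchangeable, so per-executor
+      // PersistentVolumeClaims (created dynamically alongside each pod, or referenced
+      // statically from the pod template) cannot be managed per pod; fail fast rather
+      // than let every replica contend for the same claim.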
+ val dynamicVolumeClaims = resources.filter(_.getKind == "PersistentVolumeClaim") + if (dynamicVolumeClaims.nonEmpty) { + throw new SparkException(failureMessage) + } + val staticVolumeClaims = Option(podWithAttachedContainer.getVolumes) + .map(_.asScala.filter(_.getPersistentVolumeClaim != null)) + .getOrElse(Seq.empty) + if (staticVolumeClaims.nonEmpty) { + throw new SparkException(failureMessage) + } + + val currentAnnotations = Option(meta.getAnnotations) + .map(_.asScala).getOrElse(Map.empty[String, String]) + if (!currentAnnotations.contains(podDeletionCostAnnotation)) { + meta.setAnnotations((currentAnnotations + (podDeletionCostAnnotation -> "0")).asJava) + } + + val podTemplateSpec = new PodTemplateSpec(meta, podWithAttachedContainer) + + val deployment = new DeploymentBuilder() + .withNewMetadata() + .withName(setName(applicationId, resourceProfileId)) + .withNamespace(namespace) + .endMetadata() + .withNewSpec() + .withReplicas(expected) + .withNewSelector() + .addToMatchLabels(SPARK_APP_ID_LABEL, applicationId) + .addToMatchLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .addToMatchLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, resourceProfileId.toString) + .endSelector() + .withTemplate(podTemplateSpec) + .endSpec() + .build() + + addOwnerReference(driverPod.get, Seq(deployment)) + kubernetesClient.apps().deployments().inNamespace(namespace).resource(deployment).create() + deploymentsCreated += resourceProfileId + } + } + + override def stop(applicationId: String): Unit = { + deploymentsCreated.foreach { rpid => + Utils.tryLogNonFatalError { + kubernetesClient + .apps() + .deployments() + .inNamespace(namespace) + .withName(setName(applicationId, rpid)) + .delete() + } + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 49eac64745b7..5191b5b2ff81 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -21,7 +21,7 @@ import java.io.File import io.fabric8.kubernetes.client.Config import io.fabric8.kubernetes.client.KubernetesClient -import org.apache.spark.{SparkConf, SparkContext, SparkMasterRegex} +import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkMasterRegex} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME @@ -160,9 +160,20 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, kubernetesClient: KubernetesClient, snapshotsStore: ExecutorPodsSnapshotsStore) = { - val executorPodsAllocatorName = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match { + val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) + if (allocator == "deployment" && + Utils.isDynamicAllocationEnabled(sc.conf) && + sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) { + throw new SparkException( + s"Dynamic allocation with the deployment pods allocator requires " + + s"'${KUBERNETES_EXECUTOR_POD_DELETION_COST.key}' to be configured.") + } + + val executorPodsAllocatorName = allocator match { 
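+      // Shorthand names resolve to the built-in allocators below; any other value is
+      // treated as the fully-qualified class name of an AbstractPodsAllocator.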
case "statefulset" => classOf[StatefulSetPodsAllocator].getName + case "deployment" => + classOf[DeploymentPodsAllocator].getName case "direct" => classOf[ExecutorPodsAllocator].getName case fullClass => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index aacd8b84199e..a6d9d23fb8df 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -73,6 +73,10 @@ private[spark] class KubernetesClusterSchedulerBackend( private val namespace = conf.get(KUBERNETES_NAMESPACE) + // KEP 2255: When a Deployment or Replicaset is scaled down, the pods will be deleted in the + // order of the value of this annotation, ascending. + private val podDeletionCostAnnotation = "controller.kubernetes.io/pod-deletion-cost" + // Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = { removeExecutor(executorId, reason) @@ -195,6 +199,31 @@ private[spark] class KubernetesClusterSchedulerBackend( super.getExecutorIds() } + private def annotateExecutorDeletionCost(execIds: Seq[String]): Unit = { + conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).foreach { cost => + logInfo(s"Annotating executor pod(s) ${execIds.mkString(",")} with deletion cost $cost") + val annotateTask = new Runnable() { + override def run(): Unit = Utils.tryLogNonFatalError { + kubernetesClient + .pods() + .inNamespace(namespace) + .withLabel(SPARK_APP_ID_LABEL, applicationId()) + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*) + .resources() + .forEach { podResource => + podResource.edit({ p: Pod => + new PodBuilder(p).editOrNewMetadata() + .addToAnnotations(podDeletionCostAnnotation, cost.toString) + .endMetadata() + .build()}) + } + } + } + executorService.execute(annotateTask) + } + } + private def labelDecommissioningExecs(execIds: Seq[String]) = { // Only kick off the labeling task if we have a label. conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label => @@ -228,6 +257,9 @@ private[spark] class KubernetesClusterSchedulerBackend( // picked the pod to evict so we don't need to update the labels. if (!triggeredByExecutor) { labelDecommissioningExecs(executorsAndDecomInfo.map(_._1).toImmutableArraySeq) + if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) { + annotateExecutorDeletionCost(executorsAndDecomInfo.map(_._1).toImmutableArraySeq) + } } super.decommissionExecutors(executorsAndDecomInfo, adjustTargetNumExecutors, triggeredByExecutor) @@ -235,6 +267,9 @@ private[spark] class KubernetesClusterSchedulerBackend( override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = { // If we've decided to remove some executors we should tell Kubernetes that we don't care. + if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) { + annotateExecutorDeletionCost(executorIds) + } labelDecommissioningExecs(executorIds) // Tell the executors to exit themselves. 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala index 1a4bc9781da2..86deb3e52bae 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala @@ -17,8 +17,7 @@ package org.apache.spark.deploy.k8s import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList} -import io.fabric8.kubernetes.api.model.apps.StatefulSet -import io.fabric8.kubernetes.api.model.apps.StatefulSetList +import io.fabric8.kubernetes.api.model.apps.{Deployment, DeploymentList, StatefulSet, StatefulSetList} import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NonNamespaceOperation, PodResource, Resource, RollableScalableResource} object Fabric8Aliases { @@ -38,6 +37,10 @@ object Fabric8Aliases { type STATEFUL_SETS = MixedOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES] type STATEFUL_SETS_NAMESPACED = NonNamespaceOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES] + type DEPLOYMENT_RES = RollableScalableResource[Deployment] + type DEPLOYMENTS = MixedOperation[Deployment, DeploymentList, DEPLOYMENT_RES] + type DEPLOYMENTS_NAMESPACED = + NonNamespaceOperation[Deployment, DeploymentList, DEPLOYMENT_RES] type PERSISTENT_VOLUME_CLAIMS = MixedOperation[PersistentVolumeClaim, PersistentVolumeClaimList, Resource[PersistentVolumeClaim]] type PVC_WITH_NAMESPACE = NonNamespaceOperation[PersistentVolumeClaim, PersistentVolumeClaimList, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index ced1326e7938..b8b5da192a09 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.deploy.k8s.features +import java.util.Locale + import scala.jdk.CollectionConverters._ import com.google.common.net.InternetDomainName @@ -26,6 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSui import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SecretVolumeUtils, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation import org.apache.spark.internal.config import org.apache.spark.internal.config._ @@ -266,6 +269,24 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } } + test("deployment allocator uses restartPolicy Always and lowercase hostnames") { + baseConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment") + initDefaultProfile(baseConf) + val executorConf = KubernetesConf.createExecutorConf( + sparkConf = baseConf, + executorId = "EXECID", + appId = KubernetesTestConf.APP_ID, + driverPod = Some(DRIVER_POD)) + val step = 
new BasicExecutorFeatureStep(executorConf, new SecurityManager(baseConf), + defaultProfile) + val executor = step.configurePod(SparkPod.initialPod()) + + val hostname = executor.pod.getSpec.getHostname + assert(hostname === hostname.toLowerCase(Locale.ROOT)) + assert(InternetDomainName.isValid(hostname)) + assert(executor.pod.getSpec.getRestartPolicy === "Always") + } + test("classpath and extra java options get translated into environment variables") { baseConf.set(config.EXECUTOR_JAVA_OPTIONS, "foo=bar") baseConf.set(config.EXECUTOR_CLASS_PATH, "bar=baz") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala new file mode 100644 index 000000000000..c3f184138ed0 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.apps.Deployment +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.dsl.{AppsAPIGroupDSL, PodResource} +import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} +import org.mockito.ArgumentMatchers.{any, eq => meq} +import org.mockito.Mockito.{never, times, verify, when} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.Fabric8Aliases._ +import org.apache.spark.resource.{ResourceProfile, ResourceProfileBuilder} +import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._ + +class DeploymentAllocatorSuite extends SparkFunSuite with BeforeAndAfter { + + private val driverPodName = "driver" + + private val driverPod = new PodBuilder() + .withNewMetadata() + .withName(driverPodName) + .withUid("driver-pod-uid") + .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID) + .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE) + .endMetadata() + .build() + + private val conf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, driverPodName) + .set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment") + + private val secMgr = new SecurityManager(conf) + private val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(conf) + + private val schedulerBackendAppId = "testapp" + + @Mock private var kubernetesClient: KubernetesClient = _ + @Mock private var appsClient: AppsAPIGroupDSL = _ + @Mock private var deployments: DEPLOYMENTS = _ + @Mock private var deploymentsNamespaced: DEPLOYMENTS_NAMESPACED = _ + @Mock private var deploymentResource: DEPLOYMENT_RES = _ + @Mock private var pods: PODS = _ + @Mock private var podsNamespaced: PODS_WITH_NAMESPACE = _ + @Mock private var driverPodResource: PodResource = _ + @Mock private var executorPodResource: PodResource = _ + @Mock private var executorBuilder: KubernetesExecutorBuilder = _ + @Mock private var schedulerBackend: KubernetesClusterSchedulerBackend = _ + + private var allocator: DeploymentPodsAllocator = _ + private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _ + + before { + MockitoAnnotations.openMocks(this).close() + when(kubernetesClient.apps()).thenReturn(appsClient) + when(appsClient.deployments()).thenReturn(deployments) + when(deployments.inNamespace("default")).thenReturn(deploymentsNamespaced) + when(deploymentsNamespaced.resource(any(classOf[Deployment]))).thenReturn(deploymentResource) + when(deploymentsNamespaced.withName(any[String]())).thenReturn(deploymentResource) + + when(kubernetesClient.pods()).thenReturn(pods) + when(pods.inNamespace("default")).thenReturn(podsNamespaced) + when(podsNamespaced.withName(driverPodName)).thenReturn(driverPodResource) + when(podsNamespaced.resource(any(classOf[Pod]))).thenReturn(executorPodResource) + when(driverPodResource.get).thenReturn(driverPod) + when(driverPodResource.waitUntilReady(any(), any())).thenReturn(driverPod) + when(executorBuilder.buildFromFeatures( + any(classOf[KubernetesExecutorConf]), + meq(secMgr), + meq(kubernetesClient), + any(classOf[ResourceProfile]))) + .thenAnswer { invocation => + val k8sConf = 
invocation.getArgument[KubernetesExecutorConf](0) + KubernetesExecutorSpec( + executorPodWithId(0, k8sConf.resourceProfileId), + Seq.empty) + } + + snapshotsStore = new DeterministicExecutorPodsSnapshotsStore + allocator = new DeploymentPodsAllocator( + conf, + secMgr, + executorBuilder, + kubernetesClient, + snapshotsStore, + snapshotsStore.clock) + + when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty) + allocator.start(TEST_SPARK_APP_ID, schedulerBackend) + } + + after { + ResourceProfile.clearDefaultProfile() + } + + test("creates deployments per resource profile and seeds deletion cost annotation") { + val rpBuilder = new ResourceProfileBuilder() + val secondProfile = rpBuilder.build() + + allocator.setTotalExpectedExecutors( + Map(defaultProfile -> 3, secondProfile -> 2)) + + val captor = ArgumentCaptor.forClass(classOf[Deployment]) + verify(deploymentsNamespaced, times(2)).resource(captor.capture()) + verify(deploymentResource, times(2)).create() + + val createdDeployments = captor.getAllValues.asScala + createdDeployments.foreach { deployment => + assert(deployment.getMetadata.getNamespace === "default") + assert(deployment.getSpec.getTemplate.getMetadata + .getAnnotations.get("controller.kubernetes.io/pod-deletion-cost") === "0") + assert(deployment.getSpec.getTemplate.getSpec.getContainers.asScala.exists( + _.getName == "spark-executor")) + val selectorLabels = deployment.getSpec.getSelector.getMatchLabels.asScala + assert(selectorLabels(SPARK_APP_ID_LABEL) === TEST_SPARK_APP_ID) + assert(selectorLabels(SPARK_ROLE_LABEL) === SPARK_POD_EXECUTOR_ROLE) + } + } + + test("scales existing deployment when replicas change") { + allocator.setTotalExpectedExecutors(Map(defaultProfile -> 5)) + verify(deploymentResource, times(1)).create() + + allocator.setTotalExpectedExecutors(Map(defaultProfile -> 7)) + verify(deploymentResource).scale(7) + } + + test("throws when executor template contributes dynamic PVCs") { + val pvc = persistentVolumeClaim("spark-pvc", "standard", "1Gi") + when(executorBuilder.buildFromFeatures( + any(classOf[KubernetesExecutorConf]), + meq(secMgr), + meq(kubernetesClient), + any(classOf[ResourceProfile]))) + .thenReturn(KubernetesExecutorSpec( + executorPodWithId(0), + Seq(pvc))) + + val error = intercept[SparkException] { + allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + } + assert(error.getMessage.contains("PersistentVolumeClaims are not supported")) + verify(deploymentResource, never()).create() + } + + test("throws when executor template includes static PVC references") { + when(executorBuilder.buildFromFeatures( + any(classOf[KubernetesExecutorConf]), + meq(secMgr), + meq(kubernetesClient), + any(classOf[ResourceProfile]))) + .thenReturn(KubernetesExecutorSpec( + executorPodWithIdAndVolume(0), + Seq.empty)) + + val error = intercept[SparkException] { + allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + } + assert(error.getMessage.contains("PersistentVolumeClaims are not supported")) + verify(deploymentResource, never()).create() + } + + test("deletes deployments on stop") { + allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + allocator.stop(schedulerBackendAppId) + verify(deploymentResource).delete() + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala index 07410b6a7b71..78e0942cfb82 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.internal.config._ import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID import org.apache.spark.scheduler.local.LocalSchedulerBackend class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { @@ -39,27 +40,33 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { @Mock private var env: SparkEnv = _ - @Mock private var sparkConf: SparkConf = _ before { MockitoAnnotations.openMocks(this).close() + sparkConf = new SparkConf(false) + .set("spark.app.id", TEST_SPARK_APP_ID) + .set("spark.master", "k8s://test") when(sc.conf).thenReturn(sparkConf) - when(sc.conf.get(KUBERNETES_DRIVER_POD_NAME)).thenReturn(None) - when(sc.conf.get(EXECUTOR_INSTANCES)).thenReturn(None) - when(sc.conf.get(MAX_EXECUTOR_FAILURES)).thenReturn(None) - when(sc.conf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS)).thenReturn(None) when(sc.env).thenReturn(env) + when(env.securityManager).thenReturn(new SecurityManager(sparkConf)) + resetDynamicAllocatorConfig() + } + + after { + resetDynamicAllocatorConfig() } test("constructing a AbstractPodsAllocator works") { - val validConfigs = List("statefulset", "direct", + val validConfigs = List("statefulset", "deployment", "direct", classOf[StatefulSetPodsAllocator].getName, + classOf[DeploymentPodsAllocator].getName, classOf[ExecutorPodsAllocator].getName) validConfigs.foreach { c => val manager = new KubernetesClusterManager() - when(sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)).thenReturn(c) + sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, c) manager.makeExecutorPodsAllocator(sc, kubernetesClient, null) + sparkConf.remove(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) } } @@ -80,4 +87,34 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { assert(backend2.isInstanceOf[LocalSchedulerBackend]) assert(backend2.applicationId() === "user-app-id") } + + test("deployment allocator with dynamic allocation requires deletion cost") { + val manager = new KubernetesClusterManager() + sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment") + sparkConf.set(DYN_ALLOCATION_ENABLED.key, "true") + sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key) + sparkConf.set("spark.shuffle.service.enabled", "true") + + val e = intercept[SparkException] { + manager.makeExecutorPodsAllocator(sc, kubernetesClient, null) + } + assert(e.getMessage.contains(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)) + } + + test("deployment allocator with dynamic allocation and deletion cost succeeds") { + val manager = new KubernetesClusterManager() + sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment") + sparkConf.set(DYN_ALLOCATION_ENABLED.key, "true") + sparkConf.set(KUBERNETES_EXECUTOR_POD_DELETION_COST, 1) + sparkConf.set("spark.shuffle.service.enabled", "true") + + manager.makeExecutorPodsAllocator(sc, kubernetesClient, null) + } + + private def resetDynamicAllocatorConfig(): Unit = { + sparkConf.remove(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) + sparkConf.remove(DYN_ALLOCATION_ENABLED.key) + sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key) + 
sparkConf.remove("spark.shuffle.service.enabled") + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index b2e4a7182a77..d571791ae23f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -18,14 +18,15 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.Arrays import java.util.concurrent.TimeUnit +import java.util.function.UnaryOperator -import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodList} +import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodBuilder, PodList} import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.PodResource import org.jmock.lib.concurrent.DeterministicScheduler import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} import org.mockito.ArgumentMatchers.{any, eq => mockitoEq} -import org.mockito.Mockito.{mock, never, spy, verify, when} +import org.mockito.Mockito.{atLeastOnce, mock, never, spy, verify, when} import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite} @@ -39,6 +40,8 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{Register import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID +import scala.collection.JavaConverters._ + class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAndAfter { private val schedulerExecutorService = new DeterministicScheduler() @@ -205,17 +208,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled)) verify(driverEndpointRef).send(RemoveExecutor("2", ExecutorKilled)) verify(labeledPods, never()).delete() - verify(pod1op, never()).edit(any( - classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]])) - verify(pod2op, never()).edit(any( - classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]])) + verify(pod1op, never()).edit(any(classOf[UnaryOperator[Pod]])) + verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]])) schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2, TimeUnit.MILLISECONDS) verify(labeledPods, never()).delete() - verify(pod1op, never()).edit(any( - classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]])) - verify(pod2op, never()).edit(any( - classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]])) + verify(pod1op, never()).edit(any(classOf[UnaryOperator[Pod]])) + verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]])) when(labeledPods.resources()).thenReturn(Arrays.asList(pod1op).stream) val podList = mock(classOf[PodList]) @@ -227,16 +226,64 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn schedulerBackendUnderTest.doKillExecutors(Seq("1", "2")) verify(labeledPods, never()).delete() schedulerExecutorService.runUntilIdle() - verify(pod1op).edit(any( - 
classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]])) - verify(pod2op, never()).edit(any( - classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]])) + verify(pod1op).edit(any(classOf[UnaryOperator[Pod]])) + verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]])) verify(labeledPods, never()).delete() schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2, TimeUnit.MILLISECONDS) verify(labeledPods).delete() } + test("Annotates executor pods with deletion cost when configured") { + sparkConf.set(KUBERNETES_EXECUTOR_POD_DELETION_COST, 7) + schedulerBackendUnderTest.start() + + when(podsWithNamespace.withField(any(), any())).thenReturn(labeledPods) + when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods) + when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods) + when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods) + when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3")).thenReturn(labeledPods) + + val podResource = mock(classOf[PodResource]) + val basePod = new PodBuilder() + .withNewMetadata() + .withName("exec-3") + .withNamespace("default") + .endMetadata() + .build() + + val editCaptor = ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]]) + when(podResource.edit(any(classOf[UnaryOperator[Pod]]))).thenAnswer { invocation => + val fn = invocation.getArgument[UnaryOperator[Pod]](0) + fn.apply(basePod) + } + + when(labeledPods.resources()) + .thenAnswer(_ => java.util.stream.Stream.of[PodResource](podResource)) + + val method = classOf[KubernetesClusterSchedulerBackend] + .getDeclaredMethods + .find(_.getName == "annotateExecutorDeletionCost") + .get + method.setAccessible(true) + method.invoke(schedulerBackendUnderTest, Seq("3")) + schedulerExecutorService.runUntilIdle() + + verify(podResource, atLeastOnce()).edit(editCaptor.capture()) + val appliedPods = editCaptor.getAllValues.asScala + .scanLeft(basePod)((pod, fn) => fn.apply(pod)) + .tail + val annotated = appliedPods + .find(_.getMetadata.getAnnotations.asScala + .contains("controller.kubernetes.io/pod-deletion-cost")) + assert(annotated.isDefined, + s"expected controller.kubernetes.io/pod-deletion-cost annotation, got annotations " + + s"${appliedPods.map(_.getMetadata.getAnnotations).asJava}") + val annotations = annotated.get.getMetadata.getAnnotations.asScala + assert(annotations("controller.kubernetes.io/pod-deletion-cost") === "7") + sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key) + } + test("SPARK-34407: CoarseGrainedSchedulerBackend.stop may throw SparkException") { schedulerBackendUnderTest.start() From 347f77f6168bdd5d822d67d200ea9256771dd5e1 Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Mon, 3 Nov 2025 18:38:46 -0800 Subject: [PATCH 2/9] compile with 2.13 scala --- .../spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala | 2 +- .../spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala | 2 +- .../cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala index a27722b45020..238835df2719 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.TimeUnit -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import scala.collection.mutable import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodSpecBuilder, PodTemplateSpec} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala index c3f184138ed0..2166cef9d73c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.scheduler.cluster.k8s -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.api.model.apps.Deployment diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index d571791ae23f..98803399a055 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -40,7 +40,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{Register import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAndAfter { From 218b7338e348465a6d887181c2c77e1e9ce87fba Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Tue, 4 Nov 2025 10:54:02 -0800 Subject: [PATCH 3/9] more compile issues --- .../spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala index 238835df2719..d9bb7ae636eb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala @@ -157,7 +157,8 @@ class DeploymentPodsAllocator( val currentAnnotations = Option(meta.getAnnotations) .map(_.asScala).getOrElse(Map.empty[String, String]) if (!currentAnnotations.contains(podDeletionCostAnnotation)) { - meta.setAnnotations((currentAnnotations + (podDeletionCostAnnotation -> "0")).asJava) + val newAnnotations = 
currentAnnotations.concat(Seq(podDeletionCostAnnotation -> "0")) + meta.setAnnotations(newAnnotations.asJava) } val podTemplateSpec = new PodTemplateSpec(meta, podWithAttachedContainer) From efa302d7e69607fe065c21e83db92c566060c702 Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Tue, 4 Nov 2025 12:49:35 -0800 Subject: [PATCH 4/9] import order --- .../spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala index d9bb7ae636eb..213d12301a9c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala @@ -18,8 +18,8 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.TimeUnit -import scala.jdk.CollectionConverters._ import scala.collection.mutable +import scala.jdk.CollectionConverters._ import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodSpecBuilder, PodTemplateSpec} import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder From 4d9725f2db6ce3ef50cbe2888f9a844167fdbe07 Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Tue, 4 Nov 2025 14:53:40 -0800 Subject: [PATCH 5/9] import order --- .../cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index 98803399a055..76c2e16782c1 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -20,6 +20,8 @@ import java.util.Arrays import java.util.concurrent.TimeUnit import java.util.function.UnaryOperator +import scala.jdk.CollectionConverters._ + import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodBuilder, PodList} import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.PodResource @@ -40,8 +42,6 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{Register import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID -import scala.jdk.CollectionConverters._ - class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAndAfter { private val schedulerExecutorService = new DeterministicScheduler() From 6bae7d9e7bf4fef29fbac256a00aaecbce28811d Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Tue, 4 Nov 2025 17:07:25 -0800 Subject: [PATCH 6/9] formatting --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 +- .../scheduler/cluster/k8s/KubernetesClusterManager.scala | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 1d1d08d39a28..57ef2af73a9d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -465,7 +465,7 @@ private[spark] object Config extends Logging { .doc("Value to set for the controller.kubernetes.io/pod-deletion-cost" + " annotation when Spark asks a deployment-based allocator to remove executor pods. This " + "helps Kubernetes pick the same pods Spark selected when the deployment scales down.") - .version("3.5.2") + .version("4.1.0") .intConf .createOptional diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 5191b5b2ff81..6b3c456071ae 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -161,9 +161,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, kubernetesClient: KubernetesClient, snapshotsStore: ExecutorPodsSnapshotsStore) = { val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) - if (allocator == "deployment" && - Utils.isDynamicAllocationEnabled(sc.conf) && - sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) { + if (allocator == "deployment" && Utils.isDynamicAllocationEnabled(sc.conf) && + sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) { throw new SparkException( s"Dynamic allocation with the deployment pods allocator requires " + s"'${KUBERNETES_EXECUTOR_POD_DELETION_COST.key}' to be configured.") From b7ca2432d4b64f60211f5f6c225accf868fa955a Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Tue, 4 Nov 2025 22:02:52 -0800 Subject: [PATCH 7/9] 4.2.0 --- docs/running-on-kubernetes.md | 2 +- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index c4033913442d..8060a46042b9 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -1493,7 +1493,7 @@ See the [configuration page](configuration.html) for information on Spark config when Spark tells a deployment-based allocator to remove executor pods. Set this to steer Kubernetes to remove the same pods that Spark selected when the deployment scales down. - 3.5.2 + 4.2.0 spark.kubernetes.executor.scheduler.name diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 57ef2af73a9d..6736d74f133a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -465,7 +465,7 @@ private[spark] object Config extends Logging { .doc("Value to set for the controller.kubernetes.io/pod-deletion-cost" + " annotation when Spark asks a deployment-based allocator to remove executor pods. 
This " + "helps Kubernetes pick the same pods Spark selected when the deployment scales down.") - .version("4.1.0") + .version("4.2.0") .intConf .createOptional From f55e2444e12555f94d58c58d475ae714f42c2b5f Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Mon, 10 Nov 2025 09:53:12 -0800 Subject: [PATCH 8/9] address feedback --- .../org/apache/spark/deploy/k8s/Config.scala | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 6736d74f133a..f3c54b04144b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -18,10 +18,9 @@ package org.apache.spark.deploy.k8s import java.util.Locale import java.util.concurrent.TimeUnit - import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{ConfigBuilder, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON} +import org.apache.spark.internal.config.{ConfigBuilder, DYN_ALLOCATION_ENABLED, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON} private[spark] object Config extends Logging { @@ -460,15 +459,6 @@ private[spark] object Config extends Logging { .stringConf .createOptional - val KUBERNETES_EXECUTOR_POD_DELETION_COST = - ConfigBuilder("spark.kubernetes.executor.podDeletionCost") - .doc("Value to set for the controller.kubernetes.io/pod-deletion-cost" + - " annotation when Spark asks a deployment-based allocator to remove executor pods. This " + - "helps Kubernetes pick the same pods Spark selected when the deployment scales down.") - .version("4.2.0") - .intConf - .createOptional - val KUBERNETES_ALLOCATION_PODS_ALLOCATOR = ConfigBuilder("spark.kubernetes.allocation.pods.allocator") .doc("Allocator to use for pods. Possible values are direct (the default), statefulset," + @@ -479,6 +469,17 @@ private[spark] object Config extends Logging { .stringConf .createWithDefault("direct") + val KUBERNETES_EXECUTOR_POD_DELETION_COST = + ConfigBuilder("spark.kubernetes.executor.podDeletionCost") + .doc("Value to set for the controller.kubernetes.io/pod-deletion-cost" + + " annotation when Spark asks a deployment-based allocator to remove executor pods. This " + + "helps Kubernetes pick the same pods Spark selected when the deployment scales down." 
+ + s" This should only be enabled when both $KUBERNETES_ALLOCATION_PODS_ALLOCATOR is set to " + + s"deployment, and $DYN_ALLOCATION_ENABLED is enabled.") + .version("4.2.0") + .intConf + .createOptional + val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of executor allocation.") From 568f6f8537e120ab1bea47b930dbf3bc9ae6f14f Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Mon, 10 Nov 2025 09:59:09 -0800 Subject: [PATCH 9/9] unused imports --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 66ebda7ee95a..1fb214c87c53 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -18,9 +18,10 @@ package org.apache.spark.deploy.k8s import java.util.Locale import java.util.concurrent.TimeUnit + import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{ConfigBuilder, DYN_ALLOCATION_ENABLED, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON} +import org.apache.spark.internal.config.{ConfigBuilder, DYN_ALLOCATION_ENABLED} private[spark] object Config extends Logging {