From 2aa37a0306803045ae16e55aca9ff43f5fcc2956 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Li Date: Wed, 15 Apr 2026 12:02:34 -0700 Subject: [PATCH 1/2] [SPARK-56238][K8S] Fix app ID propagation in KubernetesClusterSchedulerBackend ### What changes were proposed in this pull request? Cache the application ID at construction time in `KubernetesClusterSchedulerBackend` so that `applicationId()` returns a stable value across calls. Previously, `applicationId()` fell back to `KubernetesConf.getKubernetesAppId()` when `spark.app.id` was not yet set, which generates a new random UUID on every call. In client mode, `SparkContext` sets `spark.app.id` only after `start()` returns, so during `start()` the multiple calls to `applicationId()` (for `podAllocator.start()`, `watchEvents.start()`, `pollEvents.start()`, and `setUpExecutorConfigMap()`) each received a different ID. This caused subsystems to use inconsistent app IDs for pod labeling and filtering. This only affects client mode. In cluster mode, the submission client generates the app ID upfront and writes it into `spark.app.id` via `BasicDriverFeatureStep` before the driver pod starts, so `conf.getOption("spark.app.id")` always returns a value and the `getOrElse` branch is never reached. The fix adds a `private val appId` that resolves the ID once at construction time and returns it consistently, matching the pattern used by `SchedulerBackend`, `LocalSchedulerBackend`, and other backends. ### Why are the changes needed? Without this fix, the Kubernetes scheduler backend could propagate different app IDs to different subsystems during `start()` in client mode, leading to: - Pod allocator, watch events, and poll events using different app IDs - `stop()` unable to clean up resources created by `start()` (services, PVCs, config maps, executor pods) because the label selector uses a different ID ### Does this PR introduce _any_ user-facing change? No direct user-facing change. This fixes an internal consistency issue that could cause resource leaks in Kubernetes client-mode deployments. ### How was this patch tested? - Unit tests in `KubernetesClusterSchedulerBackendSuite` verifying `applicationId()` stability both when `spark.app.id` is set and when it is not set. - Existing tests for `DeploymentAllocatorSuite`, `ExecutorPodsLifecycleManagerSuite`, and `StatefulSetAllocatorSuite` continue to pass. ### Was this patch authored or co-authored using generative AI tooling? Yes, co-authored with Kiro. --- .../KubernetesClusterSchedulerBackend.scala | 9 +++-- ...bernetesClusterSchedulerBackendSuite.scala | 39 +++++++++++++++++++ 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 71ed4a6a5aee0..50c672781b96c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -67,6 +67,9 @@ private[spark] class KubernetesClusterSchedulerBackend( private val minRegisteredExecutors = initialExecutors * minRegisteredRatio + private val appId: String = + conf.getOption("spark.app.id").getOrElse(KubernetesConf.getKubernetesAppId()) + private val namespace = conf.get(KUBERNETES_NAMESPACE) private val PATCH_CONTEXT = PatchContext.of(PatchType.STRATEGIC_MERGE) @@ -98,13 +101,11 @@ private[spark] class KubernetesClusterSchedulerBackend( /** * Get an application ID associated with the job. * This returns the string value of spark.app.id if set, otherwise - * the locally-generated ID. + * a generated Kubernetes app ID. * * @return The application ID */ - override def applicationId(): String = { - conf.getOption("spark.app.id").getOrElse(KubernetesConf.getKubernetesAppId()) - } + override def applicationId(): String = appId override def start(): Unit = { super.start() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index c7165b81671ad..61e7c6e3746aa 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -314,4 +314,43 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn endpoint.receiveAndReply(context).apply(GenerateExecID("cheeseBurger")) verify(context).reply("1") } + + test("SPARK-56238: applicationId() returns consistent value when spark.app.id is set") { + val id1 = schedulerBackendUnderTest.applicationId() + val id2 = schedulerBackendUnderTest.applicationId() + assert(id1 === id2) + assert(id1 === TEST_SPARK_APP_ID) + } + + test("SPARK-56238: applicationId() is stable across calls when spark.app.id is not set") { + // Use isolated mocks so we don't mutate the shared sc/rpcEnv state. + val confWithoutAppId = new SparkConf(false) + .set("spark.executor.instances", "3") + .set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL.key, "soLong") + .set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE.key, "cruelWorld") + val localSc = mock(classOf[SparkContext]) + val localEnv = mock(classOf[SparkEnv]) + val localRpcEnv = mock(classOf[RpcEnv]) + when(localSc.conf).thenReturn(confWithoutAppId) + when(localSc.env).thenReturn(localEnv) + when(localSc.resourceProfileManager).thenReturn(resourceProfileManager) + when(localEnv.rpcEnv).thenReturn(localRpcEnv) + when(localRpcEnv.setupEndpoint(any(), any())).thenReturn(driverEndpointRef) + val localTaskScheduler = mock(classOf[TaskSchedulerImpl]) + when(localTaskScheduler.sc).thenReturn(localSc) + val backendWithoutAppId = new KubernetesClusterSchedulerBackend( + localTaskScheduler, + localSc, + kubernetesClient, + schedulerExecutorService, + eventQueue, + podAllocator, + lifecycleManager, + watchEvents, + pollEvents) + val id1 = backendWithoutAppId.applicationId() + val id2 = backendWithoutAppId.applicationId() + assert(id1 === id2, "applicationId() must return the same value on repeated calls") + assert(id1.startsWith("spark-"), "generated app ID should have the spark- prefix") + } } From e2d5ad8e2fa0ea2de20452220de91753137df547 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Li Date: Wed, 15 Apr 2026 13:14:40 -0700 Subject: [PATCH 2/2] remove unrelated test --- .../k8s/KubernetesClusterSchedulerBackendSuite.scala | 7 ------- 1 file changed, 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index 61e7c6e3746aa..e24ffe9ef52b6 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -315,13 +315,6 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn verify(context).reply("1") } - test("SPARK-56238: applicationId() returns consistent value when spark.app.id is set") { - val id1 = schedulerBackendUnderTest.applicationId() - val id2 = schedulerBackendUnderTest.applicationId() - assert(id1 === id2) - assert(id1 === TEST_SPARK_APP_ID) - } - test("SPARK-56238: applicationId() is stable across calls when spark.app.id is not set") { // Use isolated mocks so we don't mutate the shared sc/rpcEnv state. val confWithoutAppId = new SparkConf(false)