From 8f9f98fc768ddc2541903a64fbd25d9c1a0b6556 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 29 Apr 2026 18:12:59 -0700 Subject: [PATCH] [SPARK-56670] Restrict ExecutorResizePlugin to direct pods allocator --- .../cluster/k8s/ExecutorResizePlugin.scala | 10 ++++++++- .../k8s/ExecutorResizePluginSuite.scala | 21 ++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePlugin.scala index 77fd37dd538b5..d5d882978dfc5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePlugin.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePlugin.scala @@ -32,7 +32,7 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory import org.apache.spark.internal.Logging -import org.apache.spark.internal.LogKeys.{EXECUTOR_ID, MEMORY_SIZE} +import org.apache.spark.internal.LogKeys.{CONFIG, CONFIG2, EXECUTOR_ID, MEMORY_SIZE} import org.apache.spark.util.{ThreadUtils, Utils} /** @@ -53,6 +53,14 @@ class ExecutorResizeDriverPlugin extends DriverPlugin with Logging { ThreadUtils.newDaemonSingleThreadScheduledExecutor("executor-resize-plugin") override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { + val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) + if (allocator != "direct") { + logWarning(log"ExecutorResizePlugin requires the 'direct' pods allocator; " + + log"${MDC(CONFIG, KUBERNETES_ALLOCATION_PODS_ALLOCATOR.key)} is " + + log"${MDC(CONFIG2, allocator)}. Plugin will not start.") + return Map.empty[String, String].asJava + } + val interval = Utils.timeStringAsSeconds( sc.conf.get(EXECUTOR_RESIZE_INTERVAL.key, "1m")) val threshold = sc.conf.getDouble(EXECUTOR_RESIZE_THRESHOLD.key, 0.9) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePluginSuite.scala index 649ccd8a945df..e8ce7d44abd06 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePluginSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePluginSuite.scala @@ -29,7 +29,9 @@ import org.mockito.Mockito.{mock, never, times, verify, when} import org.scalatest.BeforeAndAfter import org.scalatest.PrivateMethodTester -import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.api.plugin.PluginContext +import org.apache.spark.deploy.k8s.Config.KUBERNETES_ALLOCATION_PODS_ALLOCATOR import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ @@ -287,4 +289,21 @@ class ExecutorResizePluginSuite verify(podResource, times(1)).patch(any(), any(classOf[Pod])) } + + Seq("statefulset", "deployment").foreach { allocator => + test(s"init returns early when pods allocator is '$allocator'") { + val plugin = new ExecutorResizeDriverPlugin() + val sparkConf = new SparkConf().set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, allocator) + val sc = mock(classOf[SparkContext]) + when(sc.conf).thenReturn(sparkConf) + val pluginCtx = mock(classOf[PluginContext]) + + val result = plugin.init(sc, pluginCtx) + + assert(result.isEmpty) + val clientField = plugin.getClass.getDeclaredField("kubernetesClient") + clientField.setAccessible(true) + assert(clientField.get(plugin) == null) + } + } }