From 1b4385abc4b631f193a9af20ff4688ff43fbdac8 Mon Sep 17 00:00:00 2001 From: Yao Wang Date: Sun, 23 Nov 2025 17:41:10 -0800 Subject: [PATCH 1/7] Open source Canon: Burst-aware Memory Allocation Algorithm for Spark@K8S --- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../spark/internal/config/package.scala | 27 +++++ .../spark/resource/ResourceProfile.scala | 73 +++++++++++- .../org/apache/spark/status/api/v1/api.scala | 23 ++++ .../features/BasicExecutorFeatureStep.scala | 29 ++++- .../BasicExecutorFeatureStepSuite.scala | 104 +++++++++++++++++- 6 files changed, 247 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6f8be49e3959..aa635d5280ac 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -238,7 +238,7 @@ class SparkContext(config: SparkConf) extends Logging { private var _files: Seq[String] = _ private var _archives: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ - private var _statusStore: AppStatusStore = _ + private[spark] var _statusStore: AppStatusStore = _ private var _heartbeater: Heartbeater = _ private var _resources: immutable.Map[String, ResourceInformation] = _ private var _shuffleDriverComponents: ShuffleDriverComponents = _ diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e9a3780f0aaa..7def33bd9b20 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -445,6 +445,33 @@ package object config { "Ensure that memory overhead is a double greater than 0") .createWithDefault(0.1) + private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED = + ConfigBuilder("spark.executor.memoryOverheadBursty.enabled") + .doc("Whether to enable memory overhead bursty") + .version("3.2.0") + .booleanConf + .createWithDefault(false) + + private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR = + ConfigBuilder("spark.executor.memoryOverheadBurstyFactor") + .doc("the bursty control factor controlling the size of memory overhead space shared with" + + s" other processes, newMemoryOverhead=oldMemoryOverhead-MIN((onheap + memoryoverhead) *" + + s" (this value - 1), oldMemoryOverhead)") + .version("3.2.0") + .doubleConf + .checkValue((v: Double) => v >= 1.0, + "the value of bursty control factor has to be no less than 1") + .createWithDefault(1.2) + + private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD = ConfigBuilder( + "spark.executor.burstyMemoryOverhead") + .doc(s"The adjusted amount of non-heap memory to be allocated per executor" + + s" (the adjustment happens if ${EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED.key} is enabled," + + " in MiB unless otherwise specified.") + .version("2.3.0") + .bytesConf(ByteUnit.MiB) + .createOptional + private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max") .doc("When running on a standalone deploy cluster, " + "the maximum amount of CPU cores to request for the application from across " + diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index 971b14265979..bfcb3564a540 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -31,6 +31,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate +import org.apache.spark.status.ApplicationEnvironmentInfoWrapper +import org.apache.spark.status.api.v1.ApplicationEnvironmentInfo import org.apache.spark.util.Utils /** @@ -48,7 +51,7 @@ import org.apache.spark.util.Utils @Evolving @Since("3.1.0") class ResourceProfile( - val executorResources: Map[String, ExecutorResourceRequest], + var executorResources: Map[String, ExecutorResourceRequest], val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging { validate() @@ -485,6 +488,7 @@ object ResourceProfile extends Logging { pysparkMemoryMiB: Long, memoryOverheadMiB: Long, totalMemMiB: Long, + totalMemMiBLimit: Option[Long], customResources: Map[String, ExecutorResourceRequest]) private[spark] case class DefaultProfileExecutorResources( @@ -559,12 +563,71 @@ object ResourceProfile extends Logging { } else { 0L } - val totalMemMiB = - (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB) - ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB, - pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, finalCustomResources) + + if (!conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED)) { + val totalMemMiB = + (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB) + ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB, + pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, totalMemMiBLimit = None, + finalCustomResources) + } else { + val burstyControlFactor = conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR) + val newMemoryOverheadMiB = (memoryOverheadMiB - math.min( + (executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor - 1.0), memoryOverheadMiB)) + .toLong + val totalMemMiBLimit = executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + + pysparkMemToUseMiB + val totalMemMiBRequest = executorMemoryMiB + newMemoryOverheadMiB + memoryOffHeapMiB + + pysparkMemToUseMiB + logInfo(s"reduce memoryoverhead request from $memoryOverheadMiB MB to" + + s" $newMemoryOverheadMiB MB") + updateEventLogAndUI(newMemoryOverheadMiB, conf) + ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB, + pysparkMemToUseMiB, newMemoryOverheadMiB, totalMemMiBRequest, + totalMemMiBLimit = Some(totalMemMiBLimit), finalCustomResources) + } + } + + private def updateEventLogAndUI(newMemoryOverheadMiB: Long, conf: SparkConf): Unit = { + conf.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD, newMemoryOverheadMiB) + val sparkContextOption = SparkContext.getActive + if (sparkContextOption.isDefined) { + val sparkContext = sparkContextOption.get + val klass = classOf[ApplicationEnvironmentInfoWrapper] + val currentAppEnvironment = sparkContext._statusStore.store.read(klass, klass.getName()).info + logInfo(s"currentAppEnvironment spark properties count:" + + s" ${currentAppEnvironment.sparkProperties.size}") + val newAppEnvironment = ApplicationEnvironmentInfo.create(currentAppEnvironment, + newSparkProperties = Map(EXECUTOR_BURSTY_MEMORY_OVERHEAD.key -> + newMemoryOverheadMiB.toString)) + logInfo(s"newAppEnvironment spark properties count:" + + s" ${newAppEnvironment.sparkProperties.size}") + sparkContext._statusStore.store.write(new ApplicationEnvironmentInfoWrapper( + newAppEnvironment)) + // we have to post full information here, but need ensure that the downstream pipeline can + // consume duplicate entries properly + this.synchronized { + if (!loggedBurstyMemoryOverhead) { + SparkContext.getActive.get.eventLogger.foreach { logger => + logger.onEnvironmentUpdate(SparkListenerEnvironmentUpdate( + Map("Spark Properties" -> newAppEnvironment.sparkProperties, + "JVM Information" -> Seq(("Java Version", newAppEnvironment.runtime.javaVersion), + ("Java Home", newAppEnvironment.runtime.javaHome), + ("Scala Version", newAppEnvironment.runtime.scalaVersion)), + "Hadoop Properties" -> newAppEnvironment.hadoopProperties, + "System Properties" -> newAppEnvironment.systemProperties, + "Classpath Entries" -> newAppEnvironment.classpathEntries) + )) + loggedBurstyMemoryOverhead = true + logInfo("make a event log for bursty memory overhead") + } + } + } + logInfo(s"posted memoryoverhead update event") + } } private[spark] val PYSPARK_MEMORY_LOCAL_PROPERTY = "resource.pyspark.memory" private[spark] val EXECUTOR_CORES_LOCAL_PROPERTY = "resource.executor.cores" + private var loggedBurstyMemoryOverhead: Boolean = false } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 6ae1dce57f31..df775f3a6eb8 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -501,6 +501,29 @@ class ApplicationEnvironmentInfo private[spark] ( val classpathEntries: collection.Seq[(String, String)], val resourceProfiles: collection.Seq[ResourceProfileInfo]) +private[spark] object ApplicationEnvironmentInfo { + def create(appEnv: ApplicationEnvironmentInfo, + newSparkProperties: Map[String, String] = Map(), + newHadoopProperties: Map[String, String] = Map(), + newSystemProperties: Map[String, String] = Map(), + newClasspathProperties: Map[String, String] = Map(), + newResourceProfiles: Seq[ResourceProfileInfo] = Seq()): ApplicationEnvironmentInfo = { + if (newResourceProfiles.nonEmpty) { + require(!newResourceProfiles.exists(newRP => appEnv.resourceProfiles.map(_.id) + .contains(newRP.id)), "duplicate resource profile id in newResourceProfile and existing" + + " resource profiles") + } + new ApplicationEnvironmentInfo( + runtime = appEnv.runtime, + sparkProperties = (appEnv.sparkProperties.toMap ++ newSparkProperties).toSeq, + hadoopProperties = (appEnv.hadoopProperties.toMap ++ newHadoopProperties).toSeq, + systemProperties = (appEnv.systemProperties.toMap ++ newSystemProperties).toSeq, + classpathEntries = (appEnv.classpathEntries.toMap ++ newClasspathProperties).toSeq, + resourceProfiles = appEnv.resourceProfiles ++ newResourceProfiles + ) + } +} + class RuntimeInfo private[spark]( val javaVersion: String, val javaHome: String, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 5f61c014127a..1e0807688b20 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -71,6 +71,26 @@ private[spark] class BasicExecutorFeatureStep( kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) } + // this is to make resource management easier in K8S by adding memoryOverhead if the user + // didn't specify memoryOverhead + resourceProfile.executorResources = { + if (resourceProfile.executorResources.contains(ResourceProfile.MEMORY)) { + if (resourceProfile.executorResources.contains(ResourceProfile.OVERHEAD_MEM)) { + resourceProfile.executorResources + } else { + // fill memory overhead + resourceProfile.executorResources ++ Map(ResourceProfile.OVERHEAD_MEM -> + new ExecutorResourceRequest(ResourceProfile.OVERHEAD_MEM, + math.floor(resourceProfile.executorResources(ResourceProfile.MEMORY).amount * + kubernetesConf.get(MEMORY_OVERHEAD_FACTOR)).toLong)) + } + } else { + resourceProfile.executorResources ++ Map(ResourceProfile.MEMORY -> + new ExecutorResourceRequest(ResourceProfile.MEMORY, + kubernetesConf.sparkConf.get(EXECUTOR_MEMORY))) + } + } + val execResources = ResourceProfile.getResourcesForClusterManager( resourceProfile.id, resourceProfile.executorResources, @@ -128,7 +148,10 @@ private[spark] class BasicExecutorFeatureStep( hostname = hostname.toLowerCase(Locale.ROOT) } - val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi") + val executorMemoryRequestQuantity = new Quantity(s"${execResources.totalMemMiBRequest}Mi") + val executorMemoryLimitQuantity = execResources.totalMemMiBLimit.map { mem => + new Quantity(s"${mem}Mi") + }.getOrElse(executorMemoryRequestQuantity) val executorCpuQuantity = new Quantity(executorCoresRequest) val executorResourceQuantities = buildExecutorResourcesQuantities(execResources.customResources.values.toSet) @@ -212,8 +235,8 @@ private[spark] class BasicExecutorFeatureStep( .withImage(executorContainerImage) .withImagePullPolicy(kubernetesConf.imagePullPolicy) .editOrNewResources() - .addToRequests("memory", executorMemoryQuantity) - .addToLimits("memory", executorMemoryQuantity) + .addToRequests("memory", executorMemoryRequestQuantity) + .addToLimits("memory", executorMemoryLimitQuantity) .addToRequests("cpu", executorCpuQuantity) .addToLimits(executorResourceQuantities.asJava) .endResources() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index b8b5da192a09..04eeef1e3bf4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -88,9 +88,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } private def newExecutorConf( - environment: Map[String, String] = Map.empty): KubernetesExecutorConf = { + environment: Map[String, String] = Map.empty, + sparkConf: Option[SparkConf] = None): KubernetesExecutorConf = { KubernetesTestConf.createExecutorConf( - sparkConf = baseConf, + sparkConf = sparkConf.getOrElse(baseConf), driverPod = Some(DRIVER_POD), labels = CUSTOM_EXECUTOR_LABELS, environment = environment) @@ -528,6 +529,105 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } + test("when turning on bursty memory overhead, configure request and limit correctly with" + + " default memoryOverhead profile") { + baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") + val basePod = SparkPod.initialPod() + // scalastyle:off + + val smallMemoryOverheadConf = baseConf.clone + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2) + .set("spark.executor.memory", "64g") + initDefaultProfile(smallMemoryOverheadConf) + val step = new BasicExecutorFeatureStep( + newExecutorConf(sparkConf = Some(smallMemoryOverheadConf)), + new SecurityManager(smallMemoryOverheadConf), defaultProfile) + + val podConfigured = step.configurePod(basePod) + val resource = podConfigured.container.getResources + assert(defaultProfile.executorResources("memory").amount === 64 * 1024) + // assert(defaultProfile.executorResources("memoryOverhead").amount === 6.4 * 1024) + assert(resource.getLimits.get("memory").getAmount.toLong === math.floor((64 + 6.4) * 1024)) + assert(resource.getRequests.get("memory").getAmount.toLong === 64 * 1024) + } + + test("when turning on bursty memory overhead, configure request and limit correctly with" + + " small memoryOverhead profile") { + baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") + val basePod = SparkPod.initialPod() + // scalastyle:off + + val smallMemoryOverheadConf = baseConf.clone + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2) + .set("spark.executor.memory", "64g") + .set("spark.executor.memoryOverhead", "10g") + initDefaultProfile(smallMemoryOverheadConf) + val step = new BasicExecutorFeatureStep( + newExecutorConf(sparkConf = Some(smallMemoryOverheadConf)), + new SecurityManager(smallMemoryOverheadConf), defaultProfile) + + val podConfigured = step.configurePod(basePod) + val resource = podConfigured.container.getResources + assert(defaultProfile.executorResources("memory").amount === 64 * 1024) + assert(defaultProfile.executorResources("memoryOverhead").amount === 10240) + assert(resource.getLimits.get("memory").getAmount.toLong === 74 * 1024) + assert(resource.getRequests.get("memory").getAmount.toLong === 64 * 1024) + } + + test("when turning on bursty memory overhead, configure request and limit correctly with" + + " big memoryOverhead profile") { + baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") + val basePod = SparkPod.initialPod() + // scalastyle:off + + val bigMemoryOverheadConf = baseConf.clone + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2) + .set("spark.executor.memory", "64g") + .set("spark.executor.memoryOverhead", "20g") + initDefaultProfile(bigMemoryOverheadConf) + val step = new BasicExecutorFeatureStep( + newExecutorConf(sparkConf = Some(bigMemoryOverheadConf)), + new SecurityManager(bigMemoryOverheadConf), defaultProfile) + + val podConfigured = step.configurePod(basePod) + val resource = podConfigured.container.getResources + assert(defaultProfile.executorResources("memory").amount === 64 * 1024) + assert(defaultProfile.executorResources("memoryOverhead").amount === 20480) + assert(resource.getLimits.get("memory").getAmount.toLong === 84 * 1024) + assert(resource.getRequests.get("memory").getAmount.toLong === math.floor((64 + 3.2) * 1024)) + } + + test("when turning on bursty memory overhead, configure request and limit correctly with" + + " big memoryOverhead profile and non-default factor") { + baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") + val basePod = SparkPod.initialPod() + // scalastyle:off + + val bigMemoryOverheadConf = baseConf.clone + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.1) + .set("spark.executor.memory", "64g") + .set("spark.executor.memoryOverhead", "20g") + initDefaultProfile(bigMemoryOverheadConf) + val step = new BasicExecutorFeatureStep( + newExecutorConf(sparkConf = Some(bigMemoryOverheadConf)), + new SecurityManager(bigMemoryOverheadConf), defaultProfile) + + val podConfigured = step.configurePod(basePod) + val resource = podConfigured.container.getResources + assert(defaultProfile.executorResources("memory").amount === 64 * 1024) + assert(defaultProfile.executorResources("memoryOverhead").amount === 20480) + assert(resource.getLimits.get("memory").getAmount.toLong === 84 * 1024) + assert(resource.getRequests.get("memory").getAmount.toLong === math.floor((64 + 11.6) * 1024)) + } + test("SPARK-36075: Check executor pod respects nodeSelector/executorNodeSelector") { val initPod = SparkPod.initialPod() val sparkConf = new SparkConf() From 3ace5d2ac4e1cbd113055e67d88d975648f7fcc9 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 23 Nov 2025 21:27:47 -0800 Subject: [PATCH 2/7] update --- .../org/apache/spark/internal/config/package.scala | 11 ++++++----- .../org/apache/spark/resource/ResourceProfile.scala | 8 -------- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7def33bd9b20..459cbf0f0fa7 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -448,7 +448,7 @@ package object config { private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED = ConfigBuilder("spark.executor.memoryOverheadBursty.enabled") .doc("Whether to enable memory overhead bursty") - .version("3.2.0") + .version("4.2.0") .booleanConf .createWithDefault(false) @@ -457,7 +457,7 @@ package object config { .doc("the bursty control factor controlling the size of memory overhead space shared with" + s" other processes, newMemoryOverhead=oldMemoryOverhead-MIN((onheap + memoryoverhead) *" + s" (this value - 1), oldMemoryOverhead)") - .version("3.2.0") + .version("4.2.0") .doubleConf .checkValue((v: Double) => v >= 1.0, "the value of bursty control factor has to be no less than 1") @@ -465,10 +465,11 @@ package object config { private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD = ConfigBuilder( "spark.executor.burstyMemoryOverhead") - .doc(s"The adjusted amount of non-heap memory to be allocated per executor" + + .doc(s"The adjusted amount of memoryOverhead to be allocated per executor" + s" (the adjustment happens if ${EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED.key} is enabled," + - " in MiB unless otherwise specified.") - .version("2.3.0") + " in MiB unless otherwise specified. This parameter is here only for UI demonstration," + + " there is not effect when user sets it directly") + .version("4.2.0") .bytesConf(ByteUnit.MiB) .createOptional diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index bfcb3564a540..8a719f20e324 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -595,17 +595,11 @@ object ResourceProfile extends Logging { val sparkContext = sparkContextOption.get val klass = classOf[ApplicationEnvironmentInfoWrapper] val currentAppEnvironment = sparkContext._statusStore.store.read(klass, klass.getName()).info - logInfo(s"currentAppEnvironment spark properties count:" + - s" ${currentAppEnvironment.sparkProperties.size}") val newAppEnvironment = ApplicationEnvironmentInfo.create(currentAppEnvironment, newSparkProperties = Map(EXECUTOR_BURSTY_MEMORY_OVERHEAD.key -> newMemoryOverheadMiB.toString)) - logInfo(s"newAppEnvironment spark properties count:" + - s" ${newAppEnvironment.sparkProperties.size}") sparkContext._statusStore.store.write(new ApplicationEnvironmentInfoWrapper( newAppEnvironment)) - // we have to post full information here, but need ensure that the downstream pipeline can - // consume duplicate entries properly this.synchronized { if (!loggedBurstyMemoryOverhead) { SparkContext.getActive.get.eventLogger.foreach { logger => @@ -619,11 +613,9 @@ object ResourceProfile extends Logging { "Classpath Entries" -> newAppEnvironment.classpathEntries) )) loggedBurstyMemoryOverhead = true - logInfo("make a event log for bursty memory overhead") } } } - logInfo(s"posted memoryoverhead update event") } } From 5a8831a6cc302b248a622b7088ea117991032d22 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 23 Nov 2025 21:29:50 -0800 Subject: [PATCH 3/7] more revision --- .../deploy/k8s/features/BasicExecutorFeatureStepSuite.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 04eeef1e3bf4..c4e7965a0e35 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -534,7 +534,6 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") val basePod = SparkPod.initialPod() - // scalastyle:off val smallMemoryOverheadConf = baseConf.clone .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) @@ -558,7 +557,6 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") val basePod = SparkPod.initialPod() - // scalastyle:off val smallMemoryOverheadConf = baseConf.clone .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) @@ -583,7 +581,6 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") val basePod = SparkPod.initialPod() - // scalastyle:off val bigMemoryOverheadConf = baseConf.clone .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) @@ -608,7 +605,6 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") val basePod = SparkPod.initialPod() - // scalastyle:off val bigMemoryOverheadConf = baseConf.clone .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) From 955901fc89fd5140a2d7307cfcbfa2eb39f75187 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 23 Nov 2025 21:56:30 -0800 Subject: [PATCH 4/7] fix build --- .../scala/org/apache/spark/resource/ResourceProfile.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index 8a719f20e324..d102e7e630d6 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -573,8 +573,8 @@ object ResourceProfile extends Logging { } else { val burstyControlFactor = conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR) val newMemoryOverheadMiB = (memoryOverheadMiB - math.min( - (executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor - 1.0), memoryOverheadMiB)) - .toLong + ((executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor - 1.0)).toLong, + memoryOverheadMiB)) val totalMemMiBLimit = executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB val totalMemMiBRequest = executorMemoryMiB + newMemoryOverheadMiB + memoryOffHeapMiB + From 983f92ea6b54c7dfa61059cd935daaf2b524baa3 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 23 Nov 2025 22:16:12 -0800 Subject: [PATCH 5/7] fix build --- core/src/main/scala/org/apache/spark/status/api/v1/api.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index df775f3a6eb8..4bcff76ce1f6 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -519,6 +519,7 @@ private[spark] object ApplicationEnvironmentInfo { hadoopProperties = (appEnv.hadoopProperties.toMap ++ newHadoopProperties).toSeq, systemProperties = (appEnv.systemProperties.toMap ++ newSystemProperties).toSeq, classpathEntries = (appEnv.classpathEntries.toMap ++ newClasspathProperties).toSeq, + metricsProperties = appEnv.metricsProperties, resourceProfiles = appEnv.resourceProfiles ++ newResourceProfiles ) } From 7e6220b930d2c88b119592884a84c982fca3b53d Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 24 Nov 2025 06:44:34 -0800 Subject: [PATCH 6/7] fix build --- .../spark/deploy/k8s/features/BasicExecutorFeatureStep.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 1e0807688b20..b365cad7115c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -148,7 +148,7 @@ private[spark] class BasicExecutorFeatureStep( hostname = hostname.toLowerCase(Locale.ROOT) } - val executorMemoryRequestQuantity = new Quantity(s"${execResources.totalMemMiBRequest}Mi") + val executorMemoryRequestQuantity = new Quantity(s"${execResources.totalMemMiB}Mi") val executorMemoryLimitQuantity = execResources.totalMemMiBLimit.map { mem => new Quantity(s"${mem}Mi") }.getOrElse(executorMemoryRequestQuantity) From 65cf04645713d1ec493c5667da42b2c84356aea6 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 24 Nov 2025 09:16:05 -0800 Subject: [PATCH 7/7] fix test --- .../deploy/k8s/features/BasicExecutorFeatureStepSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index c4e7965a0e35..1beccdb5a2a0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -597,7 +597,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { assert(defaultProfile.executorResources("memory").amount === 64 * 1024) assert(defaultProfile.executorResources("memoryOverhead").amount === 20480) assert(resource.getLimits.get("memory").getAmount.toLong === 84 * 1024) - assert(resource.getRequests.get("memory").getAmount.toLong === math.floor((64 + 3.2) * 1024)) + assert(resource.getRequests.get("memory").getAmount.toLong === math.ceil((64 + 3.2) * 1024)) } test("when turning on bursty memory overhead, configure request and limit correctly with" + @@ -621,7 +621,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { assert(defaultProfile.executorResources("memory").amount === 64 * 1024) assert(defaultProfile.executorResources("memoryOverhead").amount === 20480) assert(resource.getLimits.get("memory").getAmount.toLong === 84 * 1024) - assert(resource.getRequests.get("memory").getAmount.toLong === math.floor((64 + 11.6) * 1024)) + assert(resource.getRequests.get("memory").getAmount.toLong === math.ceil((64 + 11.6) * 1024)) } test("SPARK-36075: Check executor pod respects nodeSelector/executorNodeSelector") {