diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6f8be49e3959..aa635d5280ac 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -238,7 +238,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _files: Seq[String] = _
   private var _archives: Seq[String] = _
   private var _shutdownHookRef: AnyRef = _
-  private var _statusStore: AppStatusStore = _
+  private[spark] var _statusStore: AppStatusStore = _
   private var _heartbeater: Heartbeater = _
   private var _resources: immutable.Map[String, ResourceInformation] = _
   private var _shuffleDriverComponents: ShuffleDriverComponents = _
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e9a3780f0aaa..459cbf0f0fa7 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -445,6 +445,34 @@ package object config {
       "Ensure that memory overhead is a double greater than 0")
     .createWithDefault(0.1)
 
+  private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED =
+    ConfigBuilder("spark.executor.memoryOverheadBursty.enabled")
+      .doc("Whether to enable bursty memory overhead for executors.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.executor.memoryOverheadBurstyFactor")
+      .doc("The bursty control factor deciding how much memory overhead space is shared with" +
+        " other processes: newMemoryOverhead = oldMemoryOverhead - min((executorMemory +" +
+        " oldMemoryOverhead) * (this value - 1), oldMemoryOverhead).")
+      .version("4.2.0")
+      .doubleConf
+      .checkValue((v: Double) => v >= 1.0,
+        "The bursty control factor must be no less than 1.0")
+      .createWithDefault(1.2)
+
+  private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD = ConfigBuilder(
+    "spark.executor.burstyMemoryOverhead")
+    .doc("The adjusted amount of memoryOverhead allocated per executor, in MiB unless" +
+      " otherwise specified. The adjustment only happens when" +
+      s" ${EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED.key} is enabled. This parameter is only" +
+      " used for display in the UI; setting it directly has no effect.")
+    .version("4.2.0")
+    .bytesConf(ByteUnit.MiB)
+    .createOptional
+
   private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max")
     .doc("When running on a standalone deploy cluster, " +
       "the maximum amount of CPU cores to request for the application from across " +
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 971b14265979..d102e7e630d6 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -31,6 +31,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
+import org.apache.spark.status.ApplicationEnvironmentInfoWrapper
+import org.apache.spark.status.api.v1.ApplicationEnvironmentInfo
 import org.apache.spark.util.Utils
 
 /**
@@ -48,7 +51,7 @@ import org.apache.spark.util.Utils
 @Evolving
 @Since("3.1.0")
 class ResourceProfile(
-    val executorResources: Map[String, ExecutorResourceRequest],
+    var executorResources: Map[String, ExecutorResourceRequest],
     val taskResources: Map[String, TaskResourceRequest])
   extends Serializable with Logging {
 
   validate()
@@ -485,6 +488,7 @@ object ResourceProfile extends Logging {
       pysparkMemoryMiB: Long,
       memoryOverheadMiB: Long,
       totalMemMiB: Long,
+      totalMemMiBLimit: Option[Long],
       customResources: Map[String, ExecutorResourceRequest])
 
   private[spark] case class DefaultProfileExecutorResources(
@@ -559,12 +563,63 @@ object ResourceProfile extends Logging {
     } else {
       0L
     }
-    val totalMemMiB =
-      (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
-    ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
-      pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, finalCustomResources)
+
+    if (!conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED)) {
+      val totalMemMiB =
+        (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
+      ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+        pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, totalMemMiBLimit = None,
+        finalCustomResources)
+    } else {
+      val burstyControlFactor = conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR)
+      val newMemoryOverheadMiB = (memoryOverheadMiB - math.min(
+        ((executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor - 1.0)).toLong,
+        memoryOverheadMiB))
+      val totalMemMiBLimit = executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB +
+        pysparkMemToUseMiB
+      val totalMemMiBRequest = executorMemoryMiB + newMemoryOverheadMiB + memoryOffHeapMiB +
+        pysparkMemToUseMiB
+      logInfo(s"Reducing requested executor memoryOverhead from $memoryOverheadMiB MiB to" +
+        s" $newMemoryOverheadMiB MiB")
+      updateEventLogAndUI(newMemoryOverheadMiB, conf)
+      ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+        pysparkMemToUseMiB, newMemoryOverheadMiB, totalMemMiBRequest,
+        totalMemMiBLimit = Some(totalMemMiBLimit), finalCustomResources)
+    }
+  }
+
+  private def updateEventLogAndUI(newMemoryOverheadMiB: Long, conf: SparkConf): Unit = {
+    conf.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD, newMemoryOverheadMiB)
+    val sparkContextOption = SparkContext.getActive
+    if (sparkContextOption.isDefined) {
+      val sparkContext =
sparkContextOption.get + val klass = classOf[ApplicationEnvironmentInfoWrapper] + val currentAppEnvironment = sparkContext._statusStore.store.read(klass, klass.getName()).info + val newAppEnvironment = ApplicationEnvironmentInfo.create(currentAppEnvironment, + newSparkProperties = Map(EXECUTOR_BURSTY_MEMORY_OVERHEAD.key -> + newMemoryOverheadMiB.toString)) + sparkContext._statusStore.store.write(new ApplicationEnvironmentInfoWrapper( + newAppEnvironment)) + this.synchronized { + if (!loggedBurstyMemoryOverhead) { + SparkContext.getActive.get.eventLogger.foreach { logger => + logger.onEnvironmentUpdate(SparkListenerEnvironmentUpdate( + Map("Spark Properties" -> newAppEnvironment.sparkProperties, + "JVM Information" -> Seq(("Java Version", newAppEnvironment.runtime.javaVersion), + ("Java Home", newAppEnvironment.runtime.javaHome), + ("Scala Version", newAppEnvironment.runtime.scalaVersion)), + "Hadoop Properties" -> newAppEnvironment.hadoopProperties, + "System Properties" -> newAppEnvironment.systemProperties, + "Classpath Entries" -> newAppEnvironment.classpathEntries) + )) + loggedBurstyMemoryOverhead = true + } + } + } + } } private[spark] val PYSPARK_MEMORY_LOCAL_PROPERTY = "resource.pyspark.memory" private[spark] val EXECUTOR_CORES_LOCAL_PROPERTY = "resource.executor.cores" + private var loggedBurstyMemoryOverhead: Boolean = false } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 6ae1dce57f31..4bcff76ce1f6 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -501,6 +501,30 @@ class ApplicationEnvironmentInfo private[spark] ( val classpathEntries: collection.Seq[(String, String)], val resourceProfiles: collection.Seq[ResourceProfileInfo]) +private[spark] object ApplicationEnvironmentInfo { + def create(appEnv: ApplicationEnvironmentInfo, + newSparkProperties: Map[String, String] = Map(), + newHadoopProperties: Map[String, String] = Map(), + newSystemProperties: Map[String, String] = Map(), + newClasspathProperties: Map[String, String] = Map(), + newResourceProfiles: Seq[ResourceProfileInfo] = Seq()): ApplicationEnvironmentInfo = { + if (newResourceProfiles.nonEmpty) { + require(!newResourceProfiles.exists(newRP => appEnv.resourceProfiles.map(_.id) + .contains(newRP.id)), "duplicate resource profile id in newResourceProfile and existing" + + " resource profiles") + } + new ApplicationEnvironmentInfo( + runtime = appEnv.runtime, + sparkProperties = (appEnv.sparkProperties.toMap ++ newSparkProperties).toSeq, + hadoopProperties = (appEnv.hadoopProperties.toMap ++ newHadoopProperties).toSeq, + systemProperties = (appEnv.systemProperties.toMap ++ newSystemProperties).toSeq, + classpathEntries = (appEnv.classpathEntries.toMap ++ newClasspathProperties).toSeq, + metricsProperties = appEnv.metricsProperties, + resourceProfiles = appEnv.resourceProfiles ++ newResourceProfiles + ) + } +} + class RuntimeInfo private[spark]( val javaVersion: String, val javaHome: String, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 5f61c014127a..b365cad7115c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -71,6 +71,26 @@ private[spark] class BasicExecutorFeatureStep( kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) } + // this is to make resource management easier in K8S by adding memoryOverhead if the user + // didn't specify memoryOverhead + resourceProfile.executorResources = { + if (resourceProfile.executorResources.contains(ResourceProfile.MEMORY)) { + if (resourceProfile.executorResources.contains(ResourceProfile.OVERHEAD_MEM)) { + resourceProfile.executorResources + } else { + // fill memory overhead + resourceProfile.executorResources ++ Map(ResourceProfile.OVERHEAD_MEM -> + new ExecutorResourceRequest(ResourceProfile.OVERHEAD_MEM, + math.floor(resourceProfile.executorResources(ResourceProfile.MEMORY).amount * + kubernetesConf.get(MEMORY_OVERHEAD_FACTOR)).toLong)) + } + } else { + resourceProfile.executorResources ++ Map(ResourceProfile.MEMORY -> + new ExecutorResourceRequest(ResourceProfile.MEMORY, + kubernetesConf.sparkConf.get(EXECUTOR_MEMORY))) + } + } + val execResources = ResourceProfile.getResourcesForClusterManager( resourceProfile.id, resourceProfile.executorResources, @@ -128,7 +148,10 @@ private[spark] class BasicExecutorFeatureStep( hostname = hostname.toLowerCase(Locale.ROOT) } - val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi") + val executorMemoryRequestQuantity = new Quantity(s"${execResources.totalMemMiB}Mi") + val executorMemoryLimitQuantity = execResources.totalMemMiBLimit.map { mem => + new Quantity(s"${mem}Mi") + }.getOrElse(executorMemoryRequestQuantity) val executorCpuQuantity = new Quantity(executorCoresRequest) val executorResourceQuantities = buildExecutorResourcesQuantities(execResources.customResources.values.toSet) @@ -212,8 +235,8 @@ private[spark] class BasicExecutorFeatureStep( .withImage(executorContainerImage) .withImagePullPolicy(kubernetesConf.imagePullPolicy) .editOrNewResources() - .addToRequests("memory", executorMemoryQuantity) - .addToLimits("memory", executorMemoryQuantity) + .addToRequests("memory", executorMemoryRequestQuantity) + .addToLimits("memory", executorMemoryLimitQuantity) .addToRequests("cpu", executorCpuQuantity) .addToLimits(executorResourceQuantities.asJava) .endResources() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index b8b5da192a09..1beccdb5a2a0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -88,9 +88,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } private def newExecutorConf( - environment: Map[String, String] = Map.empty): KubernetesExecutorConf = { + environment: Map[String, String] = Map.empty, + sparkConf: Option[SparkConf] = None): KubernetesExecutorConf = { KubernetesTestConf.createExecutorConf( - sparkConf = baseConf, + sparkConf = sparkConf.getOrElse(baseConf), driverPod = Some(DRIVER_POD), labels = CUSTOM_EXECUTOR_LABELS, environment = environment) @@ -528,6 +529,101 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } + test("when turning on bursty memory overhead, configure 
request and limit correctly with" + + " default memoryOverhead profile") { + baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") + val basePod = SparkPod.initialPod() + + val smallMemoryOverheadConf = baseConf.clone + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2) + .set("spark.executor.memory", "64g") + initDefaultProfile(smallMemoryOverheadConf) + val step = new BasicExecutorFeatureStep( + newExecutorConf(sparkConf = Some(smallMemoryOverheadConf)), + new SecurityManager(smallMemoryOverheadConf), defaultProfile) + + val podConfigured = step.configurePod(basePod) + val resource = podConfigured.container.getResources + assert(defaultProfile.executorResources("memory").amount === 64 * 1024) + // assert(defaultProfile.executorResources("memoryOverhead").amount === 6.4 * 1024) + assert(resource.getLimits.get("memory").getAmount.toLong === math.floor((64 + 6.4) * 1024)) + assert(resource.getRequests.get("memory").getAmount.toLong === 64 * 1024) + } + + test("when turning on bursty memory overhead, configure request and limit correctly with" + + " small memoryOverhead profile") { + baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") + val basePod = SparkPod.initialPod() + + val smallMemoryOverheadConf = baseConf.clone + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2) + .set("spark.executor.memory", "64g") + .set("spark.executor.memoryOverhead", "10g") + initDefaultProfile(smallMemoryOverheadConf) + val step = new BasicExecutorFeatureStep( + newExecutorConf(sparkConf = Some(smallMemoryOverheadConf)), + new SecurityManager(smallMemoryOverheadConf), defaultProfile) + + val podConfigured = step.configurePod(basePod) + val resource = podConfigured.container.getResources + assert(defaultProfile.executorResources("memory").amount === 64 * 1024) + assert(defaultProfile.executorResources("memoryOverhead").amount === 10240) + assert(resource.getLimits.get("memory").getAmount.toLong === 74 * 1024) + assert(resource.getRequests.get("memory").getAmount.toLong === 64 * 1024) + } + + test("when turning on bursty memory overhead, configure request and limit correctly with" + + " big memoryOverhead profile") { + baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") + val basePod = SparkPod.initialPod() + + val bigMemoryOverheadConf = baseConf.clone + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2) + .set("spark.executor.memory", "64g") + .set("spark.executor.memoryOverhead", "20g") + initDefaultProfile(bigMemoryOverheadConf) + val step = new BasicExecutorFeatureStep( + newExecutorConf(sparkConf = Some(bigMemoryOverheadConf)), + new SecurityManager(bigMemoryOverheadConf), defaultProfile) + + val podConfigured = step.configurePod(basePod) + val resource = podConfigured.container.getResources + assert(defaultProfile.executorResources("memory").amount === 64 * 1024) + assert(defaultProfile.executorResources("memoryOverhead").amount === 20480) + assert(resource.getLimits.get("memory").getAmount.toLong === 84 * 1024) + assert(resource.getRequests.get("memory").getAmount.toLong === math.ceil((64 + 3.2) * 1024)) + } + + test("when turning on bursty memory overhead, configure request and limit correctly with" + + " big memoryOverhead 
profile and non-default factor") { + baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs") + val basePod = SparkPod.initialPod() + + val bigMemoryOverheadConf = baseConf.clone + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true) + .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.1) + .set("spark.executor.memory", "64g") + .set("spark.executor.memoryOverhead", "20g") + initDefaultProfile(bigMemoryOverheadConf) + val step = new BasicExecutorFeatureStep( + newExecutorConf(sparkConf = Some(bigMemoryOverheadConf)), + new SecurityManager(bigMemoryOverheadConf), defaultProfile) + + val podConfigured = step.configurePod(basePod) + val resource = podConfigured.container.getResources + assert(defaultProfile.executorResources("memory").amount === 64 * 1024) + assert(defaultProfile.executorResources("memoryOverhead").amount === 20480) + assert(resource.getLimits.get("memory").getAmount.toLong === 84 * 1024) + assert(resource.getRequests.get("memory").getAmount.toLong === math.ceil((64 + 11.6) * 1024)) + } + test("SPARK-36075: Check executor pod respects nodeSelector/executorNodeSelector") { val initPod = SparkPod.initialPod() val sparkConf = new SparkConf()