2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -238,7 +238,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _files: Seq[String] = _
private var _archives: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private[spark] var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _
private var _resources: immutable.Map[String, ResourceInformation] = _
private var _shuffleDriverComponents: ShuffleDriverComponents = _
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -445,6 +445,34 @@ package object config {
"Ensure that memory overhead is a double greater than 0")
.createWithDefault(0.1)

private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED =
ConfigBuilder("spark.executor.memoryOverheadBursty.enabled")
.doc("Whether to enable memory overhead bursty")
.version("4.2.0")
.booleanConf
.createWithDefault(false)

private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.executor.memoryOverheadBurstyFactor")
.doc("the bursty control factor controlling the size of memory overhead space shared with" +
s" other processes, newMemoryOverhead=oldMemoryOverhead-MIN((onheap + memoryoverhead) *" +
s" (this value - 1), oldMemoryOverhead)")
.version("4.2.0")
.doubleConf
.checkValue((v: Double) => v >= 1.0,
  "The bursty control factor must be no less than 1.0")
.createWithDefault(1.2)
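
To make the factor concrete, here is a worked example of the formula above, with illustrative values that the new test cases later in this diff also exercise (64 GiB executor memory, 20 GiB overhead, factor 1.2):

// Illustrative arithmetic for the formula above (values match the tests below):
// executorMemoryMiB = 65536 (64g), memoryOverheadMiB = 20480 (20g), factor = 1.2
val reduction = math.min(((65536L + 20480L) * (1.2 - 1.0)).toLong, 20480L) // 17203
val newMemoryOverheadMiB = 20480L - reduction                              // 3277
// The request becomes 65536 + 3277 = 68813 MiB; the limit stays 65536 + 20480 = 86016 MiB.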

private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD = ConfigBuilder(
"spark.executor.burstyMemoryOverhead")
.doc(s"The adjusted amount of memoryOverhead to be allocated per executor" +
s" (the adjustment happens if ${EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED.key} is enabled," +
" in MiB unless otherwise specified. This parameter is here only for UI demonstration," +
" there is not effect when user sets it directly")
.version("4.2.0")
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max")
.doc("When running on a standalone deploy cluster, " +
"the maximum amount of CPU cores to request for the application from across " +
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -31,6 +31,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.apache.spark.status.ApplicationEnvironmentInfoWrapper
import org.apache.spark.status.api.v1.ApplicationEnvironmentInfo
import org.apache.spark.util.Utils

/**
@@ -48,7 +51,7 @@ import org.apache.spark.util.Utils
@Evolving
@Since("3.1.0")
class ResourceProfile(
val executorResources: Map[String, ExecutorResourceRequest],
var executorResources: Map[String, ExecutorResourceRequest],
val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging {

validate()
@@ -485,6 +488,7 @@ object ResourceProfile extends Logging {
pysparkMemoryMiB: Long,
memoryOverheadMiB: Long,
totalMemMiB: Long,
totalMemMiBLimit: Option[Long],
customResources: Map[String, ExecutorResourceRequest])

private[spark] case class DefaultProfileExecutorResources(
@@ -559,12 +563,63 @@ object ResourceProfile extends Logging {
} else {
0L
}
val totalMemMiB =
(executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, finalCustomResources)

if (!conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED)) {
val totalMemMiB =
(executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, totalMemMiBLimit = None,
finalCustomResources)
} else {
val burstyControlFactor = conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR)
val newMemoryOverheadMiB = (memoryOverheadMiB - math.min(
((executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor - 1.0)).toLong,
memoryOverheadMiB))
val totalMemMiBLimit = executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB +
pysparkMemToUseMiB
val totalMemMiBRequest = executorMemoryMiB + newMemoryOverheadMiB + memoryOffHeapMiB +
pysparkMemToUseMiB
logInfo(s"reduce memoryoverhead request from $memoryOverheadMiB MB to" +
s" $newMemoryOverheadMiB MB")
updateEventLogAndUI(newMemoryOverheadMiB, conf)
ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
pysparkMemToUseMiB, newMemoryOverheadMiB, totalMemMiBRequest,
totalMemMiBLimit = Some(totalMemMiBLimit), finalCustomResources)
}
}

private def updateEventLogAndUI(newMemoryOverheadMiB: Long, conf: SparkConf): Unit = {
conf.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD, newMemoryOverheadMiB)
SparkContext.getActive.foreach { sparkContext =>
val klass = classOf[ApplicationEnvironmentInfoWrapper]
val currentAppEnvironment = sparkContext._statusStore.store.read(klass, klass.getName()).info
val newAppEnvironment = ApplicationEnvironmentInfo.create(currentAppEnvironment,
newSparkProperties = Map(EXECUTOR_BURSTY_MEMORY_OVERHEAD.key ->
newMemoryOverheadMiB.toString))
sparkContext._statusStore.store.write(new ApplicationEnvironmentInfoWrapper(
newAppEnvironment))
this.synchronized {
if (!loggedBurstyMemoryOverhead) {
sparkContext.eventLogger.foreach { logger =>
logger.onEnvironmentUpdate(SparkListenerEnvironmentUpdate(
Map("Spark Properties" -> newAppEnvironment.sparkProperties,
"JVM Information" -> Seq(("Java Version", newAppEnvironment.runtime.javaVersion),
("Java Home", newAppEnvironment.runtime.javaHome),
("Scala Version", newAppEnvironment.runtime.scalaVersion)),
"Hadoop Properties" -> newAppEnvironment.hadoopProperties,
"System Properties" -> newAppEnvironment.systemProperties,
"Classpath Entries" -> newAppEnvironment.classpathEntries)
))
loggedBurstyMemoryOverhead = true
}
}
}
}
}

private[spark] val PYSPARK_MEMORY_LOCAL_PROPERTY = "resource.pyspark.memory"
private[spark] val EXECUTOR_CORES_LOCAL_PROPERTY = "resource.executor.cores"
private var loggedBurstyMemoryOverhead: Boolean = false
}
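
For readers skimming the hunk above, here is a minimal, self-contained sketch of the request/limit split, assuming the same arithmetic as getResourcesForClusterManager; off-heap and PySpark memory are omitted for brevity, and the names BurstyMemory and computeBurstyMemory are illustrative only, not part of this patch:

// Illustrative sketch only; BurstyMemory and computeBurstyMemory are
// hypothetical names, not Spark API.
case class BurstyMemory(requestMiB: Long, limitMiB: Long)

def computeBurstyMemory(
    executorMemoryMiB: Long,
    memoryOverheadMiB: Long,
    burstyControlFactor: Double): BurstyMemory = {
  require(burstyControlFactor >= 1.0, "The bursty control factor must be >= 1.0")
  // The overhead request shrinks by at most the full overhead amount.
  val reduction = math.min(
    ((executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor - 1.0)).toLong,
    memoryOverheadMiB)
  BurstyMemory(
    requestMiB = executorMemoryMiB + (memoryOverheadMiB - reduction),
    limitMiB = executorMemoryMiB + memoryOverheadMiB)
}

// computeBurstyMemory(65536L, 20480L, 1.2) == BurstyMemory(68813, 86016)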
24 changes: 24 additions & 0 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -501,6 +501,30 @@ class ApplicationEnvironmentInfo private[spark] (
val classpathEntries: collection.Seq[(String, String)],
val resourceProfiles: collection.Seq[ResourceProfileInfo])

private[spark] object ApplicationEnvironmentInfo {
def create(appEnv: ApplicationEnvironmentInfo,
newSparkProperties: Map[String, String] = Map(),
newHadoopProperties: Map[String, String] = Map(),
newSystemProperties: Map[String, String] = Map(),
newClasspathProperties: Map[String, String] = Map(),
newResourceProfiles: Seq[ResourceProfileInfo] = Seq()): ApplicationEnvironmentInfo = {
if (newResourceProfiles.nonEmpty) {
require(!newResourceProfiles.exists(newRP => appEnv.resourceProfiles.map(_.id)
.contains(newRP.id)), "duplicate resource profile id between newResourceProfiles" +
" and the existing resource profiles")
}
new ApplicationEnvironmentInfo(
runtime = appEnv.runtime,
sparkProperties = (appEnv.sparkProperties.toMap ++ newSparkProperties).toSeq,
hadoopProperties = (appEnv.hadoopProperties.toMap ++ newHadoopProperties).toSeq,
systemProperties = (appEnv.systemProperties.toMap ++ newSystemProperties).toSeq,
classpathEntries = (appEnv.classpathEntries.toMap ++ newClasspathProperties).toSeq,
metricsProperties = appEnv.metricsProperties,
resourceProfiles = appEnv.resourceProfiles ++ newResourceProfiles
)
}
}
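
A brief usage sketch of the helper above, mirroring how the bursty code path in ResourceProfile calls it; appEnv stands in for a snapshot read from the status store, and the property value is illustrative:

// Sketch: merge one overriding Spark property into an existing snapshot.
// appEnv is assumed to be an ApplicationEnvironmentInfo read from the store.
val updated = ApplicationEnvironmentInfo.create(
  appEnv,
  newSparkProperties = Map("spark.executor.burstyMemoryOverhead" -> "3277"))
// runtime, metricsProperties and resourceProfiles are carried over unchanged,
// while the new property overrides any existing entry with the same key.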

class RuntimeInfo private[spark](
val javaVersion: String,
val javaHome: String,
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -71,6 +71,26 @@ private[spark] class BasicExecutorFeatureStep(
kubernetesConf.get(MEMORY_OVERHEAD_FACTOR)
}

// Make resource management easier on K8s by filling in memory and memoryOverhead
// when the user didn't specify them explicitly in the resource profile.
resourceProfile.executorResources = {
if (resourceProfile.executorResources.contains(ResourceProfile.MEMORY)) {
if (resourceProfile.executorResources.contains(ResourceProfile.OVERHEAD_MEM)) {
resourceProfile.executorResources
} else {
// fill memory overhead
resourceProfile.executorResources ++ Map(ResourceProfile.OVERHEAD_MEM ->
new ExecutorResourceRequest(ResourceProfile.OVERHEAD_MEM,
math.floor(resourceProfile.executorResources(ResourceProfile.MEMORY).amount *
kubernetesConf.get(MEMORY_OVERHEAD_FACTOR)).toLong))
}
} else {
resourceProfile.executorResources ++ Map(ResourceProfile.MEMORY ->
new ExecutorResourceRequest(ResourceProfile.MEMORY,
kubernetesConf.sparkConf.get(EXECUTOR_MEMORY)))
}
}

val execResources = ResourceProfile.getResourcesForClusterManager(
resourceProfile.id,
resourceProfile.executorResources,
@@ -128,7 +148,10 @@ private[spark] class BasicExecutorFeatureStep(
hostname = hostname.toLowerCase(Locale.ROOT)
}

val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi")
val executorMemoryRequestQuantity = new Quantity(s"${execResources.totalMemMiB}Mi")
val executorMemoryLimitQuantity = execResources.totalMemMiBLimit.map { mem =>
new Quantity(s"${mem}Mi")
}.getOrElse(executorMemoryRequestQuantity)
val executorCpuQuantity = new Quantity(executorCoresRequest)
val executorResourceQuantities =
buildExecutorResourcesQuantities(execResources.customResources.values.toSet)
@@ -212,8 +235,8 @@ private[spark] class BasicExecutorFeatureStep(
.withImage(executorContainerImage)
.withImagePullPolicy(kubernetesConf.imagePullPolicy)
.editOrNewResources()
.addToRequests("memory", executorMemoryQuantity)
.addToLimits("memory", executorMemoryQuantity)
.addToRequests("memory", executorMemoryRequestQuantity)
.addToLimits("memory", executorMemoryLimitQuantity)
.addToRequests("cpu", executorCpuQuantity)
.addToLimits(executorResourceQuantities.asJava)
.endResources()
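
The net effect on the executor pod, sketched with fabric8's Quantity and the worked values from earlier; the literal amounts are illustrative, not output of this patch:

import io.fabric8.kubernetes.api.model.Quantity

// The memory request uses the reduced total, while the limit keeps the full,
// burstable total (64g on-heap plus the original 20g overhead).
val executorMemoryRequestQuantity = new Quantity("68813Mi")
val executorMemoryLimitQuantity = new Quantity("86016Mi")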
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -88,9 +88,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
}

private def newExecutorConf(
environment: Map[String, String] = Map.empty): KubernetesExecutorConf = {
environment: Map[String, String] = Map.empty,
sparkConf: Option[SparkConf] = None): KubernetesExecutorConf = {
KubernetesTestConf.createExecutorConf(
sparkConf = baseConf,
sparkConf = sparkConf.getOrElse(baseConf),
driverPod = Some(DRIVER_POD),
labels = CUSTOM_EXECUTOR_LABELS,
environment = environment)
@@ -528,6 +529,101 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {

}

test("when turning on bursty memory overhead, configure request and limit correctly with" +
" default memoryOverhead profile") {
baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
val basePod = SparkPod.initialPod()

val smallMemoryOverheadConf = baseConf.clone
.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true)
.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2)
.set("spark.executor.memory", "64g")
initDefaultProfile(smallMemoryOverheadConf)
val step = new BasicExecutorFeatureStep(
newExecutorConf(sparkConf = Some(smallMemoryOverheadConf)),
new SecurityManager(smallMemoryOverheadConf), defaultProfile)

val podConfigured = step.configurePod(basePod)
val resource = podConfigured.container.getResources
assert(defaultProfile.executorResources("memory").amount === 64 * 1024)
// The default overhead is math.floor(0.1 * 65536) = 6553 MiB, so an exact
// 6.4 * 1024 assertion would not hold:
// assert(defaultProfile.executorResources("memoryOverhead").amount === 6.4 * 1024)
assert(resource.getLimits.get("memory").getAmount.toLong === math.floor((64 + 6.4) * 1024))
assert(resource.getRequests.get("memory").getAmount.toLong === 64 * 1024)
}

test("when turning on bursty memory overhead, configure request and limit correctly with" +
" small memoryOverhead profile") {
baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
val basePod = SparkPod.initialPod()

val smallMemoryOverheadConf = baseConf.clone
.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true)
.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2)
.set("spark.executor.memory", "64g")
.set("spark.executor.memoryOverhead", "10g")
initDefaultProfile(smallMemoryOverheadConf)
val step = new BasicExecutorFeatureStep(
newExecutorConf(sparkConf = Some(smallMemoryOverheadConf)),
new SecurityManager(smallMemoryOverheadConf), defaultProfile)

val podConfigured = step.configurePod(basePod)
val resource = podConfigured.container.getResources
assert(defaultProfile.executorResources("memory").amount === 64 * 1024)
assert(defaultProfile.executorResources("memoryOverhead").amount === 10240)
assert(resource.getLimits.get("memory").getAmount.toLong === 74 * 1024)
assert(resource.getRequests.get("memory").getAmount.toLong === 64 * 1024)
}

test("when turning on bursty memory overhead, configure request and limit correctly with" +
" big memoryOverhead profile") {
baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
val basePod = SparkPod.initialPod()

val bigMemoryOverheadConf = baseConf.clone
.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true)
.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2)
.set("spark.executor.memory", "64g")
.set("spark.executor.memoryOverhead", "20g")
initDefaultProfile(bigMemoryOverheadConf)
val step = new BasicExecutorFeatureStep(
newExecutorConf(sparkConf = Some(bigMemoryOverheadConf)),
new SecurityManager(bigMemoryOverheadConf), defaultProfile)

val podConfigured = step.configurePod(basePod)
val resource = podConfigured.container.getResources
assert(defaultProfile.executorResources("memory").amount === 64 * 1024)
assert(defaultProfile.executorResources("memoryOverhead").amount === 20480)
assert(resource.getLimits.get("memory").getAmount.toLong === 84 * 1024)
assert(resource.getRequests.get("memory").getAmount.toLong === math.ceil((64 + 3.2) * 1024))
}

test("when turning on bursty memory overhead, configure request and limit correctly with" +
" big memoryOverhead profile and non-default factor") {
baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
val basePod = SparkPod.initialPod()

val bigMemoryOverheadConf = baseConf.clone
.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true)
.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.1)
.set("spark.executor.memory", "64g")
.set("spark.executor.memoryOverhead", "20g")
initDefaultProfile(bigMemoryOverheadConf)
val step = new BasicExecutorFeatureStep(
newExecutorConf(sparkConf = Some(bigMemoryOverheadConf)),
new SecurityManager(bigMemoryOverheadConf), defaultProfile)

val podConfigured = step.configurePod(basePod)
val resource = podConfigured.container.getResources
assert(defaultProfile.executorResources("memory").amount === 64 * 1024)
assert(defaultProfile.executorResources("memoryOverhead").amount === 20480)
assert(resource.getLimits.get("memory").getAmount.toLong === 84 * 1024)
assert(resource.getRequests.get("memory").getAmount.toLong === math.ceil((64 + 11.6) * 1024))
}

test("SPARK-36075: Check executor pod respects nodeSelector/executorNodeSelector") {
val initPod = SparkPod.initialPod()
val sparkConf = new SparkConf()