
Conversation

@YaoRazor commented Nov 24, 2025

What changes were proposed in this pull request?

Intro

This PR represents Pinterest's work to boost Spark cluster efficiency. It proposes Canon, a novel burst-aware memory allocation algorithm that partitions part of the cluster memory into fixed and burst segments. The burst segments are shared among different pods, improving overall memory utilization.

Concretely, this PR implements Canon, a burst-aware memory allocation algorithm, for memoryOverhead in Spark. The basic idea is that, since memoryOverhead usage is quite bursty, we can split memoryOverhead into two parts: a fixed part (F) and a shared part (S). Using the Kubernetes request/limit concept, the executor pod's memory request equals heap size (H) + F, and its limit is H + F + S.

To calculate F and S, we introduce spark.executor.memoryOverheadBurstyFactor (f) as the control factor. Assuming the user specified spark.executor.memoryOverhead as O, then

F = O - min{(H + O) * (f - 1), O}
S = O - F

so the pod's limit H + F + S stays equal to H + O, while its request H + F shrinks as f grows.
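To make the arithmetic concrete, here is a minimal Scala sketch (not code from this PR; the object name and the numbers are made-up examples):

// Illustrative sketch of Canon's overhead split; not code from this PR,
// and the numbers below are made-up examples.
object CanonSplitExample {
  // Given heap H, overhead O (both in MiB) and bursty factor f,
  // compute the fixed part F and shared (burst) part S of O.
  def split(heapMiB: Long, overheadMiB: Long, f: Double): (Long, Long) = {
    // S = min{(H + O) * (f - 1), O}
    val s = math.min(((heapMiB + overheadMiB) * (f - 1)).toLong, overheadMiB)
    // F = O - S, so F + S = O
    (overheadMiB - s, s)
  }

  def main(args: Array[String]): Unit = {
    // Example: H = 8192 MiB, O = 2048 MiB, f = 1.1
    val (fixed, shared) = split(8192, 2048, 1.1)
    println(s"F = $fixed MiB, S = $shared MiB")       // F = 1024 MiB, S = 1024 MiB
    println(s"request = ${8192 + fixed} MiB")         // H + F = 9216 MiB
    println(s"limit = ${8192 + fixed + shared} MiB")  // H + F + S = 10240 MiB
  }
}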

Users can use spark.executor.memoryOverheadBursty.enabled to control whether this functionality is enabled, and spark.executor.memoryOverheadBurstyFactor to control how aggressively part of memoryOverhead is shared among different pods, as in the sketch below.
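For instance, a job could opt in like this (a minimal sketch: the two memoryOverheadBursty keys are the ones introduced in this PR, while the memory sizes are arbitrary example values):

import org.apache.spark.SparkConf

// Minimal sketch: opt in to bursty memoryOverhead sharing.
val conf = new SparkConf()
  .set("spark.executor.memory", "8g")                       // heap size H
  .set("spark.executor.memoryOverhead", "2g")               // O
  .set("spark.executor.memoryOverheadBursty.enabled", "true")
  .set("spark.executor.memoryOverheadBurstyFactor", "1.1")  // f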

The effectiveness of this algorithm has been validated through production tests at Pinterest.

Acknowledgement

The code in this PR was mainly implemented by Nan Zhu (@CodingCat) while he was working at Pinterest. The algorithm itself is based on https://www.vldb.org/pvldb/vol17/p3759-shi.pdf

Why are the changes needed?

To improve Spark cluster memory utilization: memoryOverhead usage is bursty, so reserving all of it per pod wastes memory, while sharing the burst portion across pods (via the Kubernetes request/limit split described above) reclaims that waste.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests
Production tests at Pinterest

Was this patch authored or co-authored using generative AI tooling?

No

private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED =
  ConfigBuilder("spark.executor.memoryOverheadBursty.enabled")
    .doc("Whether to enable bursty memory overhead allocation")
    .version("3.2.0")
Contributor

let's change these versions to 4.2.0?

newSparkProperties = Map(EXECUTOR_BURSTY_MEMORY_OVERHEAD.key ->
  newMemoryOverheadMiB.toString))
logInfo(s"newAppEnvironment spark properties count:" +
  s" ${newAppEnvironment.sparkProperties.size}")
Contributor

this logging can be removed

val klass = classOf[ApplicationEnvironmentInfoWrapper]
val currentAppEnvironment = sparkContext._statusStore.store.read(klass, klass.getName()).info
logInfo(s"currentAppEnvironment spark properties count:" +
s" ${currentAppEnvironment.sparkProperties.size}")
Contributor

this logging can be removed

sparkContext._statusStore.store.write(new ApplicationEnvironmentInfoWrapper(
  newAppEnvironment))
// we have to post full information here, but need to ensure that the downstream pipeline can
// consume duplicate entries properly
Contributor

this comment can also be removed

}
}
}
logInfo(s"posted memoryoverhead update event")
Contributor

this can be removed

baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
val basePod = SparkPod.initialPod()
// scalastyle:off
Contributor

this can be removed

baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
val basePod = SparkPod.initialPod()
// scalastyle:off
Contributor

this can be removed

baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
val basePod = SparkPod.initialPod()
// scalastyle:off
Contributor

this can be removed

baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
val basePod = SparkPod.initialPod()
// scalastyle:off
Contributor

this can be removed

@CodingCat (Contributor) commented Nov 24, 2025

Thank you @YaoRazor for open sourcing it. We have deployed Canon to 1000s of machines at Pinterest, and hopefully it will benefit the broader community as well.

And, most importantly, we really appreciate the innovation from the ByteDance team: this algorithm is implemented based on their paper, https://www.vldb.org/pvldb/vol17/p3759-shi.pdf

@YaoRazor would you mind marking this PR as ready for review?

Hi @sunchao, as we discussed offline, would you mind giving it a review?

@YaoRazor marked this pull request as ready for review November 24, 2025 21:53
@holdenk (Contributor) commented Nov 24, 2025

Oh this is interesting :)

@holdenk (Contributor) commented Nov 24, 2025

So this probably requires an SPIP

