[SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded #45052
Conversation
FYI, this is a documented API, @sunchao.
I believe we can add a new constructor for the driver-plugin use case only.
I'll add some tests later. Marking as a draft for now to run through all existing tests.
Took a quick look, and it looks fine to me overall.
Please do ping me when it is ready for review though!
Sure, thanks @mridulm in advance!
@@ -77,6 +76,12 @@ class SparkEnv (

  def shuffleManager: ShuffleManager = _shuffleManager

  // We initialize the MemoryManager later in SparkContext after DriverPlugin is loaded
  // to allow the plugin to overwrite memory configurations
  private var _memoryManager: MemoryManager = _
nit: move the definition along with `_shuffleManager` above?
Sure, will do.
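For reference, the deferred-initialization shape under discussion looks roughly like the following self-contained sketch. `EnvSketch` and `MemoryManagerSketch` are illustrative stand-in names, not Spark's classes; `initializeMemoryManager` mirrors the method this PR adds to `SparkEnv`:

```scala
// Sketch of deferring MemoryManager creation until after the driver plugin
// has had a chance to adjust the configuration. Names are illustrative.
class MemoryManagerSketch(conf: Map[String, String], numUsableCores: Int)

class EnvSketch(conf: Map[String, String]) {
  // Starts out null; assigned exactly once from SparkContext after the
  // DriverPlugin has run.
  private var _memoryManager: MemoryManagerSketch = _

  def memoryManager: MemoryManagerSketch = {
    require(_memoryManager != null, "memory manager not initialized yet")
    _memoryManager
  }

  def initializeMemoryManager(numUsableCores: Int): Unit = {
    require(_memoryManager == null, "memory manager already initialized")
    _memoryManager = new MemoryManagerSketch(conf, numUsableCores)
  }
}
```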
 */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,
Do we want to follow the same pattern as what `shuffleManager` does here?
I tried that at the beginning, but found that in certain cases there may be race conditions in:

private lazy val memoryManager = Option(_memoryManager).getOrElse(SparkEnv.get.memoryManager)

since a different thread can call `SparkEnv.set` right after the `memoryManager` is updated in the current `SparkEnv`. As a result, the `memoryManager` could be null. This is revealed in `JobCancellationSuite`.

The current approach makes `memoryManager` a mutable field that is updated later, when the driver plugin is loaded.
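To make the hazard concrete, here is a minimal, self-contained sketch of that race. `Env` and `BlockManagerSketch` are illustrative stand-ins, not Spark's classes: the lazy fallback reads the global env on first access, and a concurrent `set` of a not-yet-initialized env makes that read return null.

```scala
// Illustrative sketch of the race described above; stand-in names, not Spark code.
object Env {
  @volatile private var active: Env = _
  def get: Env = active
  def set(e: Env): Unit = active = e
}

class Env {
  // Assigned some time after the Env itself is published via Env.set.
  @volatile var memoryManager: AnyRef = _
}

// On executors the manager is passed in; on the driver it is null and we
// fall back to the global env -- which is where the race lives.
class BlockManagerSketch(passedIn: AnyRef) {
  lazy val memoryManager: AnyRef =
    // If another thread just called Env.set with a fresh Env whose
    // memoryManager has not been assigned yet, this evaluates to null.
    Option(passedIn).getOrElse(Env.get.memoryManager)
}
```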
I am not sure I follow the corner case - can you point me to which test is causing this issue? Thanks!
The `BlockManager` being used would be associated with the corresponding `SparkEnv` - and if `SparkEnv` is being mutated, the new env is what we should be referencing.
Sure. It is "job group with interruption". I think it is flaky though and doesn't always happen; when I tried it locally it didn't always reproduce. The job link: https://github.com/sunchao/spark/actions/runs/7923522243/job/21637267400
[info] - job group with interruption *** FAILED *** (34 milliseconds)
[info] java.lang.NullPointerException: Cannot invoke "org.apache.spark.memory.MemoryManager.maxOnHeapStorageMemory()" because the return value of "org.apache.spark.storage.BlockManager.memoryManager()" is null
[info] at org.apache.spark.storage.BlockManager.maxOnHeapMemory$lzycompute(BlockManager.scala:243)
[info] at org.apache.spark.storage.BlockManager.maxOnHeapMemory(BlockManager.scala:243)
[info] at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:565)
[info] at org.apache.spark.SparkContext.<init>(SparkContext.scala:633)
[info] at org.apache.spark.SparkContext.<init>(SparkContext.scala:159)
[info] at org.apache.spark.SparkContext.<init>(SparkContext.scala:172)
[info] at org.apache.spark.JobCancellationSuite.$anonfun$new$41(JobCancellationSuite.scala:397)
[info] at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info] at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info] at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info] at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
I think we should be able to change `shuffleManager` in the same manner to avoid the potential concurrency issue.
I will need to look more - I don't think my hypothesis is the root cause here ... even though `_env.blockManager.initialize` happens after `_taskScheduler.start()`, `_env.initializeMemoryManager` happens before and should have initialized the state.
The stack trace of the NPE that we saw earlier was part of `SparkContext` initialization ... not an access from a task, right?
Thanks @mridulm for checking! I think that stack trace doesn't reveal the root cause of the issue. I added a bunch of debugging messages in the code and found the task that was causing the issue:
setting active env to org.apache.spark.SparkEnv@5ab3ee8b in pool-1-thread-1-ScalaTest-running-JobCancellationSuite
active env = org.apache.spark.SparkEnv@5ab3ee8b, thread = Executor task launch worker for task 0.0 in stage 0.0 (TID 0)
java.base/java.lang.Thread.getStackTrace(Thread.java:1619)
org.apache.spark.storage.BlockManager.memoryManager$lzycompute(BlockManager.scala:210)
org.apache.spark.storage.BlockManager.memoryManager(BlockManager.scala:204)
org.apache.spark.storage.BlockManager.memoryStore$lzycompute(BlockManager.scala:248)
org.apache.spark.storage.BlockManager.memoryStore(BlockManager.scala:247)
org.apache.spark.scheduler.Task.$anonfun$run$3(Task.scala:146)
org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1287)
org.apache.spark.scheduler.Task.run(Task.scala:144)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:633)
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:636)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:840)
memory manager of org.apache.spark.SparkEnv@5ab3ee8b is null, _memoryManager = null, thread = Executor task launch worker for task 0.0 in stage 0.0 (TID 0)
set memory manager for org.apache.spark.SparkEnv@5ab3ee8b, threadName = pool-1-thread-1-ScalaTest-running-JobCancellationSuite
java.base/java.lang.Thread.getStackTrace(Thread.java:1619)
org.apache.spark.SparkContext.<init>(SparkContext.scala:584)
org.apache.spark.SparkContext.<init>(SparkContext.scala:141)
org.apache.spark.JobCancellationSuite.$anonfun$new$45(JobCancellationSuite.scala:430)
org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
org.scalatest.Transformer.apply(Transformer.scala:22)
org.scalatest.Transformer.apply(Transformer.scala:20)
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
The "setting active env" and "set memory manager" messages are logged in SparkContext
initialization, while the "active env =" and "memory manager of " are logged in BlockManager
when trying to access the memoryManager
. The first stack trace shows it is from the separate worker thread.
Sorry, the line numbers may not match since I added several changes in my local repo for debugging purposes.
@sunchao Can you please take a look at this?
It should fix the issue we are discussing - the test is for illustration purposes only, please do adapt and clean it up :-)
Essentially it is a minor modification to your fix - the issue is that the `blockManager` referenced in the task cleanup in `finally` is incorrect, as you had fixed. The only change I introduced is to not require it to be passed in at the `Task` level, but to simply grab it at task start time; I also added a test which validates that this is indeed the issue.
This also means that, given the risk of `blockManager` being in a potentially inconsistent state until initialization is complete, we have to add some documentation to it, so that this buggy pattern does not get reintroduced in the future.
Thanks @mridulm. I like your solution, which is simpler. Saving the `blockManager` at the beginning of `Task.run` should be sufficient. Let me adapt the code and the test case in this PR.
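The idea is illustrated by the minimal, self-contained sketch below (stand-in names, not the actual `Task` code): read the block manager from the global env exactly once at task start, so the cleanup in `finally` uses the same instance even if the env is swapped mid-task, e.g. by a `SparkContext` restart in a local-mode test.

```scala
// Stand-in names; illustrates capturing the block manager once at task start.
object GlobalEnv {
  @volatile var current: EnvState = _
}
class EnvState(val blockManager: AnyRef)

class TaskSketch {
  def run(): Unit = {
    // Capture once. GlobalEnv.current may be replaced by another thread
    // while the task is running.
    val blockManager = GlobalEnv.current.blockManager
    try {
      // ... run the task body ...
    } finally {
      // Cleanup uses the captured reference; re-reading
      // GlobalEnv.current.blockManager here could return an instance from
      // a newer, not-yet-initialized env.
      require(blockManager != null)
    }
  }
}
```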
+CC @dongjoon-hyun and @tgravescs
+1, LGTM for the AS-IS PR from my side.
For the ongoing discussion, feel free to continue.
 */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,
I'm missing how this is failing, can you clarify? The `BlockManager` initialize is called on line 632, after the memory manager is initialized in the block manager, so how do we get a null?
Looks good to me, thanks for working on this @sunchao and for going over the various corner cases!
+1
Thanks @mridulm @dongjoon-hyun @tgravescs for the review! Merged to master.
releaseTaskSem.acquire()
} catch {
  case _: InterruptedException =>
    // ignore thread interruption
@sunchao What's the purpose of leaving a running task from the old `SparkContext`?
OK, I see - it's probably for exercising the correct usage of `BlockManager` in the local test with a `SparkContext` restart.
What changes were proposed in this pull request?

This changes the initialization of `SparkEnv.memoryManager` to after the `DriverPlugin` is loaded, to allow the plugin to customize memory-related configurations.

A minor fix has been made to `Task` to make sure that it uses the same `BlockManager` throughout the task execution. Previously, a different `BlockManager` could be used in some corner cases. Also added a test for the fix.

Why are the changes needed?

Today, there is no way for a custom `DriverPlugin` to override memory configurations such as `spark.executor.memory`, `spark.executor.memoryOverhead`, `spark.memory.offHeap.size`, etc. This is because the memory manager is initialized before the `DriverPlugin` is loaded. A similar change has been made to `shuffleManager` in #43627.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests. Also added new tests.

Was this patch authored or co-authored using generative AI tooling?

No
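As a hedged illustration of what this enables, the sketch below shows a hypothetical plugin that raises off-heap memory before the `MemoryManager` is created. The class name and package are made up, not from this PR; and since `SparkContext.conf` is `private[spark]`, the sketch is placed under the `org.apache.spark` package to make the access compile - a real third-party plugin would need an appropriate way to update the driver conf.

```scala
package org.apache.spark.example

import java.util.{Collections, Map => JMap}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Hypothetical plugin: because (after this PR) the MemoryManager is created
// only after DriverPlugin.init has run, the settings below take effect.
class OffHeapTuningPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new DriverPlugin {
    override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
      // sc.conf is private[spark]; accessible here only because this sketch
      // lives under the org.apache.spark package.
      sc.conf.set("spark.memory.offHeap.enabled", "true")
      sc.conf.set("spark.memory.offHeap.size", "2g")
      Collections.emptyMap[String, String]()
    }
  }

  // No executor-side behavior needed for this sketch.
  override def executorPlugin(): ExecutorPlugin = null
}
```

It would be enabled via `spark.plugins=org.apache.spark.example.OffHeapTuningPlugin`.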