[SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen #38702
Conversation
Can we change the JIRA format in the title to something like "[SPARK-41187][CORE] ..."? Checking the Spark contribution guide would also be helpful!
Can one of the admins verify this patch?
cc @cloud-fan
Force-pushed from 0ff13b8 to 0476a76.
The change looks good to me.
+CC @Ngone51
Btw, do you also want to remove the `if (event.taskInfo == null)` check at the beginning of `onTaskEnd`? Make it a precondition check, e.g. `Preconditions.checkNotNull(event.taskInfo)`?
Yes, it can be changed to a precondition check. Maybe I can change it in a new PR after testing.
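The suggested change can be sketched as below. This is an illustrative stand-in, not Spark's actual code: `TaskEndEvent` and the two handlers are hypothetical, and Scala's `require` is used here in place of Guava's `Preconditions.checkNotNull` (note it throws `IllegalArgumentException` rather than `NullPointerException`).

```scala
// Hypothetical stand-in for the event type; not Spark's actual class.
case class TaskEndEvent(taskInfo: String)

// Current shape: silently drop events with a null taskInfo.
def onTaskEndLenient(event: TaskEndEvent): Boolean = {
  if (event.taskInfo == null) {
    false // event dropped without any signal
  } else {
    true // event processed
  }
}

// Proposed shape: fail fast, since a null taskInfo would indicate an
// upstream bug rather than a condition to tolerate.
def onTaskEndStrict(event: TaskEndEvent): Boolean = {
  require(event.taskInfo != null, "event.taskInfo must not be null")
  true // event processed
}
```

The difference is purely about surfacing bugs: the lenient version hides a malformed event, while the strict version turns it into an immediate, diagnosable failure.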
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
Outdated
Show resolved
Hide resolved
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
    checkInfoPopulated(listener, logUrlMap, processId)
  }

+ test(s"SPARK-41187: Stage should be removed from liveStages to avoid deadExecutors accumulated") {
Suggested change:
- test(s"SPARK-41187: Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+ test("SPARK-41187: Stage should be removed from liveStages to avoid deadExecutors accumulated") {
@mridulm Since the latest PR fix doesn't involve the metrics, I think we can skip this removal to keep the current changes as simple as possible. We can come back to it when working on the metrics stuff.
Force-pushed from 634eefe to ea1307c.
…ExecutorLost happen
Co-authored-by: wuyi <yi.wu@databricks.com>
Force-pushed from ea1307c to e8e2318.
+1, LGTM. Thank you, @wineternity and all.
My pleasure.
… ExecutorLost happen

Closes #38702 from wineternity/SPARK-41187.

Authored-by: yuanyimeng <yuanyimeng@youzan.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 7e7bc94)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
Merged to master and branch-3.3.
What changes were proposed in this pull request?
Ignore `SparkListenerTaskEnd` events with reason "Resubmitted" in AppStatusListener to avoid a memory leak.
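A minimal sketch of the shape of this check follows; the reason type and the predicate are simplified stand-ins for Spark's actual `TaskEndReason` hierarchy and `onTaskEnd` handler, not the real patch.

```scala
// Simplified stand-ins for Spark's TaskEndReason hierarchy.
sealed trait TaskEndReason
case object Success extends TaskEndReason
case object Resubmitted extends TaskEndReason

// The idea of the fix: a Resubmitted task-end event has no matching
// task-start event, so the listener should skip it instead of
// decrementing the live stage's task counters.
def shouldProcess(reason: TaskEndReason): Boolean = reason match {
  case Resubmitted => false // ignore: would unbalance activeTasks
  case _           => true  // process normally
}
```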
Why are the changes needed?
For a long-running Spark Thrift Server, LiveExecutor objects accumulate in the deadExecutors HashMap and cause the message event queue to be processed slowly.
For every task, a `SparkListenerTaskStart` event and a `SparkListenerTaskEnd` event are normally sent out as a pair. But when an executor is lost, events are sent in the following sequence:

a) A pair of task start and task end events is fired for the task (call it Tr).
b) When the executor that ran Tr is lost while the stage is still running, a task end event with reason `Resubmitted` is fired for Tr.
c) Subsequently, a new task start and task end are fired for the retry of Tr.

Processing the `Resubmitted` task end event in AppStatusListener can drive `LiveStage.activeTasks` negative, since there is no corresponding `SparkListenerTaskStart` event for it. A negative activeTasks makes the stage remain in the live stage list forever, as it can never meet the condition activeTasks == 0. This in turn means a dead executor is never cleaned up whenever that live stage's submissionTime is earlier than the dead executor's removeTime (see isExecutorActiveForLiveStages). Since this kind of `SparkListenerTaskEnd` is not needed here, we simply ignore it.

Check SPARK-41187 for evidence.
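The event sequence described above can be modeled with a toy counter; `StageModel` and its members are illustrative only, not Spark's actual `LiveStage` implementation.

```scala
// Toy model of a live stage's task accounting (illustrative names,
// not Spark's actual classes).
class StageModel {
  var activeTasks: Int = 0
  def onTaskStart(): Unit = activeTasks += 1
  def onTaskEnd(): Unit = activeTasks -= 1
  // The stage can only leave the live-stage list once this holds:
  def canBeRemoved: Boolean = activeTasks == 0
}

val stage = new StageModel
stage.onTaskStart(); stage.onTaskEnd() // (a) normal start/end pair for task Tr
stage.onTaskEnd()                      // (b) Resubmitted end for Tr: no matching start
stage.onTaskStart(); stage.onTaskEnd() // (c) start/end pair for the retry of Tr
// activeTasks is now -1, so canBeRemoved can never become true and the
// stage pins its dead executors in memory.
```

Skipping the unmatched end event in step (b), as this PR does, keeps the counter balanced at zero once all retries complete.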
Does this PR introduce any user-facing change?
No
How was this patch tested?
New UT added.
Tested in a Thrift Server environment.
The way to reproduce
I tried to reproduce it in spark-shell, though it is a bit fiddly:
1. Start spark-shell, setting `spark.dynamicAllocation.maxExecutors=2` for convenience:
   `bin/spark-shell --driver-java-options "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8006"`
2. Run a job with a shuffle:
   `sc.parallelize(1 to 1000, 10).map { x => Thread.sleep(1000); (x % 3, x) }.reduceByKey((a, b) => a + b).collect()`
3. After some ShuffleMapTasks have finished, kill one or two executors so that tasks get resubmitted.
4. Check via heap dump, debugger, or logs.