-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-34949][CORE][3.0] Prevent BlockManager reregister when Executor is shutting down #33770
Conversation
…shutting down This PR prevents reregistering BlockManager when a Executor is shutting down. It is achieved by checking `executorShutdown` before calling `env.blockManager.reregister()`. This change is required since Spark reports executors as active, even they are removed. I was testing Dynamic Allocation on K8s with about 300 executors. While doing so, when the executors were torn down due to `spark.dynamicAllocation.executorIdleTimeout`, I noticed all the executor pods being removed from K8s, however, under the "Executors" tab in SparkUI, I could see some executors listed as alive. [spark.sparkContext.statusTracker.getExecutorInfos.length](https://github.com/apache/spark/blob/65da9287bc5112564836a555cd2967fc6b05856f/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala#L105) also returned a value greater than 1. No Added a new test. Following are the logs of the executor(Id:303) which re-registers `BlockManager` ``` 21/04/02 21:33:28 INFO CoarseGrainedExecutorBackend: Got assigned task 1076 21/04/02 21:33:28 INFO Executor: Running task 4.0 in stage 3.0 (TID 1076) 21/04/02 21:33:28 INFO MapOutputTrackerWorker: Updating epoch to 302 and clearing cache 21/04/02 21:33:28 INFO TorrentBroadcast: Started reading broadcast variable 3 21/04/02 21:33:28 INFO TransportClientFactory: Successfully created connection to /100.100.195.227:33703 after 76 ms (62 ms spent in bootstraps) 21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.4 KB, free 168.0 MB) 21/04/02 21:33:28 INFO TorrentBroadcast: Reading broadcast variable 3 took 168 ms 21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 168.0 MB) 21/04/02 21:33:29 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 1, fetching them 21/04/02 21:33:29 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTrackerda-lite-test-4-7a57e478947d206d-driver-svc.dex-app-n5ttnbmg.svc:7078) 21/04/02 21:33:29 INFO MapOutputTrackerWorker: Got the output locations 21/04/02 21:33:29 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks including 1 local blocks and 1 remote blocks 21/04/02 21:33:30 INFO TransportClientFactory: Successfully created connection to /100.100.80.103:40971 after 660 ms (528 ms spent in bootstraps) 21/04/02 21:33:30 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 1042 ms 21/04/02 21:33:31 INFO Executor: Finished task 4.0 in stage 3.0 (TID 1076). 1276 bytes result sent to driver . . . 21/04/02 21:34:16 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown 21/04/02 21:34:16 INFO Executor: Told to re-register on heartbeat 21/04/02 21:34:16 INFO BlockManager: BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) re-registering with master 21/04/02 21:34:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) 21/04/02 21:34:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) 21/04/02 21:34:16 INFO BlockManager: Reporting 0 blocks to the master. 21/04/02 21:34:16 INFO MemoryStore: MemoryStore cleared 21/04/02 21:34:16 INFO BlockManager: BlockManager stopped 21/04/02 21:34:16 INFO FileDataSink: Closing sink with output file = /tmp/safari-events/.des_analysis/safari-events/hdp_spark_monitoring_random-container-037caf27-6c77-433f-820f-03cd9c7d9b6e-spark-8a492407d60b401bbf4309a14ea02ca2_events.tsv 21/04/02 21:34:16 INFO HonestProfilerBasedThreadSnapshotProvider: Stopping agent 21/04/02 21:34:16 INFO HonestProfilerHandler: Stopping honest profiler agent 21/04/02 21:34:17 INFO ShutdownHookManager: Shutdown hook called 21/04/02 21:34:17 INFO ShutdownHookManager: Deleting directory /var/data/spark-d886588c-2a7e-491d-bbcb-4f58b3e31001/spark-4aa337a0-60c0-45da-9562-8c50eaff3cea ``` Closes apache#32043 from sumeetgajjar/SPARK-34949. Authored-by: Sumeet Gajjar <sumeetgajjar93@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
Jenkins ok to test |
LGTM pending CI. |
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #142625 has finished for PR 33770 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM.
…r is shutting down This PR backports #32043 to branch-3.0 <hr> ### What changes were proposed in this pull request? This PR prevents reregistering BlockManager when a Executor is shutting down. It is achieved by checking `executorShutdown` before calling `env.blockManager.reregister()`. ### Why are the changes needed? This change is required since Spark reports executors as active, even they are removed. I was testing Dynamic Allocation on K8s with about 300 executors. While doing so, when the executors were torn down due to `spark.dynamicAllocation.executorIdleTimeout`, I noticed all the executor pods being removed from K8s, however, under the "Executors" tab in SparkUI, I could see some executors listed as alive. [spark.sparkContext.statusTracker.getExecutorInfos.length](https://github.com/apache/spark/blob/65da9287bc5112564836a555cd2967fc6b05856f/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala#L105) also returned a value greater than 1. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new test. ## Logs Following are the logs of the executor(Id:303) which re-registers `BlockManager` ``` 21/04/02 21:33:28 INFO CoarseGrainedExecutorBackend: Got assigned task 1076 21/04/02 21:33:28 INFO Executor: Running task 4.0 in stage 3.0 (TID 1076) 21/04/02 21:33:28 INFO MapOutputTrackerWorker: Updating epoch to 302 and clearing cache 21/04/02 21:33:28 INFO TorrentBroadcast: Started reading broadcast variable 3 21/04/02 21:33:28 INFO TransportClientFactory: Successfully created connection to /100.100.195.227:33703 after 76 ms (62 ms spent in bootstraps) 21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.4 KB, free 168.0 MB) 21/04/02 21:33:28 INFO TorrentBroadcast: Reading broadcast variable 3 took 168 ms 21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 168.0 MB) 21/04/02 21:33:29 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 1, fetching them 21/04/02 21:33:29 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTrackerda-lite-test-4-7a57e478947d206d-driver-svc.dex-app-n5ttnbmg.svc:7078) 21/04/02 21:33:29 INFO MapOutputTrackerWorker: Got the output locations 21/04/02 21:33:29 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks including 1 local blocks and 1 remote blocks 21/04/02 21:33:30 INFO TransportClientFactory: Successfully created connection to /100.100.80.103:40971 after 660 ms (528 ms spent in bootstraps) 21/04/02 21:33:30 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 1042 ms 21/04/02 21:33:31 INFO Executor: Finished task 4.0 in stage 3.0 (TID 1076). 1276 bytes result sent to driver . . . 21/04/02 21:34:16 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown 21/04/02 21:34:16 INFO Executor: Told to re-register on heartbeat 21/04/02 21:34:16 INFO BlockManager: BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) re-registering with master 21/04/02 21:34:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) 21/04/02 21:34:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) 21/04/02 21:34:16 INFO BlockManager: Reporting 0 blocks to the master. 21/04/02 21:34:16 INFO MemoryStore: MemoryStore cleared 21/04/02 21:34:16 INFO BlockManager: BlockManager stopped 21/04/02 21:34:16 INFO FileDataSink: Closing sink with output file = /tmp/safari-events/.des_analysis/safari-events/hdp_spark_monitoring_random-container-037caf27-6c77-433f-820f-03cd9c7d9b6e-spark-8a492407d60b401bbf4309a14ea02ca2_events.tsv 21/04/02 21:34:16 INFO HonestProfilerBasedThreadSnapshotProvider: Stopping agent 21/04/02 21:34:16 INFO HonestProfilerHandler: Stopping honest profiler agent 21/04/02 21:34:17 INFO ShutdownHookManager: Shutdown hook called 21/04/02 21:34:17 INFO ShutdownHookManager: Deleting directory /var/data/spark-d886588c-2a7e-491d-bbcb-4f58b3e31001/spark-4aa337a0-60c0-45da-9562-8c50eaff3cea ``` Closes #33770 from sumeetgajjar/SPARK-34949-br-3.0. Authored-by: Sumeet Gajjar <sumeetgajjar93@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Merged to branch-3.0. |
Thank you @dongjoon-hyun and @holdenk for taking a look at this PR. |
This PR backports #32043 to branch-3.0
What changes were proposed in this pull request?
This PR prevents reregistering BlockManager when a Executor is shutting down. It is achieved by checking
executorShutdown
before callingenv.blockManager.reregister()
.Why are the changes needed?
This change is required since Spark reports executors as active, even they are removed.
I was testing Dynamic Allocation on K8s with about 300 executors. While doing so, when the executors were torn down due to
spark.dynamicAllocation.executorIdleTimeout
, I noticed all the executor pods being removed from K8s, however, under the "Executors" tab in SparkUI, I could see some executors listed as alive. spark.sparkContext.statusTracker.getExecutorInfos.length also returned a value greater than 1.Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a new test.
Logs
Following are the logs of the executor(Id:303) which re-registers
BlockManager