[SPARK-37355][CORE]Avoid Block Manager registrations when Executor is shutting down #34629

Closed

wants to merge 4 commits
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -620,11 +620,13 @@ private[spark] class BlockManager(
* Note that this method must be called without any BlockInfo locks held.
*/
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
maxOffHeapMemory, storageEndpoint)
reportAllBlocks()
if (!SparkEnv.get.isStopped) {

@Ngone51 (Member) commented:

Hmm... so let's assume this scenario: the driver sends a shutdown request to the executor. Before the executor receives the shutdown request, it discovers, while reporting a block to the driver, that it has already been removed by the driver, so it starts to re-register itself. Since the executor hasn't received the shutdown request yet (i.e. SparkEnv.get.isStopped is still false), it successfully re-registers with the driver. Soon after, the executor receives the shutdown request and exits.

So this change doesn't fix the issue thoroughly, right? @wankunde
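
To make the race concrete, here is a minimal, self-contained sketch (not Spark code; `stopped` stands in for SparkEnv.get.isStopped and `registerWithDriver` for master.registerBlockManager): checking the flag before the call leaves a window in which the shutdown can still land.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object ReregisterRaceSketch {
  // Stand-in for SparkEnv.get.isStopped on the executor.
  private val stopped = new AtomicBoolean(false)

  // Stand-in for master.registerBlockManager(...) over RPC.
  private def registerWithDriver(): Unit =
    println(s"registered with driver (stopped=${stopped.get()})")

  def reregister(): Unit = {
    if (!stopped.get()) {
      // The driver's shutdown request can arrive exactly here, flipping
      // `stopped` to true, yet the registration below still goes out.
      Thread.sleep(10) // widen the window for demonstration
      registerWithDriver()
    }
  }

  def main(args: Array[String]): Unit = {
    val shutdown = new Thread(() => stopped.set(true)) // driver-initiated shutdown
    shutdown.start()
    reregister() // may still print "registered with driver (stopped=true)"
    shutdown.join()
  }
}
```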


@wankunde (Contributor, PR author) replied:

@Ngone51 Thanks for your review.

Yes, this PR cannot fix the scenario above, but I still think that adding the !SparkEnv.get.isStopped check is helpful, as I have seen several executors re-register while they were being shut down by the driver.

I agree that fixing this issue in HeartbeatReceiver is the right approach, and this PR can be closed.


@Ngone51 (Member) replied:

You can update the fix either in this PR or in a separate PR, as you prefer.


@wankunde (Contributor, PR author) replied:

I have updated the PR description.


@Ngone51 (Member) replied:

Sorry, I meant that you can update the fix in HeartbeatReceiver within this PR.
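
For reference, a rough sketch of the HeartbeatReceiver-side idea, with hypothetical names (this is not the actual Spark code and not necessarily the eventual fix): the driver decides whether an executor should re-register, and never asks an executor it has already removed to do so.

```scala
object HeartbeatGuardSketch {
  final case class HeartbeatResponse(reregisterBlockManager: Boolean)

  // Stand-in for the driver-side view of live executors.
  private val knownExecutors = scala.collection.mutable.Set("exec-1")

  def onHeartbeat(executorId: String, schedulerKnowsExecutor: Boolean): HeartbeatResponse =
    if (knownExecutors.contains(executorId)) {
      // Still alive from the driver's view: re-register only if the scheduler lost track of it.
      HeartbeatResponse(reregisterBlockManager = !schedulerKnowsExecutor)
    } else {
      // Already removed / being shut down by the driver: never ask it to re-register.
      HeartbeatResponse(reregisterBlockManager = false)
    }

  def main(args: Array[String]): Unit = {
    println(onHeartbeat("exec-1", schedulerKnowsExecutor = false)) // reregisterBlockManager = true
    println(onHeartbeat("exec-2", schedulerKnowsExecutor = false)) // reregisterBlockManager = false
  }
}
```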

// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
maxOnHeapMemory, maxOffHeapMemory, storageEndpoint)
reportAllBlocks()
}
}

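For readability, here is roughly how reregister() reads after this change, assembled from the hunk above (the fields and methods it calls belong to the surrounding BlockManager class):

```scala
// Note that this method must be called without any BlockInfo locks held.
def reregister(): Unit = {
  if (!SparkEnv.get.isStopped) {
    // TODO: We might need to rate limit re-registering.
    logInfo(s"BlockManager $blockManagerId re-registering with master")
    master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
      maxOnHeapMemory, maxOffHeapMemory, storageEndpoint)
    reportAllBlocks()
  }
}
```
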
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -676,8 +676,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.waitForAsyncReregister()

assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
assert(master.getLocations("a2").size > 0, "master was not told about a2")
assert(master.getLocations("a1").size == 0, "a1 was not reregistered with master")
assert(master.getLocations("a2").size == 0, "master was not told about a2")
}

test("reregistration doesn't dead lock") {