
[SPARK-37355][CORE] Avoid Block Manager registrations when Executor is shutting down #34629

Closed
wants to merge 4 commits
Changes from 3 commits
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -620,11 +620,13 @@ private[spark] class BlockManager(
* Note that this method must be called without any BlockInfo locks held.
*/
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
maxOffHeapMemory, storageEndpoint)
reportAllBlocks()
if (!SparkEnv.get.isStopped) {
@Ngone51 (Member):
Hmm... so let's assume this scenario: the driver sends a shutdown request to the executor. Before the executor receives the shutdown request, it finds out, while reporting a block to the driver, that it has already been removed from the driver. The executor then starts to re-register itself. Since it hasn't received the shutdown request yet, SparkEnv.get.isStopped is still false, so it successfully re-registers with the driver. Soon after, the executor receives the shutdown request and exits.

So this change doesn't fix the issue completely, right? @wankunde
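
A minimal sketch of that window (all names here are illustrative stand-ins, not Spark's actual members): the isStopped check and the registration are two separate steps, so a shutdown request that lands between them is not observed.

```scala
// Illustrative sketch only: `stopped`, `registerWithDriver`, and `shutdown`
// are stand-ins, not Spark's actual members.
import java.util.concurrent.atomic.AtomicBoolean

class ExecutorSketch {
  private val stopped = new AtomicBoolean(false)

  def reregister(): Unit = {
    if (!stopped.get()) {        // check: no shutdown request seen yet
      // <-- the driver's shutdown request can arrive exactly here
      registerWithDriver()       // act: re-registers a soon-to-exit executor
    }
  }

  def shutdown(): Unit = stopped.set(true)

  private def registerWithDriver(): Unit =
    println("re-registered block manager with the driver")
}
```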

@wankunde (Contributor, Author):
@Ngone51 Thanks for your review.

Yes, this PR cannot fix the scenario above, but I still think adding the !SparkEnv.get.isStopped check is helpful, as I have found several executors re-registering while they were being shut down by the driver.

I agree that fixing this issue in HeartbeatReceiver is the right approach, and this PR can be closed.
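
For context, a heavily simplified sketch of the driver-side idea (the names and message shapes below are assumptions for illustration, not the real HeartbeatReceiver code): the driver decides whether an executor should re-register its BlockManager, and it would skip executors it is deliberately shutting down.

```scala
// Illustrative driver-side sketch; `tracked`, `pendingShutdown`, and the
// message case classes are assumptions, not Spark's real implementation.
import scala.collection.mutable

case class Heartbeat(executorId: String)
case class HeartbeatResponse(reregisterBlockManager: Boolean)

class DriverHeartbeatSketch {
  // executors the driver currently considers alive
  private val tracked = mutable.Set[String]()
  // executors the driver has deliberately asked to shut down
  private val pendingShutdown = mutable.Set[String]()

  def executorRegistered(id: String): Unit = tracked += id
  def executorShuttingDown(id: String): Unit = { tracked -= id; pendingShutdown += id }

  def onHeartbeat(hb: Heartbeat): HeartbeatResponse = {
    // Ask for BlockManager re-registration only for an executor that is
    // unknown *and* not one the driver is in the middle of shutting down.
    val askToReregister =
      !tracked.contains(hb.executorId) && !pendingShutdown.contains(hb.executorId)
    HeartbeatResponse(reregisterBlockManager = askToReregister)
  }
}
```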

@Ngone51 (Member):
You can either update it in this PR or in a separate PR, as you like.

@wankunde (Contributor, Author):
I have updated the PR description.

@Ngone51 (Member):
Sorry, I meant you can update the fix with HeartbeatReceiver in this PR.

// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
maxOnHeapMemory, maxOffHeapMemory, storageEndpoint)
reportAllBlocks()
}
}

/**
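For readability, here is the reregister() method as it reads after this change, assembled from the hunk above (the inline review thread splits it in the rendered diff):

```scala
def reregister(): Unit = {
  if (!SparkEnv.get.isStopped) {
    // TODO: We might need to rate limit re-registering.
    logInfo(s"BlockManager $blockManagerId re-registering with master")
    master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
      maxOnHeapMemory, maxOffHeapMemory, storageEndpoint)
    reportAllBlocks()
  }
}
```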
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.io.File
import java.nio.ByteBuffer
import java.nio.file.Files
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -663,6 +664,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}

test("reregistration on block update") {
when(sc.stopped).thenReturn(new AtomicBoolean(false))
SparkContext.setActiveContext(sc)
val store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
@@ -676,8 +679,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.waitForAsyncReregister()

assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
assert(master.getLocations("a2").size > 0, "master was not told about a2")
assert(master.getLocations("a1").size == 0, "a1 was not reregistered with master")
assert(master.getLocations("a2").size == 0, "master was not told about a2")
SparkContext.clearActiveContext()
}

test("reregistration doesn't dead lock") {