From 5a357e6956385dcd1ba7c5c322294d275e5144ee Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Wed, 17 Nov 2021 16:53:23 +0800 Subject: [PATCH 1/4] [SPARK-37355] Avoid Block Manager registrations when Executor is shutting down --- .../org/apache/spark/storage/BlockManager.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 9ebf26b6120de..d5c70ca981493 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -620,11 +620,15 @@ private[spark] class BlockManager( * Note that this method must be called without any BlockInfo locks held. */ def reregister(): Unit = { - // TODO: We might need to rate limit re-registering. - logInfo(s"BlockManager $blockManagerId re-registering with master") - master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory, - maxOffHeapMemory, storageEndpoint) - reportAllBlocks() + SparkContext.getActive.map { context => + if (!context.stopped.get()) { + // TODO: We might need to rate limit re-registering.
+ logInfo(s"BlockManager $blockManagerId re-registering with master") + master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, + maxOnHeapMemory, maxOffHeapMemory, storageEndpoint) + reportAllBlocks() + } + } } /** From 38ec74818db85e69cfa84668e08de8e2f753c523 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Wed, 17 Nov 2021 17:44:01 +0800 Subject: [PATCH 2/4] [SPARK-37355] Avoid Block Manager registrations when Executor is shutting down --- .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 2cb281d4682b0..5e09de47b6a16 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.storage import java.io.File import java.nio.ByteBuffer import java.nio.file.Files +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ import scala.collection.mutable @@ -663,6 +664,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } test("reregistration on block update") { + when(sc.stopped).thenReturn(new AtomicBoolean(false)) + SparkContext.setActiveContext(sc) val store = makeBlockManager(2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) @@ -678,6 +681,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master") assert(master.getLocations("a2").size > 0, "master was not told about a2") + SparkContext.clearActiveContext() } test("reregistration doesn't dead lock") { From a2288a9d03064dddb9643a721f3be91b4920065c Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Mon, 13 Dec 2021 11:37:23 +0800 Subject: [PATCH 3/4] [SPARK-37355] Avoid Block
Manager registrations when Executor is shutting down --- .../org/apache/spark/storage/BlockManager.scala | 14 ++++++-------- .../apache/spark/storage/BlockManagerSuite.scala | 4 ++-- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index d5c70ca981493..4ddeb19c89085 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -620,14 +620,12 @@ private[spark] class BlockManager( * Note that this method must be called without any BlockInfo locks held. */ def reregister(): Unit = { - SparkContext.getActive.map { context => - if (!context.stopped.get()) { - // TODO: We might need to rate limit re-registering. - logInfo(s"BlockManager $blockManagerId re-registering with master") - master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, - maxOnHeapMemory, maxOffHeapMemory, storageEndpoint) - reportAllBlocks() - } + if (!SparkEnv.get.isStopped) { + // TODO: We might need to rate limit re-registering.
+ logInfo(s"BlockManager $blockManagerId re-registering with master") + master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, + maxOnHeapMemory, maxOffHeapMemory, storageEndpoint) + reportAllBlocks() } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 5e09de47b6a16..34e62990cf722 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -679,8 +679,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) store.waitForAsyncReregister() - assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master") - assert(master.getLocations("a2").size > 0, "master was not told about a2") + assert(master.getLocations("a1").size == 0, "a1 was not reregistered with master") + assert(master.getLocations("a2").size == 0, "master was not told about a2") SparkContext.clearActiveContext() } From 41fdbbcce56e535d8be887bb5e1cf18bed1576e2 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Mon, 13 Dec 2021 15:47:24 +0800 Subject: [PATCH 4/4] [SPARK-37355] Avoid Block Manager registrations when Executor is shutting down --- .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 34e62990cf722..117655b1fb8ac 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.storage import java.io.File import java.nio.ByteBuffer import java.nio.file.Files -import java.util.concurrent.atomic.AtomicBoolean import 
scala.collection.JavaConverters._ import scala.collection.mutable @@ -664,8 +663,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } test("reregistration on block update") { - when(sc.stopped).thenReturn(new AtomicBoolean(false)) - SparkContext.setActiveContext(sc) val store = makeBlockManager(2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) @@ -681,7 +678,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations("a1").size == 0, "a1 was not reregistered with master") assert(master.getLocations("a2").size == 0, "master was not told about a2") - SparkContext.clearActiveContext() } test("reregistration doesn't dead lock") {