diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 4939dab5702a7..4af55d86eed3d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -378,6 +378,10 @@ private[spark] class ExecutorMonitor( } override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { + if (!client.isExecutorActive(event.blockUpdatedInfo.blockManagerId.executorId)) { + return + } + val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId, UNKNOWN_RESOURCE_PROFILE_ID) diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala index 69afdb57ef404..424c9670c10ba 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala @@ -169,6 +169,8 @@ class ExecutorMonitorSuite extends SparkFunSuite { } test("keeps track of stored blocks for each rdd and split") { + knownExecs ++= Set("1", "2") + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.onBlockUpdated(rddUpdate(1, 0, "1")) @@ -234,6 +236,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { } test("SPARK-27677: don't track blocks stored on disk when using shuffle service") { + knownExecs += "1" // First make sure that blocks on disk are counted when no shuffle service is available. monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.DISK_ONLY)) @@ -443,6 +446,22 @@ class ExecutorMonitorSuite extends SparkFunSuite { assert(monitor.timedOutExecutors(idleDeadline).isEmpty) } + test("SPARK-37688: ignore SparkListenerBlockUpdated event if executor was not active") { + conf + .set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT, Long.MaxValue) + .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true) + .set(SHUFFLE_SERVICE_ENABLED, false) + monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource()) + + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) + monitor.onExecutorRemoved(SparkListenerExecutorRemoved(clock.getTimeMillis(), "1", + "heartbeats timeout")) + monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = StorageLevel.MEMORY_AND_DISK)) + + assert(monitor.executorCount == 0 ) + } + + private def idleDeadline: Long = clock.nanoTime() + idleTimeoutNs + 1 private def storageDeadline: Long = clock.nanoTime() + storageTimeoutNs + 1 private def shuffleDeadline: Long = clock.nanoTime() + shuffleTimeoutNs + 1