Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ private[spark] class ExecutorMonitor(
}

override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
if (!client.isExecutorActive(event.blockUpdatedInfo.blockManagerId.executorId)) {
return
}

val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
UNKNOWN_RESOURCE_PROFILE_ID)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class ExecutorMonitorSuite extends SparkFunSuite {
}

test("keeps track of stored blocks for each rdd and split") {
knownExecs ++= Set("1", "2")

monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))

monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
Expand Down Expand Up @@ -234,6 +236,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
}

test("SPARK-27677: don't track blocks stored on disk when using shuffle service") {
knownExecs += "1"
// First make sure that blocks on disk are counted when no shuffle service is available.
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.DISK_ONLY))
Expand Down Expand Up @@ -443,6 +446,22 @@ class ExecutorMonitorSuite extends SparkFunSuite {
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
}

test("SPARK-37688: ignore SparkListenerBlockUpdated event if executor was not active") {
  // Rebuild the monitor with shuffle tracking enabled (timeout set to Long.MaxValue) and the
  // shuffle service disabled.
  conf
    .set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT, Long.MaxValue)
    .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
    .set(SHUFFLE_SERVICE_ENABLED, false)
  monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource())

  val execId = "1"

  // Register the executor, then remove it again.
  val added = SparkListenerExecutorAdded(clock.getTimeMillis(), execId, execInfo)
  monitor.onExecutorAdded(added)
  val removed = SparkListenerExecutorRemoved(clock.getTimeMillis(), execId,
    "heartbeats timeout")
  monitor.onExecutorRemoved(removed)

  // A block update that arrives after the removal must not bring the executor back.
  val lateUpdate = rddUpdate(1, 1, execId, level = StorageLevel.MEMORY_AND_DISK)
  monitor.onBlockUpdated(lateUpdate)

  assert(monitor.executorCount == 0)
}


/** The clock's current `nanoTime()` plus `idleTimeoutNs`, plus one. */
private def idleDeadline: Long = {
  val now = clock.nanoTime()
  now + idleTimeoutNs + 1
}
/** The clock's current `nanoTime()` plus `storageTimeoutNs`, plus one. */
private def storageDeadline: Long = {
  val now = clock.nanoTime()
  now + storageTimeoutNs + 1
}
/** The clock's current `nanoTime()` plus `shuffleTimeoutNs`, plus one. */
private def shuffleDeadline: Long = {
  val now = clock.nanoTime()
  now + shuffleTimeoutNs + 1
}
Expand Down