[SPARK-27012][CORE] Storage tab shows rdd details even after executor ended

## What changes were proposed in this pull request?

After we cache a table, we can see its details in the Storage tab of the Spark UI. If an executor has shut down (graceful shutdown / dynamic allocation scenario), the UI still shows the RDD as cached, and clicking its link throws an error. This is because, on the executor-removed event, we fail to adjust the RDD partition details in org.apache.spark.status.AppStatusListener#onExecutorRemoved.
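
For context, a minimal repro sketch. The app name, config values, and timing below are illustrative assumptions, not part of this patch, and dynamic allocation additionally needs shuffle tracking or an external shuffle service (elided here):

```scala
import org.apache.spark.sql.SparkSession

object StorageTabRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("storage-tab-repro")
      // Let idle executors (including ones holding cached blocks) be removed.
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
      .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "30s")
      .getOrCreate()

    // Cache a table so it shows up in the Storage tab.
    val df = spark.range(0, 1000000).toDF("id")
    df.cache()
    df.count()

    // Wait past the idle timeouts so executors are removed. Before this fix,
    // the Storage tab still showed the RDD as cached and clicking its link
    // threw an error; with the fix, the stale entries are adjusted/removed.
    Thread.sleep(120 * 1000)
    spark.stop()
  }
}
```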

## How was this patch tested?

Tested this fix manually in the UI.
Edit: added a unit test.

Closes #23920 from ajithme/cachestorage.

Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
ajithme authored and Marcelo Vanzin committed Mar 5, 2019
1 parent b99610e commit 6207360
Showing 2 changed files with 124 additions and 0 deletions.
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -211,6 +211,30 @@ private[spark] class AppStatusListener(
          update(rdd, now)
        }
      }
      // Remove all RDD partitions that reference the removed executor
      liveRDDs.values.foreach { rdd =>
        rdd.getPartitions.values
          .filter(_.executors.contains(event.executorId))
          .foreach { partition =>
            if (partition.executors.length == 1) {
              // Last replica: drop the partition and subtract its full footprint.
              rdd.removePartition(partition.blockName)
              rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, partition.memoryUsed * -1)
              rdd.diskUsed = addDeltaToValue(rdd.diskUsed, partition.diskUsed * -1)
            } else {
              // Other replicas remain: subtract this executor's 1/N share and
              // drop it from the partition's executor list.
              rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed,
                (partition.memoryUsed / partition.executors.length) * -1)
              rdd.diskUsed = addDeltaToValue(rdd.diskUsed,
                (partition.diskUsed / partition.executors.length) * -1)
              partition.update(partition.executors
                .filter(!_.equals(event.executorId)), rdd.storageLevel,
                addDeltaToValue(partition.memoryUsed,
                  (partition.memoryUsed / partition.executors.length) * -1),
                addDeltaToValue(partition.diskUsed,
                  (partition.diskUsed / partition.executors.length) * -1))
            }
          }
        update(rdd, now)
      }
      if (isExecutorActiveForLiveStages(exec)) {
        // the executor was running for a currently active stage, so save it for now in
        // deadExecutors, and remove when there are no active stages overlapping with the
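
To make the accounting above concrete, here is a standalone sketch of the arithmetic (the values are illustrative, and addDeltaToValue is a local stand-in for the listener's private helper, assumed here to clamp the result at zero):

```scala
object ReplicaAccountingSketch {
  // Local stand-in for AppStatusListener's private helper (assumption: it
  // applies a delta while never letting the tracked value go negative).
  def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta)

  def main(args: Array[String]): Unit = {
    // A partition replicated on two executors counts both replicas in its
    // totals, e.g. 10 bytes of memory per replica => 20 bytes tracked.
    val executors = Seq("1", "2")
    val partitionMemoryUsed = 20L

    // Removing one of the two executors subtracts a 1/N share, not the whole amount.
    val afterOneRemoved =
      addDeltaToValue(partitionMemoryUsed, (partitionMemoryUsed / executors.length) * -1)
    println(afterOneRemoved) // 10

    // Removing the last executor drops the partition and its remaining footprint.
    val afterBothRemoved = addDeltaToValue(afterOneRemoved, afterOneRemoved * -1)
    println(afterBothRemoved) // 0
  }
}
```
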
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1520,6 +1520,106 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
    }
  }

  test("storage information on executor lost/down") {
    val listener = new AppStatusListener(store, conf, true)
    val maxMemory = 42L

    // Register a couple of block managers.
    val bm1 = BlockManagerId("1", "1.example.com", 42)
    val bm2 = BlockManagerId("2", "2.example.com", 84)
    Seq(bm1, bm2).foreach { bm =>
      listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId,
        new ExecutorInfo(bm.host, 1, Map.empty, Map.empty)))
      listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, maxMemory))
    }

    val rdd1b1 = RddBlock(1, 1, 1L, 2L)
    val rdd1b2 = RddBlock(1, 2, 3L, 4L)
    val level = StorageLevel.MEMORY_AND_DISK

    // Submit a stage and make sure the RDDs are recorded.
    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil)
    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1")
    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))

    // Add partition 1 replicated on two block managers.
    listener.onBlockUpdated(SparkListenerBlockUpdated(
      BlockUpdatedInfo(bm1, rdd1b1.blockId, level, rdd1b1.memSize, rdd1b1.diskSize)))

    listener.onBlockUpdated(SparkListenerBlockUpdated(
      BlockUpdatedInfo(bm2, rdd1b1.blockId, level, rdd1b1.memSize, rdd1b1.diskSize)))

    // Add a second partition only to bm 1.
    listener.onBlockUpdated(SparkListenerBlockUpdated(
      BlockUpdatedInfo(bm1, rdd1b2.blockId, level, rdd1b2.memSize, rdd1b2.diskSize)))

    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
      assert(wrapper.info.numCachedPartitions === 2L)
      assert(wrapper.info.memoryUsed === 2 * rdd1b1.memSize + rdd1b2.memSize)
      assert(wrapper.info.diskUsed === 2 * rdd1b1.diskSize + rdd1b2.diskSize)
      assert(wrapper.info.dataDistribution.get.size === 2L)
      assert(wrapper.info.partitions.get.size === 2L)

      val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get
      assert(dist.memoryUsed === rdd1b1.memSize + rdd1b2.memSize)
      assert(dist.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)

      val part1 = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get
      assert(part1.storageLevel === level.description)
      assert(part1.memoryUsed === 2 * rdd1b1.memSize)
      assert(part1.diskUsed === 2 * rdd1b1.diskSize)
      assert(part1.executors === Seq(bm1.executorId, bm2.executorId))

      val part2 = wrapper.info.partitions.get.find(_.blockName === rdd1b2.blockId.name).get
      assert(part2.storageLevel === level.description)
      assert(part2.memoryUsed === rdd1b2.memSize)
      assert(part2.diskUsed === rdd1b2.diskSize)
      assert(part2.executors === Seq(bm1.executorId))
    }

    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
      assert(exec.info.rddBlocks === 2L)
      assert(exec.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize)
      assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
    }

    // Remove executor 1.
    listener.onExecutorRemoved(createExecutorRemovedEvent(1))

    // Check that the partition info now only contains what remains on bm2.
    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
      assert(wrapper.info.numCachedPartitions === 1L)
      assert(wrapper.info.memoryUsed === rdd1b1.memSize)
      assert(wrapper.info.diskUsed === rdd1b1.diskSize)
      assert(wrapper.info.dataDistribution.get.size === 1L)
      assert(wrapper.info.partitions.get.size === 1L)

      val dist = wrapper.info.dataDistribution.get.find(_.address == bm2.hostPort).get
      assert(dist.memoryUsed === rdd1b1.memSize)
      assert(dist.diskUsed === rdd1b1.diskSize)
      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)

      val part = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get
      assert(part.storageLevel === level.description)
      assert(part.memoryUsed === rdd1b1.memSize)
      assert(part.diskUsed === rdd1b1.diskSize)
      assert(part.executors === Seq(bm2.executorId))
    }

    // Remove executor 2.
    listener.onExecutorRemoved(createExecutorRemovedEvent(2))
    // Check that storage usage is now zero, since both executors are down.
    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
      assert(wrapper.info.numCachedPartitions === 0)
      assert(wrapper.info.memoryUsed === 0)
      assert(wrapper.info.diskUsed === 0)
      assert(wrapper.info.dataDistribution.isEmpty)
      assert(wrapper.info.partitions.get.isEmpty)
    }
  }
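
The createExecutorRemovedEvent helper used above is defined elsewhere in this suite and is not part of the hunk; a minimal sketch of what it presumably does (the timestamp and reason string are illustrative):

```scala
import org.apache.spark.scheduler.SparkListenerExecutorRemoved

// Hypothetical reconstruction (not part of this diff) of a member of
// AppStatusListenerSuite: it wraps an executor id in the event that
// AppStatusListener.onExecutorRemoved consumes.
private def createExecutorRemovedEvent(id: Int): SparkListenerExecutorRemoved = {
  SparkListenerExecutorRemoved(10L, id.toString, "Test")
}
```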


  private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)

  private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
