From 8a333d2da859fd593bda183413630bc3757529c9 Mon Sep 17 00:00:00 2001 From: jeanlyn Date: Thu, 31 Mar 2016 12:04:42 -0700 Subject: [PATCH] [SPARK-14243][CORE] update task metrics when removing blocks ## What changes were proposed in this pull request? This PR try to use `incUpdatedBlockStatuses ` to update the `updatedBlockStatuses ` when removing blocks, making sure `BlockManager` correctly updates `updatedBlockStatuses` ## How was this patch tested? test("updated block statuses") in BlockManagerSuite.scala Author: jeanlyn Closes #12091 from jeanlyn/updateBlock. --- .../scala/org/apache/spark/storage/BlockManager.scala | 7 +++++-- .../org/apache/spark/storage/BlockManagerSuite.scala | 10 ++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0c7763f236647..3014cafc28bd2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1264,9 +1264,12 @@ private[spark] class BlockManager( "the disk, memory, or external block store") } blockInfoManager.removeBlock(blockId) + val removeBlockStatus = getCurrentBlockStatus(blockId, info) if (tellMaster && info.tellMaster) { - val status = getCurrentBlockStatus(blockId, info) - reportBlockStatus(blockId, info, status) + reportBlockStatus(blockId, info, removeBlockStatus) + } + Option(TaskContext.get()).foreach { c => + c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, removeBlockStatus))) } } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6fc32cb30a3b7..9f3a775654096 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -928,6 +928,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(!store.diskStore.contains("list3"), "list3 was in disk store") assert(!store.diskStore.contains("list4"), "list4 was in disk store") assert(!store.diskStore.contains("list5"), "list5 was in disk store") + + // remove block - list2 should be removed from disk + val updatedBlocks6 = getUpdatedBlocks { + store.removeBlock( + "list2", tellMaster = true) + } + assert(updatedBlocks6.size === 1) + assert(updatedBlocks6.head._1 === TestBlockId("list2")) + assert(updatedBlocks6.head._2.storageLevel == StorageLevel.NONE) + assert(!store.diskStore.contains("list2"), "list2 was in disk store") } test("query block statuses") {