From 1dd820a98e98daa5178f3407babdd11944f6a21b Mon Sep 17 00:00:00 2001 From: Deependra Patel Date: Tue, 8 Aug 2023 16:10:17 +0530 Subject: [PATCH] [SPARK-44704][CORE] Cleanup shuffle files from host node after migration due to graceful decommissioning --- .../scala/org/apache/spark/storage/BlockManager.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 05d57c67576a5..5089a207d7ec3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -43,7 +43,7 @@ import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.DataReadMethod import org.apache.spark.internal.Logging import org.apache.spark.internal.config -import org.apache.spark.internal.config.{Network, RDD_CACHE_VISIBILITY_TRACKING_ENABLED} +import org.apache.spark.internal.config.{Network, RDD_CACHE_VISIBILITY_TRACKING_ENABLED, STORAGE_DECOMMISSION_ENABLED, STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED} import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.metrics.source.Source import org.apache.spark.network._ @@ -193,9 +193,13 @@ private[spark] class BlockManager( private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES) val diskBlockManager = { - // Only perform cleanup if an external service is not serving our shuffle files. + // Perform cleanup if an external service is not serving our shuffle files. Also delete the + // files after migration to free up disk space. val deleteFilesOnStop = - !externalShuffleServiceEnabled || isDriver + !externalShuffleServiceEnabled || isDriver || + (conf.get(STORAGE_DECOMMISSION_ENABLED) && + conf.get(STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) && + conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) new DiskBlockManager(conf, deleteFilesOnStop = deleteFilesOnStop, isDriver = isDriver) }