# [SPARK-36036][CORE] Fix cleanup of DownloadFile resources
### What changes were proposed in this pull request?

There was a regression after Spark started storing large remote files on disk (https://issues.apache.org/jira/browse/SPARK-22062). In 2018, a refactoring introduced a hidden strong reference that prevents the automatic deletion of those files (a97001d#diff-42a673b8fa5f2b999371dc97a5de7ebd2c2ec19447353d39efb7e8ebc012fe32L1677). Since then, all underlying files of `DownloadFile` instances have been kept on disk for the duration of the Spark application, which sometimes results in "no space left" errors.

The `ReferenceWithCleanup` class uses `file` (the `DownloadFile`) in its `cleanUp(): Unit` method, so it has to hold a strong reference to it, which prevents the `DownloadFile` from ever being garbage-collected and therefore from being enqueued for cleanup:
```scala
def cleanUp(): Unit = {
  logDebug(s"Clean up file $filePath")

  if (!file.delete()) {                                      <--- here
    logDebug(s"Fail to delete file $filePath")
  }
}
```
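
For illustration only, a minimal standalone sketch of the pattern (not the patched Spark code; the class names are made up): a `WeakReference` subclass that uses its referent in a method versus one that keeps only the data it needs for cleanup.

```scala
import java.io.File
import java.lang.ref.{ReferenceQueue, WeakReference}

// Broken variant: using the constructor parameter `file` inside a method forces the
// compiler to keep it as a field, i.e. a strong reference from the reference object
// to its referent, so the GC can never clear and enqueue this WeakReference.
class LeakyFileRef(file: File, queue: ReferenceQueue[File])
  extends WeakReference[File](file, queue) {
  def cleanUp(): Unit = {
    if (!file.delete()) println(s"failed to delete $file")
  }
}

// Fixed variant: keep only a derived value (the path), never the referent itself,
// so the referent can be collected and this reference enqueued for cleanup.
class CleanFileRef(file: File, queue: ReferenceQueue[File])
  extends WeakReference[File](file, queue) {
  private val filePath = file.getAbsolutePath  // no strong reference to `file` is retained
  def cleanUp(): Unit = {
    if (!new File(filePath).delete()) println(s"failed to delete $filePath")
  }
}
```

This is exactly the shape of the fix below: `ReferenceWithCleanup` already stored `filePath`, and now deletes through a fresh `File(filePath)` instead of calling `file.delete()`.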

### Why are the changes needed?

Long-running Spark applications need to release resources once they are no longer needed, and iterative algorithms can exhaust the available disk space quickly as well.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a test in BlockManagerSuite and tested manually.

Closes #33251 from dtarima/fix-download-file-cleanup.

Authored-by: Denis Tarima <dtarima@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
dtarima authored and srowen committed Jul 11, 2021
1 parent 83b3b75 commit cfcd094
Showing 3 changed files with 43 additions and 6 deletions.

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java (1 addition, 1 deletion)

```diff
@@ -41,7 +41,7 @@ public interface DownloadFile {
   DownloadFileWritableChannel openForWriting() throws IOException;
 
   /**
-   * The path of the file, intended only for debug purposes.
+   * The absolute path of the file.
    */
   String path();
 }
```
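
The doc change above is not cosmetic: with the fix, `cleanUp()` deletes the file via the stored `path()` string rather than through the `DownloadFile` object itself, so the path has to be a reliable absolute path rather than debug-only information.
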
core/src/main/scala/org/apache/spark/storage/BlockManager.scala (8 additions, 5 deletions)

```diff
@@ -255,7 +255,9 @@ private[spark] class BlockManager(
   // specified memory threshold. Files will be deleted automatically based on weak reference.
   // Exposed for test
   private[storage] val remoteBlockTempFileManager =
-    new BlockManager.RemoteBlockDownloadFileManager(this)
+    new BlockManager.RemoteBlockDownloadFileManager(
+      this,
+      securityManager.getIOEncryptionKey())
   private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
 
   var hostLocalDirManager: Option[HostLocalDirManager] = None
```
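
Note that the caller now passes the IO encryption key into `RemoteBlockDownloadFileManager` explicitly; the manager previously obtained it lazily from `SparkEnv` (see the `lazy val encryptionKey` removed in the next hunk), so this is presumably the same key, just resolved at the call site.
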
```diff
@@ -1998,22 +2000,23 @@ private[spark] object BlockManager {
     metricRegistry.registerAll(metricSet)
   }
 
-  class RemoteBlockDownloadFileManager(blockManager: BlockManager)
+  class RemoteBlockDownloadFileManager(
+      blockManager: BlockManager,
+      encryptionKey: Option[Array[Byte]])
       extends DownloadFileManager with Logging {
-    // lazy because SparkEnv is set after this
-    lazy val encryptionKey = SparkEnv.get.securityManager.getIOEncryptionKey()
 
     private class ReferenceWithCleanup(
         file: DownloadFile,
         referenceQueue: JReferenceQueue[DownloadFile]
         ) extends WeakReference[DownloadFile](file, referenceQueue) {
 
+      // we cannot use `file.delete()` here otherwise it won't be garbage-collected
      val filePath = file.path()
 
       def cleanUp(): Unit = {
         logDebug(s"Clean up file $filePath")
 
-        if (!file.delete()) {
+        if (!new File(filePath).delete()) {
           logDebug(s"Fail to delete file $filePath")
         }
       }
```
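
For context, a hypothetical sketch (not Spark's code; it reuses `CleanFileRef` from the sketch near the top) of the other half of the mechanism: a daemon thread that drains the reference queue and runs the cleanup once the GC has cleared a reference. Spark's `RemoteBlockDownloadFileManager` runs a cleaning thread along these lines; with the old strong reference, the `DownloadFile` objects never became unreachable, so nothing was ever enqueued for such a thread to clean.

```scala
import java.io.File
import java.lang.ref.ReferenceQueue

// Hypothetical cleaner loop: block on the queue and clean up every reference the
// GC has cleared and enqueued. `CleanFileRef` is defined in the earlier sketch.
class FileCleaner(queue: ReferenceQueue[File]) extends Thread("download-file-cleaner") {
  setDaemon(true)
  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      try {
        queue.remove() match {       // blocks until a cleared reference is available
          case ref: CleanFileRef => ref.cleanUp()
          case _ =>                  // other reference types: nothing to do
        }
      } catch {
        case _: InterruptedException => Thread.currentThread().interrupt()
      }
    }
  }
}
```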

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala (34 additions)

```diff
@@ -52,6 +52,7 @@ import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransport
 import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskUtils, ExternalBlockStoreClient}
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
+import org.apache.spark.network.util.{MapConfigProvider, TransportConf}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
 import org.apache.spark.scheduler.{LiveListenerBus, MapStatus, MergeStatus, SparkListenerBlockUpdated}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
```

```diff
@@ -305,6 +306,39 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.updateBlockInfo(bm2Id, RDDBlockId(0, 1), StorageLevel.MEMORY_ONLY, 100, 0)
   }
 
+  test("SPARK-36036: make sure temporary download files are deleted") {
+    val store = makeBlockManager(8000, "executor")
+
+    def createAndRegisterTempFileForDeletion(): String = {
+      val transportConf = new TransportConf("test", MapConfigProvider.EMPTY)
+      val tempDownloadFile = store.remoteBlockTempFileManager.createTempFile(transportConf)
+
+      tempDownloadFile.openForWriting().close()
+      assert(new File(tempDownloadFile.path()).exists(), "The file has been created")
+
+      val registered = store.remoteBlockTempFileManager.registerTempFileToClean(tempDownloadFile)
+      assert(registered, "The file has been successfully registered for auto clean up")
+
+      // tempDownloadFile and the channel for writing are local to the function so the references
+      // are going to be eliminated on exit
+      tempDownloadFile.path()
+    }
+
+    val filePath = createAndRegisterTempFileForDeletion()
+
+    val numberOfTries = 100 // try increasing if the test starts to behave flaky
+    val fileHasBeenDeleted = (1 to numberOfTries).exists { tryNo =>
+      // Unless -XX:-DisableExplicitGC is set it works in Hotspot JVM
+      System.gc()
+      Thread.sleep(tryNo)
+      val fileStillExists = new File(filePath).exists()
+      !fileStillExists
+    }
+
+    assert(fileHasBeenDeleted,
+      s"The file was supposed to be auto deleted (GC hinted $numberOfTries times)")
+  }
+
   test("SPARK-32091: count failures from active executors when remove rdd/broadcast/shuffle") {
     setupBlockManagerMasterWithBlocks(false)
     // fail because bm2 will timeout and it's not lost anymore
```
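
A note on the test's design: `System.gc()` is only a hint (and a no-op under `-XX:+DisableExplicitGC`), so the test retries with slightly longer sleeps each time. Restated as a standalone helper (not part of the patch; the name is made up):

```scala
object GcRetry {
  // Hint the GC repeatedly until `condition` holds or the attempts run out.
  // System.gc() is only a request to the JVM, hence the generous retry budget.
  def eventuallyCollected(condition: => Boolean, maxTries: Int = 100): Boolean =
    (1 to maxTries).exists { tryNo =>
      System.gc()
      Thread.sleep(tryNo)  // back off a little more on every attempt
      condition
    }
}

// Usage mirroring the assertion in the test above:
// assert(GcRetry.eventuallyCollected(!new java.io.File(filePath).exists()))
```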
