Skip to content

Commit

Permalink
SPARK-1154: Clean up app folders in worker nodes
Browse files Browse the repository at this point in the history
This is a fix for [SPARK-1154](https://issues.apache.org/jira/browse/SPARK-1154).   The issue is that worker nodes fill up with a huge number of app-* folders after some time.  This change adds a periodic cleanup task which asynchronously deletes app directories older than a configurable TTL.

Two new configuration parameters have been introduced:
  spark.worker.cleanup_interval
  spark.worker.app_data_ttl

This change does not include moving the downloads of application jars to a location outside of the work directory.  We will address that if we have time, but that potentially involves caching so it will come either as part of this PR or a separate PR.

Author: Evan Chan <ev@ooyala.com>
Author: Kelvin Chu <kelvinkwchu@yahoo.com>

Closes #288 from velvia/SPARK-1154-cleanup-app-folders and squashes the following commits:

0689995 [Evan Chan] CR from @aarondav - move config, clarify for standalone mode
9f10d96 [Evan Chan] CR from @pwendell - rename configs and add cleanup.enabled
f2f6027 [Evan Chan] CR from @andrewor14
553d8c2 [Kelvin Chu] change the variable name to currentTimeMillis since it actually tracks in seconds
8dc9cb5 [Kelvin Chu] Fixed a bug in Utils.findOldFiles() after merge.
cb52f2b [Kelvin Chu] Change the name of findOldestFiles() to findOldFiles()
72f7d2d [Kelvin Chu] Fix a bug of Utils.findOldestFiles(). file.lastModified is returned in milliseconds.
ad99955 [Kelvin Chu] Add unit test for Utils.findOldestFiles()
dc1a311 [Evan Chan] Don't recompute current time with every new file
e3c408e [Evan Chan] Document the two new settings
b92752b [Evan Chan] SPARK-1154: Add a periodic task to clean up app directories
  • Loading branch information
Evan Chan authored and pwendell committed Apr 7, 2014
1 parent 4106558 commit 1440154
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 4 deletions.
Expand Up @@ -86,6 +86,10 @@ private[deploy] object DeployMessages {

case class KillDriver(driverId: String) extends DeployMessage

// Worker internal

case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription)
Expand Down
23 changes: 22 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Expand Up @@ -64,6 +64,12 @@ private[spark] class Worker(
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3

val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true)
// How often worker will clean up old app folders
val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
// TTL for app folders/data; after TTL expires it will be cleaned up
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

// Index into masterUrls that we're currently trying to register with.
var masterIndex = 0

Expand Down Expand Up @@ -179,12 +185,28 @@ private[spark] class Worker(
registered = true
changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
if (CLEANUP_ENABLED) {
context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
}

case SendHeartbeat =>
masterLock.synchronized {
if (connected) { master ! Heartbeat(workerId) }
}

case WorkDirCleanup =>
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
val cleanupFuture = concurrent.future {
logInfo("Cleaning up oldest application directories in " + workDir + " ...")
Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
.foreach(Utils.deleteRecursively)
}
cleanupFuture onFailure {
case e: Throwable =>
logError("App dir cleanup failed: " + e.getMessage, e)
}

case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
changeMaster(masterUrl, masterWebUiUrl)
Expand Down Expand Up @@ -331,7 +353,6 @@ private[spark] class Worker(
}

private[spark] object Worker {

def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
Expand Down
19 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Expand Up @@ -597,9 +597,24 @@ private[spark] object Utils extends Logging {
}

if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
return false;
return false
} else {
return true;
return true
}
}

/**
* Finds all the files in a directory whose last modified time is older than cutoff seconds.
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
* @param cutoff measured in seconds. Files older than this are returned.
*/
def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
val currentTimeMillis = System.currentTimeMillis
if (dir.isDirectory) {
val files = listFilesSafely(dir)
files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
} else {
throw new IllegalArgumentException(dir + " is not a directory!")
}
}

Expand Down
15 changes: 14 additions & 1 deletion core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.util

import scala.util.Random

import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.nio.{ByteBuffer, ByteOrder}

import com.google.common.base.Charsets
Expand Down Expand Up @@ -154,5 +154,18 @@ class UtilsSuite extends FunSuite {
val iterator = Iterator.range(0, 5)
assert(Utils.getIteratorSize(iterator) === 5L)
}

test("findOldFiles") {
// create some temporary directories and files
val parent: File = Utils.createTempDir()
val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
val child2: File = Utils.createTempDir(parent.getCanonicalPath)
// set the last modified time of child1 to 10 secs old
child1.setLastModified(System.currentTimeMillis() - (1000 * 10))

val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
assert(result.size.equals(1))
assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
}
}

26 changes: 26 additions & 0 deletions docs/configuration.md
Expand Up @@ -348,6 +348,32 @@ Apart from these, the following properties are also available, and may be useful
receives no heartbeats.
</td>
</tr>
<tr>
<td>spark.worker.cleanup.enabled</td>
<td>true</td>
<td>
Enable periodic cleanup of worker / application directories. Note that this only affects standalone
mode, as YARN works differently.
</td>
</tr>
<tr>
<td>spark.worker.cleanup.interval</td>
<td>1800 (30 minutes)</td>
<td>
Controls the interval, in seconds, at which the worker cleans up old application work dirs
on the local machine.
</td>
</tr>
<tr>
<td>spark.worker.cleanup.appDataTtl</td>
<td>7 * 24 * 3600 (7 days)</td>
<td>
The number of seconds to retain application work directories on each worker. This is a Time To Live
and should depend on the amount of available disk space you have. Application logs and jars are
downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space,
especially if you run jobs very frequently.
</td>
</tr>
<tr>
<td>spark.akka.frameSize</td>
<td>10</td>
Expand Down

0 comments on commit 1440154

Please sign in to comment.