Move local dirs override logic into Utils; fix bugs:
Now, the logic for determining the precedence of the different
configuration options is in Utils.getOrCreateLocalRootDirs().

DiskBlockManager now accepts a SparkConf rather than a list of root
directories and I’ve updated other tests to reflect this.
JoshRosen committed Aug 17, 2014
1 parent b2c4736 commit 3e92d44
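For reference, the precedence that the commit message describes (and that Utils.getOrCreateLocalRootDirs() now encodes) can be sketched as the standalone snippet below. The resolveRootDirs name is illustrative and not part of this commit; it simply mirrors the logic added to Utils.scala further down.

    import org.apache.spark.SparkConf

    // Illustrative sketch of the precedence order: YARN-provided dirs,
    // then SPARK_LOCAL_DIRS, then spark.local.dir, then java.io.tmpdir.
    def resolveRootDirs(conf: SparkConf): Array[String] = {
      val isYarn = java.lang.Boolean.valueOf(
        System.getProperty("SPARK_YARN_MODE", conf.getenv("SPARK_YARN_MODE")))
      val raw =
        if (isYarn) {
          // In YARN mode, only the directories YARN hands out may be used:
          // LOCAL_DIRS on Hadoop 2.x, YARN_LOCAL_DIRS on 0.23.x.
          Option(conf.getenv("YARN_LOCAL_DIRS"))
            .orElse(Option(conf.getenv("LOCAL_DIRS")))
            .getOrElse(sys.error("Yarn local dirs can't be empty"))
        } else {
          // Otherwise SPARK_LOCAL_DIRS overrides spark.local.dir, which itself
          // falls back to the JVM temp directory.
          Option(conf.getenv("SPARK_LOCAL_DIRS"))
            .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
        }
      raw.split(",").map(_.trim).filter(_.nonEmpty)
    }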
Showing 6 changed files with 60 additions and 39 deletions.
25 changes: 0 additions & 25 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -62,16 +62,6 @@ private[spark] class Executor(
val conf = new SparkConf(true)
conf.setAll(properties)

// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. This will be used later when SparkEnv
// created.
if (java.lang.Boolean.valueOf(
System.getProperty("SPARK_YARN_MODE", conf.getenv("SPARK_YARN_MODE")))) {
conf.set("spark.local.dir", getYarnLocalDirs(conf))
} else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
conf.set("spark.local.dir", conf.getenv("SPARK_LOCAL_DIRS"))
}

if (!isLocal) {
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions kill the entire
@@ -134,21 +124,6 @@ private[spark] class Executor(
threadPool.shutdown()
}

/** Get the Yarn approved local directories. */
private def getYarnLocalDirs(conf: SparkConf): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(conf.getenv("LOCAL_DIRS"))
.getOrElse(""))

if (localDirs.isEmpty) {
throw new Exception("Yarn Local dirs can't be empty")
}
localDirs
}

class TaskRunner(
execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
extends Runnable {
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -64,8 +64,7 @@ private[spark] class BlockManager(

private val port = conf.getInt("spark.blockManager.port", 0)
val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
val connectionManager =
new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random, UUID}

import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.{SparkConf, SparkEnv, Logging}
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
import org.apache.spark.util.Utils
@@ -33,9 +33,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
* However, it is also possible to have a block map to only a segment of a file, by calling
* mapBlockToFileSegment().
*
* @param rootDirs The directories to use for storing block files. Data will be hashed among these.
* Block files are hashed among the directories listed in spark.local.dir (or in
* SPARK_LOCAL_DIRS, if it's set).
*/
private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, rootDirs: String)
private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, conf: SparkConf)
extends PathResolver with Logging {

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@@ -46,7 +47,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
val localDirs: Array[File] = createLocalDirs()
val localDirs: Array[File] = createLocalDirs(conf)
if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
@@ -131,10 +132,9 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
(blockId, getFile(blockId))
}

private def createLocalDirs(): Array[File] = {
logDebug(s"Creating local directories at root dirs '$rootDirs'")
private def createLocalDirs(conf: SparkConf): Array[File] = {
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
rootDirs.split(",").flatMap { rootDir =>
Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
var foundLocalDir = false
var localDir: File = null
var localDirId: String = null
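The comment in the hunk above describes a two-level layout: one local directory per configured root, each containing hashed subdirectories so that no single directory accumulates an enormous number of entries. A rough sketch of that kind of placement follows; the placeFile name and constants are illustrative, and the real mapping lives in DiskBlockManager.getFile, which is not shown in this hunk.

    import java.io.File

    // Illustrative only: hash a filename to a root dir, then to a subdirectory
    // within it, so block files spread out instead of piling up at the top level.
    def placeFile(filename: String, localDirs: Array[File], subDirsPerLocalDir: Int = 64): File = {
      val hash = filename.hashCode & Integer.MAX_VALUE // keep it non-negative
      val dirId = hash % localDirs.length
      val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
      new File(new File(localDirs(dirId), "%02x".format(subDirId)), filename)
    }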
50 changes: 48 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -451,10 +451,56 @@ private[spark] object Utils extends Logging {
/**
* Get a temporary directory using Spark's spark.local.dir property, if set. This will always
* return a single directory, even though the spark.local.dir property might be a list of
* multiple paths.
* multiple paths. If the SPARK_LOCAL_DIRS environment variable is set, then this will return
* a directory from that variable.
*/
def getLocalDir(conf: SparkConf): String = {
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
getOrCreateLocalRootDirs(conf)(0)
}

/**
* Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS,
* and returns only the directories that exist / could be created.
*/
private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
val isYarn = java.lang.Boolean.valueOf(
System.getProperty("SPARK_YARN_MODE", conf.getenv("SPARK_YARN_MODE")))
val confValue = if (isYarn) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available.
getYarnLocalDirs(conf)
} else {
Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse(
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
}
val rootDirs = confValue.split(',')
logDebug(s"Getting/creating local root dirs at '$rootDirs'")

rootDirs.flatMap { rootDir =>
val localDir: File = new File(rootDir)
val foundLocalDir = localDir.exists || localDir.mkdirs()
if (!foundLocalDir) {
logError(s"Failed to create local root dir in $rootDir. Ignoring this directory.")
None
} else {
Some(rootDir)
}
}
}

/** Get the Yarn approved local directories. */
private def getYarnLocalDirs(conf: SparkConf): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(conf.getenv("LOCAL_DIRS"))
.getOrElse(""))

if (localDirs.isEmpty) {
throw new Exception("Yarn Local dirs can't be empty")
}
localDirs
}

/**
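A quick sketch of how the updated helpers are meant to be called, with hypothetical paths. Both live in the private[spark] Utils object, so this would only compile from code inside the org.apache.spark package; it also assumes SPARK_LOCAL_DIRS and SPARK_YARN_MODE are unset.

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val conf = new SparkConf(false).set("spark.local.dir", "/mnt/disk1,/mnt/disk2")
    // Creates the listed directories if needed and drops any that cannot be created.
    val roots: Array[String] = Utils.getOrCreateLocalRootDirs(conf)
    // getLocalDir now simply returns the first usable root directory.
    val scratch: String = Utils.getLocalDir(conf)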
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -825,8 +825,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
val blockManager = mock(classOf[BlockManager])
val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
when(shuffleBlockManager.conf).thenReturn(conf)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
System.getProperty("java.io.tmpdir"))
val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)

when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -71,7 +71,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
}

override def beforeEach() {
diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
val conf = testConf.clone
conf.set("spark.local.dir", rootDirs)
diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
shuffleBlockManager.idToSegmentMap.clear()
}

