[SPARK-11251] Fix page size calculation in local mode
```
// My machine only has 8 cores
$ bin/spark-shell --master local[32]
scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()

Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
```
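
Editor's note for context: in this era of Spark, the memory manager derives its page size from an execution-memory pool divided by a core count, and in local mode that count came from the machine's physical cores rather than the N requested via local[N]. The sketch below is a simplified stand-in for that arithmetic, not the real formula; the object name, the ~160 MB pool, and the safety factor of 16 are assumptions, though the 2 MB result happens to match the 2097152 bytes in the stack trace above.

```
// Editor's sketch, not Spark's actual code: a simplified stand-in for the
// page-size heuristic, using an assumed execution-memory pool.
object LocalPageSizeSketch {
  // Roughly: pool size divided by core count and a safety factor,
  // rounded up to a power of two and clamped to [1 MB, 64 MB].
  def roughPageSize(poolBytes: Long, cores: Int): Long = {
    val raw = poolBytes / cores / 16
    val rounded = java.lang.Long.highestOneBit(math.max(raw - 1, 1L)) << 1
    math.max(1L << 20, math.min(64L << 20, rounded))
  }

  def main(args: Array[String]): Unit = {
    val pool = 160L << 20       // assumed execution-memory pool (~160 MB)
    val physicalCores = 8       // what Runtime.getRuntime.availableProcessors() reports
    val requestedThreads = 32   // what local[32] actually runs concurrently

    val oldPage = roughPageSize(pool, physicalCores)     // 2097152 (2 MB)
    val newPage = roughPageSize(pool, requestedThreads)  // 1048576 (1 MB floor)
    val perTaskShare = pool / requestedThreads           // ~5 MB per concurrent task

    // With 2 MB pages, a task's ~5 MB share holds only a couple of pages before
    // UnsafeExternalSorter.acquireNewPage fails; sizing from the requested thread
    // count keeps pages small relative to each task's share of the pool.
    println(s"old page = $oldPage, new page = $newPage, per-task share = $perTaskShare")
  }
}
```

The commit addresses this by passing the requested core count into the driver's SparkEnv, as the diff below shows.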

Author: Andrew Or <andrew@databricks.com>

Closes #9209 from andrewor14/fix-local-page-size.

(cherry picked from commit 34e71c6)
Signed-off-by: Reynold Xin <rxin@databricks.com>
Andrew Or authored and rxin committed Oct 22, 2015
1 parent e405c2a commit a76cf51
Showing 3 changed files with 40 additions and 15 deletions.
48 changes: 35 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -274,7 +274,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}

private[spark] def env: SparkEnv = _env
@@ -2547,25 +2547,29 @@ object SparkContext extends Logging {
res
}

/**
* The number of driver cores to use for execution in local mode, 0 otherwise.
*/
private[spark] def numDriverCores(master: String): Int = {
def convertToInt(threads: String): Int = {
if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
}
master match {
case "local" => 1
case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case _ => 0 // driver is not used for execution
}
}
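
As a rough illustration of what the new helper returns (an editor's sketch, not part of the diff): the values below assume an 8-core machine, and since numDriverCores is private[spark] they assume a caller compiled inside the org.apache.spark package.

```
SparkContext.numDriverCores("local")              // 1
SparkContext.numDriverCores("local[4]")           // 4
SparkContext.numDriverCores("local[*]")           // 8 on the assumed 8-core machine
SparkContext.numDriverCores("local[4, 2]")        // 4 (the local[N, maxRetries] test form)
SparkContext.numDriverCores("spark://host:7077")  // 0: the driver does not execute tasks
SparkContext.numDriverCores("yarn-client")        // 0
```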

/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
// Regular expression used for local[N] and local[*] master formats
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """spark://(.*)""".r
// Regular expression for connection to Mesos cluster by mesos:// or zk:// url
val MESOS_REGEX = """(mesos|zk)://.*""".r
// Regular expression for connection to Simr cluster
val SIMR_REGEX = """simr://(.*)""".r
import SparkMasterRegex._

// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
@@ -2706,6 +2710,24 @@ object SparkContext extends Logging {
}
}

/**
* A collection of regexes for extracting information from the master string.
*/
private object SparkMasterRegex {
// Regular expression used for local[N] and local[*] master formats
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """spark://(.*)""".r
// Regular expression for connection to Mesos cluster by mesos:// or zk:// url
val MESOS_REGEX = """(mesos|zk)://.*""".r
// Regular expression for connection to Simr cluster
val SIMR_REGEX = """simr://(.*)""".r
}
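
Editor's sketch of the extraction these relocated regexes perform. The helper below is hypothetical and assumes code inside the org.apache.spark package (the object is package-private); it exists only to show how each regex binds its capture groups.

```
import SparkMasterRegex._

// Hypothetical helper, purely illustrative.
def describeMaster(master: String): String = master match {
  case "local" => "single-threaded local mode"
  case LOCAL_N_REGEX(threads) => s"local mode with $threads threads"
  case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
    s"local mode with $threads threads, up to $maxFailures task failures"
  case LOCAL_CLUSTER_REGEX(numWorkers, coresPerWorker, memoryPerWorker) =>
    s"local cluster: $numWorkers workers x $coresPerWorker cores, $memoryPerWorker MB each"
  case SPARK_REGEX(url) => s"standalone cluster at $url"
  case MESOS_REGEX(_) => "Mesos cluster"
  case SIMR_REGEX(url) => s"SIMR cluster at $url"
  case other => s"unrecognized master: $other"
}

describeMaster("local[4]")                  // "local mode with 4 threads"
describeMaster("local-cluster[2, 1, 1024]") // "local cluster: 2 workers x 1 cores, 1024 MB each"
```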

/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -185,6 +185,7 @@ object SparkEnv extends Logging {
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
@@ -197,6 +198,7 @@
port,
isDriver = true,
isLocal = isLocal,
numUsableCores = numCores,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
@@ -236,8 +238,8 @@
port: Int,
isDriver: Boolean,
isLocal: Boolean,
numUsableCores: Int,
listenerBus: LiveListenerBus = null,
numUsableCores: Int = 0,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

// Listener bus is only used on the driver
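
Editor's note on the wiring this change establishes: numUsableCores is now a required parameter of SparkEnv.create, and the driver path supplies the count derived from the master string. A minimal sketch of the resulting flow, assuming a caller inside the org.apache.spark package; only numDriverCores and createDriverEnv are real Spark methods here, and buildDriverEnv is a hypothetical wrapper.

```
import org.apache.spark.scheduler.LiveListenerBus

// Hypothetical wrapper showing how the pieces fit together after this commit.
def buildDriverEnv(conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus): SparkEnv = {
  // local[N] => N, local[*] => machine cores, anything else => 0
  val numCores = SparkContext.numDriverCores(conf.get("spark.master"))
  // The count is threaded through to SparkEnv.create as numUsableCores, so memory
  // pages are sized from the requested parallelism rather than from
  // Runtime.getRuntime.availableProcessors().
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, numCores)
}
```
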
3 changes: 2 additions & 1 deletion core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -87,7 +87,8 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
// Use Mockito.spy() to maintain the default infrastructure everywhere else.
// This mocking allows us to control the coordinator responses in test cases.
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator))
SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
}
}
// Use Mockito.spy() to maintain the default infrastructure everywhere else