[SPARK-25004][CORE] Add spark.executor.pyspark.memory limit.
## What changes were proposed in this pull request?

This adds `spark.executor.pyspark.memory` to configure Python's address space limit, [`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS). Limiting Python's address space allows Python to participate in memory management. In practice, we see fewer cases of Python taking too much memory simply because it doesn't know it needs to run garbage collection, which results in YARN killing fewer containers. This also improves error messages, so users know that Python is consuming too much memory:

```
  File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in fe_engineer
    fe_eval_rec.update(f(src_rec_prep, mat_rec_prep))
  File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in fe_comp
    comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, []), mat_rec_prep.get(item, []))
  File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, in leven_list_compare
    permutations = sorted(permutations, reverse=True)
  MemoryError
```

The new PySpark memory setting is used to increase the requested YARN container memory, instead of making Python share the overhead memory with off-heap JVM activity.
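For illustration, a minimal PySpark sketch of how the new setting might be supplied alongside the usual executor memory options (the application name and sizes are arbitrary examples, not values from this patch):

```python
from pyspark.sql import SparkSession

# Hypothetical sizing: a 4g JVM heap per executor plus a 2g cap for that
# executor's Python workers. On YARN, the container request then covers
# executor memory + memory overhead + PySpark memory.
spark = (
    SparkSession.builder
    .appName("pyspark-memory-limit-example")        # arbitrary name
    .config("spark.executor.memory", "4g")
    .config("spark.executor.pyspark.memory", "2g")  # the setting added here
    .getOrCreate()
)
```

The same values can also be passed with `--conf` on `spark-submit` or set in `spark-defaults.conf`.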

## How was this patch tested?

Tested memory limits in our YARN cluster and verified that a `MemoryError` is thrown when the limit is exceeded.

Author: Ryan Blue <blue@apache.org>

Closes #21977 from rdblue/SPARK-25004-add-python-memory-limit.
rdblue authored and Marcelo Vanzin committed Aug 28, 2018
1 parent aff8f15 commit 7ad18ee
Showing 18 changed files with 105 additions and 65 deletions.
@@ -49,9 +49,6 @@ private[spark] class PythonRDD(
isFromBarrier: Boolean = false)
extends RDD[Array[Byte]](parent) {

val bufferSize = conf.getInt("spark.buffer.size", 65536)
val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)

override def getPartitions: Array[Partition] = firstParent.partitions

override val partitioner: Option[Partitioner] = {
@@ -61,7 +58,7 @@
val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)

override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val runner = PythonRunner(func, bufferSize, reuseWorker)
val runner = PythonRunner(func)
runner.compute(firstParent.iterator(split, context), split.index, context)
}

27 changes: 17 additions & 10 deletions core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY
import org.apache.spark.security.SocketAuthHelper
import org.apache.spark.util._

@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
*/
private[spark] abstract class BasePythonRunner[IN, OUT](
funcs: Seq[ChainedPythonFunctions],
bufferSize: Int,
reuseWorker: Boolean,
evalType: Int,
argOffsets: Array[Array[Int]])
extends Logging {

require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")

private val conf = SparkEnv.get.conf
private val bufferSize = conf.getInt("spark.buffer.size", 65536)
private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
// each python worker gets an equal part of the allocation. the worker pool will grow to the
// number of concurrent tasks, which is determined by the number of cores in this executor.
private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
.map(_ / conf.getInt("spark.executor.cores", 1))

// All the Python functions should have the same exec, version and envvars.
protected val envVars = funcs.head.funcs.head.envVars
protected val pythonExec = funcs.head.funcs.head.pythonExec
@@ -82,7 +89,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
private[spark] var serverSocket: Option[ServerSocket] = None

// Authentication helper used when serving method calls via socket from Python side.
private lazy val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
private lazy val authHelper = new SocketAuthHelper(conf)

def compute(
inputIterator: Iterator[IN],
@@ -95,6 +102,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
if (reuseWorker) {
envVars.put("SPARK_REUSE_WORKER", "1")
}
if (memoryMb.isDefined) {
envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", memoryMb.get.toString)
}
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
// Whether the worker has been released into the idle pool
val released = new AtomicBoolean(false)
@@ -485,20 +495,17 @@ private[spark] abstract class BasePythonRunner[IN, OUT](

private[spark] object PythonRunner {

def apply(func: PythonFunction, bufferSize: Int, reuseWorker: Boolean): PythonRunner = {
new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker)
def apply(func: PythonFunction): PythonRunner = {
new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))))
}
}

/**
* A helper class to run Python mapPartition in Spark.
*/
private[spark] class PythonRunner(
funcs: Seq[ChainedPythonFunctions],
bufferSize: Int,
reuseWorker: Boolean)
private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions])
extends BasePythonRunner[Array[Byte], Array[Byte]](
funcs, bufferSize, reuseWorker, PythonEvalType.NON_UDF, Array(Array(0))) {
funcs, PythonEvalType.NON_UDF, Array(Array(0))) {

protected override def newWriterThread(
env: SparkEnv,
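As the comment in `BasePythonRunner` above notes, each Python worker gets an equal share of `spark.executor.pyspark.memory`, divided by the executor's core count. A small Python sketch of that split, with hypothetical numbers:

```python
# Illustrative only: mirrors the per-worker split computed in BasePythonRunner.
pyspark_memory_mb = 2048                              # spark.executor.pyspark.memory = 2g
executor_cores = 4                                    # spark.executor.cores = 4
per_worker_mb = pyspark_memory_mb // executor_cores   # 512 MiB per Python worker

# Each worker receives its share through this environment variable.
env_vars = {"PYSPARK_EXECUTOR_MEMORY_MB": str(per_worker_mb)}
```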
@@ -114,6 +114,10 @@ package object config {
.checkValue(_ >= 0, "The off-heap memory size must not be negative")
.createWithDefault(0)

private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
.booleanConf.createWithDefault(false)

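Because the new entry is declared with `bytesConf(ByteUnit.MiB)`, values are parsed as byte sizes with MiB as the default unit, so suffixed and bare forms are interchangeable. A small sketch of equivalent settings (the 2048 MiB figure is arbitrary):

```python
# All three express the same 2048 MiB limit for spark.executor.pyspark.memory.
equivalent_settings = [
    ("spark.executor.pyspark.memory", "2048"),   # bare number, read as MiB
    ("spark.executor.pyspark.memory", "2048m"),  # explicit MiB suffix
    ("spark.executor.pyspark.memory", "2g"),     # the same amount in GiB
]
```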
12 changes: 12 additions & 0 deletions docs/configuration.md
@@ -179,6 +179,18 @@ of the most common options to set are:
(e.g. <code>2g</code>, <code>8g</code>).
</td>
</tr>
<tr>
<td><code>spark.executor.pyspark.memory</code></td>
<td>Not set</td>
<td>
The amount of memory to be allocated to PySpark in each executor, in MiB
unless otherwise specified. If set, PySpark memory for an executor will be
limited to this amount. If not set, Spark will not limit Python's memory use
and it is up to the application to avoid exceeding the overhead memory space
shared with other non-JVM processes. When PySpark is run in YARN, this memory
is added to executor resource requests.
</td>
</tr>
<tr>
<td><code>spark.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
23 changes: 23 additions & 0 deletions python/pyspark/worker.py
@@ -22,6 +22,7 @@
import os
import sys
import time
import resource

@haydenjeune (Nov 15, 2018): @rdblue This package is not available on Windows. Does Spark 2.4.0 still support Windows, or is there a workaround I'm missing?

@rdblue (Author, Nov 15, 2018): Supported platforms did not change for the 2.4.0 release. If this is causing a problem, then please open an issue for the bug. I suspect that it will just require ignoring the memory limit setting on platforms that don't support the resource API.

@HyukjinKwon (Member, Nov 16, 2018): We should fix this. Yup. Let me open a PR.

import socket
import traceback

@@ -263,6 +264,28 @@ def main(infile, outfile):
isBarrier = read_bool(infile)
boundPort = read_int(infile)
secret = UTF8Deserializer().loads(infile)

# set up memory limits
memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
total_memory = resource.RLIMIT_AS
try:
if memory_limit_mb > 0:
(soft_limit, hard_limit) = resource.getrlimit(total_memory)
msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit)
print(msg, file=sys.stderr)

# convert to bytes
new_limit = memory_limit_mb * 1024 * 1024

if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit:
msg = "Setting mem limits to {0} of max {1}\n".format(new_limit, new_limit)
print(msg, file=sys.stderr)
resource.setrlimit(total_memory, (new_limit, new_limit))

except (resource.error, OSError, ValueError) as e:
# not all systems support resource limits, so warn instead of failing
print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)

# initialize global state
taskContext = None
if isBarrier:
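Outside of Spark, the effect of the `resource.RLIMIT_AS` limit set in `worker.py` can be reproduced with a few lines of standalone Python (this assumes a platform with the `resource` module, e.g. Linux; it is unavailable on Windows, as discussed in the comments above, and the 512 MiB limit is an arbitrary example):

```python
import resource

# Cap this process's address space; oversized allocations then fail with a
# MemoryError instead of the process being killed by the resource manager.
limit_bytes = 512 * 1024 * 1024
resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, limit_bytes))

try:
    buf = bytearray(1024 * 1024 * 1024)   # attempt a 1 GiB allocation
except MemoryError:
    print("allocation rejected by RLIMIT_AS")
```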
@@ -91,6 +91,13 @@ private[spark] class Client(
private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt

private val isPython = sparkConf.get(IS_PYTHON_APP)
private val pysparkWorkerMemory: Int = if (isPython) {
sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
} else {
0
}

private val distCacheMgr = new ClientDistributedCacheManager()

private val principal = sparkConf.get(PRINCIPAL).orNull
@@ -333,12 +340,12 @@
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
logInfo("Verifying our application has not requested more than the maximum " +
s"memory capability of the cluster ($maxMem MB per container)")
val executorMem = executorMemory + executorMemoryOverhead
val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
if (executorMem > maxMem) {
throw new IllegalArgumentException(s"Required executor memory ($executorMemory" +
s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
"Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
"'yarn.nodemanager.resource.memory-mb'.")
throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " +
s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " +
s"the max threshold ($maxMem MB) of this cluster! Please check the values of " +
s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
}
val amMem = amMemory + amMemoryOverhead
if (amMem > maxMem) {
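The check in `Client` above sums three components before comparing against YARN's maximum allocation. A quick arithmetic sketch with hypothetical values (the 0.10 factor and 384 MB floor are the default overhead rule shown in the docs table):

```python
# Hypothetical executor sizing under YARN with the new setting.
executor_memory_mb = 4096                                      # spark.executor.memory = 4g
memory_overhead_mb = max(int(0.10 * executor_memory_mb), 384)  # default overhead rule -> 409
pyspark_memory_mb = 2048                                       # spark.executor.pyspark.memory = 2g

container_request_mb = executor_memory_mb + memory_overhead_mb + pyspark_memory_mb
print(container_request_mb)  # 6553 MiB; must fit under yarn.scheduler.maximum-allocation-mb
```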
@@ -133,10 +133,17 @@ private[yarn] class YarnAllocator(
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
protected val pysparkWorkerMemory: Int = if (sparkConf.get(IS_PYTHON_APP)) {
sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
} else {
0
}
// Number of cores per executor.
protected val executorCores = sparkConf.get(EXECUTOR_CORES)
// Resource capability requested for each executors
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
private[yarn] val resource = Resource.newInstance(
executorMemory + memoryOverhead + pysparkWorkerMemory,
executorCores)

private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
"ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
@@ -133,7 +133,8 @@ abstract class BaseYarnClusterSuite
extraClassPath: Seq[String] = Nil,
extraJars: Seq[String] = Nil,
extraConf: Map[String, String] = Map(),
extraEnv: Map[String, String] = Map()): SparkAppHandle.State = {
extraEnv: Map[String, String] = Map(),
outFile: Option[File] = None): SparkAppHandle.State = {
val deployMode = if (clientMode) "client" else "cluster"
val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
@@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite
}
extraJars.foreach(launcher.addJar)

if (outFile.isDefined) {
launcher.redirectOutput(outFile.get)
launcher.redirectError()
}

val handle = launcher.startApplication()
try {
eventually(timeout(2 minutes), interval(1 second)) {
@@ -179,17 +185,22 @@
* the tests enforce that something is written to a file after everything is ok to indicate
* that the job succeeded.
*/
protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = {
checkResult(finalState, result, "success")
}

protected def checkResult(
finalState: SparkAppHandle.State,
result: File,
expected: String): Unit = {
finalState should be (SparkAppHandle.State.FINISHED)
expected: String = "success",
outFile: Option[File] = None): Unit = {
// The context message is passed to assert as Any instead of a function. To lazily load the
// output from the file, this passes an anonymous object that loads it in toString when
// building an error message.
val output = new Object() {
override def toString: String = outFile
.map(Files.toString(_, StandardCharsets.UTF_8))
.getOrElse("(stdout/stderr was not captured)")
}
assert(finalState === SparkAppHandle.State.FINISHED, output)
val resultString = Files.toString(result, StandardCharsets.UTF_8)
resultString should be (expected)
assert(resultString === expected, output)
}

protected def mainClassName(klass: Class[_]): String = {
@@ -282,13 +282,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir)
val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
val result = File.createTempFile("result", null, tempDir)
val outFile = Some(File.createTempFile("stdout", null, tempDir))

val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(),
sparkArgs = Seq("--py-files" -> pyFiles),
appArgs = Seq(result.getAbsolutePath()),
extraEnv = extraEnvVars,
extraConf = extraConf)
checkResult(finalState, result)
extraConf = extraConf,
outFile = outFile)
checkResult(finalState, result, outFile = outFile)
}

private def testUseClassPathFirst(clientMode: Boolean): Unit = {
@@ -79,8 +79,6 @@ case class AggregateInPandasExec(
override protected def doExecute(): RDD[InternalRow] = {
val inputRDD = child.execute()

val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)

@@ -137,8 +135,6 @@ case class AggregateInPandasExec(

val columnarBatchIter = new ArrowPythonRunner(
pyFuncs,
bufferSize,
reuseWorker,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
argOffsets,
aggInputSchema,
@@ -75,8 +75,6 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi

protected override def evaluate(
funcs: Seq[ChainedPythonFunctions],
bufferSize: Int,
reuseWorker: Boolean,
argOffsets: Array[Array[Int]],
iter: Iterator[InternalRow],
schema: StructType,
@@ -89,8 +87,6 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi

val columnarBatchIter = new ArrowPythonRunner(
funcs,
bufferSize,
reuseWorker,
PythonEvalType.SQL_SCALAR_PANDAS_UDF,
argOffsets,
schema,
@@ -39,15 +39,13 @@ import org.apache.spark.util.Utils
*/
class ArrowPythonRunner(
funcs: Seq[ChainedPythonFunctions],
bufferSize: Int,
reuseWorker: Boolean,
evalType: Int,
argOffsets: Array[Array[Int]],
schema: StructType,
timeZoneId: String,
conf: Map[String, String])
extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch](
funcs, bufferSize, reuseWorker, evalType, argOffsets) {
funcs, evalType, argOffsets) {

protected override def newWriterThread(
env: SparkEnv,
@@ -43,8 +43,6 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi

protected override def evaluate(
funcs: Seq[ChainedPythonFunctions],
bufferSize: Int,
reuseWorker: Boolean,
argOffsets: Array[Array[Int]],
iter: Iterator[InternalRow],
schema: StructType,
@@ -75,8 +73,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
}.grouped(100).map(x => pickle.dumps(x.toArray))

// Output iterator for results from Python.
val outputIterator = new PythonUDFRunner(
funcs, bufferSize, reuseWorker, PythonEvalType.SQL_BATCHED_UDF, argOffsets)
val outputIterator = new PythonUDFRunner(funcs, PythonEvalType.SQL_BATCHED_UDF, argOffsets)
.compute(inputIterator, context.partitionId(), context)

val unpickle = new Unpickler
@@ -78,17 +78,13 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil

protected def evaluate(
funcs: Seq[ChainedPythonFunctions],
bufferSize: Int,
reuseWorker: Boolean,
argOffsets: Array[Array[Int]],
iter: Iterator[InternalRow],
schema: StructType,
context: TaskContext): Iterator[InternalRow]

protected override def doExecute(): RDD[InternalRow] = {
val inputRDD = child.execute().map(_.copy())
val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)

inputRDD.mapPartitions { iter =>
val context = TaskContext.get()
@@ -129,7 +125,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
}

val outputRowIterator = evaluate(
pyFuncs, bufferSize, reuseWorker, argOffsets, projectedRowIter, schema, context)
pyFuncs, argOffsets, projectedRowIter, schema, context)

val joined = new JoinedRow
val resultProj = UnsafeProjection.create(output, output)
@@ -74,8 +74,6 @@ case class FlatMapGroupsInPandasExec(
override protected def doExecute(): RDD[InternalRow] = {
val inputRDD = child.execute()

val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
@@ -141,8 +139,6 @@ case class FlatMapGroupsInPandasExec(

val columnarBatchIter = new ArrowPythonRunner(
chainedFunc,
bufferSize,
reuseWorker,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
argOffsets,
dedupSchema,
