From 7ad18ee9f26e75dbe038c6034700f9cd4c0e2baa Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 28 Aug 2018 12:31:33 -0700 Subject: [PATCH] [SPARK-25004][CORE] Add spark.executor.pyspark.memory limit. ## What changes were proposed in this pull request? This adds `spark.executor.pyspark.memory` to configure Python's address space limit, [`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS). Limiting Python's address space allows Python to participate in memory management. In practice, we see fewer cases of Python taking too much memory because it doesn't know to run garbage collection. This results in YARN killing fewer containers. This also improves error messages so users know that Python is consuming too much memory: ``` File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in fe_engineer fe_eval_rec.update(f(src_rec_prep, mat_rec_prep)) File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in fe_comp comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, []), mat_rec_prep.get(item, [])) File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, in leven_list_compare permutations = sorted(permutations, reverse=True) MemoryError ``` The new pyspark memory setting is used to increase requested YARN container memory, instead of sharing overhead memory between python and off-heap JVM activity. ## How was this patch tested? Tested memory limits in our YARN cluster and verified that MemoryError is thrown. Author: Ryan Blue Closes #21977 from rdblue/SPARK-25004-add-python-memory-limit. --- .../apache/spark/api/python/PythonRDD.scala | 5 +--- .../spark/api/python/PythonRunner.scala | 27 ++++++++++++------- .../spark/internal/config/package.scala | 4 +++ docs/configuration.md | 12 +++++++++ python/pyspark/worker.py | 23 ++++++++++++++++ .../org/apache/spark/deploy/yarn/Client.scala | 17 ++++++++---- .../spark/deploy/yarn/YarnAllocator.scala | 9 ++++++- .../deploy/yarn/BaseYarnClusterSuite.scala | 27 +++++++++++++------ .../spark/deploy/yarn/YarnClusterSuite.scala | 6 +++-- .../python/AggregateInPandasExec.scala | 4 --- .../python/ArrowEvalPythonExec.scala | 4 --- .../execution/python/ArrowPythonRunner.scala | 4 +-- .../python/BatchEvalPythonExec.scala | 5 +--- .../sql/execution/python/EvalPythonExec.scala | 6 +---- .../python/FlatMapGroupsInPandasExec.scala | 4 --- .../python/PythonForeachWriter.scala | 5 +--- .../execution/python/PythonUDFRunner.scala | 4 +-- .../execution/python/WindowInPandasExec.scala | 4 --- 18 files changed, 105 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index c3db60a23f987..197f4643e6134 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -49,9 +49,6 @@ private[spark] class PythonRDD( isFromBarrier: Boolean = false) extends RDD[Array[Byte]](parent) { - val bufferSize = conf.getInt("spark.buffer.size", 65536) - val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) - override def getPartitions: Array[Partition] = firstParent.partitions override val partitioner: Option[Partitioner] = { @@ -61,7 +58,7 @@ private[spark] class PythonRDD( val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { - val runner = PythonRunner(func, bufferSize, reuseWorker) + val runner = PythonRunner(func) runner.compute(firstParent.iterator(split, context), split.index, context) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 151c910bf1aee..da6475cfa8549 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -27,6 +27,7 @@ import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util._ @@ -62,14 +63,20 @@ private[spark] object PythonEvalType { */ private[spark] abstract class BasePythonRunner[IN, OUT]( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, evalType: Int, argOffsets: Array[Array[Int]]) extends Logging { require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs") + private val conf = SparkEnv.get.conf + private val bufferSize = conf.getInt("spark.buffer.size", 65536) + private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) + // each python worker gets an equal part of the allocation. the worker pool will grow to the + // number of concurrent tasks, which is determined by the number of cores in this executor. + private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) + .map(_ / conf.getInt("spark.executor.cores", 1)) + // All the Python functions should have the same exec, version and envvars. protected val envVars = funcs.head.funcs.head.envVars protected val pythonExec = funcs.head.funcs.head.pythonExec @@ -82,7 +89,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private[spark] var serverSocket: Option[ServerSocket] = None // Authentication helper used when serving method calls via socket from Python side. - private lazy val authHelper = new SocketAuthHelper(SparkEnv.get.conf) + private lazy val authHelper = new SocketAuthHelper(conf) def compute( inputIterator: Iterator[IN], @@ -95,6 +102,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( if (reuseWorker) { envVars.put("SPARK_REUSE_WORKER", "1") } + if (memoryMb.isDefined) { + envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", memoryMb.get.toString) + } val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap) // Whether is the worker released into idle pool val released = new AtomicBoolean(false) @@ -485,20 +495,17 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private[spark] object PythonRunner { - def apply(func: PythonFunction, bufferSize: Int, reuseWorker: Boolean): PythonRunner = { - new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker) + def apply(func: PythonFunction): PythonRunner = { + new PythonRunner(Seq(ChainedPythonFunctions(Seq(func)))) } } /** * A helper class to run Python mapPartition in Spark. */ -private[spark] class PythonRunner( - funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean) +private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions]) extends BasePythonRunner[Array[Byte], Array[Byte]]( - funcs, bufferSize, reuseWorker, PythonEvalType.NON_UDF, Array(Array(0))) { + funcs, PythonEvalType.NON_UDF, Array(Array(0))) { protected override def newWriterThread( env: SparkEnv, diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index daf3f070d72e9..7c2f601c9986a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -114,6 +114,10 @@ package object config { .checkValue(_ >= 0, "The off-heap memory size must not be negative") .createWithDefault(0) + private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory") + .bytesConf(ByteUnit.MiB) + .createOptional + private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal() .booleanConf.createWithDefault(false) diff --git a/docs/configuration.md b/docs/configuration.md index 0270dc2cfaf45..9714b48d5e69b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -179,6 +179,18 @@ of the most common options to set are: (e.g. 2g, 8g). + + spark.executor.pyspark.memory + Not set + + The amount of memory to be allocated to PySpark in each executor, in MiB + unless otherwise specified. If set, PySpark memory for an executor will be + limited to this amount. If not set, Spark will not limit Python's memory use + and it is up to the application to avoid exceeding the overhead memory space + shared with other non-JVM processes. When PySpark is run in YARN, this memory + is added to executor resource requests. + + spark.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index d54a5b8e396ea..228b3e07c647a 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -22,6 +22,7 @@ import os import sys import time +import resource import socket import traceback @@ -263,6 +264,28 @@ def main(infile, outfile): isBarrier = read_bool(infile) boundPort = read_int(infile) secret = UTF8Deserializer().loads(infile) + + # set up memory limits + memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1")) + total_memory = resource.RLIMIT_AS + try: + if memory_limit_mb > 0: + (soft_limit, hard_limit) = resource.getrlimit(total_memory) + msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit) + print(msg, file=sys.stderr) + + # convert to bytes + new_limit = memory_limit_mb * 1024 * 1024 + + if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit: + msg = "Setting mem limits to {0} of max {1}\n".format(new_limit, new_limit) + print(msg, file=sys.stderr) + resource.setrlimit(total_memory, (new_limit, new_limit)) + + except (resource.error, OSError, ValueError) as e: + # not all systems support resource limits, so warn instead of failing + print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr) + # initialize global state taskContext = None if isBarrier: diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 698fc2ce8bf9d..4a85898ef880b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -91,6 +91,13 @@ private[spark] class Client( private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt + private val isPython = sparkConf.get(IS_PYTHON_APP) + private val pysparkWorkerMemory: Int = if (isPython) { + sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) + } else { + 0 + } + private val distCacheMgr = new ClientDistributedCacheManager() private val principal = sparkConf.get(PRINCIPAL).orNull @@ -333,12 +340,12 @@ private[spark] class Client( val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() logInfo("Verifying our application has not requested more than the maximum " + s"memory capability of the cluster ($maxMem MB per container)") - val executorMem = executorMemory + executorMemoryOverhead + val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory if (executorMem > maxMem) { - throw new IllegalArgumentException(s"Required executor memory ($executorMemory" + - s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " + - "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " + - "'yarn.nodemanager.resource.memory-mb'.") + throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " + + s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " + + s"the max threshold ($maxMem MB) of this cluster! Please check the values of " + + s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.") } val amMem = amMemory + amMemoryOverhead if (amMem > maxMem) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 40f1222fcd83f..8a7551de7c088 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -133,10 +133,17 @@ private[yarn] class YarnAllocator( // Additional memory overhead. protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt + protected val pysparkWorkerMemory: Int = if (sparkConf.get(IS_PYTHON_APP)) { + sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) + } else { + 0 + } // Number of cores per executor. protected val executorCores = sparkConf.get(EXECUTOR_CORES) // Resource capability requested for each executors - private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores) + private[yarn] val resource = Resource.newInstance( + executorMemory + memoryOverhead + pysparkWorkerMemory, + executorCores) private val launcherPool = ThreadUtils.newDaemonCachedThreadPool( "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS)) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index b0abcc9149d08..3a7913122dd83 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -133,7 +133,8 @@ abstract class BaseYarnClusterSuite extraClassPath: Seq[String] = Nil, extraJars: Seq[String] = Nil, extraConf: Map[String, String] = Map(), - extraEnv: Map[String, String] = Map()): SparkAppHandle.State = { + extraEnv: Map[String, String] = Map(), + outFile: Option[File] = None): SparkAppHandle.State = { val deployMode = if (clientMode) "client" else "cluster" val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf) val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv @@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite } extraJars.foreach(launcher.addJar) + if (outFile.isDefined) { + launcher.redirectOutput(outFile.get) + launcher.redirectError() + } + val handle = launcher.startApplication() try { eventually(timeout(2 minutes), interval(1 second)) { @@ -179,17 +185,22 @@ abstract class BaseYarnClusterSuite * the tests enforce that something is written to a file after everything is ok to indicate * that the job succeeded. */ - protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = { - checkResult(finalState, result, "success") - } - protected def checkResult( finalState: SparkAppHandle.State, result: File, - expected: String): Unit = { - finalState should be (SparkAppHandle.State.FINISHED) + expected: String = "success", + outFile: Option[File] = None): Unit = { + // the context message is passed to assert as Any instead of a function. to lazily load the + // output from the file, this passes an anonymous object that loads it in toString when building + // an error message + val output = new Object() { + override def toString: String = outFile + .map(Files.toString(_, StandardCharsets.UTF_8)) + .getOrElse("(stdout/stderr was not captured)") + } + assert(finalState === SparkAppHandle.State.FINISHED, output) val resultString = Files.toString(result, StandardCharsets.UTF_8) - resultString should be (expected) + assert(resultString === expected, output) } protected def mainClassName(klass: Class[_]): String = { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index d67f5d2768e49..58d11e96942e1 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -282,13 +282,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite { val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir) val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",") val result = File.createTempFile("result", null, tempDir) + val outFile = Some(File.createTempFile("stdout", null, tempDir)) val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(), sparkArgs = Seq("--py-files" -> pyFiles), appArgs = Seq(result.getAbsolutePath()), extraEnv = extraEnvVars, - extraConf = extraConf) - checkResult(finalState, result) + extraConf = extraConf, + outFile = outFile) + checkResult(finalState, result, outFile = outFile) } private def testUseClassPathFirst(clientMode: Boolean): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index 88c9c026928e8..2ab7240556aaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala @@ -79,8 +79,6 @@ case class AggregateInPandasExec( override protected def doExecute(): RDD[InternalRow] = { val inputRDD = child.execute() - val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) - val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) @@ -137,8 +135,6 @@ case class AggregateInPandasExec( val columnarBatchIter = new ArrowPythonRunner( pyFuncs, - bufferSize, - reuseWorker, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, argOffsets, aggInputSchema, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index 6a03f860f8f95..2b87796dc6833 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -75,8 +75,6 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi protected override def evaluate( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, argOffsets: Array[Array[Int]], iter: Iterator[InternalRow], schema: StructType, @@ -89,8 +87,6 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi val columnarBatchIter = new ArrowPythonRunner( funcs, - bufferSize, - reuseWorker, PythonEvalType.SQL_SCALAR_PANDAS_UDF, argOffsets, schema, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index 85b187159a3e6..18992d7a9f974 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -39,15 +39,13 @@ import org.apache.spark.util.Utils */ class ArrowPythonRunner( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, evalType: Int, argOffsets: Array[Array[Int]], schema: StructType, timeZoneId: String, conf: Map[String, String]) extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch]( - funcs, bufferSize, reuseWorker, evalType, argOffsets) { + funcs, evalType, argOffsets) { protected override def newWriterThread( env: SparkEnv, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala index 2054c700957e0..b08b7e60e130b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala @@ -43,8 +43,6 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi protected override def evaluate( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, argOffsets: Array[Array[Int]], iter: Iterator[InternalRow], schema: StructType, @@ -75,8 +73,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi }.grouped(100).map(x => pickle.dumps(x.toArray)) // Output iterator for results from Python. - val outputIterator = new PythonUDFRunner( - funcs, bufferSize, reuseWorker, PythonEvalType.SQL_BATCHED_UDF, argOffsets) + val outputIterator = new PythonUDFRunner(funcs, PythonEvalType.SQL_BATCHED_UDF, argOffsets) .compute(inputIterator, context.partitionId(), context) val unpickle = new Unpickler diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 04c7dfdd4e204..942a6db57416e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -78,8 +78,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil protected def evaluate( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, argOffsets: Array[Array[Int]], iter: Iterator[InternalRow], schema: StructType, @@ -87,8 +85,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil protected override def doExecute(): RDD[InternalRow] = { val inputRDD = child.execute().map(_.copy()) - val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) - val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) inputRDD.mapPartitions { iter => val context = TaskContext.get() @@ -129,7 +125,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } val outputRowIterator = evaluate( - pyFuncs, bufferSize, reuseWorker, argOffsets, projectedRowIter, schema, context) + pyFuncs, argOffsets, projectedRowIter, schema, context) val joined = new JoinedRow val resultProj = UnsafeProjection.create(output, output) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala index f5a563baf52df..e9cff1a5a2007 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala @@ -74,8 +74,6 @@ case class FlatMapGroupsInPandasExec( override protected def doExecute(): RDD[InternalRow] = { val inputRDD = child.execute() - val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) - val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) @@ -141,8 +139,6 @@ case class FlatMapGroupsInPandasExec( val columnarBatchIter = new ArrowPythonRunner( chainedFunc, - bufferSize, - reuseWorker, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, argOffsets, dedupSchema, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala index f08f816cbcca9..a4e9b3305052f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala @@ -45,10 +45,7 @@ class PythonForeachWriter(func: PythonFunction, schema: StructType) } private lazy val pythonRunner = { - val conf = SparkEnv.get.conf - val bufferSize = conf.getInt("spark.buffer.size", 65536) - val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) - PythonRunner(func, bufferSize, reuseWorker) + PythonRunner(func) } private lazy val outputIterator = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala index e28def1c4b423..cc61faa7e7051 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala @@ -29,12 +29,10 @@ import org.apache.spark.api.python._ */ class PythonUDFRunner( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, evalType: Int, argOffsets: Array[Array[Int]]) extends BasePythonRunner[Array[Byte], Array[Byte]]( - funcs, bufferSize, reuseWorker, evalType, argOffsets) { + funcs, evalType, argOffsets) { protected override def newWriterThread( env: SparkEnv, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index 47bfbde56bb3e..27bed1137e5b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -95,8 +95,6 @@ case class WindowInPandasExec( protected override def doExecute(): RDD[InternalRow] = { val inputRDD = child.execute() - val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) - val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) @@ -156,8 +154,6 @@ case class WindowInPandasExec( val windowFunctionResult = new ArrowPythonRunner( pyFuncs, - bufferSize, - reuseWorker, PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF, argOffsets, windowInputSchema,