diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index c3db60a23f987..197f4643e6134 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -49,9 +49,6 @@ private[spark] class PythonRDD(
isFromBarrier: Boolean = false)
extends RDD[Array[Byte]](parent) {
- val bufferSize = conf.getInt("spark.buffer.size", 65536)
- val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
-
override def getPartitions: Array[Partition] = firstParent.partitions
override val partitioner: Option[Partitioner] = {
@@ -61,7 +58,7 @@ private[spark] class PythonRDD(
val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
- val runner = PythonRunner(func, bufferSize, reuseWorker)
+ val runner = PythonRunner(func)
runner.compute(firstParent.iterator(split, context), split.index, context)
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 151c910bf1aee..da6475cfa8549 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
import org.apache.spark._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY
import org.apache.spark.security.SocketAuthHelper
import org.apache.spark.util._
@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
*/
private[spark] abstract class BasePythonRunner[IN, OUT](
funcs: Seq[ChainedPythonFunctions],
- bufferSize: Int,
- reuseWorker: Boolean,
evalType: Int,
argOffsets: Array[Array[Int]])
extends Logging {
require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
+ private val conf = SparkEnv.get.conf
+ private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+ private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+ // Each Python worker gets an equal part of the allocation. The worker pool will grow to the
+ // number of concurrent tasks, which is determined by the number of cores in this executor.
+ private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+ .map(_ / conf.getInt("spark.executor.cores", 1))
+
// All the Python functions should have the same exec, version and envvars.
protected val envVars = funcs.head.funcs.head.envVars
protected val pythonExec = funcs.head.funcs.head.pythonExec
@@ -82,7 +89,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
private[spark] var serverSocket: Option[ServerSocket] = None
// Authentication helper used when serving method calls via socket from Python side.
- private lazy val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
+ private lazy val authHelper = new SocketAuthHelper(conf)
def compute(
inputIterator: Iterator[IN],
@@ -95,6 +102,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
if (reuseWorker) {
envVars.put("SPARK_REUSE_WORKER", "1")
}
+ // forward the per-worker memory limit (MiB) to Python only when one is configured
+ memoryMb.foreach(mb =>
+   envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", mb.toString))
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
// Whether is the worker released into idle pool
val released = new AtomicBoolean(false)
@@ -485,20 +495,17 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
private[spark] object PythonRunner {
- def apply(func: PythonFunction, bufferSize: Int, reuseWorker: Boolean): PythonRunner = {
- new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker)
+ def apply(func: PythonFunction): PythonRunner = {
+ new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))))
}
}
/**
* A helper class to run Python mapPartition in Spark.
*/
-private[spark] class PythonRunner(
- funcs: Seq[ChainedPythonFunctions],
- bufferSize: Int,
- reuseWorker: Boolean)
+private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions])
extends BasePythonRunner[Array[Byte], Array[Byte]](
- funcs, bufferSize, reuseWorker, PythonEvalType.NON_UDF, Array(Array(0))) {
+ funcs, PythonEvalType.NON_UDF, Array(Array(0))) {
protected override def newWriterThread(
env: SparkEnv,
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index daf3f070d72e9..7c2f601c9986a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -114,6 +114,10 @@ package object config {
.checkValue(_ >= 0, "The off-heap memory size must not be negative")
.createWithDefault(0)
+ private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
+ .bytesConf(ByteUnit.MiB)
+ .createOptional
+
private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
.booleanConf.createWithDefault(false)
diff --git a/docs/configuration.md b/docs/configuration.md
index 0270dc2cfaf45..9714b48d5e69b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -179,6 +179,18 @@ of the most common options to set are:
(e.g. 2g
, 8g
).
+
+ spark.executor.pyspark.memory |
+ Not set |
+
+ The amount of memory to be allocated to PySpark in each executor, in MiB
+ unless otherwise specified. If set, PySpark memory for an executor will be
+ limited to this amount. If not set, Spark will not limit Python's memory use,
+ and it is up to the application to avoid exceeding the overhead memory space
+ shared with other non-JVM processes. When PySpark is run on YARN, this memory
+ is added to executor resource requests.
+ |
+
spark.executor.memoryOverhead |
executorMemory * 0.10, with minimum of 384 |
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index d54a5b8e396ea..228b3e07c647a 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -22,6 +22,12 @@
import os
import sys
import time
+# the resource module only exists on Unix; remember whether it could be imported
+try:
+    import resource
+    has_resource_module = True
+except ImportError:
+    has_resource_module = False
import socket
import traceback
@@ -263,6 +269,28 @@ def main(infile, outfile):
isBarrier = read_bool(infile)
boundPort = read_int(infile)
secret = UTF8Deserializer().loads(infile)
+
+ # set up memory limits (skipped where the Unix-only resource module is unavailable)
+ memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
+ if memory_limit_mb > 0 and has_resource_module:
+     total_memory = resource.RLIMIT_AS
+     try:
+         (soft_limit, hard_limit) = resource.getrlimit(total_memory)
+         msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit)
+         print(msg, file=sys.stderr)
+
+         # convert to bytes
+         new_limit = memory_limit_mb * 1024 * 1024
+
+         if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit:
+             msg = "Setting mem limits to {0} of max {1}\n".format(new_limit, new_limit)
+             print(msg, file=sys.stderr)
+             resource.setrlimit(total_memory, (new_limit, new_limit))
+
+     except (resource.error, OSError, ValueError) as e:
+         # not all systems support resource limits, so warn instead of failing
+         print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)
+
# initialize global state
taskContext = None
if isBarrier:
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 698fc2ce8bf9d..4a85898ef880b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -91,6 +91,13 @@ private[spark] class Client(
private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
+ private val isPython = sparkConf.get(IS_PYTHON_APP)
+ // PySpark worker memory in MiB. It is zero unless this is a Python application and
+ // spark.executor.pyspark.memory has been set.
+ private val pysparkWorkerMemory: Int = {
+   if (isPython) sparkConf.get(PYSPARK_EXECUTOR_MEMORY).fold(0)(_.toInt) else 0
+ }
+
private val distCacheMgr = new ClientDistributedCacheManager()
private val principal = sparkConf.get(PRINCIPAL).orNull
@@ -333,12 +340,12 @@ private[spark] class Client(
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
logInfo("Verifying our application has not requested more than the maximum " +
s"memory capability of the cluster ($maxMem MB per container)")
- val executorMem = executorMemory + executorMemoryOverhead
+ val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
if (executorMem > maxMem) {
- throw new IllegalArgumentException(s"Required executor memory ($executorMemory" +
- s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
- "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
- "'yarn.nodemanager.resource.memory-mb'.")
+ throw new IllegalArgumentException(s"Required executor memory ($executorMemory MB), " +
+   s"overhead ($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) " +
+   s"is above the max threshold ($maxMem MB) of this cluster! Please check the values of " +
+   "'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
}
val amMem = amMemory + amMemoryOverhead
if (amMem > maxMem) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 40f1222fcd83f..8a7551de7c088 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -133,10 +133,17 @@ private[yarn] class YarnAllocator(
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
+ // Memory (MiB) requested for PySpark; zero unless a Python app sets the config.
+ protected val pysparkWorkerMemory: Int = {
+   val isPython = sparkConf.get(IS_PYTHON_APP)
+   if (isPython) sparkConf.get(PYSPARK_EXECUTOR_MEMORY).fold(0)(_.toInt) else 0
+ }
// Number of cores per executor.
protected val executorCores = sparkConf.get(EXECUTOR_CORES)
// Resource capability requested for each executors
- private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
+ private[yarn] val resource = Resource.newInstance(
+ executorMemory + memoryOverhead + pysparkWorkerMemory,
+ executorCores)
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
"ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index b0abcc9149d08..3a7913122dd83 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -133,7 +133,8 @@ abstract class BaseYarnClusterSuite
extraClassPath: Seq[String] = Nil,
extraJars: Seq[String] = Nil,
extraConf: Map[String, String] = Map(),
- extraEnv: Map[String, String] = Map()): SparkAppHandle.State = {
+ extraEnv: Map[String, String] = Map(),
+ outFile: Option[File] = None): SparkAppHandle.State = {
val deployMode = if (clientMode) "client" else "cluster"
val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
@@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite
}
extraJars.foreach(launcher.addJar)
+ outFile.foreach { out =>
+   launcher.redirectOutput(out)
+   launcher.redirectError()
+ }
+
val handle = launcher.startApplication()
try {
eventually(timeout(2 minutes), interval(1 second)) {
@@ -179,17 +185,22 @@ abstract class BaseYarnClusterSuite
* the tests enforce that something is written to a file after everything is ok to indicate
* that the job succeeded.
*/
- protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = {
- checkResult(finalState, result, "success")
- }
-
protected def checkResult(
finalState: SparkAppHandle.State,
result: File,
- expected: String): Unit = {
- finalState should be (SparkAppHandle.State.FINISHED)
+ expected: String = "success",
+ outFile: Option[File] = None): Unit = {
+ // The context message is passed to assert as Any instead of a function. To lazily load the
+ // output from the file, this passes an anonymous object that loads it in toString when
+ // building an error message.
+ val output = new Object() {
+ override def toString: String = outFile
+ .map(Files.toString(_, StandardCharsets.UTF_8))
+ .getOrElse("(stdout/stderr was not captured)")
+ }
+ assert(finalState === SparkAppHandle.State.FINISHED, output)
val resultString = Files.toString(result, StandardCharsets.UTF_8)
- resultString should be (expected)
+ assert(resultString === expected, output)
}
protected def mainClassName(klass: Class[_]): String = {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d67f5d2768e49..58d11e96942e1 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -282,13 +282,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir)
val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
val result = File.createTempFile("result", null, tempDir)
+ val outFile = Some(File.createTempFile("stdout", null, tempDir))
val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(),
sparkArgs = Seq("--py-files" -> pyFiles),
appArgs = Seq(result.getAbsolutePath()),
extraEnv = extraEnvVars,
- extraConf = extraConf)
- checkResult(finalState, result)
+ extraConf = extraConf,
+ outFile = outFile)
+ checkResult(finalState, result, outFile = outFile)
}
private def testUseClassPathFirst(clientMode: Boolean): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index 88c9c026928e8..2ab7240556aaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -79,8 +79,6 @@ case class AggregateInPandasExec(
override protected def doExecute(): RDD[InternalRow] = {
val inputRDD = child.execute()
- val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
- val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
@@ -137,8 +135,6 @@ case class AggregateInPandasExec(
val columnarBatchIter = new ArrowPythonRunner(
pyFuncs,
- bufferSize,
- reuseWorker,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
argOffsets,
aggInputSchema,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index 6a03f860f8f95..2b87796dc6833 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -75,8 +75,6 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
protected override def evaluate(
funcs: Seq[ChainedPythonFunctions],
- bufferSize: Int,
- reuseWorker: Boolean,
argOffsets: Array[Array[Int]],
iter: Iterator[InternalRow],
schema: StructType,
@@ -89,8 +87,6 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
val columnarBatchIter = new ArrowPythonRunner(
funcs,
- bufferSize,
- reuseWorker,
PythonEvalType.SQL_SCALAR_PANDAS_UDF,
argOffsets,
schema,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 85b187159a3e6..18992d7a9f974 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -39,15 +39,13 @@ import org.apache.spark.util.Utils
*/
class ArrowPythonRunner(
funcs: Seq[ChainedPythonFunctions],
- bufferSize: Int,
- reuseWorker: Boolean,
evalType: Int,
argOffsets: Array[Array[Int]],
schema: StructType,
timeZoneId: String,
conf: Map[String, String])
extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch](
- funcs, bufferSize, reuseWorker, evalType, argOffsets) {
+ funcs, evalType, argOffsets) {
protected override def newWriterThread(
env: SparkEnv,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index 2054c700957e0..b08b7e60e130b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -43,8 +43,6 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
protected override def evaluate(
funcs: Seq[ChainedPythonFunctions],
- bufferSize: Int,
- reuseWorker: Boolean,
argOffsets: Array[Array[Int]],
iter: Iterator[InternalRow],
schema: StructType,
@@ -75,8 +73,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
}.grouped(100).map(x => pickle.dumps(x.toArray))
// Output iterator for results from Python.
- val outputIterator = new PythonUDFRunner(
- funcs, bufferSize, reuseWorker, PythonEvalType.SQL_BATCHED_UDF, argOffsets)
+ val outputIterator = new PythonUDFRunner(funcs, PythonEvalType.SQL_BATCHED_UDF, argOffsets)
.compute(inputIterator, context.partitionId(), context)
val unpickle = new Unpickler
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 04c7dfdd4e204..942a6db57416e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -78,8 +78,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
protected def evaluate(
funcs: Seq[ChainedPythonFunctions],
- bufferSize: Int,
- reuseWorker: Boolean,
argOffsets: Array[Array[Int]],
iter: Iterator[InternalRow],
schema: StructType,
@@ -87,8 +85,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
protected override def doExecute(): RDD[InternalRow] = {
val inputRDD = child.execute().map(_.copy())
- val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
- val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
inputRDD.mapPartitions { iter =>
val context = TaskContext.get()
@@ -129,7 +125,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
}
val outputRowIterator = evaluate(
- pyFuncs, bufferSize, reuseWorker, argOffsets, projectedRowIter, schema, context)
+ pyFuncs, argOffsets, projectedRowIter, schema, context)
val joined = new JoinedRow
val resultProj = UnsafeProjection.create(output, output)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
index f5a563baf52df..e9cff1a5a2007 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
@@ -74,8 +74,6 @@ case class FlatMapGroupsInPandasExec(
override protected def doExecute(): RDD[InternalRow] = {
val inputRDD = child.execute()
- val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
- val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
@@ -141,8 +139,6 @@ case class FlatMapGroupsInPandasExec(
val columnarBatchIter = new ArrowPythonRunner(
chainedFunc,
- bufferSize,
- reuseWorker,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
argOffsets,
dedupSchema,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
index f08f816cbcca9..a4e9b3305052f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
@@ -45,10 +45,7 @@ class PythonForeachWriter(func: PythonFunction, schema: StructType)
}
private lazy val pythonRunner = {
- val conf = SparkEnv.get.conf
- val bufferSize = conf.getInt("spark.buffer.size", 65536)
- val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
- PythonRunner(func, bufferSize, reuseWorker)
+ PythonRunner(func)
}
private lazy val outputIterator =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index e28def1c4b423..cc61faa7e7051 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -29,12 +29,10 @@ import org.apache.spark.api.python._
*/
class PythonUDFRunner(
funcs: Seq[ChainedPythonFunctions],
- bufferSize: Int,
- reuseWorker: Boolean,
evalType: Int,
argOffsets: Array[Array[Int]])
extends BasePythonRunner[Array[Byte], Array[Byte]](
- funcs, bufferSize, reuseWorker, evalType, argOffsets) {
+ funcs, evalType, argOffsets) {
protected override def newWriterThread(
env: SparkEnv,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index 47bfbde56bb3e..27bed1137e5b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -95,8 +95,6 @@ case class WindowInPandasExec(
protected override def doExecute(): RDD[InternalRow] = {
val inputRDD = child.execute()
- val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
- val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
@@ -156,8 +154,6 @@ case class WindowInPandasExec(
val windowFunctionResult = new ArrowPythonRunner(
pyFuncs,
- bufferSize,
- reuseWorker,
PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF,
argOffsets,
windowInputSchema,