diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index dc6c59673d142..d2a10df7acbd3 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -106,6 +106,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     val startTime = System.currentTimeMillis
     val env = SparkEnv.get
     val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
+    // if OMP_NUM_THREADS is not explicitly set, override it with the number of cores
+    if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+      // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor
+      // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
+      // see https://github.com/numpy/numpy/issues/10455
+      conf.getOption("spark.executor.cores").foreach(envVars.put("OMP_NUM_THREADS", _))
+    }
     envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread
     if (reuseWorker) {
       envVars.put("SPARK_REUSE_WORKER", "1")
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index f5e8cfff2ad1d..8055a6270dac8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -91,6 +91,15 @@ object PythonRunner {
     // python process is through environment variable.
     sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
     sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
+    // if OMP_NUM_THREADS is not explicitly set, override it with the number of cores
+    if (sparkConf.getOption("spark.yarn.appMasterEnv.OMP_NUM_THREADS").isEmpty &&
+        sparkConf.getOption("spark.mesos.driverEnv.OMP_NUM_THREADS").isEmpty &&
+        sparkConf.getOption("spark.kubernetes.driverEnv.OMP_NUM_THREADS").isEmpty) {
+      // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to the driver
+      // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
+      // see https://github.com/numpy/numpy/issues/10455
+      sparkConf.getOption("spark.driver.cores").foreach(env.put("OMP_NUM_THREADS", _))
+    }
     builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
     try {
       val process = builder.start()
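
Because both hunks only apply when the corresponding `*Env.OMP_NUM_THREADS` config is absent, users can still opt out of the new default by setting it explicitly. Below is a minimal sketch of that opt-out from application code; the app name and the specific values are illustrative, but `SparkConf.setExecutorEnv`, `spark.executor.cores`, and `spark.executorEnv.*` are the standard Spark APIs the patch checks against.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: an explicit OMP_NUM_THREADS wins over the default added above,
// since BasePythonRunner only falls back to spark.executor.cores when
// spark.executorEnv.OMP_NUM_THREADS is unset.
val conf = new SparkConf()
  .setAppName("omp-threads-demo")          // hypothetical app name
  .set("spark.executor.cores", "4")        // would otherwise become the OMP_NUM_THREADS default
  .setExecutorEnv("OMP_NUM_THREADS", "2")  // explicit value: the patch leaves this untouched

val sc = new SparkContext(conf)
```

The same opt-out applies on the driver side via `spark.yarn.appMasterEnv.OMP_NUM_THREADS`, `spark.mesos.driverEnv.OMP_NUM_THREADS`, or `spark.kubernetes.driverEnv.OMP_NUM_THREADS`, matching the three configs the deploy-side hunk inspects.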