[SPARK-28843][PYTHON] Set OMP_NUM_THREADS to executor cores for python if not set

### What changes were proposed in this pull request?

When starting Python processes, set `OMP_NUM_THREADS` to the number of cores allocated to the executor or driver if `OMP_NUM_THREADS` is not already set. Each Python process uses the same `OMP_NUM_THREADS` setting, even if workers are not shared.

By default, OpenMP creates a thread pool for parallel processing with one thread per core on the host machine; avoiding that [significantly reduces memory consumption](numpy/numpy#10455). Instead, the thread pool should use the number of cores allocated to the executor, if available. If a setting for the number of cores is not available, this does not change any behavior. OpenMP is used by numpy and pandas.
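For illustration, a minimal PySpark sketch (not part of this patch) that reports the `OMP_NUM_THREADS` value each Python worker sees; it assumes an active `SparkSession` named `spark`:

```python
import os

def report_omp(iterator):
    # Each Python worker inherits OMP_NUM_THREADS from the executor environment.
    yield os.environ.get("OMP_NUM_THREADS", "unset")

values = (spark.sparkContext
          .parallelize(range(8), 4)
          .mapPartitions(report_omp)
          .distinct()
          .collect())
print(values)  # e.g. ['2'] when spark.executor.cores=2
```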

### Why are the changes needed?

To reduce memory consumption for PySpark jobs.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Validated that this change reduces Python worker memory consumption by more than 1 GB on our cluster.

Closes #25545 from rdblue/SPARK-28843-set-omp-num-cores.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
rdblue authored and HyukjinKwon committed Aug 30, 2019
1 parent f8f7c52 commit 31b59bd
Showing 2 changed files with 16 additions and 0 deletions.
@@ -106,6 +106,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
val startTime = System.currentTimeMillis
val env = SparkEnv.get
val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
// if OMP_NUM_THREADS is not explicitly set, default it to the number of executor cores
if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
// SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor
// this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
// see https://github.com/numpy/numpy/issues/10455
conf.getOption("spark.executor.cores").foreach(envVars.put("OMP_NUM_THREADS", _))
}
envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread
if (reuseWorker) {
envVars.put("SPARK_REUSE_WORKER", "1")
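For context, a minimal sketch of how a user could opt out of this default by setting the executor environment explicitly; the hunk above only fills in a value when `spark.executorEnv.OMP_NUM_THREADS` is unset. The app name is hypothetical:

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("omp-example")  # hypothetical app name
         .config("spark.executor.cores", "4")
         # An explicit value always wins; the executor-cores default is skipped.
         .config("spark.executorEnv.OMP_NUM_THREADS", "1")
         .getOrCreate())
```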
@@ -91,6 +91,15 @@ object PythonRunner {
// python process is through environment variable.
sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
// if OMP_NUM_THREADS is not explicitly set, default it to the number of driver cores
if (sparkConf.getOption("spark.yarn.appMasterEnv.OMP_NUM_THREADS").isEmpty &&
sparkConf.getOption("spark.mesos.driverEnv.OMP_NUM_THREADS").isEmpty &&
sparkConf.getOption("spark.kubernetes.driverEnv.OMP_NUM_THREADS").isEmpty) {
// SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to the driver
// this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
// see https://github.com/numpy/numpy/issues/10455
sparkConf.getOption("spark.driver.cores").foreach(env.put("OMP_NUM_THREADS", _))
}
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
try {
val process = builder.start()
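And on the driver side, a minimal sketch of a script run via `spark-submit`; it assumes `spark.driver.cores` is set and none of the explicit driver-env overrides (`spark.yarn.appMasterEnv.OMP_NUM_THREADS`, `spark.mesos.driverEnv.OMP_NUM_THREADS`, `spark.kubernetes.driverEnv.OMP_NUM_THREADS`) are configured:

```python
import os

# The deploy-side PythonRunner exports OMP_NUM_THREADS before launching
# this script, so the driver process (and numpy/pandas in it) will see it.
print("driver OMP_NUM_THREADS:", os.environ.get("OMP_NUM_THREADS", "unset"))
```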
