From c6900c65c07a5395bc8a2de03e580850acc79b27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20H=C3=B6ring?= Date: Fri, 14 Sep 2018 18:34:59 +0200 Subject: [PATCH] Add support for pex in PySpark --- .../api/python/PythonWorkerFactory.scala | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 6afa37aa36fd3..8b2acecf05bc6 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -144,9 +144,21 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1))) // Create and start the worker - val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule)) + val command = { + if (!isPexFile(pythonExec)) { + Arrays.asList(pythonExec, "-m", workerModule) + } + else { + Arrays.asList(SparkFiles.get(pythonExec)) + } + } + val pb = new ProcessBuilder(command) val workerEnv = pb.environment() workerEnv.putAll(envVars.asJava) + if (isPexFile(pythonExec)) { + workerEnv.put("PEX_MODULE", workerModule) + workerEnv.put("PEX_ROOT", "/tmp/.pex") + } workerEnv.put("PYTHONPATH", pythonPath) // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u: workerEnv.put("PYTHONUNBUFFERED", "YES") @@ -186,10 +198,22 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String try { // Create and start the daemon - val command = Arrays.asList(pythonExec, "-m", daemonModule) + val command = { + if (!isPexFile(pythonExec)) { + Arrays.asList(pythonExec, "-m", daemonModule) + } + else { + Arrays.asList(SparkFiles.get(pythonExec)) + } + } + val pb = new ProcessBuilder(command) val workerEnv = pb.environment() workerEnv.putAll(envVars.asJava) + if (isPexFile(pythonExec)) { + workerEnv.put("PEX_MODULE", daemonModule) + workerEnv.put("PEX_ROOT", "/tmp/.pex") + } workerEnv.put("PYTHONPATH", pythonPath) workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret) // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u: @@ -256,6 +280,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } } + private def isPexFile(pythonExec: String): Boolean = { + pythonExec.endsWith(".pex") + } + /** * Redirect the given streams to our stderr in separate threads. */