[SPARK-13081][PYSPARK][SPARK_SUBMIT] Allow setting pythonExec of driver and executor through conf

Before this PR, users had to export environment variables to specify the Python executable for the driver & executor, which was not convenient. This PR allows users to specify the Python executable through the configurations "spark.pyspark.driver.python" & "spark.pyspark.python".

Manually tested in local & yarn mode, for both the pyspark shell and pyspark batch mode.

Author: Jeff Zhang <zjffdu@apache.org>

Closes #13146 from zjffdu/SPARK-13081.
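
For context, a minimal sketch of how an application could set the new keys programmatically; the python paths are placeholders, and the same keys can be passed with --conf on spark-submit, as the tests below do:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.pyspark.driver.python", "/usr/bin/python3") // driver only
      .set("spark.pyspark.python", "/usr/bin/python3")        // driver and executors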
zjffdu authored and Marcelo Vanzin committed Aug 12, 2016
1 parent ea0bf91 commit 7a9e25c
Showing 8 changed files with 72 additions and 8 deletions.
14 changes: 11 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -24,8 +24,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
import scala.util.Try

import org.apache.spark.SparkUserAppException
import org.apache.spark.{SparkConf, SparkUserAppException}
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.internal.config._
import org.apache.spark.util.{RedirectThread, Utils}

/**
@@ -37,8 +38,12 @@ object PythonRunner {
val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)
val pythonExec =
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))
val sparkConf = new SparkConf()
val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
.orElse(sparkConf.get(PYSPARK_PYTHON))
.orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
.orElse(sys.env.get("PYSPARK_PYTHON"))
.getOrElse("python")

// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
@@ -77,6 +82,9 @@ object PythonRunner {
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
// Pass the conf spark.pyspark.python to the python process; the only way to pass
// info to the python process is through environment variables.
sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
try {
val process = builder.start()
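The lookup order above reads as a plain Option chain. A standalone sketch of the same logic, with hypothetical confGet/envGet lookups standing in for SparkConf and sys.env:

    // Resolution order: conf spark.pyspark.driver.python, conf spark.pyspark.python,
    // env PYSPARK_DRIVER_PYTHON, env PYSPARK_PYTHON, then the bare "python".
    def resolvePythonExec(confGet: String => Option[String],
                          envGet: String => Option[String]): String =
      confGet("spark.pyspark.driver.python")
        .orElse(confGet("spark.pyspark.python"))
        .orElse(envGet("PYSPARK_DRIVER_PYTHON"))
        .orElse(envGet("PYSPARK_PYTHON"))
        .getOrElse("python")

    // e.g. resolvePythonExec(Map("spark.pyspark.python" -> "python3").get, sys.env.get)
    // returns "python3" when neither driver-specific setting is present.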
@@ -106,4 +106,12 @@ package object config {
private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
.stringConf
.createOptional

private[spark] val PYSPARK_DRIVER_PYTHON = ConfigBuilder("spark.pyspark.driver.python")
.stringConf
.createOptional

private[spark] val PYSPARK_PYTHON = ConfigBuilder("spark.pyspark.python")
.stringConf
.createOptional
}
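Both entries use createOptional, so reading them yields an Option[String]. A sketch of the read-back behavior, assuming code compiled inside the org.apache.spark package (the entries and SparkConf.get(entry) are private[spark]):

    package org.apache.spark

    import org.apache.spark.internal.config._

    object PythonConfDemo {
      def main(args: Array[String]): Unit = {
        // loadDefaults = false: start from an empty conf
        val conf = new SparkConf(false).set("spark.pyspark.python", "python3")
        assert(conf.get(PYSPARK_PYTHON).contains("python3"))
        assert(conf.get(PYSPARK_DRIVER_PYTHON).isEmpty) // unset optional entry reads as None
      }
    }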
@@ -28,6 +28,8 @@
import org.slf4j.bridge.SLF4JBridgeHandler;
import static org.junit.Assert.*;

import org.apache.spark.internal.config.package$;

/**
* These tests require the Spark assembly to be built before they can be run.
*/
@@ -89,6 +91,12 @@ public void testSparkArgumentHandling() throws Exception {
launcher.setConf("spark.foo", "foo");
launcher.addSparkArg(opts.CONF, "spark.foo=bar");
assertEquals("bar", launcher.builder.conf.get("spark.foo"));

launcher.setConf(SparkLauncher.PYSPARK_DRIVER_PYTHON, "python3.4");
launcher.setConf(SparkLauncher.PYSPARK_PYTHON, "python3.5");
assertEquals("python3.4", launcher.builder.conf.get(
package$.MODULE$.PYSPARK_DRIVER_PYTHON().key()));
assertEquals("python3.5", launcher.builder.conf.get(package$.MODULE$.PYSPARK_PYTHON().key()));
}

@Test(expected=IllegalStateException.class)
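The two PYSPARK_* constants added to SparkLauncher are package-private, so external callers pass the key strings directly. A hedged launcher sketch (app path and python versions are placeholders):

    import org.apache.spark.launcher.SparkLauncher

    val process = new SparkLauncher()
      .setAppResource("/path/to/app.py") // placeholder PySpark application
      .setMaster("local")
      .setConf("spark.pyspark.driver.python", "python3.4")
      .setConf("spark.pyspark.python", "python3.5")
      .launch()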
2 changes: 2 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -51,8 +51,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst

test("loading from system properties") {
System.setProperty("spark.test.testProperty", "2")
System.setProperty("nonspark.test.testProperty", "0")
val conf = new SparkConf()
assert(conf.get("spark.test.testProperty") === "2")
assert(!conf.contains("nonspark.test.testProperty"))
}

test("initializing without loading defaults") {
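The added assertion pins down that SparkConf only picks up system properties whose keys start with "spark.". A self-contained sketch of the same behavior, with made-up property names:

    import org.apache.spark.SparkConf

    sys.props("spark.test.demo") = "2"
    sys.props("nonspark.test.demo") = "0"
    val conf = new SparkConf() // loads defaults from system properties
    assert(conf.get("spark.test.demo") == "2")
    assert(!conf.contains("nonspark.test.demo")) // non-spark keys are ignored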
@@ -31,6 +31,7 @@ import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.TestUtils.JavaSourceFromString
import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -512,13 +513,17 @@ class SparkSubmitSuite
val clArgs3 = Seq(
"--master", "local",
"--py-files", pyFiles,
"--conf", "spark.pyspark.driver.python=python3.4",
"--conf", "spark.pyspark.python=python3.5",
"mister.py"
)
val appArgs3 = new SparkSubmitArguments(clArgs3)
val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3
appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
sysProps3("spark.submit.pyFiles") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
sysProps3(PYSPARK_DRIVER_PYTHON.key) should be ("python3.4")
sysProps3(PYSPARK_PYTHON.key) should be ("python3.5")
}

test("resolves config paths correctly") {
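Settings passed this way land in the driver's SparkConf like any other property; a minimal runtime check (app name and master here are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("conf-check").setMaster("local"))
    // Both keys are plain conf entries, visible via getOption when set
    println(sc.getConf.getOption("spark.pyspark.driver.python"))
    println(sc.getConf.getOption("spark.pyspark.python"))
    sc.stop()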
21 changes: 19 additions & 2 deletions docs/configuration.md
@@ -427,6 +427,21 @@ Apart from these, the following properties are also available, and may be useful
with <code>spark.jars.packages</code>.
</td>
</tr>
<tr>
<td><code>spark.pyspark.driver.python</code></td>
<td></td>
<td>
Python binary executable to use for PySpark in the driver.
(default is <code>spark.pyspark.python</code>)
</td>
</tr>
<tr>
<td><code>spark.pyspark.python</code></td>
<td></td>
<td>
Python binary executable to use for PySpark in both the driver and the executors.
</td>
</tr>
</table>

#### Shuffle Behavior
@@ -1786,11 +1801,13 @@ The following variables can be set in `spark-env.sh`:
</tr>
<tr>
<td><code>PYSPARK_PYTHON</code></td>
<td>Python binary executable to use for PySpark in both driver and workers (default is <code>python2.7</code> if available, otherwise <code>python</code>).</td>
<td>Python binary executable to use for PySpark in both driver and workers (default is <code>python2.7</code> if available, otherwise <code>python</code>).
Property <code>spark.pyspark.python</code> takes precedence if it is set.</td>
</tr>
<tr>
<td><code>PYSPARK_DRIVER_PYTHON</code></td>
<td>Python binary executable to use for PySpark in driver only (default is <code>PYSPARK_PYTHON</code>).</td>
<td>Python binary executable to use for PySpark in driver only (default is <code>PYSPARK_PYTHON</code>).
Property <code>spark.pyspark.driver.python</code> takes precedence if it is set.</td>
</tr>
<tr>
<td><code>SPARKR_DRIVER_R</code></td>
@@ -64,6 +64,10 @@ public class SparkLauncher {
/** Configuration key for the number of executor CPU cores. */
public static final String EXECUTOR_CORES = "spark.executor.cores";

static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python";

static final String PYSPARK_PYTHON = "spark.pyspark.python";

/** Logger name to use when launching a child process. */
public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";

@@ -294,11 +294,23 @@ private List<String> buildPySparkShellCommand(Map<String, String> env) throws IO
appResource = PYSPARK_SHELL_RESOURCE;
constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");

// The executable is the PYSPARK_DRIVER_PYTHON env variable set by the pyspark script,
// followed by PYSPARK_DRIVER_PYTHON_OPTS.
// Will pick up the binary executable in the following order
// 1. conf spark.pyspark.driver.python
// 2. conf spark.pyspark.python
// 3. environment variable PYSPARK_DRIVER_PYTHON
// 4. environment variable PYSPARK_PYTHON
// 5. python
List<String> pyargs = new ArrayList<>();
pyargs.add(firstNonEmpty(System.getenv("PYSPARK_DRIVER_PYTHON"), "python"));
pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
conf.get(SparkLauncher.PYSPARK_PYTHON),
System.getenv("PYSPARK_DRIVER_PYTHON"),
System.getenv("PYSPARK_PYTHON"),
"python"));
String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
// Pass the conf spark.pyspark.python to python via an environment variable.
env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
}
if (!isEmpty(pyOpts)) {
pyargs.addAll(parseOptionString(pyOpts));
}
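The five-step order in the comment above maps onto a first-non-empty scan. A Scala rendering of the same precedence (the helper below mirrors, rather than reuses, the launcher's firstNonEmpty utility):

    // conf keys first, then environment variables, then the bare "python"
    def choosePysparkShellPython(conf: Map[String, String]): String =
      Seq(
        conf.get("spark.pyspark.driver.python"), // 1
        conf.get("spark.pyspark.python"),        // 2
        sys.env.get("PYSPARK_DRIVER_PYTHON"),    // 3
        sys.env.get("PYSPARK_PYTHON")            // 4
      ).flatten.find(_.nonEmpty).getOrElse("python") // 5

    // e.g. choosePysparkShellPython(Map("spark.pyspark.python" -> "python3.5")) == "python3.5"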
