diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 926e1ff7a874d..52d2ef6e700e1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -356,16 +356,19 @@ object SparkSubmit {
       args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
       if (clusterManager != YARN) {
         // The YARN backend distributes the primary file differently, so don't merge it.
-        args.files = mergeFileLists(args.files, args.primaryResource)
+        args.files = mergeFileLists(args.files, args.primaryResource, args.pyRequirements)
       }
     }
     if (clusterManager != YARN) {
       // The YARN backend handles python files differently, so don't merge the lists.
-      args.files = mergeFileLists(args.files, args.pyFiles)
+      args.files = mergeFileLists(args.files, args.pyFiles, args.pyRequirements)
     }
     if (args.pyFiles != null) {
       sysProps("spark.submit.pyFiles") = args.pyFiles
     }
+    if (args.pyRequirements != null) {
+      sysProps("spark.submit.pyRequirements") = args.pyRequirements
+    }
   }
 
   // In YARN mode for an R app, add the SparkR package archive and the R package
@@ -542,6 +545,10 @@ object SparkSubmit {
       if (args.pyFiles != null) {
         sysProps("spark.submit.pyFiles") = args.pyFiles
       }
+
+      if (args.pyRequirements != null) {
+        sysProps("spark.submit.pyRequirements") = args.pyRequirements
+      }
     }
 
     // assure a keytab is available from any place in a JVM
@@ -593,6 +600,9 @@ object SparkSubmit {
       if (args.pyFiles != null) {
         sysProps("spark.submit.pyFiles") = args.pyFiles
       }
+      if (args.pyRequirements != null) {
+        sysProps("spark.submit.pyRequirements") = args.pyRequirements
+      }
     } else {
       childArgs += (args.primaryResource, args.mainClass)
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index ec6d48485f110..3136a729d5933 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -64,6 +64,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
+  var pyRequirements: String = null
   var isR: Boolean = false
   var action: SparkSubmitAction = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
@@ -304,6 +305,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
    |  numExecutors            $numExecutors
    |  files                   $files
    |  pyFiles                 $pyFiles
+   |  pyRequirements          $pyRequirements
    |  archives                $archives
    |  mainClass               $mainClass
    |  primaryResource         $primaryResource
@@ -395,6 +397,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case PY_FILES =>
         pyFiles = Utils.resolveURIs(value)
 
+      case PY_REQUIREMENTS =>
+        pyRequirements = Utils.resolveURIs(value)
+
       case ARCHIVES =>
         archives = Utils.resolveURIs(value)
 
@@ -505,6 +510,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |                              search for the maven coordinates given with --packages.
        |  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
        |                              on the PYTHONPATH for Python apps.
+       |  --py-requirements REQS      Pip requirements file with dependencies that will be fetched
+       |                              and placed on the PYTHONPATH.
        |  --files FILES               Comma-separated list of files to be placed in the working
        |                              directory of each executor.
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 271897699201b..8fac39a5cb267 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -570,6 +570,37 @@ class SparkSubmitSuite
       appArgs.executorMemory should be ("2.3g")
     }
   }
+
+  test("py-requirements will be distributed") {
+    val pyReqs = "requirements.txt"
+
+    val clArgsYarn = Seq(
+      "--master", "yarn",
+      "--deploy-mode", "cluster",
+      "--py-requirements", pyReqs,
+      "mister.py"
+    )
+
+    val appArgsYarn = new SparkSubmitArguments(clArgsYarn)
+    val sysPropsYarn = SparkSubmit.prepareSubmitEnvironment(appArgsYarn)._3
+    appArgsYarn.pyRequirements should be (Utils.resolveURIs(pyReqs))
+    sysPropsYarn("spark.yarn.dist.files") should be (
+      PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(","))
+    sysPropsYarn("spark.submit.pyRequirements") should be (
+      PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(","))
+
+    val clArgs = Seq(
+      "--master", "local",
+      "--py-requirements", pyReqs,
+      "mister.py"
+    )
+
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3
+    appArgs.pyRequirements should be (Utils.resolveURIs(pyReqs))
+    sysProps("spark.submit.pyRequirements") should be (
+      PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(","))
+  }
 
   // scalastyle:on println
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
index 6767cc5079649..d036ac322809c 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -55,6 +55,7 @@ class SparkSubmitOptionParser {
   protected final String PROPERTIES_FILE = "--properties-file";
   protected final String PROXY_USER = "--proxy-user";
   protected final String PY_FILES = "--py-files";
+  protected final String PY_REQUIREMENTS = "--py-requirements";
   protected final String REPOSITORIES = "--repositories";
   protected final String STATUS = "--status";
   protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 53ed31954537b..84b026d956a6b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -209,6 +209,12 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment,
                     self._python_includes.append(filename)
                     sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
 
+        # Apply requirements file set by spark-submit.
+        for path in self._conf.get("spark.submit.pyRequirements", "").split(","):
+            if path != "":
+                (dirname, filename) = os.path.split(path)
+                self.addRequirementsFile(os.path.join(SparkFiles.getRootDirectory(), filename))
+
         # Create a temporary directory inside spark.local.dir:
         local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
         self._temp_dir = \
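
For context, a minimal sketch of how the new option would be exercised end to end, based on the hunks above. The file names requirements.txt and mister.py are taken from the test; the addRequirementsFile() helper that the context.py hunk calls is added elsewhere in this patch and its definition is not shown here. The sample application only reads back the property that SparkSubmit sets.

    # Submitted with the new flag, e.g.:
    #   spark-submit --master yarn --deploy-mode cluster \
    #     --py-requirements requirements.txt mister.py
    #
    # mister.py: SparkSubmit distributes requirements.txt and sets
    # spark.submit.pyRequirements; SparkContext._do_init() then calls
    # addRequirementsFile() for each entry so the listed dependencies
    # become importable on the driver and executors.
    from pyspark import SparkContext

    sc = SparkContext(appName="py-requirements-demo")
    print(sc.getConf().get("spark.submit.pyRequirements"))
    sc.stop()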