diff --git a/bin/utils.sh b/bin/utils.sh
index 9d5277cddd122..1dd277bfb4891 100755
--- a/bin/utils.sh
+++ b/bin/utils.sh
@@ -35,7 +35,8 @@ function gatherSparkSubmitOpts() {
       --master | --deploy-mode | --class | --name | --jars | --maven | --py-files | --files | \
       --conf | --maven-repos | --properties-file | --driver-memory | --driver-java-options | \
       --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
-      --total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
+      --total-executor-cores | --executor-cores | --queue | --num-executors | \
+      --archives | --ivy-repo )
         if [[ $# -lt 2 ]]; then
           "$SUBMIT_USAGE_FUNCTION"
           exit 1;
diff --git a/bin/windows-utils.cmd b/bin/windows-utils.cmd
index 1082a952dac99..2ccfbbfd5f844 100644
--- a/bin/windows-utils.cmd
+++ b/bin/windows-utils.cmd
@@ -32,7 +32,7 @@ SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--p
 SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>"
 SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
 SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
-SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\>"
+SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--maven\> \<--maven-repos\> \<--ivy-repo\>"
 
 echo %1 | findstr %opts% >nul
 if %ERRORLEVEL% equ 0 (
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 714b3699cff03..677fb46f37f70 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,7 +22,6 @@ import java.lang.reflect.{Modifier, InvocationTargetException}
 import java.net.URL
 
 import org.apache.ivy.Ivy
-import org.apache.ivy.ant.IvyDependencyExclude
 import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
 import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
 import org.apache.ivy.core.report.ResolveReport
@@ -69,14 +68,15 @@ object SparkSubmit {
 
   // Directories for caching downloads through ivy and storing the jars when maven coordinates are
   // supplied to spark-submit
-  // TODO: Take these as arguments? For example, on AWS /mnt/ is a better location.
-  private val IVY_CACHE = new File("ivy/cache")
-  private val MAVEN_JARS = new File("ivy/jars")
+  private var IVY_CACHE: File = null
+  private var MAVEN_JARS: File = null
 
   // Exposed for testing
   private[spark] var exitFn: () => Unit = () => System.exit(-1)
   private[spark] var printStream: PrintStream = System.err
+  private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
+
   private[spark] def printErrorAndExit(str: String) = {
     printStream.println("Error: " + str)
     printStream.println("Run with --help for usage help or --verbose for debug output")
@@ -94,13 +94,13 @@ object SparkSubmit {
 
   /**
    * @return a tuple containing
-   *         (1) the arguments for the child process,
-   *         (2) a list of classpath entries for the child,
-   *         (3) a list of system properties and env vars, and
-   *         (4) the main class for the child
+   *           (1) the arguments for the child process,
+   *           (2) a list of classpath entries for the child,
+   *           (3) a list of system properties and env vars, and
+   *           (4) the main class for the child
    */
   private[spark] def createLaunchEnv(args: SparkSubmitArguments)
-      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
+    : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
 
     // Values to return
     val childArgs = new ArrayBuffer[String]()
@@ -147,7 +147,7 @@ object SparkSubmit {
       if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
         printErrorAndExit(
           "Could not load YARN classes. " +
-          "This copy of Spark may not have been compiled with YARN support.")
+            "This copy of Spark may not have been compiled with YARN support.")
       }
     }
 
@@ -186,6 +186,8 @@ object SparkSubmit {
     sysProps("SPARK_SUBMIT") = "true"
 
     // Resolve maven dependencies if there are any and add classpath to jars
+    IVY_CACHE = new File(s"${args.ivyRepoPath}/cache")
+    MAVEN_JARS = new File(s"${args.ivyRepoPath}/jars")
     val resolvedMavenCoordinates = resolveMavenCoordinates(args.maven, args.mavenRepos)
     if (!resolvedMavenCoordinates.trim.isEmpty) {
       if (args.jars == null || args.jars.trim.isEmpty) {
@@ -203,6 +205,7 @@ object SparkSubmit {
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
       OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
+      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.driver.memory"),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraClassPath"),
@@ -214,6 +217,7 @@ object SparkSubmit {
 
       // Standalone cluster only
       OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
       OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
 
@@ -252,18 +256,26 @@ object SparkSubmit {
       if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
-      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
-      if (args.childArgs != null) { childArgs ++= args.childArgs }
+      if (args.jars != null) {
+        childClasspath ++= args.jars.split(",")
+      }
+      if (args.childArgs != null) {
+        childArgs ++= args.childArgs
+      }
     }
 
     // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
       if (opt.value != null &&
-          (deployMode & opt.deployMode) != 0 &&
-          (clusterManager & opt.clusterManager) != 0) {
-        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
-        if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
+        (deployMode & opt.deployMode) != 0 &&
+        (clusterManager & opt.clusterManager) != 0) {
+        if (opt.clOption != null) {
+          childArgs +=(opt.clOption, opt.value)
+        }
+        if (opt.sysProp != null) {
+          sysProps.put(opt.sysProp, opt.value)
+        }
       }
     }
 
@@ -286,7 +298,7 @@ object SparkSubmit {
         childArgs += "--supervise"
       }
       childArgs += "launch"
-      childArgs += (args.master, args.primaryResource, args.mainClass)
+      childArgs +=(args.master, args.primaryResource, args.mainClass)
       if (args.childArgs != null) {
         childArgs ++= args.childArgs
       }
@@ -296,11 +308,11 @@ object SparkSubmit {
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.primaryResource != SPARK_INTERNAL) {
-        childArgs += ("--jar", args.primaryResource)
+        childArgs +=("--jar", args.primaryResource)
       }
-      childArgs += ("--class", args.mainClass)
+      childArgs +=("--class", args.mainClass)
       if (args.childArgs != null) {
-        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
+        args.childArgs.foreach { arg => childArgs +=("--arg", arg)}
       }
     }
 
@@ -341,11 +353,11 @@ object SparkSubmit {
   }
 
   private def launch(
-      childArgs: ArrayBuffer[String],
-      childClasspath: ArrayBuffer[String],
-      sysProps: Map[String, String],
-      childMainClass: String,
-      verbose: Boolean = false) {
+    childArgs: ArrayBuffer[String],
+    childClasspath: ArrayBuffer[String],
+    sysProps: Map[String, String],
+    childMainClass: String,
+    verbose: Boolean = false) {
     if (verbose) {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
@@ -452,8 +464,8 @@ object SparkSubmit {
    */
   private[spark] def mergeFileLists(lists: String*): String = {
     val merged = lists.filter(_ != null)
-                      .flatMap(_.split(","))
-                      .mkString(",")
+      .flatMap(_.split(","))
+      .mkString(",")
     if (merged == "") null else merged
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c400dd938685e..943c9a17ca7f7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -52,6 +52,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var jars: String = null
   var maven: String = null
   var mavenRepos: String = null
+  var ivyRepoPath: String = null
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
@@ -125,6 +126,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
       .orNull
     name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
     jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
+    ivyRepoPath = Option(ivyRepoPath)
+      .orElse(sparkProperties.get("spark.jars.ivy")).orElse(Option("ivy/")).get
     deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
     numExecutors = Option(numExecutors)
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
@@ -228,6 +231,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     |  jars                    $jars
     |  maven                   $maven
     |  maven-repos             $mavenRepos
+    |  ivy-repo                $ivyRepoPath
     |  verbose                 $verbose
     |
     |Spark properties used, including those specified through
@@ -342,6 +346,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         mavenRepos = value
         parse(tail)
 
+      case ("--ivy-repo") :: value :: tail =>
+        ivyRepoPath = Utils.resolveURIs(value)
+        parse(tail)
+
      case ("--conf" | "-c") :: value :: tail =>
        value.split("=", 2).toSeq match {
          case Seq(k, v) => sparkProperties(k) = v
@@ -399,6 +407,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
        |                              coordinates should be groupId:artifactId:version.
        |  --maven-repos               Supply additional remote repositories as a comma-delimited
        |                              list to search for the maven coordinates given with --maven.
+       |  --ivy-repo                  The path to use to cache jars downloaded using maven
+       |                              coordinates. The default is $PWD/ivy
        |  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
        |                              on the PYTHONPATH for Python apps.
        |  --files FILES               Comma-separated list of files to be placed in the working
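
Example usage (not part of the patch): with these changes, --ivy-repo can be combined with the existing --maven and --maven-repos flags. A hypothetical invocation is sketched below; the class name, coordinate, repository URL, and application jar are placeholders, and /mnt/ivy is chosen only because the removed TODO above notes that /mnt/ is a better cache location on AWS.

  ./bin/spark-submit \
    --class org.example.MyApp \
    --master local[4] \
    --maven com.example:my-lib_2.10:1.0.0 \
    --maven-repos https://repo.example.com/maven \
    --ivy-repo /mnt/ivy \
    my-app.jar arg1

Per the code above, jars resolved from maven coordinates are cached under <ivy-repo>/cache and stored under <ivy-repo>/jars (the IVY_CACHE and MAVEN_JARS paths). When --ivy-repo is omitted, SparkSubmitArguments falls back to the spark.jars.ivy property and then to "ivy/" relative to the working directory.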