From 92afe26e3b30a842037af0266e133267c0ca17ea Mon Sep 17 00:00:00 2001
From: Andrew Duffy
Date: Tue, 28 Jun 2016 11:44:46 +0100
Subject: [PATCH] Add --driver-jre flag to SparkSubmit

---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 78 ++++++++++++++++++-
 .../spark/deploy/SparkSubmitArguments.scala        | 11 +++
 .../spark/deploy/SparkSubmitSuite.scala            | 27 +++++++
 dev/deps/spark-deps-hadoop-2.2                     |  3 +-
 dev/deps/spark-deps-hadoop-2.3                     |  3 +-
 dev/deps/spark-deps-hadoop-2.4                     |  3 +-
 dev/deps/spark-deps-hadoop-2.6                     |  3 +-
 dev/deps/spark-deps-hadoop-2.7                     |  3 +-
 .../launcher/AbstractCommandBuilder.java           |  1 +
 .../apache/spark/launcher/SparkLauncher.java       | 10 +++
 .../launcher/SparkSubmitOptionParser.java          |  2 +
 pom.xml                                            |  7 ++
 12 files changed, 138 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 9feafc99ac07f..aa87628e2a084 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,16 +17,17 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, PrintStream}
+import java.io._
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
+import java.nio.file.{Files, Path, Paths}
 import java.security.PrivilegedExceptionAction
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
+import org.apache.commons.compress.archivers.zip.{ZipArchiveEntry, ZipArchiveOutputStream}
 import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
@@ -573,7 +574,7 @@ object SparkSubmit {
       childArgs += ("--primary-py-file", args.primaryResource)
       childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
     } else if (args.isR) {
-      val mainFile = new Path(args.primaryResource).getName
+      val mainFile = new org.apache.hadoop.fs.Path(args.primaryResource).getName
       childArgs += ("--primary-r-file", mainFile)
       childArgs += ("--class", "org.apache.spark.deploy.RRunner")
     } else {
@@ -637,9 +638,80 @@ object SparkSubmit {
       sysProps("spark.submit.pyFiles") = formattedPyFiles
     }
 
+    /**
+     * Archive the JRE rooted at `javaHome` into a zip file in a temporary directory, for
+     * shipping to YARN. Returns the archive's absolute path and the name of the JRE's
+     * root directory.
+     */
+    def archiveJre(javaHome: String): (Path, String) = {
+      val EXTENSION = ".zip"
+      val outputName = "driverJre" + EXTENSION
+      // Write the archive into a fresh temporary directory
+      val tmpPath = Files.createTempDirectory("spark-driver-jre")
+      val outputPath = tmpPath.resolve(outputName).toAbsolutePath
+      val rootPath = Paths.get(javaHome)
+      val zipStream = new ZipArchiveOutputStream(
+        new BufferedOutputStream(new FileOutputStream(outputPath.toString)))
+
+      def compressDir(root: Path, current: Path, zipStream: ZipArchiveOutputStream): Unit = {
+        val rootAbs = root.toAbsolutePath.toString
+        val rootName = root.getFileName.toString
+        val file = current.toFile
+
+        if (file.isFile) {
+          // Entry name: the file's path relative to the JRE root, prefixed with the root's name
+          val entryName: String = Paths.get(rootName,
+            current.toAbsolutePath.toString.substring(rootAbs.length)).toString
+          val ent = new ZipArchiveEntry(entryName)
+
+          // Preserve POSIX permission bits so executables such as bin/java survive unpacking
+          ent.setUnixMode(Files.getAttribute(current, "unix:mode").asInstanceOf[Int])
+          zipStream.putArchiveEntry(ent)
+          zipStream.write(Files.readAllBytes(current))
+          zipStream.closeArchiveEntry()
+        } else {
+          for (child <- file.list()) {
+            compressDir(root, current.resolve(child), zipStream)
+          }
+        }
+      }
+
+      try {
+        compressDir(rootPath, rootPath, zipStream)
+      } catch {
+        case e: IOException => throw new RuntimeException(e)
+      } finally {
+        zipStream.close()
+      }
+      (outputPath, rootPath.getFileName.toString)
+    }
+
+    if (args.driverJre) {
+      val SPARK_YARN_DIST_ARCHIVES = "spark.yarn.dist.archives"
+      val JAVA_HOME = "java.home"
+
+      if (System.getProperty(JAVA_HOME) == null) {
+        SparkSubmit.printErrorAndExit(s"""Cannot access System property "$JAVA_HOME"""")
+      }
+
+      val javaHome = System.getProperty(JAVA_HOME)
+      // Zip the driver's JRE into an archive file; IOExceptions surface as RuntimeExceptions
+      val (archiveLocation, jreRoot) = archiveJre(javaHome)
+
+      // Ship the archive through YARN's distributed cache
+      val distArchives = mergeFileLists(sysProps.getOrElse(SPARK_YARN_DIST_ARCHIVES, null),
+        archiveLocation.toString)
+      sysProps.put(SPARK_YARN_DIST_ARCHIVES, distArchives)
+      // Point the executors and the Application Master at the shipped Java distribution;
+      // YARN unpacks each archive into a directory named after the archive file
+      val patchedJavaHome = archiveLocation.getFileName.toString + "/" + jreRoot
+      sysProps("spark.executorEnv.JAVA_HOME") = patchedJavaHome
+      sysProps("spark.yarn.appMasterEnv.JAVA_HOME") = patchedJavaHome
+    }
+
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
   /**
    * Run the main method of the child class using the provided launch environment.
    *
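
Note on the archiver: `archiveJre` records each file's POSIX mode through `ZipArchiveEntry.setUnixMode`, presumably the reason the patch reaches for commons-compress rather than `java.util.zip`: without the mode bits, `bin/java` would come out of YARN's unpacking step without its execute bit. A minimal round-trip check, as a sketch; the archive path below is illustrative (the real path is a fresh temp directory generated at runtime):

    import java.io.File
    import scala.collection.JavaConverters._
    import org.apache.commons.compress.archivers.zip.ZipFile

    // Spot-check that the execute bits survived archiving.
    val zip = new ZipFile(new File("/tmp/spark-driver-jre/driverJre.zip"))
    try {
      val lostExec = zip.getEntries.asScala.toList.filter { entry =>
        // 0x49 == 0111 octal: the user/group/other execute bits
        entry.getName.endsWith("bin/java") && (entry.getUnixMode & 0x49) == 0
      }
      assert(lostExec.isEmpty, s"entries lost execute bits: ${lostExec.map(_.getName)}")
    } finally {
      zip.close()
    }
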
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index f1761e7c1ec92..48e25936a44ef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -48,6 +48,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var driverExtraClassPath: String = null
   var driverExtraLibraryPath: String = null
   var driverExtraJavaOptions: String = null
+  var driverJre: Boolean = false
   var queue: String = null
   var numExecutors: String = null
   var files: String = null
@@ -249,6 +250,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     if (pyFiles != null && !isPython) {
       SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
     }
+    if (!master.startsWith("yarn") && driverJre) {
+      SparkSubmit.printErrorAndExit("Cannot ship driver JRE except when running on YARN")
+    }
 
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
@@ -291,6 +295,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     s"""Parsed arguments:
     |  master                  $master
     |  deployMode              $deployMode
+    |  driverJre               $driverJre
     |  executorMemory          $executorMemory
     |  executorCores           $executorCores
     |  totalExecutorCores      $totalExecutorCores
@@ -364,6 +369,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case DRIVER_JAVA_OPTIONS =>
         driverExtraJavaOptions = value
 
+      case DRIVER_JRE =>
+        driverJre = true
+
       case DRIVER_LIBRARY_PATH =>
         driverExtraLibraryPath = value
 
@@ -516,6 +524,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |
        |  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: ${mem_mb}M).
        |  --driver-java-options       Extra Java options to pass to the driver.
+       |  --driver-jre                Package the driver's JRE and ship it with the application,
+       |                              so that the executors and the Application Master run on
+       |                              the same JRE as the driver. Only supported on YARN.
        |  --driver-library-path       Extra library path entries to pass to the driver.
        |  --driver-class-path         Extra class path entries to pass to the driver. Note that
        |                              jars added with --jars are automatically included in the
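
For reference, the flag is shorthand for configuration a user could supply by hand: YARN links every entry of spark.yarn.dist.archives into the container's working directory under a name matching the archive file, which is why the shipped JRE ends up at `<archive-name>/<jre-root>`. A sketch of the manual equivalent; the archive path and the "jre" root directory name are illustrative:

    import org.apache.spark.SparkConf

    // What --driver-jre automates; the archive itself would be produced separately.
    val conf = new SparkConf()
      .set("spark.yarn.dist.archives", "/tmp/spark-driver-jre/driverJre.zip")
      .set("spark.executorEnv.JAVA_HOME", "driverJre.zip/jre")
      .set("spark.yarn.appMasterEnv.JAVA_HOME", "driverJre.zip/jre")
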
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 0b020592b06d3..fdb04722464d9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -255,6 +255,33 @@ class SparkSubmitSuite
     sysProps("spark.ui.enabled") should be ("false")
   }
 
+  test("handles --driver-jre option without YARN") {
+    val clArgs = Seq(
+      "--driver-jre",
+      "--master", "local[1]",
+      "--class", "Fake", "fake.jar")
+    val printStream = new BufferPrintStream
+    SparkSubmit.printStream = printStream
+    SparkSubmit.exitFn = (_) => throw new RuntimeException
+    // exitFn throws, so a rejected flag surfaces as a RuntimeException; if nothing
+    // is thrown the parse wrongly succeeded and intercept fails the test
+    intercept[RuntimeException] {
+      new SparkSubmitArguments(clArgs)
+    }
+    val output = printStream.lineBuffer.mkString("\n")
+    output should include regex ("Cannot ship driver JRE")
+  }
+
+  test("handles --driver-jre option when using YARN") {
+    val clArgs = Seq(
+      "--driver-jre",
+      "--master", "yarn",
+      "--class", "Fake",
+      "fake.jar")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    appArgs.driverJre should be (true)
+  }
+
   test("handles standalone cluster mode") {
     testStandaloneCluster(useRest = true)
   }
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index b5c38a6c056ec..6f63559f0cf16 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -25,7 +25,7 @@ commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
 commons-compiler-2.7.6.jar
-commons-compress-1.4.1.jar
+commons-compress-1.12.jar
 commons-configuration-1.6.jar
 commons-dbcp-1.4.jar
 commons-digester-1.8.jar
@@ -161,5 +161,4 @@ univocity-parsers-2.1.1.jar
 validation-api-1.1.0.Final.jar
 xbean-asm5-shaded-4.4.jar
 xmlenc-0.52.jar
-xz-1.0.jar
 zookeeper-3.4.5.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 969df0495d4c9..42fbd3e340015 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -28,7 +28,7 @@ commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
 commons-compiler-2.7.6.jar
-commons-compress-1.4.1.jar
+commons-compress-1.12.jar
 commons-configuration-1.6.jar
 commons-dbcp-1.4.jar
 commons-digester-1.8.jar
@@ -169,5 +169,4 @@ univocity-parsers-2.1.1.jar
 validation-api-1.1.0.Final.jar
 xbean-asm5-shaded-4.4.jar
 xmlenc-0.52.jar
-xz-1.0.jar
 zookeeper-3.4.5.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 501bf586a3934..2c09b47498561 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -28,7 +28,7 @@ commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
 commons-compiler-2.7.6.jar
-commons-compress-1.4.1.jar
+commons-compress-1.12.jar
 commons-configuration-1.6.jar
 commons-dbcp-1.4.jar
 commons-digester-1.8.jar
@@ -169,5 +169,4 @@ univocity-parsers-2.1.1.jar
 validation-api-1.1.0.Final.jar
 xbean-asm5-shaded-4.4.jar
 xmlenc-0.52.jar
-xz-1.0.jar
 zookeeper-3.4.5.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index b915727f46888..c33c708ea9e03 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -32,7 +32,7 @@ commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
 commons-compiler-2.7.6.jar
-commons-compress-1.4.1.jar
+commons-compress-1.12.jar
 commons-configuration-1.6.jar
 commons-dbcp-1.4.jar
 commons-digester-1.8.jar
@@ -178,5 +178,4 @@ validation-api-1.1.0.Final.jar
 xbean-asm5-shaded-4.4.jar
 xercesImpl-2.9.1.jar
 xmlenc-0.52.jar
-xz-1.0.jar
 zookeeper-3.4.6.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index f752eaab660a6..44151a4b1009f 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -32,7 +32,7 @@ commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
 commons-compiler-2.7.6.jar
-commons-compress-1.4.1.jar
+commons-compress-1.12.jar
 commons-configuration-1.6.jar
 commons-dbcp-1.4.jar
 commons-digester-1.8.jar
@@ -179,5 +179,4 @@ validation-api-1.1.0.Final.jar
 xbean-asm5-shaded-4.4.jar
 xercesImpl-2.9.1.jar
 xmlenc-0.52.jar
-xz-1.0.jar
 zookeeper-3.4.6.jar
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index c7488082ca899..f28ec0e5fe3d9 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -39,6 +39,7 @@ abstract class AbstractCommandBuilder {
 
   boolean verbose;
+  boolean driverJre;
   String appName;
   String appResource;
   String deployMode;
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index 08873f5811238..dc191fe7e215b 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -358,6 +358,16 @@ public SparkLauncher setVerbose(boolean verbose) {
     return this;
   }
 
+  /**
+   * Enables shipping the driver's JRE to the YARN cluster nodes.
+   *
+   * This ensures that the executors and the Application Master run with the same JRE as the client.
+   */
+  public SparkLauncher withDriverJre() {
+    builder.driverJre = true;
+    return this;
+  }
+
   /**
    * Launches a sub-process that will start the configured Spark application.
    *
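
From the launcher API, the new builder method composes with the existing setters. A sketch; the application jar and main class are placeholders:

    import org.apache.spark.launcher.SparkLauncher

    val handle = new SparkLauncher()
      .setMaster("yarn")
      .setDeployMode("cluster")
      .setAppResource("/path/to/app.jar")   // placeholder
      .setMainClass("com.example.Main")     // placeholder
      .withDriverJre()                      // new in this patch
      .startApplication()
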
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
index 6767cc5079649..3bc43ad352813 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -42,6 +42,7 @@ class SparkSubmitOptionParser {
   protected final String DRIVER_CLASS_PATH = "--driver-class-path";
   protected final String DRIVER_CORES = "--driver-cores";
   protected final String DRIVER_JAVA_OPTIONS = "--driver-java-options";
+  protected final String DRIVER_JRE = "--driver-jre";
   protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
   protected final String DRIVER_MEMORY = "--driver-memory";
   protected final String EXECUTOR_MEMORY = "--executor-memory";
@@ -121,6 +122,7 @@ class SparkSubmitOptionParser {
    * List of switches (command line options that do not take parameters) recognized by spark-submit.
    */
  final String[][] switches = {
+    { DRIVER_JRE },
    { HELP, "-h" },
    { SUPERVISE },
    { USAGE_ERROR },
diff --git a/pom.xml b/pom.xml
index 89ed87ff9e844..0fd92cc7e7bb5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,8 @@
     <commons-lang2.version>2.6</commons-lang2.version>
     <commons-lang3.version>3.3.2</commons-lang3.version>
+
+    <commons-compress.version>1.12</commons-compress.version>
     <datanucleus-core.version>3.2.10</datanucleus-core.version>
     <janino.version>2.7.8</janino.version>
     <jersey.version>2.22.2</jersey.version>
@@ -380,6 +382,11 @@
         <artifactId>commons-lang</artifactId>
         <version>${commons-lang2.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-compress</artifactId>
+        <version>${commons-compress.version}</version>
+      </dependency>
       <dependency>
         <groupId>commons-io</groupId>
         <artifactId>commons-io</artifactId>
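
The pom now declares commons-compress directly (the 1.4.1 jars in the dependency manifests above arrived only transitively), since the JDK's built-in zip support has no notion of file permissions. A short illustration of the gap; the entry name is illustrative:

    import java.util.zip.ZipEntry

    // java.util.zip models an entry as name/size/time only; there is no
    // permission API, so a JRE zipped this way would unpack without execute bits.
    val entry = new ZipEntry("jre/bin/java")
    // entry.setUnixMode(...)   // no such method on java.util.zip.ZipEntry
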