From 8810beb215e8b3b646e0ae32794a16dd5da1eb65 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 30 Jun 2015 18:15:00 -0700 Subject: [PATCH 01/16] R packages support --- .../apache/spark/deploy/RPackageUtils.scala | 125 ++++++++++++++++++ .../org/apache/spark/deploy/SparkSubmit.scala | 5 + 2 files changed, 130 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala new file mode 100644 index 0000000000000..e880fbe4ea850 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import java.io.{FileOutputStream, PrintStream, File} +import java.net.URI +import java.util.jar.JarFile + +import org.apache.spark.util.Utils + +import scala.sys.process._ + +private[deploy] object RPackageUtils { + + /** The key in the MANIFEST.mf that we look for, in case a jar contains R code. */ + private final val hasRPackage = "Spark-HasRPackage" + + /** Base of the shell command used in order to install R packages. */ + private final val baseInstallCmd = Seq("R", "CMD", "INSTALL", "-l") + + /** R source code should exist under R/pkg in a jar. */ + private final val RJarEntries = "R/pkg" + + /** + * Checks the manifest of the Jar whether there is any R source code bundled with it. + * Exposed for testing. + */ + private[deploy] def checkManifestForR(jar: JarFile): Boolean = { + val manifest = jar.getManifest.getMainAttributes + manifest.getValue(hasRPackage) != null && manifest.getValue(hasRPackage).trim == "true" + } + + /** + * Runs the standard R package installation code to build the R package from source. + * Multiple runs don't cause problems. Exposed for testing. + */ + private[deploy] def rPackageBuilder( + dir: File, + printStream: PrintStream, + verbose: Boolean): Boolean = { + val sparkHome = sys.env.get("SPARK_HOME").orNull + if (sparkHome == null) throw new IllegalArgumentException("SPARK_HOME not set!") + val pathToSparkR = Seq(sparkHome, "R", "lib").mkString(File.separator) + val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator) + val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg) + if (verbose) { + printStream.println(s"Building R package with the command: $installCmd") + } + (installCmd #> printStream).! == 0 + } + + /** + * Extracts the files under /R in the jar to a temporary directory for building. 
+ */ + private def extractRFolder(jar: JarFile, printStream: PrintStream, verbose: Boolean): File = { + val tempDir = Utils.createTempDir(null) + val jarEntries = jar.entries() + while (jarEntries.hasMoreElements) { + val entry = jarEntries.nextElement() + val entryRIndex = entry.getName.indexOf(RJarEntries) + if (entryRIndex > -1) { + val entryPath = entry.getName.substring(entryRIndex) + if (entry.isDirectory) { + val dir = new File(tempDir, entryPath) + if (verbose) { + printStream.println(s"Creating directory: $dir") + } + dir.mkdirs + } else { + val inStream = jar.getInputStream(entry) + val outPath = new File(tempDir, entryPath) + val outStream = new FileOutputStream(outPath) + if (verbose) { + printStream.println(s"Extracting $entry to $outPath") + } + Utils.copyStream(inStream, outStream, closeStreams = true) + } + } + } + tempDir + } + + /** + * Extracts the files under /R in the jar to a temporary directory for building. + */ + private[deploy] def checkAndBuildRPackage( + jars: String, + printStream: PrintStream, + verbose: Boolean): Unit = { + jars.split(",").foreach { jarUri => + val file = new File(new URI(jarUri)) + if (file.exists()) { + val jar = new JarFile(file) + if (checkManifestForR(jar)) { + printStream.println(s"$file contains R source code. Now installing package.") + val rSource = extractRFolder(jar, printStream, verbose) + if (!rPackageBuilder(rSource, printStream, verbose)) { + printStream.println(s"ERROR: Failed to build R package in $file.") + } + } else { + if (verbose) { + printStream.println(s"$file doesn't contain R source code, skipping...") + } + } + } else { + printStream.println(s"WARN: $file resolved as dependency, but not found.") + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b1d6ec209d62b..eb747ebeed776 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -276,6 +276,11 @@ object SparkSubmit { } } } + // install any R packages that may have been passed through --jars or --packages. + // Spark Packages may contain R source code inside the jar. 
+ if (args.isR) { + RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose) + } // Require all python files to be local, so we can add them to the PYTHONPATH // In YARN cluster mode, python files are distributed as regular files, which can be non-local From 0226768c1cddce134253b9d817098880f04de0a5 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 1 Jul 2015 12:20:50 -0700 Subject: [PATCH 02/16] fixed issues --- .../apache/spark/deploy/RPackageUtils.scala | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala index e880fbe4ea850..1014e02fbba95 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala @@ -21,9 +21,9 @@ import java.io.{FileOutputStream, PrintStream, File} import java.net.URI import java.util.jar.JarFile -import org.apache.spark.util.Utils +import org.apache.spark.util.{RedirectThread, Utils} -import scala.sys.process._ +import scala.collection.JavaConversions._ private[deploy] object RPackageUtils { @@ -61,7 +61,19 @@ private[deploy] object RPackageUtils { if (verbose) { printStream.println(s"Building R package with the command: $installCmd") } - (installCmd #> printStream).! == 0 + try { + val builder = new ProcessBuilder(installCmd) + builder.redirectErrorStream(true) + val env = builder.environment() + env.clear() + val process = builder.start() + new RedirectThread(process.getInputStream, printStream, "redirect R output").start() + process.waitFor() == 0 + } catch { + case e: Throwable => + printStream.println(e.getMessage + "\n" + e.getStackTrace) + false + } } /** @@ -109,8 +121,12 @@ private[deploy] object RPackageUtils { if (checkManifestForR(jar)) { printStream.println(s"$file contains R source code. 
Now installing package.") val rSource = extractRFolder(jar, printStream, verbose) - if (!rPackageBuilder(rSource, printStream, verbose)) { - printStream.println(s"ERROR: Failed to build R package in $file.") + try { + if (!rPackageBuilder(rSource, printStream, verbose)) { + printStream.println(s"ERROR: Failed to build R package in $file.") + } + } finally { + rSource.delete() // clean up } } else { if (verbose) { From bb751ce029daa90df951bf37843f9e29303ddf64 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 2 Jul 2015 12:43:19 -0700 Subject: [PATCH 03/16] fix null bug --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 8 ++++---- .../org/apache/spark/deploy/SparkSubmitArguments.scala | 3 +++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index eb747ebeed776..64571155c3732 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -262,14 +262,14 @@ object SparkSubmit { val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates( args.packages, Option(args.repositories), Option(args.ivyRepoPath)) - if (!resolvedMavenCoordinates.trim.isEmpty) { - if (args.jars == null || args.jars.trim.isEmpty) { + if (resolvedMavenCoordinates.trim.nonEmpty) { + if (!args.isNonEmptyArg(args.jars)) { args.jars = resolvedMavenCoordinates } else { args.jars += s",$resolvedMavenCoordinates" } if (args.isPython) { - if (args.pyFiles == null || args.pyFiles.trim.isEmpty) { + if (!args.isNonEmptyArg(args.pyFiles)) { args.pyFiles = resolvedMavenCoordinates } else { args.pyFiles += s",$resolvedMavenCoordinates" @@ -278,7 +278,7 @@ object SparkSubmit { } // install any R packages that may have been passed through --jars or --packages. // Spark Packages may contain R source code inside the jar. - if (args.isR) { + if (args.isR && args.isNonEmptyArg(args.jars)) { RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose) } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index b7429a901e162..e50da09b2a66b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -595,4 +595,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S } } + private[deploy] def isNonEmptyArg(arg: String): Boolean = { + arg != null && arg.nonEmpty + } } From e5b5a06de567f12617cdccf1e7e651ad7ef347b5 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 8 Jul 2015 11:49:39 -0700 Subject: [PATCH 04/16] added doc --- .../apache/spark/deploy/RPackageUtils.scala | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala index 1014e02fbba95..e51e8c2bf2265 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala @@ -36,6 +36,29 @@ private[deploy] object RPackageUtils { /** R source code should exist under R/pkg in a jar. */ private final val RJarEntries = "R/pkg" + /** Documentation on how the R source file layout should be in the jar. 
*/ + private final val RJarDoc = + s"""In order for Spark to build R packages that are parts of Spark Packages, there are a few + |requirements. The R source code must be shipped in a jar, with additional Java/Scala + |classes. The jar must be in the following format: + | 1- The Manifest (META-INF/MANIFEST.mf) must contain the key-value: $hasRPackage: true + | 2- The standard R package layout must be preserved under R/pkg/ inside the jar. More + | information on the standard R package layout can be found in: + | http://cran.r-project.org/doc/contrib/Leisch-CreatingPackages.pdf + | An example layout is given below. After running `jar tf $$JAR_FILE | sort`: + | + |META-INF/MANIFEST.MF + |R/ + |R/pkg/ + |R/pkg/DESCRIPTION + |R/pkg/NAMESPACE + |R/pkg/R/ + |R/pkg/R/myRcode.R + |org/ + |org/apache/ + |... + """.stripMargin.trim + /** * Checks the manifest of the Jar whether there is any R source code bundled with it. * Exposed for testing. @@ -124,6 +147,7 @@ private[deploy] object RPackageUtils { try { if (!rPackageBuilder(rSource, printStream, verbose)) { printStream.println(s"ERROR: Failed to build R package in $file.") + printStream.println(RJarDoc) } } finally { rSource.delete() // clean up From eff5ba15333085adb9f95f1a953cf2c5f506fd2a Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 8 Jul 2015 22:55:17 -0700 Subject: [PATCH 05/16] ready for review --- R/pkg/inst/tests/packageInAJarTest.R | 33 ++++++ .../apache/spark/deploy/RPackageUtils.scala | 19 ++-- .../apache/spark/deploy/IvyTestUtils.scala | 99 +++++++++++++---- .../spark/deploy/RPackageUtilsSuite.scala | 100 ++++++++++++++++++ .../spark/deploy/SparkSubmitSuite.scala | 20 +++- 5 files changed, 237 insertions(+), 34 deletions(-) create mode 100644 R/pkg/inst/tests/packageInAJarTest.R create mode 100644 core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala diff --git a/R/pkg/inst/tests/packageInAJarTest.R b/R/pkg/inst/tests/packageInAJarTest.R new file mode 100644 index 0000000000000..748650480a89d --- /dev/null +++ b/R/pkg/inst/tests/packageInAJarTest.R @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +library(SparkR) +library(sparkPackageTest) + +sc <- sparkR.init() + +run1 <- myfunc(5L) + +run2 <- myfunc(-4L) + +sparkR.stop() + +print(run1) +print(run2) + +if(run1 != 6) quit(save = "no", status = 1) + +if(run2 != -3) quit(save = "no", status = 1) diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala index e51e8c2bf2265..205fc7b3a3b73 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala @@ -21,6 +21,7 @@ import java.io.{FileOutputStream, PrintStream, File} import java.net.URI import java.util.jar.JarFile +import com.google.common.io.Files import org.apache.spark.util.{RedirectThread, Utils} import scala.collection.JavaConversions._ @@ -37,7 +38,7 @@ private[deploy] object RPackageUtils { private final val RJarEntries = "R/pkg" /** Documentation on how the R source file layout should be in the jar. */ - private final val RJarDoc = + private[deploy] final val RJarDoc = s"""In order for Spark to build R packages that are parts of Spark Packages, there are a few |requirements. The R source code must be shipped in a jar, with additional Java/Scala |classes. The jar must be in the following format: @@ -70,13 +71,10 @@ private[deploy] object RPackageUtils { /** * Runs the standard R package installation code to build the R package from source. - * Multiple runs don't cause problems. Exposed for testing. + * Multiple runs don't cause problems. */ - private[deploy] def rPackageBuilder( - dir: File, - printStream: PrintStream, - verbose: Boolean): Boolean = { - val sparkHome = sys.env.get("SPARK_HOME").orNull + private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = { + val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.test.home")).orNull if (sparkHome == null) throw new IllegalArgumentException("SPARK_HOME not set!") val pathToSparkR = Seq(sparkHome, "R", "lib").mkString(File.separator) val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator) @@ -90,7 +88,7 @@ private[deploy] object RPackageUtils { val env = builder.environment() env.clear() val process = builder.start() - new RedirectThread(process.getInputStream, printStream, "redirect R output").start() + new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start() process.waitFor() == 0 } catch { case e: Throwable => @@ -119,6 +117,7 @@ private[deploy] object RPackageUtils { } else { val inStream = jar.getInputStream(entry) val outPath = new File(tempDir, entryPath) + Files.createParentDirs(outPath) val outStream = new FileOutputStream(outPath) if (verbose) { printStream.println(s"Extracting $entry to $outPath") @@ -137,8 +136,8 @@ private[deploy] object RPackageUtils { jars: String, printStream: PrintStream, verbose: Boolean): Unit = { - jars.split(",").foreach { jarUri => - val file = new File(new URI(jarUri)) + jars.split(",").foreach { jarPath => + val file = new File(jarPath) if (file.exists()) { val jar = new JarFile(file) if (checkManifestForR(jar)) { diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala index 823050b0aabbe..b212495d9d104 100644 --- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala @@ -19,6 +19,10 @@ package org.apache.spark.deploy import java.io.{File, FileInputStream, 
FileOutputStream} import java.util.jar.{JarEntry, JarOutputStream} +import java.util.jar.Attributes.Name +import java.util.jar.Manifest + +import scala.collection.mutable.ArrayBuffer import com.google.common.io.{Files, ByteStreams} @@ -35,7 +39,7 @@ private[deploy] object IvyTestUtils { * Create the path for the jar and pom from the maven coordinate. Extension should be `jar` * or `pom`. */ - private def pathFromCoordinate( + private[deploy] def pathFromCoordinate( artifact: MavenCoordinate, prefix: File, ext: String, @@ -52,7 +56,7 @@ private[deploy] object IvyTestUtils { } /** Returns the artifact naming based on standard ivy or maven format. */ - private def artifactName( + private[deploy] def artifactName( artifact: MavenCoordinate, useIvyLayout: Boolean, ext: String = ".jar"): String = { @@ -90,6 +94,42 @@ private[deploy] object IvyTestUtils { writeFile(dir, "mylib.py", contents) } + /** Create an example R package that calls the given Java class. */ + private def createRFiles( + dir: File, + className: String, + packageName: String): Seq[(String, File)] = { + val rFilesDir = new File(dir, "R" + File.separator + "pkg") + Files.createParentDirs(new File(rFilesDir, "R" + File.separator + "mylib.R")) + val contents = + s"""myfunc <- function(x) { + | SparkR:::callJStatic("$packageName.$className", "myFunc", x) + |} + """.stripMargin + val source = writeFile(new File(rFilesDir, "R"), "mylib.R", contents) + val description = + """Package: sparkPackageTest + |Type: Package + |Title: Test for building an R package + |Version: 0.1 + |Date: 2015-07-08 + |Author: Burak Yavuz + |Imports: methods, SparkR + |Depends: R (>= 3.1), methods, SparkR + |Suggests: testthat + |Description: Test for building an R package within a jar + |License: Apache License (== 2.0) + |Collate: 'mylib.R' + """.stripMargin + val descFile = writeFile(rFilesDir, "DESCRIPTION", description) + val namespace = + """import(SparkR) + |export("myfunc") + """.stripMargin + val nameFile = writeFile(rFilesDir, "NAMESPACE", namespace) + Seq(("R/pkg/R/mylib.R", source), ("R/pkg/DESCRIPTION", descFile), ("R/pkg/NAMESPACE", nameFile)) + } + /** Create a simple testable Class. */ private def createJavaClass(dir: File, className: String, packageName: String): File = { val contents = @@ -97,17 +137,14 @@ private[deploy] object IvyTestUtils { | |import java.lang.Integer; | - |class $className implements java.io.Serializable { - | - | public $className() {} - | - | public Integer myFunc(Integer x) { + |public class $className implements java.io.Serializable { + | public static Integer myFunc(Integer x) { | return x + 1; | } |} """.stripMargin val sourceFile = - new JavaSourceFromString(new File(dir, className + ".java").getAbsolutePath, contents) + new JavaSourceFromString(new File(dir, className).getAbsolutePath, contents) createCompiledClass(className, dir, sourceFile, Seq.empty) } @@ -199,14 +236,25 @@ private[deploy] object IvyTestUtils { } /** Create the jar for the given maven coordinate, using the supplied files. 
*/ - private def packJar( + private[deploy] def packJar( dir: File, artifact: MavenCoordinate, files: Seq[(String, File)], - useIvyLayout: Boolean): File = { + useIvyLayout: Boolean, + withR: Boolean, + withManifest: Option[Manifest] = None): File = { val jarFile = new File(dir, artifactName(artifact, useIvyLayout)) val jarFileStream = new FileOutputStream(jarFile) - val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest()) + val manifest = withManifest.getOrElse { + val mani = new Manifest() + if (withR) { + val attr = mani.getMainAttributes + attr.put(Name.MANIFEST_VERSION, "1.0") + attr.put(new Name("Spark-HasRPackage"), "true") + } + mani + } + val jarStream = new JarOutputStream(jarFileStream, manifest) for (file <- files) { val jarEntry = new JarEntry(file._1) @@ -239,7 +287,8 @@ private[deploy] object IvyTestUtils { dependencies: Option[Seq[MavenCoordinate]] = None, tempDir: Option[File] = None, useIvyLayout: Boolean = false, - withPython: Boolean = false): File = { + withPython: Boolean = false, + withR: Boolean = false): File = { // Where the root of the repository exists, and what Ivy will search in val tempPath = tempDir.getOrElse(Files.createTempDir()) // Create directory if it doesn't exist @@ -255,14 +304,16 @@ private[deploy] object IvyTestUtils { val javaClass = createJavaClass(root, className, artifact.groupId) // A tuple of files representation in the jar, and the file val javaFile = (artifact.groupId.replace(".", "/") + "/" + javaClass.getName, javaClass) - val allFiles = - if (withPython) { - val pythonFile = createPythonFile(root) - Seq(javaFile, (pythonFile.getName, pythonFile)) - } else { - Seq(javaFile) - } - val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout) + val allFiles = ArrayBuffer[(String, File)](javaFile) + if (withPython) { + val pythonFile = createPythonFile(root) + allFiles.append((pythonFile.getName, pythonFile)) + } + if (withR) { + val rFiles = createRFiles(root, className, artifact.groupId) + allFiles.append(rFiles: _*) + } + val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout, withR) assert(jarFile.exists(), "Problem creating Jar file") val descriptor = createDescriptor(tempPath, artifact, dependencies, useIvyLayout) assert(descriptor.exists(), "Problem creating Pom file") @@ -286,9 +337,10 @@ private[deploy] object IvyTestUtils { dependencies: Option[String], rootDir: Option[File], useIvyLayout: Boolean = false, - withPython: Boolean = false): File = { + withPython: Boolean = false, + withR: Boolean = false): File = { val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates) - val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython) + val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython, withR) deps.foreach { seq => seq.foreach { dep => createLocalRepository(dep, None, Some(mainRepo), useIvyLayout, withPython = false) }} @@ -311,11 +363,12 @@ private[deploy] object IvyTestUtils { rootDir: Option[File], useIvyLayout: Boolean = false, withPython: Boolean = false, + withR: Boolean = false, ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = { val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates) purgeLocalIvyCache(artifact, deps, ivySettings) val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout, - withPython) + withPython, withR) try { f(repo.toURI.toString) } finally { diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala new file mode 100644 index 0000000000000..a027650ffb3c9 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -0,0 +1,100 @@ +package org.apache.spark.deploy + +import java.io.{PrintStream, OutputStream, File} +import java.net.URI +import java.util.jar.Attributes.Name +import java.util.jar.{JarFile, Manifest} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate +import org.scalatest.BeforeAndAfterEach + +import scala.collection.mutable.ArrayBuffer + +class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach { + + private val main = MavenCoordinate("a", "b", "c") + private val dep1 = MavenCoordinate("a", "dep1", "c") + private val dep2 = MavenCoordinate("a", "dep2", "d") + + private def getJarPath(coord: MavenCoordinate, repo: File): File = { + new File(IvyTestUtils.pathFromCoordinate(coord, repo, "jar", useIvyLayout = false), + IvyTestUtils.artifactName(coord, useIvyLayout = false, ".jar")) + } + + private val lineBuffer = ArrayBuffer[String]() + + private val noOpOutputStream = new OutputStream { + def write(b: Int) = {} + } + + /** Simple PrintStream that reads data into a buffer */ + private class BufferPrintStream extends PrintStream(noOpOutputStream) { + override def println(line: String) { + lineBuffer += line + } + } + + def beforeAll() { + System.setProperty("spark.testing", "true") + } + + override def beforeEach(): Unit = { + lineBuffer.clear() + } + + test("pick which jars to unpack using the manifest") { + val deps = Seq(dep1, dep2).mkString(",") + IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo => + val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo))))) + assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code") + assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code") + assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code") + } + } + + test("build an R package from a jar end to end") { + val deps = Seq(dep1, dep2).mkString(",") + IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo => + val jars = Seq(main, dep1, dep2).map { c => + getJarPath(c, new File(new URI(repo))) + }.mkString(",") + RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true) + val firstJar = jars.substring(0, jars.indexOf(",")) + val output = lineBuffer.mkString("\n") + assert(output.contains("Building R package")) + assert(output.contains("Extracting")) + assert(output.contains(s"$firstJar contains R source code. 
Now installing package.")) + assert(output.contains("doesn't contain R source code, skipping...")) + } + } + + test("jars that don't exist are skipped and print warning") { + val deps = Seq(dep1, dep2).mkString(",") + IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo => + val jars = Seq(main, dep1, dep2).map { c => + getJarPath(c, new File(new URI(repo))) + "dummy" + }.mkString(",") + RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true) + val individualJars = jars.split(",") + val output = lineBuffer.mkString("\n") + individualJars.foreach { jarFile => + assert(output.contains(s"WARN: $jarFile")) + } + } + } + + test("faulty R package shows documentation") { + IvyTestUtils.withRepository(main, None, None) { repo => + val manifest = new Manifest + val attr = manifest.getMainAttributes + attr.put(Name.MANIFEST_VERSION, "1.0") + attr.put(new Name("Spark-HasRPackage"), "true") + val jar = IvyTestUtils.packJar(new File(new URI(repo)), dep1, Nil, + useIvyLayout = false, withR = false, Some(manifest)) + RPackageUtils.checkAndBuildRPackage(jar.getAbsolutePath, new BufferPrintStream, true) + val output = lineBuffer.mkString("\n") + assert(output.contains(RPackageUtils.RJarDoc)) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 2e05dec99b6bf..dca4e5bc62e0d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -349,7 +349,7 @@ class SparkSubmitSuite "--class", JarCreationTest.getClass.getName.stripSuffix("$"), "--name", "testApp", "--master", "local-cluster[2,1,512]", - "--packages", Seq(main, dep).mkString(","), + "--packages", main.toString, "--repositories", repo, "--conf", "spark.ui.enabled=false", unusedJar.toString, @@ -358,6 +358,24 @@ class SparkSubmitSuite } } + test("correctly builds R packages included in a jar with --packages") { + val main = MavenCoordinate("my.great.lib", "mylib", "0.1") + val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) + val rScriptDir = + Seq(sparkHome, "R", "pkg", "inst", "tests", "packageInAJarTest.R").mkString(File.separator) + assert(new File(rScriptDir).exists) + IvyTestUtils.withRepository(main, None, None, withR = true) { repo => + val args = Seq( + "--name", "testApp", + "--master", "local-cluster[2,1,512]", + "--packages", main.toString, + "--repositories", repo, + "--conf", "spark.ui.enabled=false", + rScriptDir) + runSparkSubmit(args) + } + } + test("resolves command line argument paths correctly") { val jars = "/jar1,/jar2" // --jars val files = "hdfs:/file1,file2" // --files From d8677565957f9c1bda03f9bc7cc531684548f9e3 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 9 Jul 2015 00:30:59 -0700 Subject: [PATCH 06/16] add apache header --- .../spark/deploy/RPackageUtilsSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala index a027650ffb3c9..b611d1bce62b3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.deploy import java.io.{PrintStream, OutputStream, File} From 9778e03916c1f605c4003a935b15135e02567a79 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 30 Jul 2015 13:53:09 -0700 Subject: [PATCH 07/16] addressed comments --- R/pkg/inst/tests/packageInAJarTest.R | 3 - .../scala/org/apache/spark/SparkContext.scala | 6 +- .../org/apache/spark/api/r/RBackend.scala | 1 + .../scala/org/apache/spark/api/r/RUtils.scala | 6 +- .../apache/spark/deploy/RPackageUtils.scala | 57 +++++++++++++------ .../org/apache/spark/deploy/SparkSubmit.scala | 5 -- .../spark/deploy/RPackageUtilsSuite.scala | 5 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 3 + 8 files changed, 55 insertions(+), 31 deletions(-) diff --git a/R/pkg/inst/tests/packageInAJarTest.R b/R/pkg/inst/tests/packageInAJarTest.R index 748650480a89d..207a37a0cb47f 100644 --- a/R/pkg/inst/tests/packageInAJarTest.R +++ b/R/pkg/inst/tests/packageInAJarTest.R @@ -25,9 +25,6 @@ run2 <- myfunc(-4L) sparkR.stop() -print(run1) -print(run2) - if(run1 != 6) quit(save = "no", status = 1) if(run2 != -3) quit(save = "no", status = 1) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ac6ac6c216767..020e83ce3f9f5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -46,7 +46,7 @@ import org.apache.mesos.MesosNativeLibrary import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import org.apache.spark.deploy.{RPackageUtils, LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump} import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat} @@ -1662,6 +1662,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } if (key != null) { addedJars(key) = System.currentTimeMillis + // install any R packages that may have been passed through --jars or --packages. + // Spark Packages may contain R source code inside the jar. This method needs to be here + // so that the package is always installed on the driver. 
+ RPackageUtils.checkAndBuildRPackage(path) logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) } } diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala index b7e72d4d0ed0b..3b43fc7b1362a 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala @@ -28,6 +28,7 @@ import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.handler.codec.LengthFieldBasedFrameDecoder import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder} +import org.apache.spark.deploy.RPackageUtils import org.apache.spark.{Logging, SparkConf} diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index d53abd3408c55..1e2997f1cd822 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -26,7 +26,7 @@ private[spark] object RUtils { * Get the SparkR package path in the local spark distribution. */ def localSparkRPackagePath: Option[String] = { - val sparkHome = sys.env.get("SPARK_HOME") + val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.test.home")) sparkHome.map( Seq(_, "R", "lib").mkString(File.separator) ) @@ -46,8 +46,8 @@ private[spark] object RUtils { (sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode")) } - val isYarnCluster = master.contains("yarn") && deployMode == "cluster" - val isYarnClient = master.contains("yarn") && deployMode == "client" + val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster" + val isYarnClient = master != null && master.contains("yarn") && deployMode == "client" // In YARN mode, the SparkR package is distributed as an archive symbolically // linked to the "sparkr" file in the current directory. Note that this does not apply diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala index 205fc7b3a3b73..56601a7b6ade3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala @@ -18,15 +18,18 @@ package org.apache.spark.deploy import java.io.{FileOutputStream, PrintStream, File} -import java.net.URI import java.util.jar.JarFile +import java.util.logging.Level import com.google.common.io.Files + +import org.apache.spark.Logging +import org.apache.spark.api.r.RUtils import org.apache.spark.util.{RedirectThread, Utils} import scala.collection.JavaConversions._ -private[deploy] object RPackageUtils { +private[spark] object RPackageUtils extends Logging { /** The key in the MANIFEST.mf that we look for, in case a jar contains R code. */ private final val hasRPackage = "Spark-HasRPackage" @@ -74,13 +77,12 @@ private[deploy] object RPackageUtils { * Multiple runs don't cause problems. */ private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = { - val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.test.home")).orNull - if (sparkHome == null) throw new IllegalArgumentException("SPARK_HOME not set!") - val pathToSparkR = Seq(sparkHome, "R", "lib").mkString(File.separator) + // this code should be always running on the driver. 
+ val pathToSparkR = RUtils.sparkRPackagePath(isDriver = true) val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator) val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg) if (verbose) { - printStream.println(s"Building R package with the command: $installCmd") + print(s"Building R package with the command: $installCmd", printStream) } try { val builder = new ProcessBuilder(installCmd) @@ -92,7 +94,7 @@ private[deploy] object RPackageUtils { process.waitFor() == 0 } catch { case e: Throwable => - printStream.println(e.getMessage + "\n" + e.getStackTrace) + print("Failed to build R package.", printStream, Level.SEVERE, e) false } } @@ -111,7 +113,7 @@ private[deploy] object RPackageUtils { if (entry.isDirectory) { val dir = new File(tempDir, entryPath) if (verbose) { - printStream.println(s"Creating directory: $dir") + print(s"Creating directory: $dir", printStream) } dir.mkdirs } else { @@ -120,7 +122,7 @@ private[deploy] object RPackageUtils { Files.createParentDirs(outPath) val outStream = new FileOutputStream(outPath) if (verbose) { - printStream.println(s"Extracting $entry to $outPath") + print(s"Extracting $entry to $outPath", printStream) } Utils.copyStream(inStream, outStream, closeStreams = true) } @@ -132,32 +134,53 @@ private[deploy] object RPackageUtils { /** * Extracts the files under /R in the jar to a temporary directory for building. */ - private[deploy] def checkAndBuildRPackage( + private[spark] def checkAndBuildRPackage( jars: String, - printStream: PrintStream, - verbose: Boolean): Unit = { + printStream: PrintStream = null, + verbose: Boolean = false): Unit = { jars.split(",").foreach { jarPath => val file = new File(jarPath) if (file.exists()) { val jar = new JarFile(file) if (checkManifestForR(jar)) { - printStream.println(s"$file contains R source code. Now installing package.") + print(s"$file contains R source code. Now installing package.", printStream, Level.INFO) val rSource = extractRFolder(jar, printStream, verbose) try { if (!rPackageBuilder(rSource, printStream, verbose)) { - printStream.println(s"ERROR: Failed to build R package in $file.") - printStream.println(RJarDoc) + print(s"ERROR: Failed to build R package in $file.", printStream) + print(RJarDoc, printStream) } } finally { rSource.delete() // clean up } } else { if (verbose) { - printStream.println(s"$file doesn't contain R source code, skipping...") + print(s"$file doesn't contain R source code, skipping...", printStream) } } } else { - printStream.println(s"WARN: $file resolved as dependency, but not found.") + print(s"WARN: $file resolved as dependency, but not found.", printStream, Level.WARNING) + } + } + } + + /** Internal method for logging. We log to a printStream in tests, for debugging purposes. 
*/ + private def print( + msg: String, + printStream: PrintStream, + level: Level = Level.FINE, + e: Throwable = null): Unit = { + if (printStream != null) { + printStream.println(msg) + if (e != null) { + e.printStackTrace(printStream) + } + } else { + level match { + case Level.INFO => logInfo(msg) + case Level.WARNING => logWarning(msg) + case Level.SEVERE => logError(msg, e) + case _ => logDebug(msg) } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 743f089974092..b9728cf479352 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -292,11 +292,6 @@ object SparkSubmit { } } } - // install any R packages that may have been passed through --jars or --packages. - // Spark Packages may contain R source code inside the jar. - if (args.isR && args.isNonEmptyArg(args.jars)) { - RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose) - } // Require all python files to be local, so we can add them to the PYTHONPATH // In YARN cluster mode, python files are distributed as regular files, which can be non-local diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala index b611d1bce62b3..472636af78887 100644 --- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -96,7 +96,7 @@ class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach { val individualJars = jars.split(",") val output = lineBuffer.mkString("\n") individualJars.foreach { jarFile => - assert(output.contains(s"WARN: $jarFile")) + assert(output.contains(s"$jarFile")) } } } @@ -109,7 +109,8 @@ class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach { attr.put(new Name("Spark-HasRPackage"), "true") val jar = IvyTestUtils.packJar(new File(new URI(repo)), dep1, Nil, useIvyLayout = false, withR = false, Some(manifest)) - RPackageUtils.checkAndBuildRPackage(jar.getAbsolutePath, new BufferPrintStream, true) + RPackageUtils.checkAndBuildRPackage(jar.getAbsolutePath, new BufferPrintStream, + verbose = true) val output = lineBuffer.mkString("\n") assert(output.contains(RPackageUtils.RJarDoc)) } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 547863d9a0739..9b78d10cd3777 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -22,6 +22,9 @@ import java.net.URL import java.util.Properties import java.util.concurrent.TimeUnit +import org.apache.spark.deploy.IvyTestUtils +import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate + import scala.collection.JavaConversions._ import scala.collection.mutable From 1bc5554563acb7bbebe2fd582799a1cb9d57296f Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 30 Jul 2015 14:09:36 -0700 Subject: [PATCH 08/16] add assumes for tests --- core/src/main/scala/org/apache/spark/api/r/RUtils.scala | 5 +++++ .../scala/org/apache/spark/deploy/RPackageUtilsSuite.scala | 6 +++++- .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 3 +++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala 
b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index 1e2997f1cd822..35767ee9cd55a 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -19,6 +19,8 @@ package org.apache.spark.api.r import java.io.File +import scala.sys.process._ + import org.apache.spark.{SparkEnv, SparkException} private[spark] object RUtils { @@ -62,4 +64,7 @@ private[spark] object RUtils { } } } + + /** Check if R is installed before running tests that use R commands. */ + def isRInstalled: Boolean = Seq("R", "--version").! == 0 } diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala index 472636af78887..59153450c15f4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -23,6 +23,7 @@ import java.util.jar.Attributes.Name import java.util.jar.{JarFile, Manifest} import org.apache.spark.SparkFunSuite +import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate import org.scalatest.BeforeAndAfterEach @@ -71,6 +72,7 @@ class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach { } test("build an R package from a jar end to end") { + assume(RUtils.isRInstalled, "R isn't installed on this machine.") val deps = Seq(dep1, dep2).mkString(",") IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo => val jars = Seq(main, dep1, dep2).map { c => @@ -87,6 +89,7 @@ class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach { } test("jars that don't exist are skipped and print warning") { + assume(RUtils.isRInstalled, "R isn't installed on this machine.") val deps = Seq(dep1, dep2).mkString(",") IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo => val jars = Seq(main, dep1, dep2).map { c => @@ -102,6 +105,7 @@ class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach { } test("faulty R package shows documentation") { + assume(RUtils.isRInstalled, "R isn't installed on this machine.") IvyTestUtils.withRepository(main, None, None) { repo => val manifest = new Manifest val attr = manifest.getMainAttributes @@ -109,7 +113,7 @@ class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach { attr.put(new Name("Spark-HasRPackage"), "true") val jar = IvyTestUtils.packJar(new File(new URI(repo)), dep1, Nil, useIvyLayout = false, withR = false, Some(manifest)) - RPackageUtils.checkAndBuildRPackage(jar.getAbsolutePath, new BufferPrintStream, + RPackageUtils.checkAndBuildRPackage(jar.getAbsolutePath, new BufferPrintStream, verbose = true) val output = lineBuffer.mkString("\n") assert(output.contains(RPackageUtils.RJarDoc)) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 90dfc393f6099..a453aa5e154fe 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.deploy import java.io._ +import org.apache.spark.api.r.RUtils + import scala.collection.mutable.ArrayBuffer import com.google.common.base.Charsets.UTF_8 @@ -363,6 +365,7 @@ class SparkSubmitSuite } test("correctly builds R packages included in a jar with --packages") { + assume(RUtils.isRInstalled, "R isn't installed on this machine.") 
val main = MavenCoordinate("my.great.lib", "mylib", "0.1") val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val rScriptDir = From e6bf7b0b3cb48a7c7ee99da27e2ac034e55d245a Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 30 Jul 2015 14:13:44 -0700 Subject: [PATCH 09/16] add println ignores --- core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala | 2 ++ .../test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala | 2 ++ 2 files changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala index 56601a7b6ade3..b8d62907c9c65 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala @@ -171,7 +171,9 @@ private[spark] object RPackageUtils extends Logging { level: Level = Level.FINE, e: Throwable = null): Unit = { if (printStream != null) { + // scalastyle:off println printStream.println(msg) + // scalastyle:on println if (e != null) { e.printStackTrace(printStream) } diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala index 59153450c15f4..646cacb13c4da 100644 --- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -48,7 +48,9 @@ class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach { /** Simple PrintStream that reads data into a buffer */ private class BufferPrintStream extends PrintStream(noOpOutputStream) { + // scalastyle:off println override def println(line: String) { + // scalastyle:on println lineBuffer += line } } From ac45527acd6209f9a7de54802eea89b41b7d6692 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 31 Jul 2015 14:04:46 -0700 Subject: [PATCH 10/16] added zipping of all libs --- .../scala/org/apache/spark/SparkContext.scala | 4 - .../scala/org/apache/spark/api/r/RUtils.scala | 7 +- .../apache/spark/deploy/RPackageUtils.scala | 94 ++++++++++++++----- .../org/apache/spark/deploy/SparkSubmit.scala | 15 ++- .../spark/deploy/SparkSubmitSuite.scala | 1 + 5 files changed, 87 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 020e83ce3f9f5..852ff2cc17dff 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1662,10 +1662,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } if (key != null) { addedJars(key) = System.currentTimeMillis - // install any R packages that may have been passed through --jars or --packages. - // Spark Packages may contain R source code inside the jar. This method needs to be here - // so that the package is always installed on the driver. 
- RPackageUtils.checkAndBuildRPackage(path) logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) } } diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index 35767ee9cd55a..93b3bea578676 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -19,7 +19,7 @@ package org.apache.spark.api.r import java.io.File -import scala.sys.process._ +import scala.collection.JavaConversions._ import org.apache.spark.{SparkEnv, SparkException} @@ -66,5 +66,8 @@ private[spark] object RUtils { } /** Check if R is installed before running tests that use R commands. */ - def isRInstalled: Boolean = Seq("R", "--version").! == 0 + def isRInstalled: Boolean = { + val builder = new ProcessBuilder(Seq("R", "--version")) + builder.start().waitFor() == 0 + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala index b8d62907c9c65..6559adf073b91 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala @@ -17,19 +17,21 @@ package org.apache.spark.deploy -import java.io.{FileOutputStream, PrintStream, File} +import java.io._ +import java.net.URI import java.util.jar.JarFile import java.util.logging.Level +import java.util.zip.{ZipEntry, ZipOutputStream} -import com.google.common.io.Files +import com.google.common.io.{ByteStreams, Files} -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.api.r.RUtils import org.apache.spark.util.{RedirectThread, Utils} import scala.collection.JavaConversions._ -private[spark] object RPackageUtils extends Logging { +private[deploy] object RPackageUtils extends Logging { /** The key in the MANIFEST.mf that we look for, in case a jar contains R code. */ private final val hasRPackage = "Spark-HasRPackage" @@ -63,6 +65,29 @@ private[spark] object RPackageUtils extends Logging { |... """.stripMargin.trim + /** Internal method for logging. We log to a printStream in tests, for debugging purposes. */ + private def print( + msg: String, + printStream: PrintStream, + level: Level = Level.FINE, + e: Throwable = null): Unit = { + if (printStream != null) { + // scalastyle:off println + printStream.println(msg) + // scalastyle:on println + if (e != null) { + e.printStackTrace(printStream) + } + } else { + level match { + case Level.INFO => logInfo(msg) + case Level.WARNING => logWarning(msg) + case Level.SEVERE => logError(msg, e) + case _ => logDebug(msg) + } + } + } + /** * Checks the manifest of the Jar whether there is any R source code bundled with it. * Exposed for testing. @@ -78,7 +103,8 @@ private[spark] object RPackageUtils extends Logging { */ private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = { // this code should be always running on the driver. - val pathToSparkR = RUtils.sparkRPackagePath(isDriver = true) + val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse( + throw new SparkException("SPARK_HOME not set. 
Can't locate SparkR package.")) val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator) val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg) if (verbose) { @@ -134,12 +160,12 @@ private[spark] object RPackageUtils extends Logging { /** * Extracts the files under /R in the jar to a temporary directory for building. */ - private[spark] def checkAndBuildRPackage( + private[deploy] def checkAndBuildRPackage( jars: String, printStream: PrintStream = null, verbose: Boolean = false): Unit = { jars.split(",").foreach { jarPath => - val file = new File(jarPath) + val file = new File(new URI(jarPath)) if (file.exists()) { val jar = new JarFile(file) if (checkManifestForR(jar)) { @@ -164,26 +190,44 @@ private[spark] object RPackageUtils extends Logging { } } - /** Internal method for logging. We log to a printStream in tests, for debugging purposes. */ - private def print( - msg: String, - printStream: PrintStream, - level: Level = Level.FINE, - e: Throwable = null): Unit = { - if (printStream != null) { - // scalastyle:off println - printStream.println(msg) - // scalastyle:on println - if (e != null) { - e.printStackTrace(printStream) - } + private def listFilesRecursively(dir: File): Seq[File] = { + if (!dir.exists()) { + Seq.empty[File] } else { - level match { - case Level.INFO => logInfo(msg) - case Level.WARNING => logWarning(msg) - case Level.SEVERE => logError(msg, e) - case _ => logDebug(msg) + if (dir.isDirectory) { + val subDir = dir.listFiles(new FilenameFilter { + override def accept(dir: File, name: String): Boolean = { + !dir.getAbsolutePath.contains("SparkR" + File.separator) + } + }) + subDir.flatMap(listFilesRecursively) + } else { + Seq(dir) + } + } + } + + /** Zips all the libraries found with SparkR in the R/lib directory for distribution with Yarn. */ + private[deploy] def zipRLibraries(dir: File, name: String): File = { + val filesToBundle = listFilesRecursively(dir) + // create a zip file from scratch, do not append to existing file. 
+ val zipFile = new File(dir, name) + zipFile.delete() + val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false)) + try { + filesToBundle.foreach { file => + // get the relative paths for proper naming in the zip file + val relPath = file.getAbsolutePath.replaceFirst(dir.getAbsolutePath, "") + val fis = new FileInputStream(file) + val zipEntry = new ZipEntry(relPath) + zipOutputStream.putNextEntry(zipEntry) + ByteStreams.copy(fis, zipOutputStream) + zipOutputStream.closeEntry() + fis.close() } + } finally { + zipOutputStream.close() } + zipFile } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b9728cf479352..41b2f0f2c4895 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -24,6 +24,7 @@ import java.security.PrivilegedExceptionAction import scala.collection.mutable.{ArrayBuffer, HashMap, Map} +import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path import org.apache.hadoop.security.UserGroupInformation import org.apache.ivy.Ivy @@ -81,6 +82,8 @@ object SparkSubmit { private val PYSPARK_SHELL = "pyspark-shell" private val SPARKR_SHELL = "sparkr-shell" private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip" + // contains R packages designed to run with SparkR for distribution in Yarn cluster mode + private val SPARKR_EXTRAS_ARCHIVE = "sparkr-extras.zip" private val CLASS_NOT_FOUND_EXIT_STATUS = 101 @@ -293,6 +296,12 @@ object SparkSubmit { } } + // install any R packages that may have been passed through --jars or --packages. + // Spark Packages may contain R source code inside the jar. + if (args.isR && !StringUtils.isEmpty(args.jars)) { + RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose) + } + // Require all python files to be local, so we can add them to the PYTHONPATH // In YARN cluster mode, python files are distributed as regular files, which can be non-local if (args.isPython && !isYarnCluster) { @@ -367,9 +376,11 @@ object SparkSubmit { printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.") } val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath) + val rExtras = RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_EXTRAS_ARCHIVE) + val extrasURI = Utils.resolveURI(rExtras.getAbsolutePath).toString + "#sparkr-extras" // Assigns a symbol link name "sparkr" to the shipped package. 
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b9728cf479352..41b2f0f2c4895 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -24,6 +24,7 @@ import java.security.PrivilegedExceptionAction
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.ivy.Ivy
@@ -81,6 +82,8 @@ object SparkSubmit {
   private val PYSPARK_SHELL = "pyspark-shell"
   private val SPARKR_SHELL = "sparkr-shell"
   private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
+  // contains R packages designed to run with SparkR for distribution in Yarn cluster mode
+  private val SPARKR_EXTRAS_ARCHIVE = "sparkr-extras.zip"
 
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
@@ -293,6 +296,12 @@ object SparkSubmit {
       }
     }
 
+    // install any R packages that may have been passed through --jars or --packages.
+    // Spark Packages may contain R source code inside the jar.
+    if (args.isR && !StringUtils.isEmpty(args.jars)) {
+      RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+    }
+
     // Require all python files to be local, so we can add them to the PYTHONPATH
     // In YARN cluster mode, python files are distributed as regular files, which can be non-local
     if (args.isPython && !isYarnCluster) {
@@ -367,9 +376,11 @@ object SparkSubmit {
       printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
     }
     val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
+    val rExtras = RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_EXTRAS_ARCHIVE)
+    val extrasURI = Utils.resolveURI(rExtras.getAbsolutePath).toString + "#sparkr-extras"
     // Assigns a symbol link name "sparkr" to the shipped package.
-    args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
+    args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr", extrasURI)
   }
 
   // If we're running a R app, set the main class to our specific R runner
@@ -988,11 +999,9 @@ private[spark] object SparkSubmitUtils {
         addExclusionRules(ivySettings, ivyConfName, md)
         // add all supplied maven artifacts as dependencies
         addDependenciesToIvy(md, artifacts, ivyConfName)
-
         exclusions.foreach { e =>
           md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
         }
-
         // resolve dependencies
         val rr: ResolveReport = ivy.resolve(md, resolveOptions)
         if (rr.hasError) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index a453aa5e154fe..b1bfc3b405d8d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -377,6 +377,7 @@ class SparkSubmitSuite
       "--master", "local-cluster[2,1,1024]",
       "--packages", main.toString,
       "--repositories", repo,
+      "--verbose",
       "--conf", "spark.ui.enabled=false",
       rScriptDir)
     runSparkSubmit(args)

From 77995df31fc378e12408ce2b8ec1b6883720f2c2 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Mon, 3 Aug 2015 10:52:01 -0700
Subject: [PATCH 11/16] fix URI

---
 core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 6559adf073b91..2ff8505c08c35 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -165,7 +165,7 @@ private[deploy] object RPackageUtils extends Logging {
       printStream: PrintStream = null,
       verbose: Boolean = false): Unit = {
     jars.split(",").foreach { jarPath =>
-      val file = new File(new URI(jarPath))
+      val file = new File(Utils.resolveURI(jarPath))
       if (file.exists()) {
         val jar = new JarFile(file)
         if (checkManifestForR(jar)) {

From 3a1be7dbc3f91b6591beb81dae2b3f2135e8417f Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Mon, 3 Aug 2015 15:18:39 -0700
Subject: [PATCH 12/16] don't zip

---
 .../scala/org/apache/spark/deploy/RPackageUtils.scala | 10 +++++-----
 .../scala/org/apache/spark/deploy/SparkSubmit.scala   |  6 +++---
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 2ff8505c08c35..2b7ca362c5893 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -190,19 +190,19 @@ private[deploy] object RPackageUtils extends Logging {
     }
   }
 
-  private def listFilesRecursively(dir: File): Seq[File] = {
+  private def listFilesRecursively(dir: File): Set[File] = {
     if (!dir.exists()) {
-      Seq.empty[File]
+      Set.empty[File]
     } else {
       if (dir.isDirectory) {
         val subDir = dir.listFiles(new FilenameFilter {
           override def accept(dir: File, name: String): Boolean = {
-            !dir.getAbsolutePath.contains("SparkR" + File.separator)
+            !dir.getAbsolutePath.contains("SparkR") && !name.contains(".zip")
          }
         })
-        subDir.flatMap(listFilesRecursively)
+        subDir.flatMap(listFilesRecursively).toSet
       } else {
-        Seq(dir)
+        Set(dir)
       }
     }
   }
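The FilenameFilter-based walk above is what decides which files end up in the SparkR archive. A minimal standalone sketch of the same pattern, with the exclusion rule pulled out as a predicate (all names here are illustrative, not Spark's own):

    import java.io.{File, FilenameFilter}

    object ListSketch {
      def listFilesRecursively(dir: File, exclude: String => Boolean): Set[File] = {
        if (!dir.exists()) {
          Set.empty
        } else if (dir.isDirectory) {
          val children = dir.listFiles(new FilenameFilter {
            // accept() is called once per child entry; returning false drops it.
            override def accept(parent: File, name: String): Boolean = !exclude(name)
          })
          children.flatMap(listFilesRecursively(_, exclude)).toSet
        } else {
          Set(dir)
        }
      }
    }

    // e.g. ListSketch.listFilesRecursively(new File("R/lib"), _.endsWith(".zip"))

The patch itself filters on the entry name and, in earlier revisions, on the parent path; the predicate form above is just one way to make that choice explicit.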
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 41b2f0f2c4895..ae90b0eaa08b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -376,11 +376,11 @@ object SparkSubmit {
       printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
     }
     val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
-    val rExtras = RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_EXTRAS_ARCHIVE)
-    val extrasURI = Utils.resolveURI(rExtras.getAbsolutePath).toString + "#sparkr-extras"
+    // val rExtras = RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_EXTRAS_ARCHIVE)
+    // val extrasURI = Utils.resolveURI(rExtras.getAbsolutePath).toString + "#sparkr-extras"
     // Assigns a symbol link name "sparkr" to the shipped package.
-    args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr", extrasURI)
+    args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
   }
 
   // If we're running a R app, set the main class to our specific R runner

From ddfcc0686958ae24217a41646cc358fa7d91fc36 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Mon, 3 Aug 2015 17:51:59 -0700
Subject: [PATCH 13/16] added zipping test

---
 R/install-dev.sh                                 |  1 -
 .../apache/spark/deploy/RPackageUtils.scala      |  2 +-
 .../org/apache/spark/deploy/SparkSubmit.scala    | 11 ++----
 .../spark/deploy/SparkSubmitArguments.scala      |  4 --
 .../apache/spark/deploy/IvyTestUtils.scala       |  2 +-
 .../spark/deploy/RPackageUtilsSuite.scala        | 38 +++++++++++++++++--
 6 files changed, 41 insertions(+), 17 deletions(-)

diff --git a/R/install-dev.sh b/R/install-dev.sh
index 4972bb9217072..b3a71e87a69ed 100755
--- a/R/install-dev.sh
+++ b/R/install-dev.sh
@@ -44,6 +44,5 @@ R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
 
 # Zip the SparkR package so that it can be distributed to worker nodes on YARN
 cd $LIB_DIR
-jar cfM "$LIB_DIR/sparkr.zip" SparkR
 
 popd > /dev/null

diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 2b7ca362c5893..0a08b37aefbd7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -197,7 +197,7 @@ private[deploy] object RPackageUtils extends Logging {
       if (dir.isDirectory) {
         val subDir = dir.listFiles(new FilenameFilter {
           override def accept(dir: File, name: String): Boolean = {
-            !dir.getAbsolutePath.contains("SparkR") && !name.contains(".zip")
+            !name.contains(".zip")
           }
         })
         subDir.flatMap(listFilesRecursively).toSet

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index ae90b0eaa08b8..79bd4a1a25d47 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -82,8 +82,6 @@ object SparkSubmit {
   private val PYSPARK_SHELL = "pyspark-shell"
   private val SPARKR_SHELL = "sparkr-shell"
   private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
-  // contains R packages designed to run with SparkR for distribution in Yarn cluster mode
-  private val SPARKR_EXTRAS_ARCHIVE = "sparkr-extras.zip"
 
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
@@ -282,13 +280,13 @@ object SparkSubmit {
       SparkSubmitUtils.resolveMavenCoordinates(
        args.packages, Option(args.repositories), Option(args.ivyRepoPath))
     if (resolvedMavenCoordinates.trim.nonEmpty) {
-      if (!args.isNonEmptyArg(args.jars)) {
+      if (!StringUtils.isEmpty(args.jars)) {
         args.jars = resolvedMavenCoordinates
       } else {
         args.jars += s",$resolvedMavenCoordinates"
       }
       if (args.isPython) {
-        if (!args.isNonEmptyArg(args.pyFiles)) {
+        if (!StringUtils.isEmpty(args.pyFiles)) {
           args.pyFiles = resolvedMavenCoordinates
         } else {
           args.pyFiles += s",$resolvedMavenCoordinates"
@@ -371,13 +369,12 @@ object SparkSubmit {
       if (rPackagePath.isEmpty) {
         printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
       }
-      val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+      val rPackageFile =
+        RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
       if (!rPackageFile.exists()) {
         printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
       }
       val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
-      // val rExtras = RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_EXTRAS_ARCHIVE)
-      // val extrasURI = Utils.resolveURI(rExtras.getAbsolutePath).toString + "#sparkr-extras"
       // Assigns a symbol link name "sparkr" to the shipped package.
       args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 569a761b05387..b3250ec6a16c1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -600,8 +600,4 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       System.setErr(currentErr)
     }
   }
-
-  private[deploy] def isNonEmptyArg(arg: String): Boolean = {
-    arg != null && arg.nonEmpty
-  }
 }
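The isNonEmptyArg helper removed above is replaced by commons-lang3's StringUtils.isEmpty, which folds the null check and the emptiness check into one call. A small self-contained sketch of the equivalence being relied on (illustrative only, not the patch's code):

    import org.apache.commons.lang3.StringUtils

    object StringCheckSketch {
      // Null-safe "has a value" test, matching the removed `arg != null && arg.nonEmpty`.
      def hasValue(arg: String): Boolean = !StringUtils.isEmpty(arg)

      def main(args: Array[String]): Unit = {
        assert(hasValue("a.jar"))
        assert(!hasValue(""))
        assert(!hasValue(null))
      }
    }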
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index b212495d9d104..d93febcfd23fd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -77,7 +77,7 @@ private[deploy] object IvyTestUtils {
   }
 
   /** Write the contents to a file to the supplied directory. */
-  private def writeFile(dir: File, fileName: String, contents: String): File = {
+  private[deploy] def writeFile(dir: File, fileName: String, contents: String): File = {
     val outputFile = new File(dir, fileName)
     val outputStream = new FileOutputStream(outputFile)
     outputStream.write(contents.toCharArray.map(_.toByte))

diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 646cacb13c4da..47a64081e297e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -21,13 +21,18 @@ import java.io.{PrintStream, OutputStream, File}
 import java.net.URI
 import java.util.jar.Attributes.Name
 import java.util.jar.{JarFile, Manifest}
+import java.util.zip.{ZipEntry, ZipFile}
+
+import org.scalatest.BeforeAndAfterEach
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
-import org.scalatest.BeforeAndAfterEach
-
-import scala.collection.mutable.ArrayBuffer
 
 class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach {
 
@@ -121,4 +126,31 @@ class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach {
       assert(output.contains(RPackageUtils.RJarDoc))
     }
   }
+
+  test("SparkR zipping works properly") {
+    val tempDir = Files.createTempDir()
+    try {
+      IvyTestUtils.writeFile(tempDir, "test.R", "abc")
+      val fakeSparkRDir = new File(tempDir, "SparkR")
+      assert(fakeSparkRDir.mkdirs())
+      IvyTestUtils.writeFile(fakeSparkRDir, "abc.R", "abc")
+      IvyTestUtils.writeFile(fakeSparkRDir, "DESCRIPTION", "abc")
+      IvyTestUtils.writeFile(tempDir, "package.zip", "abc") // fake zip file :)
+      val fakePackageDir = new File(tempDir, "packageTest")
+      assert(fakePackageDir.mkdirs())
+      IvyTestUtils.writeFile(fakePackageDir, "def.R", "abc")
+      IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
+      val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
+      assert(finalZip.exists())
+      val entries = new ZipFile(finalZip).entries().toSeq.map(_.getName)
+      assert(entries.contains("/test.R"))
+      assert(entries.contains("/SparkR/abc.R"))
+      assert(entries.contains("/SparkR/DESCRIPTION"))
+      assert(!entries.contains("/package.zip"))
+      assert(entries.contains("/packageTest/def.R"))
+      assert(entries.contains("/packageTest/DESCRIPTION"))
+    } finally {
+      FileUtils.deleteDirectory(tempDir)
+    }
+  }
 }

From 6603d0dcc80b5759f357194f5d102cac51b9be0d Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Mon, 3 Aug 2015 22:21:52 -0700
Subject: [PATCH 14/16] addressed comments

---
 R/install-dev.sh                                           | 3 ---
 core/src/main/scala/org/apache/spark/SparkContext.scala    | 2 +-
 .../scala/org/apache/spark/deploy/RPackageUtils.scala      | 8 ++++----
 .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala   | 4 ++++
 4 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/R/install-dev.sh b/R/install-dev.sh
index b3a71e87a69ed..59d98c9c7a646 100755
--- a/R/install-dev.sh
+++ b/R/install-dev.sh
@@ -42,7 +42,4 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
 # Install SparkR to $LIB_DIR
 R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
 
-# Zip the SparkR package so that it can be distributed to worker nodes on YARN
-cd $LIB_DIR
-
 popd > /dev/null
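The new test above verifies the archive by listing its entry names. Outside a test suite, the same check can be done with java.util.zip directly; a minimal sketch (hypothetical object name, eagerly materialized so the archive can be closed before the names are used):

    import java.util.zip.ZipFile
    import scala.collection.JavaConverters._

    object ZipCheckSketch {
      /** Collect entry names; paths are relative to the zipped directory root. */
      def entryNames(path: String): Seq[String] = {
        val zip = new ZipFile(path)
        try zip.entries().asScala.map(_.getName).toList
        finally zip.close()
      }

      def main(args: Array[String]): Unit = {
        // e.g. entryNames("/tmp/sparkr.zip").foreach(println)
        entryNames(args(0)).foreach(println)
      }
    }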
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 980bf8ec64bbb..4380cf45cc1b0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -46,7 +46,7 @@ import org.apache.mesos.MesosNativeLibrary
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.{RPackageUtils, LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
   FixedLengthBinaryInputFormat}

diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 0a08b37aefbd7..2a5255ca1458a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -190,17 +190,17 @@ private[deploy] object RPackageUtils extends Logging {
     }
   }
 
-  private def listFilesRecursively(dir: File): Set[File] = {
+  private def listFilesRecursively(dir: File, excludePatterns: Seq[String]): Set[File] = {
     if (!dir.exists()) {
       Set.empty[File]
     } else {
       if (dir.isDirectory) {
         val subDir = dir.listFiles(new FilenameFilter {
           override def accept(dir: File, name: String): Boolean = {
-            !name.contains(".zip")
+            !excludePatterns.map(name.contains).reduce(_ || _) // exclude files with given pattern
           }
         })
-        subDir.flatMap(listFilesRecursively).toSet
+        subDir.flatMap(listFilesRecursively(_, excludePatterns)).toSet
       } else {
         Set(dir)
       }
@@ -209,7 +209,7 @@ private[deploy] object RPackageUtils extends Logging {
 
   /** Zips all the libraries found with SparkR in the R/lib directory for distribution with Yarn. */
   private[deploy] def zipRLibraries(dir: File, name: String): File = {
-    val filesToBundle = listFilesRecursively(dir)
+    val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
     // create a zip file from scratch, do not append to existing file.
     val zipFile = new File(dir, name)
     zipFile.delete()
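The generalized filter above folds the per-pattern checks with reduce(_ || _). One detail worth noting for anyone adapting it: reduce throws on an empty pattern list, whereas exists returns false in that case and reads the same otherwise. An illustrative comparison (not part of the patch):

    object ExcludeSketch {
      // As written in the patch: assumes `patterns` is non-empty.
      def excludedByReduce(name: String, patterns: Seq[String]): Boolean =
        patterns.map(name.contains).reduce(_ || _)

      // Equivalent for non-empty lists, and safely false for an empty list.
      def excludedByExists(name: String, patterns: Seq[String]): Boolean =
        patterns.exists(name.contains)

      def main(args: Array[String]): Unit = {
        assert(excludedByReduce("sparkr.zip", Seq(".zip")))
        assert(excludedByExists("sparkr.zip", Seq(".zip")))
        assert(!excludedByExists("pkg/DESCRIPTION", Seq(".zip")))
        assert(!excludedByExists("anything", Seq.empty)) // reduce(_ || _) would throw here
      }
    }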
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b1bfc3b405d8d..fa276eb53852c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -365,6 +365,9 @@ class SparkSubmitSuite
   }
 
   test("correctly builds R packages included in a jar with --packages") {
+    // TODO: Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
+    // It's hard to write the test in SparkR (because we can't create the repository dynamically)
+    /*
     assume(RUtils.isRInstalled, "R isn't installed on this machine.")
     val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -382,6 +385,7 @@ class SparkSubmitSuite
         rScriptDir)
       runSparkSubmit(args)
     }
+    */
   }
 
   test("resolves command line argument paths correctly") {

From d25370802ffd0594edf27e639b2dbcc24fbce2cf Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 4 Aug 2015 10:00:40 -0700
Subject: [PATCH 15/16] removed unused imports

---
 .../main/scala/org/apache/spark/deploy/RPackageUtils.scala | 5 ++---
 .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala   | 2 +-
 .../org/apache/spark/deploy/yarn/YarnClusterSuite.scala    | 3 ---
 3 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 2a5255ca1458a..ed1e972955679 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -18,19 +18,18 @@
 package org.apache.spark.deploy
 
 import java.io._
-import java.net.URI
 import java.util.jar.JarFile
 import java.util.logging.Level
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
+import scala.collection.JavaConversions._
+
 import com.google.common.io.{ByteStreams, Files}
 
 import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.util.{RedirectThread, Utils}
 
-import scala.collection.JavaConversions._
-
 private[deploy] object RPackageUtils extends Logging {
 
   /** The key in the MANIFEST.mf that we look for, in case a jar contains R code. */

diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index fa276eb53852c..53c411f60a470 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -365,7 +365,7 @@ class SparkSubmitSuite
   }
 
   test("correctly builds R packages included in a jar with --packages") {
-    // TODO: Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
+    // TODO(SPARK-9603): Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
     // It's hard to write the test in SparkR (because we can't create the repository dynamically)
     /*
     assume(RUtils.isRInstalled, "R isn't installed on this machine.")

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 29b2dd7753ae4..eb6e1fd370620 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -22,9 +22,6 @@ import java.net.URL
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 
-import org.apache.spark.deploy.IvyTestUtils
-import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 

From 0de384f33b92252c7e0a9e187a6c83c7e96d4e06 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 4 Aug 2015 10:02:37 -0700
Subject: [PATCH 16/16] remove unused imports 2

---
 core/src/main/scala/org/apache/spark/api/r/RBackend.scala     | 1 -
 .../test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 2 --
 2 files changed, 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
index 3b43fc7b1362a..b7e72d4d0ed0b 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -28,7 +28,6 @@ import io.netty.channel.socket.SocketChannel
 import io.netty.channel.socket.nio.NioServerSocketChannel
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder
 import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}
-import org.apache.spark.deploy.RPackageUtils
 
 import org.apache.spark.{Logging, SparkConf}
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 53c411f60a470..757e0ce3d278b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.deploy
 
 import java.io._
 
-import org.apache.spark.api.r.RUtils
-
 import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.base.Charsets.UTF_8
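Patch 11's one-line "fix URI" change (new URI(jarPath) becomes Utils.resolveURI(jarPath)) is easy to miss but matters: --jars entries are often plain local paths with no scheme, which new File(new URI(...)) rejects. The following standalone sketch shows the idea behind such a resolver; the helper name is hypothetical and Spark's own Utils.resolveURI handles more cases:

    import java.io.File
    import java.net.URI

    object ResolveUriSketch {
      /** Accept either a real URI (file:, hdfs:, ...) or a bare local path. */
      def resolve(path: String): URI = {
        val uri = new URI(path)
        if (uri.getScheme != null) uri                    // already has a scheme, use as-is
        else new File(path).getAbsoluteFile.toURI         // treat it as a local filesystem path
      }

      def main(args: Array[String]): Unit = {
        println(resolve("deps/mylib.jar"))                // file:/.../deps/mylib.jar
        println(resolve("file:/tmp/mylib.jar"))           // unchanged
      }
    }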