Revert "[SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading remote resources in yarn client mode"

This reverts commit 59529b2.
Marcelo Vanzin committed Aug 29, 2017
1 parent 59529b2 commit 917fe66
Showing 5 changed files with 48 additions and 115 deletions.
66 changes: 23 additions & 43 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -208,20 +208,14 @@ object SparkSubmit extends CommandLineUtils {

/**
* Prepare the environment for submitting an application.
*
* @param args the parsed SparkSubmitArguments used for environment preparation.
* @param conf the Hadoop Configuration, this argument will only be set in unit test.
* @return a 4-tuple:
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a map of system properties, and
* (4) the main class for the child
*
* This returns a 4-tuple:
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a map of system properties, and
* (4) the main class for the child
* Exposed for testing.
*/
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
: (Seq[String], Seq[String], Map[String, String], String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
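
A hedged usage sketch of the restored single-argument signature and the 4-tuple described in the doc comment above; the argument values and class names below are invented for illustration.

```scala
import org.apache.spark.deploy.{SparkSubmit, SparkSubmitArguments}

// Hypothetical arguments; SparkSubmit and SparkSubmitArguments are
// private[deploy], so this only compiles from within that package (e.g. a test).
val appArgs = new SparkSubmitArguments(
  Seq("--master", "local", "--class", "example.Main", "/tmp/example.jar"))

// The 4-tuple documented above: child process arguments, classpath entries,
// system properties, and the main class to launch.
val (childArgs, childClasspath, sysProps, childMainClass) =
  SparkSubmit.prepareSubmitEnvironment(appArgs)
```
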
@@ -317,16 +311,12 @@ object SparkSubmit extends CommandLineUtils {
}

// In client mode, download remote files.
var localPrimaryResource: String = null
var localJars: String = null
var localPyFiles: String = null
var localFiles: String = null
if (deployMode == CLIENT) {
val hadoopConf = conf.getOrElse(new HadoopConfiguration())
localPrimaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
localJars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
localPyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
localFiles = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
val hadoopConf = new HadoopConfiguration()
args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
args.files = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
}

// Require all python files to be local, so we can add them to the PYTHONPATH
@@ -376,7 +366,7 @@ object SparkSubmit extends CommandLineUtils {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
if (clusterManager != YARN) {
// The YARN backend distributes the primary file differently, so don't merge it.
args.files = mergeFileLists(args.files, args.primaryResource)
@@ -386,8 +376,8 @@ object SparkSubmit extends CommandLineUtils {
// The YARN backend handles python files differently, so don't merge the lists.
args.files = mergeFileLists(args.files, args.pyFiles)
}
if (localPyFiles != null) {
sysProps("spark.submit.pyFiles") = localPyFiles
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
}

@@ -441,7 +431,7 @@ object SparkSubmit extends CommandLineUtils {
// If an R file is provided, add it to the child arguments and list of files to deploy.
// Usage: RRunner <main R file> [app arguments]
args.mainClass = "org.apache.spark.deploy.RRunner"
args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
}
}
@@ -478,7 +468,6 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.instances"),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
@@ -502,28 +491,15 @@ object SparkSubmit extends CommandLineUtils {
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),

// An internal option used only for spark-shell to add user jars to repl's classloader,
// previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
// remote jars, so adding a new option to only specify local jars for spark-shell internally.
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars")
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
)

// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
// Add the main application jar and any added jars to classpath in case YARN client
// Also add the main application jar and any added jars to classpath in case YARN client
// requires these jars.
// This assumes both primaryResource and user jars are local jars, otherwise it will not be
// added to the classpath of YARN client.
if (isYarnCluster) {
if (deployMode == CLIENT || isYarnCluster) {
childMainClass = args.mainClass
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
@@ -580,6 +556,10 @@ object SparkSubmit extends CommandLineUtils {
if (args.isPython) {
sysProps.put("spark.yarn.isPython", "true")
}

if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
}

// assure a keytab is available from any place in a JVM
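
Taken together, the SparkSubmit.scala hunks above mean that in client mode the downloaded local copies are written straight back into args.jars / args.files / args.pyFiles, and those same values later feed the spark.yarn.dist.* properties. A minimal, illustrative sketch of that consequence (all paths are made up):

```scala
// Illustrative only: after this revert, a remote --jars entry is replaced in
// place by its downloaded local copy, so the YARN client sees a local path in
// spark.yarn.dist.jars and distributes (re-uploads) it again.
val remoteJar = "s3a://bucket/libs/dep.jar"      // what the user passed
val localCopy = "/tmp/spark-download/dep.jar"    // result of downloadFileList(...)

// args.jars is mutated to the local copy, and the same string is then assigned
// to the YARN distribution property via OptionAssigner(args.jars, YARN, ...).
val sysProps = Map("spark.yarn.dist.jars" -> localCopy)
```
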
@@ -87,7 +87,7 @@ package object config {
.intConf
.createOptional

private[spark] val PY_FILES = ConfigBuilder("spark.yarn.dist.pyFiles")
private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
.internal()
.stringConf
.toSequence
25 changes: 10 additions & 15 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2580,23 +2580,18 @@ private[spark] object Utils extends Logging {
}

/**
* Return the jar files pointed by the "spark.jars" property. Spark internally will distribute
* these jars through file server. In the YARN mode, it will return an empty list, since YARN
* has its own mechanism to distribute jars.
* In YARN mode this method returns a union of the jar files pointed by "spark.jars" and the
* "spark.yarn.dist.jars" properties, while in other modes it returns the jar files pointed by
* only the "spark.jars" property.
*/
def getUserJars(conf: SparkConf): Seq[String] = {
def getUserJars(conf: SparkConf, isShell: Boolean = false): Seq[String] = {
val sparkJars = conf.getOption("spark.jars")
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}

/**
* Return the local jar files which will be added to REPL's classpath. These jar files are
* specified by --jars (spark.jars) or --packages, remote jars will be downloaded to local by
* SparkSubmit at first.
*/
def getLocalUserJarsForShell(conf: SparkConf): Seq[String] = {
val localJars = conf.getOption("spark.repl.local.jars")
localJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
if (conf.get("spark.master") == "yarn" && isShell) {
val yarnJars = conf.getOption("spark.yarn.dist.jars")
unionFileLists(sparkJars, yarnJars).toSeq
} else {
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}
}

private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
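
An illustrative call against the restored getUserJars signature (configuration values are invented): with a yarn master and isShell = true the REPL jar list is the union of spark.jars and spark.yarn.dist.jars, otherwise only spark.jars is used.

```scala
// Sketch only: Utils.getUserJars is private[spark], so this runs from
// Spark-internal code or a test; the values below are made up.
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val conf = new SparkConf()
  .set("spark.master", "yarn")
  .set("spark.jars", "/tmp/app.jar")
  .set("spark.yarn.dist.jars", "hdfs:///libs/dep.jar")

val replJars = Utils.getUserJars(conf, isShell = true)  // both lists merged
val appJars  = Utils.getUserJars(conf)                  // only spark.jars
```
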
68 changes: 13 additions & 55 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -27,7 +27,7 @@ import scala.io.Source
import com.google.common.io.ByteStreams
import org.apache.commons.io.{FilenameUtils, FileUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.apache.hadoop.fs.Path
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
@@ -738,7 +738,10 @@ class SparkSubmitSuite

test("downloadFile - file doesn't exist") {
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)
// Set s3a implementation to local file system for testing.
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
// Disable file system impl cache to make sure the test file system is picked up.
hadoopConf.set("fs.s3a.impl.disable.cache", "true")
intercept[FileNotFoundException] {
SparkSubmit.downloadFile("s3a:/no/such/file", hadoopConf)
}
@@ -756,7 +759,10 @@ class SparkSubmitSuite
val content = "hello, world"
FileUtils.write(jarFile, content)
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)
// Set s3a implementation to local file system for testing.
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
// Disable file system impl cache to make sure the test file system is picked up.
hadoopConf.set("fs.s3a.impl.disable.cache", "true")
val sourcePath = s"s3a://${jarFile.getAbsolutePath}"
val outputPath = SparkSubmit.downloadFile(sourcePath, hadoopConf)
checkDownloadedFile(sourcePath, outputPath)
@@ -769,7 +775,10 @@ class SparkSubmitSuite
val content = "hello, world"
FileUtils.write(jarFile, content)
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)
// Set s3a implementation to local file system for testing.
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
// Disable file system impl cache to make sure the test file system is picked up.
hadoopConf.set("fs.s3a.impl.disable.cache", "true")
val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}")
val outputPaths = SparkSubmit.downloadFileList(sourcePaths.mkString(","), hadoopConf).split(",")

@@ -780,43 +789,6 @@ class SparkSubmitSuite
}
}

test("Avoid re-upload remote resources in yarn client mode") {
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)

val tmpDir = Utils.createTempDir()
val file = File.createTempFile("tmpFile", "", tmpDir)
val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"

val args = Seq(
"--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
"--name", "testApp",
"--master", "yarn",
"--deploy-mode", "client",
"--jars", tmpJarPath,
"--files", s"s3a://${file.getAbsolutePath}",
"--py-files", s"s3a://${pyFile.getAbsolutePath}",
s"s3a://$mainResource"
)

val appArgs = new SparkSubmitArguments(args)
val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3

// All the resources should still be remote paths, so that YARN client will not upload again.
sysProps("spark.yarn.dist.jars") should be (tmpJarPath)
sysProps("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}")
sysProps("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}")

// Local repl jars should be a local path.
sysProps("spark.repl.local.jars") should (startWith("file:"))

// local py files should not be a URI format.
sysProps("spark.submit.pyFiles") should (startWith("/"))
}

// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
private def runSparkSubmit(args: Seq[String]): Unit = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -856,11 +828,6 @@ class SparkSubmitSuite
Utils.deleteRecursively(tmpDir)
}
}

private def updateConfWithFakeS3Fs(conf: Configuration): Unit = {
conf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName)
conf.set("fs.s3a.impl.disable.cache", "true")
}
}

object JarCreationTest extends Logging {
@@ -930,13 +897,4 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
// Ignore the scheme for testing.
super.copyToLocalFile(new Path(src.toUri.getPath), dst)
}

override def globStatus(pathPattern: Path): Array[FileStatus] = {
val newPath = new Path(pathPattern.toUri.getPath)
super.globStatus(newPath).map { status =>
val path = s"s3a://${status.getPath.toUri.getPath}"
status.setPath(new Path(path))
status
}
}
}
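
For reference, a minimal sketch (assuming this TestFileSystem is on the test classpath) of how the fs.s3a overrides used in the tests above resolve an s3a:// URI to the local-filesystem stub:

```scala
// Sketch only: with fs.s3a.impl pointed at the stub and the filesystem cache
// disabled, resolving an s3a:// path yields TestFileSystem, which delegates to
// the local file system and ignores the scheme in copyToLocalFile.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val hadoopConf = new Configuration()
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
hadoopConf.set("fs.s3a.impl.disable.cache", "true")

val fs = FileSystem.get(new Path("s3a:///tmp/example.jar").toUri, hadoopConf)
// fs is an instance of TestFileSystem backed by the local file system.
```
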
@@ -57,7 +57,7 @@ object Main extends Logging {
// Visible for testing
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
interp = _interp
val jars = Utils.getLocalUserJarsForShell(conf)
val jars = Utils.getUserJars(conf, isShell = true)
// Remove file:///, file:// or file:/ scheme if exists for each jar
.map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
.mkString(File.pathSeparator)
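
An illustrative run of the jar-path normalisation restored above (paths are made up): entries with a file: scheme are converted to plain filesystem paths before being joined into a classpath string.

```scala
// Sketch only: strip the file: scheme from each jar and join with the platform
// path separator, as the REPL does with the result of Utils.getUserJars.
import java.io.File
import java.net.URI

val jars = Seq("file:/tmp/dep1.jar", "/tmp/dep2.jar")
  .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
  .mkString(File.pathSeparator)
// On Unix-like systems: "/tmp/dep1.jar:/tmp/dep2.jar"
```
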
