
[SPARK-23729][CORE] Respect URI fragment when resolving globs

First, glob resolution no longer swallows the remote name part (the portion preceded by the `#` sign) of `--files` or `--archives` options.

Second, in the special case where the glob resolves to multiple files, a single remote name does not make sense, so an error is returned.
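
As a REPL-style sketch of the splitting logic this introduces (it mirrors the patch's splitOnFragment helper; the hdfs path and the name "deps" are made-up examples):

import java.net.URI

val uri = new URI("hdfs:///data/archives/*.zip#deps")
// Drop the fragment so that globbing sees only the real path ...
val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
val fragment = Option(uri.getFragment)
// withoutFragment: hdfs:///data/archives/*.zip, fragment: Some(deps)
// ... then re-attach it to a unique match, e.g. "hdfs:///data/archives/a.zip#deps".
// When the glob matches several files, re-attaching a single name would be
// ambiguous, hence the error described above.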

Enhanced the existing test and added a test for the error case.

Author: Mihaly Toth <misutoth@gmail.com>

Closes #20853 from misutoth/glob-with-remote-name.

(cherry picked from commit 0604bea)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
misutoth authored and vanzin committed Mar 22, 2018
1 parent 4b9f33f commit c9acd46bed8fa3e410e8a44aafe3237e59deaa73
core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -18,12 +18,13 @@
 package org.apache.spark.deploy
 
 import java.io.File
+import java.net.URI
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 private[deploy] object DependencyUtils {
@@ -137,16 +138,31 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val (base, fragment) = splitOnFragment(path)
+      (resolveGlobPath(base, hadoopConf), fragment) match {
+        case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
+          s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
+        case (resolved, Some(namedAs)) => resolved.map(_ + "#" + namedAs)
+        case (resolved, _) => resolved
       }
     }.mkString(",")
   }
 
+  private def splitOnFragment(path: String): (URI, Option[String]) = {
+    val uri = Utils.resolveURI(path)
+    val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
+    (withoutFragment, Option(uri.getFragment))
+  }
+
+  private def resolveGlobPath(uri: URI, hadoopConf: Configuration): Array[String] = {
+    uri.getScheme match {
+      case "local" | "http" | "https" | "ftp" => Array(uri.toString)
+      case _ =>
+        val fs = FileSystem.get(uri, hadoopConf)
+        Option(fs.globStatus(new Path(uri))).map { status =>
+          status.filter(_.isFile).map(_.getPath.toUri.toString)
+        }.getOrElse(Array(uri.toString))
+    }
+  }
+
 }
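
A hedged usage sketch of the new behavior (GlobFragmentDemo and the /tmp paths are hypothetical, not part of the patch; it would have to live in the org.apache.spark.deploy package, since resolveGlobPaths is private[deploy]):

package org.apache.spark.deploy

import org.apache.hadoop.conf.Configuration

object GlobFragmentDemo {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    // With exactly one match for /tmp/archives/*.zip, the fragment survives
    // resolution: prints something like "file:/tmp/archives/only.zip#deps".
    println(DependencyUtils.resolveGlobPaths("/tmp/archives/*.zip#deps", hadoopConf))
    // With two or more matches, the same call now throws SparkException
    // ("... resolves ambiguously to multiple files: ...") instead of silently
    // producing several entries that all claim the name "deps".
  }
}
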
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
+    try {
+      doPrepareSubmitEnvironment(args, conf)
+    } catch {
+      case e: SparkException =>
+        printErrorAndExit(e.getMessage)
+        throw e
+    }
+  }
+
+  private def doPrepareSubmitEnvironment(
+      args: SparkSubmitArguments,
+      conf: Option[HadoopConfiguration] = None)
+      : (Seq[String], Seq[String], SparkConf, String) = {
     // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 import java.io._
 import java.net.URI
 import java.nio.charset.StandardCharsets
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -606,10 +606,13 @@ class SparkSubmitSuite
   }
 
   test("resolves command line argument paths correctly") {
-    val jars = "/jar1,/jar2" // --jars
-    val files = "local:/file1,file2" // --files
-    val archives = "file:/archive1,archive2" // --archives
-    val pyFiles = "py-file1,py-file2" // --py-files
+    val dir = Utils.createTempDir()
+    val archive = Paths.get(dir.toPath.toString, "single.zip")
+    Files.createFile(archive)
+    val jars = "/jar1,/jar2"
+    val files = "local:/file1,file2"
+    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+    val pyFiles = "py-file1,py-file2"
 
     // Test jars and files
     val clArgs = Seq(
@@ -636,9 +639,10 @@ class SparkSubmitSuite
     val appArgs2 = new SparkSubmitArguments(clArgs2)
     val (_, _, conf2, _) = SparkSubmit.prepareSubmitEnvironment(appArgs2)
     appArgs2.files should be (Utils.resolveURIs(files))
-    appArgs2.archives should be (Utils.resolveURIs(archives))
+    appArgs2.archives should fullyMatch regex ("file:/archive1,file:.*#archive3")
     conf2.get("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
-    conf2.get("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
+    conf2.get("spark.yarn.dist.archives") should fullyMatch regex
+      ("file:/archive1,file:.*#archive3")
 
     // Test python files
     val clArgs3 = Seq(
@@ -657,6 +661,29 @@ class SparkSubmitSuite
     conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
   }
 
+  test("ambiguous archive mapping results in error message") {
+    val dir = Utils.createTempDir()
+    val archive1 = Paths.get(dir.toPath.toString, "first.zip")
+    val archive2 = Paths.get(dir.toPath.toString, "second.zip")
+    Files.createFile(archive1)
+    Files.createFile(archive2)
+    val jars = "/jar1,/jar2"
+    val files = "local:/file1,file2"
+    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+    val pyFiles = "py-file1,py-file2"
+
+    // Test files and archives (Yarn)
+    val clArgs2 = Seq(
+      "--master", "yarn",
+      "--class", "org.SomeClass",
+      "--files", files,
+      "--archives", archives,
+      "thejar.jar"
+    )
+
+    testPrematureExit(clArgs2.toArray, "resolves ambiguously to multiple files")
+  }
+
   test("resolves config paths correctly") {
     val jars = "/jar1,/jar2" // spark.jars
     val files = "local:/file1,file2" // spark.files / spark.yarn.dist.files
