[SPARK-23729][CORE] Respect URI fragment when resolving globs #20853
Conversation
…hich is meant to be the remote name. In case glob resolution results in multiple items for a file with a remote name, an error is displayed.
Jenkins, ok to test
nit: pr title should describe the solution, not the problem. e.g. "Respect URI fragment when resolving globs" is a description of the solution.
Also this is not related to structured streaming, so please change [SS], which is structured streaming, to something else like [CORE].
@@ -105,11 +105,17 @@ class SparkSubmitSuite

// Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
implicit val defaultSignaler: Signaler = ThreadSignaler
var dir: File = null
Most of the tests don't use this dir at all. Why create it for all the tests?
I was thinking about this too. I wanted to avoid making it an Option or doing a not very nice null check. I can do that later though...
I mean more like put something here which is used by more than 2 tests. There are ~40 tests which are just creating and deleting this directory without any benefit.
I wanted to make sure the directory is deleted even if the test fails
Yeah, there is a good example here: test("launch simple application with spark-submit with redaction")
val jars = "/jar1,/jar2" // --jars
val files = "local:/file1,file2" // --files
val archives = "file:/archive1,archive2" // --archives
val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
// --archives
val pyFiles = "py-file1,py-file2" // --py-files
Does --py-files support renaming?
According to the doc only --files and --archives support it.
OK, thanks.
YARN's Client.scala supports renaming for everything that uses the distributed cache, even if that's not explicitly called out in the docs.
@@ -657,6 +667,31 @@ class SparkSubmitSuite

conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
}

var cleanExit = false
What is it used for?
} catch {
  case e: SparkException =>
    printErrorAndExit(e.getMessage)
    throw new RuntimeException("Unreachable production code")
Nit: I have a feeling it's a bit overkill compared to the other occurrences.
Which part do you mean is overkill?
throw new RuntimeException...
Otherwise execution just continues in the test itself, where exitFn does not stop it.
Maybe just let the exception propagate? That's what a lot of this code does... then you don't need to change this file at all.
+1
Actually the directory deletion is hooked into JVM shutdown, so I will let that do the housekeeping for us and avoid a new field as well.
val renameAs = if (spath.length > 1) Some(spath(1)) else None
val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
resolved match {
  case array: Array[String] if !renameAs.isEmpty && array.length>1 =>
Nit: array.length > 1
val spath = path.split('#')
val renameAs = if (spath.length > 1) Some(spath(1)) else None
val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
resolved match {
This can be simplified something like this: (renameAs, resolved) match...
This whole match block is a little ugly, but I'll wait to see how you implement Gabor's suggestion...
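The tuple-match suggested above can be sketched as follows. This is a minimal standalone illustration, not the actual Spark code: resolveGlob is a hypothetical stand-in for Hadoop glob expansion, and IllegalArgumentException stands in for SparkException.

```scala
object GlobMatchSketch {
  // Hypothetical stand-in for glob resolution: a glob expands to several
  // candidate files, a plain path resolves to itself.
  def resolveGlob(base: String): Array[String] =
    if (base.contains("*")) Array("/data/a.zip", "/data/b.zip") else Array(base)

  // Matching on the (resolved, renameAs) tuple keeps the three cases flat
  // instead of nesting a match inside Option handling.
  def resolve(base: String, renameAs: Option[String]): Array[String] =
    (resolveGlob(base), renameAs) match {
      case (resolved, Some(_)) if resolved.length > 1 =>
        throw new IllegalArgumentException(
          s"$base resolves ambiguously to multiple files: ${resolved.mkString(",")}")
      case (resolved, Some(name)) => resolved.map(_ + "#" + name)
      case (resolved, None) => resolved
    }
}
```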
}.getOrElse(Array(path))
val spath = path.split('#')
val renameAs = if (spath.length > 1) Some(spath(1)) else None
val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
Nit: resolveGlobPath
case array: Array[String] if !renameAs.isEmpty && array.length>1 =>
  throw new SparkException(
    s"${spath(1)} resolves ambiguously to multiple files: ${array.mkString(",")}")
case array: Array[String] if !renameAs.isEmpty => array.map( _ + "#" + renameAs.get)
Maybe we can find some meaningful name for array; as it is, it makes the code hard to read.
Option(fs.globStatus(new Path(uri))).map { status =>
  status.filter(_.isFile).map(_.getPath.toUri.toString)
}.getOrElse(Array(path))
val spath = path.split('#')
Why not use Utils.resolveURI as before? Parsing URIs by hand is very sketchy.
You are right. It took some time to figure out how to clone a URI without the fragment part, but the next version will include that.
@vanzin I want to present an error on the CLI. On the other hand, I simplified it to just rethrowing the exception.
Test build #88379 has finished for PR 20853 at commit
}.getOrElse(Array(path))
val (base, fragment) = splitOnFragment(Utils.resolveURI(path))
(resolveGlobPath(base, hadoopConf), fragment) match {
  case (resolved: Array[String], Some(_)) if resolved.length > 1 => throw new SparkException(
Type inference is not working here?
(resolveGlobPath(base, hadoopConf), fragment) match {
  case (resolved: Array[String], Some(_)) if resolved.length > 1 => throw new SparkException(
    s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
  case (resolved: Array[String], Some(namedAs)) => resolved.map( _ + "#" + namedAs)
Same here.
…small improvements
Test build #88385 has finished for PR 20853 at commit
Test build #88389 has finished for PR 20853 at commit
val (base, fragment) = splitOnFragment(path)
(resolveGlobPath(base, hadoopConf), fragment) match {
  case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
    s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
nit: resolved.mkString(", ")
There was no space used here before. Actually there should not be any space in the resulting list. Tests also rely on this.
  (withoutFragment, fragment)
}

private def resolveGlobPath(uri: URI, hadoopConf: Configuration): Array [String] = {
nit: Array[String]
try {
  doPrepareSubmitEnvironment(args, conf)
} catch {
  case e: SparkException => printErrorAndExit(e.getMessage); throw e
nit:
case e: SparkException =>
printErrorAndExit(e.getMessage)
throw e
Test build #88406 has finished for PR 20853 at commit
retest this please
Test build #88430 has finished for PR 20853 at commit
Test build #88429 has finished for PR 20853 at commit
Some minor things, otherwise looks good.
private def splitOnFragment(path: String): (URI, Option[String]) = {
  val uri = Utils.resolveURI(path)
  val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
  val fragment = if (uri.getFragment != null) Some(uri.getFragment) else None
Option(uri.getFragment)
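With that simplification, the helper could look roughly like this. This is a self-contained sketch that uses plain java.net.URI in place of Utils.resolveURI, so unlike the real code it only handles strings that are already valid URIs:

```scala
import java.net.URI

object FragmentSketch {
  // Split "scheme:/path#name" into the URI without its fragment and an
  // Option holding the fragment, using Option(...) instead of the if/else.
  def splitOnFragment(path: String): (URI, Option[String]) = {
    val uri = new URI(path) // the actual Spark code uses Utils.resolveURI(path)
    val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
    (withoutFragment, Option(uri.getFragment))
  }
}
```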
Files.createFile(archive2)
val jars = "/jar1,/jar2" // --jars
val files = "local:/file1,file2" // --files
// --archives
Unnecessary comment. I know the other test has them, but I'd just remove these from this new code, since they don't add any useful information.
val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
val pyFiles = "py-file1,py-file2" // --py-files

// Test files and archives (Yarn)
Unnecessary comment.
Test build #88476 has finished for PR 20853 at commit
Merging to master / 2.3.
Firstly, glob resolution will not result in swallowing the remote name part (that is preceded by the `#` sign) in case of `--files` or `--archives` options. Moreover, in the special case of multiple resolutions, where the remote naming does not make sense, an error is returned. Enhanced current test and wrote additional test for the error case. Author: Mihaly Toth <misutoth@gmail.com> Closes #20853 from misutoth/glob-with-remote-name. (cherry picked from commit 0604bea) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
What changes were proposed in this pull request?
Firstly, glob resolution will not result in swallowing the remote name part (that is preceded by the # sign) in case of --files or --archives options. Moreover, in the special case of multiple resolutions, where the remote naming does not make sense, an error is returned.
How was this patch tested?
Enhanced current test and wrote additional test for the error case.
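The behavior described above can be illustrated end to end with a small sketch. File-system access is faked with a fixed listing standing in for Hadoop's fs.globStatus, and the names and exception type are illustrative rather than the actual Spark code:

```scala
import java.net.URI

object GlobFragmentDemo {
  // Fake glob expansion: a glob pattern maps to its matching files.
  private val listing = Map(
    "/data/*.zip" -> Array("/data/a.zip", "/data/b.zip"),
    "/data/a.zip" -> Array("/data/a.zip"))

  def resolveGlobPaths(path: String): Array[String] = {
    val uri = new URI(path)
    val base = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
    val resolved = listing.getOrElse(base.getSchemeSpecificPart, Array(base.toString))
    (resolved, Option(uri.getFragment)) match {
      // A glob matching several files cannot carry one remote name.
      case (r, Some(_)) if r.length > 1 =>
        throw new IllegalArgumentException(
          s"$base resolves ambiguously to multiple files: ${r.mkString(",")}")
      // Re-attach the fragment so the remote name survives resolution.
      case (r, Some(name)) => r.map(_ + "#" + name)
      case (r, None) => r
    }
  }
}
```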