Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-6300][Spark Core] sc.addFile(path) does not support the relative path. #4993

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new URI(path)
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => "file:" + uri.getPath
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case _ => path
}

Expand Down
51 changes: 37 additions & 14 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,34 +79,57 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
val byteArray2 = converter.convert(bytesWritable)
assert(byteArray2.length === 0)
}

test("addFile works") {
val file = File.createTempFile("someprefix", "somesuffix")
val absolutePath = file.getAbsolutePath
val file1 = File.createTempFile("someprefix1", "somesuffix1")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't use the temp dir you created below.

val absolutePath1 = file1.getAbsolutePath

val pluto = Utils.createTempDir()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh, "pluto" isn't a good name, though I know it was copied from another test, where names were after planets.

val file2 = File.createTempFile("someprefix2", "somesuffix2", pluto)
val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
val absolutePath2 = file2.getAbsolutePath

try {
Files.write("somewords", file, UTF_8)
val length = file.length()
Files.write("somewords1", file1, UTF_8)
Files.write("somewords2", file2, UTF_8)
val length1 = file1.length()
val length2 = file2.length()

sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
sc.addFile(file.getAbsolutePath)
sc.addFile(file1.getAbsolutePath)
sc.addFile(relativePath)
sc.parallelize(Array(1), 1).map(x => {
val gotten = new File(SparkFiles.get(file.getName))
if (!gotten.exists()) {
throw new SparkException("file doesn't exist")
val gotten1 = new File(SparkFiles.get(file1.getName))
val gotten2 = new File(SparkFiles.get(file2.getName))
if (!gotten1.exists()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't these be assert? I know they weren't before but it would be simpler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert ends up with screwy effects because the code is running inside a map function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. Well at least this could be refactored into a little helper.

throw new SparkException("file doesn't exist : " + absolutePath1)
}
if (!gotten2.exists()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same; feels like you can wrap up all these checks in a local function that you can just apply to both files rather than repeating code.

throw new SparkException("file doesn't exist : " + absolutePath2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These and others can now use string interpolation, while we're here

}
if (length != gotten.length()) {

if (length1 != gotten1.length()) {
throw new SparkException(
s"file has different length $length1 than added file ${gotten1.length()} : " + absolutePath1)
}
if (length2 != gotten2.length()) {
throw new SparkException(
s"file has different length $length than added file ${gotten.length()}")
s"file has different length $length2 than added file ${gotten2.length()} : " + absolutePath2)
}
if (absolutePath == gotten.getAbsolutePath) {
throw new SparkException("file should have been copied")

if (absolutePath1 == gotten1.getAbsolutePath) {
throw new SparkException("file should have been copied :" + absolutePath1)
}
if (absolutePath2 == gotten2.getAbsolutePath) {
throw new SparkException("file should have been copied : " + absolutePath2)
}
x
}).count()
} finally {
sc.stop()
}
}

test("addFile recursive works") {
val pluto = Utils.createTempDir()
val neptune = Utils.createTempDir(pluto.getAbsolutePath)
Expand Down