Skip to content

Commit

Permalink
[SPARK-28030][SQL] convert filePath to URI in binary file data source
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Convert `PartitionedFile.filePath` to URI first in binary file data source. Otherwise Spark will throw a FileNotFound exception because we create `Path` with URL encoded string, instead of wrapping it with URI.

## How was this patch tested?

Unit test.

Closes #24855 from mengxr/SPARK-28030.

Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
  • Loading branch information
mengxr committed Jun 12, 2019
1 parent 37ab433 commit 4f4829b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.util.NextIterator
* that need to be prepended to each row.
*
* @param partitionValues value of partition columns to be prepended to each row.
* @param filePath path of the file to read
* @param filePath URI of the file to read
* @param start the beginning offset (in bytes) of the block.
* @param length number of bytes to read.
* @param locations locality information (list of nodes that have the data).
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.datasources.binaryfile

import java.net.URI
import java.sql.Timestamp

import com.google.common.io.{ByteStreams, Closeables}
Expand Down Expand Up @@ -100,7 +101,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)

file: PartitionedFile => {
val path = new Path(file.filePath)
val path = new Path(new URI(file.filePath))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
if (filterFuncs.forall(_.apply(status))) {
Expand Down
Expand Up @@ -368,4 +368,18 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
assert(caught.getMessage.contains("exceeds the max length allowed"))
}
}

test("SPARK-28030: support chars in file names that require URL encoding") {
withTempDir { dir =>
val file = new File(dir, "test space.txt")
val content = "123".getBytes
Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
val df = spark.read.format(BINARY_FILE).load(dir.getPath)
df.select(col(PATH), col(CONTENT)).first() match {
case Row(p: String, c: Array[Byte]) =>
assert(p.endsWith(file.getAbsolutePath), "should support space in file name")
assert(c === content, "should read file with space in file name")
}
}
}
}

0 comments on commit 4f4829b

Please sign in to comment.