
[SPARK-23148][SQL] Allow pathnames with special characters for CSV / JSON / text

## What changes were proposed in this pull request?

Fix the JSON and CSV data sources so that file names containing characters
that would be changed by URL encoding (for example `%` or spaces) are read correctly.
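
A minimal sketch of the failure mode, assuming a URI-encoded `filePath` string like the one `PartitionedFile` carries; the path below is illustrative, not taken from the patch:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Illustrative URI string for a directory named "sp&cial%c hars"
// ('%' and the space arrive percent-encoded in PartitionedFile.filePath).
val filePath = "file:/tmp/sp&cial%25c%20hars/part-00000.json"

// Building a Path straight from the encoded string re-encodes it, so the
// filesystem is asked for a directory literally named "sp&cial%25c%20hars".
val broken = new Path(filePath)

// Decoding through java.net.URI first, as this patch does, recovers the
// real name "sp&cial%c hars".
val fixed = new Path(new URI(filePath))
```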

## How was this patch tested?

New unit tests for JSON, CSV and text suites
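
For illustration only (not part of the patch), a standalone round trip of the kind the new tests exercise; the object name and target directory are made up, and the directory name contains `&`, `%`, and a space:

```scala
import org.apache.spark.sql.SparkSession

object Spark23148Example {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-23148 example")
      .getOrCreate()
    import spark.implicits._

    // Directory whose name contains characters altered by URL encoding.
    val dir = s"${System.getProperty("java.io.tmpdir")}/sp&cial%c hars"

    spark.createDataset(Seq("a", "b")).write.format("json").save(dir)

    // multiLine = true exercises the multi-line code path (MultiLineJsonDataSource)
    // touched by this patch; before the fix this read failed to find the files.
    val df = spark.read.format("json").option("multiLine", true).load(dir)
    df.show()  // expect two rows: a, b

    spark.stop()
  }
}
```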

Author: Henry Robinson <henry@cloudera.com>

Closes #20355 from henryr/spark-23148.

(cherry picked from commit de36f65)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
Henry Robinson authored and HyukjinKwon committed Jan 24, 2018
1 parent 84a189a commit 17317c8fb99715836fcebc39ffb04648ab7fb762
@@ -45,11 +45,11 @@ object CodecStreams {
   }
 
   /**
-   * Creates an input stream from the string path and add a closure for the input stream to be
+   * Creates an input stream from the given path and add a closure for the input stream to be
    * closed on task completion.
    */
-  def createInputStreamWithCloseResource(config: Configuration, path: String): InputStream = {
-    val inputStream = createInputStream(config, new Path(path))
+  def createInputStreamWithCloseResource(config: Configuration, path: Path): InputStream = {
+    val inputStream = createInputStream(config, path)
     Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => inputStream.close()))
     inputStream
   }
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
+import java.net.URI
 import java.nio.charset.{Charset, StandardCharsets}
 
 import com.univocity.parsers.csv.CsvParser
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.Job
@@ -32,7 +33,6 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
@@ -206,7 +206,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
       parser: UnivocityParser,
       schema: StructType): Iterator[InternalRow] = {
     UnivocityParser.parseStream(
-      CodecStreams.createInputStreamWithCloseResource(conf, file.filePath),
+      CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))),
       parser.options.headerFlag,
       parser,
       schema)
@@ -218,8 +218,9 @@ object MultiLineCSVDataSource extends CSVDataSource {
       parsedOptions: CSVOptions): StructType = {
     val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
     csv.flatMap { lines =>
+      val path = new Path(lines.getPath())
       UnivocityParser.tokenizeStream(
-        CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, lines.getPath()),
+        CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
         shouldDropHeader = false,
         new CsvParser(parsedOptions.asParserSettings))
     }.take(1).headOption match {
@@ -230,7 +231,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
           UnivocityParser.tokenizeStream(
             CodecStreams.createInputStreamWithCloseResource(
               lines.getConfiguration,
-              lines.getPath()),
+              new Path(lines.getPath())),
             parsedOptions.headerFlag,
             new CsvParser(parsedOptions.asParserSettings))
         }
@@ -18,11 +18,12 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import java.io.InputStream
+import java.net.URI
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -168,9 +169,10 @@ object MultiLineJsonDataSource extends JsonDataSource {
   }
 
   private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
+    val path = new Path(record.getPath())
     CreateJacksonParser.inputStream(
       jsonFactory,
-      CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, record.getPath()))
+      CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
   }
 
   override def readFile(
@@ -180,7 +182,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
       schema: StructType): Iterator[InternalRow] = {
     def partitionedFileString(ignored: Any): UTF8String = {
       Utils.tryWithResource {
-        CodecStreams.createInputStreamWithCloseResource(conf, file.filePath)
+        CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
       } { inputStream =>
         UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
       }
@@ -193,6 +195,6 @@ object MultiLineJsonDataSource extends JsonDataSource {
       parser.options.columnNameOfCorruptRecord)
 
     safeParser.parse(
-      CodecStreams.createInputStreamWithCloseResource(conf, file.filePath))
+      CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
   }
 }
@@ -23,6 +23,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
   private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text")
+  private val nameWithSpecialChars = "sp&cial%c hars"
 
   allFileBasedDataSources.foreach { format =>
     test(s"Writing empty datasets should not fail - $format") {
@@ -54,7 +55,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
   // Only ORC/Parquet support this. `CSV` and `JSON` returns an empty schema.
   // `TEXT` data source always has a single column whose name is `value`.
   Seq("orc", "parquet").foreach { format =>
-    test(s"SPARK-15474 Write and read back non-emtpy schema with empty dataframe - $format") {
+    test(s"SPARK-15474 Write and read back non-empty schema with empty dataframe - $format") {
       withTempPath { file =>
         val path = file.getCanonicalPath
         val emptyDf = Seq((true, 1, "str")).toDF().limit(0)
@@ -69,7 +70,6 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
 
   allFileBasedDataSources.foreach { format =>
     test(s"SPARK-22146 read files containing special characters using $format") {
-      val nameWithSpecialChars = s"sp&cial%chars"
       withTempDir { dir =>
         val tmpFile = s"$dir/$nameWithSpecialChars"
         spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
@@ -78,4 +78,18 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  // Separate test case for formats that support multiLine as an option.
+  Seq("json", "csv").foreach { format =>
+    test("SPARK-23148 read files containing special characters " +
+      s"using $format with multiline enabled") {
+      withTempDir { dir =>
+        val tmpFile = s"$dir/$nameWithSpecialChars"
+        spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
+        val reader = spark.read.format(format).option("multiLine", true)
+        val fileContent = reader.load(tmpFile)
+        checkAnswer(fileContent, Seq(Row("a"), Row("b")))
+      }
+    }
+  }
 }
