# [SPARK-26122][SQL] Support encoding for multiLine in CSV datasource
## What changes were proposed in this pull request?

In this PR, I propose passing the CSV option `encoding`/`charset` on to the uniVocity parser, so that CSV files in different encodings can be parsed when `multiLine` is enabled. The value of the option is passed to the `beginParsing` method of `CsvParser`.
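For illustration, here is a minimal, self-contained sketch of the uniVocity overload the patch relies on: `CsvParser.beginParsing` accepts the input stream together with an encoding name. The sample data and object name are made up for this sketch:

```scala
import java.io.ByteArrayInputStream
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

object BeginParsingDemo {
  def main(args: Array[String]): Unit = {
    // Two records; the first has a quoted multi-line value and the second a
    // non-ASCII character, so the bytes only decode correctly as ISO-8859-1.
    val bytes = "0,\"two\nlines\"\n1,München\n".getBytes("ISO-8859-1")

    val settings = new CsvParserSettings
    settings.setLineSeparatorDetectionEnabled(true)
    val parser = new CsvParser(settings)

    // The encoding-aware overload used by the patch; beginParsing(stream)
    // alone would decode with the JVM default charset instead.
    parser.beginParsing(new ByteArrayInputStream(bytes), "ISO-8859-1")
    Iterator.continually(parser.parseNext()).takeWhile(_ != null)
      .foreach(row => println(row.mkString(" | ")))
  }
}
```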

## How was this patch tested?

Added a new test to `CSVSuite` that checks several encodings with the header both enabled and disabled.
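At the user level, the scenario the test exercises looks roughly like the following sketch (hypothetical file path; assumes an active `SparkSession` in scope as `spark`):

```scala
// Hypothetical example: a CP1251-encoded file whose quoted values may
// contain newlines can be read because multiLine parsing honors `encoding`.
val df = spark.read
  .option("multiLine", true)
  .option("encoding", "CP1251")
  .option("header", true)
  .csv("/path/to/cp1251-file.csv")
```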

Closes #23091 from MaxGekk/csv-miltiline-encoding.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
MaxGekk authored and HyukjinKwon committed Nov 21, 2018 · 1 parent 4785105 · commit 2df34db
Showing 3 changed files with 32 additions and 7 deletions.
**UnivocityParser.scala**

```diff
@@ -271,11 +271,12 @@ private[sql] object UnivocityParser {
   def tokenizeStream(
       inputStream: InputStream,
       shouldDropHeader: Boolean,
-      tokenizer: CsvParser): Iterator[Array[String]] = {
+      tokenizer: CsvParser,
+      encoding: String): Iterator[Array[String]] = {
     val handleHeader: () => Unit =
       () => if (shouldDropHeader) tokenizer.parseNext
 
-    convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
+    convertStream(inputStream, tokenizer, handleHeader, encoding)(tokens => tokens)
   }
 
   /**
@@ -297,17 +298,18 @@ private[sql] object UnivocityParser {
     val handleHeader: () => Unit =
       () => headerChecker.checkHeaderColumnNames(tokenizer)
 
-    convertStream(inputStream, tokenizer, handleHeader) { tokens =>
+    convertStream(inputStream, tokenizer, handleHeader, parser.options.charset) { tokens =>
       safeParser.parse(tokens)
     }.flatten
   }
 
   private def convertStream[T](
       inputStream: InputStream,
       tokenizer: CsvParser,
-      handleHeader: () => Unit)(
+      handleHeader: () => Unit,
+      encoding: String)(
       convert: Array[String] => T) = new Iterator[T] {
-    tokenizer.beginParsing(inputStream)
+    tokenizer.beginParsing(inputStream, encoding)
 
     // We can handle header here since here the stream is open.
     handleHeader()
```
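For orientation, `convertStream` wraps the tokenizer in a lazy iterator; below is a simplified sketch of that pattern under the new signature. Names and the error message are illustrative, not the exact Spark source:

```scala
import java.io.InputStream
import com.univocity.parsers.csv.CsvParser

// Begin parsing eagerly so the header can be consumed while the stream is
// open, then hand out one converted record per next() call.
def convertStreamSketch[T](
    inputStream: InputStream,
    tokenizer: CsvParser,
    handleHeader: () => Unit,
    encoding: String)(
    convert: Array[String] => T): Iterator[T] = new Iterator[T] {
  tokenizer.beginParsing(inputStream, encoding)
  handleHeader()
  private var nextRecord = tokenizer.parseNext()

  override def hasNext: Boolean = nextRecord != null

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    val current = convert(nextRecord)
    nextRecord = tokenizer.parseNext()
    current
  }
}
```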
**CSVDataSource.scala**

```diff
@@ -192,7 +192,8 @@ object MultiLineCSVDataSource extends CSVDataSource {
       UnivocityParser.tokenizeStream(
         CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
         shouldDropHeader = false,
-        new CsvParser(parsedOptions.asParserSettings))
+        new CsvParser(parsedOptions.asParserSettings),
+        encoding = parsedOptions.charset)
     }.take(1).headOption match {
       case Some(firstRow) =>
         val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -203,7 +204,8 @@ object MultiLineCSVDataSource extends CSVDataSource {
           lines.getConfiguration,
           new Path(lines.getPath())),
         parsedOptions.headerFlag,
-        new CsvParser(parsedOptions.asParserSettings))
+        new CsvParser(parsedOptions.asParserSettings),
+        encoding = parsedOptions.charset)
       }
     val sampled = CSVUtils.sample(tokenRDD, parsedOptions)
     CSVInferSchema.infer(sampled, header, parsedOptions)
```
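The `parsedOptions.charset` passed above is resolved in `CSVOptions`; to the best of my knowledge it treats `encoding` and `charset` as aliases with UTF-8 as the default, roughly as in this paraphrased sketch:

```scala
import java.nio.charset.StandardCharsets

// Paraphrase of the option resolution: `encoding` wins, `charset` is an
// accepted alias, and UTF-8 is the default when neither is set.
def resolveCharset(parameters: Map[String, String]): String =
  parameters.getOrElse("encoding",
    parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
```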
**CSVSuite.scala**

```diff
@@ -1859,4 +1859,25 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       checkAnswer(df, Row(null, csv))
     }
   }
+
+  test("encoding in multiLine mode") {
+    val df = spark.range(3).toDF()
+    Seq("UTF-8", "ISO-8859-1", "CP1251", "US-ASCII", "UTF-16BE", "UTF-32LE").foreach { encoding =>
+      Seq(true, false).foreach { header =>
+        withTempPath { path =>
+          df.write
+            .option("encoding", encoding)
+            .option("header", header)
+            .csv(path.getCanonicalPath)
+          val readback = spark.read
+            .option("multiLine", true)
+            .option("encoding", encoding)
+            .option("inferSchema", true)
+            .option("header", header)
+            .csv(path.getCanonicalPath)
+          checkAnswer(readback, df)
+        }
+      }
+    }
+  }
 }
```
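The round-trip design makes the test self-checking: the same `encoding` and `header` options drive both the write and the multiLine read, so `checkAnswer(readback, df)` fails whenever either side mishandles the charset.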
