From 1a7a0cb4430f847ac95c0c764393003581415103 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 19 Nov 2018 21:51:04 +0100 Subject: [PATCH 1/4] Added a test --- .../sql/execution/datasources/csv/CSVSuite.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 2efe1dda475c5..9a26c5c16cfc1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1859,4 +1859,18 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te checkAnswer(df, Row(null, csv)) } } + + test("encoding in multiLine mode") { + withTempPath { path => + val df = spark.range(3).toDF() + df.write + .option("encoding", "UTF-16BE") + .csv(path.getCanonicalPath) + val readback = spark.read + .option("multiLine", true) + .option("encoding", "UTF-16BE") + .csv(path.getCanonicalPath) + checkAnswer(df, readback) + } + } } From cd57ec5833bbfb5f0b33d63a56b48d25924f6be1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 19 Nov 2018 22:07:41 +0100 Subject: [PATCH 2/4] Test multiple encodings --- .../execution/datasources/csv/CSVSuite.scala | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 9a26c5c16cfc1..3253980696faa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1861,16 +1861,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te } test("encoding in multiLine mode") { - withTempPath { path => - val df = spark.range(3).toDF() - df.write - .option("encoding", "UTF-16BE") - .csv(path.getCanonicalPath) - val readback = spark.read - .option("multiLine", true) - .option("encoding", "UTF-16BE") - .csv(path.getCanonicalPath) - checkAnswer(df, readback) + val df = spark.range(3).toDF() + Seq("UTF-8", "ISO-8859-1", "CP1251", "US-ASCII", "UTF-16BE", "UTF-32LE").foreach { encoding => + withTempPath { path => + df.write + .option("encoding", encoding) + .csv(path.getCanonicalPath) + val readback = spark.read + .option("multiLine", true) + .option("encoding", encoding) + .option("inferSchema", true) + .csv(path.getCanonicalPath) + checkAnswer(readback, df) + } } } } From 1c76f8944979df8a7b9b8181ebfa38933c3f2c00 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 19 Nov 2018 22:09:04 +0100 Subject: [PATCH 3/4] Pass encoding to uniVocity parser --- .../spark/sql/catalyst/csv/UnivocityParser.scala | 12 +++++++----- .../execution/datasources/csv/CSVDataSource.scala | 6 ++++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 46ed58ed92830..ed196935e357f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -271,11 +271,12 @@ private[sql] object UnivocityParser { def tokenizeStream( inputStream: InputStream, shouldDropHeader: Boolean, - tokenizer: CsvParser): Iterator[Array[String]] = { + tokenizer: CsvParser, + encoding: String): Iterator[Array[String]] = { val handleHeader: () => Unit = () => if (shouldDropHeader) tokenizer.parseNext - convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens) + convertStream(inputStream, tokenizer, handleHeader, encoding)(tokens => tokens) } /** @@ -297,7 +298,7 @@ private[sql] object UnivocityParser { val handleHeader: () => Unit = () => headerChecker.checkHeaderColumnNames(tokenizer) - convertStream(inputStream, tokenizer, handleHeader) { tokens => + convertStream(inputStream, tokenizer, handleHeader, parser.options.charset) { tokens => safeParser.parse(tokens) }.flatten } @@ -305,9 +306,10 @@ private[sql] object UnivocityParser { private def convertStream[T]( inputStream: InputStream, tokenizer: CsvParser, - handleHeader: () => Unit)( + handleHeader: () => Unit, + encoding: String)( convert: Array[String] => T) = new Iterator[T] { - tokenizer.beginParsing(inputStream) + tokenizer.beginParsing(inputStream, encoding) // We can handle header here since here the stream is open. handleHeader() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 4808e8ef042d1..554baaf1a9b3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -192,7 +192,8 @@ object MultiLineCSVDataSource extends CSVDataSource { UnivocityParser.tokenizeStream( CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path), shouldDropHeader = false, - new CsvParser(parsedOptions.asParserSettings)) + new CsvParser(parsedOptions.asParserSettings), + encoding = parsedOptions.charset) }.take(1).headOption match { case Some(firstRow) => val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis @@ -203,7 +204,8 @@ object MultiLineCSVDataSource extends CSVDataSource { lines.getConfiguration, new Path(lines.getPath())), parsedOptions.headerFlag, - new CsvParser(parsedOptions.asParserSettings)) + new CsvParser(parsedOptions.asParserSettings), + encoding = parsedOptions.charset) } val sampled = CSVUtils.sample(tokenRDD, parsedOptions) CSVInferSchema.infer(sampled, header, parsedOptions) From 16eb14c73f3fad8d83fee41d5665b52f180daf73 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 19 Nov 2018 22:22:23 +0100 Subject: [PATCH 4/4] Test with header and without it --- .../execution/datasources/csv/CSVSuite.scala | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 3253980696faa..e29cd2aa7c4e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1863,16 +1863,20 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te test("encoding in multiLine mode") { val df = spark.range(3).toDF() Seq("UTF-8", "ISO-8859-1", "CP1251", "US-ASCII", "UTF-16BE", "UTF-32LE").foreach { encoding => - withTempPath { path => - df.write - .option("encoding", encoding) - .csv(path.getCanonicalPath) - val readback = spark.read - .option("multiLine", true) - .option("encoding", encoding) - .option("inferSchema", true) - .csv(path.getCanonicalPath) - checkAnswer(readback, df) + Seq(true, false).foreach { header => + withTempPath { path => + df.write + .option("encoding", encoding) + .option("header", header) + .csv(path.getCanonicalPath) + val readback = spark.read + .option("multiLine", true) + .option("encoding", encoding) + .option("inferSchema", true) + .option("header", header) + .csv(path.getCanonicalPath) + checkAnswer(readback, df) + } } } }