[SPARK-32270][SQL] Use TextFileFormat in CSV's schema inference with a different encoding

### What changes were proposed in this pull request?

This PR proposes to use the text datasource in CSV's schema inference. This shares the same rationale as SPARK-18362, SPARK-19885 and SPARK-19918: we currently use a Hadoop RDD when the encoding is different from UTF-8, which is unnecessary. This PR completes SPARK-18362 and addresses the comment at apache#15813 (comment).
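For context, a minimal usage sketch of the code path this change affects (the file path and encoding here are hypothetical; `spark` is the usual SparkSession binding):

    // Schema inference over a non-UTF-8 CSV file now goes through the text
    // datasource rather than a raw Hadoop RDD.
    val df = spark.read
      .option("encoding", "ISO-8859-1")   // any charset other than UTF-8
      .option("inferSchema", "true")
      .option("header", "true")
      .csv("/path/to/latin1.csv")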

We had better keep the code paths consistent with the existing CSV and JSON datasources as well; currently, only CSV schema inference with an encoding other than UTF-8 takes a different path.

This PR may also amount to a bug fix: Spark session configurations, such as Hadoop configurations, are not respected during CSV schema inference when the encoding is different from UTF-8; they have to be set on the Spark context instead for schema inference to pick them up.
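As a hedged illustration of that corner case (the Hadoop key is chosen arbitrarily): a configuration set only on the session is merged into the Hadoop configuration that datasource-based reads use, while the removed sparkContext.hadoopFile path consulted only the SparkContext's own Hadoop configuration.

    // Sketch: a Hadoop setting applied only at the session level. The
    // datasource-based inference path sees it via the session's Hadoop
    // configuration; the old RDD-based path read sc.hadoopConfiguration
    // and never saw it.
    spark.conf.set("fs.s3a.connection.maximum", "200")
    val inferred = spark.read
      .option("encoding", "ISO-8859-1")
      .option("inferSchema", "true")
      .csv("/path/to/data.csv")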

### Why are the changes needed?

For consistency, potentially better performance, and to fix a bug in a very narrow corner case.

### Does this PR introduce any user-facing change?

Virtually no.

### How was this patch tested?

Existing tests should cover this.

Closes apache#29063 from HyukjinKwon/SPARK-32270.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
HyukjinKwon authored and dongjoon-hyun committed Sep 2, 2020
1 parent 8a8c1d4 commit 41d65a2
Showing 1 changed file with 14 additions and 12 deletions.
@@ -150,21 +150,23 @@ object TextInputCSVDataSource extends CSVDataSource {
       inputPaths: Seq[FileStatus],
       options: CSVOptions): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
+    val df = sparkSession.baseRelationToDataFrame(
+      DataSource.apply(
+        sparkSession,
+        paths = paths,
+        className = classOf[TextFileFormat].getName,
+        options = options.parameters.originalMap
+      ).resolveRelation(checkFilesExist = false))
+      .select("value").as[String](Encoders.STRING)
+
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
-      sparkSession.baseRelationToDataFrame(
-        DataSource.apply(
-          sparkSession,
-          paths = paths,
-          className = classOf[TextFileFormat].getName,
-          options = options.parameters.originalMap
-        ).resolveRelation(checkFilesExist = false))
-        .select("value").as[String](Encoders.STRING)
+      df
     } else {
       val charset = options.charset
-      val rdd = sparkSession.sparkContext
-        .hadoopFile[LongWritable, Text, TextInputFormat](paths.mkString(","))
-        .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
-      sparkSession.createDataset(rdd)(Encoders.STRING)
+      sparkSession.createDataset(df.queryExecution.toRdd.map { row =>
+        val bytes = row.getBinary(0)
+        new String(bytes, 0, bytes.length, charset)
+      })(Encoders.STRING)
     }
   }
 }
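The decode step in the new else branch can be seen in isolation with plain Scala (a minimal sketch; the bytes and charset are illustrative):

    // Decode a line's raw bytes with an explicit charset, as the new branch
    // does with options.charset instead of assuming UTF-8.
    val raw: Array[Byte] = Array(0xE9.toByte)              // "é" in ISO-8859-1
    val decoded = new String(raw, 0, raw.length, "ISO-8859-1")
    assert(decoded == "é")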