[SPARK-24068] Propagating DataFrameReader's options to Text datasource on schema inferring

## What changes were proposed in this pull request?

While reading CSV or JSON files, DataFrameReader's options are converted to Hadoop parameters, for example here:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L302

but the options are not propagated to the Text datasource during schema inference, for instance:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala#L184-L188

This PR propagates the user's options to the Text datasource during schema inference, in the same way the user's options are converted to Hadoop parameters when a schema is specified.
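
As a condensed sketch of the diff below (the surrounding methods and variable names are abridged), the idea is to forward the parsed options both to the Text relation used for inference and to the Hadoop `Job` built for the multi-line readers:

```scala
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.text.TextFileFormat

// Sketch, abridged from the hunks below: pass the reader's options through
// to the Text datasource that backs schema inference.
val text = DataSource.apply(
  sparkSession,
  paths = paths,
  className = classOf[TextFileFormat].getName,
  options = options.parameters          // previously omitted
).resolveRelation(checkFilesExist = false)

// For the multi-line code paths, apply the same options to the Hadoop
// configuration instead of building it from a bare newHadoopConf().
val job = Job.getInstance(
  sparkSession.sessionState.newHadoopConfWithOptions(options.parameters))
```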

## How was this patch tested?
The changes were tested manually using https://github.com/twitter/hadoop-lzo:

```shell
hadoop-lzo> mvn clean package
hadoop-lzo> ln -s ./target/hadoop-lzo-0.4.21-SNAPSHOT.jar ./hadoop-lzo.jar
```
Create two test files, one in CSV and one in JSON format, and compress them:
```shell
$ cat test.csv
col1|col2
a|1
$ lzop test.csv
$ cat test.json
{"col1":"a","col2":1}
$ lzop test.json
```
Run `spark-shell` with hadoop-lzo:
```shell
bin/spark-shell --jars ~/hadoop-lzo/hadoop-lzo.jar
```
Read the compressed CSV and JSON files without specifying a schema:
```scala
spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("inferSchema",true).option("header",true).option("sep","|").csv("test.csv.lzo").show()
+----+----+
|col1|col2|
+----+----+
|   a|   1|
+----+----+
```
```scala
spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("multiLine", true).json("test.json.lzo").printSchema
root
 |-- col1: string (nullable = true)
 |-- col2: long (nullable = true)
```
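
For comparison, a sketch (not part of the original test notes) of the path the description refers to as already working: when the schema is supplied explicitly, no inference pass over the Text datasource is needed, and the codec option reaches Hadoop through the existing conversion in DataSourceScanExec. Column names match the test files above.

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// With an explicit schema there is no schema-inference read, so the codec
// option is honored by the regular scan path even without this change.
val jsonSchema = new StructType()
  .add("col1", StringType)
  .add("col2", LongType)

spark.read
  .schema(jsonSchema)
  .option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
  .json("test.json.lzo")
  .show()
```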

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21182 from MaxGekk/text-options.
MaxGekk authored and HyukjinKwon committed May 9, 2018
1 parent 487faf1 commit e3de6ab
Showing 6 changed files with 12 additions and 12 deletions.
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util._
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/
private[sql] class JSONOptions(
@transient private val parameters: CaseInsensitiveMap[String],
@transient val parameters: CaseInsensitiveMap[String],
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String)
extends Logging with Serializable {
@@ -185,7 +185,8 @@ object TextInputCSVDataSource extends CSVDataSource {
DataSource.apply(
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName
className = classOf[TextFileFormat].getName,
options = options.parameters
).resolveRelation(checkFilesExist = false))
.select("value").as[String](Encoders.STRING)
} else {
@@ -250,7 +251,8 @@ object MultiLineCSVDataSource extends CSVDataSource {
options: CSVOptions): RDD[PortableDataStream] = {
val paths = inputPaths.map(_.getPath)
val name = paths.mkString(",")
val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
options.parameters))
FileInputFormat.setInputPaths(job, paths: _*)
val conf = job.getConfiguration

@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._

class CSVOptions(
@transient private val parameters: CaseInsensitiveMap[String],
@transient val parameters: CaseInsensitiveMap[String],
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String)
extends Logging with Serializable {
@@ -17,10 +17,8 @@

package org.apache.spark.sql.execution.datasources.csv

import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.csv

import java.io.InputStream
import java.math.BigDecimal
import java.text.NumberFormat
import java.util.Locale

import scala.util.Try
import scala.util.control.NonFatal
@@ -121,7 +121,7 @@ object TextInputJsonDataSource extends JsonDataSource {
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName,
options = textOptions
options = parsedOptions.parameters
).resolveRelation(checkFilesExist = false))
.select("value").as(Encoders.STRING)
}
@@ -159,7 +159,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType = {
val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths, parsedOptions)
val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
val parser = parsedOptions.encoding
.map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream))
@@ -170,9 +170,11 @@ object MultiLineJsonDataSource extends JsonDataSource {

private def createBaseRdd(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus]): RDD[PortableDataStream] = {
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): RDD[PortableDataStream] = {
val paths = inputPaths.map(_.getPath)
val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
parsedOptions.parameters))
val conf = job.getConfiguration
val name = paths.mkString(",")
FileInputFormat.setInputPaths(job, paths: _*)
