Skip to content
Permalink
Browse files

[SPARK-24068][BACKPORT-2.3] Propagating DataFrameReader's options to …

…Text datasource on schema inferring

## What changes were proposed in this pull request?

While reading CSV or JSON files, DataFrameReader's options are converted to Hadoop's parameters, for example there:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L302

but the options are not propagated to Text datasource on schema inferring, for instance:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala#L184-L188

The PR proposes propagation of user's options to Text datasource on scheme inferring in similar way as user's options are converted to Hadoop parameters if schema is specified.

## How was this patch tested?
The changes were tested manually by using https://github.com/twitter/hadoop-lzo:

```
hadoop-lzo> mvn clean package
hadoop-lzo> ln -s ./target/hadoop-lzo-0.4.21-SNAPSHOT.jar ./hadoop-lzo.jar
```
Create 2 test files in JSON and CSV format and compress them:
```shell
$ cat test.csv
col1|col2
a|1
$ lzop test.csv
$ cat test.json
{"col1":"a","col2":1}
$ lzop test.json
```
Run `spark-shell` with hadoop-lzo:
```
bin/spark-shell --jars ~/hadoop-lzo/hadoop-lzo.jar
```
reading compressed CSV and JSON without schema:
```scala
spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("inferSchema",true).option("header",true).option("sep","|").csv("test.csv.lzo").show()
+----+----+
|col1|col2|
+----+----+
|   a|   1|
+----+----+
```
```scala
spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("multiLine", true).json("test.json.lzo").printSchema
root
 |-- col1: string (nullable = true)
 |-- col2: long (nullable = true)
```

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21292 from MaxGekk/text-options-backport-v2.3.
  • Loading branch information...
MaxGekk authored and HyukjinKwon committed May 10, 2018
1 parent 8889d78 commit eab10f9945c1d01daa45c233a39dedfd184f543c
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util._
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/
private[sql] class JSONOptions(
@transient private val parameters: CaseInsensitiveMap[String],
@transient val parameters: CaseInsensitiveMap[String],
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String)
extends Logging with Serializable {
@@ -184,7 +184,8 @@ object TextInputCSVDataSource extends CSVDataSource {
DataSource.apply(
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName
className = classOf[TextFileFormat].getName,
options = options.parameters
).resolveRelation(checkFilesExist = false))
.select("value").as[String](Encoders.STRING)
} else {
@@ -248,7 +249,8 @@ object MultiLineCSVDataSource extends CSVDataSource {
options: CSVOptions): RDD[PortableDataStream] = {
val paths = inputPaths.map(_.getPath)
val name = paths.mkString(",")
val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
options.parameters))
FileInputFormat.setInputPaths(job, paths: _*)
val conf = job.getConfiguration

@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._

class CSVOptions(
@transient private val parameters: CaseInsensitiveMap[String],
@transient val parameters: CaseInsensitiveMap[String],
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String)
extends Logging with Serializable {
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.csv

import java.io.InputStream
import java.math.BigDecimal
import java.text.NumberFormat
import java.util.Locale

import scala.util.Try
import scala.util.control.NonFatal
@@ -92,7 +92,7 @@ object TextInputJsonDataSource extends JsonDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType = {
val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths)
val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
inferFromDataset(json, parsedOptions)
}

@@ -104,13 +104,15 @@ object TextInputJsonDataSource extends JsonDataSource {

private def createBaseDataset(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus]): Dataset[String] = {
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): Dataset[String] = {
val paths = inputPaths.map(_.getPath.toString)
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName
className = classOf[TextFileFormat].getName,
options = parsedOptions.parameters
).resolveRelation(checkFilesExist = false))
.select("value").as(Encoders.STRING)
}
@@ -144,16 +146,18 @@ object MultiLineJsonDataSource extends JsonDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType = {
val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths, parsedOptions)
val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
JsonInferSchema.infer(sampled, parsedOptions, createParser)
}

private def createBaseRdd(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus]): RDD[PortableDataStream] = {
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): RDD[PortableDataStream] = {
val paths = inputPaths.map(_.getPath)
val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
parsedOptions.parameters))
val conf = job.getConfiguration
val name = paths.mkString(",")
FileInputFormat.setInputPaths(job, paths: _*)

0 comments on commit eab10f9

Please sign in to comment.
You can’t perform that action at this time.