[SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV #16854

Closed · wants to merge 8 commits
71 changes: 63 additions & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.JsonInferSchema
@@ -368,14 +369,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
createParser)
}

// Check a field requirement for corrupt records here to throw an exception in a driver side
schema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex =>
val f = schema(corruptFieldIndex)
if (f.dataType != StringType || !f.nullable) {
throw new AnalysisException(
"The field for corrupt records must be string type and nullable")
}
}
verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)

val parsed = jsonDataset.rdd.mapPartitions { iter =>
val parser = new JacksonParser(schema, parsedOptions)
@@ -398,6 +392,51 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
csv(Seq(path): _*)
}

/**
* Loads a `Dataset[String]` storing CSV rows and returns the result as a `DataFrame`.
*
* If the schema is not specified using the `schema` function and the `inferSchema` option is
* enabled, this function goes through the input once to determine the input schema.
*
* If the schema is not specified using the `schema` function and the `inferSchema` option is
* disabled, it determines the columns as string types and reads only the first line to
* determine the names and the number of fields.
*
* @param csvDataset input Dataset with one CSV row per record
* @since 2.2.0
*/
def csv(csvDataset: Dataset[String]): DataFrame = {
Contributor:

Shall we also add `def json(lines: Dataset[String])`, and deprecate `json(r: RDD[String])` and `json(r: JavaRDD[String])`?

cc @rxin

@HyukjinKwon (Member, Author) on Feb 11, 2017:

Sure. Actually, there is a JIRA and a closed PR, SPARK-15615 and #13460, where I was negative because it can easily be worked around.

However, I am fine with it if we are promoting Datasets over RDDs for advantages like SPARK-18362 (if applicable).

cc @pjfanning, could you reopen and proceed with your PR if we are all fine with this?

Contributor:

@HyukjinKwon I can look at resurrecting the pull request for SPARK-15615.

Contributor:

@HyukjinKwon I added a new pull request, #16895, because my original branch was deleted.

val parsedOptions: CSVOptions = new CSVOptions(
extraOptions.toMap,
sparkSession.sessionState.conf.sessionLocalTimeZone)
val filteredLines: Dataset[String] =
CSVUtils.filterCommentAndEmpty(csvDataset, parsedOptions)
val maybeFirstLine: Option[String] = filteredLines.take(1).headOption

val schema = userSpecifiedSchema.getOrElse {
TextInputCSVDataSource.inferFromDataset(
Member:

We should issue an error when users try to parse it as a wholeFile.

We also need to check whether all the other CSV options are still accepted by this API.

Member (Author):

@gatorsmile, yes, we need to check, and for the JSON API too. Though, should we throw an error? It reminds me of from_json/to_json, which ignore parse modes.

Member:

We should not simply ignore the options without error messages. The options are not like hints.

sparkSession,
csvDataset,
maybeFirstLine,
parsedOptions)
}

verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)

val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
}.getOrElse(filteredLines.rdd)

val parsed = linesWithoutHeader.mapPartitions { iter =>
val parser = new UnivocityParser(schema, parsedOptions)
iter.flatMap(line => parser.parse(line))
}

Dataset.ofRows(
sparkSession,
LogicalRDD(schema.toAttributes, parsed)(sparkSession))
}
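
For reference, a minimal usage sketch of the new overload, assuming Spark 2.2+; the input path below is hypothetical:

import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().appName("csv-from-dataset").getOrCreate()
import spark.implicits._

// One CSV record per element, e.g. lines loaded from a text source (hypothetical path)
val csvLines: Dataset[String] = spark.read.textFile("/tmp/cars.csv")

val cars = spark.read
  .option("header", "true")       // first line supplies the column names
  .option("inferSchema", "true")  // one extra pass over csvLines to infer the types
  .csv(csvLines)                  // the overload added in this patch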

/**
* Loads a CSV file and returns the result as a `DataFrame`.
*
@@ -604,6 +643,22 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}
}

/**
* A convenience function for schema validation in datasources that support
* `columnNameOfCorruptRecord` as an option.
*/
private def verifyColumnNameOfCorruptRecord(
Member (Author):

Maybe this is too much. I am willing to revert it.

schema: StructType,
columnNameOfCorruptRecord: String): Unit = {
schema.getFieldIndex(columnNameOfCorruptRecord).foreach { corruptFieldIndex =>
val f = schema(corruptFieldIndex)
if (f.dataType != StringType || !f.nullable) {
throw new AnalysisException(
"The field for corrupt records must be string type and nullable")
}
}
}
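
As a sketch of the constraint this check enforces, assuming the default corrupt-record column name `_corrupt_record` (the actual name comes from the `columnNameOfCorruptRecord` option):

import org.apache.spark.sql.types._

// Accepted: the corrupt-record column is a nullable StringType
val ok = new StructType()
  .add("value", IntegerType)
  .add("_corrupt_record", StringType, nullable = true)

// Rejected: a non-string (or non-nullable) corrupt-record column would make the
// check above throw "The field for corrupt records must be string type and nullable"
val bad = new StructType()
  .add("value", IntegerType)
  .add("_corrupt_record", IntegerType, nullable = true)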

///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
@@ -17,12 +17,11 @@

package org.apache.spark.sql.execution.datasources.csv

import java.io.InputStream
import java.nio.charset.{Charset, StandardCharsets}

import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
import com.univocity.parsers.csv.CsvParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.Job
@@ -134,23 +133,33 @@ object TextInputCSVDataSource extends CSVDataSource {
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): Option[StructType] = {
val csv = createBaseDataset(sparkSession, inputPaths, parsedOptions)
CSVUtils.filterCommentAndEmpty(csv, parsedOptions).take(1).headOption match {
case Some(firstLine) =>
val firstRow = new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
val tokenRDD = csv.rdd.mapPartitions { iter =>
val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
val linesWithoutHeader =
CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
val parser = new CsvParser(parsedOptions.asParserSettings)
linesWithoutHeader.map(parser.parseLine)
}
Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
case None =>
// If the first line could not be read, just return the empty schema.
Some(StructType(Nil))
}
val maybeFirstLine = CSVUtils.filterCommentAndEmpty(csv, parsedOptions).take(1).headOption
Some(inferFromDataset(sparkSession, csv, maybeFirstLine, parsedOptions))
}

/**
* Infers the schema from `Dataset` that stores CSV string records.
*/
def inferFromDataset(
Member (Author):

There is almost no code modification here; it was just moved from above.

sparkSession: SparkSession,
csv: Dataset[String],
maybeFirstLine: Option[String],
parsedOptions: CSVOptions): StructType = maybeFirstLine match {
case Some(firstLine) =>
val firstRow = new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
val tokenRDD = csv.rdd.mapPartitions { iter =>
val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
val linesWithoutHeader =
CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
val parser = new CsvParser(parsedOptions.asParserSettings)
linesWithoutHeader.map(parser.parseLine)
}
CSVInferSchema.infer(tokenRDD, header, parsedOptions)
case None =>
// If the first line could not be read, just return the empty schema.
StructType(Nil)
}
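
A small sketch of the inference outcomes this helper produces, exercised through the public reader (assuming Spark 2.2+ and an active `SparkSession` named `spark`):

import spark.implicits._

val lines = Seq("year,make", "2012,Tesla").toDS()

// header=true + inferSchema=true: names come from the first line, types are inferred per column
spark.read.option("header", "true").option("inferSchema", "true").csv(lines).schema
// expected: year as an integer column, make as a string column

// defaults (header=false, inferSchema=false): positional names _c0, _c1 and string types
spark.read.csv(lines).schema

// no readable first line: the None branch above yields an empty schema
spark.read.csv(spark.emptyDataset[String]).schema   // StructType(Nil)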

private def createBaseDataset(
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}

private[csv] class CSVOptions(
class CSVOptions(
@transient private val parameters: CaseInsensitiveMap[String],
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String)
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

private[csv] class UnivocityParser(
class UnivocityParser(
schema: StructType,
requiredSchema: StructType,
private val options: CSVOptions) extends Logging {
@@ -129,6 +129,22 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
verifyCars(cars, withHeader = true, checkTypes = true)
}

test("simple csv test with string dataset") {
val csvDataset = spark.read.text(testFile(carsFile)).as[String]
val cars = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(csvDataset)

verifyCars(cars, withHeader = true, checkTypes = true)

val carsWithoutHeader = spark.read
.option("header", "false")
.csv(csvDataset)

verifyCars(carsWithoutHeader, withHeader = false, checkTypes = false)
}

test("test inferring booleans") {
val result = spark.read
.format("csv")
@@ -1088,4 +1104,15 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
checkAnswer(df, spark.emptyDataFrame)
}
}

test("Empty string dataset produces empty dataframe and keep user-defined schema") {
val df1 = spark.read.csv(spark.emptyDataset[String])
assert(df1.schema === spark.emptyDataFrame.schema)
checkAnswer(df1, spark.emptyDataFrame)

val schema = StructType(StructField("a", StringType) :: Nil)
val df2 = spark.read.schema(schema).csv(spark.emptyDataset[String])
assert(df2.schema === schema)
}

}