[SPARK-20980][SQL] Rename wholeFile to multiLine for both CSV and JSON

What changes were proposed in this pull request?

The current option name `wholeFile` is misleading for CSV users: it suggests that each file holds a single record, when in fact one file can contain multiple records. This patch therefore renames the option to `multiLine` for both the CSV and JSON data sources.
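A minimal usage sketch with the renamed option (paths and file contents here are hypothetical; `multiLine` defaults to `false`):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("multiLineDemo").getOrCreate()

// Previously: .option("wholeFile", "true")
// JSON: the file is parsed as a whole, so a record may span multiple lines.
val jsonDf = spark.read
  .option("multiLine", "true")
  .json("/path/to/records.json") // hypothetical path

// CSV: quoted fields may contain embedded newlines.
val csvDf = spark.read
  .option("multiLine", "true")
  .option("header", "true")
  .csv("/path/to/records.csv") // hypothetical path
```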

How was this patch tested?

N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #18202 from gatorsmile/renameCVSOption.

(cherry picked from commit 2051428)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
gatorsmile authored and cloud-fan committed Jun 15, 2017
1 parent e02e063 commit af4f89c
Showing 12 changed files with 54 additions and 54 deletions.
6 changes: 3 additions & 3 deletions R/pkg/R/SQLContext.R
@@ -334,7 +334,7 @@ setMethod("toDF", signature(x = "RDD"),
#'
#' Loads a JSON file, returning the result as a SparkDataFrame
#' By default, (\href{http://jsonlines.org/}{JSON Lines text format or newline-delimited JSON}
#' ) is supported. For JSON (one record per file), set a named property \code{wholeFile} to
#' ) is supported. For JSON (one record per file), set a named property \code{multiLine} to
#' \code{TRUE}.
#' It goes through the entire dataset once to determine the schema.
#'
@@ -348,7 +348,7 @@ setMethod("toDF", signature(x = "RDD"),
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' df <- read.json(path, wholeFile = TRUE)
#' df <- read.json(path, multiLine = TRUE)
#' df <- jsonFile(path)
#' }
#' @name read.json
@@ -598,7 +598,7 @@ tableToDF <- function(tableName) {
#' df1 <- read.df("path/to/file.json", source = "json")
#' schema <- structType(structField("name", "string"),
#' structField("info", "map<string,double>"))
#' df2 <- read.df(mapTypeJsonPath, "json", schema, wholeFile = TRUE)
#' df2 <- read.df(mapTypeJsonPath, "json", schema, multiLine = TRUE)
#' df3 <- loadDF("data/test_table", "parquet", mergeSchema = "true")
#' }
#' @name read.df
14 changes: 7 additions & 7 deletions python/pyspark/sql/readwriter.py
@@ -169,12 +169,12 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
wholeFile=None):
multiLine=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.
`JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
For JSON (one record per file), set the ``wholeFile`` parameter to ``true``.
For JSON (one record per file), set the ``multiLine`` parameter to ``true``.
If the ``schema`` parameter is not specified, this function goes
through the input once to determine the input schema.
@@ -224,7 +224,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
:param wholeFile: parse one record, which may span multiple lines, per file. If None is
:param multiLine: parse one record, which may span multiple lines, per file. If None is
set, it uses the default value, ``false``.
>>> df1 = spark.read.json('python/test_support/sql/people.json')
@@ -242,7 +242,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, wholeFile=wholeFile)
timestampFormat=timestampFormat, multiLine=multiLine)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -316,7 +316,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, wholeFile=None):
columnNameOfCorruptRecord=None, multiLine=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -389,7 +389,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
:param wholeFile: parse records, which may span multiple lines. If None is
:param multiLine: parse records, which may span multiple lines. If None is
set, it uses the default value, ``false``.
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
@@ -404,7 +404,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, wholeFile=wholeFile)
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
14 changes: 7 additions & 7 deletions python/pyspark/sql/streaming.py
@@ -401,12 +401,12 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
wholeFile=None):
multiLine=None):
"""
Loads a JSON file stream and returns the results as a :class:`DataFrame`.
`JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
For JSON (one record per file), set the ``wholeFile`` parameter to ``true``.
For JSON (one record per file), set the ``multiLine`` parameter to ``true``.
If the ``schema`` parameter is not specified, this function goes
through the input once to determine the input schema.
@@ -458,7 +458,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
:param wholeFile: parse one record, which may span multiple lines, per file. If None is
:param multiLine: parse one record, which may span multiple lines, per file. If None is
set, it uses the default value, ``false``.
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
@@ -473,7 +473,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, wholeFile=wholeFile)
timestampFormat=timestampFormat, multiLine=multiLine)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
@@ -532,7 +532,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, wholeFile=None):
columnNameOfCorruptRecord=None, multiLine=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -607,7 +607,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
:param wholeFile: parse one record, which may span multiple lines. If None is
:param multiLine: parse one record, which may span multiple lines. If None is
set, it uses the default value, ``false``.
>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
@@ -624,7 +624,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, wholeFile=wholeFile)
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
8 changes: 4 additions & 4 deletions python/pyspark/sql/tests.py
@@ -442,15 +442,15 @@ def test_udf_with_order_by_and_limit(self):
res.explain(True)
self.assertEqual(res.collect(), [Row(id=0, copy=0)])

def test_wholefile_json(self):
def test_multiLine_json(self):
people1 = self.spark.read.json("python/test_support/sql/people.json")
people_array = self.spark.read.json("python/test_support/sql/people_array.json",
wholeFile=True)
multiLine=True)
self.assertEqual(people1.collect(), people_array.collect())

def test_wholefile_csv(self):
def test_multiLine_csv(self):
ages_newlines = self.spark.read.csv(
"python/test_support/sql/ages_newlines.csv", wholeFile=True)
"python/test_support/sql/ages_newlines.csv", multiLine=True)
expected = [Row(_c0=u'Joe', _c1=u'20', _c2=u'Hi,\nI am Jeo'),
Row(_c0=u'Tom', _c1=u'30', _c2=u'My name is Tom'),
Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')]
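The expected rows above pin down the fixture's shape; a sketch that reconstructs such a file and reads it with the renamed option (quoting details of the real `ages_newlines.csv` are assumed):

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("csvNewlines").getOrCreate()

// Reconstructed from the expected rows; embedded newlines live inside quoted fields.
val csv =
  "Joe,20,\"Hi,\nI am Jeo\"\n" +
  "Tom,30,\"My name is Tom\"\n" +
  "Hyukjin,25,\"I am Hyukjin\n\nI love Spark!\"\n"
val path = Files.createTempFile("ages_newlines", ".csv")
Files.write(path, csv.getBytes("UTF-8"))

// Without multiLine=true, each physical line would be parsed as its own record.
val df = spark.read.option("multiLine", "true").csv(path.toString)
df.show(truncate = false) // three rows; the third column keeps its newlines
```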
@@ -81,7 +81,7 @@ private[sql] class JSONOptions(
FastDateFormat.getInstance(
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)

val wholeFile = parameters.get("wholeFile").map(_.toBoolean).getOrElse(false)
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
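Both `JSONOptions` and `CSVOptions` give the flag its default through the same `getOrElse(false)` idiom; a standalone sketch of that behavior (a plain `Map` stands in for whatever map type the real options classes use):

```scala
// Absent key => false; present key => parsed via String.toBoolean.
val parameters: Map[String, String] = Map("multiLine" -> "true")
val multiLine: Boolean = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

assert(multiLine)
assert(!Map.empty[String, String].get("multiLine").map(_.toBoolean).getOrElse(false))
```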
@@ -283,7 +283,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* Loads JSON files and returns the results as a `DataFrame`.
*
* <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
* default. For JSON (one record per file), set the `wholeFile` option to true.
* default. For JSON (one record per file), set the `multiLine` option to true.
*
* This function goes through the input once to determine the input schema. If you know the
* schema in advance, use the version that specifies the schema to avoid the extra scan.
@@ -323,7 +323,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines,
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
* per file</li>
* </ul>
*
@@ -525,7 +525,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`columnNameOfCorruptRecord` (default is the value specified in
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines.</li>
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li>
* </ul>
* @since 2.0.0
*/
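One nuance behind "one record per file" in the JSON scaladoc above: with `multiLine` the whole file is parsed as a single JSON value, and a top-level array still yields one row per element; the Python test later in this diff (`people.json` vs. `people_array.json`) relies on that equivalence. A sketch, with file contents assumed:

```scala
// Default (line-delimited): {"name":"Alice"} and {"name":"Bob"} on separate lines.
// multiLine (single file):  [{"name": "Alice"}, {"name": "Bob"}]
// Both inputs produce the same two rows. Assumes an existing SparkSession `spark`.
val people = spark.read
  .option("multiLine", "true")
  .json("/path/to/people_array.json") // hypothetical path to a JSON-array file
```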
@@ -111,8 +111,8 @@ abstract class CSVDataSource extends Serializable {

object CSVDataSource {
def apply(options: CSVOptions): CSVDataSource = {
if (options.wholeFile) {
WholeFileCSVDataSource
if (options.multiLine) {
MultiLineCSVDataSource
} else {
TextInputCSVDataSource
}
@@ -196,7 +196,7 @@ object TextInputCSVDataSource extends CSVDataSource {
}
}

object WholeFileCSVDataSource extends CSVDataSource {
object MultiLineCSVDataSource extends CSVDataSource {
override val isSplitable: Boolean = false

override def readFile(
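Why two implementations: in `multiLine` mode a record can cross line boundaries, so the input cannot be split at arbitrary offsets, which is why the diff above forces `isSplitable` to `false`. The JSON source below mirrors the same dispatch. A reduced sketch of the pattern (simplified types, not the real class hierarchy):

```scala
// Reduced sketch: the real objects also carry schema inference and read logic.
sealed trait CsvSource { def isSplitable: Boolean }
case object MultiLineCsvSource extends CsvSource { val isSplitable = false } // records may span lines
case object TextInputCsvSource extends CsvSource { val isSplitable = true }  // one line per record

def select(multiLine: Boolean): CsvSource =
  if (multiLine) MultiLineCsvSource else TextInputCsvSource

assert(!select(multiLine = true).isSplitable)
```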
@@ -128,7 +128,7 @@ class CSVOptions(
FastDateFormat.getInstance(
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)

val wholeFile = parameters.get("wholeFile").map(_.toBoolean).getOrElse(false)
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

val maxColumns = getInt("maxColumns", 20480)

@@ -86,8 +86,8 @@ abstract class JsonDataSource extends Serializable {

object JsonDataSource {
def apply(options: JSONOptions): JsonDataSource = {
if (options.wholeFile) {
WholeFileJsonDataSource
if (options.multiLine) {
MultiLineJsonDataSource
} else {
TextInputJsonDataSource
}
@@ -147,7 +147,7 @@ object TextInputJsonDataSource extends JsonDataSource {
}
}

object WholeFileJsonDataSource extends JsonDataSource {
object MultiLineJsonDataSource extends JsonDataSource {
override val isSplitable: Boolean = {
false
}
@@ -163,7 +163,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
* Loads a JSON file stream and returns the results as a `DataFrame`.
*
* <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
* default. For JSON (one record per file), set the `wholeFile` option to true.
* default. For JSON (one record per file), set the `multiLine` option to true.
*
* This function goes through the input once to determine the input schema. If you know the
* schema in advance, use the version that specifies the schema to avoid the extra scan.
@@ -205,7 +205,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines,
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
* per file</li>
* </ul>
*
@@ -276,7 +276,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`columnNameOfCorruptRecord` (default is the value specified in
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines.</li>
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li>
* </ul>
*
* @since 2.0.0
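The streaming reader accepts the same option; a hedged sketch (file-stream sources require an explicit schema, and the path is hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().appName("streamDemo").getOrCreate()

// File streams need a user-supplied schema; multiLine behaves as in batch reads.
val schema = new StructType().add("name", StringType)
val stream = spark.readStream
  .schema(schema)
  .option("multiLine", "true")
  .json("/path/to/incoming-json") // hypothetical input directory
```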
@@ -261,10 +261,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}

test("test for DROPMALFORMED parsing mode") {
Seq(false, true).foreach { wholeFile =>
Seq(false, true).foreach { multiLine =>
val cars = spark.read
.format("csv")
.option("wholeFile", wholeFile)
.option("multiLine", multiLine)
.options(Map("header" -> "true", "mode" -> "dropmalformed"))
.load(testFile(carsFile))

@@ -284,11 +284,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}

test("test for FAILFAST parsing mode") {
Seq(false, true).foreach { wholeFile =>
Seq(false, true).foreach { multiLine =>
val exception = intercept[SparkException] {
spark.read
.format("csv")
.option("wholeFile", wholeFile)
.option("multiLine", multiLine)
.options(Map("header" -> "true", "mode" -> "failfast"))
.load(testFile(carsFile)).collect()
}
@@ -990,13 +990,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}

test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") {
Seq(false, true).foreach { wholeFile =>
Seq(false, true).foreach { multiLine =>
val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
// We use `PERMISSIVE` mode by default if invalid string is given.
val df1 = spark
.read
.option("mode", "abcd")
.option("wholeFile", wholeFile)
.option("multiLine", multiLine)
.schema(schema)
.csv(testFile(valueMalformedFile))
checkAnswer(df1,
@@ -1011,7 +1011,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
.read
.option("mode", "Permissive")
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
.option("wholeFile", wholeFile)
.option("multiLine", multiLine)
.schema(schemaWithCorrField1)
.csv(testFile(valueMalformedFile))
checkAnswer(df2,
@@ -1028,7 +1028,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
.read
.option("mode", "permissive")
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
.option("wholeFile", wholeFile)
.option("multiLine", multiLine)
.schema(schemaWithCorrField2)
.csv(testFile(valueMalformedFile))
checkAnswer(df3,
@@ -1041,7 +1041,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
.option("wholeFile", wholeFile)
.option("multiLine", multiLine)
.schema(schema.add(columnNameOfCorruptRecord, IntegerType))
.csv(testFile(valueMalformedFile))
.collect
@@ -1073,7 +1073,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {

val df = spark.read
.option("header", true)
.option("wholeFile", true)
.option("multiLine", true)
.csv(path.getAbsolutePath)

// Check if headers have new lines in the names.
@@ -1096,10 +1096,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}

test("Empty file produces empty dataframe with empty schema") {
Seq(false, true).foreach { wholeFile =>
Seq(false, true).foreach { multiLine =>
val df = spark.read.format("csv")
.option("header", true)
.option("wholeFile", wholeFile)
.option("multiLine", multiLine)
.load(testFile(emptyFile))

assert(df.schema === spark.emptyDataFrame.schema)
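The corrupt-record tests above combine `multiLine` with `columnNameOfCorruptRecord`; a condensed sketch of that pattern (the column name, schema, and input file are assumed):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType, TimestampType}

// Assumes an existing SparkSession `spark` and a CSV file with one malformed row.
val schemaWithCorrupt = new StructType()
  .add("a", IntegerType)
  .add("b", TimestampType)
  .add("_malformed", StringType) // receives the raw text of rows that fail to parse

val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_malformed")
  .option("multiLine", "true")
  .schema(schemaWithCorrupt)
  .csv("/path/to/value_malformed.csv") // hypothetical path
```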