From 74e1fc50765486a0cfa8af45839a9f3382e2d2fd Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Tue, 14 Feb 2017 22:26:05 +0900 Subject: [PATCH 01/12] Fill NULL in a field when detecting a malformed token --- .../datasources/csv/UnivocityParser.scala | 26 ++++++++++++++----- .../resources/test-data/value-malformed.csv | 2 ++ .../execution/datasources/csv/CSVSuite.scala | 23 ++++++++++++++++ 3 files changed, 44 insertions(+), 7 deletions(-) create mode 100644 sql/core/src/test/resources/test-data/value-malformed.csv diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 2e409b3f5fbfc..267062a66ad4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -175,14 +175,26 @@ private[csv] class UnivocityParser( convertWithParseMode(parser.parseLine(input)) { tokens => var i: Int = 0 while (i < indexArr.length) { - val pos = indexArr(i) - // It anyway needs to try to parse since it decides if this row is malformed - // or not after trying to cast in `DROPMALFORMED` mode even if the casted - // value is not stored in the row. - val value = valueConverters(pos).apply(tokens(pos)) - if (i < requiredSchema.length) { - row(i) = value + try { + val pos = indexArr(i) + // It anyway needs to try to parse since it decides if this row is malformed + // or not after trying to cast in `DROPMALFORMED` mode even if the casted + // value is not stored in the row. + val value = valueConverters(pos).apply(tokens(pos)) + if (i < requiredSchema.length) { + row(i) = value + } + } catch { + case _: NumberFormatException | _: IllegalArgumentException if options.permissive => + logWarning("Fill NULL in a field because a malformed token detected: " + + tokens(indexArr(i))) + if (i < requiredSchema.length) { + row.setNullAt(i) + } + case e: Throwable => + throw e } + i += 1 } row diff --git a/sql/core/src/test/resources/test-data/value-malformed.csv b/sql/core/src/test/resources/test-data/value-malformed.csv new file mode 100644 index 0000000000000..8945ed73d2e83 --- /dev/null +++ b/sql/core/src/test/resources/test-data/value-malformed.csv @@ -0,0 +1,2 @@ +0,2013-111-11 12:13:14 +1,1983-08-04 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 0c9a7298c3fa0..25e3ecec360b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -53,6 +53,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { private val numbersFile = "test-data/numbers.csv" private val datesFile = "test-data/dates.csv" private val unescapedQuotesFile = "test-data/unescaped-quotes.csv" + private val valueMalformedFile = "test-data/value-malformed.csv" private def testFile(fileName: String): String = { Thread.currentThread().getContextClassLoader.getResource(fileName).toString @@ -958,4 +959,26 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { checkAnswer(df, Row(1, null)) } } + + test("SPARK-18699 fill null when parsing non-string types malformed") { + val schema = new StructType().add("a", IntegerType).add("b", TimestampType) 
+ val df = spark + .read + .format("csv") + .option("mode", "PERMISSIVE") + .schema(schema) + .load(testFile(valueMalformedFile)) + checkAnswer(df, Row(0, null) :: Row(1, java.sql.Date.valueOf("1983-08-04")) :: Nil) + + val errMsg = intercept[SparkException] { + spark + .read + .format("csv") + .option("mode", "FAILFAST") + .schema(schema) + .load(testFile(valueMalformedFile)) + .collect + }.getCause.getMessage + assert(errMsg.contains("Timestamp format must be")) + } } From 763601d55868f663572d01f7949239076e7d23e4 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Thu, 16 Feb 2017 09:37:17 +0900 Subject: [PATCH 02/12] Add an option columnNameOfCorruptRecord for CSV formats --- .../datasources/csv/CSVFileFormat.scala | 12 +++- .../datasources/csv/CSVOptions.scala | 4 +- .../datasources/csv/UnivocityParser.scala | 58 +++++++++++-------- .../execution/datasources/csv/CSVSuite.scala | 29 ++++++---- 4 files changed, 65 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 566f40f454393..aff18c8241bf4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -101,6 +101,12 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + val columnNameOfCorruptRecord = csvOptions.columnNameOfCorruptRecord.getOrElse(sparkSession + .sessionState.conf.columnNameOfCorruptRecord) + val shouldHandleCorruptRecord = csvOptions.permissive && requiredSchema.exists { f => + f.name == columnNameOfCorruptRecord && f.dataType == StringType && f.nullable + } + (file: PartitionedFile) => { val lines = { val conf = broadcastedHadoopConf.value.value @@ -120,7 +126,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { } val filteredLines = CSVUtils.filterCommentAndEmpty(linesWithoutHeader, csvOptions) - val parser = new UnivocityParser(dataSchema, requiredSchema, csvOptions) + val parser = if (shouldHandleCorruptRecord) { + new UnivocityParser(dataSchema, requiredSchema, csvOptions, Some(columnNameOfCorruptRecord)) + } else { + new UnivocityParser(dataSchema, requiredSchema, csvOptions) + } filteredLines.flatMap(parser.parse) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index b7fbaa4f44a62..c2b15eeddda4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -26,7 +26,7 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes} -private[csv] class CSVOptions( +private[sql] class CSVOptions( @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String) extends Logging with Serializable { @@ -95,6 +95,8 @@ private[csv] class CSVOptions( val dropMalformed = ParseModes.isDropMalformedMode(parseMode) val permissive = ParseModes.isPermissiveMode(parseMode) + val columnNameOfCorruptRecord = 
parameters.get("columnNameOfCorruptRecord") + val nullValue = parameters.getOrElse("nullValue", "") val nanValue = parameters.getOrElse("nanValue", "NaN") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 267062a66ad4c..9148822077f1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -36,7 +36,8 @@ import org.apache.spark.unsafe.types.UTF8String private[csv] class UnivocityParser( schema: StructType, requiredSchema: StructType, - options: CSVOptions) extends Logging { + options: CSVOptions, + columnNameOfCorruptRecord: Option[String] = None) extends Logging { require(requiredSchema.toSet.subsetOf(schema.toSet), "requiredSchema should be the subset of schema.") @@ -45,8 +46,12 @@ private[csv] class UnivocityParser( // A `ValueConverter` is responsible for converting the given value to a desired type. private type ValueConverter = String => Any + private val inputSchema = columnNameOfCorruptRecord.map { fn => + StructType(schema.filter(_.name != fn)) + }.getOrElse(schema) + private val valueConverters = - schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray + inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray private val parser = new CsvParser(options.asParserSettings) @@ -54,7 +59,12 @@ private[csv] class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) - private val indexArr: Array[Int] = { + private val shouldHandleCorruptRecord = columnNameOfCorruptRecord.isDefined + private val corruptIndex = columnNameOfCorruptRecord.flatMap { fn => + requiredSchema.getFieldIndex(fn) + }.getOrElse(-1) + + private val indexArr: Array[(Int, Int)] = { val fields = if (options.dropMalformed) { // If `dropMalformed` is enabled, then it needs to parse all the values // so that we can decide which row is malformed. @@ -62,7 +72,9 @@ private[csv] class UnivocityParser( } else { requiredSchema } - fields.map(schema.indexOf(_: StructField)).toArray + fields.zipWithIndex.filter { case (_, i) => i != corruptIndex }.map { case (f, i) => + (inputSchema.indexOf(f), i) + }.toArray } /** @@ -173,10 +185,9 @@ private[csv] class UnivocityParser( */ def parse(input: String): Option[InternalRow] = { convertWithParseMode(parser.parseLine(input)) { tokens => - var i: Int = 0 - while (i < indexArr.length) { + var foundMalformed: Boolean = false + indexArr.foreach { case (pos, i) => try { - val pos = indexArr(i) // It anyway needs to try to parse since it decides if this row is malformed // or not after trying to cast in `DROPMALFORMED` mode even if the casted // value is not stored in the row. 
@@ -185,17 +196,22 @@ private[csv] class UnivocityParser( row(i) = value } } catch { - case _: NumberFormatException | _: IllegalArgumentException if options.permissive => - logWarning("Fill NULL in a field because a malformed token detected: " + - tokens(indexArr(i))) + case _: NumberFormatException | _: IllegalArgumentException + if options.permissive && shouldHandleCorruptRecord => + foundMalformed = true if (i < requiredSchema.length) { row.setNullAt(i) } case e: Throwable => throw e } - - i += 1 + } + if (shouldHandleCorruptRecord) { + if (foundMalformed) { + row(corruptIndex) = UTF8String.fromString(tokens.mkString(options.delimiter.toString)) + } else { + row.setNullAt(corruptIndex) + } } row } @@ -203,7 +219,7 @@ private[csv] class UnivocityParser( private def convertWithParseMode( tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = { - if (options.dropMalformed && schema.length != tokens.length) { + if (options.dropMalformed && inputSchema.length != tokens.length) { if (numMalformedRecords < options.maxMalformedLogPerPartition) { logWarning(s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") } @@ -214,14 +230,14 @@ private[csv] class UnivocityParser( } numMalformedRecords += 1 None - } else if (options.failFast && schema.length != tokens.length) { + } else if (options.failFast && inputSchema.length != tokens.length) { throw new RuntimeException(s"Malformed line in FAILFAST mode: " + s"${tokens.mkString(options.delimiter.toString)}") } else { - val checkedTokens = if (options.permissive && schema.length > tokens.length) { - tokens ++ new Array[String](schema.length - tokens.length) - } else if (options.permissive && schema.length < tokens.length) { - tokens.take(schema.length) + val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) { + tokens ++ new Array[String](inputSchema.length - tokens.length) + } else if (options.permissive && inputSchema.length < tokens.length) { + tokens.take(inputSchema.length) } else { tokens } @@ -234,12 +250,6 @@ private[csv] class UnivocityParser( logWarning("Parse exception. " + s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") } - if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) { - logWarning( - s"More than ${options.maxMalformedLogPerPartition} malformed records have been " + - "found on this partition. 
Malformed records from now on will not be logged.") - } - numMalformedRecords += 1 None } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 25e3ecec360b3..047f286b915a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -960,25 +960,30 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } - test("SPARK-18699 fill null when parsing non-string types malformed") { + test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") { val schema = new StructType().add("a", IntegerType).add("b", TimestampType) - val df = spark - .read - .format("csv") - .option("mode", "PERMISSIVE") - .schema(schema) - .load(testFile(valueMalformedFile)) - checkAnswer(df, Row(0, null) :: Row(1, java.sql.Date.valueOf("1983-08-04")) :: Nil) - val errMsg = intercept[SparkException] { spark .read - .format("csv") - .option("mode", "FAILFAST") + .option("mode", "PERMISSIVE") .schema(schema) - .load(testFile(valueMalformedFile)) + .csv(testFile(valueMalformedFile)) .collect }.getCause.getMessage assert(errMsg.contains("Timestamp format must be")) + + // If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records + val columnNameOfCorruptRecord = "_unparsed" + val df = spark + .read + .option("mode", "PERMISSIVE") + .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) + .schema(schema.add(columnNameOfCorruptRecord, StringType)) + .csv(testFile(valueMalformedFile)) + + checkAnswer(df, + Row(0, null, "0,2013-111-11 12:13:14") :: + Row(1, java.sql.Date.valueOf("1983-08-04"), null) :: + Nil) } } From 873a383cd5c3c1b1179bf9a3b121acc3755beac0 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 19 Feb 2017 01:27:36 +0900 Subject: [PATCH 03/12] Apply review comments --- .../datasources/csv/CSVFileFormat.scala | 15 ++--- .../datasources/csv/CSVOptions.scala | 20 ++++-- .../datasources/csv/UnivocityParser.scala | 67 +++++++++---------- 3 files changed, 53 insertions(+), 49 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index aff18c8241bf4..69c99add81f03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -101,11 +101,10 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val columnNameOfCorruptRecord = csvOptions.columnNameOfCorruptRecord.getOrElse(sparkSession - .sessionState.conf.columnNameOfCorruptRecord) - val shouldHandleCorruptRecord = csvOptions.permissive && requiredSchema.exists { f => - f.name == columnNameOfCorruptRecord && f.dataType == StringType && f.nullable - } + val parsedOptions = new CSVOptions( + options, + sparkSession.sessionState.conf.sessionLocalTimeZone, + sparkSession.sessionState.conf.columnNameOfCorruptRecord) (file: PartitionedFile) => { val lines = { @@ -126,11 +125,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { } val filteredLines = 
CSVUtils.filterCommentAndEmpty(linesWithoutHeader, csvOptions) - val parser = if (shouldHandleCorruptRecord) { - new UnivocityParser(dataSchema, requiredSchema, csvOptions, Some(columnNameOfCorruptRecord)) - } else { - new UnivocityParser(dataSchema, requiredSchema, csvOptions) - } + val parser = new UnivocityParser(dataSchema, requiredSchema, parsedOptions) filteredLines.flatMap(parser.parse) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index c2b15eeddda4b..1caeec7c63945 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -26,12 +26,21 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes} -private[sql] class CSVOptions( - @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String) +private[csv] class CSVOptions( + @transient private val parameters: CaseInsensitiveMap[String], + defaultTimeZoneId: String, + defaultColumnNameOfCorruptRecord: String) extends Logging with Serializable { - def this(parameters: Map[String, String], defaultTimeZoneId: String) = - this(CaseInsensitiveMap(parameters), defaultTimeZoneId) + def this( + parameters: Map[String, String], + defaultTimeZoneId: String, + defaultColumnNameOfCorruptRecord: String = "") = { + this( + CaseInsensitiveMap(parameters), + defaultTimeZoneId, + defaultColumnNameOfCorruptRecord) + } private def getChar(paramName: String, default: Char): Char = { val paramValue = parameters.get(paramName) @@ -95,7 +104,8 @@ private[sql] class CSVOptions( val dropMalformed = ParseModes.isDropMalformedMode(parseMode) val permissive = ParseModes.isPermissiveMode(parseMode) - val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord") + val columnNameOfCorruptRecord = + parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord) val nullValue = parameters.getOrElse("nullValue", "") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 9148822077f1e..30efd4ab139eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -36,8 +36,7 @@ import org.apache.spark.unsafe.types.UTF8String private[csv] class UnivocityParser( schema: StructType, requiredSchema: StructType, - options: CSVOptions, - columnNameOfCorruptRecord: Option[String] = None) extends Logging { + options: CSVOptions) extends Logging { require(requiredSchema.toSet.subsetOf(schema.toSet), "requiredSchema should be the subset of schema.") @@ -46,9 +45,15 @@ private[csv] class UnivocityParser( // A `ValueConverter` is responsible for converting the given value to a desired type. 
private type ValueConverter = String => Any - private val inputSchema = columnNameOfCorruptRecord.map { fn => - StructType(schema.filter(_.name != fn)) - }.getOrElse(schema) + private val shouldHandleCorruptRecord = options.permissive && requiredSchema.exists { f => + f.name == options.columnNameOfCorruptRecord && f.dataType == StringType && f.nullable + } + + private val inputSchema = if (shouldHandleCorruptRecord) { + StructType(schema.filter(_.name != options.columnNameOfCorruptRecord)) + } else { + schema + } private val valueConverters = inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray @@ -59,10 +64,8 @@ private[csv] class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) - private val shouldHandleCorruptRecord = columnNameOfCorruptRecord.isDefined - private val corruptIndex = columnNameOfCorruptRecord.flatMap { fn => - requiredSchema.getFieldIndex(fn) - }.getOrElse(-1) + private val corruptIndex = + requiredSchema.getFieldIndex(options.columnNameOfCorruptRecord).getOrElse(-1) private val indexArr: Array[(Int, Int)] = { val fields = if (options.dropMalformed) { @@ -185,32 +188,13 @@ private[csv] class UnivocityParser( */ def parse(input: String): Option[InternalRow] = { convertWithParseMode(parser.parseLine(input)) { tokens => - var foundMalformed: Boolean = false indexArr.foreach { case (pos, i) => - try { - // It anyway needs to try to parse since it decides if this row is malformed - // or not after trying to cast in `DROPMALFORMED` mode even if the casted - // value is not stored in the row. - val value = valueConverters(pos).apply(tokens(pos)) - if (i < requiredSchema.length) { - row(i) = value - } - } catch { - case _: NumberFormatException | _: IllegalArgumentException - if options.permissive && shouldHandleCorruptRecord => - foundMalformed = true - if (i < requiredSchema.length) { - row.setNullAt(i) - } - case e: Throwable => - throw e - } - } - if (shouldHandleCorruptRecord) { - if (foundMalformed) { - row(corruptIndex) = UTF8String.fromString(tokens.mkString(options.delimiter.toString)) - } else { - row.setNullAt(corruptIndex) + // It anyway needs to try to parse since it decides if this row is malformed + // or not after trying to cast in `DROPMALFORMED` mode even if the casted + // value is not stored in the row. + val value = valueConverters(pos).apply(tokens(pos)) + if (i < requiredSchema.length) { + row(i) = value } } row @@ -234,6 +218,9 @@ private[csv] class UnivocityParser( throw new RuntimeException(s"Malformed line in FAILFAST mode: " + s"${tokens.mkString(options.delimiter.toString)}") } else { + if (options.permissive && shouldHandleCorruptRecord) { + row.setNullAt(corruptIndex) + } val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) { tokens ++ new Array[String](inputSchema.length - tokens.length) } else if (options.permissive && inputSchema.length < tokens.length) { @@ -245,11 +232,23 @@ private[csv] class UnivocityParser( try { Some(convert(checkedTokens)) } catch { + // We only catch exceptions about malformed values here and pass over other exceptions + // (e.g., SparkException about unsupported types). + case _: NumberFormatException | _: IllegalArgumentException + if options.permissive && shouldHandleCorruptRecord => + row(corruptIndex) = UTF8String.fromString(tokens.mkString(options.delimiter.toString)) + Some(row) case NonFatal(e) if options.dropMalformed => if (numMalformedRecords < options.maxMalformedLogPerPartition) { logWarning("Parse exception. 
" + s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") } + if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) { + logWarning( + s"More than ${options.maxMalformedLogPerPartition} malformed records have been " + + "found on this partition. Malformed records from now on will not be logged.") + } + numMalformedRecords += 1 None } } From 4eed4a4bea927c6365dac2e2734c895a0a1a0026 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 20 Feb 2017 18:39:15 +0900 Subject: [PATCH 04/12] Apply review comments --- python/pyspark/sql/readwriter.py | 18 +++++-- .../apache/spark/sql/DataFrameReader.scala | 10 +++- .../datasources/csv/CSVFileFormat.scala | 24 ++++++--- .../datasources/csv/UnivocityParser.scala | 50 +++++++++---------- .../sql/streaming/DataStreamReader.scala | 10 +++- .../execution/datasources/csv/CSVSuite.scala | 44 +++++++++------- 6 files changed, 98 insertions(+), 58 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 6bed390e60c96..0c0cbc57f460b 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -303,8 +303,9 @@ def text(self, paths): def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, - maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None): + negativeInf=None, columnNameOfCorruptRecord=None, dateFormat=None, + timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, + maxMalformedLogPerPartition=None, mode=None, timeZone=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -367,10 +368,18 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non If None is set, it uses the default value, session local timezone. * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. - When a schema is set by user, it sets ``null`` for extra fields. + If users set a string-type field named ``columnNameOfCorruptRecord`` in a + user-specified ``schema``, it puts the malformed string into the field. When + a ``schema`` is set by user, it sets ``null`` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. + :param columnNameOfCorruptRecord: defines a field name for malformed strings created + by ``PERMISSIVE`` mode. If a user-specified `schema` + has this named field, Spark puts malformed strings + in this field. This overrides + `spark.sql.columnNameOfCorruptRecord`. 
+ >>> df = spark.read.csv('python/test_support/sql/ages.csv') >>> df.dtypes [('_c0', 'string'), ('_c1', 'string')] @@ -380,7 +389,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, + columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, + timestampFormat=timestampFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone) if isinstance(path, basestring): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index cb9493a575643..5208c72363760 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -422,12 +422,18 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
 *   <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
 *       during parsing.
 *     <ul>
- *       <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. When
- *         a schema is set by user, it sets `null` for extra fields.</li>
+ *       <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. If users
+ *         set a string-type field named `columnNameOfCorruptRecord` in a user-specified `schema`,
+ *         it puts the malformed string into the field. When a `schema` is set by user, it sets `null`
+ *         for extra fields.</li>
 *       <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
 *       <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
 *     </ul>
 *   </li>
+ *   <li>`columnNameOfCorruptRecord` (default is the value specified in
+ *     `spark.sql.columnNameOfCorruptRecord`): defines a field name for malformed strings created by
+ *     `PERMISSIVE` mode. If a user-specified `schema` has this named field, Spark puts malformed
+ *     strings in this field. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
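(Illustrative usage sketch, not part of the patch: it mirrors the new CSVSuite test for the option documented above. The input path, app name, and the `_corrupt_record` column name are assumptions chosen for the example.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").appName("csv-corrupt-record").getOrCreate()

// A nullable string field in the schema receives the raw text of rows that fail to parse.
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", TimestampType)
  .add("_corrupt_record", StringType)

val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schema)
  .csv("/tmp/value-malformed.csv")   // assumed input path
// Malformed rows keep their original line in _corrupt_record; well-formed rows get null there.
df.show(truncate = false)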
  • * * @since 2.0.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 69c99add81f03..215fc8f59893e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -27,9 +27,9 @@ import org.apache.hadoop.mapreduce._ import org.apache.spark.TaskContext import org.apache.spark.internal.Logging -import org.apache.spark.sql.{Dataset, Encoders, SparkSession} +import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs} +import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.sources._ @@ -96,8 +96,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) - + CSVUtils.verifySchema(dataSchema) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) @@ -106,25 +105,34 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { sparkSession.sessionState.conf.sessionLocalTimeZone, sparkSession.sessionState.conf.columnNameOfCorruptRecord) + // Check a field requirement for corrupt records here to throw an exception in a driver side + dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).map { corruptFieldIndex => + val f = dataSchema(corruptFieldIndex) + if (f.dataType != StringType || !f.nullable) { + throw new AnalysisException( + "A field for corrupt records must be a string type and nullable") + } + } + (file: PartitionedFile) => { val lines = { val conf = broadcastedHadoopConf.value.value val linesReader = new HadoopFileLinesReader(file, conf) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close())) linesReader.map { line => - new String(line.getBytes, 0, line.getLength, csvOptions.charset) + new String(line.getBytes, 0, line.getLength, parsedOptions.charset) } } - val linesWithoutHeader = if (csvOptions.headerFlag && file.start == 0) { + val linesWithoutHeader = if (parsedOptions.headerFlag && file.start == 0) { // Note that if there are only comments in the first block, the header would probably // be not dropped. 
- CSVUtils.dropHeaderLine(lines, csvOptions) + CSVUtils.dropHeaderLine(lines, parsedOptions) } else { lines } - val filteredLines = CSVUtils.filterCommentAndEmpty(linesWithoutHeader, csvOptions) + val filteredLines = CSVUtils.filterCommentAndEmpty(linesWithoutHeader, parsedOptions) val parser = new UnivocityParser(dataSchema, requiredSchema, parsedOptions) filteredLines.flatMap(parser.parse) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 30efd4ab139eb..295ebc1fd427a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -27,6 +27,7 @@ import scala.util.control.NonFatal import com.univocity.parsers.csv.CsvParser import org.apache.spark.internal.Logging +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -45,15 +46,15 @@ private[csv] class UnivocityParser( // A `ValueConverter` is responsible for converting the given value to a desired type. private type ValueConverter = String => Any - private val shouldHandleCorruptRecord = options.permissive && requiredSchema.exists { f => - f.name == options.columnNameOfCorruptRecord && f.dataType == StringType && f.nullable + private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) + corruptFieldIndex.foreach { corrFieldIndex => + require(schema(corrFieldIndex).dataType == StringType, + "A field for corrupt records must have a string type") + require(schema(corrFieldIndex).nullable, "A field for corrupt must be nullable") } - private val inputSchema = if (shouldHandleCorruptRecord) { - StructType(schema.filter(_.name != options.columnNameOfCorruptRecord)) - } else { - schema - } + private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord)) + CSVUtils.verifySchema(inputSchema) private val valueConverters = inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray @@ -64,9 +65,6 @@ private[csv] class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) - private val corruptIndex = - requiredSchema.getFieldIndex(options.columnNameOfCorruptRecord).getOrElse(-1) - private val indexArr: Array[(Int, Int)] = { val fields = if (options.dropMalformed) { // If `dropMalformed` is enabled, then it needs to parse all the values @@ -75,7 +73,12 @@ private[csv] class UnivocityParser( } else { requiredSchema } - fields.zipWithIndex.filter { case (_, i) => i != corruptIndex }.map { case (f, i) => + val fieldsWithIndexes = fields.zipWithIndex + corruptFieldIndex.map { case corrFieldIndex => + fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex } + }.getOrElse { + fieldsWithIndexes + }.map { case (f, i) => (inputSchema.indexOf(f), i) }.toArray } @@ -162,8 +165,6 @@ private[csv] class UnivocityParser( case udt: UserDefinedType[_] => (datum: String) => makeConverter(name, udt.sqlType, nullable, options) - - case _ => throw new RuntimeException(s"Unsupported type: ${dataType.typeName}") } private def nullSafeDatum( @@ -187,8 +188,10 @@ private[csv] class UnivocityParser( * the record is malformed). 
*/ def parse(input: String): Option[InternalRow] = { - convertWithParseMode(parser.parseLine(input)) { tokens => - indexArr.foreach { case (pos, i) => + parseWithMode(input) { tokens => + var i: Int = 0 + while (i < indexArr.length) { + val (pos, _) = indexArr(i) // It anyway needs to try to parse since it decides if this row is malformed // or not after trying to cast in `DROPMALFORMED` mode even if the casted // value is not stored in the row. @@ -196,13 +199,15 @@ private[csv] class UnivocityParser( if (i < requiredSchema.length) { row(i) = value } + i += 1 } row } } - private def convertWithParseMode( - tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = { + private def parseWithMode(input: String)(convert: Array[String] => InternalRow) + : Option[InternalRow] = { + val tokens = parser.parseLine(input) if (options.dropMalformed && inputSchema.length != tokens.length) { if (numMalformedRecords < options.maxMalformedLogPerPartition) { logWarning(s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") @@ -218,9 +223,6 @@ private[csv] class UnivocityParser( throw new RuntimeException(s"Malformed line in FAILFAST mode: " + s"${tokens.mkString(options.delimiter.toString)}") } else { - if (options.permissive && shouldHandleCorruptRecord) { - row.setNullAt(corruptIndex) - } val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) { tokens ++ new Array[String](inputSchema.length - tokens.length) } else if (options.permissive && inputSchema.length < tokens.length) { @@ -232,11 +234,9 @@ private[csv] class UnivocityParser( try { Some(convert(checkedTokens)) } catch { - // We only catch exceptions about malformed values here and pass over other exceptions - // (e.g., SparkException about unsupported types). - case _: NumberFormatException | _: IllegalArgumentException - if options.permissive && shouldHandleCorruptRecord => - row(corruptIndex) = UTF8String.fromString(tokens.mkString(options.delimiter.toString)) + case NonFatal(e) if options.permissive => + val row = new GenericInternalRow(requiredSchema.length) + corruptFieldIndex.map(row(_) = UTF8String.fromString(input)) Some(row) case NonFatal(e) if options.dropMalformed => if (numMalformedRecords < options.maxMalformedLogPerPartition) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 99943944f3c6d..de62a738b1c5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -245,12 +245,18 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
 *   <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
 *       during parsing.
 *     <ul>
- *       <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. When
- *         a schema is set by user, it sets `null` for extra fields.</li>
+ *       <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. If users
+ *         set a string-type field named `columnNameOfCorruptRecord` in a user-specified `schema`,
+ *         it puts the malformed string into the field. When a `schema` is set by user, it sets `null`
+ *         for extra fields.</li>
 *       <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
 *       <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
 *     </ul>
 *   </li>
+ *   <li>`columnNameOfCorruptRecord` (default is the value specified in
+ *     `spark.sql.columnNameOfCorruptRecord`): defines a field name for malformed strings created by
+ *     `PERMISSIVE` mode. If a user-specified `schema` has this named field, Spark puts malformed
+ *     strings in this field. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
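(Illustrative sketch, not part of the patch: the same option on the streaming reader. It reuses the `spark` session from the earlier sketch; a streaming CSV source needs an explicit schema, and the watched directory is an assumption.)

import org.apache.spark.sql.types._

val streamingDf = spark.readStream
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(new StructType()
    .add("a", IntegerType)
    .add("b", TimestampType)
    .add("_corrupt_record", StringType))
  .csv("/tmp/csv-input/")   // assumed directory watched for new CSV files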
  • * * * @since 2.0.0 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 047f286b915a9..a89509551f779 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.SparkException -import org.apache.spark.sql.{DataFrame, QueryTest, Row, UDT} +import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, UDT} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import org.apache.spark.sql.types._ @@ -701,12 +701,12 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { }.getMessage assert(msg.contains("CSV data source does not support array data type")) - msg = intercept[SparkException] { + msg = intercept[UnsupportedOperationException] { val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil) spark.range(1).write.csv(csvDir) spark.read.schema(schema).csv(csvDir).collect() - }.getCause.getMessage - assert(msg.contains("Unsupported type: array")) + }.getMessage + assert(msg.contains("CSV data source does not support array data type.")) } } @@ -962,28 +962,38 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") { val schema = new StructType().add("a", IntegerType).add("b", TimestampType) - val errMsg = intercept[SparkException] { - spark - .read - .option("mode", "PERMISSIVE") - .schema(schema) - .csv(testFile(valueMalformedFile)) - .collect - }.getCause.getMessage - assert(errMsg.contains("Timestamp format must be")) + val df1 = spark + .read + .option("mode", "PERMISSIVE") + .schema(schema) + .csv(testFile(valueMalformedFile)) + checkAnswer(df1, + Row(null, null) :: + Row(1, java.sql.Date.valueOf("1983-08-04")) :: + Nil) // If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records val columnNameOfCorruptRecord = "_unparsed" - val df = spark + val df2 = spark .read .option("mode", "PERMISSIVE") .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) .schema(schema.add(columnNameOfCorruptRecord, StringType)) .csv(testFile(valueMalformedFile)) - - checkAnswer(df, - Row(0, null, "0,2013-111-11 12:13:14") :: + checkAnswer(df2, + Row(null, null, "0,2013-111-11 12:13:14") :: Row(1, java.sql.Date.valueOf("1983-08-04"), null) :: Nil) + + val errMsg = intercept[AnalysisException] { + spark + .read + .option("mode", "PERMISSIVE") + .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) + .schema(schema.add(columnNameOfCorruptRecord, IntegerType)) + .csv(testFile(valueMalformedFile)) + .collect + }.getMessage + assert(errMsg.startsWith("A field for corrupt records must be a string type and nullable")) } } From 448e6fe9f20f11c1171bcaeebf27620fd2f93ac3 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 22 Feb 2017 11:10:29 +0900 Subject: [PATCH 05/12] Apply review comments --- python/pyspark/sql/readwriter.py | 12 +++++----- .../datasources/csv/CSVFileFormat.scala | 2 +- .../datasources/csv/UnivocityParser.scala | 22 ++++++++++--------- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git 
a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 0c0cbc57f460b..d12ceb7900d67 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -303,9 +303,9 @@ def text(self, paths): def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, columnNameOfCorruptRecord=None, dateFormat=None, - timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, - maxMalformedLogPerPartition=None, mode=None, timeZone=None): + negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None, + columnNameOfCorruptRecord=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -389,10 +389,10 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, - timestampFormat=timestampFormat, maxColumns=maxColumns, + dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, - maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone) + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone, + columnNameOfCorruptRecord=columnNameOfCorruptRecord) if isinstance(path, basestring): path = [path] return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 215fc8f59893e..652effb0848bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -110,7 +110,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { val f = dataSchema(corruptFieldIndex) if (f.dataType != StringType || !f.nullable) { throw new AnalysisException( - "A field for corrupt records must be a string type and nullable") + "The field for corrupt records must be string type and nullable") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 295ebc1fd427a..ee56d33b26392 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -27,7 +27,6 @@ import scala.util.control.NonFatal import com.univocity.parsers.csv.CsvParser import org.apache.spark.internal.Logging -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -48,13 +47,11 
@@ private[csv] class UnivocityParser( private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) corruptFieldIndex.foreach { corrFieldIndex => - require(schema(corrFieldIndex).dataType == StringType, - "A field for corrupt records must have a string type") - require(schema(corrFieldIndex).nullable, "A field for corrupt must be nullable") + require(schema(corrFieldIndex).dataType == StringType) + require(schema(corrFieldIndex).nullable) } private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord)) - CSVUtils.verifySchema(inputSchema) private val valueConverters = inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray @@ -65,6 +62,8 @@ private[csv] class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) + // This parser loads an `indexArr._1`-th position value in input tokens, + // then put the value in `row(indexArr._2)`. private val indexArr: Array[(Int, Int)] = { val fields = if (options.dropMalformed) { // If `dropMalformed` is enabled, then it needs to parse all the values @@ -165,6 +164,9 @@ private[csv] class UnivocityParser( case udt: UserDefinedType[_] => (datum: String) => makeConverter(name, udt.sqlType, nullable, options) + + // We don't actually hit this exception though, we keep it for understandability + case _ => throw new RuntimeException(s"Unsupported type: ${dataType.typeName}") } private def nullSafeDatum( @@ -188,16 +190,16 @@ private[csv] class UnivocityParser( * the record is malformed). */ def parse(input: String): Option[InternalRow] = { - parseWithMode(input) { tokens => + convertWithParseMode(input) { tokens => var i: Int = 0 while (i < indexArr.length) { - val (pos, _) = indexArr(i) + val (pos, rowIdx) = indexArr(i) // It anyway needs to try to parse since it decides if this row is malformed // or not after trying to cast in `DROPMALFORMED` mode even if the casted // value is not stored in the row. 
val value = valueConverters(pos).apply(tokens(pos)) if (i < requiredSchema.length) { - row(i) = value + row(rowIdx) = value } i += 1 } @@ -205,8 +207,8 @@ private[csv] class UnivocityParser( } } - private def parseWithMode(input: String)(convert: Array[String] => InternalRow) - : Option[InternalRow] = { + private def convertWithParseMode( + input: String)(convert: Array[String] => InternalRow): Option[InternalRow] = { val tokens = parser.parseLine(input) if (options.dropMalformed && inputSchema.length != tokens.length) { if (numMalformedRecords < options.maxMalformedLogPerPartition) { From 619094a4dbb0e400daac0d94905b40df07b650b6 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 22 Feb 2017 11:35:28 +0900 Subject: [PATCH 06/12] Add more tests --- .../execution/datasources/csv/CSVSuite.scala | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index a89509551f779..bb961d117cd6b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -974,17 +974,35 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { // If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records val columnNameOfCorruptRecord = "_unparsed" + val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType) val df2 = spark .read .option("mode", "PERMISSIVE") .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) - .schema(schema.add(columnNameOfCorruptRecord, StringType)) + .schema(schemaWithCorrField1) .csv(testFile(valueMalformedFile)) checkAnswer(df2, Row(null, null, "0,2013-111-11 12:13:14") :: Row(1, java.sql.Date.valueOf("1983-08-04"), null) :: Nil) + // We put a `columnNameOfCorruptRecord` field in the middle of a schema + new StructType + val schemaWithCorrField2 = new StructType() + .add("a", IntegerType) + .add(columnNameOfCorruptRecord, StringType) + .add("b", TimestampType) + val df3 = spark + .read + .option("mode", "PERMISSIVE") + .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) + .schema(schemaWithCorrField2) + .csv(testFile(valueMalformedFile)) + checkAnswer(df3, + Row(null, "0,2013-111-11 12:13:14", null) :: + Row(1, null, java.sql.Date.valueOf("1983-08-04")) :: + Nil) + val errMsg = intercept[AnalysisException] { spark .read @@ -994,6 +1012,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .csv(testFile(valueMalformedFile)) .collect }.getMessage - assert(errMsg.startsWith("A field for corrupt records must be a string type and nullable")) + assert(errMsg.startsWith("The field for corrupt records must be string type and nullable")) } } From 80c3775fa2f5d4641dd05769cc85bf928f7806fe Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Thu, 23 Feb 2017 11:16:37 +0900 Subject: [PATCH 07/12] Replace map with foreach --- .../spark/sql/execution/datasources/csv/CSVFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 652effb0848bd..59f2919edfe2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala 
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -106,7 +106,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { sparkSession.sessionState.conf.columnNameOfCorruptRecord) // Check a field requirement for corrupt records here to throw an exception in a driver side - dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).map { corruptFieldIndex => + dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex => val f = dataSchema(corruptFieldIndex) if (f.dataType != StringType || !f.nullable) { throw new AnalysisException( From c86febe6b018faafa62e0bf6444f8cd4326fb021 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Thu, 23 Feb 2017 13:50:15 +0900 Subject: [PATCH 08/12] Apply review comments --- python/pyspark/sql/readwriter.py | 2 +- python/pyspark/sql/streaming.py | 16 +++++++++++++--- .../org/apache/spark/sql/DataFrameReader.scala | 2 +- .../datasources/csv/UnivocityParser.scala | 2 +- .../spark/sql/streaming/DataStreamReader.scala | 2 +- .../sql/execution/datasources/csv/CSVSuite.scala | 1 - 6 files changed, 17 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index d12ceb7900d67..251df07577f3f 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -368,7 +368,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non If None is set, it uses the default value, session local timezone. * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. - If users set a string-type field named ``columnNameOfCorruptRecord`` in a + If users set a string type field named ``columnNameOfCorruptRecord`` in a user-specified ``schema``, it puts the malformed string into the field. When a ``schema`` is set by user, it sets ``null`` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 965c8f6b269e9..60ca2b9e8fd7e 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -558,7 +558,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, - maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None): + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None, + columnNameOfCorruptRecord=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -619,10 +620,18 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non If None is set, it uses the default value, session local timezone. * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. - When a schema is set by user, it sets ``null`` for extra fields. + If users set a string type field named ``columnNameOfCorruptRecord`` in a + user-specified ``schema``, it puts the malformed string into the field. When + a ``schema`` is set by user, it sets ``null`` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. 
+ :param columnNameOfCorruptRecord: defines a field name for malformed strings created + by ``PERMISSIVE`` mode. If a user-specified `schema` + has this named field, Spark puts malformed strings + in this field. This overrides + `spark.sql.columnNameOfCorruptRecord`. + >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema) >>> csv_sdf.isStreaming True @@ -636,7 +645,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, - maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone) + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone, + columnNameOfCorruptRecord=columnNameOfCorruptRecord) if isinstance(path, basestring): return self._df(self._jreader.csv(path)) else: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 5208c72363760..07206532879bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -423,7 +423,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * during parsing. *
 *     <ul>
 *       <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. If users
- *         set a string-type field named `columnNameOfCorruptRecord` in a user-specified `schema`,
+ *         set a string type field named `columnNameOfCorruptRecord` in a user-specified `schema`,
 *         it puts the malformed string into the field. When a `schema` is set by user, it sets `null`
 *         for extra fields.</li>
 *       <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
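(Illustrative sketch, not part of the patch: for contrast with the PERMISSIVE example above, FAILFAST does not null out a malformed value; the parse failure surfaces when the query runs and is wrapped in a SparkException on collect, as exercised by the suite's earlier test. Session, schema, and path are the same assumptions as before.)

import org.apache.spark.SparkException
import org.apache.spark.sql.types._

val strictSchema = new StructType().add("a", IntegerType).add("b", TimestampType)
try {
  spark.read
    .option("mode", "FAILFAST")
    .schema(strictSchema)
    .csv("/tmp/value-malformed.csv")   // assumed file containing a bad timestamp
    .collect()
} catch {
  // The underlying cause carries the converter's error, e.g. an invalid timestamp format.
  case e: SparkException => println(s"FAILFAST read failed: ${e.getCause.getMessage}")
}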
    • diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index ee56d33b26392..de46fed97508d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -208,7 +208,7 @@ private[csv] class UnivocityParser( } private def convertWithParseMode( - input: String)(convert: Array[String] => InternalRow): Option[InternalRow] = { + input: String)(convert: Array[String] => InternalRow): Option[InternalRow] = { val tokens = parser.parseLine(input) if (options.dropMalformed && inputSchema.length != tokens.length) { if (numMalformedRecords < options.maxMalformedLogPerPartition) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index de62a738b1c5d..929f955af7bc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -246,7 +246,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * during parsing. *
        *
      • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. If users - * set a string-type field named `columnNameOfCorruptRecord` in a user-specified `schema`, + * set a string type field named `columnNameOfCorruptRecord` in a user-specified `schema`, * it puts the malformed string into the field. When a `schema` is set by user, it sets `null` * for extra fields.
      • *
      • `DROPMALFORMED` : ignores the whole corrupted records.
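The same options apply on the streaming path; a hedged sketch (source directory, sink, and session are assumptions for illustration):

import org.apache.spark.sql.types._

// File streams require an explicit schema; include the corrupt-record column to keep bad rows.
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", TimestampType)
  .add("_corrupt_record", StringType)

val sdf = spark.readStream
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv("/tmp/csv-input-dir")  // hypothetical source directory

// Print parsed rows, including any captured malformed lines, to the console.
val query = sdf.writeStream.format("console").start()
query.awaitTermination()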
      • diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index bb961d117cd6b..371d4311baa3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -987,7 +987,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { Nil) // We put a `columnNameOfCorruptRecord` field in the middle of a schema - new StructType val schemaWithCorrField2 = new StructType() .add("a", IntegerType) .add(columnNameOfCorruptRecord, StringType) From 8d9386abea0941a40a89fd4860c5568ec55d7d95 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Thu, 23 Feb 2017 15:41:23 +0900 Subject: [PATCH 09/12] Update docs --- python/pyspark/sql/readwriter.py | 24 ++++++++++--------- python/pyspark/sql/streaming.py | 24 ++++++++++--------- .../apache/spark/sql/DataFrameReader.scala | 18 +++++++------- .../sql/streaming/DataStreamReader.scala | 18 +++++++------- 4 files changed, 44 insertions(+), 40 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 251df07577f3f..6c4bedde1cccb 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -193,8 +193,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ record and puts the malformed string into a new field configured by \ - ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \ - ``null`` for extra fields. + ``columnNameOfCorruptRecord``. An user-defined schema can include \ + a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \ + When a schema is set by user, it sets ``null`` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. @@ -367,18 +368,19 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. If None is set, it uses the default value, session local timezone. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. - If users set a string type field named ``columnNameOfCorruptRecord`` in a - user-specified ``schema``, it puts the malformed string into the field. When - a ``schema`` is set by user, it sets ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record and puts the malformed string into a new field configured by \ + ``columnNameOfCorruptRecord``. An user-defined schema can include \ + a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \ + When a schema is set by user, it sets ``null`` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. - :param columnNameOfCorruptRecord: defines a field name for malformed strings created - by ``PERMISSIVE`` mode. If a user-specified `schema` - has this named field, Spark puts malformed strings - in this field. This overrides - `spark.sql.columnNameOfCorruptRecord`. + :param columnNameOfCorruptRecord: allows renaming the new field having malformed string + created by ``PERMISSIVE`` mode. 
This overrides + ``spark.sql.columnNameOfCorruptRecord``. If None is set, + it uses the value specified in + ``spark.sql.columnNameOfCorruptRecord``. >>> df = spark.read.csv('python/test_support/sql/ages.csv') >>> df.dtypes diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 60ca2b9e8fd7e..b338379638964 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -465,8 +465,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ record and puts the malformed string into a new field configured by \ - ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \ - ``null`` for extra fields. + ``columnNameOfCorruptRecord``. An user-defined schema can include \ + a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \ + When a schema is set by user, it sets ``null`` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. @@ -619,18 +620,19 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. If None is set, it uses the default value, session local timezone. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. - If users set a string type field named ``columnNameOfCorruptRecord`` in a - user-specified ``schema``, it puts the malformed string into the field. When - a ``schema`` is set by user, it sets ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record and puts the malformed string into a new field configured by \ + ``columnNameOfCorruptRecord``. An user-defined schema can include \ + a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \ + When a schema is set by user, it sets ``null`` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. - :param columnNameOfCorruptRecord: defines a field name for malformed strings created - by ``PERMISSIVE`` mode. If a user-specified `schema` - has this named field, Spark puts malformed strings - in this field. This overrides - `spark.sql.columnNameOfCorruptRecord`. + :param columnNameOfCorruptRecord: allows renaming the new field having malformed string + created by ``PERMISSIVE`` mode. This overrides + ``spark.sql.columnNameOfCorruptRecord``. If None is set, + it uses the value specified in + ``spark.sql.columnNameOfCorruptRecord``. >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema) >>> csv_sdf.isStreaming diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 07206532879bc..417c76519f055 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -286,8 +286,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * during parsing. *
          *
        • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts - * the malformed string into a new field configured by `columnNameOfCorruptRecord`. When - * a schema is set by user, it sets `null` for extra fields.
        • + * the malformed string into a new field configured by `columnNameOfCorruptRecord`. + * An user-defined schema can include a string type field named `columnNameOfCorruptRecord` + * for corrupt records. When a schema is set by user, it sets `null` for extra fields. *
        • `DROPMALFORMED` : ignores the whole corrupted records.
        • *
        • `FAILFAST` : throws an exception when it meets corrupted records.
        • *
        @@ -422,18 +423,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
      • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records * during parsing. *
          - *
        • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. If users - * set a string type field named `columnNameOfCorruptRecord` in a user-specified `schema`, - * it puts the malformed string into the field. When a `schema` is set by user, it sets `null` - * for extra fields.
        • + *
        • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts + * the malformed string into a new field configured by `columnNameOfCorruptRecord`. + * An user-defined schema can include a string type field named `columnNameOfCorruptRecord` + * for corrupt records. When a schema is set by user, it sets `null` for extra fields.
        • *
        • `DROPMALFORMED` : ignores the whole corrupted records.
        • *
        • `FAILFAST` : throws an exception when it meets corrupted records.
        • *
        *
      • *
      • `columnNameOfCorruptRecord` (default is the value specified in - * `spark.sql.columnNameOfCorruptRecord`): defines a field name for malformed strings created by - * `PERMISSIVE` mode. If a user-specified `schema` has this named field, Spark puts malformed - * strings in this field. This overrides `spark.sql.columnNameOfCorruptRecord`.
      • + * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed strings + * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`. *
      * @since 2.0.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 929f955af7bc1..7bc99aafc1624 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -168,8 +168,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * during parsing. *
        *
      • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts - * the malformed string into a new field configured by `columnNameOfCorruptRecord`. When - * a schema is set by user, it sets `null` for extra fields.
      • + * the malformed string into a new field configured by `columnNameOfCorruptRecord`. + * An user-defined schema can include a string type field named `columnNameOfCorruptRecord` + * for corrupt records. When a schema is set by user, it sets `null` for extra fields. *
      • `DROPMALFORMED` : ignores the whole corrupted records.
      • *
      • `FAILFAST` : throws an exception when it meets corrupted records.
      • *
      @@ -245,18 +246,17 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
    • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records * during parsing. *
        - *
      • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. If users - * set a string type field named `columnNameOfCorruptRecord` in a user-specified `schema`, - * it puts the malformed string into the field. When a `schema` is set by user, it sets `null` - * for extra fields.
      • + *
      • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts + * the malformed string into a new field configured by `columnNameOfCorruptRecord`. + * An user-defined schema can include a string type field named `columnNameOfCorruptRecord` + * for corrupt records. When a schema is set by user, it sets `null` for extra fields.
      • *
      • `DROPMALFORMED` : ignores the whole corrupted records.
      • *
      • `FAILFAST` : throws an exception when it meets corrupted records.
      • *
      *
    • *
    • `columnNameOfCorruptRecord` (default is the value specified in - * `spark.sql.columnNameOfCorruptRecord`): defines a field name for malformed strings created by - * `PERMISSIVE` mode. If a user-specified `schema` has this named field, Spark puts malformed - * strings in this field. This overrides `spark.sql.columnNameOfCorruptRecord`.
    • + * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed strings + * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`. *
    * * @since 2.0.0 From 512fb42404fee1c702bc9e18ad36f15da9e0b273 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Thu, 23 Feb 2017 16:26:51 +0900 Subject: [PATCH 10/12] Clean up code in UnivocityParser --- .../datasources/csv/UnivocityParser.scala | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index de46fed97508d..4d248c82d58de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -51,10 +51,10 @@ private[csv] class UnivocityParser( require(schema(corrFieldIndex).nullable) } - private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord)) + private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord)) private val valueConverters = - inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray + schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray private val parser = new CsvParser(options.asParserSettings) @@ -62,9 +62,7 @@ private[csv] class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) - // This parser loads an `indexArr._1`-th position value in input tokens, - // then put the value in `row(indexArr._2)`. - private val indexArr: Array[(Int, Int)] = { + private val indexArr: Array[Int] = { val fields = if (options.dropMalformed) { // If `dropMalformed` is enabled, then it needs to parse all the values // so that we can decide which row is malformed. @@ -72,14 +70,7 @@ private[csv] class UnivocityParser( } else { requiredSchema } - val fieldsWithIndexes = fields.zipWithIndex - corruptFieldIndex.map { case corrFieldIndex => - fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex } - }.getOrElse { - fieldsWithIndexes - }.map { case (f, i) => - (inputSchema.indexOf(f), i) - }.toArray + fields.map(schema.indexOf(_: StructField)).toArray } /** @@ -193,13 +184,13 @@ private[csv] class UnivocityParser( convertWithParseMode(input) { tokens => var i: Int = 0 while (i < indexArr.length) { - val (pos, rowIdx) = indexArr(i) + val pos = indexArr(i) // It anyway needs to try to parse since it decides if this row is malformed // or not after trying to cast in `DROPMALFORMED` mode even if the casted // value is not stored in the row. 
val value = valueConverters(pos).apply(tokens(pos)) if (i < requiredSchema.length) { - row(rowIdx) = value + row(i) = value } i += 1 } @@ -210,7 +201,7 @@ private[csv] class UnivocityParser( private def convertWithParseMode( input: String)(convert: Array[String] => InternalRow): Option[InternalRow] = { val tokens = parser.parseLine(input) - if (options.dropMalformed && inputSchema.length != tokens.length) { + if (options.dropMalformed && dataSchema.length != tokens.length) { if (numMalformedRecords < options.maxMalformedLogPerPartition) { logWarning(s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") } @@ -221,14 +212,30 @@ private[csv] class UnivocityParser( } numMalformedRecords += 1 None - } else if (options.failFast && inputSchema.length != tokens.length) { + } else if (options.failFast && dataSchema.length != tokens.length) { throw new RuntimeException(s"Malformed line in FAILFAST mode: " + s"${tokens.mkString(options.delimiter.toString)}") } else { - val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) { - tokens ++ new Array[String](inputSchema.length - tokens.length) - } else if (options.permissive && inputSchema.length < tokens.length) { - tokens.take(inputSchema.length) + val checkedTokens = if (options.permissive) { + // If a length of parsed tokens is not equal to expected one, it makes the length the same + // with the expected. If the length is shorter, it adds extra tokens in the tail. + // If longer, it drops extra tokens. + val lengthSafeTokens = if (dataSchema.length > tokens.length) { + tokens ++ new Array[String](dataSchema.length - tokens.length) + } else if (dataSchema.length < tokens.length) { + tokens.take(dataSchema.length) + } else { + tokens + } + + // If we need to handle corrupt fields, it adds an extra token to skip a field for malformed + // strings when loading parsed tokens into a resulting `row`. 
+ corruptFieldIndex.map { corrFieldIndex => + val (front, back) = lengthSafeTokens.splitAt(corrFieldIndex) + front ++ new Array[String](1) ++ back + }.getOrElse { + lengthSafeTokens + } } else { tokens } @@ -238,7 +245,7 @@ private[csv] class UnivocityParser( } catch { case NonFatal(e) if options.permissive => val row = new GenericInternalRow(requiredSchema.length) - corruptFieldIndex.map(row(_) = UTF8String.fromString(input)) + corruptFieldIndex.foreach(row(_) = UTF8String.fromString(input)) Some(row) case NonFatal(e) if options.dropMalformed => if (numMalformedRecords < options.maxMalformedLogPerPartition) { From 3d514e546a77a3c73b8c7b2668932a90c1d92fd8 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Thu, 23 Feb 2017 21:18:59 +0900 Subject: [PATCH 11/12] Revert clean-up code for a CSV parser --- .../datasources/csv/UnivocityParser.scala | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 4d248c82d58de..eb471651db2e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -54,7 +54,7 @@ private[csv] class UnivocityParser( private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord)) private val valueConverters = - schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray + dataSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray private val parser = new CsvParser(options.asParserSettings) @@ -62,7 +62,9 @@ private[csv] class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) - private val indexArr: Array[Int] = { + // This parser loads an `indexArr._1`-th position value in input tokens, + // then put the value in `row(indexArr._2)`. + private val indexArr: Array[(Int, Int)] = { val fields = if (options.dropMalformed) { // If `dropMalformed` is enabled, then it needs to parse all the values // so that we can decide which row is malformed. @@ -70,7 +72,17 @@ private[csv] class UnivocityParser( } else { requiredSchema } - fields.map(schema.indexOf(_: StructField)).toArray + // TODO: Revisit this; we need to clean up code here for readability. + // See an URL below for related discussions: + // https://github.com/apache/spark/pull/16928#discussion_r102636720 + val fieldsWithIndexes = fields.zipWithIndex + corruptFieldIndex.map { case corrFieldIndex => + fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex } + }.getOrElse { + fieldsWithIndexes + }.map { case (f, i) => + (dataSchema.indexOf(f), i) + }.toArray } /** @@ -184,13 +196,13 @@ private[csv] class UnivocityParser( convertWithParseMode(input) { tokens => var i: Int = 0 while (i < indexArr.length) { - val pos = indexArr(i) + val (pos, rowIdx) = indexArr(i) // It anyway needs to try to parse since it decides if this row is malformed // or not after trying to cast in `DROPMALFORMED` mode even if the casted // value is not stored in the row. 
val value = valueConverters(pos).apply(tokens(pos)) if (i < requiredSchema.length) { - row(i) = value + row(rowIdx) = value } i += 1 } @@ -216,25 +228,19 @@ private[csv] class UnivocityParser( throw new RuntimeException(s"Malformed line in FAILFAST mode: " + s"${tokens.mkString(options.delimiter.toString)}") } else { - val checkedTokens = if (options.permissive) { - // If a length of parsed tokens is not equal to expected one, it makes the length the same - // with the expected. If the length is shorter, it adds extra tokens in the tail. - // If longer, it drops extra tokens. - val lengthSafeTokens = if (dataSchema.length > tokens.length) { + // If a length of parsed tokens is not equal to expected one, it makes the length the same + // with the expected. If the length is shorter, it adds extra tokens in the tail. + // If longer, it drops extra tokens. + // + // TODO: Revisit this; if a length of tokens does not match an expected length in the schema, + // we probably need to treat it as a malformed record. + // See an URL below for related discussions: + // https://github.com/apache/spark/pull/16928#discussion_r102657214 + val checkedTokens = if (options.permissive && dataSchema.length != tokens.length) { + if (dataSchema.length > tokens.length) { tokens ++ new Array[String](dataSchema.length - tokens.length) - } else if (dataSchema.length < tokens.length) { - tokens.take(dataSchema.length) } else { - tokens - } - - // If we need to handle corrupt fields, it adds an extra token to skip a field for malformed - // strings when loading parsed tokens into a resulting `row`. - corruptFieldIndex.map { corrFieldIndex => - val (front, back) = lengthSafeTokens.splitAt(corrFieldIndex) - front ++ new Array[String](1) ++ back - }.getOrElse { - lengthSafeTokens + tokens.take(dataSchema.length) } } else { tokens From a58ff1f772e8ec86b4a320b826d2fc2959bb6439 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 24 Feb 2017 00:08:14 +0900 Subject: [PATCH 12/12] Update descriptions for comments --- python/pyspark/sql/readwriter.py | 24 +++++++++++-------- python/pyspark/sql/streaming.py | 24 +++++++++++-------- .../apache/spark/sql/DataFrameReader.scala | 16 ++++++++----- .../sql/streaming/DataStreamReader.scala | 16 ++++++++----- 4 files changed, 48 insertions(+), 32 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 6c4bedde1cccb..b5e5b18bcbefa 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -191,11 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record and puts the malformed string into a new field configured by \ - ``columnNameOfCorruptRecord``. An user-defined schema can include \ - a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \ - When a schema is set by user, it sets ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ + schema. If a schema does not have the field, it drops corrupt records during \ + parsing. 
When inferring a schema, it implicitly adds a \ + ``columnNameOfCorruptRecord`` field in an output schema. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. @@ -368,11 +370,13 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. If None is set, it uses the default value, session local timezone. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record and puts the malformed string into a new field configured by \ - ``columnNameOfCorruptRecord``. An user-defined schema can include \ - a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \ - When a schema is set by user, it sets ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an \ + user-defined schema. If a schema does not have the field, it drops corrupt \ + records during parsing. When a length of parsed CSV tokens is shorter than \ + an expected length of a schema, it sets `null` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index b338379638964..bd19fd4e385b4 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -463,11 +463,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record and puts the malformed string into a new field configured by \ - ``columnNameOfCorruptRecord``. An user-defined schema can include \ - a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \ - When a schema is set by user, it sets ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ + schema. If a schema does not have the field, it drops corrupt records during \ + parsing. When inferring a schema, it implicitly adds a \ + ``columnNameOfCorruptRecord`` field in an output schema. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. @@ -620,11 +622,13 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. If None is set, it uses the default value, session local timezone. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record and puts the malformed string into a new field configured by \ - ``columnNameOfCorruptRecord``. An user-defined schema can include \ - a string type field named ``columnNameOfCorruptRecord`` for corrupt records. 
\ - When a schema is set by user, it sets ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an \ + user-defined schema. If a schema does not have the field, it drops corrupt \ + records during parsing. When a length of parsed CSV tokens is shorter than \ + an expected length of a schema, it sets `null` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 417c76519f055..3efe9c4dfcf5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -286,9 +286,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * during parsing. *
      *
    • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts - * the malformed string into a new field configured by `columnNameOfCorruptRecord`. - * An user-defined schema can include a string type field named `columnNameOfCorruptRecord` - * for corrupt records. When a schema is set by user, it sets `null` for extra fields.
    • + * the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep + * corrupt records, a user can set a string type field named `columnNameOfCorruptRecord` + * in a user-defined schema. If a schema does not have the field, it drops corrupt records + * during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord` + * field in an output schema.
    • `DROPMALFORMED` : ignores the whole corrupted records.
    • *
    • `FAILFAST` : throws an exception when it meets corrupted records.
    • *
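A small sketch of the schema-inference note above for the JSON reader (the sample records are made up; the column name shown is the default):

// With no user-specified schema, PERMISSIVE mode adds the corrupt-record column
// (default name `_corrupt_record`) to the inferred schema when malformed records exist.
val jsonLines = spark.sparkContext.parallelize(Seq(
  """{"a": 1, "b": "2013-11-11 12:13:14"}""",
  """{"a": 2"""  // malformed: unclosed object
))

val df = spark.read.json(jsonLines)
df.printSchema()  // expected to include `_corrupt_record` alongside `a` and `b`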
    @@ -424,9 +426,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * during parsing. *
      *
    • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts - * the malformed string into a new field configured by `columnNameOfCorruptRecord`. - * An user-defined schema can include a string type field named `columnNameOfCorruptRecord` - * for corrupt records. When a schema is set by user, it sets `null` for extra fields.
    • + * the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep + * corrupt records, a user can set a string type field named `columnNameOfCorruptRecord` + * in a user-defined schema. If a schema does not have the field, it drops corrupt records + * during parsing. When the length of parsed CSV tokens is shorter than the expected length + * of the schema, it sets `null` for extra fields.
    • `DROPMALFORMED` : ignores the whole corrupted records.
    • *
    • `FAILFAST` : throws an exception when it meets corrupted records.
    • *
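A hedged illustration of the token-length note above (the file contents and path are assumptions made up for the example):

import org.apache.spark.sql.types._

// Suppose /tmp/short.csv contains the single line: 1,x
// i.e. one token fewer than the three-column schema below.
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", StringType)
  .add("c", StringType)

val df = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .csv("/tmp/short.csv")

// Under the documented behavior the missing trailing field comes back as null: [1, x, null]
df.show()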
    diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 7bc99aafc1624..f78e73f319de7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -168,9 +168,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * during parsing. *
      *
    • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts - * the malformed string into a new field configured by `columnNameOfCorruptRecord`. - * An user-defined schema can include a string type field named `columnNameOfCorruptRecord` - * for corrupt records. When a schema is set by user, it sets `null` for extra fields.
    • + * the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep + * corrupt records, a user can set a string type field named `columnNameOfCorruptRecord` + * in a user-defined schema. If a schema does not have the field, it drops corrupt records + * during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord` + * field in an output schema.
    • `DROPMALFORMED` : ignores the whole corrupted records.
    • *
    • `FAILFAST` : throws an exception when it meets corrupted records.
    • *
    @@ -247,9 +249,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * during parsing. *
      *
    • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts - * the malformed string into a new field configured by `columnNameOfCorruptRecord`. - * An user-defined schema can include a string type field named `columnNameOfCorruptRecord` - * for corrupt records. When a schema is set by user, it sets `null` for extra fields.
    • + * the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep + * corrupt records, a user can set a string type field named `columnNameOfCorruptRecord` + * in a user-defined schema. If a schema does not have the field, it drops corrupt records + * during parsing. When the length of parsed CSV tokens is shorter than the expected length + * of the schema, it sets `null` for extra fields.
    • `DROPMALFORMED` : ignores the whole corrupted records.
    • *
    • `FAILFAST` : throws an exception when it meets corrupted records.
    • *