6 changes: 2 additions & 4 deletions python/pyspark/sql/readwriter.py
@@ -369,10 +369,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                   value being read. If None is set, it uses the default value,
                                   ``-1`` meaning unlimited length.
-        :param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will
-                                            log for each partition. Malformed records beyond this
-                                            number will be ignored. If None is set, it
-                                            uses the default value, ``10``.
+        :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0.
+                                            If specified, it is ignored.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.

2 changes: 2 additions & 0 deletions python/pyspark/sql/streaming.py
@@ -625,6 +625,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                   value being read. If None is set, it uses the default value,
                                   ``-1`` meaning unlimited length.
+        :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0.
+                                            If specified, it is ignored.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.

jsonExpressions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, GenericArrayData, ParseModes}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -548,7 +548,7 @@ case class JsonToStructs(
   lazy val parser =
     new JacksonParser(
       rowSchema,
-      new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
+      new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
 
   override def dataType: DataType = schema
 
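A hedged sketch of the user-visible behavior this change preserves: JsonToStructs (the expression behind `from_json`) always parses in FAILFAST mode internally and converts the resulting failure to `null`, so malformed input yields a null result regardless of any user-supplied "mode" option. The session name `spark` and the sample rows are illustrative:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}

// Assumes an active SparkSession named `spark`.
import spark.implicits._

val schema = new StructType().add("a", IntegerType)
val df = Seq("""{"a": 1}""", """{"a" 1}""").toDF("json")  // second row is malformed

// The well-formed row parses to a struct; the malformed one becomes null,
// even if a "mode" option is passed to from_json.
df.select(from_json($"json", schema)).show()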
JSONOptions.scala
@@ -65,7 +65,8 @@ private[sql] class JSONOptions(
   val allowBackslashEscapingAnyCharacter =
     parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
   val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
-  val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  val parseMode: ParseMode =
+    parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
   val columnNameOfCorruptRecord =
     parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
 
@@ -82,15 +83,6 @@
 
   val wholeFile = parameters.get("wholeFile").map(_.toBoolean).getOrElse(false)
 
-  // Parse mode flags
-  if (!ParseModes.isValidMode(parseMode)) {
-    logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
-  }
-
-  val failFast = ParseModes.isFailFastMode(parseMode)
-  val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
-  val permissive = ParseModes.isPermissiveMode(parseMode)
-
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
     factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
FailureSafeParser.scala
@@ -24,7 +24,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
     rawParser: IN => Seq[InternalRow],
-    mode: String,
+    mode: ParseMode,
     schema: StructType,
     columnNameOfCorruptRecord: String) {
 
@@ -58,11 +58,14 @@
     try {
       rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
     } catch {
-      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
-        Iterator(toResultRow(e.partialResult(), e.record))
-      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
-        Iterator.empty
-      case e: BadRecordException => throw e.cause
+      case e: BadRecordException => mode match {
+        case PermissiveMode =>
+          Iterator(toResultRow(e.partialResult(), e.record))
+        case DropMalformedMode =>
+          Iterator.empty
+        case FailFastMode =>
+          throw e.cause
+      }
     }
   }
 }
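One design payoff of the sealed trait, sketched below with a hypothetical `describe` helper (not part of this change): a match over `ParseMode` is checked for exhaustiveness by the compiler, whereas the old string-based `ParseModes.is*Mode` guards silently fell through to the catch-all `throw e.cause` case.

import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}

// Hypothetical helper for illustration: omitting any of the three cases
// makes the compiler emit a non-exhaustive match warning.
def describe(mode: ParseMode): String = mode match {
  case PermissiveMode    => "keep the row; store the raw record in the corrupt-record column"
  case DropMalformedMode => "drop the malformed row"
  case FailFastMode      => "rethrow the underlying cause"
}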
ParseMode.scala (new file, package org.apache.spark.sql.catalyst.util)
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.util

import org.apache.spark.internal.Logging

sealed trait ParseMode {
  /**
   * String name of the parse mode.
   */
  def name: String
}

/**
 * This mode permissively parses the records.
 */
case object PermissiveMode extends ParseMode { val name = "PERMISSIVE" }

/**
 * This mode ignores whole corrupted records.
 */
case object DropMalformedMode extends ParseMode { val name = "DROPMALFORMED" }

/**
 * This mode throws an exception when it meets corrupted records.
 */
case object FailFastMode extends ParseMode { val name = "FAILFAST" }

object ParseMode extends Logging {
  /**
   * Returns the parse mode from the given string.
   */
  def fromString(mode: String): ParseMode = mode.toUpperCase match {
    case PermissiveMode.name => PermissiveMode
    case DropMalformedMode.name => DropMalformedMode
    case FailFastMode.name => FailFastMode
    case _ =>
      logWarning(s"$mode is not a valid parse mode. Using ${PermissiveMode.name}.")
      PermissiveMode
  }
}
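A minimal sketch of how the resolution above behaves: matching goes through `toUpperCase`, so lookups are effectively case-insensitive, and unknown strings fall back to `PermissiveMode` with a logged warning.

import org.apache.spark.sql.catalyst.util.ParseMode

ParseMode.fromString("permissive")     // PermissiveMode
ParseMode.fromString("DropMalformed")  // DropMalformedMode
ParseMode.fromString("abcd")           // logs a warning, returns PermissiveMode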

ParseModes.scala: this file was deleted.

JsonExpressionsSuite.scala
@@ -21,7 +21,7 @@ import java.util.Calendar
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, ParseModes}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, PermissiveMode}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -367,7 +367,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
 
     // Other modes should still return `null`.
     checkEvaluation(
-      JsonToStructs(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
+      JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
       null
     )
   }
DataFrameReader.scala
@@ -510,10 +510,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
-   * <li>`maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows
-   * Spark will log for each partition. Malformed records beyond this number will be ignored.</li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
-   * during parsing.
+   * during parsing. It supports the following case-insensitive modes.
    * <ul>
    * <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
    * the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
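A hedged usage sketch of the documented modes (the `path` value and column names are illustrative; `_corrupt_record` stands in for whatever column is configured via `columnNameOfCorruptRecord`):

import org.apache.spark.sql.types.{IntegerType, StringType, StructType, TimestampType}

// Assumes an active SparkSession named `spark` and a CSV file at `path`.
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", TimestampType)
  .add("_corrupt_record", StringType)

val df = spark.read
  .option("mode", "permissive")  // case-insensitive after this change
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schema)
  .csv(path)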
CSVOptions.scala
@@ -82,7 +82,8 @@ class CSVOptions(
 
   val delimiter = CSVUtils.toChar(
     parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
-  val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  val parseMode: ParseMode =
+    parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
   val charset = parameters.getOrElse("encoding",
     parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
 
@@ -95,15 +96,6 @@
   val ignoreLeadingWhiteSpaceFlag = getBool("ignoreLeadingWhiteSpace")
   val ignoreTrailingWhiteSpaceFlag = getBool("ignoreTrailingWhiteSpace")
 
-  // Parse mode flags
-  if (!ParseModes.isValidMode(parseMode)) {
-    logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
-  }
-
-  val failFast = ParseModes.isFailFastMode(parseMode)
-  val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
-  val permissive = ParseModes.isPermissiveMode(parseMode)
-
   val columnNameOfCorruptRecord =
     parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
 
@@ -139,8 +131,6 @@ class CSVOptions(
 
   val escapeQuotes = getBool("escapeQuotes", true)
 
-  val maxMalformedLogPerPartition = getInt("maxMalformedLogPerPartition", 10)
-
   val quoteAll = getBool("quoteAll", false)
 
   val inputBufferSize = 128
JsonInferSchema.scala
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.catalyst.util.PermissiveMode
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -40,7 +41,7 @@
       json: RDD[T],
       configOptions: JSONOptions,
       createParser: (JsonFactory, T) => JsonParser): StructType = {
-    val shouldHandleCorruptRecord = configOptions.permissive
+    val shouldHandleCorruptRecord = configOptions.parseMode == PermissiveMode
     val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord
 
     // perform schema inference on each row and merge afterwards
DataStreamReader.scala
@@ -260,7 +260,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
-   * during parsing.
+   * during parsing. It supports the following case-insensitive modes.
    * <ul>
    * <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
    * the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
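The same option applies to streaming reads; a brief sketch under the same assumptions (streaming CSV requires an explicit schema, and the input directory is illustrative):

// Assumes an active SparkSession `spark` and the `schema` from the batch sketch above.
val streamDf = spark.readStream
  .option("mode", "dropmalformed")  // silently drop rows that fail to parse
  .schema(schema)
  .csv("/tmp/streaming-input")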
CSVSuite.scala
@@ -992,9 +992,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") {
     Seq(false, true).foreach { wholeFile =>
       val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
+      // We use `PERMISSIVE` mode by default if an invalid string is given.
       val df1 = spark
         .read
-        .option("mode", "PERMISSIVE")
+        .option("mode", "abcd")
         .option("wholeFile", wholeFile)
         .schema(schema)
         .csv(testFile(valueMalformedFile))
@@ -1008,7 +1009,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
       val df2 = spark
         .read
-        .option("mode", "PERMISSIVE")
+        .option("mode", "Permissive")
         .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
         .option("wholeFile", wholeFile)
         .schema(schemaWithCorrField1)
@@ -1025,7 +1026,7 @@
         .add("b", TimestampType)
       val df3 = spark
         .read
-        .option("mode", "PERMISSIVE")
+        .option("mode", "permissive")
         .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
         .option("wholeFile", wholeFile)
         .schema(schemaWithCorrField2)