Skip to content

Commit

Permalink
[SPARK-48143][SQL] Use lightweight exceptions for control-flow betwee…
Browse files Browse the repository at this point in the history
…n UnivocityParser and FailureSafeParser

# What changes were proposed in this pull request?
New lightweight exception for control-flow between UnivocityParser and FalureSafeParser to speed-up malformed CSV parsing

### Why are the changes needed?
Parsing in `PermissiveMode` is slow due to heavy exception construction (stacktrace filling + string template substitution in `SparkRuntimeException`)

### Does this PR introduce _any_ user-facing change?
No, since `FailureSafeParser` unwraps `BadRecordException` and correctly rethrows user-facing exceptions in `FailFastMode`

### How was this patch tested?
- `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite`
- Manually run csv benchmark on DB benchmark workspace
- Manually checked correct and malformed csv in sherk-shell (org.apache.spark.SparkException is thrown with the stacktrace)

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46400 from vladimirg-db/vladimirg-db/speed-up-csv-parser.

Authored-by: Vladimir Golubev <vladimir.golubev@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
vladimirg-db authored and cloud-fan committed May 7, 2024
1 parent 08c6bb9 commit 326dbb4
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,17 +316,17 @@ class UnivocityParser(
throw BadRecordException(
() => getCurrentInput,
() => Array.empty,
QueryExecutionErrors.malformedCSVRecordError(""))
() => QueryExecutionErrors.malformedCSVRecordError(""))
}

val currentInput = getCurrentInput

var badRecordException: Option[Throwable] = if (tokens.length != parsedSchema.length) {
var badRecordException: Option[() => Throwable] = if (tokens.length != parsedSchema.length) {
// If the number of tokens doesn't match the schema, we should treat it as a malformed record.
// However, we still have chance to parse some of the tokens. It continues to parses the
// tokens normally and sets null when `ArrayIndexOutOfBoundsException` occurs for missing
// tokens.
Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
Some(() => QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
} else None
// When the length of the returned tokens is identical to the length of the parsed schema,
// we just need to:
Expand All @@ -348,7 +348,7 @@ class UnivocityParser(
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) =>
badRecordException = badRecordException.orElse(Some(e))
badRecordException = badRecordException.orElse(Some(() => e))
// Use the corresponding DEFAULT value associated with the column, if any.
row.update(i, ResolveDefaultColumns.existenceDefaultValues(requiredSchema)(i))
}
Expand All @@ -359,7 +359,7 @@ class UnivocityParser(
} else {
if (badRecordException.isDefined) {
throw BadRecordException(
() => currentInput, () => Array(requiredRow.get), badRecordException.get)
() => currentInput, () => Array[InternalRow](requiredRow.get), badRecordException.get)
} else {
requiredRow
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,16 +613,15 @@ class JacksonParser(
// JSON parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => recordLiteral(record), () => Array.empty, e)
throw BadRecordException(() => recordLiteral(record), cause = e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
"""JSON parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
throw BadRecordException(() => recordLiteral(record), () => Array.empty,
wrappedCharException)
throw BadRecordException(() => recordLiteral(record), cause = wrappedCharException)
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,31 @@ case class PartialResultArrayException(
extends Exception(cause)

/**
* Exception thrown when the underlying parser meet a bad record and can't parse it.
* Exception thrown when the underlying parser meets a bad record and can't parse it. Used for
* control flow between wrapper and underlying parser without overhead of creating a full exception.
* @param record a function to return the record that cause the parser to fail
* @param partialResults a function that returns an row array, which is the partial results of
* parsing this bad record.
* @param cause the actual exception about why the record is bad and can't be parsed.
* @param cause a function to return the actual exception about why the record is bad and can't be
* parsed.
*/
case class BadRecordException(
@transient record: () => UTF8String,
@transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
cause: Throwable) extends Exception(cause)
@transient partialResults: () => Array[InternalRow],
@transient cause: () => Throwable)
extends Exception() {

override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
override def fillInStackTrace(): Throwable = this
}

object BadRecordException {
def apply(
record: () => UTF8String,
partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
cause: Throwable): BadRecordException =
new BadRecordException(record, partialResults, () => cause)
}

/**
* Exception thrown when the underlying parser parses a JSON array as a struct.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class FailureSafeParser[IN](
case DropMalformedMode =>
Iterator.empty
case FailFastMode =>
e.getCause match {
e.cause() match {
case _: JsonArraysAsStructsException =>
// SPARK-42298 we recreate the exception here to make sure the error message
// have the record content.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,15 @@ class StaxXmlParser(
// XML parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => xmlRecord, () => Array.empty, e)
throw BadRecordException(() => xmlRecord, cause = e)
case e: CharConversionException if options.charset.isEmpty =>
val msg =
"""XML parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
throw BadRecordException(() => xmlRecord, () => Array.empty,
wrappedCharException)
throw BadRecordException(() => xmlRecord, cause = wrappedCharException)
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => xmlRecord,
Expand Down

0 comments on commit 326dbb4

Please sign in to comment.