-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error #42979
Conversation
cc @HyukjinKwon |
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Hisoka-X Could you rebase this on the recent master Scala 2.13 + Java 17.
Done |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
Outdated
Show resolved
Hide resolved
Some(StructType(Nil)) | ||
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true | ||
case e: FileNotFoundException if !options.ignoreMissingFiles => throw e | ||
case e: IOException if options.ignoreCorruptFiles => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about RuntimeException
like at
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
Line 262 in 7796d8a
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like JSON/CSV infer code should be similar to FileScanRDD
and ignoreCorruptFiles
should impact on errors caused by RuntimeException
. What happens if put RuntimeException
in the case? :
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It never reach here if we catch one RuntimeException
. Because it will be catched by https://github.com/apache/spark/pull/42979/files/942b7c38a277a1b38b85a64ad940b56528ae8a03#diff-774d08eb04cd18039c576c7e23609430476d3dd2668535f0432f04b65b8ab234R92. So it would be useless code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean remove it from the case:
case e @ (_: JsonProcessingException | _: MalformedInputException) =>
handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, e)
...
and put it there:
case e @ (_: IOException | _: RuntimeException) if options.ignoreCorruptFiles =>
logWarning("Skipped the rest of the content in the corrupted file", e)
Some(StructType(Nil))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, I got it. But I'm not sure it would change behavior or not. Let me change it and see CI would report error or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my understanding, we consider a JSON record as malformed when the JSON parser cannot parse its already retrieved input. So, it was able to read some text from a file but cannot parse. For instance, if we look at JSON parser exceptions:
public JsonParser createParser(InputStream in) throws IOException, JsonParseException {
IOContext ctxt = _createContext(_createContentReference(in), false);
return _createParser(_decorate(in, ctxt), ctxt);
}
...
public abstract JsonToken nextToken() throws IOException;
JsonParseException (JsonProcessingException) + IOException
So, the RuntimeException
can come only for some corrupted files. cc @HyukjinKwon @cloud-fan
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, adjust PR's title and its description regarding to ignoreMissingFiles
.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
Outdated
Show resolved
Hide resolved
@@ -99,6 +102,13 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { | |||
val wrappedCharException = new CharConversionException(msg) | |||
wrappedCharException.initCause(e) | |||
handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, wrappedCharException) | |||
case e: FileNotFoundException if ignoreMissingFiles => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit annoying that we need to handle corrupted/missing files in multiple places, but I don't have a better idea.
+1, LGTM. Merging to master. |
What changes were proposed in this pull request?
Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error, it would be like:
Because multiline CSV/JSON use
BinaryFileRDD
notFileScanRDD
. UnlikeFileScanRDD
, when met corrupt files will checkignoreCorruptFiles
config to avoid report IOException,BinaryFileRDD
will not report error because it return normalPortableDataStream
. So we should catch it when infer schema in lambda function. Also do same thing forignoreMissingFiles
.Why are the changes needed?
Fix the bug when use mulitline mode with ignoreCorruptFiles/ignoreMissingFiles config.
Does this PR introduce any user-facing change?
No
How was this patch tested?
add new test.
Was this patch authored or co-authored using generative AI tooling?
No