[SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet #16474
Conversation
Test build #70903 has finished for PR 16474 at commit
```diff
  // Reads footers in multi-threaded manner within each task
  val footers =
-   ParquetFileReader.readAllFootersInParallel(
-     serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala
+   ParquetFileFormat.readParquetFootersInParallel(
```
what's happening to `readAllFootersInParallel`?
We can't make it ignore corrupt files, so it either successfully reads all footers or fails completely if even one footer is corrupt.
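For illustration, a minimal sketch of the per-file alternative: each footer is read in its own task, and files whose footers can't be read are dropped instead of failing the whole batch the way `ParquetFileReader.readAllFootersInParallel` does. `readFootersSkippingCorrupt` is a hypothetical name, and the thread-pool sizing here is arbitrary:

```scala
import java.io.IOException
import java.util.concurrent.{Callable, Executors}

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{Footer, ParquetFileReader}

// Hypothetical sketch: read each footer in its own task and drop files
// whose footers can't be read, instead of failing everything at once.
def readFootersSkippingCorrupt(
    conf: Configuration,
    partFiles: Seq[FileStatus]): Seq[Footer] = {
  val pool = Executors.newFixedThreadPool(math.max(1, math.min(partFiles.size, 8)))
  try {
    val tasks = partFiles.map { currentFile =>
      new Callable[Option[Footer]] {
        override def call(): Option[Footer] = {
          try {
            // readFooter throws RuntimeException (not only IOException)
            // when the footer is corrupt.
            Some(new Footer(currentFile.getPath,
              ParquetFileReader.readFooter(conf, currentFile, SKIP_ROW_GROUPS)))
          } catch {
            case _: RuntimeException | _: IOException => None // skip corrupt file
          }
        }
      }
    }
    pool.invokeAll(tasks.asJava).asScala.flatMap(_.get()).toSeq
  } finally {
    pool.shutdown()
  }
}
```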
Can we add some unit tests for `readParquetFootersInParallel`?
ok.
```scala
    partFiles: Seq[FileStatus],
    ignoreCorruptFiles: Boolean): Seq[Footer] = {
  val footers = partFiles.map { currentFile =>
    new Callable[Option[Footer]]() {
```
this seems pretty convoluted. can we just use parallel collections to do this?
ok. let me try to change this to parallel collections.
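Roughly what that change looks like — a sketch under the assumptions visible in the surrounding diff (the `SKIP_ROW_GROUPS` footer filter and the `Footer` wrapper), not the exact code that landed:

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{Footer, ParquetFileReader}

// Sketch of the parallel-collections variant: .par fans the footer
// reads out over a task pool, and corrupt files simply contribute None.
def readParquetFootersInParallel(
    conf: Configuration,
    partFiles: Seq[FileStatus],
    ignoreCorruptFiles: Boolean): Seq[Footer] = {
  partFiles.par.flatMap { currentFile =>
    try {
      // Skip row-group info; only the schema in the footer is needed here.
      Some(new Footer(currentFile.getPath,
        ParquetFileReader.readFooter(conf, currentFile, SKIP_ROW_GROUPS)))
    } catch {
      case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
        None // corrupt footer: skip this file
    }
  }.seq
}
```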
Test build #71053 has finished for PR 16474 at commit
Test build #71055 has finished for PR 16474 at commit
ping @rxin I have addressed the previous comments. Can you review again?
```scala
      // E.g., vectorized Parquet reader.
      readFunction(currentFile)
    } catch {
      case e @ (_: RuntimeException | _: IOException) =>
```
shall we also check the error message? Otherwise catching `RuntimeException` may catch other unexpected exceptions.
yeah, I have this concern too in the PR description.
One problem is that the error messages vary across data sources, so listing all of them here doesn't look like a good idea.
```scala
} catch {
  case e @ (_: RuntimeException | _: IOException) =>
    logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
    null
```
return `Iterator.empty`
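A sketch of that suggestion in context — the names `readFunction`, `currentFile`, and `logWarning` all come from the surrounding diff:

```scala
try {
  // The readFunction may read files before consuming the iterator.
  // E.g., vectorized Parquet reader.
  readFunction(currentFile)
} catch {
  case e @ (_: RuntimeException | _: IOException) =>
    logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
    // Return an empty iterator instead of null so callers need no null check.
    Iterator.empty
}
```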
```scala
try {
  // The readFunction may read files before consuming the iterator.
  // E.g., vectorized Parquet reader.
  readFunction(currentFile)
```
is it possible that we can make `readFunction` guarantee that data reading only happens after the first `Iterator.next`?
I think it is hard to guarantee this because `readFunction` comes from the individual data source. Even if we can modify the current data sources, we may not be able to prevent other data sources from reading eagerly.
LGTM
Test build #71417 has finished for PR 16474 at commit
## What changes were proposed in this pull request?

We have a config `spark.sql.files.ignoreCorruptFiles` which can be used to ignore corrupt files when reading files in SQL. Currently the `ignoreCorruptFiles` config has two issues and can't work for Parquet:

1. We only ignore corrupt files in `FileScanRDD`. Actually, we begin to read those files as early as inferring the data schema from the files. For corrupt files, we can't read the schema and the program fails. A related issue was reported at http://apache-spark-developers-list.1001551.n3.nabble.com/Skip-Corrupted-Parquet-blocks-footer-tc20418.html
2. In `FileScanRDD`, we assume that we only begin to read the files when starting to consume the iterator. However, the files are possibly read before that. In this case, the `ignoreCorruptFiles` config doesn't work either.

This patch targets the Parquet datasource. If this direction is ok, we can address the same issue for other datasources like Orc.

Two main changes in this patch:

1. Replace `ParquetFileReader.readAllFootersInParallel` by implementing the logic to read footers in a multi-threaded manner. We can't ignore corrupt files if we use `ParquetFileReader.readAllFootersInParallel`, so this patch implements similar logic in `readParquetFootersInParallel`.
2. In `FileScanRDD`, we need to ignore corrupt files too when we call `readFunction` to return the iterator.

One thing to notice is: we read the schema from the Parquet file's footer. The method to read the footer, `ParquetFileReader.readFooter`, throws `RuntimeException` instead of `IOException` if it can't successfully read the footer. Please check out https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L470. So this patch catches `RuntimeException`. One concern is that it might also shadow runtime exceptions other than those from reading corrupt files.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16474 from viirya/fix-ignorecorrupted-parquet-files.

(cherry picked from commit 61e48f5)

Signed-off-by: Wenchen Fan <wenchen@databricks.com>
thanks, merging to master/2.1!
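For reference, a minimal usage sketch of the config this PR fixes; the path is a placeholder:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ignore-corrupt-demo").getOrCreate()

// With this set, corrupt Parquet files are skipped both at schema
// inference and at scan time after this patch.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// The directory may contain a mix of valid and corrupt Parquet files;
// only the readable ones contribute rows.
val df = spark.read.parquet("/path/to/parquet/dir")
df.show()
```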