[SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet #16474

Closed
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
import java.io.{IOException, ObjectOutputStream}

import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.reflect.ClassTag

@@ -135,7 +135,17 @@ class FileScanRDD(
try {
if (ignoreCorruptFiles) {
currentIterator = new NextIterator[Object] {
private val internalIter = readFunction(currentFile)
private val internalIter = {
try {
// The readFunction may read files before consuming the iterator.
// E.g., vectorized Parquet reader.
readFunction(currentFile)
Contributor:

Is it possible to make readFunction guarantee that data reading happens only after the first Iterator.next?

Member Author:

I think it is hard to guarantee this, because readFunction comes from the individual data source. Even if we modify the current data sources, we cannot prevent other data sources from doing this.
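
For illustration, here is a minimal, self-contained sketch (hypothetical reader and file path, not Spark's actual data source code) of why the iterator creation itself has to sit inside the try block:

object EagerReaderSketch {
  import java.io.IOException

  // Hypothetical, simplified readFunction: it validates the file while the
  // iterator is being built, so a corrupt file throws here rather than in next().
  // The vectorized Parquet reader initializes eagerly in a similar way.
  def eagerReadFunction(path: String): Iterator[String] = {
    if (!path.endsWith(".parquet")) {
      throw new IOException(s"$path is not a Parquet file") // fails at creation time
    }
    Iterator("row1", "row2")
  }

  def main(args: Array[String]): Unit = {
    // Guarding only next() would miss the failure above; the creation must be guarded too.
    val rows =
      try eagerReadFunction("/tmp/corrupt.json")
      catch { case _: IOException => Iterator.empty }
    println(rows.isEmpty) // true: the corrupt file was skipped
  }
}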

} catch {
case e @(_: RuntimeException | _: IOException) =>
Contributor:

Shall we also check the error message? Otherwise, catching RuntimeException may swallow other unexpected exceptions.

Member Author:

Yeah, I raised this concern in the PR description too.

One problem is that the error message varies across data sources, so listing all possible error messages here doesn't look like a good idea.

logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
Iterator.empty
}
}

override def getNext(): AnyRef = {
try {
@@ -17,10 +17,13 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.io.IOException
import java.net.URI

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.{Failure, Try}

import org.apache.hadoop.conf.Configuration
@@ -30,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil
@@ -151,7 +155,7 @@ class ParquetFileFormat
}
}

def inferSchema(
override def inferSchema(
sparkSession: SparkSession,
parameters: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
@@ -542,6 +546,36 @@ object ParquetFileFormat extends Logging {
StructType(parquetSchema ++ missingFields)
}

/**
* Reads Parquet footers in multi-threaded manner.
* If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
* files when reading footers.
*/
private[parquet] def readParquetFootersInParallel(
conf: Configuration,
partFiles: Seq[FileStatus],
ignoreCorruptFiles: Boolean): Seq[Footer] = {
val parFiles = partFiles.par
parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
parFiles.flatMap { currentFile =>
try {
// Skips row group information since we only need the schema.
// ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
// when it can't read the footer.
Some(new Footer(currentFile.getPath(),
ParquetFileReader.readFooter(
conf, currentFile, SKIP_ROW_GROUPS)))
} catch { case e: RuntimeException =>
if (ignoreCorruptFiles) {
logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
None
} else {
throw new IOException(s"Could not read footer for file: $currentFile", e)
}
}
}.seq
}

/**
* Figures out a merged Parquet schema with a distributed Spark job.
*
@@ -582,6 +616,8 @@
val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
sparkSession.sparkContext.defaultParallelism)

val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

// Issues a Spark job to read Parquet schema in parallel.
val partiallyMergedSchemas =
sparkSession
@@ -593,13 +629,10 @@
new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
}.toSeq

// Skips row group information since we only need the schema
val skipRowGroups = true

// Reads footers in multi-threaded manner within each task
val footers =
ParquetFileReader.readAllFootersInParallel(
serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala
ParquetFileFormat.readParquetFootersInParallel(
Contributor:

what's happening to readAllFootersInParallel?

Member Author:

We can't make it ignore corrupt files, so it either successfully reads all footers or fails completely even if just one footer is corrupt.
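
To make the difference concrete, here is a minimal usage sketch of the new helper (the directory path is a placeholder, and the call has to live in the org.apache.spark.sql.execution.datasources.parquet package because the method is private[parquet]; the new ParquetFileFormatSuite below does exactly that):

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FooterReadSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    // Placeholder directory that may mix valid Parquet files with corrupt ones.
    val statuses = fs.listStatus(new Path("/tmp/parquet-data")).toSeq

    // Each footer is read independently: with ignoreCorruptFiles = true a bad
    // footer is logged and skipped, whereas readAllFootersInParallel fails the
    // whole batch on the first corrupt file.
    val footers = ParquetFileFormat.readParquetFootersInParallel(
      conf, statuses, ignoreCorruptFiles = true)
    footers.foreach(f => println(f.getFile))
  }
}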

Contributor:

Can we add some unit tests for readParquetFootersInParallel?

Member Author:

ok.

serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)

// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter =
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkException
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {

test("read parquet footers in parallel") {
def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
withTempDir { dir =>
val fs = FileSystem.get(sparkContext.hadoopConfiguration)
val basePath = dir.getCanonicalPath

val path1 = new Path(basePath, "first")
val path2 = new Path(basePath, "second")
val path3 = new Path(basePath, "third")

spark.range(1).toDF("a").coalesce(1).write.parquet(path1.toString)
spark.range(1, 2).toDF("a").coalesce(1).write.parquet(path2.toString)
spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString)

val fileStatuses =
Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten

val footers = ParquetFileFormat.readParquetFootersInParallel(
sparkContext.hadoopConfiguration, fileStatuses, ignoreCorruptFiles)

assert(footers.size == 2)
}
}

testReadFooters(true)
val exception = intercept[java.io.IOException] {
testReadFooters(false)
}
assert(exception.getMessage().contains("Could not read footer for file"))
}
}
@@ -22,6 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -212,6 +213,35 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}

test("Enabling/disabling ignoreCorruptFiles") {
def testIgnoreCorruptFiles(): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
val df = spark.read.parquet(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString)
checkAnswer(
df,
Seq(Row(0), Row(1)))
}
}

withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
testIgnoreCorruptFiles()
}

withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
val exception = intercept[SparkException] {
testIgnoreCorruptFiles()
}
assert(exception.getMessage().contains("is not a Parquet file"))
}
}

test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
withTempPath { dir =>
val basePath = dir.getCanonicalPath