Skip to content

Commit

Permalink
[SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet…
Browse files Browse the repository at this point in the history
… files

## What changes were proposed in this pull request?

This is a follow-up to #18073. Taking a safer approach to shutdown the pool to prevent possible issue. Also using `ThreadUtils.newForkJoinPool` instead to set a better thread name.

## How was this patch tested?

Manually test.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #18100 from viirya/SPARK-20848-followup.

(cherry picked from commit 6b68d61)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
viirya authored and cloud-fan committed May 25, 2017
1 parent 3f82d65 commit e0aa239
Showing 1 changed file with 22 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}

class ParquetFileFormat
extends FileFormat
Expand Down Expand Up @@ -479,27 +479,29 @@ object ParquetFileFormat extends Logging {
partFiles: Seq[FileStatus],
ignoreCorruptFiles: Boolean): Seq[Footer] = {
val parFiles = partFiles.par
val pool = new ForkJoinPool(8)
val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
parFiles.tasksupport = new ForkJoinTaskSupport(pool)
parFiles.flatMap { currentFile =>
try {
// Skips row group information since we only need the schema.
// ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
// when it can't read the footer.
Some(new Footer(currentFile.getPath(),
ParquetFileReader.readFooter(
conf, currentFile, SKIP_ROW_GROUPS)))
} catch { case e: RuntimeException =>
if (ignoreCorruptFiles) {
logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
None
} else {
throw new IOException(s"Could not read footer for file: $currentFile", e)
try {
parFiles.flatMap { currentFile =>
try {
// Skips row group information since we only need the schema.
// ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
// when it can't read the footer.
Some(new Footer(currentFile.getPath(),
ParquetFileReader.readFooter(
conf, currentFile, SKIP_ROW_GROUPS)))
} catch { case e: RuntimeException =>
if (ignoreCorruptFiles) {
logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
None
} else {
throw new IOException(s"Could not read footer for file: $currentFile", e)
}
}
} finally {
pool.shutdown()
}
}.seq
}.seq
} finally {
pool.shutdown()
}
}

/**
Expand Down

0 comments on commit e0aa239

Please sign in to comment.