[SPARK-20848][SQL] Shutdown the pool after reading parquet files #18073

Status: Closed · 3 commits
@@ -479,7 +479,8 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    val pool = new ForkJoinPool(8)

Contributor:

Would it be better to share one global thread pool? Creating a lot of thread pools may not increase concurrency.

@viirya (Member, Author), May 24, 2017:

The main concern is that if we share one thread pool for parquet reading, we may limit concurrency, as @srowen pointed out in the JIRA.

If multiple parquet reads run in parallel, they would all share a single pool; currently each read owns its own pool.

@viirya (Member, Author):

I'm not sure whether using a shared pool would change the current behavior.

Contributor:

OK, let's keep the previous behavior.
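
For illustration, a minimal sketch of the shared-pool alternative discussed above. The `SharedFooterPool` holder object is hypothetical, not code from this PR; it only shows the trade-off being debated:

    import java.util.concurrent.ForkJoinPool

    import scala.collection.parallel.ForkJoinTaskSupport

    // Hypothetical holder, not part of this PR: one process-wide pool shared
    // by every footer read. This bounds the total thread count, but also caps
    // concurrency across simultaneous reads, which is the concern above.
    // (On Scala 2.11, ForkJoinTaskSupport expects
    // scala.concurrent.forkjoin.ForkJoinPool instead.)
    object SharedFooterPool {
      lazy val pool = new ForkJoinPool(8)
      def taskSupport = new ForkJoinTaskSupport(pool)
    }

    // A call site would then use the shared pool and never shut it down:
    //   parFiles.tasksupport = SharedFooterPool.taskSupport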

+    parFiles.tasksupport = new ForkJoinTaskSupport(pool)
     parFiles.flatMap { currentFile =>
       try {
         // Skips row group information since we only need the schema.
@@ -495,6 +496,8 @@
         } else {
           throw new IOException(s"Could not read footer for file: $currentFile", e)
         }
+      } finally {
+        pool.shutdown()

Member:

Why do we terminate the pool inside flatMap?

Member:

Why not do it outside? For example:

    val parFiles = partFiles.par
    val pool = new ForkJoinPool(8)
    parFiles.tasksupport = new ForkJoinTaskSupport(pool)
    try {
      parFiles.flatMap { currentFile =>
        ...
      }.seq
    } finally {
      pool.shutdown()
    }

Member:

I would expect this to fail some test, but it didn't...

When you fix this, could you call ThreadUtils.newForkJoinPool instead, to set a better thread name?
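
For context, a sketch of what that suggestion might look like, assuming Spark's `org.apache.spark.util.ThreadUtils.newForkJoinPool(prefix, maxThreadNumber)` helper; the prefix string below is illustrative, not taken from this PR:

    import org.apache.spark.util.ThreadUtils

    // ThreadUtils.newForkJoinPool gives worker threads a recognizable name
    // prefix, which makes thread dumps easier to read.
    val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
    parFiles.tasksupport = new ForkJoinTaskSupport(pool)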

@zsxwing (Member), May 24, 2017:

> Why not do it outside? For example:

Just realized the toSeq is lazy. But shutting down in flatMap is also not correct. NVM, I was wrong.

@viirya (Member, Author):

@zsxwing @gatorsmile

I was shutting the pool down outside at the beginning of this PR; I changed to the current approach after @srowen's suggestion.

I initially thought it could be wrong, but it seems fine: the tasks all appear to be invoked up front, and no more tasks are submitted later, so shutting down inside should be OK.

I can submit a follow-up if you still think we need to change it. Thank you.

@zsxwing (Member), May 24, 2017:

I didn't check the details, but I guess the implementation submits tasks one by one. Then it's possible that when the first task shuts down the pool, some tasks have not yet been submitted.

@viirya (Member, Author):

OK, we should take the safer approach. Let me submit a follow-up for this. Thanks @zsxwing.
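
To make the race concrete, a standalone sketch of the JVM behavior being discussed (assumed illustration, not Spark code): once any task calls shutdown() on the pool, submissions that have not happened yet are rejected.

    import java.util.concurrent.{ForkJoinPool, RejectedExecutionException}

    val pool = new ForkJoinPool(2)
    // A task that shuts the pool down "from inside", like the
    // pool.shutdown() in the flatMap body above.
    pool.execute(new Runnable { def run(): Unit = pool.shutdown() })
    Thread.sleep(100) // let the first task run

    // A submission racing with the shutdown may now be rejected.
    try {
      pool.execute(new Runnable { def run(): Unit = println("late task") })
    } catch {
      case e: RejectedExecutionException => println(s"rejected: $e")
    }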

+      }
     }.seq
   }

@@ -26,6 +26,22 @@ import org.apache.spark.sql.test.SharedSQLContext

 class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {

+  test("Number of threads doesn't grow extremely after parquet file reading") {
+    withTempDir { dir =>
+      val file = dir.toString + "/file"
+      spark.range(1).toDF("a").coalesce(1).write.parquet(file)
+      spark.read.parquet(file)
+      val numThreadBefore = Thread.activeCount
+      (1 to 100).map { _ =>
+        spark.read.parquet(file)
+      }
+      val numThreadAfter = Thread.activeCount
+      // Hard to test a correct thread number,
+      // but it shouldn't increase more than a reasonable number.
+      assert(numThreadAfter - numThreadBefore < 20)

Contributor:

After waiting long enough, can we expect this to be 0?

@viirya (Member, Author), May 24, 2017:

It drops to a few threads (about 3) after waiting long enough. The number returned by Thread.activeCount is only an estimate, so we can't expect it to be 0.
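
As an aside on the estimate issue: instead of Thread.activeCount, one could count only ForkJoinPool workers by thread name. A sketch, assuming the pools keep the JDK's default "ForkJoinPool-N-worker-M" naming:

    import scala.collection.JavaConverters._

    // Counts live threads whose names mark them as ForkJoinPool workers.
    // Default ForkJoinPool threads are named like "ForkJoinPool-1-worker-3".
    def forkJoinWorkerCount(): Int =
      Thread.getAllStackTraces.keySet.asScala
        .count(_.getName.startsWith("ForkJoinPool"))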

Contributor:

This looks hacky; can we think of a better way to test it? If not, I suggest removing this test, since the fix is straightforward and we can verify it manually with profiling tools.

@viirya (Member, Author):

OK. Let's remove the test.

+    }
+  }
+
   test("read parquet footers in parallel") {
     def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
       withTempDir { dir =>