[SPARK-25240][SQL] Fix for a deadlock in RECOVER PARTITIONS #22233

Closed · wants to merge 5 commits

Conversation


@MaxGekk MaxGekk commented Aug 25, 2018

What changes were proposed in this pull request?

In the PR, I propose to not perform recursive parallel listing of files in the scanPartitions method because it can cause a deadlock. Instead, I propose to run scanPartitions in parallel for the top-level partitions only.
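
A minimal sketch of the proposed approach (simplified and hypothetical, not the actual AlterTableRecoverPartitionsCommand.scanPartitions; only the listFilesInParallel parameter name comes from the diff below, and .par stands in for the parallel map used on the driver):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: only the first level of partition directories is listed in parallel.
// Every recursive call passes listFilesInParallel = false and runs sequentially,
// so no nested task ever blocks waiting for a thread from the same fixed-size pool.
def scanPartitions(
    fs: FileSystem,
    path: Path,
    partitionNames: Seq[String],
    listFilesInParallel: Boolean): Seq[Path] = {
  if (partitionNames.isEmpty) {
    Seq(path)
  } else {
    val dirs = fs.listStatus(path).filter(_.isDirectory).map(_.getPath).toSeq
    val recurse = (p: Path) =>
      scanPartitions(fs, p, partitionNames.tail, listFilesInParallel = false)
    if (listFilesInParallel) dirs.par.flatMap(recurse).seq  // parallel at the top level only
    else dirs.flatMap(recurse)                              // sequential on recursive calls
  }
}
```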

How was this patch tested?

I extended an existing test to trigger the deadlock.

@@ -1131,7 +1135,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }

   test("alter table: recover partition (parallel)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "0") {
Member

@MaxGekk, out of curiosity, why does this have to be 0?

Member Author
@MaxGekk MaxGekk Aug 25, 2018

On the recursive calls this condition

    val result = if (partitionNames.length > 1 &&
        statuses.length > threshold || partitionNames.length > 2) {

is false because statuses.length is 1 and threshold is 1, so it falls back to sequential listing of files. Setting spark.rdd.parallelListingThreshold to 0 just enforces parallel scanning even for a single file/folder.
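
Spelled out with the values quoted above (an illustrative evaluation, not code from the PR):

```scala
// On a recursive call: statuses.length == 1 and threshold == 1, so
//   partitionNames.length > 1 && statuses.length > threshold   // ... && (1 > 1)  => false
//   || partitionNames.length > 2                                // also false at this depth
// The whole condition is false, hence sequential listing.
// With spark.rdd.parallelListingThreshold = 0, statuses.length > threshold becomes 1 > 0,
// so the scan is forced to stay parallel even with a single entry per level.
```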

@SparkQA commented Aug 25, 2018

Test build #95251 has finished for PR 22233 at commit 59a376d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -671,7 +674,7 @@ case class AlterTableRecoverPartitionsCommand(
         val value = ExternalCatalogUtils.unescapePathName(ps(1))
         if (resolver(columnName, partitionNames.head)) {
           scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
-            partitionNames.drop(1), threshold, resolver)
+            partitionNames.drop(1), threshold, resolver, listFilesInParallel = false)
Member

This change might introduce a performance regression. Do you know why it works when using .par previously?

Member Author

> This change might introduce a performance regression.

Right, if there is a significant imbalance among sub-folders, scanning will probably be slower.

> Do you know why it works when using .par previously?

Scala parallel collections can cope with nested calls. See slide 12 of this deck: https://www.slideshare.net/AleksandarProkopec/scala-parallel-collections
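
For reference, a tiny standalone sketch (not Spark code) showing that nested parallel collections complete fine: the ForkJoinPool behind .par uses work-stealing, so a task waiting on a nested parallel operation does not starve the pool.

```scala
// Nested .par: blocked outer tasks let the pool's workers steal and run the
// inner tasks, so this terminates. The same nesting on top of a fixed thread
// pool of futures can exhaust every thread and hang.
val sums = (1 to 8).par.map { i =>
  (1 to 8).par.map(j => i * j).sum  // nested parallel operation
}
println(sums.sum)
```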

@gatorsmile I can revert to Scala parallel collections here since we use them on the driver, and parmap is not necessary here.

Member

@MaxGekk Do you have a stack trace of each thread when the deadlock occurs?

Member Author

Yes, I do: jstack.txt

Member

Thank you for attaching the stack trace. I have just looked at it, and it looks strange to me: every thread is waiting, there is no blocker, and only one lock is held. In a typical case, a deadlock occurs due to the existence of a blocker, as in the stack trace attached to #22221.

I will investigate it further tomorrow to see whether we need this implementation instead of reverting to the original implementation that uses Scala parallel collections.

...
        - parking to wait for  <0x0000000793c0d610> (a scala.concurrent.impl.Promise$CompletionLatch)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
        at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:317)
        at org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand.scanPartitions(ddl.scala:690)
        at org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand.run(ddl.scala:626)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        - locked <0x0000000793b04e88> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
...

Member

Does it mean there is no available thread in the given thread pool when the program tries to execute a new Future?

Member Author

@kiszk Right, all the Futures do the same thing: each tries to execute another Future on the same fixed thread pool.
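
A minimal standalone illustration of that pattern (hypothetical code, not the Spark implementation): every pool thread is occupied by an outer Future that blocks on an inner Future, and the inner Futures can never be scheduled on the same exhausted fixed pool.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// A fixed pool with 2 threads, standing in for the pool used by parmap.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

// Two outer futures grab both threads, then each submits an inner future to
// the same pool and blocks on it. No thread is ever free to run the inner
// futures, so every worker stays parked in the await, much like the threads
// in the attached jstack output.
val outer = (1 to 2).map { i =>
  Future {
    val inner = Future(i * i)          // queued, but no free worker thread
    Await.result(inner, Duration.Inf)  // parks this pool thread forever
  }
}
// Await.result(Future.sequence(outer), Duration.Inf)  // would hang: deadlock
```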

Member

I am worried that a similar deadlock may occur in other places due to:

  • larger parallelism than the fixed thread pool size
  • nested parallelism like this

I also realized there is another parmap implementation that uses a thread pool. Can we use the other implementation?

Member Author

> Can we use the other implementation?

This is what @zsxwing proposed. Please look at my comment: #22233 (comment)

Member

Got it, sorry for overlooking that.

Are the other places safe, in that their parallelism would not reach the fixed thread pool size?

@SparkQA commented Aug 28, 2018

Test build #95343 has finished for PR 22233 at commit 071de47.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -52,23 +52,24 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
   protected override def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean = true): CatalogTable = {
+      isDataSource: Boolean = true,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
Member

A question about the changes in this file. Are they related to the work of this PR?

Member Author

@gatorsmile Yes, the changes are related to the existing test that was modified to reproduce the issue. In particular, this line is needed to support any number of partition columns.
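
For illustration, a hypothetical use of the extended helper inside the suite (only the partitionCols parameter and its default come from the diffs; the concrete values are made up): more partition columns produce a deeper directory tree, which is what the modified test needs to drive scanPartitions into nested parallel listing.

```scala
// Hypothetical call within DDLSuite: the default Seq("a", "b") preserves the
// old behaviour, while extra columns create a deeper partition hierarchy for
// ALTER TABLE ... RECOVER PARTITIONS to scan.
val table = generateTable(
  catalog,
  TableIdentifier("tbl"),
  isDataSource = true,
  partitionCols = Seq("a", "b", "c", "d"))
```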

@@ -60,7 +60,8 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   protected override def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean): CatalogTable = {
+      isDataSource: Boolean,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
Member

The interface of this function looks strange. The original one is also hacky. We should refine them later.

Member

Yeah, please don't override a method with a default parameter. It's very easy to end up with different default values, and then the value that gets picked up will depend on the type you are using...

@gatorsmile
Member

Basically, this PR reverts the code to the original .par-based solution.

LGTM Thanks! Merged to master.

@asfgit asfgit closed this in aff8f15 Aug 28, 2018
@MaxGekk MaxGekk deleted the fix-recover-partitions branch August 17, 2019 13:33