Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32779][SQL] Avoid using synchronized API of SessionCatalog in withClient flow, this leads to DeadLock #29649

Closed

Conversation

sandeep-katta
Copy link
Contributor

@sandeep-katta sandeep-katta commented Sep 4, 2020

What changes were proposed in this pull request?

No need of using database name in loadPartition API of Shim_v3_0 to get the hive table, in hive there is a overloaded method which gives hive table using table name. By using this API dependency with SessionCatalog can be removed in Shim layer

Why are the changes needed?

To avoid deadlock when communicating with Hive metastore 3.1.x

Found one Java-level deadlock:
=============================
"worker3":
  waiting to lock monitor 0x00007faf0be602b8 (object 0x00000007858f85f0, a org.apache.spark.sql.hive.HiveSessionCatalog),
  which is held by "worker0"
"worker0":
  waiting to lock monitor 0x00007faf0be5fc88 (object 0x0000000785c15c80, a org.apache.spark.sql.hive.HiveExternalCatalog),
  which is held by "worker3"

Java stack information for the threads listed above:
===================================================
"worker3":
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCurrentDatabase(SessionCatalog.scala:256)
  - waiting to lock <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
  at org.apache.spark.sql.hive.client.Shim_v3_0.loadPartition(HiveShim.scala:1332)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadPartition$1(HiveClientImpl.scala:870)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$4459/1387095575.apply$mcV$sp(Unknown Source)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$2227/313239499.apply(Unknown Source)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
  - locked <0x0000000785ef9d78> (a org.apache.spark.sql.hive.client.IsolatedClientLoader)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
  at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:860)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadPartition$1(HiveExternalCatalog.scala:911)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$4457/2037578495.apply$mcV$sp(Unknown Source)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
  - locked <0x0000000785c15c80> (a org.apache.spark.sql.hive.HiveExternalCatalog)
  at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:890)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.loadPartition(ExternalCatalogWithListener.scala:179)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:512)
  at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  - locked <0x00000007b1690ff8> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
  at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
  at java.lang.Thread.run(Thread.java:748)
"worker0":
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
  - waiting to lock <0x0000000785c15c80
  > (a org.apache.spark.sql.hive.HiveExternalCatalog)
  at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:851)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.tableExists(ExternalCatalogWithListener.scala:146)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:432)
  - locked <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:185)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:509)
  at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  - locked <0x00000007b529af58> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
  at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
  at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock. 

Does this PR introduce any user-facing change?

No

How was this patch tested?

Tested using below script by executing in spark-shell and I found no dead lock

launch spark-shell using ./bin/spark-shell --conf "spark.sql.hive.metastore.jars=maven" --conf spark.sql.hive.metastore.version=3.1 --conf spark.hadoop.datanucleus.schema.autoCreateAll=true

code

def testHiveDeadLock = {
      import scala.collection.mutable.ArrayBuffer
      import scala.util.Random
      println("test hive DeadLock")
      spark.sql("drop database if exists testDeadLock cascade")
      spark.sql("create database testDeadLock")
      spark.sql("use testDeadLock")
      val tableCount = 100
      val tableNamePrefix = "testdeadlock"
      for (i <- 0 until tableCount) {
        val tableName = s"$tableNamePrefix${i + 1}"
        spark.sql(s"drop table if exists $tableName")
        spark.sql(s"create table $tableName (a bigint) partitioned by (b bigint) stored as orc")
      }

      val threads = new ArrayBuffer[Thread]
      for (i <- 0 until tableCount) {
        threads.append(new Thread( new Runnable {
          override def run: Unit = {
            val tableName = s"$tableNamePrefix${i + 1}"
            val rand = Random
            val df = spark.range(0, 20000).toDF("a")
            val location = s"/tmp/${rand.nextLong.abs}"
            df.write.mode("overwrite").orc(location)
            spark.sql(
              s"""
        LOAD DATA LOCAL INPATH '$location' INTO TABLE $tableName partition (b=$i)""")
          }
        }, s"worker$i"))
        threads(i).start()
      }

      for (i <- 0 until tableCount) {
        println(s"Joining with thread $i")
        threads(i).join()
      }
      for (i <- 0 until tableCount) {
        val tableName = s"$tableNamePrefix${i + 1}"
        spark.sql(s"select count(*) from $tableName").show(false)
      }
      println("All done")
    }

    for(i <- 0 until 100) {
      testHiveDeadLock
      println(s"completed {$i}th iteration")
    }
  }

…thHiveClient flow , this eventually leads to Deadlock.
@sandeep-katta
Copy link
Contributor Author

cc @wangyum @cloud-fan

@SparkQA
Copy link

SparkQA commented Sep 4, 2020

Test build #128298 has finished for PR 29649 at commit 8d45542.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sandeep-katta
Copy link
Contributor Author

CC @dongjoon-hyun

@HyukjinKwon
Copy link
Member

Merged to master and branch-3.0.

HyukjinKwon pushed a commit that referenced this pull request Sep 7, 2020
…withClient flow, this leads to DeadLock

### What changes were proposed in this pull request?

No need of using database name in `loadPartition` API of `Shim_v3_0` to get the hive table, in hive there is a overloaded method which gives hive table using table name. By using this API dependency with `SessionCatalog` can be removed in Shim layer

### Why are the changes needed?
To avoid deadlock when communicating with Hive metastore 3.1.x
```
Found one Java-level deadlock:
=============================
"worker3":
  waiting to lock monitor 0x00007faf0be602b8 (object 0x00000007858f85f0, a org.apache.spark.sql.hive.HiveSessionCatalog),
  which is held by "worker0"
"worker0":
  waiting to lock monitor 0x00007faf0be5fc88 (object 0x0000000785c15c80, a org.apache.spark.sql.hive.HiveExternalCatalog),
  which is held by "worker3"

Java stack information for the threads listed above:
===================================================
"worker3":
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCurrentDatabase(SessionCatalog.scala:256)
  - waiting to lock <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
  at org.apache.spark.sql.hive.client.Shim_v3_0.loadPartition(HiveShim.scala:1332)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadPartition$1(HiveClientImpl.scala:870)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$4459/1387095575.apply$mcV$sp(Unknown Source)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$2227/313239499.apply(Unknown Source)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
  - locked <0x0000000785ef9d78> (a org.apache.spark.sql.hive.client.IsolatedClientLoader)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
  at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:860)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadPartition$1(HiveExternalCatalog.scala:911)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$4457/2037578495.apply$mcV$sp(Unknown Source)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
  - locked <0x0000000785c15c80> (a org.apache.spark.sql.hive.HiveExternalCatalog)
  at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:890)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.loadPartition(ExternalCatalogWithListener.scala:179)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:512)
  at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  - locked <0x00000007b1690ff8> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
  at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
  at java.lang.Thread.run(Thread.java:748)
"worker0":
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
  - waiting to lock <0x0000000785c15c80
  > (a org.apache.spark.sql.hive.HiveExternalCatalog)
  at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:851)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.tableExists(ExternalCatalogWithListener.scala:146)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:432)
  - locked <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:185)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:509)
  at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  - locked <0x00000007b529af58> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
  at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
  at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested using below script by executing in spark-shell and I found no dead lock

launch spark-shell using ./bin/spark-shell --conf "spark.sql.hive.metastore.jars=maven" --conf spark.sql.hive.metastore.version=3.1 --conf spark.hadoop.datanucleus.schema.autoCreateAll=true

**code**
```
def testHiveDeadLock = {
      import scala.collection.mutable.ArrayBuffer
      import scala.util.Random
      println("test hive DeadLock")
      spark.sql("drop database if exists testDeadLock cascade")
      spark.sql("create database testDeadLock")
      spark.sql("use testDeadLock")
      val tableCount = 100
      val tableNamePrefix = "testdeadlock"
      for (i <- 0 until tableCount) {
        val tableName = s"$tableNamePrefix${i + 1}"
        spark.sql(s"drop table if exists $tableName")
        spark.sql(s"create table $tableName (a bigint) partitioned by (b bigint) stored as orc")
      }

      val threads = new ArrayBuffer[Thread]
      for (i <- 0 until tableCount) {
        threads.append(new Thread( new Runnable {
          override def run: Unit = {
            val tableName = s"$tableNamePrefix${i + 1}"
            val rand = Random
            val df = spark.range(0, 20000).toDF("a")
            val location = s"/tmp/${rand.nextLong.abs}"
            df.write.mode("overwrite").orc(location)
            spark.sql(
              s"""
        LOAD DATA LOCAL INPATH '$location' INTO TABLE $tableName partition (b=$i)""")
          }
        }, s"worker$i"))
        threads(i).start()
      }

      for (i <- 0 until tableCount) {
        println(s"Joining with thread $i")
        threads(i).join()
      }
      for (i <- 0 until tableCount) {
        val tableName = s"$tableNamePrefix${i + 1}"
        spark.sql(s"select count(*) from $tableName").show(false)
      }
      println("All done")
    }

    for(i <- 0 until 100) {
      testHiveDeadLock
      println(s"completed {$i}th iteration")
    }
  }
```

Closes #29649 from sandeep-katta/metastore3.1DeadLock.

Authored-by: sandeep.katta <sandeep.katta2007@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit b0322bf)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@@ -1329,8 +1329,7 @@ private[client] class Shim_v3_0 extends Shim_v2_3 {
isSrcLocal: Boolean): Unit = {
val session = SparkSession.getActiveSession
assert(session.nonEmpty)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, it seems that we can remove the above two lines together, doesn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah!! sorry, I missed that will raise follow-up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I raised the follow-up PR 29736. Sorry for the trouble

@dongjoon-hyun
Copy link
Member

Thank you, @sandeep-katta and all.

holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
…withClient flow, this leads to DeadLock

### What changes were proposed in this pull request?

No need of using database name in `loadPartition` API of `Shim_v3_0` to get the hive table, in hive there is a overloaded method which gives hive table using table name. By using this API dependency with `SessionCatalog` can be removed in Shim layer

### Why are the changes needed?
To avoid deadlock when communicating with Hive metastore 3.1.x
```
Found one Java-level deadlock:
=============================
"worker3":
  waiting to lock monitor 0x00007faf0be602b8 (object 0x00000007858f85f0, a org.apache.spark.sql.hive.HiveSessionCatalog),
  which is held by "worker0"
"worker0":
  waiting to lock monitor 0x00007faf0be5fc88 (object 0x0000000785c15c80, a org.apache.spark.sql.hive.HiveExternalCatalog),
  which is held by "worker3"

Java stack information for the threads listed above:
===================================================
"worker3":
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCurrentDatabase(SessionCatalog.scala:256)
  - waiting to lock <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
  at org.apache.spark.sql.hive.client.Shim_v3_0.loadPartition(HiveShim.scala:1332)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadPartition$1(HiveClientImpl.scala:870)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$4459/1387095575.apply$mcV$sp(Unknown Source)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$2227/313239499.apply(Unknown Source)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
  - locked <0x0000000785ef9d78> (a org.apache.spark.sql.hive.client.IsolatedClientLoader)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
  at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:860)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadPartition$1(HiveExternalCatalog.scala:911)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$4457/2037578495.apply$mcV$sp(Unknown Source)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
  - locked <0x0000000785c15c80> (a org.apache.spark.sql.hive.HiveExternalCatalog)
  at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:890)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.loadPartition(ExternalCatalogWithListener.scala:179)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:512)
  at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  - locked <0x00000007b1690ff8> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
  at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
  at java.lang.Thread.run(Thread.java:748)
"worker0":
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
  - waiting to lock <0x0000000785c15c80
  > (a org.apache.spark.sql.hive.HiveExternalCatalog)
  at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:851)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.tableExists(ExternalCatalogWithListener.scala:146)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:432)
  - locked <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:185)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:509)
  at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  - locked <0x00000007b529af58> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
  at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
  at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested using below script by executing in spark-shell and I found no dead lock

launch spark-shell using ./bin/spark-shell --conf "spark.sql.hive.metastore.jars=maven" --conf spark.sql.hive.metastore.version=3.1 --conf spark.hadoop.datanucleus.schema.autoCreateAll=true

**code**
```
def testHiveDeadLock = {
      import scala.collection.mutable.ArrayBuffer
      import scala.util.Random
      println("test hive DeadLock")
      spark.sql("drop database if exists testDeadLock cascade")
      spark.sql("create database testDeadLock")
      spark.sql("use testDeadLock")
      val tableCount = 100
      val tableNamePrefix = "testdeadlock"
      for (i <- 0 until tableCount) {
        val tableName = s"$tableNamePrefix${i + 1}"
        spark.sql(s"drop table if exists $tableName")
        spark.sql(s"create table $tableName (a bigint) partitioned by (b bigint) stored as orc")
      }

      val threads = new ArrayBuffer[Thread]
      for (i <- 0 until tableCount) {
        threads.append(new Thread( new Runnable {
          override def run: Unit = {
            val tableName = s"$tableNamePrefix${i + 1}"
            val rand = Random
            val df = spark.range(0, 20000).toDF("a")
            val location = s"/tmp/${rand.nextLong.abs}"
            df.write.mode("overwrite").orc(location)
            spark.sql(
              s"""
        LOAD DATA LOCAL INPATH '$location' INTO TABLE $tableName partition (b=$i)""")
          }
        }, s"worker$i"))
        threads(i).start()
      }

      for (i <- 0 until tableCount) {
        println(s"Joining with thread $i")
        threads(i).join()
      }
      for (i <- 0 until tableCount) {
        val tableName = s"$tableNamePrefix${i + 1}"
        spark.sql(s"select count(*) from $tableName").show(false)
      }
      println("All done")
    }

    for(i <- 0 until 100) {
      testHiveDeadLock
      println(s"completed {$i}th iteration")
    }
  }
```

Closes apache#29649 from sandeep-katta/metastore3.1DeadLock.

Authored-by: sandeep.katta <sandeep.katta2007@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit b0322bf)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
5 participants