Skip to content

Commit

Permalink
[SPARK-32779][SQL] Avoid using synchronized API of SessionCatalog in …
Browse files Browse the repository at this point in the history
…withClient flow, this leads to DeadLock

### What changes were proposed in this pull request?

No need of using database name in `loadPartition` API of `Shim_v3_0` to get the hive table, in hive there is a overloaded method which gives hive table using table name. By using this API dependency with `SessionCatalog` can be removed in Shim layer

### Why are the changes needed?
To avoid deadlock when communicating with Hive metastore 3.1.x
```
Found one Java-level deadlock:
=============================
"worker3":
  waiting to lock monitor 0x00007faf0be602b8 (object 0x00000007858f85f0, a org.apache.spark.sql.hive.HiveSessionCatalog),
  which is held by "worker0"
"worker0":
  waiting to lock monitor 0x00007faf0be5fc88 (object 0x0000000785c15c80, a org.apache.spark.sql.hive.HiveExternalCatalog),
  which is held by "worker3"

Java stack information for the threads listed above:
===================================================
"worker3":
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCurrentDatabase(SessionCatalog.scala:256)
  - waiting to lock <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
  at org.apache.spark.sql.hive.client.Shim_v3_0.loadPartition(HiveShim.scala:1332)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadPartition$1(HiveClientImpl.scala:870)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$4459/1387095575.apply$mcV$sp(Unknown Source)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$2227/313239499.apply(Unknown Source)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
  - locked <0x0000000785ef9d78> (a org.apache.spark.sql.hive.client.IsolatedClientLoader)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
  at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:860)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadPartition$1(HiveExternalCatalog.scala:911)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$4457/2037578495.apply$mcV$sp(Unknown Source)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
  - locked <0x0000000785c15c80> (a org.apache.spark.sql.hive.HiveExternalCatalog)
  at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:890)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.loadPartition(ExternalCatalogWithListener.scala:179)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:512)
  at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  - locked <0x00000007b1690ff8> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
  at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
  at java.lang.Thread.run(Thread.java:748)
"worker0":
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
  - waiting to lock <0x0000000785c15c80
  > (a org.apache.spark.sql.hive.HiveExternalCatalog)
  at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:851)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.tableExists(ExternalCatalogWithListener.scala:146)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:432)
  - locked <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:185)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:509)
  at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  - locked <0x00000007b529af58> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
  at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
  at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested using below script by executing in spark-shell and I found no dead lock

launch spark-shell using ./bin/spark-shell --conf "spark.sql.hive.metastore.jars=maven" --conf spark.sql.hive.metastore.version=3.1 --conf spark.hadoop.datanucleus.schema.autoCreateAll=true

**code**
```
def testHiveDeadLock = {
      import scala.collection.mutable.ArrayBuffer
      import scala.util.Random
      println("test hive DeadLock")
      spark.sql("drop database if exists testDeadLock cascade")
      spark.sql("create database testDeadLock")
      spark.sql("use testDeadLock")
      val tableCount = 100
      val tableNamePrefix = "testdeadlock"
      for (i <- 0 until tableCount) {
        val tableName = s"$tableNamePrefix${i + 1}"
        spark.sql(s"drop table if exists $tableName")
        spark.sql(s"create table $tableName (a bigint) partitioned by (b bigint) stored as orc")
      }

      val threads = new ArrayBuffer[Thread]
      for (i <- 0 until tableCount) {
        threads.append(new Thread( new Runnable {
          override def run: Unit = {
            val tableName = s"$tableNamePrefix${i + 1}"
            val rand = Random
            val df = spark.range(0, 20000).toDF("a")
            val location = s"/tmp/${rand.nextLong.abs}"
            df.write.mode("overwrite").orc(location)
            spark.sql(
              s"""
        LOAD DATA LOCAL INPATH '$location' INTO TABLE $tableName partition (b=$i)""")
          }
        }, s"worker$i"))
        threads(i).start()
      }

      for (i <- 0 until tableCount) {
        println(s"Joining with thread $i")
        threads(i).join()
      }
      for (i <- 0 until tableCount) {
        val tableName = s"$tableNamePrefix${i + 1}"
        spark.sql(s"select count(*) from $tableName").show(false)
      }
      println("All done")
    }

    for(i <- 0 until 100) {
      testHiveDeadLock
      println(s"completed {$i}th iteration")
    }
  }
```

Closes #29649 from sandeep-katta/metastore3.1DeadLock.

Authored-by: sandeep.katta <sandeep.katta2007@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
sandeep-katta authored and HyukjinKwon committed Sep 7, 2020
1 parent 05fcf26 commit b0322bf
Showing 1 changed file with 1 addition and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1329,8 +1329,7 @@ private[client] class Shim_v3_0 extends Shim_v2_3 {
isSrcLocal: Boolean): Unit = {
val session = SparkSession.getActiveSession
assert(session.nonEmpty)
val database = session.get.sessionState.catalog.getCurrentDatabase
val table = hive.getTable(database, tableName)
val table = hive.getTable(tableName)
val loadFileType = if (replace) {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
Expand Down

0 comments on commit b0322bf

Please sign in to comment.