[SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator

### What changes were proposed in this pull request?
Swallow non-fatal exceptions in the maintenance task to avoid a deadlock between the maintenance thread and the streaming aggregation operator.

### Why are the changes needed?
This change fixes a race condition that causes a deadlock between the task thread and the maintenance thread. In practice, this is only possible with the streaming aggregation operator, which uses two physical operators: `StateStoreRestoreExec` and `StateStoreSaveExec`. The first opens the store in read-only mode and the second performs the actual commit.

However, the following sequence of events creates the issue:
1. The task thread runs `StateStoreRestoreExec`, acquires the store instance, and thereby holds the DB instance lock
2. The maintenance thread fails with an error for some reason
3. The maintenance thread takes the `loadedProviders` lock and tries to call `close` on all loaded providers; closing the RocksDB provider blocks on the DB instance lock held by the task thread
4. The task thread tries to execute the `StateStoreRDD` for the `StateStoreSaveExec` operator and blocks acquiring the `loadedProviders` lock, which is held by the maintenance thread

In short, if the maintenance thread's failure handling interleaves between the restore and save operations, the two threads deadlock: each holds one of the `loadedProviders` lock and the DB instance lock while waiting for the other.
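
To make the interleaving concrete, here is a minimal, dependency-free Scala sketch of the same lock-order inversion. The lock and thread names are illustrative stand-ins, not the actual Spark objects, and `tryLock` with a timeout is used only so the demo terminates instead of hanging:

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object DeadlockSketch extends App {
  val dbInstanceLock = new ReentrantLock()      // stands in for the per-RocksDB-instance lock
  val loadedProvidersLock = new ReentrantLock() // stands in for the loadedProviders map lock

  // "Task thread": already holds the DB instance lock from StateStoreRestoreExec, then
  // needs loadedProviders to look the provider up again for StateStoreSaveExec.
  val taskThread = new Thread(() => {
    dbInstanceLock.lock()
    try {
      Thread.sleep(100) // give the maintenance thread time to grab loadedProviders
      if (!loadedProvidersLock.tryLock(2, TimeUnit.SECONDS)) {
        println("task thread: stuck waiting for the loadedProviders lock")
      } else {
        loadedProvidersLock.unlock()
      }
    } finally dbInstanceLock.unlock()
  })

  // "Maintenance thread": a failure makes it unload every provider, which takes
  // loadedProviders first and then each provider's DB instance lock to close it.
  val maintenanceThread = new Thread(() => {
    loadedProvidersLock.lock()
    try {
      Thread.sleep(100)
      if (!dbInstanceLock.tryLock(2, TimeUnit.SECONDS)) {
        println("maintenance thread: stuck waiting for the DB instance lock")
      } else {
        dbInstanceLock.unlock()
      }
    } finally loadedProvidersLock.unlock()
  })

  taskThread.start(); maintenanceThread.start()
  taskThread.join(); maintenanceThread.join()
}
```

Run as-is, both threads report being stuck once the demo timeout fires, which mirrors the mutual wait seen in the logs below until the RocksDB acquire timeout kicks in.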

The fix proposed here is to release the resources at the end of the `StateStoreRestoreExec` operator (note that `abort` on `ReadStateStore` is arguably a misnomer, but we follow the API that is already provided); the diff below additionally swallows non-fatal exceptions in the maintenance task so that a maintenance failure cannot close the loaded providers out from under a running task.
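
For intuition only, here is a dependency-free sketch of the release-on-completion idea, assuming a hypothetical `ReadHandle` in place of Spark's `ReadStateStore` and a hand-rolled `withCompletion` wrapper (the real operator would more likely pair `ReadStateStore.abort()` with Spark's `CompletionIterator` utility):

```scala
import java.util.concurrent.locks.ReentrantLock

object ReleaseOnCompletionSketch extends App {
  // Hypothetical stand-in for Spark's ReadStateStore: acquiring the handle takes the
  // per-instance lock, and "abort" here just releases the resources held for the read.
  final class ReadHandle(lock: ReentrantLock) {
    lock.lock()
    def abort(): Unit = lock.unlock()
  }

  // Wrap an iterator so a cleanup action runs exactly once, as soon as it is drained.
  def withCompletion[A](it: Iterator[A])(onDone: => Unit): Iterator[A] = new Iterator[A] {
    private var done = false
    def hasNext: Boolean = {
      val more = it.hasNext
      if (!more && !done) { done = true; onDone }
      more
    }
    def next(): A = it.next()
  }

  val dbInstanceLock = new ReentrantLock()
  val store = new ReadHandle(dbInstanceLock)            // "restore" opens the read-only store
  val rows = withCompletion(Iterator(1, 2, 3))(store.abort())
  rows.foreach(println)                                 // draining the output releases the store
  println(s"DB instance lock released: ${!dbInstanceLock.isLocked}")
}
```

The point of the pattern is simply that the per-instance lock is released as soon as the restore output is fully consumed, so a later provider unload cannot end up waiting on a task that no longer needs the store.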

Relevant Logs:
Link - https://github.com/anishshri-db/spark/actions/runs/7356847259/job/20027577445?pr=4
```
2023-12-27T09:59:02.6362466Z 09:59:02.635 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error in maintenanceThreadPool
2023-12-27T09:59:02.6365616Z java.io.FileNotFoundException: File file:/home/runner/work/spark/spark/target/tmp/spark-8ef51f34-b9de-48f2-b8df-07e14599b4c9/state/0/1 does not exist
2023-12-27T09:59:02.6367861Z 	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:733)
2023-12-27T09:59:02.6369383Z 	at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
2023-12-27T09:59:02.6370693Z 	at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:571)
2023-12-27T09:59:02.6371781Z 	at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1940)
2023-12-27T09:59:02.6372876Z 	at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1936)
2023-12-27T09:59:02.6373967Z 	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
2023-12-27T09:59:02.6375104Z 	at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1942)
2023-12-27T09:59:02.6376676Z 09:59:02.636 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error running maintenance thread
2023-12-27T09:59:02.6379079Z java.io.FileNotFoundException: File file:/home/runner/work/spark/spark/target/tmp/spark-8ef51f34-b9de-48f2-b8df-07e14599b4c9/state/0/1 does not exist
2023-12-27T09:59:02.6381083Z 	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:733)
2023-12-27T09:59:02.6382490Z 	at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
2023-12-27T09:59:02.6383816Z 	at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:571)
2023-12-27T09:59:02.6384875Z 	at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1940)
2023-12-27T09:59:02.6386294Z 	at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1936)
2023-12-27T09:59:02.6387439Z 	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
2023-12-27T09:59:02.6388674Z 	at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1942)
...
2023-12-27T10:01:02.4292831Z [info] - changing schema of state when restarting query - state format version 2 (RocksDBStateStore) *** FAILED *** (2 minutes)
2023-12-27T10:01:02.4295311Z [info]   Timed out waiting for stream: The code passed to failAfter did not complete within 120 seconds.
2023-12-27T10:01:02.4297271Z [info]   java.base/java.lang.Thread.getStackTrace(Thread.java:1619)
2023-12-27T10:01:02.4299084Z [info]   	org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:277)
2023-12-27T10:01:02.4300948Z [info]   	org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
...
2023-12-27T10:01:02.6474472Z 10:01:02.646 WARN org.apache.spark.sql.execution.streaming.state.RocksDB StateStoreId(opId=0,partId=0,name=default): Error closing RocksDB
2023-12-27T10:01:02.6482792Z org.apache.spark.SparkException: [CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR] An error occurred during loading state. StateStoreId(opId=0,partId=0,name=default): RocksDB instance could not be acquired by [ThreadId: Some(1858)] as it was not released by [ThreadId: Some(3835), task: partition 0.0 in stage 513.0, TID 1369] after 120009 ms.
2023-12-27T10:01:02.6488483Z Thread holding the lock has trace: app//org.apache.spark.sql.execution.streaming.state.StateStore$.getStateStoreProvider(StateStore.scala:577)
2023-12-27T10:01:02.6490896Z app//org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:565)
2023-12-27T10:01:02.6493072Z app//org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:128)
2023-12-27T10:01:02.6494915Z app//org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
2023-12-27T10:01:02.6496232Z app//org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
2023-12-27T10:01:02.6497655Z app//org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
2023-12-27T10:01:02.6499153Z app//org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
2023-12-27T10:01:02.6556758Z 10:01:02.654 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 513.0 (TID 1369) (localhost executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 260 cancelled part of cancelled job group cf26288c-0158-48ce-8a86-00a596dd45d8 SQLSTATE: XXKDA)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests
```
[info] Run completed in 6 minutes, 20 seconds.
[info] Total number of tests run: 80
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 80, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
Yes

Closes #44542 from anishshri-db/task/SPARK-46547.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
anishshri-db authored and HeartSaVioR committed Jan 10, 2024
1 parent a73ff66 commit f7b0b45
Showing 1 changed file with 11 additions and 1 deletion.
```diff
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming.state

 import java.io._

+import scala.util.control.NonFatal
+
 import org.apache.hadoop.conf.Configuration

 import org.apache.spark.{SparkConf, SparkEnv}
@@ -233,7 +235,15 @@ private[sql] class RocksDBStateStoreProvider
   }

   override def doMaintenance(): Unit = {
-    rocksDB.doMaintenance()
+    try {
+      rocksDB.doMaintenance()
+    } catch {
+      // SPARK-46547 - Swallow non-fatal exception in maintenance task to avoid deadlock between
+      // maintenance thread and streaming aggregation operator
+      case NonFatal(ex) =>
+        logWarning(s"Ignoring error while performing maintenance operations with exception=",
+          ex)
+    }
   }

   override def close(): Unit = {
```
