
[SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator #44542

Closed
anishshri-db wants to merge 7 commits into master from anishshri-db/task/SPARK-46547

Conversation

@anishshri-db anishshri-db commented Dec 30, 2023

What changes were proposed in this pull request?

Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator

Why are the changes needed?

This change fixes a race condition that causes a deadlock between the task thread and the maintenance thread. This is primarily possible only with the streaming aggregation operator, which uses two physical operators - `StateStoreRestoreExec` and `StateStoreSaveExec`. The first opens the store in read-only mode and the second performs the actual commit.

However, the following sequence of events creates an issue:

  1. The task thread runs `StateStoreRestoreExec` and gets the store instance, and thereby the DB instance lock.
  2. The maintenance thread fails with an error for some reason.
  3. The maintenance thread takes the `loadedProviders` lock and tries to call `close` on all the loaded providers.
  4. The task thread tries to execute the `StateStoreRDD` for the `StateStoreSaveExec` operator and tries to acquire the `loadedProviders` lock, which is held by the maintenance thread.

So essentially, if the maintenance thread is interleaved between the restore/save operations, there is a deadlock on the `loadedProviders` lock and the DB instance lock.
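
To make the interleaving concrete, here is a small, self-contained Scala sketch (the lock objects and sleeps are hypothetical stand-ins, not the actual Spark internals) that reproduces the same lock-ordering deadlock:

```scala
// Stand-in locks only: loadedProvidersLock mimics the loadedProviders lock,
// dbInstanceLock mimics the RocksDB instance lock.
object DeadlockSketch {
  private val loadedProvidersLock = new Object
  private val dbInstanceLock = new Object

  def main(args: Array[String]): Unit = {
    val taskThread = new Thread(() => {
      dbInstanceLock.synchronized {            // (1) restore: task thread acquires the DB instance lock
        Thread.sleep(100)                      // (2) maintenance thread fails in the meantime
        loadedProvidersLock.synchronized {     // (4) save: blocks on loadedProviders, held below
          println("task thread committed")
        }
      }
    })
    val maintenanceThread = new Thread(() => {
      loadedProvidersLock.synchronized {       // (3) maintenance takes loadedProviders to close providers
        Thread.sleep(100)
        dbInstanceLock.synchronized {          // close() needs the instance lock held by the task thread
          println("maintenance thread closed providers")
        }
      }
    })
    taskThread.start(); maintenanceThread.start()
    taskThread.join(); maintenanceThread.join() // never returns: classic lock-ordering deadlock
  }
}
```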

The fix simply releases the resources at the end of the `StateStoreRestoreExec` operator (note that `abort` for `ReadStateStore` is likely a misnomer, but we choose to follow the API already provided in this case).

Relevant Logs:
Link - https://github.com/anishshri-db/spark/actions/runs/7356847259/job/20027577445?pr=4

```
2023-12-27T09:59:02.6362466Z 09:59:02.635 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error in maintenanceThreadPool
2023-12-27T09:59:02.6365616Z java.io.FileNotFoundException: File file:/home/runner/work/spark/spark/target/tmp/spark-8ef51f34-b9de-48f2-b8df-07e14599b4c9/state/0/1 does not exist
2023-12-27T09:59:02.6367861Z 	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:733)
2023-12-27T09:59:02.6369383Z 	at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
2023-12-27T09:59:02.6370693Z 	at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:571)
2023-12-27T09:59:02.6371781Z 	at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1940)
2023-12-27T09:59:02.6372876Z 	at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1936)
2023-12-27T09:59:02.6373967Z 	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
2023-12-27T09:59:02.6375104Z 	at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1942)
2023-12-27T09:59:02.6376676Z 09:59:02.636 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error running maintenance thread
2023-12-27T09:59:02.6379079Z java.io.FileNotFoundException: File file:/home/runner/work/spark/spark/target/tmp/spark-8ef51f34-b9de-48f2-b8df-07e14599b4c9/state/0/1 does not exist
2023-12-27T09:59:02.6381083Z 	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:733)
2023-12-27T09:59:02.6382490Z 	at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
2023-12-27T09:59:02.6383816Z 	at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:571)
2023-12-27T09:59:02.6384875Z 	at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1940)
2023-12-27T09:59:02.6386294Z 	at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1936)
2023-12-27T09:59:02.6387439Z 	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
2023-12-27T09:59:02.6388674Z 	at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1942)
...
2023-12-27T10:01:02.4292831Z [info] - changing schema of state when restarting query - state format version 2 (RocksDBStateStore) *** FAILED *** (2 minutes)
2023-12-27T10:01:02.4295311Z [info]   Timed out waiting for stream: The code passed to failAfter did not complete within 120 seconds.
2023-12-27T10:01:02.4297271Z [info]   java.base/java.lang.Thread.getStackTrace(Thread.java:1619)
2023-12-27T10:01:02.4299084Z [info]   	org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:277)
2023-12-27T10:01:02.4300948Z [info]   	org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
...
2023-12-27T10:01:02.6474472Z 10:01:02.646 WARN org.apache.spark.sql.execution.streaming.state.RocksDB StateStoreId(opId=0,partId=0,name=default): Error closing RocksDB
2023-12-27T10:01:02.6482792Z org.apache.spark.SparkException: [CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR] An error occurred during loading state. StateStoreId(opId=0,partId=0,name=default): RocksDB instance could not be acquired by [ThreadId: Some(1858)] as it was not released by [ThreadId: Some(3835), task: partition 0.0 in stage 513.0, TID 1369] after 120009 ms.
2023-12-27T10:01:02.6488483Z Thread holding the lock has trace: app//org.apache.spark.sql.execution.streaming.state.StateStore$.getStateStoreProvider(StateStore.scala:577)
2023-12-27T10:01:02.6490896Z app//org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:565)
2023-12-27T10:01:02.6493072Z app//org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:128)
2023-12-27T10:01:02.6494915Z app//org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
2023-12-27T10:01:02.6496232Z app//org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
2023-12-27T10:01:02.6497655Z app//org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
2023-12-27T10:01:02.6499153Z app//org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
2023-12-27T10:01:02.6556758Z 10:01:02.654 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 513.0 (TID 1369) (localhost executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 260 cancelled part of cancelled job group cf26288c-0158-48ce-8a86-00a596dd45d8 SQLSTATE: XXKDA)
```

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing unit tests

```
[info] Run completed in 6 minutes, 20 seconds.
[info] Total number of tests run: 80
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 80, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Was this patch authored or co-authored using generative AI tooling?

Yes

@anishshri-db anishshri-db changed the title [SPARK-46547] Fix deadlock between maintenance thread and streaming aggregation operator [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator Dec 30, 2023
@anishshri-db
Contributor Author

cc - @HeartSaVioR - PTAL whenever you get a chance, thx !

HeartSaVioR commented Jan 9, 2024

I believe the real issue here is that the failure of the maintenance task hammers all active state store providers, effectively impacting all stateful tasks on the executor.

Let's look back at what we do in the maintenance task. Mostly we do snapshotting and clean up orphaned files. Suppose the task fails: would the state store (provider) be impacted? From what I understand, no, it would not.

This is reflected in the HDFS-backed state store provider. If we look at its maintenance task, it swallows non-fatal exceptions. If we agree that the failure of the maintenance task in the RocksDB state store provider likewise does not impact the actual state store (provider), we can do the same for the RocksDB state store provider.
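
As a rough, self-contained sketch of that pattern (the names below are illustrative stand-ins, not the exact provider code; the actual diff under review appears further down in this thread), the maintenance entry point would catch and log non-fatal errors instead of letting them propagate:

```scala
import scala.util.control.NonFatal

// Illustrative stand-in for the provider's maintenance path, not the exact Spark code.
object MaintenanceSketch {
  // Stand-in for RocksDB snapshotting / orphan-file cleanup that may fail,
  // e.g. with a FileNotFoundException as in the logs above.
  private def doRocksDBMaintenance(): Unit =
    throw new java.io.FileNotFoundException("state/0/1 does not exist")

  def doMaintenance(): Unit =
    try {
      doRocksDBMaintenance()
    } catch {
      // A failed maintenance round does not corrupt the store itself, so log
      // the error (with its stack trace) and continue, instead of letting it
      // propagate and tear down every loaded provider on the executor.
      case NonFatal(ex) =>
        println(s"WARN Ignoring error performing maintenance operations: $ex")
        ex.printStackTrace()
    }

  def main(args: Array[String]): Unit = doMaintenance() // logs a warning, does not throw
}
```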

@HeartSaVioR

That said, what we really need to fix is the behavior when the maintenance task fails. A single streaming query using a faulty state store provider implementation can cause every other stateful query on the same executor to fail. While this is really something we have to fix (maybe by closing only the state store provider of the faulty query), as I commented earlier, if we do not think the failure of the maintenance task in the RocksDB state store provider impacts the actual state store (provider), we should just swallow the exception.

@HeartSaVioR HeartSaVioR left a comment

Only nits. Could you please update the PR title and description to reflect the new direction? Thanks!

@anishshri-db anishshri-db changed the title [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator Jan 10, 2024
@HeartSaVioR HeartSaVioR left a comment

+1 Nice RCA and thanks for the concise fix!

```scala
    rocksDB.doMaintenance()
  } catch {
    case NonFatal(ex) =>
      logWarning(s"Error performing maintenance operations with exception=$ex")
```
Contributor

Please change this to include the full stack trace: `logWarning("...", ex)`

Contributor

Also make it explicit in the log message that this error is ignored.
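
Taken together, the two suggestions amount to passing the Throwable to the logger (so the full stack trace is kept) and stating in the message that the error is being ignored; a sketch of that shape, with illustrative message wording and assuming the two-argument `logWarning(msg, throwable)` from Spark's `Logging` trait:

```scala
case NonFatal(ex) =>
  // Pass the Throwable so the full stack trace is logged, and say explicitly
  // that the maintenance error is being swallowed (wording is illustrative).
  logWarning("Ignoring error while performing maintenance operations", ex)
```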

Contributor Author

Done - updated the PR

@rangadi rangadi left a comment

Could you update the description now that the fix has changed after @HeartSaVioR's comments?
I didn't fully grok the issue or the fix. Overall, avoiding throwing seems like a workaround to not trigger the bug. I am sure both of you have discussed it.
LGTM.

HeartSaVioR commented Jan 10, 2024

Thanks! Merging to master/3.5.

HeartSaVioR pushed a commit that referenced this pull request Jan 10, 2024
[SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator

Closes #44542 from anishshri-db/task/SPARK-46547.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit f7b0b45)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>