[SPARK-42794][SS] Increase the lockAcquireTimeoutMs to 2 minutes for acquiring the RocksDB state store in Structured Streaming #40425
Conversation
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
lgtm
cc @HeartSaVioR
+1 pending builds.
Thanks! Merging to master.
+1, LGTM.
[SPARK-42794][SS] Increase the lockAcquireTimeoutMs to 2 minutes for acquiring the RocksDB state store in Structured Streaming

We are seeing query failures caused by RocksDB lock acquisition failures in retried tasks:

* At t1, we shrink the cluster to only have one executor:

```
23/03/05 22:47:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20230305224215-0000/2 is now DECOMMISSIONED (worker decommissioned because of kill request from HTTP endpoint (data migration disabled))
23/03/05 22:47:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20230305224215-0000/3 is now DECOMMISSIONED (worker decommissioned because of kill request from HTTP endpoint (data migration disabled))
```

* At t1+2min, task 7 on its first attempt (i.e. task 7.0) is scheduled on the surviving executor:

```
23/03/05 22:49:58 INFO TaskSetManager: Starting task 7.0 in stage 133.0 (TID 685) (10.166.225.249, executor 0, partition 7, ANY,
```

Task 7.0 appears to get past `dataRDD.iterator(partition, ctxt)` and acquires the RocksDB lock, since the subsequent retry attempts fail to take it:

```
23/03/05 22:51:59 WARN TaskSetManager: Lost task 4.1 in stage 133.1 (TID 700) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(50), task: partition 7.1 in stage 133.1, TID 700] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60003 ms.
23/03/05 22:52:59 WARN TaskSetManager: Lost task 4.2 in stage 133.1 (TID 702) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(1495), task: partition 7.2 in stage 133.1, TID 702] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60006 ms.
23/03/05 22:53:59 WARN TaskSetManager: Lost task 4.3 in stage 133.1 (TID 704) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(46), task: partition 7.3 in stage 133.1, TID 704] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60003 ms.
```

Increasing lockAcquireTimeoutMs to 2 minutes means the 4 task retries together allow up to 8 minutes to acquire the lock, which is larger than connectionTimeout with retries (3 * 120s).

### What changes were proposed in this pull request?

Increase the lockAcquireTimeoutMs to 2 minutes for acquiring the RocksDB state store in Structured Streaming.

### Why are the changes needed?

To keep the thread waiting on lock acquisition rather than giving up too quickly.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Trivial change.

Closes apache#40425 from huanliwang-db/increase-timeout.

Authored-by: Huanli Wang <huanli.wang@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
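For context on the mechanism behind these errors: each RocksDB state store instance is guarded by a per-instance lock, and a retrying task attempt waits up to lockAcquireTimeoutMs for the previous attempt to release it before failing with the IllegalStateException shown in the logs above. Below is a minimal Scala sketch of that timed acquire/release pattern, assuming a plain wait/notify scheme; the StateStoreLock class and all of its members are illustrative stand-ins, not the actual code in RocksDB.scala.

```scala
// Minimal sketch of a timed per-instance lock; StateStoreLock is a
// hypothetical stand-in, not the real RocksDB.scala implementation.
class StateStoreLock(lockAcquireTimeoutMs: Long = 2 * 60 * 1000L) { // 2 min, the new default

  // The thread (task attempt) currently holding this instance, if any.
  private var holder: Option[Thread] = None

  /** Take the lock, waiting up to lockAcquireTimeoutMs for the previous
   *  task attempt to release it; fail loudly on timeout so the task can
   *  be retried rather than touching a store another attempt still holds. */
  def acquire(): Unit = synchronized {
    val deadline = System.currentTimeMillis() + lockAcquireTimeoutMs
    var remaining = lockAcquireTimeoutMs
    while (holder.exists(_ ne Thread.currentThread()) && remaining > 0) {
      wait(remaining) // woken early by notifyAll() in release(), or times out
      remaining = deadline - System.currentTimeMillis()
    }
    if (holder.exists(_ ne Thread.currentThread())) {
      throw new IllegalStateException(
        s"RocksDB instance could not be acquired by ${Thread.currentThread().getName} " +
          s"after $lockAcquireTimeoutMs ms")
    }
    holder = Some(Thread.currentThread())
  }

  /** Release the lock and wake any task attempts blocked in acquire(). */
  def release(): Unit = synchronized {
    holder = None
    notifyAll()
  }
}

// Typical call site: hold the lock only for the duration of the state access.
object StateStoreLockUsage {
  def withStore[T](lock: StateStoreLock)(body: => T): T = {
    lock.acquire()
    try body finally lock.release()
  }
}
```

With this pattern the arithmetic above follows directly: 4 task attempts, each waiting up to 2 minutes, give roughly 8 minutes in total to acquire the lock, which covers the 3 * 120s connection timeout with retries. If a workload needs a different value, the timeout should also be tunable through the RocksDB state store provider configuration; the key documented for this in the Structured Streaming guide is spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs, though it is worth verifying against the Spark version in use.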