[SPARK-45511][SS] Fix state reader suite flakiness by clean up resources after each test run #43831
chaoqin-li1123 wants to merge 4 commits into apache:master from
Conversation
HeartSaVioR
left a comment
+1 Nice finding! Pending CI.
  import org.apache.spark.sql.streaming.util.StreamManualClock

- trait StateDataSourceTestBase extends StreamTest with StateStoreMetricsTest {
+ trait StateDataSourceTestBase extends StreamTest with BeforeAndAfter with StateStoreMetricsTest {
StreamTest already has BeforeAndAfterAll. Can we use beforeAll and afterAll instead, @chaoqin-li1123 ?
The reason we have to clean up StateStore per test is the maintenance task. When we run the streaming query, the state store is initialized in the executor, and registration is performed against the coordinator in the driver. The lifecycle of the state store provider is not strictly tied to the lifecycle of the streaming query - the executor closes the state store provider only when the coordinator indicates that the provider is no longer valid, which is not immediately after the streaming query has stopped. The lifecycle of the state store provider can therefore overlap among tests.
This means the maintenance task against the provider can still run after test A has completed. We clear the temp directory of test A after it completes, which can break operations still being performed against the state store provider used in test A - e.g. the directory no longer exists while the maintenance task is running.
(It's far more problematic because an exception in the maintenance task will unload all state store providers whose corresponding tasks may be running concurrently, leading to failures on running queries, or even a JVM crash for the RocksDB state store provider. That's scary, but it happens rarely, so we have time to revisit it later.)
This won't be an issue in practice because we do not expect the checkpoint location to be temporary, but it is indeed an issue for how we set up and clean up the environment for tests.
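The overlap described above can be sketched without Spark at all. A minimal, hypothetical simulation (`MaintenanceOverlapDemo` and `simulate` are made-up names, not Spark APIs): a maintenance task fires only after the test's temp checkpoint directory has already been deleted, so it no longer finds the directory it expects.

```scala
import java.nio.file.{Files, Path}

object MaintenanceOverlapDemo {
  // Returns whether a late "maintenance task" still finds the directory.
  def simulate(): Boolean = {
    // The test creates a temp checkpoint directory for its query.
    val checkpointDir: Path = Files.createTempDirectory("state-checkpoint")

    // "Test A" completes and eagerly deletes its temp checkpoint dir...
    Files.delete(checkpointDir)

    // ...then the provider's maintenance task fires and looks for it.
    Files.exists(checkpointDir)
  }
}
```

Cleaning up the state store in `afterEach` unloads the provider before the temp directory is removed, so this race cannot occur between tests.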
Arguably we can defer the cleanup of temp directory till VM shutdown or test suite cleanup and move this to afterAll, but we do this for stream-stream join test suite already, so would like to be consistent with existing practice.
Thank you. Got it. Then, shall we use beforeEach and afterEach because SparkFunSuite has BeforeAndAfterEach?
I've just updated the description of PR to have full context on the problem.
This follows the pattern we used in the existing suite.
But I'm fine either way. Good to know we already extend BeforeAndAfterEach by default and no need to extend others.
The reason why I requested is that BeforeAndAfter is deprecated, @HeartSaVioR . I hope we follow the scalatest recommendation in Apache Spark 4.0.0 and avoid adding more instances of this deprecated class.
This trait has been deprecated and will be removed in a future version of ScalaTest. If you are only using its beforeEach and/or afterEach methods, mix in BeforeAndAfterEach instead.
Ah, great point. Thanks for the context! @chaoqin-li1123 Shall we update the trait?
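For illustration, a minimal sketch of the per-test hook pattern that `BeforeAndAfterEach` provides, written here without a ScalaTest dependency (`PerTestCleanupDemo`, `providerLoaded`, and `runTest` are hypothetical names): the cleanup hook runs after every test body, even one that throws.

```scala
object PerTestCleanupDemo {
  // Stand-in for "a state store provider is loaded in this JVM".
  var providerLoaded = false

  private def beforeEach(): Unit = providerLoaded = true   // e.g. query starts, provider loads
  private def afterEach(): Unit = providerLoaded = false   // e.g. StateStore.stop() per test

  // Runs one test body between the hooks; cleanup happens even on failure.
  def runTest(body: => Unit): Unit = {
    beforeEach()
    try body
    finally afterEach()
  }
}
```

With this shape, each test starts from a clean slate and no provider (or its maintenance task) survives into the next test's temp directories.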
dongjoon-hyun
left a comment
+1, LGTM (Pending CIs). Thank you, @chaoqin-li1123 and @HeartSaVioR .
Thanks! Merging to master.
Ah I missed the title - @chaoqin-li1123 let's add
What changes were proposed in this pull request?
Fix state reader suite flakiness by cleaning up resources after each test.
The reason we have to clean up StateStore per test is the maintenance task. When we run the streaming query, the state store is initialized in the executor, and registration is performed against the coordinator in the driver. The lifecycle of the state store provider is not strictly tied to the lifecycle of the streaming query - the executor closes the state store provider only when the coordinator indicates that the provider is no longer valid, which is not immediately after the streaming query has stopped. The lifecycle of the state store provider can therefore overlap among tests.
This means the maintenance task against the provider can still run after test A has completed. We clear the temp directory of test A after it completes, which can break operations still being performed against the state store provider used in test A - e.g. the directory no longer exists while the maintenance task is running.
This won't be an issue in practice because we do not expect the checkpoint location to be temporary, but it is indeed an issue for how we set up and clean up the environment for tests.
Why are the changes needed?
To deflake the test.