[SPARK-49363][SS][TESTS] Add unit tests for potential RocksDB state store SST file mismatch #47850
siying wants to merge 7 commits into apache:master
Should we add comments within the test to explain what we think should happen?
+1 Shall we describe the scenario and the expected behavior?
Why do we choose 5 and 2 for this test? Should we also print some additional logs that show which version we are currently processing?
Sure, I can add a comment on why 5 and 2. The 2 gives an even balance between moving on to the next version and staying on the current one. The 5 simulates skipped checkpoints and makes it more likely that an upload is delayed past more than one snapshot scheduling, considering that snapshot checkpointing happens every 3 batches.
This might not be clear from the test. Should we explain the issue around foreachBatch and not persisting the DataFrame in more detail?
Any reason for choosing this specific seed?
No. I just randomly chose a number.
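For readers following the thread, the roles of the fixed seed and of the constants 5 and 2 can be sketched in isolation. This is only an illustration and not the code in the PR: the names `Step` and `RandomSchedule` are hypothetical, and the mapping of `nextInt(5)` to "run maintenance" and `nextInt(2)` to "advance the version" is an assumption based on the explanation above.

```scala
import scala.util.Random

// Hypothetical model of one loop iteration: which version is loaded and
// whether the maintenance task (snapshot upload) runs afterwards.
final case class Step(loadedVersion: Int, runMaintenance: Boolean)

object RandomSchedule {
  // ASSUMPTION: 1-in-5 chance to run maintenance, 1-in-2 chance to move on
  // to the next version. The real test may use the constants differently.
  def schedule(seed: Long, iterations: Int): Vector[Step] = {
    val random = new Random(seed) // a fixed seed makes every run identical
    var curVer = 0
    (1 to iterations).map { _ =>
      val loaded = curVer
      val maintenance = random.nextInt(5) == 0
      // Staying on the same version means it is reloaded on the next iteration.
      if (random.nextInt(2) == 0) curVer += 1
      Step(loaded, maintenance)
    }.toVector
  }
}
```

Because the generator is seeded, any seed produces a reproducible schedule, which is why the particular value does not matter as long as it stays fixed.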
Let's clarify which ticket fixed the issue covered by this test. A reviewer would want to validate the test by reverting that commit and seeing the test fail. If the test does not fail, we can't merge this in.
Does this test need to be randomized? If we know how it could fail and for which reason, I'd rather see it be direct. (If we could construct an e2e test that fails without the fix, that would be ideal.) Randomized tests seem to be covered by the test above.
@HeartSaVioR I removed one of the two randomized tests and put two more common scenarios in the unit tests (they are passing).
I don't get how this test simulates the scenario described here. We are talking about a race condition, but this test does not trigger one because doMaintenance() is synchronous.
The same goes for the randomized test below. Neither test fails when SPARK-48931 is reverted.
Sorry, the ticket I gave was the wrong one. I updated it and it should now work.
The commit cannot be easily reverted now, but I applied the test to the parent hash of the commit (b5a55e4) and confirmed that it fails:
[info] - reloading the same version (with changelog checkpointing) *** FAILED *** (292 milliseconds)
[info] org.rocksdb.RocksDBException: Mismatch in unique ID on table file 8. Expected: {12521394303566436904,8218421418606057953} Actual: {12521394303566436907,7844327260652356763} The file /home/siying.dong/spark2/target/tmp/spark-3438851b-7e84-4630-a96d-22b595c58b1b/workingDir-a30c2e57-73c7-49fa-b96b-5b6d8b863383/MANIFEST-000005 may be corrupted.
[info] at org.rocksdb.RocksDB.open(Native Method)
[info] at org.rocksdb.RocksDB.open(RocksDB.java:325)
[info] at org.apache.spark.sql.execution.streaming.state.RocksDB.openDB(RocksDB.scala:901)
[info] at org.apache.spark.sql.execution.streaming.state.RocksDB.load(RocksDB.scala:194)
[info] at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$267(RocksDBSuite.scala:2052)
[info] at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$267$adapted(RocksDBSuite.scala:2020)
[info] at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.withDB(RocksDBSuite.scala:2411)
[info] at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$266(RocksDBSuite.scala:2020)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info] at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf(SQLConfHelper.scala:56)
[info] at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf$(SQLConfHelper.scala:38)
[info] at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(RocksDBSuite.scala:165)
[info] at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:248)
[info] at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:246)
[info] at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.withSQLConf(RocksDBSuite.scala:165)
[info] at org.apache.spark.sql.execution.streaming.state.AlsoTestWithChangelogCheckpointingEnabled.$anonfun$testWithChangelogCheckpointingEnabled$1(RocksDBSuite.scala:107)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info] at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info] at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info] at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info] at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info] at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info] at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info] at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:334)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info] at org.scalatest.Suite.run(Suite.scala:1114)
[info] at org.scalatest.Suite.run$(Suite.scala:1096)
[info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
After applying commit 40ad829, the test passes.
I can explain why it fails in person, but it has nothing to do with a data race (I hope I didn't say that in the comments). It's related to the following sequence:
- create snapshot for version n
- reload another version n from cloud
- upload snapshot n (overwriting the existing one)
and this uploading is problematic.
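The three steps above can be sketched with a toy model. This is a deliberately simplified illustration, not Spark's or RocksDB's implementation: `SstFile`, `Snapshot`, `ToyCloudStore`, and `SnapshotOverwriteDemo` are hypothetical names, and the rule that an SST file is skipped on upload when a file with the same name and size already exists is an assumption, motivated by the PR description's remark that the tests produce SST files of the same size.

```scala
// Toy model only: none of these types exist in Spark or RocksDB.
final case class SstFile(name: String, sizeBytes: Long, uniqueId: Long)
final case class Snapshot(version: Int, files: Seq[SstFile])

final class ToyCloudStore {
  private var sstFiles = Map.empty[String, SstFile]
  // version -> (file name -> unique ID the manifest expects)
  private var manifests = Map.empty[Int, Map[String, Long]]

  def upload(snapshot: Snapshot): Unit = {
    snapshot.files.foreach { f =>
      // ASSUMPTION: a file with the same name and size counts as already uploaded.
      val alreadyThere = sstFiles.get(f.name).exists(_.sizeBytes == f.sizeBytes)
      if (!alreadyThere) sstFiles += (f.name -> f)
    }
    // The manifest for this version is always overwritten.
    manifests += (snapshot.version -> snapshot.files.map(f => f.name -> f.uniqueId).toMap)
  }

  // Names of files whose stored unique ID differs from what the manifest expects.
  def mismatchedFiles(version: Int): Seq[String] = {
    val expected = manifests.getOrElse(version, Map.empty[String, Long])
    expected.toSeq.collect {
      case (name, id) if sstFiles.get(name).exists(_.uniqueId != id) => name
    }.sorted
  }
}

object SnapshotOverwriteDemo {
  def run(): Seq[String] = {
    val cloud = new ToyCloudStore
    val n = 3
    // The cloud already holds a version n written by an earlier run.
    cloud.upload(Snapshot(n, Seq(SstFile("000008.sst", 1000L, 222L))))
    // Step 1: create a snapshot for version n locally; it is not uploaded yet.
    val pending = Snapshot(n, Seq(SstFile("000008.sst", 1000L, 111L)))
    // Step 2: reload version n from the cloud. In this model that leaves the
    // pending snapshot untouched, so there is nothing to execute here.
    // Step 3: upload the pending snapshot, overwriting the existing version n.
    cloud.upload(pending)
    cloud.mismatchedFiles(n)
  }
}
```

In this model, `SnapshotOverwriteDemo.run()` reports `000008.sst` as mismatched: the overwritten manifest expects one unique ID while the stored file carries another, which has the same shape as the "Mismatch in unique ID on table file 8" error in the log above.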
Yeah, now I get the picture. I was confused by the incorrect JIRA ticket. Thanks for the detailed explanation!
nit: The ticket info is not updated here
// The test was accidentally fixed by SPARK-48586 (https://github.com/apache/spark/pull/47130)
Sorry to ask for more effort, but shall we leave a "walkthrough" code comment for future readers? I think that would be much easier to follow than reading the scenario described above and having to think it through on their own. Let's ensure that the test is understandable for an average reader.
Please refer to the test suites for stateful operators where we track the watermark value (and state rows for complicated cases): there we put a code comment per microbatch as a walkthrough.
nit: use camelCase for consistency
Thanks! Merging to master.
What changes were proposed in this pull request?
Add unit tests for RocksDB state store snapshot checkpointing for changelog. We intentionally add the same content in each batch so that the generated SST files are likely to all be the same size. There is some randomness in whether a test loads the existing version or moves to the next one, and in whether the maintenance task is executed. All three tests would fail on previous versions but pass on master.
Why are the changes needed?
Recently we discovered some RocksDB state store file version ID mismatch issues. Although they happen to have been fixed by another change, we don't have test coverage for them. This PR adds unit tests for them.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Run the tests
Was this patch authored or co-authored using generative AI tooling?
No