[SPARK-54590][SS] Support Checkpoint V2 for State Rewriter and Repartitioning #53720
Conversation
JIRA Issue Information: New Feature SPARK-54590 (this comment was automatically generated by GitHub Actions)
micheal-o left a comment:
I am stamping this PR so we can move forward, but please let's correctly address the review comments. Thanks
@zifeif2 Also fix the PR title to:
Fix logging format in StateRewriter.scala
```diff
 try {
-  if (loadedVersion != version || (loadedStateStoreCkptId.isEmpty ||
-      stateStoreCkptId.get != loadedStateStoreCkptId.get)) {
+  if (loadEmpty || loadedVersion != version || loadedStateStoreCkptId.isEmpty ||
```
Why do we need this?
We need to support loadEmpty in loadWithCheckpointId in RocksDB because during repartitioning we don't need to read the previous data.
I put loadEmpty in this if statement along with loadedVersion != version || loadedStateStoreCkptId.isEmpty || ... to reduce some duplicate code, but it looks like that makes it harder to understand. I can refactor the code to give loadEmpty its own block.
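To illustrate the refactor, here is a minimal self-contained sketch (hypothetical names, not the actual RocksDB.scala code) that gives loadEmpty its own branch instead of folding it into the version/checkpoint-id comparison:

```scala
// Hypothetical model of the load decision: loadEmpty always forces a fresh
// (blank) store; otherwise we reload only when the cached version or
// checkpoint id no longer matches the request.
case class LoadRequest(version: Long, ckptId: Option[String], loadEmpty: Boolean)
case class LoadedState(version: Long, ckptId: Option[String])

def needsReload(req: LoadRequest, loaded: Option[LoadedState]): Boolean = {
  if (req.loadEmpty) {
    true // separate branch: never reuse previously loaded data
  } else {
    loaded match {
      case None => true
      case Some(s) =>
        s.version != req.version || s.ckptId.isEmpty || s.ckptId != req.ckptId
    }
  }
}
```

Keeping the loadEmpty check in its own branch makes the intent explicit at the cost of a few extra lines.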
```scala
  log"with uniqueId ${MDC(LogKeys.UUID, stateStoreCkptId)}")
if (loadEmpty) {
  logInfo(log"Loaded empty store at version ${MDC(LogKeys.VERSION_NUM, version)} " +
    log"with uniqueId")
```
Is uniqueId not available here?
No, we don't expect the caller to pass in a uniqueId when calling loadWithCheckpointId with loadEmpty = true, because we don't load any previous version of the data in that case. We also have a require check above:

require(stateStoreCkptId.isEmpty, "stateStoreCkptId should be empty when loadEmpty is true")

I can change it to a less confusing message.
```diff
-  partitionWriter.write(partitionIter)
-}
+  Iterator(partitionWriter.write(partitionIter))
+}.collect()
```
Why are we calling collect here?
We need collect() to make the rewrite actually happen and to get back the list of StateStoreCheckpointInfo.
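As a small model of this point (plain Scala views standing in for Spark RDD laziness; names are illustrative, not Spark's API): a lazy per-partition transformation only runs when something forces it, the way collect() forces an RDD.

```scala
// mapPartitions alone is lazy: the per-partition write only runs when an
// action such as collect() forces it and gathers the results on the driver.
var writesDone = 0
def writePartition(partition: Iterator[Int]): String = {
  writesDone += 1
  s"ckpt-${partition.sum}" // stand-in for a StateStoreCheckpointInfo
}

val partitions: Seq[Iterator[Int]] = Seq(Iterator(1, 2), Iterator(3, 4))
val lazyResults = partitions.view.map(writePartition) // nothing has run yet
assert(writesDone == 0)
val checkpointInfos = lazyResults.toList // the "collect": forces the writes
assert(writesDone == 2 && checkpointInfos == List("ckpt-3", "ckpt-7"))
```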
```scala
// Since we cleared the local dir, we should also clear the local file mapping
rocksDBFileMapping.clear()
// Set empty metrics since we're not loading any files from DFS
loadCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
```
Added this line to make sure loadCheckpointMetrics is set correctly when we load an empty store, cc @anishshri-db.
RocksDB runs fileManagerMetrics = fileManager.latestLoadCheckpointMetrics, and latestLoadCheckpointMetrics returns loadCheckpointMetrics.
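The invariant can be modeled with a tiny stand-in for the file manager (hypothetical names, not the real RocksDBFileManager):

```scala
// After loading an empty store, latestLoadCheckpointMetrics should report
// empty metrics rather than stale ones left over from a previous load.
case class FileManagerMetrics(filesCopied: Long, bytesCopied: Long)
object FileManagerMetrics { val Empty = FileManagerMetrics(0L, 0L) }

class FileManager {
  // pretend a previous load left stale metrics behind
  private var loadCheckpointMetrics = FileManagerMetrics(5L, 1024L)
  def loadEmptyStore(): Unit = {
    // clearing the local file mapping would also happen at this point
    loadCheckpointMetrics = FileManagerMetrics.Empty
  }
  def latestLoadCheckpointMetrics: FileManagerMetrics = loadCheckpointMetrics
}

val fm = new FileManager
fm.loadEmptyStore()
assert(fm.latestLoadCheckpointMetrics == FileManagerMetrics.Empty)
```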
anishshri-db left a comment:
lgtm pending green CI
What changes were proposed in this pull request?
Support checkpoint v2 for the repartition writer and StateRewriter by returning the checkpoint ID to the caller function after the write is done.
Changes include
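A minimal sketch of the shape of this change (hypothetical names; the real classes live in Spark's streaming state package): the writer returns checkpoint information instead of Unit, so the caller can collect the checkpoint IDs after the write.

```scala
import java.util.UUID

// Stand-in for the checkpoint info returned to the caller under checkpoint v2.
case class StateStoreCheckpointInfo(partitionId: Int, checkpointId: Option[String])

class PartitionWriter(partitionId: Int) {
  // write now returns checkpoint info instead of Unit
  def write(rows: Iterator[(String, String)]): StateStoreCheckpointInfo = {
    rows.foreach { case (_, _) => () } // actual state writing elided
    StateStoreCheckpointInfo(partitionId, Some(UUID.randomUUID().toString))
  }
}

val info = new PartitionWriter(0).write(Iterator(("key", "value")))
assert(info.partitionId == 0 && info.checkpointId.nonEmpty)
```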
Why are the changes needed?
This is required in PrPr for the repartition project.
Does this PR introduce any user-facing change?
No
How was this patch tested?
See the added unit tests on operators with both a single state store and multiple state stores.
Was this patch authored or co-authored using generative AI tooling?
Yes. Sonnet 4.5