[SPARK-35785][SS] Cleanup support for RocksDB instance #32933
Test build #139876 has finished for PR 32933 at commit
Kubernetes integration test starting
Kubernetes integration test status success
retest this please
Is this the change for review in this PR?
That's right.
Do you mean that if SST file F was last used in version V, it won't be used in version V+2 or later?
In other words, an SST file F can only be used in contiguous versions: it can't be used in V, skipped in V+1, and then used again in V+2.
nit: won't
That's right. A file cannot be shared across versions with gaps in between. If a file is used in V but not in V+1, the checkpoint of V+1 should already have created new files for all the KVs included in the original file.
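To make the contiguity argument concrete, here is a minimal sketch of how orphaned files could be picked. It assumes a plain in-memory map from version to file names; `OrphanSstFiles`, `filesToDelete`, and `minVersionToRetain` are hypothetical names for illustration, not the code under review. Because a file is only used in contiguous versions, a file whose highest using version is below the oldest retained version cannot be referenced by any retained version.

```scala
// Hypothetical names throughout; a sketch, not the implementation in this PR.
object OrphanSstFiles {
  /** Files whose highest using version is older than the oldest retained version. */
  def filesToDelete(
      filesByVersion: Map[Long, Set[String]],
      minVersionToRetain: Long): Set[String] = {
    // For every file, record the highest version that references it.
    val maxUsedVersion: Map[String, Long] = filesByVersion.toSeq
      .flatMap { case (version, files) => files.toSeq.map(f => f -> version) }
      .groupBy(_._1)
      .map { case (file, pairs) => file -> pairs.map(_._2).max }
    // Keep only the files that no retained version can still be using.
    maxUsedVersion.collect { case (file, v) if v < minVersionToRetain => file }.toSet
  }
}
```

For example, with versions 1 -> {a.sst, b.sst}, 2 -> {b.sst, c.sst}, 3 -> {c.sst, d.sst} and `minVersionToRetain = 3`, only a.sst and b.sst are selected.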
What is the second case for? When will it happen?
If I understand correctly, it would occur with a reattempt of the same micro-batch.
Yep, that's right.
maxVersionPresent -> maxUsedVersion?
Ah thanks, done in the next commit.
version -> versionFile.
Actually, maybe s"Error deleting version file $versionFile for version $version".
Makes sense, done in the next commit.
Maybe we need to subtract the files that failed to delete? It is possible that some files fail to delete, but it seems we ignore such failures and continue.
Yeah, we may need to count the successful ones and the failed ones separately.
Agree, done in the next commit.
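A minimal sketch of the separate counting discussed above. It uses `java.nio.file` only to stay self-contained (the PR itself works against a remote file system), and `DeleteCounting` and `deleteAll` are hypothetical names. A failed delete is counted and reported, and the loop continues with the remaining files.

```scala
import java.nio.file.{Files, Path}
import scala.util.{Failure, Success, Try}

// Hypothetical helper; not the code under review.
object DeleteCounting {
  /** Returns (number deleted, number that failed), continuing past failures. */
  def deleteAll(paths: Seq[Path]): (Int, Int) = {
    var deleted = 0
    var failed = 0
    paths.foreach { p =>
      // Files.delete throws if the file is missing or cannot be removed.
      Try(Files.delete(p)) match {
        case Success(_) => deleted += 1
        case Failure(e) =>
          failed += 1
          // A real implementation would use the project's logger instead.
          System.err.println(s"Error deleting file $p: ${e.getMessage}")
      }
    }
    (deleted, failed)
  }
}
```

Reporting both numbers keeps the log message honest when some deletions fail.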
Kubernetes integration test starting
If deleteOldVersions is the only change needed for review, then it looks okay. Just a few minor comments.
Kubernetes integration test status failure
Test build #140171 has finished for PR 32933 at commit
This will fail to pass scalastyle now.
First round. Need to look into test cases a bit more.
nit: It's
Thanks, done in the next commit.
nit: won't
nit: space before }
Thanks, done in the next commit.
Is localTempDir used only here? Just to make sure deleting the directory won't break anything.
Yes, the localTempDir is only used for unzipping files.
This is interesting: we don't remove any keys but expect some SST files to be invalid. Would compaction chime in and compact several SSTs into a bigger one?
Right. We do a RocksDB checkpoint for each commit operation; each checkpoint is a full snapshot and includes all data. In this UT we have 50 versions but only retain 10, so the SST files for the deleted versions (1 to 40) will be deleted.
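The arithmetic behind "50 versions, retain 10, delete 1 to 40" can be sketched as follows. This assumes versions start at 1 and that the newest versions are the ones retained; `RetainedVersions` and `minVersionToRetain` are hypothetical names, not taken from the PR.

```scala
// Hypothetical helper illustrating the retention arithmetic only.
object RetainedVersions {
  /** Oldest version to keep when only the newest `numVersionsToRetain` survive. */
  def minVersionToRetain(latestVersion: Long, numVersionsToRetain: Int): Long =
    math.max(1L, latestVersion - numVersionsToRetain + 1)
}
```

With a latest version of 50 and 10 versions retained, the oldest retained version is 41, so versions 1 to 40 are eligible for deletion.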
nit: double empty lines
Thanks, done in the next commit.
f8f9d20 to a45af9c
Test build #140373 has finished for PR 32933 at commit
321c8b0 to 56439b9
Test build #140438 has finished for PR 32933 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Rebased and addressed all the comments. Thanks for your heads-up! cc @viirya @HeartSaVioR
Kubernetes integration test starting
Kubernetes integration test status success
Thanks again for the help of @HeartSaVioR and @viirya.
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
Test build #140511 has finished for PR 32933 at commit
Test build #140498 has finished for PR 32933 at commit
Kubernetes integration test status success
retest this, please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #140521 has finished for PR 32933 at commit
    logInfo(s"Rolled back to $loadedVersion")
  }

  def cleanup(): Unit = {
When will we call this method?
It will be called in RocksDBStateStoreProvider.doMaintenance. I'll submit the state store provider PR (the last one) today.
okay.
    }
  }

  test("disallow concurrent updates to the same RocksDB instance") {
This test doesn't seem related to the cleanup change here? It looks more related to the RocksDB instance PR.
Ah yea, this is the test for rollback.
Actually, the original plan was to expose rollback and cleanup in this PR. It was a mistake in the last PR: I introduced the rollback without tests.
okay
override def run(): Unit = {
  try {
    for (version <- 0 to numUpdatesInEachThread) {
      withDB(
Hm, what is this test used for? Each RocksDB in each thread uses the same remote root dir; won't they conflict?
This is to simulate the multi-threaded scenario of updating and cleaning old versions. It will not conflict since we call commit in each update thread and the version gets updated on each commit.
Hmm, will it happen? I think RocksDB is not thread-safe, and each state task only has one RocksDB instance. They should update and clean old versions individually, as they are for different state stores.
This looks more like simulating the case where multiple streaming queries with the same checkpoint run concurrently.
SST files shouldn't conflict as we make the file names unique, and for metadata files we use overwriteIfPossible = true, so it won't throw an error if the file already exists.
Yea, I think the purpose of this test is to make sure no error is thrown and the result is correct in the end.
After taking a further look, there's a small issue: the exception is never used. I'll confirm it separately.
@xuanyuanking and I discussed this test offline. It seems there is something wrong with the exception usage; it doesn't look completely correct. @xuanyuanking will address it by fixing or deleting the test later in a follow-up.
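For reference, one common way to make sure a failure inside a worker thread is actually used by a test is to record it and check it after joining. The sketch below is a generic pattern under that assumption, not the fix that was eventually applied; `ThreadFailures` and `runAll` are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicReference

// Generic pattern; hypothetical names, not the test code in this PR.
object ThreadFailures {
  /** Runs each task in its own thread and returns the first failure observed, if any. */
  def runAll(tasks: Seq[() => Unit]): Option[Throwable] = {
    val firstFailure = new AtomicReference[Throwable](null)
    val threads = tasks.map { task =>
      new Thread(new Runnable {
        override def run(): Unit = {
          try {
            task()
          } catch {
            case t: Throwable =>
              // Keep only the first failure; later ones are dropped.
              firstFailure.compareAndSet(null, t)
              ()
          }
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    Option(firstFailure.get())
  }
}
```

A test can then assert that the returned `Option` is empty, so an exception thrown in any thread fails the test instead of being silently lost.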
lgtm
Thanks @xuanyuanking for working on this and @HeartSaVioR for the review! Merging to master.
Great thanks for the help! @viirya @HeartSaVioR
Ah, sorry, I forgot branch-3.2 was cut and this should be in branch-3.2 too. @xuanyuanking Can you submit a PR for 3.2?
Sure. Let me do it now.
### What changes were proposed in this pull request?
Add the functionality of cleaning up files of old versions for the RocksDB instance and RocksDBFileManager.

### Why are the changes needed?
Part of the implementation of RocksDB state store.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New UT added.

Closes apache#32933 from xuanyuanking/SPARK-35785.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
Hi, All.
It seems that https://issues.apache.org/jira/browse/SPARK-35993 is filed already by @attilapiros
Thanks @dongjoon-hyun! Let me ignore the test first to unblock others. @xuanyuanking will address (fix or delete) the test later.
### What changes were proposed in this pull request?
This patch ignores the test "ensure that concurrent update and cleanup consistent versions" in #32933. The test is currently flaky and we will address it later.

### Why are the changes needed?
Unblock other developments.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

Closes #33195 from viirya/ignore-rocksdb-test.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This patch ignores the test "ensure that concurrent update and cleanup consistent versions" in #32933. The test is currently flaky and we will address it later.

### Why are the changes needed?
Unblock other developments.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

Closes #33195 from viirya/ignore-rocksdb-test.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit a6e00ee)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
Add the functionality of cleaning up files of old versions for the RocksDB instance and RocksDBFileManager.
Why are the changes needed?
Part of the implementation of RocksDB state store.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT added.