-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-32345][state] Improve parallel download of RocksDB incremental state. #22788
[FLINK-32345][state] Improve parallel download of RocksDB incremental state. #22788
Conversation
@flinkbot run azure |
7553cf6
to
a10fcd4
Compare
@flinkbot run azure |
This commit improves RocksDBStateDownloader to support parallelized state download across multiple state types and across multiple state handles. This can improve our download times for scale-in.
a10fcd4
to
e697f77
Compare
@flinkbot run azure |
...ava/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
Outdated
Show resolved
Hide resolved
StateHandleDownloadSpec downloadRequest = | ||
new StateHandleDownloadSpec( | ||
(IncrementalRemoteKeyedStateHandle) stateHandle, | ||
absolutInstanceBasePath.resolve(UUID.randomUUID().toString())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI, @zoltar9264 is changing how the local path is derived in #22669.
...ava/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
Outdated
Show resolved
Hide resolved
...d-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
Outdated
Show resolved
Hide resolved
...d-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
Outdated
Show resolved
Hide resolved
...d-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
Outdated
Show resolved
Hide resolved
entry -> { | ||
StateHandleID stateHandleID = entry.getKey(); | ||
StreamStateHandle remoteFileHandle = | ||
entry.getValue(); | ||
Path downloadDest = | ||
downloadRequest | ||
.getDownloadDestination() | ||
.resolve( | ||
stateHandleID | ||
.toString()); | ||
return ThrowingRunnable.unchecked( | ||
() -> | ||
downloadDataForStateHandle( | ||
downloadDest, | ||
remoteFileHandle, | ||
closeableRegistry)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about extracting a method createDownloadRunnables(k, v, dst)
?
...d-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
Outdated
Show resolved
Hide resolved
flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
Outdated
Show resolved
Hide resolved
flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
Outdated
Show resolved
Hide resolved
...d-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
Outdated
Show resolved
Hide resolved
@rkhachatryan I addressed all remaining comments, please take another look to approve the PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating the PR @StefanRRichter ,
LGTM!
What is the purpose of the change
This commit improves RocksDBStateDownloader to support parallelized state download across multiple state types and across multiple state handles. This can improve our download times for scale-in.
Brief change log
StateHandleDownloadSpec
to combine incremental remote handles with their download paths.RocksDBStateDownloader
to accept a list ofStateHandleDownloadSpec
that can combine files from multiple handles and sub-states (shared, private).RocksDBIncrementalRestoreOperation
to first assemble a complete list ofStateHandleDownloadSpec
before invoking the download process.RocksDBStateDownloaderTest
.Verifying this change
This change is already covered by existing tests, such as
RocksDBStateDownloaderTest
. I added more tests there.EmbeddedRocksDBStateBackendTest
EmbeddedRocksDBStateBackendMigrationTest
RescalingBenchmarkTest
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation