[FLINK-22052][python] Add FLIP-142 public classes to python API #15441
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request.
Automated Checks
Last check on commit 43c1bd1 (Thu Sep 23 17:28:42 UTC 2021)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
@sjwiesman Thanks for the PR.
file_state_size_threshold,
write_buffer_size)

self._j_filesystem_checkpoint_storage = j_filesystem_checkpoint_storage
_j_filesystem_checkpoint_storage could be removed
j_predefined_options = self._j_embedded_rocks_db_state_backend.getPredefinedOptions()
return PredefinedOptions._from_j_predefined_options(j_predefined_options)

def set_options(self, options_factory_class_name: str):
I guess it's not enough to pass only the class name of the options_factory. For example, DefaultConfigurableOptionsFactory has many methods for configuring it.
No, this is fine (it's actually copied directly from RocksDBStateBackend). This is just loading a factory class that is defined in Java. We could think about making the factory definable in Python but that is a separate issue.
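As a rough illustration of the point above: loading a factory by its fully qualified class name yields a default-constructed instance, and any further configuration has to happen elsewhere. The sketch below is a plain-Python analogy using importlib, not PyFlink or Flink code; `load_factory` is a hypothetical helper and `collections.OrderedDict` merely stands in for a factory class.

```python
import importlib


def load_factory(class_name: str):
    """Hypothetical helper: instantiate a class from its dotted name."""
    module_name, _, attr = class_name.rpartition(".")
    module = importlib.import_module(module_name)
    # Only the no-argument constructor is called; nothing else is configured.
    return getattr(module, attr)()


# collections.OrderedDict is only a placeholder for a real factory class.
factory = load_factory("collections.OrderedDict")
```

This mirrors why passing a name is sufficient when the factory is defined and configured on the Java side.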
Force-pushed from 24f4019 to ca16802.
@dianfu thank you for the very fast review. I've applied all your suggestions. Please take another look and let me know if you have any other concerns.
@sjwiesman Thanks a lot for the update. LGTM overall with just one minor comment. Feel free to merge it after addressing this issue.
:param directory The savepoint directory
:return: This object.
"""
return self._j_stream_execution_environment.setDefaultSavepointDirectory(directory)
should return self
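A minimal sketch of the concern, assuming a wrapper that delegates to a Java-like object: returning the delegate's result would hand the caller the wrapped object instead of the Python wrapper, which breaks chaining on the Python API. `FakeJavaEnvironment` and `EnvironmentWrapper` are hypothetical stand-ins, not PyFlink classes.

```python
class FakeJavaEnvironment:
    """Hypothetical stand-in for the wrapped Java object."""

    def __init__(self):
        self.savepoint_dir = None

    def setDefaultSavepointDirectory(self, directory):
        self.savepoint_dir = directory
        return self  # the Java-side object, not the Python wrapper


class EnvironmentWrapper:
    """Hypothetical Python wrapper with a fluent setter."""

    def __init__(self, j_env):
        self._j_env = j_env

    def set_default_savepoint_directory(self, directory):
        # Delegate, but discard the Java-side return value.
        self._j_env.setDefaultSavepointDirectory(directory)
        return self  # keeps the ":return: This object." contract

    def get_default_savepoint_directory(self):
        return self._j_env.savepoint_dir


env = EnvironmentWrapper(FakeJavaEnvironment())
chained = env.set_default_savepoint_directory("file:///tmp/savepoints")
```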
Force-pushed from ca16802 to 5f2a1ad.
Force-pushed from 5f2a1ad to 5f75c7d.
@dianfu there are some test failures that seem to be related to this change. I am having trouble debugging them and was hoping you could take another look.
@sjwiesman Sure. I'll take a look asap!
@sjwiesman The test failures should be caused by the following issues.
raise TypeError("%s is not an instance of CheckpointStorage." % j_checkpoint_storage)

if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
    return JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_clz)
Suggested change:
- return JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_clz)
+ return JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
    return JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_clz)
elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
    return FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_clz)
Suggested change:
- return FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_clz)
+ return FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_checkpoint_storage)
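The two suggestions above address the same mistake: the class object is only useful for deciding which wrapper applies, while the wrapper itself must receive the instance. A plain-Python sketch of that pattern follows; all class names here are hypothetical stand-ins, and `issubclass` plays the role that `isAssignableFrom` plays in the quoted code.

```python
class JobManagerStorage:
    """Hypothetical stand-in for a Java checkpoint storage class."""


class FileSystemStorage:
    """Hypothetical stand-in for a Java checkpoint storage class."""


class JobManagerWrapper:
    def __init__(self, j_storage):
        self.j_storage = j_storage


class FileSystemWrapper:
    def __init__(self, j_storage):
        self.j_storage = j_storage


def wrap_storage(j_storage):
    j_clz = type(j_storage)  # used only to choose the wrapper
    if issubclass(j_clz, JobManagerStorage):
        return JobManagerWrapper(j_storage)  # the instance, not j_clz
    elif issubclass(j_clz, FileSystemStorage):
        return FileSystemWrapper(j_storage)  # the instance, not j_clz
    raise TypeError("%s is not a known storage." % j_storage)


storage = FileSystemStorage()
wrapped = wrap_storage(storage)
```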
def test_create_jobmanager_checkpoint_storage(self):

    self.assertItNotNone(JobManagerCheckpointStorage())
Suggested change:
- self.assertItNotNone(JobManagerCheckpointStorage())
+ self.assertIsNotNone(JobManagerCheckpointStorage())
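For reference, a small self-contained check of the assertion name, assuming only the standard library: `unittest.TestCase` provides `assertIsNotNone`, while the misspelled `assertItNotNone` does not exist and would raise AttributeError when called. `CheckpointStorageStub` is a hypothetical placeholder for the class under test.

```python
import unittest


class CheckpointStorageStub:
    """Hypothetical placeholder for the class under test."""


class StorageTest(unittest.TestCase):
    def test_create_storage(self):
        self.assertIsNotNone(CheckpointStorageStub())

    def test_misspelled_assertion_is_missing(self):
        # The misspelled name is not an attribute of TestCase.
        self.assertFalse(hasattr(self, "assertItNotNone"))


suite = unittest.defaultTestLoader.loadTestsFromTestCase(StorageTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```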
checkpoint_path = JPath(checkpoint_path)
if max_state_size is None:
    max_state_size = JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
j_jobmanager_checkpoint_storage = JobManagerCheckpointStorage(checkpoint_path,
Suggested change:
- j_jobmanager_checkpoint_storage = JobManagerCheckpointStorage(checkpoint_path,
+ j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
checkpoint_path = JPath(checkpoint_path)

if file_state_size_threshold is None:
    file_state_size_threshold = FileSystemCheckpointStorage.MAX_FILE_STATE_THRESHOLD
It seems that we should set it as -1 if not specified.
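A minimal sketch of that suggestion, assuming (per the comment above, not verified here) that -1 is the value the underlying constructor treats as "not specified". `UNSET` and `resolve_threshold` are hypothetical names used only for illustration.

```python
UNSET = -1  # assumption: sentinel meaning "not specified"


def resolve_threshold(file_state_size_threshold=None):
    """Hypothetical helper: map a missing Python argument to the sentinel."""
    if file_state_size_threshold is None:
        return UNSET
    return file_state_size_threshold


default_value = resolve_threshold()
explicit_value = resolve_threshold(2048)
```

Passing the sentinel through leaves the choice of the effective default to the wrapped side rather than hard-coding a maximum in Python.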
:return: The number of threads used to transfer files while snapshotting/restoring.
"""
return self._j_state_backend.getNumberOfTransferingThreads()
Suggested change:
- return self._j_state_backend.getNumberOfTransferingThreads()
+ return self._j_state_backend.getNumberOfTransferThreads()
Force-pushed from 8b19876 to a4daf4b.
@sjwiesman What's the status of this PR?
Force-pushed from a4daf4b to 567ae87.
@dianfu I'm very sorry, this fell off my radar. I've just rebased the branch; if CI is good, we can go ahead and merge.
Force-pushed from 567ae87 to 3333be5.
Force-pushed from 3333be5 to 43c1bd1.
What is the purpose of the change
This adds the new public APIs from FLIP-142 to the Python DataStream API. It is based on top of #15429 so only the last 3 commits are relevant.
Brief change log
96aa09c Add new state backend classes and sync updated JavaDoc
7193a21 Add new checkpoint storage classes
57c27b5 Add new methods to StreamExecutionEnvironment
Verifying this change
Added new Python unit tests. Note to the reviewer: I had some issues running the Python tests locally, so I will monitor the CI to ensure everything passes.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation