[BEAM-802] Add ValueProvider class for FileBasedSource I/O Transforms#1945
[BEAM-802] Add ValueProvider class for FileBasedSource I/O Transforms#1945mariapython wants to merge 1 commit intoapache:masterfrom
Conversation
mariapython
commented
Feb 8, 2017
- Add ValueProvider class.
- Derive StaticValueProvider and RuntimeValueProvider from ValueProvider.
- Derive ValueProviderArgumentParser from argparse.ArgumentParser as API for the template user.
- Modify FileBasedSource I/O transforms to accept objects of type ValueProvider.
- Modify display_data.
- Handle serialization / deserialization.
|
Refer to this link for build results (access rights to CI server needed): |
|
R: @robertwb |
f0dab8f to
aaa6963
Compare
|
Refer to this link for build results (access rights to CI server needed): |
aaa6963 to
9560065
Compare
|
Refer to this link for build results (access rights to CI server needed): |
aaltay
left a comment
There was a problem hiding this comment.
Thank you @mariapython. I added my comments, let's start iterating on those.
| elif isinstance(obj, ValueProvider): | ||
| if obj.is_accessible(): | ||
| return to_json_value(obj.get()) | ||
| return extra_types.JsonValue(is_null=True) |
There was a problem hiding this comment.
Is this the correct serialization for runtime value providers? (pipeline options will have a "key": None serialization)
There was a problem hiding this comment.
| from apache_beam.transforms.display import DisplayDataItem | ||
| from apache_beam.utils.value_provider import ValueProvider | ||
| from apache_beam.utils.value_provider import StaticValueProvider | ||
| from apache_beam.utils.value_provider import RuntimeValueProvider |
There was a problem hiding this comment.
Does RuntimeValueProvider need to be exposed as a public interface. ValueProvider and StaticValueProvider could be the only things that is public outside the value_provider module and maybe a few places.
There was a problem hiding this comment.
Oops, that was a slip--fixed.
| # We can't split compressed files efficiently so turn off splitting. | ||
| self._splittable = False | ||
| if validate: | ||
| if validate and not isinstance(file_pattern, RuntimeValueProvider): |
There was a problem hiding this comment.
Could this be if validate and not file_pattern.is_accessible()? Is there a reason to check its type?
|
|
||
| def _get_concat_source(self): | ||
| if self._concat_source is None: | ||
| if not self._pattern.is_accessible(): |
There was a problem hiding this comment.
Should we add a get_or_error method to ValueProvider?. It is a common pattern to check is_accessible and raising RuntimeError.
| if not self.file_path_prefix.is_accessible(): | ||
| raise RuntimeError('%s not accessible' % self.file_path_prefix) | ||
| file_path_prefix = self.file_path_prefix.get() | ||
| file_name_suffix = self.file_name_suffix.get() |
There was a problem hiding this comment.
Do you need to check if this is_accessible?
There was a problem hiding this comment.
Forgot to pop a stash--done
| ) | ||
| value = ( | ||
| pipeline_option.get() | ||
| if isinstance(pipeline_option, StaticValueProvider) |
There was a problem hiding this comment.
why not use is_accessible instead of type checking?
There was a problem hiding this comment.
| pipeline_options_dict = RuntimeValueProvider.pipeline_options_dict | ||
| if pipeline_options_dict is None: | ||
| raise RuntimeError('%s.get() not called from a runtime context' %self) | ||
| pipeline_option = ( |
There was a problem hiding this comment.
What is the reason here for recreating an options instance? Can we read the value out of pipeline_options_dict?
There was a problem hiding this comment.
This was modeled after Java's.
|
|
||
| @classmethod | ||
| def set_runtime_options(cls, pipeline_options): | ||
| RuntimeValueProvider.pipeline_options_dict = pipeline_options |
There was a problem hiding this comment.
maybe assert that RuntimeValueProvider.pipeline_options_dict is None
| type=int, | ||
| help='This flag is a value provider') | ||
|
|
||
| # options = UserOptions(['--vp_arg', 'hola']) |
| assert options.vp_arg3.is_accessible() is True | ||
| assert options.vp_arg3.get() == '123' | ||
| assert options.vp_arg4.is_accessible() is True | ||
| assert options.vp_arg4.get() is None |
There was a problem hiding this comment.
Should not this fail? It is of RuntimeValueProvider and has no default.
There was a problem hiding this comment.
RuntimeValueProvider objects with no default and no value injected don't fail.
In Java: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java#L191
9560065 to
9aff2a5
Compare
|
Refer to this link for build results (access rights to CI server needed): |
9aff2a5 to
97a3b53
Compare
|
Refer to this link for build results (access rights to CI server needed): Build result: FAILURE[...truncated 1.55 MB...] at java.lang.Thread.run(Thread.java:745)Caused by: org.apache.maven.plugin.MojoExecutionException: Command execution failed. at org.codehaus.mojo.exec.ExecMojo.execute(ExecMojo.java:302) at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208) ... 31 moreCaused by: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:404) at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:166) at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:764) at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:711) at org.codehaus.mojo.exec.ExecMojo.execute(ExecMojo.java:289) ... 33 more2017-03-01T07:08:47.327 [ERROR] 2017-03-01T07:08:47.327 [ERROR] Re-run Maven using the -X switch to enable full debug logging.2017-03-01T07:08:47.327 [ERROR] 2017-03-01T07:08:47.327 [ERROR] For more information about the errors and possible solutions, please read the following articles:2017-03-01T07:08:47.327 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException2017-03-01T07:08:47.328 [ERROR] 2017-03-01T07:08:47.328 [ERROR] After correcting the problems, you can resume the build with the command2017-03-01T07:08:47.328 [ERROR] mvn -rf :beam-sdks-pythonchannel stoppedSetting status of 97a3b53 to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8003/ and message: 'Build finished. 'Using context: Jenkins: Maven clean install--none-- |
97a3b53 to
2916651
Compare
|
Refer to this link for build results (access rights to CI server needed): |
2916651 to
9bdd605
Compare
|
Refer to this link for build results (access rights to CI server needed): |
| """ | ||
| def add_value_provider_argument(self, *args, **kwargs): | ||
| # Extract the option name from positional argument ['pos_arg'] | ||
| if args[0][0] != '-': |
There was a problem hiding this comment.
There are two assumptions here. Could you assert those for a better error message.
- args has at least length of 1 and args[0] exists.
- and args[0] is indexable, like args[0] is a string.
There was a problem hiding this comment.
Both are enforced by argparse's add_argument. i.e. CLI arguments are strings and an argument always requires a name. If you try to add an argument without a name, you get:
TypeError: _get_positional_kwargs() takes exactly 2 arguments (1 given)
Since the call to add_argument happens later, I have placed one assert to check for args to exist and have at least one element of size one. There is more I could add, but I don't want to obscure the code by doing heavy type checking here, since add_argument will take care of it a few lines below.
| option_name = args[0] | ||
| if kwargs.get('nargs') is None: # make them optionally templated | ||
| kwargs['nargs'] = '?' | ||
| # or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg] |
There was a problem hiding this comment.
Move this comment inside the else block
There was a problem hiding this comment.
OK. The reason why I usually don't do it is because rigorously, that would make me have an extra comment line:
# Extract the option name
if args[0][0] != '-':
# from positional argument ['pos_arg']
option_name = args[0]
if kwargs.get('nargs') is None: # make them optionally templated
kwargs['nargs'] = '?'
else:
# or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg]
option_name = [i.replace('--', '') for i in args if i[:2] == '--'][0]
| ) | ||
|
|
||
|
|
||
| def check_accessible(fields): |
There was a problem hiding this comment.
Add a comment about what is this and how users could use it. It will have utility to developers building components.
There was a problem hiding this comment.
Done long ago, although forgot to respond, it seems.
| if pipeline_options_dict is None: | ||
| raise RuntimeError('%s.get() not called from a runtime context' %self) | ||
| pipeline_option = ( | ||
| self.pipeline_options_subclass.from_dictionary(pipeline_options_dict) |
There was a problem hiding this comment.
PipelineOptions.from_dictionary(pipeline_options_dict).view_as(pipeline_options_subclass).get(self.option_name)
There was a problem hiding this comment.
PipelineOptions.from_dictionary(pipeline_options_dict).view_as(self.pipeline_options_subclass) would be OK, but not .get(...)
9bdd605 to
1ff44d5
Compare
|
Refer to this link for build results (access rights to CI server needed): |
a6b5196 to
c5569ca
Compare
|
Refer to this link for build results (access rights to CI server needed): Failed Tests: 2beam_PreCommit_Java_MavenInstall/org.apache.beam:beam-runners-spark: 2
--none-- |
|
Refer to this link for build results (access rights to CI server needed): |
| dest='output', | ||
| required=True, | ||
| help='Output file to write results to.') | ||
| pipeline_options = UserOptions(argv) |
There was a problem hiding this comment.
Our canonical form is pipeline_options = PipelineOptions(argv). Could you convert to that form? It should work exactly the same.
There was a problem hiding this comment.
If you mean this:
pipeline_options = PipelineOptions(argv).view_as(UserOptions)
Yes, it works, but I found it less readable.
There was a problem hiding this comment.
It is possible for a user code to define multiple options. We want them to use:
pipeline_options = PipelineOptions(argv)
user_options = pipeline_options.view_as(UserOption)
This is the standard way for the existing examples and documentation.
There was a problem hiding this comment.
OK, I can then use those two lines, that is no problem
| required=True, | ||
| help='Output file to write results to.') | ||
| known_args, pipeline_args = parser.parse_known_args(argv) | ||
| class UserOptions(PipelineOptions): |
There was a problem hiding this comment.
We used to have a problem with defining options here (See: #1889 (comment))
Other tests started to fail because newly defined options stay in scope affected other tests. Is it resolved? Or this is not happening any more because the options are defaulting to ValueProviders?
There was a problem hiding this comment.
I haven't seen that happening, as long as there is no call to get_all_options()
| for vp in [getattr(self, field) for field in fields]: | ||
| if not vp.is_accessible(): | ||
| raise RuntimeError('%s not accessible' % vp) | ||
| for obj in [getattr(self, vp) for vp in value_provider_list]: |
There was a problem hiding this comment.
Could you use something other than obj. This is representing an option not just an object.
| self.assertEqual(options.vp_pos_arg.get(), 'abc') | ||
|
|
||
| def test_runtime_value_provider_positional_argument(self): | ||
| class UserOptions(PipelineOptions): |
There was a problem hiding this comment.
Change UserOptions to a more unique name, they potentially cause name conflicts with other tests.
| self.assertFalse(options.vp_arg3.is_accessible()) | ||
| self.assertFalse(options.vp_arg4.is_accessible()) | ||
| self.assertFalse(options.vp_pos_arg.is_accessible()) | ||
| # self.assertFalse(options.vp_pos_arg.is_accessible()) |
There was a problem hiding this comment.
Remove commented out code. And why this is accessible when other vps are not?
There was a problem hiding this comment.
Done. It is accessible because a positional argument value has been given as option: '1.2'
| required=True, | ||
| help='Output file to write results to.') | ||
| known_args, pipeline_args = parser.parse_known_args(argv) | ||
| class UserOptions(PipelineOptions): |
There was a problem hiding this comment.
I wanted to push all the changes at once, and I am working on one last commit.
|
|
||
| # provide values at graph-construction time | ||
| # (options not provided here become of the type RuntimeValueProvider) | ||
| options = UserOptions1(['1.2']) |
There was a problem hiding this comment.
I may have asked this before. If that is the case point me to the answer please.
Does it make sense to wrap things as value providers when template_location flag is not set. If the flag is not set we know that RVP values will be missing because it will not be run as a template.
There was a problem hiding this comment.
No, you haven't.
Sure, I will add a warning for the case where there are RuntimeValueProvider arguments and template_location is missing.
There was a problem hiding this comment.
Should it be an error or a warning? What is the behavior in Java SDK?
There was a problem hiding this comment.
I am not finding anything in Java to enforce the template_location, so I am not adding this at this point, in order to be able to merge this PR. Should I open a JIRA issue for this to be done_ If so, I guess two JIRA issues, one for python and one for java?
There was a problem hiding this comment.
Sounds good. Please reference the JIRA issue here.
c5569ca to
6351e5a
Compare
|
Refer to this link for build results (access rights to CI server needed): |
6351e5a to
5a7e5ff
Compare
|
Changes Unknown when pulling 5a7e5ff on mariapython:ppp_inmaster into ** on apache:master**. |
|
Refer to this link for build results (access rights to CI server needed): |
5a7e5ff to
761b38a
Compare
|
Refer to this link for build results (access rights to CI server needed): |
761b38a to
dd37fb6
Compare
Incorporate a BeamArgumentParser (argparse.ArgumentParser + ValueProviders). Add StaticValueProvider and RuntimeValueProvider derived from ValueProvider. Add serialization for ValueProvider objects. Add testing for ValueProvider objects. Modify FileBasedSource and FileSink to accept ValueProvider objects.
dd37fb6 to
0f9b1d1
Compare
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): Failed Tests: 1beam_PreCommit_Java_MavenInstall/org.apache.beam:beam-runners-spark: 1--none-- |
|
retest this please |
|
Refer to this link for build results (access rights to CI server needed): |
|
LGTM. Thank you @mariapython |
|
Run Python PostCommit |