New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-802] Python templates #2545
Conversation
R: @sb2nov |
Run Python PostCommit |
Refer to this link for build results (access rights to CI server needed): |
Run Python PostCommit |
Refer to this link for build results (access rights to CI server needed): Build result: FAILURE[...truncated 2.32 MB...] at java.lang.Thread.run(Thread.java:745)Caused by: org.apache.maven.plugin.MojoExecutionException: Command execution failed. at org.codehaus.mojo.exec.ExecMojo.execute(ExecMojo.java:302) at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208) ... 31 moreCaused by: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:404) at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:166) at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:764) at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:711) at org.codehaus.mojo.exec.ExecMojo.execute(ExecMojo.java:289) ... 33 more2017-04-14T23:18:59.203 [ERROR] 2017-04-14T23:18:59.203 [ERROR] Re-run Maven using the -X switch to enable full debug logging.2017-04-14T23:18:59.203 [ERROR] 2017-04-14T23:18:59.203 [ERROR] For more information about the errors and possible solutions, please read the following articles:2017-04-14T23:18:59.203 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException2017-04-14T23:18:59.203 [ERROR] 2017-04-14T23:18:59.203 [ERROR] After correcting the problems, you can resume the build with the command2017-04-14T23:18:59.203 [ERROR] mvn -rf :beam-sdks-pythonchannel stoppedSetting status of 2b06490 to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/9558/ and message: 'Build finished. 'Using context: Jenkins: Maven clean install--none-- |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is fine to merge this for now, I'm going to spend sometime on making this runner agnostic next.
@@ -104,6 +106,10 @@ def to_json_value(obj, with_type=False): | |||
raise TypeError('Can not encode {} as a 64-bit integer'.format(obj)) | |||
elif isinstance(obj, float): | |||
return extra_types.JsonValue(double_value=obj) | |||
elif isinstance(obj, ValueProvider): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update json_values_tests to have this case ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
if (not (isinstance(file_pattern, basestring) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isinstance(file_pattern, (basestring, ValueProvider))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -220,22 +235,33 @@ def close(self, file_handle): | |||
if file_handle is not None: | |||
file_handle.close() | |||
|
|||
@check_accessible(['file_path_prefix', 'file_name_suffix']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sammcveety random question:
I wanted to learn more about the accessibility check in the value providers:
Pros:
- Check is done before any user code is executed so we can minimize any side effects.
Cons:
- More change to existing user code and the end user needs to make sure they are using the correct strings in the decorator.
- We could just throw a RunTimeException in the get as that minimizes the concepts that the user needs to be aware of when writing the pipeline.
The user can already just ignore the check if they want so I guess it is already sort of optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java SDK does not have a concept similar to check_accesible
decorators. The common API across both languages are is_accesible
and get
, where the second one will throw a RuntimeException
if the underlying value is not accesible (i.e. called at pipeline building time).
Using check_accessible
is optional. A PTransform author can build a templatable transform without using it. Another pro is, it improves the readability by clearly marking functions that depend on run time values of some value.
@@ -93,7 +95,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp): | |||
def test_file_sink_writing(self): | |||
temp_path = os.path.join(self._new_tempdir(), 'filesink') | |||
sink = MyFileSink( | |||
temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder()) | |||
temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need this foo -> output change in this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think output is more descriptive, that is why I made the change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(kept this as is in the original PR)
@@ -88,6 +89,11 @@ def run(self, pipeline): | |||
evaluation_context) | |||
# Start the executor. This is a non-blocking call, it will start the | |||
# execution in background threads and return. | |||
|
|||
if pipeline.options: | |||
# DirectRunner does not support RuntimeValueProviders. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a BEAM issue to try to address this ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this comment needs to be changed. If options are passed in the command line, they will be of type StaticValueProvider
and DirectRunner
can use them. If they were not passed at the command line (or in another way in PipelineOptions
) prior to calling run
, there will be no opportunity for the user to tell what the run time values should be. Given that, I do not think DirectRunner
can support RuntimeValueProvider
s.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need the if here? If None, set_runtime_options will handle it.
sdks/python/apache_beam/io/fileio.py
Outdated
if not isinstance(file_name_suffix, basestring): | ||
raise TypeError('file_name_suffix must be a string; got %r instead' % | ||
file_name_suffix) | ||
if not (isinstance(file_path_prefix, basestring) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
merge these isinstances
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -233,6 +233,7 @@ def __init__(self, packages, options, environment_version): | |||
options_dict = {k: v | |||
for k, v in sdk_pipeline_options.iteritems() | |||
if v is not None} | |||
options_dict['_options_id'] = 0 # TODO(altay): Remove. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BEAM issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
retest this please |
Run Python PostCommit |
@@ -119,21 +189,22 @@ def get_all_options(self, drop_default=False): | |||
|
|||
# TODO(BEAM-1319): PipelineOption sub-classes in the main session might be | |||
# repeated. Pick last unique instance of each subclass to avoid conflicts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
re-opened.
PTAL, updated for new lint errors. |
Run Python PostCommit |
LGTM as this was already merged Thanks for the help. |
Re-reverts the python templates. First two commits are clean reverts of the original commits.
The last commit: