[BEAM-115] Runner API representation of windowing strategies for Python #2190
robertwb wants to merge 6 commits into apache:master
|
R: @vikkyrk |
|
Refer to this link for build results (access rights to CI server needed): |
|
Is the plan to check in the generated code? |
sdks/python/apache_beam/pipeline.py
Outdated
|
|
||
| def _unique_ref(self): | ||
| self._counter += 1 | ||
| return "ref_%s_%s" % (obj_type.__name__, self._counter) |
There was a problem hiding this comment.
obj_type is not available in this function, please use self._obj_type
There was a problem hiding this comment.
Done. I'm going to add tests for this.
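For reference, a minimal sketch of the fix suggested above, assuming the map stores the type it was constructed with as `self._obj_type`. The class name `ContextMap` and its constructor are hypothetical stand-ins, not the code in this PR.

```python
class ContextMap(object):
  """Hypothetical stand-in for the map under review."""

  def __init__(self, obj_type):
    self._obj_type = obj_type
    self._counter = 0

  def _unique_ref(self):
    self._counter += 1
    # Use the stored type; a bare `obj_type` would raise NameError here.
    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)


refs = ContextMap(int)
first = refs._unique_ref()
second = refs._unique_ref()
```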
sdks/python/apache_beam/pipeline.py
Outdated
|
|
||
| def to_runner_api(self): | ||
| context_proto = beam_runner_api_pb2.Components() | ||
| for name, cls in self.__COMPONENT_TYEPS: |
There was a problem hiding this comment.
typo in self.__COMPONENT_TYEPS
There was a problem hiding this comment.
Thanks for addressing my comments!
There is still a typo here: *TYEPS vs. *TYPES
There was a problem hiding this comment.
Oh. Fixed. I'll add some tests to make sure this actually works and push again. (Oh, for a statically typechecked Python...)
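To illustrate why a typo like this only surfaces when the method actually runs, here is a small sketch. The `Context` class and its contents are hypothetical; the only point is that a misspelled double-underscore attribute is name-mangled like any other and fails with AttributeError at call time, not at import time.

```python
class Context(object):
  # Inside the class body this name is mangled to _Context__COMPONENT_TYPES.
  __COMPONENT_TYPES = {'coders': object}

  def misspelled(self):
    # Typo: mangled to _Context__COMPONENT_TYEPS, which does not exist.
    return self.__COMPONENT_TYEPS

  def correct(self):
    return self.__COMPONENT_TYPES


ctx = Context()
try:
  ctx.misspelled()
  error = None
except AttributeError as e:
  # The class definition itself was accepted; only the call fails.
  error = e
```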
| return self._is_default | ||
|
|
||
| def to_runner_api(self, context): | ||
| raise NotImplementedError |
There was a problem hiding this comment.
This is probably not intentional, as you are returning a WindowingStrategy after the exception is raised.
sdks/python/apache_beam/pipeline.py
Outdated
| context_proto = beam_runner_api_pb2.Components() | ||
| for name, cls in self.__COMPONENT_TYEPS: | ||
| getattr(self, name).populate_map(getattr(context_proto, name)) | ||
| return components |
There was a problem hiding this comment.
Seems like components is undefined at this point
|
|
||
| @staticmethod | ||
| def from_runner_api(self, proto, context): | ||
| return Windowing( |
There was a problem hiding this comment.
The Windowing class has differently named parameters: windowfn and triggerfn are used there (without underscores).
| def from_runner_api(self, proto, context): | ||
| return Windowing( | ||
| window_fn=WindowFn.from_runner_api(proto.window_fn), | ||
| trigger_fn=TriggerFn.from_runner_api(proto.trigger), |
There was a problem hiding this comment.
TriggerFn was not imported in this file
There was a problem hiding this comment.
Yes, there was a circular import. Fixed.
| allowed_lateness=0) | ||
|
|
||
| @staticmethod | ||
| def from_runner_api(self, proto, context): |
There was a problem hiding this comment.
A staticmethod should not take a 'self' parameter.
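A sketch of the shape this comment and the earlier parameter-name comment ask for, assuming the constructor keywords are `windowfn` and `triggerfn` as noted above. The `Windowing` class here is a simplified, hypothetical stand-in, and a plain dict stands in for the proto.

```python
class Windowing(object):
  """Hypothetical, simplified stand-in; not the SDK class."""

  def __init__(self, windowfn, triggerfn=None):
    self.windowfn = windowfn
    self.triggerfn = triggerfn

  @staticmethod
  def from_runner_api(proto, context):
    # No `self`: a staticmethod receives only what the caller passes.
    return Windowing(
        windowfn=proto['window_fn'],
        triggerfn=proto['trigger'])


# A dict is used in place of a real proto message for illustration.
w = Windowing.from_runner_api(
    {'window_fn': 'fixed', 'trigger': 'default'}, None)
```

With `self` left in the signature, the same two-argument call would fail with a TypeError about a missing positional argument.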
| return beam_runner_api_pb2.Trigger( | ||
| after_each=beam_runner_api_pb2.Trigger.AfterEach( | ||
| subtriggers=[ | ||
| subtrigger.to_runner_api(context) for subtrigger in triggers])) |
There was a problem hiding this comment.
typo: please replace triggers by self.triggers
There was a problem hiding this comment.
Done and test added.
| return beam_runner_api_pb2.Trigger( | ||
| or_finally=beam_runner_api_pb2.Trigger.OrFinally( | ||
| main=self.triggers[0].to_runner_api(context), | ||
| **{'finally': self.triggers[1].to_runner_api(context)})) |
There was a problem hiding this comment.
Maybe it's just me, but I needed some time to figure out what it does.
Wouldn't it be simpler to just pass finally=self.triggers[1].to_runner_api(context)?
There was a problem hiding this comment.
Yeah, if only finally weren't a keyword in Python. Since it is, I can't write it literally.
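A small sketch of the workaround being discussed. The function `or_finally` is a hypothetical stand-in for a generated message constructor with a field named `finally`; it simply returns its keyword arguments.

```python
import keyword


def or_finally(**fields):
  """Hypothetical stand-in for a constructor with a 'finally' field."""
  return fields


# `finally` is a reserved word, so `or_finally(finally=...)` is a SyntaxError.
is_reserved = keyword.iskeyword('finally')

# Unpacking a dict passes the same keyword without writing it literally.
msg = or_finally(main='main_trigger', **{'finally': 'until_trigger'})
```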
|
Thanks for taking a look, @tibkiss. I thought I had added tests for Windowing, but clearly not. |
|
@kennknowles As for checking in generated code, I poked around a bit, and asked around, and that's what we're currently doing for the other protos and apitools generated code. It'd be nice to avoid a protoc/mvn dependency for contributors, but I figured this was better than generating the json manually. |
|
f077d5e to 36269d6
|
|
R: @kennknowles |
|
@robertwb with regard to checking it in, I actually don't have much of an objection. It seems like a potentially annoying problem, and this unblocks other work. Perhaps we'll get to a point where it is autogenerated later, and it won't be hard to adjust other things. I'll take a little bit to get to this review in more depth, though. I trust the LGTM of someone involved in the Python SDK more deeply. |
| SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1" | ||
| SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1" | ||
|
|
||
| PICKLED_CODER = "dataflow:coder:pickled_python:v0.1" |
| OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW | ||
| OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE | ||
| OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE | ||
| OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED' |
There was a problem hiding this comment.
This will fail when converting in to_runner_api, as it is not defined in the proto. Maybe comment it out with a TODO?
| return pickler.loads(fn_parameter.value) | ||
|
|
||
| def to_runner_api_parameter(self, context): | ||
| raise TypeError(self) |
| trigger=self.triggerfn.to_runner_api(context), | ||
| accumulation_mode=self.accumulation_mode, | ||
| output_time=self.output_time_fn, | ||
| closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS, |
There was a problem hiding this comment.
Add a TODO to support EMIT_IF_NONEMPTY?
sdks/python/apache_beam/pipeline.py
Outdated
| visitor.visit_value(v, self) | ||
|
|
||
|
|
||
| class PipelineContextMap(object): |
There was a problem hiding this comment.
Tests for PipelineContextMap and PipelineContext would be great.
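As a starting point, a round-trip test could look roughly like the sketch below. The class `SketchContextMap` and its methods `get_id` and `get_by_id` are assumptions made for illustration; the real PipelineContextMap API in this PR may differ.

```python
import unittest


class SketchContextMap(object):
  """Hypothetical stand-in for PipelineContextMap; not the PR's code."""

  def __init__(self, obj_type):
    self._obj_type = obj_type
    self._obj_to_id = {}
    self._id_to_obj = {}

  def get_id(self, obj):
    # Assumed behavior: the same object always maps to the same id.
    if obj not in self._obj_to_id:
      ref = "ref_%d" % (len(self._obj_to_id) + 1)
      self._obj_to_id[obj] = ref
      self._id_to_obj[ref] = obj
    return self._obj_to_id[obj]

  def get_by_id(self, ref):
    return self._id_to_obj[ref]


class SketchContextMapTest(unittest.TestCase):

  def test_round_trip(self):
    context_map = SketchContextMap(str)
    ref = context_map.get_id('a')
    self.assertEqual(context_map.get_by_id(ref), 'a')

  def test_ids_are_stable_and_distinct(self):
    context_map = SketchContextMap(str)
    self.assertEqual(context_map.get_id('a'), context_map.get_id('a'))
    self.assertNotEqual(context_map.get_id('a'), context_map.get_id('b'))
```

Saved as a module, this can be run with `python -m unittest`.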
sdks/python/apache_beam/pipeline.py
Outdated
| return self._id_to_obj[id] | ||
|
|
||
|
|
||
| class PipelineContext(object): |
There was a problem hiding this comment.
Is this a user-facing class, or should it be in the runners module?
|
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 1.96 MB...]
Caused by: org.apache.maven.plugin.MojoExecutionException: Command execution failed.
Caused by: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
2017-03-09T18:10:22.481 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2017-03-09T18:10:22.481 [ERROR] After correcting the problems, you can resume the build with the command
2017-03-09T18:10:22.481 [ERROR] mvn -rf :beam-sdks-python
channel stopped
Setting status of d2f6d5b to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8268/ and message: 'Build finished.'
Using context: Jenkins: Maven clean install
|
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 1.69 MB...]
Caused by: org.apache.maven.plugin.MojoExecutionException: Command execution failed.
Caused by: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
2017-03-09T20:03:54.653 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2017-03-09T20:03:54.654 [ERROR] After correcting the problems, you can resume the build with the command
2017-03-09T20:03:54.654 [ERROR] mvn -rf :beam-sdks-python
channel stopped
Setting status of 29cfcbe to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8272/ and message: 'Build finished.'
Using context: Jenkins: Maven clean install
|
|
LGTM. |
|
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 1.96 MB...]
Caused by: org.apache.maven.plugin.MojoExecutionException: Command execution failed.
Caused by: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
2017-03-10T00:36:53.061 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2017-03-10T00:36:53.062 [ERROR] After correcting the problems, you can resume the build with the command
2017-03-10T00:36:53.062 [ERROR] mvn -rf :beam-sdks-python
channel stopped
Setting status of 3bc8d8d to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8289/ and message: 'Build finished.'
Using context: Jenkins: Maven clean install
|
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 1.96 MB...]
Caused by: org.apache.maven.plugin.MojoExecutionException: Command execution failed.
Caused by: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
2017-03-10T01:50:03.593 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2017-03-10T01:50:03.594 [ERROR] After correcting the problems, you can resume the build with the command
2017-03-10T01:50:03.594 [ERROR] mvn -rf :beam-sdks-python
channel stopped
Setting status of 0cc50dc to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8293/ and message: 'Build finished.'
Using context: Jenkins: Maven clean install
|
Changes Unknown when pulling 86e1754 on robertwb:py-runner-api into apache:master. |
|