Interactive runner and the corresponding tests #5818
run test
This is very cool! You can test the cython failure by running
Thanks, Pablo! I was trying to figure out exactly how to run the tests locally.
robertwb
left a comment
Neat stuff.
sdks/python/setup.py
Outdated
'crcmod>=1.7,<2.0',
'dill==0.2.6',
'fastavro==0.19.7',
'graphviz==0.8.3',
Please don't pin unless you need to.
graphviz is imported in display_graph.py to construct the DOT representation of the pipeline. Don't we need to specify all required modules here?
I think @robertwb is suggesting to remove the "==0.8.3" qualifier.
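To make the distinction concrete, here is a small sketch that flags exact pins in a requirements list. The helper name find_pinned is hypothetical, and the unpinned 'graphviz' entry is only one possible reading of the suggestion; the reviewers may prefer a version range instead.

```python
def find_pinned(requirements):
    """Return the requirement strings that pin one exact version with '=='."""
    return [req for req in requirements if '==' in req]


# Entries quoted in the diff, plus an unpinned graphviz entry (an assumption
# about what the reviewer is asking for, not the final setup.py).
before = ['crcmod>=1.7,<2.0', 'dill==0.2.6', 'fastavro==0.19.7',
          'graphviz==0.8.3']
after = ['crcmod>=1.7,<2.0', 'dill==0.2.6', 'fastavro==0.19.7', 'graphviz']

print(find_pinned(before))
print(find_pinned(after))
```

The package still appears in the list, so it is still declared as required; only the exact-version constraint goes away.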
"""Given vertices, edges and their attributes, construct graphviz.Digraph.
Args:
  vertex_dict: (dict(str->(dict(str->str))) dict mapping vertex
Suggestion: Use standard notation for typing information (https://www.python.org/dev/peps/pep-0484/). Similarly elsewhere.
It looks very neat, but it seems this kind of annotation is not used anywhere in the apache_beam codebase (I searched for "from __future__ import annotations").
Would you like me to use annotations from the typehints module, like @with_input_types and @with_output_types, or something like ~apache_beam.runners.runner.PipelineRunner instead?
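For reference, a minimal sketch of what PEP 484 notation could look like for the vertex_dict argument. The function name construct_dot, the edge_dict parameter, and the plain-string DOT output are assumptions made for illustration; the code under review builds a graphviz.Digraph instead.

```python
from typing import Dict, Tuple

# Attribute name -> attribute value, e.g. {'shape': 'box'}.
Attrs = Dict[str, str]


def construct_dot(vertex_dict: Dict[str, Attrs],
                  edge_dict: Dict[Tuple[str, str], Attrs]) -> str:
    """Render vertices and edges as DOT text (illustrative only)."""

    def fmt(attrs: Attrs) -> str:
        # Format attributes as ' [k="v", ...]', or nothing when empty.
        if not attrs:
            return ''
        body = ', '.join('%s="%s"' % (k, v) for k, v in sorted(attrs.items()))
        return ' [%s]' % body

    lines = ['digraph G {']
    for name, attrs in sorted(vertex_dict.items()):
        lines.append('  "%s"%s;' % (name, fmt(attrs)))
    for (src, dst), attrs in sorted(edge_dict.items()):
        lines.append('  "%s" -> "%s"%s;' % (src, dst, fmt(attrs)))
    lines.append('}')
    return '\n'.join(lines)


dot = construct_dot({'Read': {'shape': 'box'}, 'Write': {}},
                    {('Read', 'Write'): {'label': 'pcoll1'}})
print(dot)
```

Note that "from __future__ import annotations" comes from PEP 563 (Python 3.7+) and is not required for PEP 484 hints. PEP 484 also defines a comment-based syntax ("# type: ...") for code that still has to run on Python 2.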
def is_top_level_transform(transform):
  return transform.unique_name and '/' not in transform.unique_name

def leaf_transforms(parent_id):
Maybe call this leaf_transform_ids?
Done.
pcollection_stats=None):
"""Constructor of PipelineGraph.
All fields except for pipeline_proto should be left as None unless used by
Rather than taking interactive-specific parameters here, take a map of {pcollections,ptransforms} -> extra attributes. The interactive runner would then populate these according to its notion of cached, etc. but other runners could highlight different data.
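A rough sketch of the shape this suggestion points at, under stated assumptions: the class PipelineGraphSketch, the helper interactive_pcollection_attrs, and the attribute names 'color' and 'label' are all hypothetical and are not taken from the PR.

```python
class PipelineGraphSketch(object):
    """Hypothetical stand-in for PipelineGraph; not the Beam class."""

    def __init__(self, transform_ids, pcollection_ids,
                 transform_attrs=None, pcollection_attrs=None):
        transform_attrs = transform_attrs or {}
        pcollection_attrs = pcollection_attrs or {}
        # Every transform and PCollection gets an attribute dict; extra
        # attributes are optional and runner-agnostic.
        self.vertex_attrs = {
            t: dict(transform_attrs.get(t, {})) for t in transform_ids}
        self.edge_attrs = {
            p: dict(pcollection_attrs.get(p, {})) for p in pcollection_ids}


def interactive_pcollection_attrs(cached_ids, stats):
    """How an interactive runner might map its own state to attributes."""
    attrs = {}
    for pcoll_id in cached_ids:
        attrs.setdefault(pcoll_id, {})['color'] = 'red'
    for pcoll_id, stat in stats.items():
        attrs.setdefault(pcoll_id, {})['label'] = stat
    return attrs


graph = PipelineGraphSketch(
    transform_ids=['Read', 'Count'],
    pcollection_ids=['pc1', 'pc2'],
    pcollection_attrs=interactive_pcollection_attrs(
        cached_ids=['pc1'], stats={'pc1': '8 samples'}))
print(graph.edge_attrs)
```

With this split, the graph class knows nothing about caching or sampling, and another runner could pass a different attribute map to highlight whatever it cares about.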
from apache_beam.runners.interactive import display_manager
from apache_beam.transforms import combiners

SAMPLE_SIZE = 8
This constant deserves a comment.
  self._cache_manager.cleanup()

def apply(self, transform, pvalueish):
  # TODO(qinyeli): Remove runner interception of apply.
Maybe phrase as "remove once runner interception of apply is gone" with a(n existing?) JIRA?
Done.
# pylint: disable=import-error
from apache_beam.pipeline import Pipeline

# When possible, invoke a round trip through the runner API.
Is this still needed (or desirable)?
You are right. This is not desirable. Thanks!
  return self._pipeline_info.derivation(pcoll_id).cache_label()

def wait_until_finish(self):
  return
Add a comment as to why we can do this.
def test_basic(self):
  # TODO(qinyeli) remove explicitly overriding underlying runner
  # once interactive_runner works with FnAPI mode
Could you clarify what the issue is? This will make tests pass, but be broken for users.
Run Python PreCommit
charlesccychen
left a comment
Thanks! This LGTM, after the remaining minor comments.
sdks/python/setup.py
Outdated
'oauth2client>=2.0.1,<5',
# grpcio 1.8.1 and above requires protobuf 3.5.0.post1.
'protobuf>=3.5.0.post1,<4',
'pydot>=1.2.0,<=1.2.4',
Let's just do <1.3 for now.
class Derivation(object):
  """Helper for PipelineInfo."""
Can you add a more descriptive comment?
  return str(self.json())


class SafeFastPrimitivesCoder(coders.Coder):
Can you add a comment for why this extra quote / unquote step is needed?
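The thread does not state the reason, so the following is only a guess at one plausible motivation: if encoded values are stored one per line in a text file, raw encoded bytes that contain a newline would corrupt the record boundaries, and percent-quoting removes them. The class LineSafeCoderSketch is hypothetical, and pickle is used purely as a stand-in for the real coder.

```python
import pickle
from urllib.parse import quote, unquote_to_bytes


class LineSafeCoderSketch(object):
    """Hypothetical coder that percent-quotes its output (assumption only)."""

    def encode(self, value):
        raw = pickle.dumps(value)  # Stand-in for the underlying coder.
        # quote() percent-encodes every byte outside a small ASCII safe set,
        # so the result never contains a newline byte.
        return quote(raw).encode('ascii')

    def decode(self, encoded):
        # unquote_to_bytes() reverses the percent-encoding exactly.
        return pickle.loads(unquote_to_bytes(encoded.decode('ascii')))


coder = LineSafeCoderSketch()
value = 'line one\nline two'
encoded = coder.encode(value)
print(b'\n' in pickle.dumps(value), b'\n' in encoded)
print(coder.decode(encoded) == value)
```

If the actual reason is different, the requested comment in the class docstring is the right place to record it.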
vertex_dict[transform.unique_name] = {}

for pcoll_id in transform.outputs.values():

Remove extra newline.
Would you like me to squash the commits into one? @charlesccychen
Run Python PostCommit
Thanks @qinyeli! I will merge after tests pass.
self._desired_cache_labels = set()
print('Running...')

# When possible, invoke a round trip through the runner API.
Remove.
This adds a new subproject that exports a new shadow jar that includes the job server and all transitive dependencies. It also includes a runShadow task that can be used for local debugging via gradle.
* Cythonizing a few more hot calls.
Adding combine URNs based on the portable combines doc: https://s.apache.org/beam-runner-api-combine-model I added a new URN to the doc, noticed it already existed as an old URN. For now I will simply be repurposing the URN so I only changed the comment. Also updated the comments for the other Combine URNs to make them more consistent with the other comments and describe what the expected payloads are for them.
Also changed the enum numbers for the existing elements in the Composites enum. Normally this would be bad for protos (reserving the removed enum value is preferable), but in this case the URN enums being changed are never actually used in any proto messages; they're only there for devs to use, so this won't affect anything.
Adding implementations for the components represented by the Combine URNs described in the portable combines doc: https://s.apache.org/beam-runner-api-combine-model This is just an initial implementation so it's lacking one major feature which is an optimization in the Precombine. Planning to add that right away, but want to get this in first.
This is necessary in order to use filtering based on touched files. In the ghprb plugin all trigger criteria are evaluated together, which means that include filters are also applied to phrase triggering. See: jenkinsci/ghprb-plugin#678
…ings FindBugs annotations are a mess. We want to eventually deprecate our own usage of FindBugs, but we still need to deal with the annotations because our dependencies (notably byte-buddy) use the annotations without providing the dependency. This causes compile warnings such as:
byte-buddy-1.8.11.jar(/net/bytebuddy/description/type/TypeList.class): warning: Cannot find annotation method 'value()' in type 'SuppressFBWarnings': class file for edu.umd.cs.findbugs.annotations.SuppressFBWarnings not found
To fix these warnings, we need to include a findbugs-annotations dependency during compile which includes the SuppressFBWarnings annotation. The dependency options we currently have are:
* com.google.code.findbugs:annotations:3.0.1
  * Includes @SuppressFBWarnings, but licensed under LGPL
* com.github.stephenc.findbugs:findbugs-annotations:1.3.9-1
  * Licensed under Apache 2.0, but doesn't include @SuppressFBWarnings
It is ok to use LGPL components during the build, as long as they don't make it into the distribution. See:
* https://www.apache.org/legal/resolved.html#prohibited
* https://www.apache.org/legal/resolved.html#optional
Note that I updated the "TIMESTAMP" coder which was meant to support timers with an updated "TIMER" coder based upon https://s.apache.org/beam-portability-timers
This is a follow-up to PR #5757 which splits the pre-commit jobs and updates the naming. The GradleBuild suffix was dropped since we no longer run Maven variants.
This is a follow-up to PR #5757 which splits the pre-commit jobs and updates the naming. The GradleBuild suffix was dropped since we no longer run Maven variants. The *_Cron versions of the job run on a cron schedule and on pushes to master.
* improvements to fanout
The exclusions here are based on examining how other Apache projects add license headers for various file types. In particular, Apache Zeppelin seemed to be a good example [1], and in other cases doing a search across Apache GitHub [2]. [1] https://github.com/apache/zeppelin/tree/master/docs [2] https://github.com/search?utf8=%E2%9C%93&q=%22See+the+License+for+the+specific+language+governing+permissions%22+user%3Aapache+extension%3Ayml&type=Code&ref=advsearch&l=&l=
Added Beam Python Interactive Runner, including
Design doc: s.apache.org/interactive-beam
YouTube demo: https://youtu.be/c5CjA1e3Cqw
R: @robertwb
CC: @ananvay