[BEAM-8335] Make TestStream to/from runner_api include the output_tags property.#10892
Conversation
677e777 to
c5fb23e
Compare
c5fb23e to
adbc861
Compare
|
R: @davidyan74 |
There was a problem hiding this comment.
Is there a reason why the string 'None' is used as the tag here (instead of just None)? Is there a way to distinguish between a non-existent tag and a tag named "None"?
There was a problem hiding this comment.
Looking through the codebase, it seems that 'None' is the special keyword used in the Python SDK to represent a tag that is specifically None. This keeps with the current style of the rest of the Python SDK. @lukecwik is this true?
There was a problem hiding this comment.
This is a general problem with python tag naming where there is ambiguity. It was worked around for Dataflow by using out for None and out_<tag> for tag until it was removed in #10971
Unfortunately this name mangling was applied inconsistently throughout the codebase which lead to arbitrary fix-ups and bugs.
lukecwik
left a comment
There was a problem hiding this comment.
Looks pretty good, some questions about TestStream constructor contract.
There was a problem hiding this comment.
Why do we need to declare output_tags here?
Are you trying to allow for outputs that have no events, otherwise shouldn't the tags come from the list of events?
The answer here impacts what we should be doing in expand in the no output_tags case in expand below.
There was a problem hiding this comment.
Are you trying to allow for outputs that have no events, otherwise shouldn't the tags come from the list of events?
Yep! The TestStreamService will allow users to define a TestStream with the output_tags specified at creation time and the events supplied at runtime.
|
retest this please |
rohdesamuel
left a comment
There was a problem hiding this comment.
Thanks for the quick review!
There was a problem hiding this comment.
Looking through the codebase, it seems that 'None' is the special keyword used in the Python SDK to represent a tag that is specifically None. This keeps with the current style of the rest of the Python SDK. @lukecwik is this true?
There was a problem hiding this comment.
Are you trying to allow for outputs that have no events, otherwise shouldn't the tags come from the list of events?
Yep! The TestStreamService will allow users to define a TestStream with the output_tags specified at creation time and the events supplied at runtime.
adbc861 to
4dbedcb
Compare
* This also moves the DirectRunner's TestStream implementation to a replacement transform. This is because the TestStream relies on getting the output_tags from the PTransform. Change-Id: Ibd80b0d25cd8cc5ff5c28e127f7313638e6664da
4dbedcb to
437c88f
Compare
|
retest this please EDIT: dang, doesn't work yet |
|
retest this please |
|
|
||
| def expand(self, pbegin): | ||
| """Expands the TestStream into the DirectRunner implementation. | ||
|
|
There was a problem hiding this comment.
This is a general problem with python tag naming where there is ambiguity. It was worked around for Dataflow by using out for None and out_<tag> for tag until it was removed in #10971
Unfortunately this name mangling was applied inconsistently throughout the codebase which lead to arbitrary fix-ups and bugs.
| def to_runner_api(self, unused_element_coder): | ||
| tag = 'None' if self.tag is None else self.tag | ||
|
|
||
| # Assert that no prevision is lost. |
There was a problem hiding this comment.
| # Assert that no prevision is lost. | |
| # Assert that no precision is lost. |
|
|
||
| # Assert that no prevision is lost. | ||
| assert 1000 * ( | ||
| self.new_watermark.micros // 1000) == self.new_watermark.micros |
There was a problem hiding this comment.
nit: This would be clearer by checking % has no remainder
| output. | ||
| """ | ||
| def __init__(self, coder=coders.FastPrimitivesCoder(), events=None): | ||
| def __init__( |
There was a problem hiding this comment.
Please add pydoc comments mentioning the few important pieces:
- specifying the output_tags allows for adding outputs that produce no events
- output_tags must be a superset of tags found in events if events is specified
| @@ -171,13 +184,20 @@ class TestStream(PTransform): | |||
| time. After all of the specified elements are emitted, ceases to produce | |||
There was a problem hiding this comment.
State that if only the default output or only one output tag has been specified, then a PCollection will be returned otherwise a dictionary of output name to PCollection.
The TestStream has the "output_tags" property which keeps track of which events go to which PCollection. A TestStream going through a round-trip to/from proto won't have these fields set. To do this, a modification to the from_runner_api_parameter is needed to include the parent PTransform proto to retrieve the information from the outputs.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username).[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.