[BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. #11514
lukecwik merged 6 commits into apache:master
inject_pcollection(mi)

return infos_list
pcollection_ids = self.process_bundle_descriptor.transforms[
I believe the id order aligns with the receiver order since transform_consumers built above iterates the outputs map in the same order and this gets plumbed down through to Operation.
In practice this might be OK (dicts have undefined iteration order, though I think it is deterministic when they are modified the same way), but it seems rather brittle to me. Could we instead pass the tag -> pcollection_id mapping here?
Run Portable_Python PreCommit
robertwb left a comment
I suppose this would be to finish the transition from reporting counters on PTransform outputs to recording them on the various PCollections.
LGTM if you can change to use a mapping of tags to pcoll ids rather than relying on ordering being the same.
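To illustrate the concern about relying on ordering, here is a minimal sketch. The names `pair_by_position`, `pair_by_tag`, `outputs`, and `receivers_by_tag` are hypothetical and are not taken from the Beam code base; `outputs` merely stands in for a transform's tag -> pcollection_id map. It only shows why keying on tags is more robust than pairing two sequences by position.

```python
def pair_by_position(receivers, pcollection_ids):
    """Brittle: assumes both sequences were built in the same order."""
    return list(zip(receivers, pcollection_ids))


def pair_by_tag(receivers_by_tag, outputs):
    """Robust: looks up each receiver's PCollection id by its output tag.

    `outputs` is an assumed stand-in for a tag -> pcollection_id map.
    """
    return {
        tag: (receiver, outputs[tag])
        for tag, receiver in receivers_by_tag.items()
    }


# Hypothetical data: the two maps were populated in different orders.
outputs = {"main": "pc_1", "side": "pc_2"}
receivers_by_tag = {"side": "receiver_side", "main": "receiver_main"}

by_position = pair_by_position(
    list(receivers_by_tag.values()), list(outputs.values()))
by_tag = pair_by_tag(receivers_by_tag, outputs)
```

With this data, pairing by position attaches `receiver_side` to `pc_1`, while pairing by tag keeps each receiver with the id registered under the same tag.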
(self.receivers, pcollection_ids))

all_monitoring_infos = {}
for i in range(len(self.receivers)):
This will change if you use a mapping, but zip would be the idiom to use here.
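As a small illustration of the suggested idiom, the sketch below contrasts an index-based loop with `zip`. The lists are made-up placeholder values, not data from the pull request.

```python
# Placeholder values; in the reviewed code these would be receiver objects
# and PCollection ids.
receivers = ["receiver_0", "receiver_1"]
pcollection_ids = ["pc_0", "pc_1"]

# Index-based loop, in the style of the reviewed snippet.
indexed = []
for i in range(len(receivers)):
    indexed.append((receivers[i], pcollection_ids[i]))

# Equivalent loop using zip, which avoids manual indexing.
zipped = []
for receiver, pcollection_id in zip(receivers, pcollection_ids):
    zipped.append((receiver, pcollection_id))
```

Both loops build the same pairs; note that `zip` stops at the shorter input, so it still assumes the two sequences line up.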
tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]

if not ptransform_label in self.process_bundle_descriptor.transforms:
return
This would be a bug, right?
if not ptransform_label in self.process_bundle_descriptor.transforms:
return
if not tag_label in self.process_bundle_descriptor.transforms[
ptransform_label].outputs:
Actually, this can happen, and might be what's happening here. There is no PCollection for this tag, but the user outputted a value to this tag. It would make sense to record this output even if we didn't use it. This is another downside of attaching these counters to PCollections themselves rather than to PTransform outputs.
Unknown outputs should probably be reported another way.
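The two cases discussed above (an unknown transform versus a tag with no PCollection) can be sketched as follows. This is only an illustration under stated assumptions: `resolve_pcollection_id` is a hypothetical helper, and `transforms` is a plain dict standing in for the descriptor's transform map, not the real proto structure.

```python
def resolve_pcollection_id(transforms, ptransform_id, tag):
    """Return the PCollection id for (ptransform_id, tag), or None.

    `transforms` is an assumed stand-in:
    ptransform_id -> {tag: pcollection_id}.
    """
    outputs = transforms.get(ptransform_id)
    if outputs is None:
        # Unknown transform; the review thread suggests this would be a bug.
        return None
    # A user may emit to a tag that has no PCollection in the pipeline,
    # in which case there is no id to attach the counter to.
    return outputs.get(tag)


# Hypothetical descriptor contents.
transforms = {"ParDo_1": {"main": "pc_1"}}

known = resolve_pcollection_id(transforms, "ParDo_1", "main")
unused_tag = resolve_pcollection_id(transforms, "ParDo_1", "unused")
missing_transform = resolve_pcollection_id(transforms, "Missing", "main")
```

Returning `None` for both cases mirrors the early `return` in the reviewed snippet; a caller that wants to report unknown outputs some other way would need to distinguish them.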
I'll try it out, but I worry that the consumers/receivers use either indices or the converted tag names, since Python does some post-processing that converts string tags to non-string tags.

I looked through the implementation, and adding the PCollection id to the ConsumerSet doesn't work out: operations don't have that level of visibility into the pipeline proto, and consumers works off of an index -> receiver map and expects tags to be mapped to indices, so we would need to go through all three layers. I suggest that we stick with this brittle approach until we can delete the non-portable Python worker implementation, which would make a lot of the layers simpler.
HuangLED left a comment
LGTM. Thanks for the fix.
robertwb left a comment
Thanks, LGTM. Can't wait 'till we can get rid of the legacy worker and clean this up!
Run Python PreCommit
I suspect that this made the precommits flaky.
…ing to fix them up. (apache#11514)
* [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.
* fixup! Convert to list since values() isn't subscriptable
* fixup! Use zip
* fixup! Migrate to use tag -> pcollection id
* fixup! lint
* fixup! Fix comparison