Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#19851: Proof-of-concept to implement deferred side inputs for combiners #30743

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

hjtran
Copy link
Contributor

@hjtran hjtran commented Mar 26, 2024

#19851
This is a proof-of-concept draft for fixing/implementing deferred side inputs with Combiners. I could use feedback on how to proceed and then I'll clean it up as well.

Issue:
Currently if you try to use a side input with a combine function, you end up with a traceback during the pipeline translation step:

  File "/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py", line 1367, in lift_combiners
    expansion = lifted_stages if can_lift(transform) else unlifted_stages
                                 ^^^^^^^^^^^^^^^^^^^
  File "/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py", line 1239, in can_lift
    context.components.pcollections[only_element(
                                    ^^^^^^^^^^^^^
  File "/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py", line 2082, in only_element
    element, = iterable
    ^^^^^^^^
ValueError: too many values to unpack (expected 1)

Fix?:
The issue is because in the code where we determine whether or not to lift a combiner, we assume the combiner has a single input pcoll, which is not true when the combiner has a deferred side input.

I went through fixing this and then realized that we also don't have plumbing for deferred side inputs implemented at the operations.py layer either. It seemed like a lot of work to duplicate all the side input plumbing that ParDo has when the phases of the combiners can be expressed as ParDos anyways.

Inside of direct/helper_transforms.py we actually already define a ParDo-based version of a lifted combiner. I tried adding an override to use it only if we have a combiner with side inputs but at that point we've already substituted away the side input for a ArgumentPlaceholders. I instead updated it and returned it in CombinePerKey.__new__. I couldn't put it in CombinePerKey.expand since CombinePerKey has the special urn that would've resulted in the subtransforms getting ignored

@hjtran
Copy link
Contributor Author

hjtran commented Mar 26, 2024

reviewer: @robertwb

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@liferoad liferoad requested a review from tvalentyn March 29, 2024 20:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant