Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-2927] Python SDK support for portable side input #4020

Closed
wants to merge 16 commits into from

Conversation

robertwb
Copy link
Contributor

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Each commit in the pull request should have a meaningful subject line and body.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@robertwb
Copy link
Contributor Author

R: @aaltay

Copy link
Member

@aaltay aaltay left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @robertwb!

@@ -288,6 +290,80 @@ def _view_options(self):
def element_type(self):
return typehints.Any

# TODO(robertwb): Get rid of _from_runtime_iterable and _view_options
# in favor of _side_input_data().
def _side_input_data(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the default implementation only for AsSingleton?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would work for any of them, but I left this here to show (and test) the relationship.

class _UnpickledSideInput(AsSideInput):
def __init__(self, side_input_data):
self._data = side_input_data
# For older runners.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which older runners?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarified, this is for non-FnAPI runners. Also moved the logic to be more local.



class SideInputData(object):
"""All of the data about a side input except for the bound PCollection."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bound -> bounded ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bound, as in the PCollection that's bound to this side input.

indexed_side_inputs = [
(int(ix[4:]), pvalue.AsSideInput.from_runner_api(si, context))
for ix, si in pardo_payload.side_inputs.items()]
result.side_inputs = [si for _, si in sorted(indexed_side_inputs)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to sort?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, added a comment.

while request.id not in self._responses_by_id:
with self._lock:
if request.id not in self._responses_by_id:
response = self._responses.get(timeout=2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why timeout=2? Could not this raise an empty queue exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason to have 2, changed to 1.

Basically, this should periodically check for exceptions, and periodically release the lock in case the data for another thread came in.

However, this code isn't at all the easiest to follow. Changed to do the demultiplexing in the reader loop and use events to block. (In addition to being simpler, this should better extend itself to being able to return future-like objects for side inputs and state in the future.)

label='windowed')


# def test_pardo_unfusable_side_inputs(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to remove this test or implement it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to inherit the test that was formerly disabled...

@robertwb
Copy link
Contributor Author

PTAL. I'll fix the failing test, which I think is essentially triggered by BEAM-3085 (due to more pipelines being able to be translated).

@aaltay
Copy link
Member

aaltay commented Oct 21, 2017

LGTM, thank you.

@robertwb
Copy link
Contributor Author

retest this please

@robertwb
Copy link
Contributor Author

Looks like Jenkins Python is finally happy. Merging.

@asfgit asfgit closed this in 77c77ad Oct 22, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants