Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-1630] Adds API for defining Splittable DoFns using Python SDK. #3882

Closed
wants to merge 7 commits into from

Conversation

chamikaramj
Copy link
Contributor

See https://s.apache.org/splittable-do-fn-python-sdk for the design.

This PR and the above doc were updated to reflect following recent updates to Splittable DoFn.

  • Support for ProcessContinuations
  • Support for dynamically updating output watermark irrespective of the output element production.

This will be followed by a PR that adds support for reading Splittable DoFns using DirectRunner.

See https://s.apache.org/splittable-do-fn-python-sdk for the design.

This PR and the above doc were updated to reflect following recent updates to Splittable DoFn.
* Support for ProcessContinuations
* Support for dynamically updating output watermark irrespective of the output element production.

This will be followed by a PR that adds support for reading Splittable DoFns using DirectRunner.
@chamikaramj
Copy link
Contributor Author

R: @robertwb

cc: @jkff

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

need to write a Splittable DoFn.

Not all runners support Splittable DoFn. See the capability matrix
(a href="https://beam.apache.org/documentation/runners/capability-matrix/)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The matrix is currently focused on Java, so this is a bit misleading.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this paragraph.

A Splittable DoFn must provide suitable overrides for the following methods
of the ``DoFn`` class.
* new_tracker()
* restriction_coder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Java, restriction_coder() and split() are not required (have defaults). Will this be addressed in Python too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made those two methods optional and added default implementations.

pass

@staticmethod
def stop():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary in Python? Seems like with the "last yielded element is a ProcessContinuation" thing, instead of saying yield stop() you can simply not yield anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, makes sense to remove this. Removed and updated docs.

Copy link
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

need to write a Splittable DoFn.

Not all runners support Splittable DoFn. See the capability matrix
(a href="https://beam.apache.org/documentation/runners/capability-matrix/)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this paragraph.

A Splittable DoFn must provide suitable overrides for the following methods
of the ``DoFn`` class.
* new_tracker()
* restriction_coder()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made those two methods optional and added default implementations.

pass

@staticmethod
def stop():
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, makes sense to remove this. Removed and updated docs.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.03%) to 69.61% when pulling 95a209d on chamikaramj:sdf_api into d6c6339 on apache:master.

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, delegating the rest of the review to Robert.

@chamikaramj
Copy link
Contributor Author

Friendly ping :)

See following documents for more details.
* https://s.apache.org/splittable-do-fn
* https://s.apache.org/splittable-do-fn-python-sdk
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document which methods maybe called concurrently (and hence need locking).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

"""Returns the current restriction.

Returns a restriction accurately describing the full range of work the
current ``DoFn.process()`` call will do, including already completed work.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this be updated over time? Should we note that anything in this restriction is a candidate for removal, and try_claim must be called before actually doing any work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we don't have try_claim() (and other methods needed for dynamic work rebalancing). For now I mentioned that current restriction might be updated dynamically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...due to concurrent invocation of other methods...

I think the fact that this may be called from other threads is key.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

fully read.

Returns: ``True`` if current restriction has been fully processed.
Raises ValueError: if there is still any unclaimed work remaining in the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it legal for a DoFn to return before fully processing the restriction?

Seems like it'd be better to call checkpoint() after the process has completed, at which point, if the entire restriction was processed, the current (remaining) restriction would be None.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the runner will take a checkpoint after the processing has completed. This method will be called after that as an additional check to make sure that there are no data remaining in the current restriction after checkpoint is taken.

"""
raise NotImplementedError

def checkpoint(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about try_split? I'd like to avoid redundancy with that one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we don't support dynamic work rebalancing (neither does Java SDF). I think we should have further discussions before finalizing the API needed for that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't introduce a method that we know will be redundant with future developments.

single parameter of type ``Timestamp`` or as an integer that gives the
watermark in number of seconds.

** Splittable DoFns **
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps move the Args section up to here (or maybe even higher). It does feel odd that 99% of DoFns won't be splittable, but 80% of this docstring is about splittable DoFns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, done :).

yield restriction

@staticmethod
def resume(resume_delay=0):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intended to be used as

return self.resume(...)

Maybe it'd be better to put this as a static method on ProcessContinuation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -189,6 +276,54 @@ def finish_bundle(self):
"""
pass

def new_tracker(self, restriction):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear that all of these methods are only used in the splitable case. Maybe some documentation? (Maybe the names should all reflect this, e.g. containing the word "splittable"?)

I might go so far as to put these in a mixin class (interface) that would be checked for if the splittable argument was used, which we could also then to enforce that they're implemented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated documentation. I don't think it makes sense to move these to a MixIn since we don't expect any type other than DoFn to have this functionality.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The advantage is that we can make these many methods abstract such that the user won't forget any of them when they need them.

Copy link
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. PTAL.

See following documents for more details.
* https://s.apache.org/splittable-do-fn
* https://s.apache.org/splittable-do-fn-python-sdk
"""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

"""Returns the current restriction.

Returns a restriction accurately describing the full range of work the
current ``DoFn.process()`` call will do, including already completed work.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we don't have try_claim() (and other methods needed for dynamic work rebalancing). For now I mentioned that current restriction might be updated dynamically.

"""
raise NotImplementedError

def checkpoint(self):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we don't support dynamic work rebalancing (neither does Java SDF). I think we should have further discussions before finalizing the API needed for that.

single parameter of type ``Timestamp`` or as an integer that gives the
watermark in number of seconds.

** Splittable DoFns **
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, done :).

yield restriction

@staticmethod
def resume(resume_delay=0):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -189,6 +276,54 @@ def finish_bundle(self):
"""
pass

def new_tracker(self, restriction):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated documentation. I don't think it makes sense to move these to a MixIn since we don't expect any type other than DoFn to have this functionality.

fully read.

Returns: ``True`` if current restriction has been fully processed.
Raises ValueError: if there is still any unclaimed work remaining in the
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the runner will take a checkpoint after the processing has completed. This method will be called after that as an additional check to make sure that there are no data remaining in the current restriction after checkpoint is taken.

@chamikaramj
Copy link
Contributor Author

Failure seems to be unrelated.

Retest this please.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.01%) to 69.561% when pulling 10067f0 on chamikaramj:sdf_api into d6c6339 on apache:master.

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't realize I handn't sent these comments out yet.

"""

def __init__(self, should_resume, resume_delay=0):
"""Initializes a ProcessContinuation object.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the "should_resume" parameter if it's never returned (yielded) from a DoFn that shouldn't?

"""
raise NotImplementedError

def checkpoint(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't introduce a method that we know will be redundant with future developments.

@@ -189,6 +276,54 @@ def finish_bundle(self):
"""
pass

def new_tracker(self, restriction):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The advantage is that we can make these many methods abstract such that the user won't forget any of them when they need them.

Copy link
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

"""
raise NotImplementedError

def checkpoint(self):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
We shouldn't introduce a method that we know will be redundant with future developments.

Seems like, based on previous discussions [1], we are moving towards maintaining both methods (try_split() and checkpoint()) due to the need for calling checkpoint() after a resume() ? Also Java API currently has the checkpoint() method [2], we should add this to Python for consistency and if needed update both APIs (again maintaining consistency) after future discussions/experiments ?

[1] https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L42

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semantically, checkpoint() enforces that immediately following claim (try_claim() once we have that method) should be rejected while try_split(0) will not enforce that.

"""

def __init__(self, should_resume, resume_delay=0):
"""Initializes a ProcessContinuation object.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Do we need the "should_resume" parameter if it's never returned (yielded) from a DoFn that shouldn't?

Removed.

@@ -189,6 +276,54 @@ def finish_bundle(self):
"""
pass

def new_tracker(self, restriction):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
The advantage is that we can make these many methods abstract such that the user won't forget any of them when they need them.

I'm not 100% sure of this one. Seems like you are suggesting to ask users to directly implement the MixIn (say SplittableMixIn). But this means that every SDF author will have to implement two classes due to the need to make it easy for the SDK to perform validation. I think it might be better to make the API easy for users (just implement DoFn and implement required SDF method if needed) and make validation the hard (slightly) way.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.008%) to 69.568% when pulling d538a57 on chamikaramj:sdf_api into d6c6339 on apache:master.

@robertwb
Copy link
Contributor

robertwb commented Oct 4, 2017

To follow up, I'm OK with leaving checkpoint() for parity with Java until we add try_split. Of note, the former can be called concurrently with DoFn.process, but also is called whenever resume is returned and in this case requires stricter semantics.

Regarding the mixin/documentation issues, here's a better idea: Rather than take a contentless DoFn.RestrictionTrackerParam identifier, have the RestrictionTracker parameter have a default argument that would be a RestrictionProvider instance. This new type would hold all the methods for dealing with restrictions (element -> initial restriction, restriction -> restriction* splitting, and restriction -> tracker, and restriction coder) and of course be a natural place to put all the documentation as well.

@chamikaramj
Copy link
Contributor Author

Thanks.

Added 'RestrictionProvider' and updated documentation accordingly.

PTAL.

@chamikaramj
Copy link
Contributor Author

Retest this please.

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good!

"""Returns the current restriction.

Returns a restriction accurately describing the full range of work the
current ``DoFn.process()`` call will do, including already completed work.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...due to concurrent invocation of other methods...

I think the fact that this may be called from other threads is key.

Splittable ``DoFn``s.

To denote a ``DoFn`` class to be Splittable ``DoFn``, ``DoFn.process()``
method of that class should have exactly one parameter of a type that is a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one parameter whose default value is an instance of RestrictionProvider.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

gives the watermark in number of seconds.
"""

def new_tracker(self, restriction):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/new/create/ (which is consistent with the rest of our API).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

"""
raise NotImplementedError

def restriction_coder(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this one last.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

``DoFn.SideInputParam``: a side input that may be used when processing.
``DoFn.TimestampParam``: timestamp of the input element.
``DoFn.WindowParam``: ``Window`` the input element belongs to.
An object of type ``RestrictionProvider``: having a parameter of a type that
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An iobase.RestrictionProvider instance: a restriction tracker will be provided here to allow treatment as a Splittable `DoFn``.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. PTAL.

"""Returns the current restriction.

Returns a restriction accurately describing the full range of work the
current ``DoFn.process()`` call will do, including already completed work.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Splittable ``DoFn``s.

To denote a ``DoFn`` class to be Splittable ``DoFn``, ``DoFn.process()``
method of that class should have exactly one parameter of a type that is a
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

gives the watermark in number of seconds.
"""

def new_tracker(self, restriction):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

"""
raise NotImplementedError

def restriction_coder(self):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

``DoFn.SideInputParam``: a side input that may be used when processing.
``DoFn.TimestampParam``: timestamp of the input element.
``DoFn.WindowParam``: ``Window`` the input element belongs to.
An object of type ``RestrictionProvider``: having a parameter of a type that
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@chamikaramj
Copy link
Contributor Author

Retest this please

@coveralls
Copy link

Coverage Status

Coverage increased (+0.003%) to 69.579% when pulling f05b4cc on chamikaramj:sdf_api into d6c6339 on apache:master.

@chamikaramj
Copy link
Contributor Author

Jenkins failure above is unrelated.

Retest this please.

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

``current_restriction()`` and the return value of this method invocation
combined.

This method must be called at most once on a given ``RestrictionTracker``
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we can't enforce or require this, as we may call checkpint immediately before a resume is returned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@chamikaramj
Copy link
Contributor Author

Thanks.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.003%) to 69.579% when pulling cad1178 on chamikaramj:sdf_api into d6c6339 on apache:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.006%) to 69.582% when pulling cad1178 on chamikaramj:sdf_api into d6c6339 on apache:master.

@asfgit asfgit closed this in 31da49c Oct 6, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants