Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10036] More flexible dataframes partitioning. #11766

Merged
merged 7 commits into from Jun 5, 2020

Conversation

robertwb
Copy link
Contributor

Also adds (naive) dataframe.agg() that uses this.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status
Build Status
Build Status
--- --- Build Status
XLang --- --- --- Build Status --- --- Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status
Build Status
Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

Copy link
Member

@TheNeuralBit TheNeuralBit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't dug into the logic much yet, but I have some bikesheddy comments I wanted to go ahead and send out since answers to them could help clarify things.

@@ -23,6 +23,7 @@

from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import frames # pylint: disable=unused-import
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sure the wrapper code is populated with the various types of frames.

Nothing() < Index([i]) < Index([i, j]) < ... < Index() < Singleton()
"""

_INDEX_PARTITIONS = 100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously this was 10 right (in partitioned_by_index)? Assuming this intentional, but I just wanted to double-check its not a typo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I was just testing things. I'll change it back. (It would be great to get rid of this altogether, as it limits parallelism, but that's not part of this change.)

class Partitioning(object):
"""A class representing a (consistent) partitioning of dataframe objects.
"""
def is_subpartition_of(self, other):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think I'd prefer is_subpartitioning_of

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

def preserves_partition_by_index(self): # type: () -> bool
"""Whether the result of this expression will be partitioned by index
whenever all of its inputs are partitioned by index."""
def preserves_partition_by(self): # type: () -> Partitioning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The meaning of this function is a little confusing now since it implies some connection to the input partitioning, but it also has it's own partitioning. Would renaming it to outputs_.. or produces_.. still be accurate, or is the output partitioning actually a function of both "preserves" and the input?

I also think we should consider changing .._partition_by to .._partitioning for clarity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's a function of both the input and the operation. E.g. an elementwise operation preserves all existing partitioning, but does not guarantee any.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah makes sense. So perhaps "preserves" could be thought of as an upper bound on the partitioning of the output (similar to how "requires" is a lower bound on the partitioning of the input).

It looks like every current expression has preserves set to either Nothing or Singleton. Wouldn't it be simpler to just keep preserves as a boolean? Or maybe you have some other expression in mind where a boolean won't be sufficient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are operations, such as setting a column to be an additional level of the index, that would do partial preservation. But perhaps that's not worth the additional complexity. I can change this to a boolean if you'd rather.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that makes sense. And I guess the name "preserves" is actually intuitive now that I understand it's setting an upper bound on the output partitioning.

I think the complexity is worth it, unless there's a chance those operations will never materialize. Can you just add a docstring indicating that "preserves" sets an upper bound on the output partitioning (or any other language to make sure readers can grok it)? A similar comment about requires would be good too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring comments added.

@TheNeuralBit TheNeuralBit self-requested a review June 2, 2020 00:06
Copy link
Member

@TheNeuralBit TheNeuralBit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I have a couple minor suggestions and questions (in addition to the one about docstrings for preserves and requires above).

def __bool__(self):
return False

__nonzero__ = __bool__
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that making Nothing falsy and relying on that in logic elsewhere harms readability. What do you think about dropping this and just explicitly checking for Nothing when needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.



class Singleton(Partitioning):
"""A partitioning co-locating all data to a singleton partition.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't Singleton completely partitioning the data into one element per partition? This description doesn't seem consistent to me, maybe I'm misunderstanding

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reworded.

elif expr in stage.inputs:
return stage.partitioning.is_subpartitioning_of(partitioning)
elif expr.preserves_partition_by().is_subpartitioning_of(partitioning):
if expr.requires_partition_by().is_subpartitioning_of(partitioning):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had trouble justifying this logic to myself, ended up writing a little proof which I'm a bit embarrassed to share with a Math PhD:

output partitioning of expr = min(expr.preserves, input partitioning)
input partitioning >= expr.requires

thus if expr.requires >= required output AND expr.preserves >= required output
then output partitioning of expr >= required output

Otherwise we need to go up the tree of inputs to figure out their partitionings

This may be the least concise way to express this so I don't know if it's worth putting in a comment verbatim, but something to that effect would be helpful (assuming I've got it right)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, fleshing this out with more comments.

Copy link
Contributor Author

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback.

def preserves_partition_by_index(self): # type: () -> bool
"""Whether the result of this expression will be partitioned by index
whenever all of its inputs are partitioned by index."""
def preserves_partition_by(self): # type: () -> Partitioning
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring comments added.



class Singleton(Partitioning):
"""A partitioning co-locating all data to a singleton partition.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reworded.

def __bool__(self):
return False

__nonzero__ = __bool__
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

elif expr in stage.inputs:
return stage.partitioning.is_subpartitioning_of(partitioning)
elif expr.preserves_partition_by().is_subpartitioning_of(partitioning):
if expr.requires_partition_by().is_subpartitioning_of(partitioning):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, fleshing this out with more comments.

@robertwb robertwb merged commit a74374d into apache:master Jun 5, 2020
yirutang pushed a commit to yirutang/beam that referenced this pull request Jul 23, 2020
Also add simple dataframe.agg() which uses these features.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants