Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM: Full branch_id implementation #896

Open
wants to merge 41 commits into
base: main
Choose a base branch
from

Conversation

phofl
Copy link
Collaborator

@phofl phofl commented Feb 26, 2024

No description provided.

@@ -29,6 +29,10 @@
]


class BranchId(NamedTuple):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason you prefer a NamedTuple over a NewType('BranchId', int) here?

@@ -39,3 +41,12 @@ def assert_eq(a, b, *args, serialize_graph=True, **kwargs):

# Use `dask.dataframe.assert_eq`
return dd_assert_eq(a, b, *args, **kwargs)


def _check_consumer_node(expr, expected, consumer_node=IO, branch_id_counter=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly is a "consumer" node in this context?

_name = inst._name
if _name in Expr._instances:
return Expr._instances[_name]

Expr._instances[_name] = inst
return inst

@classmethod
def _check_branch_id_given(cls, args, _branch_id):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
def _check_branch_id_given(cls, args, _branch_id):
def _maybe_check_branch_id_given(cls, args, _branch_id):

return
return self._bubble_branch_id_down()

def _bubble_branch_id_down(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
def _bubble_branch_id_down(self):
def _propagate_branch_id_down(self):

# is used during optimization to capture the dependents of any given
# expression. A reuse consumer will have the same dependents independently
# of the branch_id parameter, since we want to reuse everything that comes
# before us and split branches up everything that is processed after
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a word missing, maybe for?

Suggested change
# before us and split branches up everything that is processed after
# before us and split branches up for everything that is processed after

Comment on lines +2775 to +2776
if not common_subplan_elimination:
out = result.rewrite("reuse", cache={})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we have cleaned up the meaning of common_subplan_elimination, it's still weird to me that we execute the reuse rule to avoid CSE. Maybe we should rename it to something like avoid_common_subplan_elimination?

@@ -2805,6 +2824,10 @@ def optimize(expr: Expr, fuse: bool = True) -> Expr:
Input expression to optimize
fuse:
whether or not to turn on blockwise fusion
common_subplan_elimination : bool, default False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that common_subplan_elimination does not mean no CSE but rather less CSE. We may want to reflect that in the docstring.

Comment on lines +482 to 501
return (
funcname(type(self)).lower()
+ "-"
+ _tokenize_deterministic(*self.operands, self._branch_id)
)

@functools.cached_property
def _dep_name(self):
# The name identifies every expression uniquely. The dependents name
# is used during optimization to capture the dependents of any given
# expression. A reuse consumer will have the same dependents independently
# of the branch_id parameter, since we want to reuse everything that comes
# before us and split branches up everything that is processed after
# us. So we have to ignore the branch_id from tokenization for those
# nodes.
if not self._reuse_consumer:
return self._name
return (
funcname(type(self)).lower() + "-" + _tokenize_deterministic(*self.operands)
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels prone to errors/inconsistencies when subclassing. Would it make sense to define a property _dep_name_tokens that could be overriden and a property _name_tokens that just always adds branch_id to the _dep_name_tokens? This could then feed into a common function used to generate the name using the tokens as input.

For example, FromGraph already implements a new _name but not a new _dep_name.

@@ -43,9 +47,17 @@ class Expr:
_parameters = []
_defaults = {}
_instances = weakref.WeakValueDictionary()
_branch_id_required = False
_reuse_consumer = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is ambiguous. Can we come up with something more descriptive?

]
return type(self)(*ops)

def _substitute_branch_id(self, branch_id):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
def _substitute_branch_id(self, branch_id):
def _maybe_substitute_branch_id(self, branch_id):

or something else that highlights the conditionality.

expected = expected.a + expected.a.sum()
pd.testing.assert_series_equal(x.sort_index(), expected)

# Check that we have 1 shuffle barrier but 20 p2pshuffle tasks for the output
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, this PR introduces functionality that relies on being able to read shuffle outputs several times. The fact that this seems to work with disk-based P2P is a lucky implementation detail but not guaranteed to work, let alone tested. Before releasing, we should at the very least test this. (It will also not work with in-memory P2P, but that's currently not supported in dask-expr anyway.)

@hendrikmakait
Copy link
Member

One general comment: This PR introduces many different names for seemingly similar things, e.g., branch vs. subplan, reuse vs. elimination. We may want to clean this up before merging to make it easier to grasp concepts.

Comment on lines +574 to +575
# Ensure that shuffles with different branch_ids have the same barrier
token = self._dep_name.split("-")[-1]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty strongly -1 for this. The fact that this works is purely coincidental. The barrier should be treated as an internal implementation detail since way too much logic depends on this. If we want/need this functionality, it should be supported as a proper API of the extension

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the benefit of reusing results that are written to disk, but I agree that the fact that this works is purely coincidental and very brittle wrt to changes.

From what I see, this might be useful if it were

  • well-tested (also on the P2P side)
  • not hidden as what looks like an implementation detail within the _layer

I'm not sure if this is something we can implement within the P2P extension. Maybe we can do this after we have the scheduler integration? I could also see this become an optimization pass that makes this very explicit.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants