Change to dask.order: be more eager at times #7929

Merged
merged 10 commits into from
Sep 1, 2021
103 changes: 70 additions & 33 deletions dask/order.py
@@ -232,7 +232,7 @@ def finish_now_key(x):
result = {}
i = 0

# `inner_stask` is used to perform a DFS along dependencies. Once emptied
# `inner_stack` is used to perform a DFS along dependencies. Once emptied
# (when traversing dependencies), this continues down a path along dependents
# until a root node is reached.
#
@@ -307,10 +307,6 @@ def finish_now_key(x):
seen_update(deps)
continue

result[item] = i
i += 1
deps = dependents[item]

# If inner_stack is empty, then we typically add the best dependent to it.
# However, we don't add to it if we complete a node early via "finish_now" below
# or if a dependent is already on an inner_stack. In this case, we add the
@@ -319,35 +315,77 @@ def finish_now_key(x):
# 1. shrink `deps` so that it can be processed faster,
# 2. make sure we don't process the same dependency repeatedly, and
# 3. make sure we don't accidentally continue down an expensive-to-compute path.

deps = dependents[item]
add_to_inner_stack = True
if metrics[item][3] == 1: # min_height
# Don't leave any dangling single nodes! Finish all dependents that are
# ready and are also root nodes.
finish_now = {
dep
for dep in deps
if not dependents[dep] and num_needed[dep] == 1
}
if finish_now:
deps -= finish_now # Safe to mutate
if len(finish_now) > 1:
finish_now = sorted(finish_now, key=finish_now_key)
for dep in finish_now:
result[dep] = i
i += 1
add_to_inner_stack = False
item_key = partition_keys[item]
while True:
# Be greedy! This loop will repeat while the item has a single dependent
# (after removing `finish_now` and `already_seen` from the dependents) and
# this dependent is ready to run. Let me explain.
#
# In simple terms, if there is a single dependent that's ready to run, and doing
# so will allow the current node to be released from memory, then we should do so.
# In the short term, it gains us little, but it also costs us little. Longer term,
# this strategy may provide more opportunities for saving memory.
#
# In reality, computing a dependent here *may* allow the parent to be released
# immediately, but not necessarily. There is, however, an expectation that the
# parent will be released *soon*, because its other dependents are already on the
# inner stacks (and in `already_seen`).
#
# Hence, this can introduce locally sub-optimal behavior (more memory, and more
# work that may not go towards our tactical goal) with the goal of being more
# globally optimal. The risk from being locally sub-optimal is most likely low,
# and the reward for being more globally optimal is potentially very high, so I
# think this strategy is probably a good idea in general. Heh, even so, I expect
# there are pathological failure cases to be found :)
result[item] = i
i += 1
if metrics[item][3] == 1: # min_height
# Don't leave any dangling single nodes! Finish all dependents that are
# ready and are also root nodes.
finish_now = {
dep
for dep in deps
if not dependents[dep] and num_needed[dep] == 1
}
if finish_now:
deps -= finish_now # Safe to mutate
if len(finish_now) > 1:
finish_now = sorted(finish_now, key=finish_now_key)
for dep in finish_now:
result[dep] = i
i += 1
add_to_inner_stack = False
if deps:
for dep in deps:
num_needed[dep] -= 1

already_seen = deps & seen
if already_seen:
if len(deps) == len(already_seen):
deps = None
break
add_to_inner_stack = False
deps -= already_seen

if len(deps) == 1:
# Fast path! We trim down `deps` above hoping to reach here.
(item,) = deps
if num_needed[item] == 0:
deps = dependents[item]
key = partition_keys[item]
if key < item_key:
# We have three reasonable choices for item_key: first, last, min.
# I don't have a principled reason for choosing any of the options.
# So, let's choose min. It may be the most conservative by avoiding
# accidentally going down an expensive-to-compute path.
item_key = key
continue
Member Author

FYI, if we remove these lines, then the code logic is exactly equivalent to what it was before.
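
For readers following the thread, here is a minimal sketch of the behavior those lines appear to add: after a node is ordered, if exactly one of its dependents remains (ignoring ones already seen) and that dependent has no other unfinished dependencies, the loop moves on to it immediately instead of stopping. This is a toy illustration only, not the code in `dask/order.py`. The function name `follow_ready_chain` and the example graph are hypothetical, and the sketch leaves out `finish_now`, partition keys, and the inner stacks.

```python
def follow_ready_chain(start, dependents, num_needed, seen=frozenset()):
    """Greedily walk from `start` through single, ready dependents (toy version)."""
    num_needed = dict(num_needed)  # copy so the caller's counts are untouched
    order = []
    item = start
    while True:
        order.append(item)  # "compute" the current node
        deps = set(dependents.get(item, ()))
        for dep in deps:
            num_needed[dep] -= 1  # one fewer unfinished dependency
        deps -= seen  # ignore dependents that are already scheduled elsewhere
        if len(deps) == 1:
            (dep,) = deps
            if num_needed[dep] == 0:
                # Single dependent that is ready to run: keep going.
                item = dep
                continue
        break
    return order


# Hypothetical graph: a -> b -> c, and d -> c.
dependents = {"a": ["b"], "b": ["c"], "d": ["c"], "c": []}
num_needed = {"a": 0, "b": 1, "c": 2, "d": 0}
chain = follow_ready_chain("a", dependents, num_needed)
print(chain)
```

In this toy graph the walk stops at `b`, because `c` still waits on `d`. Without the `continue` branch the walk would stop after `a`, which is roughly the "equivalent to what it was before" case described in the comment.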

break

if deps:
for dep in deps:
num_needed[dep] -= 1

already_seen = deps & seen
if already_seen:
if len(deps) == len(already_seen):
continue
add_to_inner_stack = False
deps -= already_seen

if len(deps) == 1:
# Fast path! We trim down `deps` above hoping to reach here.
(dep,) = deps
@@ -367,7 +405,7 @@ def finish_now_key(x):
inner_stack_pop = inner_stack.pop
seen_add(dep)
continue
if key < partition_keys[item]:
if key < item_key:
next_nodes[key].append(deps)
else:
later_nodes[key].append(deps)
@@ -376,7 +414,6 @@ def finish_now_key(x):
dep_pools = defaultdict(list)
for dep in deps:
dep_pools[partition_keys[dep]].append(dep)
item_key = partition_keys[item]
if inner_stack:
# If we have an inner_stack, we need to look for a "better" path
prev_key = partition_keys[inner_stack[0]]