Skip to content

Commit

Permalink
Add more ordering diagnostics to dask.visualize (#7992)
Browse files Browse the repository at this point in the history
Added to help investigate #7929
  • Loading branch information
eriknw committed Nov 2, 2021
1 parent 6353103 commit 89d93a8
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 10 deletions.
82 changes: 74 additions & 8 deletions dask/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,11 +592,21 @@ def visualize(*args, **kwargs):
optimize_graph : bool, optional
If True, the graph is optimized before rendering. Otherwise,
the graph is displayed as is. Default is False.
color : {None, 'order'}, optional
color : {None, 'order', 'ages', 'freed', 'memoryincreases', 'memorydecreases',
'memorypressure'}, optional
Options to color nodes.
colormap
- None, the default, no colors.
- 'order', colors the nodes' border based on the order they appear in the graph
- 'order', colors the nodes' border based on the order they appear in the graph.
- 'ages', how long the data of a node is held.
- 'freed', the number of dependencies released after running a node.
- 'memoryincreases', how many more outputs are held after the lifetime of a node.
Large values may indicate nodes that should have run later.
- 'memorydecreases', how many fewer outputs are held after the lifetime of a node.
Large values may indicate nodes that should have run sooner.
- 'memorypressure', the number of data held when:
- the node is run (circle)
- the data is released (rectangle)
collapse_outputs : bool, optional
Whether to collapse output boxes, which often have empty labels.
Default is False.
Expand Down Expand Up @@ -652,10 +662,22 @@ def visualize(*args, **kwargs):

color = kwargs.get("color")

if color == "order":
if color in {
"order",
"order-age",
"order-freed",
"order-memoryincreases",
"order-memorydecreases",
"order-memorypressure",
"age",
"freed",
"memoryincreases",
"memorydecreases",
"memorypressure",
}:
import matplotlib.pyplot as plt

from .order import order
from .order import diagnostics, order

o = order(dsk)
try:
Expand All @@ -666,13 +688,57 @@ def visualize(*args, **kwargs):
import matplotlib.pyplot as plt

cmap = getattr(plt.cm, cmap)
mx = max(o.values()) + 1
colors = {k: _colorize(cmap(v / mx, bytes=True)) for k, v in o.items()}

def label(x):
return str(values[x])

data_values = None
if color != "order":
info = diagnostics(dsk, o)[0]
if color.endswith("age"):
values = {key: val.age for key, val in info.items()}
elif color.endswith("freed"):
values = {key: val.num_dependencies_freed for key, val in info.items()}
elif color.endswith("memorypressure"):
values = {key: val.num_data_when_run for key, val in info.items()}
data_values = {
key: val.num_data_when_released for key, val in info.items()
}
elif color.endswith("memoryincreases"):
values = {
key: max(0, val.num_data_when_released - val.num_data_when_run)
for key, val in info.items()
}
else: # memorydecreases
values = {
key: max(0, val.num_data_when_run - val.num_data_when_released)
for key, val in info.items()
}

if color.startswith("order-"):

def label(x):
return str(o[x]) + "-" + str(values[x])

else:
values = o
maxval = kwargs.pop("maxval", None)
if maxval is None:
maxval = max(1, max(values.values()))
colors = {k: _colorize(cmap(v / maxval, bytes=True)) for k, v in values.items()}
if data_values is None:
data_values = values
data_colors = colors
else:
data_colors = {
k: _colorize(cmap(v / maxval, bytes=True))
for k, v in data_values.items()
}

kwargs["function_attributes"] = {
k: {"color": v, "label": str(o[k])} for k, v in colors.items()
k: {"color": v, "label": label(k)} for k, v in colors.items()
}
kwargs["data_attributes"] = {k: {"color": v} for k, v in colors.items()}
kwargs["data_attributes"] = {k: {"color": v} for k, v in data_colors.items()}
elif color:
raise NotImplementedError("Unknown value color=%s" % color)

Expand Down
69 changes: 68 additions & 1 deletion dask/order.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
This relies on the regularity of graph constructors like dask.array to be a
good proxy for ordering. This is usually a good idea and a sane default.
"""
from collections import defaultdict
from collections import defaultdict, namedtuple
from math import log

from .core import get_dependencies, get_deps, getcycle, reverse_dict # noqa: F401
Expand Down Expand Up @@ -1004,3 +1004,70 @@ def __lt__(self, other):
return self.obj < other.obj
except Exception:
return str(self.obj) < str(other.obj)


OrderInfo = namedtuple(
"OrderInfo",
(
"order",
"age",
"num_data_when_run",
"num_data_when_released",
"num_dependencies_freed",
),
)


def diagnostics(dsk, o=None, dependencies=None):
"""Simulate runtime metrics as though running tasks one at a time in order.
These diagnostics can help reveal behaviors of and issues with ``order``.
Returns a dict of `namedtuple("OrderInfo")` and a list of the number of outputs held over time.
OrderInfo fields:
- order : the order in which the node is run.
- age : how long the output of a node is held.
- num_data_when_run : the number of outputs held in memory when a node is run.
- num_data_when_released : the number of outputs held in memory when the output is released.
- num_dependencies_freed : the number of dependencies freed by running the node.
"""
if dependencies is None:
dependencies, dependents = get_deps(dsk)
else:
dependents = reverse_dict(dependencies)
if o is None:
o = order(dsk, dependencies=dependencies)

pressure = []
num_in_memory = 0
age = {}
runpressure = {}
releasepressure = {}
freed = {}
num_needed = {key: len(val) for key, val in dependents.items()}
for i, key in enumerate(sorted(dsk, key=o.__getitem__)):
pressure.append(num_in_memory)
runpressure[key] = num_in_memory
released = 0
for dep in dependencies[key]:
num_needed[dep] -= 1
if num_needed[dep] == 0:
age[dep] = i - o[dep]
releasepressure[dep] = num_in_memory
released += 1
freed[key] = released
if dependents[key]:
num_in_memory -= released - 1
else:
age[key] = 0
releasepressure[key] = num_in_memory
num_in_memory -= released

rv = {
key: OrderInfo(
val, age[key], runpressure[key], releasepressure[key], freed[key]
)
for key, val in o.items()
}
return rv, pressure
87 changes: 86 additions & 1 deletion dask/tests/test_order.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import dask
from dask.core import get_deps
from dask.order import ndependencies, order
from dask.order import diagnostics, ndependencies, order
from dask.utils_test import add, inc


Expand Down Expand Up @@ -980,3 +980,88 @@ def cost(deps):
# Allow one to be bad, b/c this is hard!
costs = sorted(cost_of_pairs.values())
assert sum(costs[:-1]) <= 25 or sum(costs) <= 31


def test_diagnostics(abcde):
r"""
a1 b1 c2 d1 e1
/|\ /|\ /|\ /| /
/ | X | X | X | /
/ |/ \|/ \|/ \|/
a0 b0 c0 d0 e0
"""
a, b, c, d, e = abcde
dsk = {
(a, 0): (f,),
(b, 0): (f,),
(c, 0): (f,),
(d, 0): (f,),
(e, 0): (f,),
(a, 1): (f, (a, 0), (b, 0), (c, 0)),
(b, 1): (f, (b, 0), (c, 0), (d, 0)),
(c, 1): (f, (c, 0), (d, 0), (e, 0)),
(d, 1): (f, (d, 0), (e, 0)),
(e, 1): (f, (e, 0)),
}
info, memory_over_time = diagnostics(dsk)
assert memory_over_time == [0, 1, 2, 3, 2, 3, 2, 3, 2, 1]
assert {key: val.order for key, val in info.items()} == {
(a, 0): 0,
(b, 0): 1,
(c, 0): 2,
(d, 0): 4,
(e, 0): 6,
(a, 1): 3,
(b, 1): 5,
(c, 1): 7,
(d, 1): 8,
(e, 1): 9,
}
assert {key: val.age for key, val in info.items()} == {
(a, 0): 3,
(b, 0): 4,
(c, 0): 5,
(d, 0): 4,
(e, 0): 3,
(a, 1): 0,
(b, 1): 0,
(c, 1): 0,
(d, 1): 0,
(e, 1): 0,
}
assert {key: val.num_dependencies_freed for key, val in info.items()} == {
(a, 0): 0,
(b, 0): 0,
(c, 0): 0,
(d, 0): 0,
(e, 0): 0,
(a, 1): 1,
(b, 1): 1,
(c, 1): 1,
(d, 1): 1,
(e, 1): 1,
}
assert {key: val.num_data_when_run for key, val in info.items()} == {
(a, 0): 0,
(b, 0): 1,
(c, 0): 2,
(d, 0): 2,
(e, 0): 2,
(a, 1): 3,
(b, 1): 3,
(c, 1): 3,
(d, 1): 2,
(e, 1): 1,
}
assert {key: val.num_data_when_released for key, val in info.items()} == {
(a, 0): 3,
(b, 0): 3,
(c, 0): 3,
(d, 0): 2,
(e, 0): 1,
(a, 1): 3,
(b, 1): 3,
(c, 1): 3,
(d, 1): 2,
(e, 1): 1,
}

0 comments on commit 89d93a8

Please sign in to comment.