10 changes: 6 additions & 4 deletions cubed/core/plan.py
@@ -102,6 +102,7 @@ def _new(

first_cubed_i = min(i for i, s in enumerate(stack_summaries) if s.is_cubed())
first_cubed_summary = stack_summaries[first_cubed_i]
func_name = first_cubed_summary.name

op_name_unique = gensym()

@@ -111,9 +112,9 @@ def _new(
op_name_unique,
name=op_name_unique,
op_name=op_name,
func_name=func_name,
type="op",
stack_summaries=stack_summaries,
op_display_name=f"{op_name_unique}\n{first_cubed_summary.name}",
hidden=hidden,
)
# array
@@ -142,9 +143,9 @@ def _new(
op_name_unique,
name=op_name_unique,
op_name=op_name,
func_name=func_name,
type="op",
stack_summaries=stack_summaries,
op_display_name=f"{op_name_unique}\n{first_cubed_summary.name}",
hidden=hidden,
primitive_op=primitive_op,
pipeline=primitive_op.pipeline,
@@ -216,7 +217,7 @@ def _create_lazy_zarr_arrays(self, dag):
name=name,
op_name=op_name,
type="op",
op_display_name=name,
func_name="",
primitive_op=primitive_op,
pipeline=primitive_op.pipeline,
)
@@ -388,7 +389,8 @@ def visualize(
tooltip = f"name: {n}\n"
node_type = d.get("type", None)
if node_type == "op":
label = d["op_display_name"]
func_name = d["func_name"]
label = f"{n}\n{func_name}".strip()
op_name = d["op_name"]
if op_name == "blockwise":
d["style"] = '"rounded,filled"'
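
Note: a minimal standalone sketch (not part of this diff) of how the new func_name node attribute is consumed when building the visualization label, using a plain dict to stand in for a DAG node's attributes and hypothetical op names.

# The label is now derived at visualization time from the node name plus
# func_name, instead of being precomputed and stored as op_display_name.
node = {"type": "op", "op_name": "blockwise", "func_name": "astype"}

n = "op-003"                                  # hypothetical unique op name
label = f"{n}\n{node['func_name']}".strip()   # "op-003\nastype"

# For the lazy zarr-creation ops func_name is "", so strip() leaves just the name.
assert f"op-007\n{''}".strip() == "op-007"
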
3 changes: 2 additions & 1 deletion cubed/diagnostics/rich.py
@@ -44,7 +44,8 @@ def on_compute_start(self, event):
progress_tasks = {}
for name, node in visit_nodes(event.dag):
num_tasks = node["primitive_op"].num_tasks
op_display_name = node["op_display_name"].replace("\n", " ")
func_name = node["func_name"]
op_display_name = f"{name} {func_name}"
progress_task = progress.add_task(
f"{op_display_name}", start=False, total=num_tasks
)
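
A short sketch (hypothetical node values; assumes the rich package is installed) of the equivalent progress-task description now composed from the node's func_name:

from rich.progress import Progress

with Progress() as progress:
    name, func_name, num_tasks = "op-003", "astype", 4   # hypothetical node values
    task = progress.add_task(f"{name} {func_name}", start=False, total=num_tasks)
    progress.start_task(task)
    progress.update(task, advance=num_tasks)
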
11 changes: 4 additions & 7 deletions cubed/diagnostics/tqdm.py
@@ -18,18 +18,15 @@ def on_compute_start(self, event):
from tqdm.auto import tqdm

# find the maximum display width so we can align bars below
max_op_display_name = (
max(
len(node["op_display_name"].replace("\n", " "))
for _, node in visit_nodes(event.dag)
)
+ 1 # for the colon
max_op_display_name = max(
len(f"{name} {node['func_name']}:") for name, node in visit_nodes(event.dag)
)

self.pbars = {}
for i, (name, node) in enumerate(visit_nodes(event.dag)):
num_tasks = node["primitive_op"].num_tasks
op_display_name = node["op_display_name"].replace("\n", " ") + ":"
func_name = node["func_name"]
op_display_name = f"{name} {func_name}:"
# note double curlies to get literal { and } for tqdm bar format
bar_format = (
f"{{desc:{max_op_display_name}}} {{percentage:3.0f}}%|{{bar}}{{r_bar}}"
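
A minimal sketch (hypothetical op names) of the simplified alignment logic: the widest "name func_name:" string sets the description field width, and the doubled braces survive the outer f-string to become literal tqdm bar_format fields.

from tqdm.auto import tqdm

ops = [("op-001", "rechunk"), ("op-002", "astype"), ("op-003", "sum-partial")]
max_op_display_name = max(len(f"{name} {func_name}:") for name, func_name in ops)

for name, func_name in ops:
    op_display_name = f"{name} {func_name}:"
    # after formatting this becomes e.g. "{desc:20} {percentage:3.0f}%|{bar}{r_bar}"
    bar_format = f"{{desc:{max_op_display_name}}} {{percentage:3.0f}}%|{{bar}}{{r_bar}}"
    with tqdm(total=3, desc=op_display_name, bar_format=bar_format) as pbar:
        pbar.update(3)
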
11 changes: 8 additions & 3 deletions cubed/runtime/executors/coiled.py
@@ -13,8 +13,8 @@
from cubed.spec import Spec


def make_coiled_function(func, coiled_kwargs):
return coiled.function(**coiled_kwargs)(execution_stats(func))
def make_coiled_function(func, name, coiled_kwargs):
return coiled.function(**coiled_kwargs)(execution_stats(func, name=name))


class CoiledExecutor(DagExecutor):
@@ -41,7 +41,12 @@ def execute_dag(
for name, node in visit_nodes(dag):
handle_operation_start_callbacks(callbacks, name)
pipeline = node["pipeline"]
coiled_function = make_coiled_function(pipeline.function, merged_kwargs)
# this name will show up on the dask dashboard - need to replace '-' as anything after it is suppressed
func_name = node["func_name"]
op_display_name = f"{name} {func_name}".replace("-", "_")
coiled_function = make_coiled_function(
pipeline.function, op_display_name, merged_kwargs
)
if minimum_workers is not None:
coiled_function.cluster.adapt(minimum=minimum_workers)
# coiled expects a sequence (it calls `len` on it)
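
A tiny sketch (hypothetical node values) of the dashboard-facing name built above; as the in-diff comment notes, the dask dashboard suppresses anything after a '-', hence the replacement with '_'.

name, func_name = "op-004", "sum-partial"   # hypothetical values
op_display_name = f"{name} {func_name}".replace("-", "_")
assert op_display_name == "op_004 sum_partial"
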
8 changes: 6 additions & 2 deletions cubed/runtime/utils.py
@@ -57,10 +57,14 @@ def execute_with_timing(function, *args, **kwargs):
)


def execution_stats(func):
def execution_stats(func, name=None):
"""Decorator to measure timing information and peak memory usage of a function call."""

return partial(execute_with_stats, func)
def wrapper(*args, **kwargs):
return execute_with_stats(func, *args, **kwargs)

wrapper.__name__ = name or func.__name__
return wrapper


def execution_timing(func):
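
A standalone sketch (not the PR code; execute_with_stats is replaced by a stand-in) of why a named wrapper replaces functools.partial here: partial objects carry no __name__, so there was nothing for downstream executors such as coiled to display.

from functools import partial

def stats_stand_in(func, *args, **kwargs):   # stand-in for execute_with_stats
    return func(*args, **kwargs)

def execution_stats(func, name=None):
    def wrapper(*args, **kwargs):
        return stats_stand_in(func, *args, **kwargs)

    wrapper.__name__ = name or func.__name__
    return wrapper

old = partial(stats_stand_in, print)
assert not hasattr(old, "__name__")          # partial: nothing to display

new = execution_stats(print, name="op_004 sum_partial")
assert new.__name__ == "op_004 sum_partial"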