Skip to content

Commit

Permalink
fix dashboard
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jun 8, 2023
1 parent f7721e1 commit dd9abfa
Showing 1 changed file with 77 additions and 54 deletions.
131 changes: 77 additions & 54 deletions distributed/dashboard/components/scheduler.py
Expand Up @@ -7,6 +7,7 @@
from collections import OrderedDict, defaultdict
from collections.abc import Iterable
from datetime import datetime, timedelta
from itertools import chain
from numbers import Number
from typing import Any, TypeVar

Expand Down Expand Up @@ -3418,76 +3419,98 @@ def format(self, unit: str, val: Any) -> str:
formatters = {"bytes": format_bytes, "seconds": format_time}
return formatters.get(unit, str)(val)

def get_metrics(
    self,
) -> tuple[list[tuple[str, str, str, float]], list[tuple[str, str, float]]]:
    """Split the scheduler's fine performance metrics by context.

    Walks ``self.scheduler.cumulative_worker_metrics`` and keeps only the
    entries whose key is a tuple starting with ``"execute"`` or
    ``"get-data"``; every other entry is dropped.

    Returns
    -------
    - 'execute' metrics: [(function, activity, unit, value), ...]
      Values of keys that differ only by span_id are summed together.
    - 'get_data' metrics: [(activity, unit, value), ...]

    Both lists are returned sorted.
    """
    execute_totals: defaultdict[tuple[str, str, str], float] = defaultdict(float)
    get_data_rows = []

    for key, value in self.scheduler.cumulative_worker_metrics.items():
        if not isinstance(key, tuple):
            continue
        # Keys look like (context, *context-specific parts, activity, unit)
        context, *middle, activity, unit = key
        assert isinstance(unit, str)

        if context == "get-data":
            assert not middle
            assert isinstance(activity, str)
            get_data_rows.append((activity, unit, value))
        elif context == "execute":
            span_id, function = middle
            assert isinstance(function, str)
            # The activity label of a custom metric may be any hashable, so it
            # is normalised to str. All span_ids are squashed together.
            # TODO offer a filter by span_id, like we already do by function
            execute_totals[function, str(activity), unit] += value

        # Any other context (e.g. memory-monitor, gather-dep) is ignored

    execute_rows = [
        (*label, execute_totals[label]) for label in sorted(execute_totals)
    ]
    get_data_rows.sort()
    return execute_rows, get_data_rows

@without_property_validation
@log_errors
def update(self):
items = sorted(
# Custom metrics can be any hashable
(tuple(map(str, k)), v)
for k, v in self.scheduler.cumulative_worker_metrics.items()
if isinstance(k, tuple)
)
execute_metrics, get_data_metrics = self.get_metrics()

activities = {
activity
for (*_, activity, _), _ in items
if activity not in self.task_activities
activity for *_, activity, _, _ in chain(execute_metrics, get_data_metrics)
}
activities.difference_update(self.task_activities)
if activities:
self.substantial_change = True
self.task_activities.extend(activities)

functions = {
func
for (ctx, func, *_), _ in items
if ctx == "execute" and func not in self.task_exec_data["functions"]
}
units = {unit for *_, unit, _ in chain(execute_metrics, get_data_metrics)}
units.difference_update(self.unit_selector.options)
# extend and append don't work here
self.unit_selector.options += list(units)

functions = {function for function, *_ in execute_metrics}
functions.difference_update(self.task_exec_data["functions"])
if functions:
self.substantial_change = True
self.task_exec_data["timestamp"].extend(datetime.now() for _ in functions)
self.function_selector.options.extend(functions)
self.task_exec_data["functions"].extend(functions)

for (context, *parts), val in items:
if context == "get-data":
activity, unit = parts

if unit not in self.unit_selector.options:
# note append doesn't work here
self.unit_selector.options += [unit]

if activity not in self.senddata["activity"]:
self.substantial_change = True
self.senddata["activity"].append(activity)

idx = self.senddata["activity"].index(activity)
while idx >= len(self.senddata[f"{activity}_{unit}"]):
self.senddata[f"{activity}_{unit}"].append(0)
self.senddata[f"{activity}_{unit}_text"].append("")
self.senddata[f"{activity}_{unit}_text"][idx] = self.format(unit, val)
self.senddata[f"{activity}_{unit}"][idx] = val

elif context == "execute":
prefix, activity, unit = parts

if unit not in self.unit_selector.options:
# note append doesn't work here
self.unit_selector.options += [unit]

idx = self.task_exec_data["functions"].index(prefix)

# Some function/activity combos missing, so need to keep columns aligned
for op in self.task_activities:
while len(self.task_exec_data[f"{op}_{unit}"]) != len(
self.task_exec_data["functions"]
):
self.task_exec_data[f"{op}_{unit}"].append(0)
self.task_exec_data[f"{op}_{unit}_text"].append("")

self.task_exec_data[f"{activity}_{unit}"][idx] = val
self.task_exec_data[f"{activity}_{unit}_text"][idx] = self.format(
unit, val
)
for activity, unit, val in get_data_metrics:
if activity not in self.senddata["activity"]:
self.substantial_change = True
self.senddata["activity"].append(activity)

idx = self.senddata["activity"].index(activity)
while idx >= len(self.senddata[f"{activity}_{unit}"]):
self.senddata[f"{activity}_{unit}"].append(0)
self.senddata[f"{activity}_{unit}_text"].append("")
self.senddata[f"{activity}_{unit}_text"][idx] = self.format(unit, val)
self.senddata[f"{activity}_{unit}"][idx] = val

for prefix, activity, unit, val in execute_metrics:
idx = self.task_exec_data["functions"].index(prefix)

# Some function/activity combos missing, so need to keep columns aligned
for op in self.task_activities:
while len(self.task_exec_data[f"{op}_{unit}"]) != len(
self.task_exec_data["functions"]
):
self.task_exec_data[f"{op}_{unit}"].append(0)
self.task_exec_data[f"{op}_{unit}_text"].append("")

self.task_exec_data[f"{activity}_{unit}"][idx] = val
self.task_exec_data[f"{activity}_{unit}_text"][idx] = self.format(unit, val)

data = self.task_exec_data.copy()
# If user has manually selected function(s) then we are only showing them.
Expand Down

0 comments on commit dd9abfa

Please sign in to comment.