Skip to content

Commit

Permalink
Dashboard: Fine Performance Metrics (#7725)
Browse files Browse the repository at this point in the history
Co-authored-by: crusaderky <crusaderky@gmail.com>
  • Loading branch information
milesgranger and crusaderky committed May 16, 2023
1 parent 6303f95 commit 7f272d9
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 9 deletions.
2 changes: 1 addition & 1 deletion distributed/comm/tests/test_ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_registered(ucx_loop):


async def get_comm_pair(
listen_addr="ucx://" + HOST, listen_args=None, connect_args=None, **kwargs
listen_addr=f"ucx://{HOST}", listen_args=None, connect_args=None, **kwargs
):
listen_args = listen_args or {}
connect_args = connect_args or {}
Expand Down
297 changes: 295 additions & 2 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
from collections import OrderedDict, defaultdict
from collections.abc import Iterable
from datetime import datetime
from datetime import datetime, timedelta
from numbers import Number
from typing import Any, TypeVar

Expand All @@ -30,12 +30,14 @@
HelpTool,
HoverTool,
HTMLTemplateFormatter,
MultiChoice,
NumberFormatter,
NumeralTickFormatter,
OpenURL,
PanTool,
Range1d,
ResetTool,
Select,
Tabs,
TapTool,
Title,
Expand All @@ -44,7 +46,7 @@
)
from bokeh.models.widgets import DataTable, TableColumn
from bokeh.models.widgets.markups import Div
from bokeh.palettes import Viridis11
from bokeh.palettes import Viridis11, small_palettes
from bokeh.plotting import figure
from bokeh.themes import Theme
from bokeh.transform import cumsum, factor_cmap, linear_cmap, stack
Expand Down Expand Up @@ -3375,6 +3377,297 @@ def update(self):
)


class FinePerformanceMetrics(DashboardComponent):
    """
    The main overview of the Fine Performance Metrics page.

    Reads ``scheduler.cumulative_worker_metrics`` and renders three charts
    side by side: task execution by function prefix (stacked bars), task
    execution by activity (pie) and ``get-data`` by activity (pie).
    A ``Select`` widget chooses which unit is plotted and a ``MultiChoice``
    widget optionally restricts the task charts to specific functions.
    """

    @log_errors
    def __init__(self, scheduler, **kwargs):
        """Store the scheduler, create empty data holders and build the layout.

        ``**kwargs`` is accepted but not used by this component.
        """
        self.scheduler = scheduler
        # "get-data" metrics: an "activity" column plus, for every
        # (activity, unit) pair, a numeric column and a "<...>_text" column.
        self.senddata = defaultdict(list)
        self.sendsrc = ColumnDataSource(data=dict())
        # "execute" metrics: "functions" and "timestamp" columns plus, for
        # every (activity, unit) pair, a numeric and a "<...>_text" column.
        self.task_exec_data = defaultdict(list)
        # The subset of task_exec_data that is actually plotted; see update().
        self.task_exec_data_limited = defaultdict(list)
        self.task_exec_by_prefix_src = ColumnDataSource(data=dict())
        self.task_exec_by_activity_src = ColumnDataSource(data=dict())
        # When True, the next update() replaces whole figures in the layout
        # instead of only swapping renderers.
        self.substantial_change = False
        self.task_activities = []
        self.init_root()

    def init_root(self):
        """Create the widgets, placeholder figures and the root layout."""

        def handle_selector_chng(attr, old, new):
            # A different unit changes which columns are stacked, so the
            # figures must be rebuilt on the next update().
            self.unit_selected = new
            self.substantial_change = True

        self.function_selector = MultiChoice(value=[], options=[])
        self.function_selector.placeholder = "Select specific functions"
        self.unit_selector = Select(title="Unit selection", options=[])
        self.unit_selector.on_change("value", handle_selector_chng)
        self.unit_selected = "seconds"
        # Empty placeholders; update() puts the real figures in their place.
        self.task_exec_by_activity_chart = figure()
        self.task_exec_by_prefix_chart = figure()
        self.senddata_by_activity_chart = figure()
        self.root = column(
            self.function_selector,
            self.unit_selector,
            row(
                [
                    self.task_exec_by_prefix_chart,
                    self.task_exec_by_activity_chart,
                    self.senddata_by_activity_chart,
                ],
                sizing_mode="stretch_width",
            ),
            sizing_mode="scale_width",
        )

    def format(self, unit: str, val: Any) -> str:
        """Render ``val`` for display; units other than bytes/seconds use ``str``."""
        formatters = {"bytes": format_bytes, "seconds": format_time}
        return formatters.get(unit, str)(val)

    @staticmethod
    def _activity_palette(n: int):
        """Return ``n`` colours from the ``YlGnBu`` palette family.

        When a palette of exactly ``n`` colours exists it is returned as-is.
        Otherwise the smallest palette that is large enough is truncated, or
        the largest available palette is repeated, so that the result always
        has ``n`` entries and lines up with the other per-activity sequences.
        """
        palettes = small_palettes["YlGnBu"]
        if n in palettes:
            return palettes[n]
        if n <= 0:
            return []
        sizes = sorted(palettes)
        base = palettes[next((size for size in sizes if size >= n), sizes[-1])]
        return [base[i % len(base)] for i in range(n)]

    def _register_unit(self, unit: str) -> None:
        """Offer ``unit`` in the unit selector if it is not listed yet."""
        if unit not in self.unit_selector.options:
            # note append doesn't work here
            self.unit_selector.options += [unit]

    @without_property_validation
    @log_errors
    def update(self):
        """Pull the latest metrics from the scheduler and refresh all charts."""
        items = sorted(
            (k, v)
            for k, v in self.scheduler.cumulative_worker_metrics.items()
            if isinstance(k, tuple)
        )
        for (context, *parts), val in items:
            if context == "get-data":
                activity, unit = parts
                self._register_unit(unit)

                activities = self.senddata["activity"]
                if activity not in activities:
                    self.substantial_change = True
                    activities.append(activity)

                idx = activities.index(activity)
                values = self.senddata[f"{activity}_{unit}"]
                texts = self.senddata[f"{activity}_{unit}_text"]
                # Pad so that position ``idx`` exists in both columns
                while idx >= len(values):
                    values.append(0)
                    texts.append("")
                texts[idx] = self.format(unit, val)
                values[idx] = val

            elif context == "execute":
                prefix, activity, unit = parts
                self._register_unit(unit)

                if activity not in self.task_activities:
                    self.substantial_change = True
                    self.task_activities.append(activity)

                functions = self.task_exec_data["functions"]
                if prefix not in functions:
                    self.substantial_change = True
                    # Reassign instead of append, as for the unit selector
                    self.function_selector.options += [prefix]
                    functions.append(prefix)
                    self.task_exec_data["timestamp"].append(datetime.utcnow())
                idx = functions.index(prefix)

                # Some function/activity combos missing, so need to keep columns aligned
                for op in self.task_activities:
                    op_values = self.task_exec_data[f"{op}_{unit}"]
                    op_texts = self.task_exec_data[f"{op}_{unit}_text"]
                    while len(op_values) < len(functions):
                        op_values.append(0)
                        op_texts.append("")

                self.task_exec_data[f"{activity}_{unit}"][idx] = val
                self.task_exec_data[f"{activity}_{unit}_text"][idx] = self.format(
                    unit, val
                )

        data = self.task_exec_data.copy()
        # If user has manually selected function(s) then we are only showing them.
        if self.function_selector.value:
            indexes = [data["functions"].index(f) for f in self.function_selector.value]
            for key, values in data.items():
                data[key] = [values[idx] for idx in indexes]

        # Otherwise limit those being shown which have 'expired' to be displayed
        else:
            cutoff = datetime.utcnow() - timedelta(seconds=10)
            # Functions first seen within the last 10 seconds, or the last 5
            # functions when there are none.
            n_show = sum(1 for ts in data["timestamp"] if ts > cutoff) or 5
            for key in data:
                data[key] = data[key][-n_show:]
        self.task_exec_data_limited = data.copy()

        # Show total number of functions to choose from
        self.function_selector.title = (
            f"Filter by function ({len(self.function_selector.options)}):"
        )

        # Each builder gets its own copy: looking up a missing key in a
        # defaultdict inserts it, which must not leak between builders.
        task_exec_piechart = self._build_task_execution_by_activity_chart(
            self.task_exec_data_limited.copy()
        )
        task_exec_barchart = self._build_task_execution_by_prefix_chart(
            self.task_exec_data_limited.copy()
        )
        senddata_piechart = self._build_senddata_chart(self.senddata.copy())

        # Replacing the child causes small blips if done every iteration vs updating
        # renderers, but it's needed when new functions and/or activities show up to
        # rerender plot
        if self.substantial_change:
            self.root.children[-1].children[0] = task_exec_barchart
            self.root.children[-1].children[1] = task_exec_piechart
            self.root.children[-1].children[2] = senddata_piechart
            self.substantial_change = False
            # NOTE(review): the self.*_chart attributes are not rebound here,
            # so the branch below keeps targeting the figures created in
            # init_root() - confirm whether that is intended.
        else:
            # Same pairing as above: the prefix chart is the bar chart and
            # the activity chart is the pie chart.
            self.task_exec_by_prefix_chart.renderers = task_exec_barchart.renderers
            self.task_exec_by_activity_chart.renderers = task_exec_piechart.renderers
            self.senddata_by_activity_chart.renderers = senddata_piechart.renderers

    def _build_task_execution_by_activity_chart(
        self, task_exec_data: defaultdict[str, list]
    ) -> figure:
        """Build the pie chart of task execution totals per activity.

        Also replaces the data of ``self.task_exec_by_activity_src``.
        """
        values = [
            sum(task_exec_data[f"{op}_{self.unit_selected}"])
            for op in self.task_activities
        ]
        total = sum(values)
        piechart_data = {}
        piechart_data["value"] = values
        piechart_data["text"] = [self.format(self.unit_selected, v) for v in values]
        piechart_data["angle"] = [
            # may not have any bytes movement reported, avoid divide by zero
            (v / total if total else 0) * 2 * math.pi
            for v in values
        ]
        piechart_data["color"] = self._activity_palette(len(self.task_activities))
        piechart_data["activity"] = self.task_activities
        self.task_exec_by_activity_src.data = piechart_data

        piechart = figure(
            height=500,
            sizing_mode="scale_both",
            title="Task execution, by activity",
            tools="hover",
            tooltips="@{activity}: @text",
            x_range=(-0.5, 1.0),
        )
        piechart.axis.axis_label = None
        piechart.axis.visible = False
        piechart.grid.grid_line_color = None

        piechart.wedge(
            x=0,
            y=1,
            radius=0.4,
            start_angle=cumsum("angle", include_zero=True),
            end_angle=cumsum("angle"),
            line_color="white",
            fill_color="color",
            legend_field="activity",
            source=self.task_exec_by_activity_src,
        )
        return piechart

    def _build_task_execution_by_prefix_chart(
        self, task_exec_data: defaultdict[str, list]
    ) -> figure:
        """Build the stacked bar chart of task execution per function prefix.

        When there is at least one column for the selected unit this also
        replaces the data of ``self.task_exec_by_prefix_src`` and sets
        ``self.substantial_change`` if any existing column changed length.
        """
        barchart = figure(
            x_range=task_exec_data["functions"],
            height=500,
            sizing_mode="scale_both",
            title="Task execution, by function",
            tools="pan,wheel_zoom,box_zoom,reset",
        )
        barchart.yaxis.visible = False
        barchart.xaxis.major_label_orientation = 0.2
        barchart.grid.grid_line_color = None
        # Numeric columns of the selected unit; "<...>_text" columns do not
        # end with the unit name and are therefore left out.
        stackers = [
            name for name in task_exec_data if name.endswith(self.unit_selected)
        ]
        if stackers:
            renderers = barchart.vbar_stack(
                stackers,
                x="functions",
                width=0.9,
                source=self.task_exec_by_prefix_src,
                color=self._activity_palette(len(self.task_activities)),
                legend_label=self.task_activities,
            )
            # One hover tool per stacked segment so the tooltip names it
            for vbar in renderers:
                tooltips = [
                    (
                        vbar.name,
                        f"@{{{vbar.name}_text}}",
                    ),
                    ("function", "@functions"),
                ]
                barchart.add_tools(HoverTool(tooltips=tooltips, renderers=[vbar]))

            # A column whose length changed means the set of plotted
            # functions changed; request a full figure replacement.
            if any(
                len(self.task_exec_by_prefix_src.data[k]) != len(task_exec_data[k])
                for k in self.task_exec_by_prefix_src.data
            ):
                self.substantial_change = True

            self.task_exec_by_prefix_src.data = dict(task_exec_data)
            barchart.renderers = renderers
        return barchart

    def _build_senddata_chart(self, senddata: defaultdict[str, list]) -> figure:
        """Build the pie chart of ``get-data`` totals per activity.

        Also replaces the data of ``self.sendsrc``.
        """
        activities = senddata["activity"]
        values = [sum(senddata[f"{op}_{self.unit_selected}"]) for op in activities]
        total = sum(values)
        piedata = {}
        piedata["activity"] = activities
        piedata["value"] = values
        piedata["text"] = [self.format(self.unit_selected, v) for v in values]
        piedata["angle"] = [
            # avoid dividing by zero when nothing was reported for this unit
            (v / total if total else 0.0) * 2 * math.pi
            for v in values
        ]
        piedata["color"] = self._activity_palette(len(activities))

        self.sendsrc.data = piedata
        senddata_piechart = figure(
            height=500,
            sizing_mode="scale_both",
            title="Send data, by activity",
            tools="hover",
            tooltips="@{activity}: @text",
            x_range=(-0.5, 1.0),
        )
        senddata_piechart.wedge(
            x=0,
            y=1,
            radius=0.4,
            start_angle=cumsum("angle", include_zero=True),
            end_angle=cumsum("angle"),
            line_color="white",
            fill_color="color",
            legend_field="activity",
            source=self.sendsrc,
        )
        senddata_piechart.axis.axis_label = None
        senddata_piechart.axis.visible = False
        senddata_piechart.grid.grid_line_color = None
        return senddata_piechart


class Contention(DashboardComponent):
"""
Event Loop Health (and GIL Contention, if configured)
Expand Down
2 changes: 2 additions & 0 deletions distributed/dashboard/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Contention,
CurrentLoad,
ExceptionsTable,
FinePerformanceMetrics,
MemoryByKey,
Occupancy,
SystemMonitor,
Expand Down Expand Up @@ -112,6 +113,7 @@
"/individual-aggregate-time-per-action": individual_doc(AggregateAction, 500),
"/individual-scheduler-system": individual_doc(SystemMonitor, 500),
"/individual-contention": individual_doc(Contention, 500),
"/individual-fine-performance-metrics": individual_doc(FinePerformanceMetrics, 500),
"/individual-profile": individual_profile_doc,
"/individual-profile-server": individual_profile_server_doc,
"/individual-gpu-memory": gpu_memory_doc,
Expand Down
16 changes: 16 additions & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
Contention,
CurrentLoad,
Events,
FinePerformanceMetrics,
Hardware,
MemoryByKey,
MemoryColor,
Expand Down Expand Up @@ -328,6 +329,21 @@ async def test_WorkersMemory(c, s, a, b):
assert all(d["width"])


@gen_cluster(client=True)
async def test_FinePerformanceMetrics(c, s, a, b):
    """The component stays empty until update() reads the scheduler metrics."""
    component = FinePerformanceMetrics(s)

    submitted = c.map(slowinc, range(10), delay=0.001)
    await wait(submitted)
    await asyncio.sleep(1)  # wait for metrics to arrive

    # Constructing the component alone must not have collected anything
    assert not component.task_exec_data

    component.update()
    collected = component.task_exec_data
    assert collected
    assert collected["functions"] == ["slowinc"]


@gen_cluster(client=True)
async def test_ClusterMemory(c, s, a, b):
cl = ClusterMemory(s)
Expand Down
Loading

0 comments on commit 7f272d9

Please sign in to comment.