diff --git a/distributed/comm/tests/test_ucx.py b/distributed/comm/tests/test_ucx.py
index 4f731ccada..185e4e1a0f 100644
--- a/distributed/comm/tests/test_ucx.py
+++ b/distributed/comm/tests/test_ucx.py
@@ -41,7 +41,7 @@ def test_registered(ucx_loop):
 
 
 async def get_comm_pair(
-    listen_addr="ucx://" + HOST, listen_args=None, connect_args=None, **kwargs
+    listen_addr=f"ucx://{HOST}", listen_args=None, connect_args=None, **kwargs
 ):
     listen_args = listen_args or {}
     connect_args = connect_args or {}
diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py
index a3c22f6ec5..2f13ed4012 100644
--- a/distributed/dashboard/components/scheduler.py
+++ b/distributed/dashboard/components/scheduler.py
@@ -6,7 +6,7 @@
 import os
 from collections import OrderedDict, defaultdict
 from collections.abc import Iterable
-from datetime import datetime
+from datetime import datetime, timedelta
 from numbers import Number
 from typing import Any, TypeVar
 
@@ -30,12 +30,14 @@
     HelpTool,
     HoverTool,
     HTMLTemplateFormatter,
+    MultiChoice,
     NumberFormatter,
     NumeralTickFormatter,
     OpenURL,
     PanTool,
     Range1d,
     ResetTool,
+    Select,
     Tabs,
     TapTool,
     Title,
@@ -44,7 +46,7 @@
 )
 from bokeh.models.widgets import DataTable, TableColumn
 from bokeh.models.widgets.markups import Div
-from bokeh.palettes import Viridis11
+from bokeh.palettes import Viridis11, small_palettes
 from bokeh.plotting import figure
 from bokeh.themes import Theme
 from bokeh.transform import cumsum, factor_cmap, linear_cmap, stack
@@ -3375,6 +3377,309 @@ def update(self):
         )
 
 
+class FinePerformanceMetrics(DashboardComponent):
+    """
+    The main overview of the Fine Performance Metrics page.
+
+    Renders a bar chart of task execution per function, a pie chart of task
+    execution per activity and a pie chart of get-data (send) activity, all
+    fed from ``scheduler.cumulative_worker_metrics``.
+    """
+
+    @log_errors
+    def __init__(self, scheduler, **kwargs):
+        self.scheduler = scheduler
+        self.senddata = defaultdict(list)
+        self.sendsrc = ColumnDataSource(data=dict())
+        self.task_exec_data = defaultdict(list)
+        self.task_exec_data_limited = defaultdict(list)
+        self.task_exec_by_prefix_src = ColumnDataSource(data=dict())
+        self.task_exec_by_activity_src = ColumnDataSource(data=dict())
+        self.substantial_change = False
+        self.task_activities = []
+        self.init_root()
+
+    def init_root(self):
+        """Create the selector widgets, placeholder figures and root layout."""
+        def handle_selector_chng(attr, old, new):
+            # A new unit changes which columns are plotted: force a rebuild
+            self.unit_selected = new
+            self.substantial_change = True
+
+        self.function_selector = MultiChoice(value=[], options=[])
+        self.function_selector.placeholder = "Select specific functions"
+        self.unit_selector = Select(title="Unit selection", options=[])
+        self.unit_selector.on_change("value", handle_selector_chng)
+        self.unit_selected = "seconds"
+        self.task_exec_by_activity_chart = figure()
+        self.task_exec_by_prefix_chart = figure()
+        self.senddata_by_activity_chart = figure()
+        self.root = column(
+            self.function_selector,
+            self.unit_selector,
+            row(
+                [
+                    self.task_exec_by_prefix_chart,
+                    self.task_exec_by_activity_chart,
+                    self.senddata_by_activity_chart,
+                ],
+                sizing_mode="stretch_width",
+            ),
+            sizing_mode="scale_width",
+        )
+
+    def format(self, unit: str, val: Any) -> str:
+        """Render *val* for display; unknown units fall back to ``str``."""
+        formatters = {"bytes": format_bytes, "seconds": format_time}
+        return formatters.get(unit, str)(val)
+
+    @without_property_validation
+    @log_errors
+    def update(self):
+        """Ingest the scheduler's cumulative worker metrics and refresh charts."""
+        items = sorted(
+            (k, v)
+            for k, v in self.scheduler.cumulative_worker_metrics.items()
+            if isinstance(k, tuple)
+        )
+        for (context, *parts), val in items:
+            if context == "get-data":
+                activity, unit = parts
+
+                if unit not in self.unit_selector.options:
+                    # note append doesn't work here
+                    self.unit_selector.options += [unit]
+
+                if activity not in self.senddata["activity"]:
+                    self.substantial_change = True
+                    self.senddata["activity"].append(activity)
+
+                idx = self.senddata["activity"].index(activity)
+                while idx >= len(self.senddata[f"{activity}_{unit}"]):
+                    self.senddata[f"{activity}_{unit}"].append(0)
+                    self.senddata[f"{activity}_{unit}_text"].append("")
+                self.senddata[f"{activity}_{unit}_text"][idx] = self.format(unit, val)
+                self.senddata[f"{activity}_{unit}"][idx] = val
+
+            elif context == "execute":
+                prefix, activity, unit = parts
+
+                if unit not in self.unit_selector.options:
+                    # note append doesn't work here
+                    self.unit_selector.options += [unit]
+
+                if activity not in self.task_activities:
+                    self.substantial_change = True
+                    self.task_activities.append(activity)
+
+                if prefix not in self.task_exec_data["functions"]:
+                    self.substantial_change = True
+                    self.function_selector.options.append(prefix)
+                    self.task_exec_data["functions"].append(prefix)
+                    self.task_exec_data["timestamp"].append(datetime.utcnow())
+                idx = self.task_exec_data["functions"].index(prefix)
+
+                # Some function/activity combos missing, so need to keep columns aligned
+                for op in self.task_activities:
+                    while len(self.task_exec_data[f"{op}_{unit}"]) != len(
+                        self.task_exec_data["functions"]
+                    ):
+                        self.task_exec_data[f"{op}_{unit}"].append(0)
+                        self.task_exec_data[f"{op}_{unit}_text"].append("")
+
+                self.task_exec_data[f"{activity}_{unit}"][idx] = val
+                self.task_exec_data[f"{activity}_{unit}_text"][idx] = self.format(
+                    unit, val
+                )
+
+        data = self.task_exec_data.copy()
+        # If user has manually selected function(s) then we are only showing them.
+        if self.function_selector.value:
+            indexes = [data["functions"].index(f) for f in self.function_selector.value]
+            for key, values in data.items():
+                data[key] = [values[idx] for idx in indexes]
+
+        # Otherwise limit those being shown which have 'expired' to be displayed
+        else:
+            cutoff = datetime.utcnow() - timedelta(seconds=10)
+            n_show = len([d for d in data["timestamp"] if d > cutoff]) or 5
+            for key in data:
+                data[key] = data[key][-n_show:]
+        self.task_exec_data_limited = data.copy()
+
+        # Show total number of functions to choose from
+        self.function_selector.title = (
+            f"Filter by function ({len(self.function_selector.options)}):"
+        )
+
+        task_exec_piechart = self._build_task_execution_by_activity_chart(
+            self.task_exec_data_limited.copy()
+        )
+        task_exec_barchart = self._build_task_execution_by_prefix_chart(
+            self.task_exec_data_limited.copy()
+        )
+        senddata_piechart = self._build_senddata_chart(self.senddata.copy())
+
+        # Replacing the child causes small blips if done every iteration vs updating
+        # renderers, but it's needed when new functions and/or activities show up to
+        # rerender plot
+        if self.substantial_change:
+            self.root.children[-1].children[0] = task_exec_barchart
+            self.root.children[-1].children[1] = task_exec_piechart
+            self.root.children[-1].children[2] = senddata_piechart
+            self.substantial_change = False
+        else:
+            # Bar chart is the per-prefix figure, pie chart the per-activity one
+            self.task_exec_by_prefix_chart.renderers = task_exec_barchart.renderers
+            self.task_exec_by_activity_chart.renderers = task_exec_piechart.renderers
+            self.senddata_by_activity_chart.renderers = senddata_piechart.renderers
+
+    def _build_task_execution_by_activity_chart(
+        self, task_exec_data: defaultdict[str, list]
+    ) -> figure:
+        """Build the pie chart of task execution totals, one wedge per activity."""
+        piechart_data = {}
+        piechart_data["value"] = [
+            sum(task_exec_data[f"{op}_{self.unit_selected}"])
+            for op in self.task_activities
+        ]
+        piechart_data["text"] = [
+            self.format(self.unit_selected, v) for v in piechart_data["value"]
+        ]
+        piechart_data["angle"] = [
+            (
+                sum(task_exec_data[f"{activity}_{self.unit_selected}"])
+                / sum(piechart_data["value"])
+                if sum(piechart_data["value"])
+                else 0  # may not have any bytes movement reported, avoid divide by zero
+            )
+            * 2
+            * math.pi
+            for activity in self.task_activities
+        ]
+        piechart_data["color"] = small_palettes["YlGnBu"].get(
+            len(self.task_activities), []
+        )
+        piechart_data["activity"] = self.task_activities
+        self.task_exec_by_activity_src.data = piechart_data
+
+        piechart = figure(
+            height=500,
+            sizing_mode="scale_both",
+            title="Task execution, by activity",
+            tools="hover",
+            tooltips="@{activity}: @text",
+            x_range=(-0.5, 1.0),
+        )
+        piechart.axis.axis_label = None
+        piechart.axis.visible = False
+        piechart.grid.grid_line_color = None
+
+        piechart.wedge(
+            x=0,
+            y=1,
+            radius=0.4,
+            start_angle=cumsum("angle", include_zero=True),
+            end_angle=cumsum("angle"),
+            line_color="white",
+            fill_color="color",
+            legend_field="activity",
+            source=self.task_exec_by_activity_src,
+        )
+        return piechart
+
+    def _build_task_execution_by_prefix_chart(
+        self, task_exec_data: defaultdict[str, list]
+    ) -> figure:
+        """Build the stacked bar chart of task execution, one bar per function."""
+        barchart = figure(
+            x_range=task_exec_data["functions"],
+            height=500,
+            sizing_mode="scale_both",
+            title="Task execution, by function",
+            tools="pan,wheel_zoom,box_zoom,reset",
+        )
+        barchart.yaxis.visible = False
+        barchart.xaxis.major_label_orientation = 0.2
+        barchart.grid.grid_line_color = None
+        stackers = [
+            name for name in task_exec_data if name.endswith(self.unit_selected)
+        ]
+        if stackers:
+            renderers = barchart.vbar_stack(
+                stackers,
+                x="functions",
+                width=0.9,
+                source=self.task_exec_by_prefix_src,
+                color=small_palettes["YlGnBu"].get(len(self.task_activities), []),
+                legend_label=self.task_activities,
+            )
+            for vbar in renderers:
+                tooltips = [
+                    (
+                        vbar.name,
+                        f"@{{{vbar.name}_text}}",
+                    ),
+                    ("function", "@functions"),
+                ]
+                barchart.add_tools(HoverTool(tooltips=tooltips, renderers=[vbar]))
+
+            if any(
+                len(self.task_exec_by_prefix_src.data[k]) != len(task_exec_data[k])
+                for k in self.task_exec_by_prefix_src.data
+            ):
+                self.substantial_change = True
+
+            self.task_exec_by_prefix_src.data = dict(task_exec_data)
+            barchart.renderers = renderers
+        return barchart
+
+    def _build_senddata_chart(self, senddata: defaultdict[str, list]) -> figure:
+        """Build the pie chart of get-data metrics, one wedge per activity."""
+        piedata = {}
+        piedata["activity"] = senddata["activity"]
+        piedata["value"] = [
+            (sum(senddata[f"{op}_{self.unit_selected}"])) for op in senddata["activity"]
+        ]
+        piedata["text"] = [self.format(self.unit_selected, v) for v in piedata["value"]]
+        piedata["angle"] = [
+            (
+                (sum(senddata[f"{op}_{self.unit_selected}"]) / sum(piedata["value"]))
+                if sum(piedata["value"])
+                else 0.0
+            )
+            * 2
+            * math.pi
+            for op in piedata["activity"]
+        ]
+        piedata["color"] = small_palettes["YlGnBu"].get(len(piedata["activity"]), [])
+
+        self.sendsrc.data = piedata
+        senddata_piechart = figure(
+            height=500,
+            sizing_mode="scale_both",
+            title="Send data, by activity",
+            tools="hover",
+            tooltips="@{activity}: @text",
+            x_range=(-0.5, 1.0),
+        )
+        senddata_piechart.wedge(
+            x=0,
+            y=1,
+            radius=0.4,
+            start_angle=cumsum("angle", include_zero=True),
+            end_angle=cumsum("angle"),
+            line_color="white",
+            fill_color="color",
+            legend_field="activity",
+            source=self.sendsrc,
+        )
+        senddata_piechart.axis.axis_label = None
+        senddata_piechart.axis.visible = False
+        senddata_piechart.grid.grid_line_color = None
+        return senddata_piechart
+
+
 class Contention(DashboardComponent):
     """
     Event Loop Health (and GIL Contention, if configured)
diff --git a/distributed/dashboard/scheduler.py b/distributed/dashboard/scheduler.py
index cd48bc260e..4fbe8b24b1 100644
--- a/distributed/dashboard/scheduler.py
+++ b/distributed/dashboard/scheduler.py
@@ -21,6 +21,7 @@
     Contention,
     CurrentLoad,
     ExceptionsTable,
+    FinePerformanceMetrics,
     MemoryByKey,
     Occupancy,
     SystemMonitor,
@@ -112,6 +113,7 @@
     "/individual-aggregate-time-per-action": individual_doc(AggregateAction, 500),
     "/individual-scheduler-system": individual_doc(SystemMonitor, 500),
     "/individual-contention": individual_doc(Contention, 500),
+    "/individual-fine-performance-metrics": individual_doc(FinePerformanceMetrics, 500),
     "/individual-profile": individual_profile_doc,
     "/individual-profile-server": individual_profile_server_doc,
     "/individual-gpu-memory": gpu_memory_doc,
diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py
index 5abe830d6b..4a7a6f2809 100644
--- a/distributed/dashboard/tests/test_scheduler_bokeh.py
+++ b/distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -28,6 +28,7 @@
     Contention,
     CurrentLoad,
     Events,
+    FinePerformanceMetrics,
     Hardware,
     MemoryByKey,
     MemoryColor,
@@ -328,6 +329,21 @@ async def test_WorkersMemory(c, s, a, b):
     assert all(d["width"])
 
 
+@gen_cluster(client=True)
+async def test_FinePerformanceMetrics(c, s, a, b):
+    cl = FinePerformanceMetrics(s)
+
+    futures = c.map(slowinc, range(10), delay=0.001)
+    await wait(futures)
+    await asyncio.sleep(1)  # wait for metrics to arrive
+
+    assert not cl.task_exec_data
+
+    cl.update()
+    assert cl.task_exec_data
+    assert cl.task_exec_data["functions"] == ["slowinc"]
+
+
 @gen_cluster(client=True)
 async def test_ClusterMemory(c, s, a, b):
     cl = ClusterMemory(s)
diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py
index f7ce906cd5..92f72310da 100644
--- a/distributed/diagnostics/tests/test_scheduler_plugin.py
+++ b/distributed/diagnostics/tests/test_scheduler_plugin.py
@@ -123,9 +123,7 @@ async def test_async_and_sync_add_remove_worker(s):
     events = []
 
     class MyAsyncPlugin(SchedulerPlugin):
-        name: str
-
-        def __init__(self, name: str) -> None:
+        def __init__(self, name):
             super().__init__()
             self.name = name
             self.in_remove_worker = asyncio.Event()
@@ -144,9 +142,7 @@ async def remove_worker(self, scheduler, worker):
             events.append((self.name, "remove_worker", worker))
 
     class MySyncPlugin(SchedulerPlugin):
-        name: str
-
-        def __init__(self, name: str):
+        def __init__(self, name):
             self.name = name
 
         def add_worker(self, worker, scheduler):