Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dashboard: Fix Fine Perf. Metrics mis-aligned ColumnData lengths #7893

Merged
merged 3 commits into from Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 21 additions & 9 deletions distributed/dashboard/components/scheduler.py
Expand Up @@ -3426,6 +3426,27 @@ def update(self):
for k, v in self.scheduler.cumulative_worker_metrics.items()
if isinstance(k, tuple)
)

activities = {
activity
for (*_, activity, _), _ in items
if activity not in self.task_activities
}
if activities:
self.substantial_change = True
self.task_activities.extend(activities)

functions = {
func
for (ctx, func, *_), _ in items
if ctx == "execute" and func not in self.task_exec_data["functions"]
}
if functions:
self.substantial_change = True
self.task_exec_data["timestamp"].extend(datetime.now() for _ in functions)
self.function_selector.options.extend(functions)
self.task_exec_data["functions"].extend(functions)

for (context, *parts), val in items:
if context == "get-data":
activity, unit = parts
Expand All @@ -3452,15 +3473,6 @@ def update(self):
# note append doesn't work here
self.unit_selector.options += [unit]

if activity not in self.task_activities:
self.substantial_change = True
self.task_activities.append(activity)

if prefix not in self.task_exec_data["functions"]:
self.substantial_change = True
self.function_selector.options.append(prefix)
self.task_exec_data["functions"].append(prefix)
self.task_exec_data["timestamp"].append(datetime.utcnow())
idx = self.task_exec_data["functions"].index(prefix)

# Some function/activity combos missing, so need to keep columns aligned
Expand Down
63 changes: 38 additions & 25 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Expand Up @@ -53,6 +53,7 @@
from distributed.dashboard.scheduler import applications
from distributed.diagnostics.task_stream import TaskStreamPlugin
from distributed.metrics import time
from distributed.spans import span
from distributed.utils import format_dashboard_link
from distributed.utils_test import (
block_on_event,
Expand Down Expand Up @@ -329,38 +330,50 @@ async def test_WorkersMemory(c, s, a, b):
assert all(d["width"])


@gen_cluster(client=True)
@gen_cluster(
client=True,
config={
"distributed.worker.memory.target": 0.7,
"distributed.worker.memory.spill": False,
"distributed.worker.memory.pause": False,
},
worker_kwargs={"memory_limit": 100},
)
async def test_FinePerformanceMetrics(c, s, a, b):
cl = FinePerformanceMetrics(s)

futures = c.map(slowinc, range(10), delay=0.001)
await wait(futures)
await asyncio.sleep(1) # wait for metrics to arrive
# execute on default span; multiple tasks in same TaskGroup
x0 = c.submit(inc, 0, key="x-0", workers=[a.address])
x1 = c.submit(inc, 1, key="x-1", workers=[a.address])

assert not cl.task_exec_data
# execute with spill (output size is individually larger than target)
w = c.submit(lambda: "x" * 75, key="w", workers=[a.address])

cl.update()
assert cl.task_exec_data
assert cl.task_exec_data["functions"] == ["slowinc"]
await wait([x0, x1, w])

a.data.evict()
a.data.evict()
assert not a.data.fast
assert set(a.data.slow) == {"x-0", "x-1", "w"}

@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
async def test_FinePerformanceMetrics_simulated_spill_no_crash(c, s, a, b):
metrics = {
("execute", "inc", "disk-read", "seconds"): 1.0,
("execute", "inc", "disk-read", "count"): 16.0,
("execute", "inc", "disk-read", "bytes"): 2059931767.0,
("execute", "inc", "disk-write", "seconds"): 0.12,
("execute", "inc", "disk-write", "count"): 2.0,
("execute", "inc", "disk-write", "bytes"): 268435938.0,
}
s.cumulative_worker_metrics.clear()
s.cumulative_worker_metrics.update(metrics)
http_client = AsyncHTTPClient()
response = await http_client.fetch(
f"http://localhost:{s.http_server.port}/individual-fine-performance-metrics"
)
assert response.code == 200
with span("foo"):
# execute on named span, with unspill
y0 = c.submit(inc, x0, key="y-0", workers=[a.address])
# get_data with unspill + gather_dep + execute on named span
y1 = c.submit(inc, x1, key="y-1", workers=[b.address])

# execute on named span (duplicate name, different id)
with span("foo"):
z = c.submit(inc, 3, key="z")
await wait([y0, y1, z])

await a.heartbeat()
await b.heartbeat()

assert not cl.task_exec_data
cl.update()
assert cl.task_exec_data
assert set(cl.task_exec_data["functions"]) == {"w", "x", "y", "z"}


@gen_cluster(client=True)
Expand Down