Dashboard: Fix Fine Perf. Metrics mis-aligned ColumnData lengths #7893

Merged · 3 commits · Jun 8, 2023
Changes from 2 commits
30 changes: 21 additions & 9 deletions distributed/dashboard/components/scheduler.py
@@ -3426,6 +3426,27 @@
             for k, v in self.scheduler.cumulative_worker_metrics.items()
             if isinstance(k, tuple)
         )
+
+        activities = {
+            activity
+            for (_, *_, activity, _), _ in items
+            if activity not in self.task_activities
+        }
+        if activities:
+            self.substantial_change = True
+            self.task_activities.extend(activities)
+
+        functions = {
+            func
+            for (ctx, func, *_), _ in items
+            if ctx == "execute" and func not in self.task_exec_data["functions"]
+        }
+        if functions:
+            self.substantial_change = True
+            self.task_exec_data["timestamp"].extend(datetime.now() for _ in functions)
+            self.function_selector.options.extend(functions)
+            self.task_exec_data["functions"].extend(functions)
+
         for (context, *parts), val in items:
             if context == "get-data":
                 activity, unit = parts
@@ -3452,15 +3473,6 @@
                 # note append doesn't work here
                 self.unit_selector.options += [unit]
 
-            if activity not in self.task_activities:
-                self.substantial_change = True
-                self.task_activities.append(activity)
-
-            if prefix not in self.task_exec_data["functions"]:
-                self.substantial_change = True
-                self.function_selector.options.append(prefix)
-                self.task_exec_data["functions"].append(prefix)
-                self.task_exec_data["timestamp"].append(datetime.utcnow())
             idx = self.task_exec_data["functions"].index(prefix)
 
             # Some function/activity combos missing, so need to keep columns aligned
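
Context for the hunk above: task_exec_data feeds a bokeh ColumnDataSource, and every column in a ColumnDataSource must have the same length. The old code appended to the functions and timestamp columns lazily, one metric at a time inside the loop, so a function whose metrics never touched some activity could leave columns out of step — the mis-aligned lengths in the PR title. Pre-registering every new activity and function before the loop keeps functions and timestamp in lockstep and leaves only the activity columns for the padding pass at the bottom. Below is a minimal sketch of the invariant against a toy task_exec_data; the register() helper is hypothetical, not the dashboard's actual code.

# A minimal sketch of the column-alignment invariant; register() is a
# hypothetical helper, not part of the dashboard code.
from datetime import datetime

task_exec_data = {"timestamp": [], "functions": []}

def register(function, activities, data=task_exec_data):
    # Add one row per new function, stamping it once, so "functions" and
    # "timestamp" always grow in lockstep
    if function not in data["functions"]:
        data["functions"].append(function)
        data["timestamp"].append(datetime.now())
    idx = data["functions"].index(function)
    for activity, seconds in activities.items():
        # Create the activity column on first sight, then make sure it is
        # long enough to hold this function's row before accumulating
        col = data.setdefault(activity, [])
        col.extend([0.0] * (len(data["functions"]) - len(col)))
        col[idx] += seconds
    # Some function/activity combos are missing, so pad every activity
    # column back up to one entry per function row
    for name, col in data.items():
        if name not in ("functions", "timestamp"):
            col.extend([0.0] * (len(data["functions"]) - len(col)))

register("inc", {"thread-cpu": 0.5})
register("sum", {"disk-read": 0.2})  # "inc" gets disk-read == 0.0 via padding
assert len({len(col) for col in task_exec_data.values()}) == 1  # all aligned

The merged code achieves the same in two stages: it extends functions and timestamp together up front, then pads the per-activity columns after the loop.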
63 changes: 38 additions & 25 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -53,6 +53,7 @@
 from distributed.dashboard.scheduler import applications
 from distributed.diagnostics.task_stream import TaskStreamPlugin
 from distributed.metrics import time
+from distributed.spans import span
 from distributed.utils import format_dashboard_link
 from distributed.utils_test import (
     block_on_event,
@@ -329,38 +330,50 @@ async def test_WorkersMemory(c, s, a, b):
     assert all(d["width"])
 
 
-@gen_cluster(client=True)
+@gen_cluster(
+    client=True,
+    config={
+        "distributed.worker.memory.target": 0.7,
+        "distributed.worker.memory.spill": False,
+        "distributed.worker.memory.pause": False,
+    },
+    worker_kwargs={"memory_limit": 100},
+)
 async def test_FinePerformanceMetrics(c, s, a, b):
     cl = FinePerformanceMetrics(s)
 
-    futures = c.map(slowinc, range(10), delay=0.001)
-    await wait(futures)
-    await asyncio.sleep(1)  # wait for metrics to arrive
+    # execute on default span; multiple tasks in same TaskGroup
+    x0 = c.submit(inc, 0, key="x-0", workers=[a.address])
+    x1 = c.submit(inc, 1, key="x-1", workers=[a.address])
 
-    assert not cl.task_exec_data
+    # execute with spill (output size is individually larger than target)
+    w = c.submit(lambda: "x" * 75, key="w", workers=[a.address])
 
-    cl.update()
-    assert cl.task_exec_data
-    assert cl.task_exec_data["functions"] == ["slowinc"]
+    await wait([x0, x1, w])
+
+    a.data.evict()
+    a.data.evict()
+    assert not a.data.fast
+    assert set(a.data.slow) == {"x-0", "x-1", "w"}
 
-
-@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
-async def test_FinePerformanceMetrics_simulated_spill_no_crash(c, s, a, b):
-    metrics = {
-        ("execute", "inc", "disk-read", "seconds"): 1.0,
-        ("execute", "inc", "disk-read", "count"): 16.0,
-        ("execute", "inc", "disk-read", "bytes"): 2059931767.0,
-        ("execute", "inc", "disk-write", "seconds"): 0.12,
-        ("execute", "inc", "disk-write", "count"): 2.0,
-        ("execute", "inc", "disk-write", "bytes"): 268435938.0,
-    }
-    s.cumulative_worker_metrics.clear()
-    s.cumulative_worker_metrics.update(metrics)
-    http_client = AsyncHTTPClient()
-    response = await http_client.fetch(
-        f"http://localhost:{s.http_server.port}/individual-fine-performance-metrics"
-    )
-    assert response.code == 200
+    with span("foo"):
+        # execute on named span, with unspill
+        y0 = c.submit(inc, x0, key="y-0", workers=[a.address])
+        # get_data with unspill + gather_dep + execute on named span
+        y1 = c.submit(inc, x1, key="y-1", workers=[b.address])
+
+    # execute on named span (duplicate name, different id)
+    with span("foo"):
+        z = c.submit(inc, 3, key="z")
+    await wait([y0, y1, z])
+
+    await a.heartbeat()
+    await b.heartbeat()
+
+    assert not cl.task_exec_data
+    cl.update()
+    assert cl.task_exec_data
+    assert set(cl.task_exec_data["functions"]) == {"w", "x", "y", "z"}
 
 
 @gen_cluster(client=True)
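
A note on the key shapes the new comprehensions in scheduler.py unpack: Scheduler.cumulative_worker_metrics is keyed by tuples whose second-to-last element is the activity and whose last element is the unit, with "execute" keys carrying the function name right after the context. The sketch below follows the shapes from the metrics dict in the deleted test; the values and the "serialize" activity are made up, and real keys may carry extra middle elements, which the star-unpacking tolerates.

# Toy cumulative_worker_metrics; key shapes follow the deleted test above
cumulative_worker_metrics = {
    ("execute", "inc", "disk-read", "seconds"): 1.0,   # (context, function, activity, unit)
    ("execute", "inc", "thread-cpu", "seconds"): 0.5,
    ("get-data", "serialize", "seconds"): 0.1,         # (context, activity, unit)
}
items = list(cumulative_worker_metrics.items())

# The activity is always the second-to-last element of the key
activities = {activity for (_, *_, activity, _), _ in items}
assert activities == {"disk-read", "thread-cpu", "serialize"}

# Only "execute" keys carry a function name, right after the context
functions = {func for (ctx, func, *_), _ in items if ctx == "execute"}
assert functions == {"inc"}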