Skip to content

Commit

Permalink
Revamp SystemMonitor (#7097)
Browse files Browse the repository at this point in the history
Co-authored-by: Florian Jetter <fjetter@users.noreply.github.com>
  • Loading branch information
crusaderky and fjetter committed Oct 6, 2022
1 parent 8a92980 commit 6ac679e
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 231 deletions.
117 changes: 66 additions & 51 deletions distributed/dashboard/components/scheduler.py
Expand Up @@ -1050,7 +1050,8 @@ def name(address):
class WorkerNetworkBandwidth(DashboardComponent):
"""Worker network bandwidth chart
Plots horizontal bars with the read_bytes and write_bytes worker state
Plots horizontal bars with the host_net_io.read_bps and host_net_io.write_bps worker
state
"""

@log_errors
Expand All @@ -1075,7 +1076,7 @@ def __init__(self, scheduler, **kwargs):
**kwargs,
)

# read_bytes
# host_net_io.read_bps
self.bandwidth.hbar(
y="y_read",
right="x_read",
Expand All @@ -1087,7 +1088,7 @@ def __init__(self, scheduler, **kwargs):
source=self.source,
)

# write_bytes
# host_net_io.write_bps
self.bandwidth.hbar(
y="y_write",
right="x_write",
Expand Down Expand Up @@ -1116,7 +1117,7 @@ def __init__(self, scheduler, **kwargs):
**kwargs,
)

# read_bytes_disk
# host_disk_io.read_bps
self.disk.hbar(
y="y_read",
right="x_read_disk",
Expand All @@ -1128,7 +1129,7 @@ def __init__(self, scheduler, **kwargs):
source=self.source,
)

# write_bytes_disk
# host_disk_io.write_bps
self.disk.hbar(
y="y_write",
right="x_write_disk",
Expand Down Expand Up @@ -1164,10 +1165,10 @@ def update(self):
x_write_disk = []

for ws in workers:
x_read.append(ws.metrics["read_bytes"])
x_write.append(ws.metrics["write_bytes"])
x_read_disk.append(ws.metrics.get("read_bytes_disk", 0))
x_write_disk.append(ws.metrics.get("write_bytes_disk", 0))
x_read.append(ws.metrics["host_net_io"]["read_bps"])
x_write.append(ws.metrics["host_net_io"]["write_bps"])
x_read_disk.append(ws.metrics.get("host_disk_io", {}).get("read_bps", 0))
x_write_disk.append(ws.metrics.get("host_disk_io", {}).get("write_bps", 0))

if self.scheduler.workers:
self.bandwidth.x_range.end = max(
Expand Down Expand Up @@ -1202,15 +1203,19 @@ def update(self):
class SystemTimeseries(DashboardComponent):
"""Timeseries for worker network bandwidth, cpu, memory and disk.
bandwidth: plots the average of read_bytes and write_bytes for the workers
as a function of time.
cpu: plots the average of cpu for the workers as a function of time.
memory: plots the average of memory for the workers as a function of time.
disk: plots the average of read_bytes_disk and write_bytes_disk for the workers
as a function of time.
The metrics plotted come from the aggregation of
ws.metrics["val"] for ws in scheduler.workers.values(), divided by number of workers.
bandwidth
Plots the average of host_net_io.read_bps and host_net_io.write_bps for the
workers as a function of time
cpu
Plots the average of cpu for the workers as a function of time
memory
Plots the average of memory for the workers as a function of time
disk
Plots the average of host_disk_io.read_bps and host_disk_io.write_bps for the
workers as a function of time
The metrics plotted come from the aggregation of ws.metrics[key] for ws in
scheduler.workers.values(), divided by number of workers.
"""

@log_errors
Expand All @@ -1219,12 +1224,12 @@ def __init__(self, scheduler, follow_interval=20000, **kwargs):
self.source = ColumnDataSource(
{
"time": [],
"read_bytes": [],
"write_bytes": [],
"host_net_io.read_bps": [],
"host_net_io.write_bps": [],
"cpu": [],
"memory": [],
"read_bytes_disk": [],
"write_bytes_disk": [],
"host_disk_io.read_bps": [],
"host_disk_io.write_bps": [],
}
)

Expand All @@ -1248,14 +1253,14 @@ def __init__(self, scheduler, follow_interval=20000, **kwargs):
self.bandwidth.line(
source=self.source,
x="time",
y="read_bytes",
y="host_net_io.read_bps",
color="red",
legend_label="read (mean)",
)
self.bandwidth.line(
source=self.source,
x="time",
y="write_bytes",
y="host_net_io.write_bps",
color="blue",
legend_label="write (mean)",
)
Expand Down Expand Up @@ -1321,14 +1326,14 @@ def __init__(self, scheduler, follow_interval=20000, **kwargs):
self.disk.line(
source=self.source,
x="time",
y="read_bytes_disk",
y="host_disk_io.read_bps",
color="red",
legend_label="read (mean)",
)
self.disk.line(
source=self.source,
x="time",
y="write_bytes_disk",
y="host_disk_io.write_bps",
color="blue",
legend_label="write (mean)",
)
Expand All @@ -1343,31 +1348,31 @@ def __init__(self, scheduler, follow_interval=20000, **kwargs):
def get_data(self):
workers = self.scheduler.workers.values()

read_bytes = 0
write_bytes = 0
net_read_bps = 0
net_write_bps = 0
cpu = 0
memory = 0
read_bytes_disk = 0
write_bytes_disk = 0
disk_read_bps = 0
disk_write_bps = 0
time = 0
for ws in workers:
read_bytes += ws.metrics["read_bytes"]
write_bytes += ws.metrics["write_bytes"]
net_read_bps += ws.metrics["host_net_io"]["read_bps"]
net_write_bps += ws.metrics["host_net_io"]["write_bps"]
cpu += ws.metrics["cpu"]
memory += ws.metrics["memory"]
read_bytes_disk += ws.metrics.get("read_bytes_disk", 0)
write_bytes_disk += ws.metrics.get("write_bytes_disk", 0)
disk_read_bps += ws.metrics.get("host_disk_io", {}).get("read_bps", 0)
disk_write_bps += ws.metrics.get("host_disk_io", {}).get("write_bps", 0)
time += ws.metrics["time"]

result = {
# use `or` to avoid ZeroDivision when no workers
"time": [time / (len(workers) or 1) * 1000],
"read_bytes": [read_bytes / (len(workers) or 1)],
"write_bytes": [write_bytes / (len(workers) or 1)],
"host_net_io.read_bps": [net_read_bps / (len(workers) or 1)],
"host_net_io.write_bps": [net_write_bps / (len(workers) or 1)],
"cpu": [cpu / (len(workers) or 1)],
"memory": [memory / (len(workers) or 1)],
"read_bytes_disk": [read_bytes_disk / (len(workers) or 1)],
"write_bytes_disk": [write_bytes_disk / (len(workers) or 1)],
"host_disk_io.read_bps": [disk_read_bps / (len(workers) or 1)],
"host_disk_io.write_bps": [disk_write_bps / (len(workers) or 1)],
}
return result

Expand Down Expand Up @@ -3523,8 +3528,10 @@ def __init__(self, scheduler, width=800, **kwargs):
"memory_unmanaged_recent",
"memory_spilled",
"num_fds",
"read_bytes",
"write_bytes",
"host_net_io.read_bps",
"host_net_io.write_bps",
"host_disk_io.read_bps",
"host_disk_io.write_bps",
"cpu_fraction",
]
workers = self.scheduler.workers.values()
Expand All @@ -3551,8 +3558,10 @@ def __init__(self, scheduler, width=800, **kwargs):
"memory_unmanaged_recent",
"memory_spilled",
"num_fds",
"read_bytes",
"write_bytes",
"host_net_io.read_bps",
"host_net_io.write_bps",
"host_disk_io.read_bps",
"host_disk_io.write_bps",
]
column_title_renames = {
"memory_limit": "limit",
Expand All @@ -3562,10 +3571,10 @@ def __init__(self, scheduler, width=800, **kwargs):
"memory_unmanaged_recent": "unmanaged recent",
"memory_spilled": "spilled",
"num_fds": "# fds",
"read_bytes": "net read",
"write_bytes": "net write",
"read_bytes_disk": "disk read",
"write_bytes_disk": "disk write",
"host_net_io.read_bps": "net read",
"host_net_io.write_bps": "net write",
"host_disk_io.read_bps": "disk read",
"host_disk_io.write_bps": "disk write",
}

self.source = ColumnDataSource({k: [] for k in self.names})
Expand All @@ -3584,12 +3593,12 @@ def __init__(self, scheduler, width=800, **kwargs):
"memory_unmanaged_old": NumberFormatter(format="0.0 b"),
"memory_unmanaged_recent": NumberFormatter(format="0.0 b"),
"memory_spilled": NumberFormatter(format="0.0 b"),
"read_bytes": NumberFormatter(format="0 b"),
"write_bytes": NumberFormatter(format="0 b"),
"host_net_io.read_bps": NumberFormatter(format="0 b"),
"host_net_io.write_bps": NumberFormatter(format="0 b"),
"num_fds": NumberFormatter(format="0"),
"nthreads": NumberFormatter(format="0"),
"read_bytes_disk": NumberFormatter(format="0 b"),
"write_bytes_disk": NumberFormatter(format="0 b"),
"host_disk_io.read_bps": NumberFormatter(format="0 b"),
"host_disk_io.write_bps": NumberFormatter(format="0 b"),
}

table = DataTable(
Expand Down Expand Up @@ -3707,7 +3716,13 @@ def update(self):
minfo = ws.memory

for name in self.names + self.extra_names:
data[name].append(ws.metrics.get(name, None))
if "." in name:
n0, _, n1 = name.partition(".")
v = ws.metrics.get(n0, {}).get(n1, None)
else:
v = ws.metrics.get(name, None)
data[name].append(v)

data["name"][-1] = ws.name if ws.name is not None else i
data["address"][-1] = ws.address
if ws.memory_limit:
Expand Down
4 changes: 2 additions & 2 deletions distributed/dashboard/components/shared.py
Expand Up @@ -504,14 +504,14 @@ def __init__(self, worker, height=150, last_count=None, **kwargs):
self.bandwidth.line(
source=self.source,
x="time",
y="read_bytes",
y="host_net_io.read_bps",
color="red",
legend_label="read",
)
self.bandwidth.line(
source=self.source,
x="time",
y="write_bytes",
y="host_net_io.write_bps",
color="blue",
legend_label="write",
)
Expand Down
46 changes: 26 additions & 20 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Expand Up @@ -589,14 +589,20 @@ async def test_WorkerNetworkBandwidth_metrics(c, s, a, b):
nb.update()

for idx, ws in enumerate(s.workers.values()):
assert ws.metrics["read_bytes"] == nb.source.data["x_read"][idx]
assert ws.metrics["write_bytes"] == nb.source.data["x_write"][idx]
assert (
ws.metrics.get("read_bytes_disk", 0)
ws.metrics["host_net_io"]["read_bps"]
== nb.source.data["x_read"][idx]
)
assert (
ws.metrics["host_net_io"]["write_bps"]
== nb.source.data["x_write"][idx]
)
assert (
ws.metrics.get("host_disk_io", {}).get("read_bps", 0)
== nb.source.data["x_read_disk"][idx]
)
assert (
ws.metrics.get("write_bytes_disk", 0)
ws.metrics.get("host_disk_io", {}).get("write_bps", 0)
== nb.source.data["x_write_disk"][idx]
)

Expand All @@ -617,23 +623,23 @@ async def test_SystemTimeseries(c, s, a, b):
workers = s.workers.values()

assert all(len(v) == 1 for v in systs.source.data.values())
assert systs.source.data["read_bytes"][0] == sum(
ws.metrics["read_bytes"] for ws in workers
assert systs.source.data["host_net_io.read_bps"][0] == sum(
ws.metrics["host_net_io"]["read_bps"] for ws in workers
) / len(workers)
assert systs.source.data["write_bytes"][0] == sum(
ws.metrics["write_bytes"] for ws in workers
assert systs.source.data["host_net_io.write_bps"][0] == sum(
ws.metrics["host_net_io"]["write_bps"] for ws in workers
) / len(workers)
assert systs.source.data["cpu"][0] == sum(
ws.metrics["cpu"] for ws in workers
) / len(workers)
assert systs.source.data["memory"][0] == sum(
ws.metrics["memory"] for ws in workers
) / len(workers)
assert systs.source.data["read_bytes_disk"][0] == sum(
ws.metrics["read_bytes_disk"] for ws in workers
assert systs.source.data["host_disk_io.read_bps"][0] == sum(
ws.metrics["host_disk_io"]["read_bps"] for ws in workers
) / len(workers)
assert systs.source.data["write_bytes_disk"][0] == sum(
ws.metrics["write_bytes_disk"] for ws in workers
assert systs.source.data["host_disk_io.write_bps"][0] == sum(
ws.metrics["host_disk_io"]["write_bps"] for ws in workers
) / len(workers)
assert (
systs.source.data["time"][0]
Expand All @@ -647,23 +653,23 @@ async def test_SystemTimeseries(c, s, a, b):
systs.update()

assert all(len(v) == 2 for v in systs.source.data.values())
assert systs.source.data["read_bytes"][1] == sum(
ws.metrics["read_bytes"] for ws in workers
assert systs.source.data["host_net_io.read_bps"][1] == sum(
ws.metrics["host_net_io"]["read_bps"] for ws in workers
) / len(workers)
assert systs.source.data["write_bytes"][1] == sum(
ws.metrics["write_bytes"] for ws in workers
assert systs.source.data["host_net_io.write_bps"][1] == sum(
ws.metrics["host_net_io"]["write_bps"] for ws in workers
) / len(workers)
assert systs.source.data["cpu"][1] == sum(
ws.metrics["cpu"] for ws in workers
) / len(workers)
assert systs.source.data["memory"][1] == sum(
ws.metrics["memory"] for ws in workers
) / len(workers)
assert systs.source.data["read_bytes_disk"][1] == sum(
ws.metrics["read_bytes_disk"] for ws in workers
assert systs.source.data["host_disk_io.read_bps"][1] == sum(
ws.metrics["host_disk_io"]["read_bps"] for ws in workers
) / len(workers)
assert systs.source.data["write_bytes_disk"][1] == sum(
ws.metrics["write_bytes_disk"] for ws in workers
assert systs.source.data["host_disk_io.write_bps"][1] == sum(
ws.metrics["host_disk_io"]["write_bps"] for ws in workers
) / len(workers)
assert (
systs.source.data["time"][1]
Expand Down
3 changes: 3 additions & 0 deletions distributed/distributed-schema.yaml
Expand Up @@ -1117,6 +1117,9 @@ properties:
disk:
type: boolean
description: Should we include disk metrics? (they can cause issues in some systems)
host-cpu:
type: boolean
description: Should we include host-wide CPU usage, with very granular breakdown?

rmm:
type: object
Expand Down
3 changes: 2 additions & 1 deletion distributed/distributed.yaml
Expand Up @@ -305,7 +305,8 @@ distributed:
pdb-on-err: False # enter debug mode on scheduling error
system-monitor:
interval: 500ms
disk: true
disk: true # Monitor host-wide disk I/O
host-cpu: false # Monitor host-wide CPU usage, with very granular breakdown
event-loop: tornado
rmm:
pool-size: null
4 changes: 1 addition & 3 deletions distributed/scheduler.py
Expand Up @@ -3919,13 +3919,11 @@ def heartbeat_worker(
if size == memory_unmanaged_old:
memory_unmanaged_old = 0 # recalculate min()

# metrics["memory"] is None if the worker sent a heartbeat before its
# SystemMonitor ever had a chance to run.
# ws._nbytes is updated at a different time and sizeof() may not be accurate,
# so size may be (temporarily) negative; floor it to zero.
size = max(
0,
(metrics["memory"] or 0) - ws.nbytes + metrics["spilled_nbytes"]["memory"],
metrics["memory"] - ws.nbytes + metrics["spilled_nbytes"]["memory"],
)

ws._memory_unmanaged_history.append((local_now, size))
Expand Down

0 comments on commit 6ac679e

Please sign in to comment.