Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker Network Timeseries #5129

Merged
merged 17 commits into from Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
286 changes: 268 additions & 18 deletions distributed/dashboard/components/scheduler.py
Expand Up @@ -737,9 +737,12 @@ def __init__(self, scheduler, **kwargs):
"y_write": [],
"x_read": [],
"x_write": [],
"x_read_disk": [],
"x_write_disk": [],
}
)
self.root = figure(

self.bandwidth = figure(
title="Worker Network Bandwidth",
tools="",
id="bk-worker-net-bandwidth",
Expand All @@ -748,7 +751,7 @@ def __init__(self, scheduler, **kwargs):
)

# read_bytes
self.root.hbar(
self.bandwidth.hbar(
y="y_read",
right="x_read",
line_color=None,
Expand All @@ -760,7 +763,7 @@ def __init__(self, scheduler, **kwargs):
)

# write_bytes
self.root.hbar(
self.bandwidth.hbar(
y="y_write",
right="x_write",
line_color=None,
Expand All @@ -771,15 +774,55 @@ def __init__(self, scheduler, **kwargs):
source=self.source,
)

self.root.axis[0].ticker = BasicTicker(**TICKS_1024)
self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.root.xaxis.minor_tick_line_alpha = 0
self.root.x_range = Range1d(start=0)
self.root.yaxis.visible = False
self.root.ygrid.visible = False
self.root.toolbar_location = None
self.root.yaxis.visible = False
self.bandwidth.axis[0].ticker = BasicTicker(**TICKS_1024)
self.bandwidth.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.bandwidth.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.bandwidth.xaxis.minor_tick_line_alpha = 0
self.bandwidth.x_range = Range1d(start=0)
self.bandwidth.yaxis.visible = False
self.bandwidth.ygrid.visible = False
self.bandwidth.toolbar_location = None

self.disk = figure(
title="Workers Disk",
tools="",
id="bk-workers-disk",
name="worker_disk",
**kwargs,
)

# read_bytes_disk
self.disk.hbar(
y="y_read",
right="x_read_disk",
line_color=None,
left=0,
height=0.5,
fill_color="red",
legend_label="read",
source=self.source,
)

# write_bytes_disk
self.disk.hbar(
y="y_write",
right="x_write_disk",
line_color=None,
left=0,
height=0.5,
fill_color="blue",
legend_label="write",
source=self.source,
)

self.disk.axis[0].ticker = BasicTicker(**TICKS_1024)
self.disk.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.disk.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.disk.xaxis.minor_tick_line_alpha = 0
self.disk.x_range = Range1d(start=0)
self.disk.yaxis.visible = False
self.disk.ygrid.visible = False
self.disk.toolbar_location = None

@without_property_validation
def update(self):
Expand All @@ -792,28 +835,235 @@ def update(self):

x_read = []
x_write = []
x_read_disk = []
x_write_disk = []

for ws in workers:
x_read.append(ws.metrics["read_bytes"])
x_write.append(ws.metrics["write_bytes"])
x_read_disk.append(ws.metrics["read_bytes_disk"])
x_write_disk.append(ws.metrics["write_bytes_disk"])

self.root.x_range.end = max(
max(x_read),
max(x_write),
100_000_000,
0.95 * self.root.x_range.end,
)
if self.scheduler.workers:
self.bandwidth.x_range.end = max(
max(x_read),
max(x_write),
100_000_000,
0.95 * self.bandwidth.x_range.end,
)

self.disk.x_range.end = max(
max(x_read_disk),
max(x_write_disk),
100_000_000,
0.95 * self.disk.x_range.end,
)
else:
self.bandwidth.x_range.end = 100_000_000
self.disk.x_range.end = 100_000_000

result = {
"y_read": y_read,
"y_write": y_write,
"x_read": x_read,
"x_write": x_write,
"x_read_disk": x_read_disk,
"x_write_disk": x_write_disk,
}

update(self.source, result)


class SystemTimeseries(DashboardComponent):
"""Timeseries for worker network bandwidth, cpu, memory and disk.

bandwidth: plots the average of read_bytes and write_bytes for the workers
as a function of time.
cpu: plots the average of cpu for the workers as a function of time.
memory: plots the average of memory for the workers as a function of time.
disk: plots the average of read_bytes_disk and write_bytes_disk for the workers
as a function of time.

The metrics plotted come from the aggregation of
from ws.metrics["val"] for ws in scheduler.workers.values() divided by nuber of workers.
"""

def __init__(self, scheduler, **kwargs):
with log_errors():
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"time": [],
"read_bytes": [],
"write_bytes": [],
"cpu": [],
"memory": [],
"read_bytes_disk": [],
"write_bytes_disk": [],
}
)

update(self.source, self.get_data())

x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0)
tools = "reset, xpan, xwheel_zoom"

self.bandwidth = figure(
title="Workers Network Bandwidth",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-network-bandwidth-ts",
name="worker_network_bandwidth-timeseries",
**kwargs,
)

self.bandwidth.line(
source=self.source,
x="time",
y="read_bytes",
color="red",
legend_label="read (mean)",
)
self.bandwidth.line(
source=self.source,
x="time",
y="write_bytes",
color="blue",
legend_label="write (mean)",
)

self.bandwidth.legend.location = "top_left"
self.bandwidth.yaxis.axis_label = "bytes / second"
self.bandwidth.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
self.bandwidth.y_range.start = 0
self.bandwidth.yaxis.minor_tick_line_alpha = 0
self.bandwidth.xgrid.visible = False

self.cpu = figure(
title="Workers CPU",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-cpu-ts",
name="worker_cpu-timeseries",
**kwargs,
)

self.cpu.line(
source=self.source,
x="time",
y="cpu",
)
self.cpu.yaxis.axis_label = "Utilization"
self.cpu.y_range.start = 0
self.cpu.yaxis.minor_tick_line_alpha = 0
self.cpu.xgrid.visible = False

self.memory = figure(
title="Workers Memory",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-memory-ts",
name="worker_memory-timeseries",
**kwargs,
)

self.memory.line(
source=self.source,
x="time",
y="memory",
)
self.memory.yaxis.axis_label = "Bytes"
self.memory.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
self.memory.y_range.start = 0
self.memory.yaxis.minor_tick_line_alpha = 0
self.memory.xgrid.visible = False

self.disk = figure(
title="Workers Disk",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-disk-ts",
name="worker_disk-timeseries",
**kwargs,
)

self.disk.line(
source=self.source,
x="time",
y="read_bytes_disk",
color="red",
legend_label="read (mean)",
)
self.disk.line(
source=self.source,
x="time",
y="write_bytes_disk",
color="blue",
legend_label="write (mean)",
)

self.disk.legend.location = "top_left"
self.disk.yaxis.axis_label = "bytes / second"
self.disk.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
self.disk.y_range.start = 0
self.disk.yaxis.minor_tick_line_alpha = 0
self.disk.xgrid.visible = False

def get_data(self):
workers = self.scheduler.workers.values()

read_bytes = 0
write_bytes = 0
cpu = 0
memory = 0
read_bytes_disk = 0
write_bytes_disk = 0
time = 0
for ws in workers:
read_bytes += ws.metrics["read_bytes"]
write_bytes += ws.metrics["write_bytes"]
cpu += ws.metrics["cpu"]
memory += ws.metrics["memory"]
read_bytes_disk += ws.metrics["read_bytes_disk"]
write_bytes_disk += ws.metrics["write_bytes_disk"]
time += ws.metrics["time"]

result = {
# use `or` to avoid ZeroDivision when no workers
"time": [time / (len(workers) or 1) * 1000],
"read_bytes": [read_bytes / (len(workers) or 1)],
"write_bytes": [write_bytes / (len(workers) or 1)],
"cpu": [cpu / (len(workers) or 1)],
"memory": [memory / (len(workers) or 1)],
"read_bytes_disk": [read_bytes_disk / (len(workers) or 1)],
"write_bytes_disk": [write_bytes_disk / (len(workers) or 1)],
}
return result

@without_property_validation
def update(self):
with log_errors():
self.source.stream(self.get_data(), 1000)

if self.scheduler.workers:
y_end_cpu = sum(
ws.nthreads or 1 for ws in self.scheduler.workers.values()
) / len(self.scheduler.workers.values())
y_end_mem = sum(
ws.memory_limit for ws in self.scheduler.workers.values()
) / len(self.scheduler.workers.values())
else:
y_end_cpu = 1
y_end_mem = 100_000_000

self.cpu.y_range.end = y_end_cpu * 100
self.memory.y_range.end = y_end_mem


class ComputePerKey(DashboardComponent):
"""Bar chart showing time spend in action by key prefix"""

Expand Down
18 changes: 17 additions & 1 deletion distributed/dashboard/scheduler.py
Expand Up @@ -15,6 +15,7 @@
MemoryByKey,
Occupancy,
SystemMonitor,
SystemTimeseries,
TaskGraph,
TaskGroupGraph,
TaskProgress,
Expand Down Expand Up @@ -69,7 +70,22 @@
"/individual-bandwidth-types": individual_doc(BandwidthTypes, 500),
"/individual-bandwidth-workers": individual_doc(BandwidthWorkers, 500),
"/individual-workers-network-bandwidth": individual_doc(
WorkerNetworkBandwidth, 500
WorkerNetworkBandwidth, 500, fig_attr="bandwidth"
),
"/individual-workers-disk": individual_doc(
WorkerNetworkBandwidth, 500, fig_attr="disk"
),
"/individual-workers-network-bandwidth-timeseries": individual_doc(
SystemTimeseries, 500, fig_attr="bandwidth"
),
"/individual-workers-cpu-timeseries": individual_doc(
SystemTimeseries, 500, fig_attr="cpu"
),
"/individual-workers-memory-timeseries": individual_doc(
SystemTimeseries, 500, fig_attr="memory"
),
"/individual-workers-disk-timeseries": individual_doc(
SystemTimeseries, 500, fig_attr="disk"
),
"/individual-memory-by-key": individual_doc(MemoryByKey, 500),
"/individual-compute-time-per-key": individual_doc(ComputePerKey, 500),
Expand Down