Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track Event Loop intervals in dashboard plot #5964

Merged
merged 9 commits into from
Mar 25, 2022
23 changes: 20 additions & 3 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,20 @@ def stop():
self.periodic_callbacks["monitor"] = pc

self._last_tick = time()
measure_tick_interval = parse_timedelta(
self._tick_counter = 0
self._tick_count = 0
self._tick_count_last = time()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a monotonic timer here #4528

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We currently use metrics.time because of windows support. If we want to switch to something else that's better I'm all in favor, but I hesitate to launch into this immediately. Instead I'd rather that we kept with the standard way of doing things, and then considered switching out to monotonic en-masse. Thoughts?

self._tick_interval = parse_timedelta(
dask.config.get("distributed.admin.tick.interval"), default="ms"
)
pc = PeriodicCallback(self._measure_tick, measure_tick_interval * 1000)
self.periodic_callbacks["tick"] = pc
self._tick_interval_observed = self._tick_interval
self.periodic_callbacks["tick"] = PeriodicCallback(
self._measure_tick, self._tick_interval * 1000
)
self.periodic_callbacks["ticks"] = PeriodicCallback(
self._cycle_ticks,
parse_timedelta(dask.config.get("distributed.admin.tick.cycle")) * 1000,
)

self.thread_id = 0

Expand Down Expand Up @@ -351,6 +360,7 @@ def _measure_tick(self):
now = time()
diff = now - self._last_tick
self._last_tick = now
self._tick_counter += 1
if diff > tick_maximum_delay:
logger.info(
"Event loop was unresponsive in %s for %.2fs. "
Expand All @@ -363,6 +373,13 @@ def _measure_tick(self):
if self.digests is not None:
self.digests["tick-duration"].add(diff)

def _cycle_ticks(self):
    """Recompute the observed event-loop tick interval for the last window.

    Divides the wall-clock time elapsed since the previous cycle by the
    number of ticks counted in that window, stores the result in
    ``self._tick_interval_observed`` and resets the tick counter.

    If no tick was counted since the previous cycle, nothing is updated:
    the previous estimate and the window start are both left untouched.
    """
    count = self._tick_counter
    if not count:
        # No ticks recorded in this window; keep the previous estimate.
        return

    # Sample the clock exactly once so that the timestamp stored as the
    # start of the next window is the same instant used to close this one.
    # Two separate samples would make consecutive windows overlap slightly.
    now = time()
    last, self._tick_count_last = self._tick_count_last, now
    self._tick_counter = 0
    # ``count`` is known to be non-zero here because of the guard above.
    self._tick_interval_observed = (now - last) / count

@property
def address(self):
"""
Expand Down
53 changes: 53 additions & 0 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3018,6 +3018,59 @@ def update(self):
)


class EventLoop(DashboardComponent):
"""Event Loop Health"""

def __init__(self, scheduler, **kwargs):
with log_errors():
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"names": ["Scheduler", "Workers"],
"values": [0, 0],
"text": ["0", "0"],
}
)

self.root = figure(
title="Event Loop Health",
x_range=["Scheduler", "Workers"],
y_range=[
0,
parse_timedelta(dask.config.get("distributed.admin.tick.interval"))
* 25,
],
tools="",
toolbar_location="above",
**kwargs,
)
self.root.vbar(x="names", top="values", width=0.9, source=self.source)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally I'd like to see color as well (yellow if the interval is over a certain threshold, like the worker memory plot). I think 0.1s would be a reasonable threshold; this is asyncio's default slow_callback_duration when debug mode is turned on.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the ideal. However, I'd like to get this in and focus elsewhere for now if possible.


self.root.xaxis.minor_tick_line_alpha = 0
self.root.ygrid.visible = True
self.root.xgrid.visible = False

hover = HoverTool(tooltips=[("Interval", "@text s")], mode="vline")
self.root.add_tools(hover)

@without_property_validation
def update(self):
    """Push the latest event-loop intervals into the plot's data source.

    Two bars are refreshed: the scheduler's own observed tick interval, and
    the mean of the ``event_loop_interval`` metric over all workers known
    to the scheduler.
    """
    with log_errors():
        scheduler = self.scheduler

        scheduler_interval = scheduler._tick_interval_observed

        workers = scheduler.workers
        worker_total = sum(
            ws.metrics["event_loop_interval"] for ws in workers.values()
        )
        # ``or 1`` avoids a division by zero when no workers are connected;
        # the empty sum is 0, so the bar simply reads 0 in that case.
        worker_mean = worker_total / (len(workers) or 1)

        values = [scheduler_interval, worker_mean]
        data = {
            "names": ["Scheduler", "Workers"],
            "values": values,
            "text": [format_time(value) for value in values],
        }

        update(self.source, data)


class WorkerTable(DashboardComponent):
"""Status of the current workers

Expand Down
2 changes: 2 additions & 0 deletions distributed/dashboard/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ClusterMemory,
ComputePerKey,
CurrentLoad,
EventLoop,
MemoryByKey,
Occupancy,
SystemMonitor,
Expand Down Expand Up @@ -97,6 +98,7 @@
"/individual-compute-time-per-key": individual_doc(ComputePerKey, 500),
"/individual-aggregate-time-per-action": individual_doc(AggregateAction, 500),
"/individual-scheduler-system": individual_doc(SystemMonitor, 500),
"/individual-event-loop": individual_doc(EventLoop, 500),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the interval for this plot correspond to the tick cycle? Otherwise it will run unnecessarily often.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's pretty cheap. I don't think that this matters much.

"/individual-profile": individual_profile_doc,
"/individual-profile-server": individual_profile_server_doc,
"/individual-gpu-memory": gpu_memory_doc,
Expand Down
9 changes: 8 additions & 1 deletion distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ClusterMemory,
ComputePerKey,
CurrentLoad,
EventLoop,
Events,
MemoryByKey,
Occupancy,
Expand Down Expand Up @@ -75,7 +76,13 @@ async def test_simple(c, s, a, b):

@gen_cluster(client=True, worker_kwargs={"dashboard": True})
async def test_basic(c, s, a, b):
for component in [TaskStream, SystemMonitor, Occupancy, StealingTimeSeries]:
for component in [
TaskStream,
SystemMonitor,
Occupancy,
StealingTimeSeries,
EventLoop,
]:
ss = component(s)

ss.update()
Expand Down
5 changes: 4 additions & 1 deletion distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ properties:
- {type: number, minimum: 0}
- enum: [false]
description: >-
Limit of number of bytes to be spilled on disk.
Limit of number of bytes to be spilled on disk.

monitor-interval:
type: string
Expand Down Expand Up @@ -976,6 +976,9 @@ properties:
limit :
type: string
description: The time allowed before triggering a warning
cycle :
type: string
description: The time in between verifying event loop speed

max-error-length:
type: integer
Expand Down
1 change: 1 addition & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ distributed:
tick:
interval: 20ms # time between event loop health checks
limit: 3s # time allowed before triggering a warning
cycle: 1s # time between checking event loop speed

max-error-length: 10000 # Maximum size traceback after error to return
log-length: 10000 # default length of logs to keep in memory
Expand Down
30 changes: 30 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3306,3 +3306,33 @@ async def test_Worker__to_dict(c, s, a):
}
assert d["tasks"]["x"]["key"] == "x"
assert d["data"] == ["x"]


@gen_cluster(
    client=True,
    config={
        "distributed.admin.tick.interval": "5ms",
        "distributed.admin.tick.cycle": "100ms",
    },
)
async def test_tick_interval(c, s, a, b):
    """Blocking the event loop raises the reported ``event_loop_interval``.

    Reads the metric the scheduler holds for worker ``a`` after a heartbeat,
    blocks the event loop for half a second, waits until the worker has
    completed another tick cycle, and checks that the metric reported by
    the next heartbeat is larger than the first one.
    """
    import time

    await a.heartbeat()
    x = s.workers[a.address].metrics["event_loop_interval"]
    assert x
    assert 0.0001 < x < 1

    old_count_last = a._tick_count_last

    time.sleep(0.500)  # Block event loop

    # Wait for the worker to run a tick cycle that covers the blocked period;
    # ``_tick_count_last`` changes every time a cycle is recorded.
    while a._tick_count_last == old_count_last:
        await asyncio.sleep(0.01)

    await a.heartbeat()
    y = s.workers[a.address].metrics["event_loop_interval"]

    assert y > x
1 change: 1 addition & 0 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ async def get_metrics(self) -> dict:
"memory": spilled_memory,
"disk": spilled_disk,
},
event_loop_interval=self._tick_interval_observed,
mrocklin marked this conversation as resolved.
Show resolved Hide resolved
)
out.update(self.monitor.recent())

Expand Down