Skip to content

Commit

Permalink
Fix flakiness in test_spans
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Aug 24, 2023
1 parent 22eb33a commit 52286b7
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 20 deletions.
14 changes: 1 addition & 13 deletions distributed/tests/test_scheduler.py
Expand Up @@ -66,6 +66,7 @@
gen_test,
inc,
nodebug,
padded_time,
raises_with_cause,
slowadd,
slowdec,
Expand Down Expand Up @@ -2622,19 +2623,6 @@ async def test_task_groups(c, s, a, b, no_time_resync):
assert "compute" in tg.all_durations


async def padded_time(before=0.01, after=0.01):
"""Sample time(), preventing millisecond-magnitude corrections in the wall clock in
from disrupting monotonicity tests (t0 < t1 < t2 < ...).
This prevents frequent flakiness on Windows and, more rarely, in Linux and
MacOSX (NoSchedulerDelayWorker and no_time_resync help, but aren't sufficient
on their own to ensure stability).
"""
await asyncio.sleep(before)
t = time()
await asyncio.sleep(after)
return t


@gen_cluster(client=True, nthreads=[("", 2)], Worker=NoSchedulerDelayWorker)
async def test_task_groups_update_start_stop(c, s, a, no_time_resync):
"""TaskGroup.stop increases as the tasks in the group finish.
Expand Down
15 changes: 8 additions & 7 deletions distributed/tests/test_spans.py
Expand Up @@ -11,12 +11,12 @@
from distributed import Client, Event, Future, Worker, span, wait
from distributed.compatibility import WINDOWS
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.metrics import time
from distributed.utils_test import (
NoSchedulerDelayWorker,
async_poll_for,
gen_cluster,
inc,
padded_time,
slowinc,
wait_for_state,
)
Expand Down Expand Up @@ -216,6 +216,8 @@ async def test_no_extension(c, s, a, b):
)
async def test_task_groups(c, s, a, b, release, no_time_resync):
da = pytest.importorskip("dask.array")
t0 = await padded_time(before=0)

with span("wf"):
with span("p1"):
a = da.ones(10, chunks=5, dtype="int64") + 1
Expand All @@ -224,9 +226,8 @@ async def test_task_groups(c, s, a, b, release, no_time_resync):
a = a.sum() # A TaskGroup attached directly to a non-leaf Span
finalizer = c.compute(a)

t0 = time()
assert await finalizer == 40
t1 = time()
t1 = await padded_time(after=0)

if release:
# Test that the information in the Spans survives the tasks
Expand Down Expand Up @@ -284,12 +285,12 @@ async def test_task_groups(c, s, a, b, release, no_time_resync):

@gen_cluster(client=True, nthreads=[("", 1)], Worker=NoSchedulerDelayWorker)
async def test_before_first_task_finished(c, s, a, no_time_resync):
t0 = time()
t0 = await padded_time(before=0)
ev = Event()
x = c.submit(ev.wait, key="x")
await wait_for_state("x", "executing", a)
sp = s.extensions["spans"].spans_search_by_name["default",][-1]
t1 = time()
t1 = await padded_time()
assert t0 < sp.enqueued < t1
assert sp.start == 0
assert t1 < sp.stop < t1 + 1
Expand All @@ -299,7 +300,7 @@ async def test_before_first_task_finished(c, s, a, no_time_resync):

await ev.set()
await x
t2 = time()
t2 = await padded_time()
assert t0 < sp.enqueued < sp.start < t1 < sp.stop < t2
assert sp.duration > 0
assert sp.all_durations["compute"] > 0
Expand Down Expand Up @@ -695,7 +696,7 @@ async def test_active_cpu_seconds_not_done(c, s, a, some_done, no_time_resync):

span = s.extensions["spans"].spans_search_by_name["default",][0]
assert not span.done
now = time()
now = await padded_time()

intervals = span.nthreads_intervals
assert len(intervals) == 1
Expand Down
27 changes: 27 additions & 0 deletions distributed/utils_test.py
Expand Up @@ -2601,6 +2601,11 @@ class NoSchedulerDelayWorker(Worker):
This worker class is useful for some tests which make time
comparisons using times reported from workers.
See also
--------
no_time_resync
padded_time
"""

@property
Expand All @@ -2618,6 +2623,11 @@ def no_time_resync():
which, every 10 minutes, can cause time() to go backwards a few milliseconds.
On Linux and MacOSX, this fixture is a no-op.
See also
--------
NoSchedulerDelayWorker
padded_time
"""
if WINDOWS:
time() # Initialize or refresh delta
Expand All @@ -2627,3 +2637,20 @@ def no_time_resync():
time.__self__.next_resync = bak
else:
yield


async def padded_time(before=0.01, after=0.01):
"""Sample time(), preventing millisecond-magnitude corrections in the wall clock in
from disrupting monotonicity tests (t0 < t1 < t2 < ...).
This prevents frequent flakiness on Windows and, more rarely, in Linux and
MacOSX.
See also
--------
NoSchedulerDelayWorker
no_time_resync
"""
await asyncio.sleep(before)
t = time()
await asyncio.sleep(after)
return t

Check warning on line 2656 in distributed/utils_test.py

View check run for this annotation

Codecov / codecov/patch

distributed/utils_test.py#L2653-L2656

Added lines #L2653 - L2656 were not covered by tests

0 comments on commit 52286b7

Please sign in to comment.