Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flakiness in tests caused by WindowsTime #8087

Merged
merged 1 commit into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2580,7 +2580,7 @@ async def test_no_dangling_asyncio_tasks():


@gen_cluster(client=True, Worker=NoSchedulerDelayWorker, config=NO_AMM)
async def test_task_groups(c, s, a, b):
async def test_task_groups(c, s, a, b, no_time_resync):
start = time()
da = pytest.importorskip("dask.array")
x = da.arange(100, chunks=(20,))
Expand Down Expand Up @@ -2629,7 +2629,7 @@ async def test_task_groups(c, s, a, b):


@gen_cluster(client=True, nthreads=[("", 2)], Worker=NoSchedulerDelayWorker)
async def test_task_groups_update_start_stop(c, s, a):
async def test_task_groups_update_start_stop(c, s, a, no_time_resync):
"""TaskGroup.stop increases as the tasks in the group finish.
TaskGroup.start can move backwards in the following use case:

Expand Down
10 changes: 5 additions & 5 deletions distributed/tests/test_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async def test_no_extension(c, s, a, b):
Worker=NoSchedulerDelayWorker,
config={"optimization.fuse.active": False},
)
async def test_task_groups(c, s, a, b, release):
async def test_task_groups(c, s, a, b, release, no_time_resync):
da = pytest.importorskip("dask.array")
with span("wf"):
with span("p1"):
Expand Down Expand Up @@ -289,7 +289,7 @@ async def test_task_groups(c, s, a, b, release):


@gen_cluster(client=True, nthreads=[("", 1)], Worker=NoSchedulerDelayWorker)
async def test_before_first_task_finished(c, s, a):
async def test_before_first_task_finished(c, s, a, no_time_resync):
t0 = time()
ev = Event()
x = c.submit(ev.wait, key="x")
Expand Down Expand Up @@ -694,7 +694,7 @@ async def test_active_cpu_seconds_trivial(c, s, a, b):

@pytest.mark.parametrize("some_done", [False, True])
@gen_cluster(client=True, nthreads=[("", 2)], Worker=NoSchedulerDelayWorker)
async def test_active_cpu_seconds_not_done(c, s, a, some_done):
async def test_active_cpu_seconds_not_done(c, s, a, some_done, no_time_resync):
ev = Event()
x0 = c.submit(ev.wait, key="x-0", workers=[a.address])
if some_done:
Expand Down Expand Up @@ -722,7 +722,7 @@ async def test_active_cpu_seconds_not_done(c, s, a, some_done):


@gen_cluster(client=True, Worker=NoSchedulerDelayWorker)
async def test_active_cpu_seconds_change_nthreads(c, s, a, b):
async def test_active_cpu_seconds_change_nthreads(c, s, a, b, no_time_resync):
ev = Event()
x = c.submit(ev.wait, key="x", workers=[a.address])
await wait_for_state("x", "executing", a)
Expand Down Expand Up @@ -775,7 +775,7 @@ async def test_active_cpu_seconds_change_nthreads(c, s, a, b):


@gen_cluster(client=True, nthreads=[("", 2)], Worker=NoSchedulerDelayWorker)
async def test_active_cpu_seconds_merged(c, s, a):
async def test_active_cpu_seconds_merged(c, s, a, no_time_resync):
"""Overlapping input spans are not double-counted
Empty gap between input spans is not counted
"""
Expand Down
17 changes: 17 additions & 0 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2594,3 +2594,20 @@
@scheduler_delay.setter
def scheduler_delay(self, value):
pass


@pytest.fixture()
def no_time_resync():
"""Temporarily disable the automatic resync of distributed.metrics._WindowsTime
which, every 10 minutes, can cause time() to go backwards a few milliseconds.

On Linux and MacOSX, this fixture is a no-op.
Comment on lines +2601 to +2604
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't the entire point of _WindowsTime to have a monotonic clock?

Copy link
Collaborator Author

@crusaderky crusaderky Aug 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only on distributed.metrics.monotonic. Not on distributed.metrics.time.
time() offers no guarantee of being monotonic, on any OS. However on Linux it's very unlikely to be something that actually happens as you need to actually change the wall clock with NTP.

"""
if WINDOWS:
time() # Initialize or refresh delta
bak = time.__self__.next_resync
time.__self__.next_resync = float("inf")
yield
time.__self__.next_resync = bak

Check warning on line 2611 in distributed/utils_test.py

View check run for this annotation

Codecov / codecov/patch

distributed/utils_test.py#L2607-L2611

Added lines #L2607 - L2611 were not covered by tests
else:
yield
Loading