From b0b8e95bf0e7e8fae44ca019652a394dcca92353 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 24 Jan 2022 14:32:29 +0000 Subject: [PATCH] Code review --- distributed/tests/test_worker.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 720e3dc001..b6c4961ff6 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1635,12 +1635,10 @@ async def test_close_gracefully(c, s, a, b): # be replicated by retire_workers(). while True: mem = {k for k, ts in s.tasks.items() if ts.state == "memory"} - if len(mem) >= 8: + if len(mem) >= 8 and any(ts.state == "executing" for ts in b.tasks.values()): break await asyncio.sleep(0.01) - assert any(ts for ts in b.tasks.values() if ts.state == "executing") - await b.close_gracefully() assert b.status == Status.closed @@ -1663,16 +1661,22 @@ async def test_close_gracefully(c, s, a, b): @pytest.mark.slow @gen_cluster(client=True, nthreads=[("", 1)], timeout=10) async def test_lifetime(c, s, a): + # Note: test was occasionally failing with lifetime="1 seconds" async with Worker(s.address, lifetime="2 seconds") as b: futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address]) - await asyncio.sleep(1) - assert not a.data + # Note: keys will appear in b.data several milliseconds before they switch to # status=memory in s.tasks. It's important to sample the in-memory keys from the # scheduler side, because those that the scheduler thinks are still processing # won't be replicated by retire_workers(). - mem = {k for k, ts in s.tasks.items() if ts.state == "memory"} - assert mem + while True: + mem = {k for k, ts in s.tasks.items() if ts.state == "memory"} + if len(mem) >= 8: + break + await asyncio.sleep(0.01) + + assert b.status == Status.running + assert not a.data while b.status != Status.closed: await asyncio.sleep(0.01)