From 731d13223c7af8d62d797a4df169898030f57a60 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Fri, 21 Jan 2022 12:12:03 +0000
Subject: [PATCH 1/5] Fix flaky test_close_gracefully and test_lifetime

---
 distributed/tests/test_worker.py | 44 +++++++++++++++++++++++---------
 1 file changed, 32 insertions(+), 12 deletions(-)

diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index db580920d8a..3a4a784e765 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1627,32 +1627,52 @@ async def test_worker_listens_on_same_interface_by_default(cleanup, Worker):
 
 @gen_cluster(client=True)
 async def test_close_gracefully(c, s, a, b):
-    futures = c.map(slowinc, range(200), delay=0.1)
+    futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address])
 
-    while not b.data:
+    # Note: keys will appear in b.data several milliseconds before they switch to
+    # status=memory in s.tasks. It's important to sample the in-memory keys from the
+    # scheduler side, because those that the scheduler thinks are still processing won't
+    # be replicated by the Active Memory Manager.
+    while True:
+        mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
+        if len(mem) >= 8:
+            break
         await asyncio.sleep(0.01)
-    mem = set(b.data)
-    proc = {ts for ts in b.tasks.values() if ts.state == "executing"}
-    assert proc
+
+    assert any(ts for ts in b.tasks.values() if ts.state == "executing")
 
     await b.close_gracefully()
 
     assert b.status == Status.closed
     assert b.address not in s.workers
-    assert mem.issubset(a.data.keys())
-    for ts in proc:
-        assert ts.state in ("executing", "memory")
+
+    # All tasks that were in memory in b have been copied over to a;
+    # they have not been recomputed
+    for key in mem:
+        assert_worker_story(
+            a.story(key),
+            [
+                (key, "put-in-memory"),
+                (key, "receive-from-scatter"),
+            ],
+            strict=True,
+        )
+        assert key in a.data
 
 
 @pytest.mark.slow
 @gen_cluster(client=True, nthreads=[])
 async def test_lifetime(c, s):
     async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b:
-        futures = c.map(slowinc, range(200), delay=0.1, worker=[b.address])
-        await asyncio.sleep(1.5)
-        assert b.status not in (Status.running, Status.paused)
+        futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address])
+        await asyncio.sleep(0.5)
+        assert not a.data
+        assert b.data
+        b_keys = set(b.data)
+        while b.status == Status.running:
+            await asyncio.sleep(0.01)
         await b.finished()
-        assert set(b.data) == set(a.data)  # successfully moved data over
+        assert b_keys.issubset(a.data)  # successfully moved data over from b to a
 
 
 @gen_cluster(worker_kwargs={"lifetime": "10s", "lifetime_stagger": "2s"})

From 940bb45ba0ddffe042100dcbcaa69c2408931a88 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Fri, 21 Jan 2022 12:20:10 +0000
Subject: [PATCH 2/5] tweak comment

---
 distributed/tests/test_worker.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 3a4a784e765..83fbe98cdaf 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1632,7 +1632,7 @@ async def test_close_gracefully(c, s, a, b):
     # Note: keys will appear in b.data several milliseconds before they switch to
     # status=memory in s.tasks. It's important to sample the in-memory keys from the
    # scheduler side, because those that the scheduler thinks are still processing won't
-    # be replicated by the Active Memory Manager.
+    # be replicated by retire_workers().
     while True:
         mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
         if len(mem) >= 8:

From aef3b719617571cd8b37cf23d8bd9b2a6819ceac Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Fri, 21 Jan 2022 14:29:10 +0000
Subject: [PATCH 3/5] Increase resilience on slow CI

---
 distributed/tests/test_worker.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 83fbe98cdaf..d9405ecfa35 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1663,9 +1663,9 @@ async def test_close_gracefully(c, s, a, b):
 @pytest.mark.slow
 @gen_cluster(client=True, nthreads=[])
 async def test_lifetime(c, s):
-    async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b:
+    async with Worker(s.address) as a, Worker(s.address, lifetime="2 seconds") as b:
         futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address])
-        await asyncio.sleep(0.5)
+        await asyncio.sleep(1)
         assert not a.data
         assert b.data
         b_keys = set(b.data)

From 7faab519e9f3a133a02f36495ed2e77a7fe8fd3f Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Fri, 21 Jan 2022 14:39:41 +0000
Subject: [PATCH 4/5] harden test

---
 distributed/tests/test_worker.py | 32 ++++++++++++++++++++++++--------
 1 file changed, 24 insertions(+), 8 deletions(-)

diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index d9405ecfa35..720e3dc0015 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1661,18 +1661,34 @@ async def test_close_gracefully(c, s, a, b):
 
 
 @pytest.mark.slow
-@gen_cluster(client=True, nthreads=[])
-async def test_lifetime(c, s):
-    async with Worker(s.address) as a, Worker(s.address, lifetime="2 seconds") as b:
+@gen_cluster(client=True, nthreads=[("", 1)], timeout=10)
+async def test_lifetime(c, s, a):
+    async with Worker(s.address, lifetime="2 seconds") as b:
         futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address])
         await asyncio.sleep(1)
         assert not a.data
-        assert b.data
-        b_keys = set(b.data)
-        while b.status == Status.running:
+        # Note: keys will appear in b.data several milliseconds before they switch to
+        # status=memory in s.tasks. It's important to sample the in-memory keys from the
+        # scheduler side, because those that the scheduler thinks are still processing
+        # won't be replicated by retire_workers().
+        mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
+        assert mem
+
+        while b.status != Status.closed:
             await asyncio.sleep(0.01)
-        await b.finished()
-        assert b_keys.issubset(a.data)  # successfully moved data over from b to a
+
+        # All tasks that were in memory in b have been copied over to a;
+        # they have not been recomputed
+        for key in mem:
+            assert_worker_story(
+                a.story(key),
+                [
+                    (key, "put-in-memory"),
+                    (key, "receive-from-scatter"),
+                ],
+                strict=True,
+            )
+            assert key in a.data
 
 
 @gen_cluster(worker_kwargs={"lifetime": "10s", "lifetime_stagger": "2s"})

From b0b8e95bf0e7e8fae44ca019652a394dcca92353 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Mon, 24 Jan 2022 14:32:29 +0000
Subject: [PATCH 5/5] Code review

---
 distributed/tests/test_worker.py | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 720e3dc0015..b6c4961ff6d 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1635,12 +1635,10 @@ async def test_close_gracefully(c, s, a, b):
     # be replicated by retire_workers().
     while True:
         mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
-        if len(mem) >= 8:
+        if len(mem) >= 8 and any(ts.state == "executing" for ts in b.tasks.values()):
             break
         await asyncio.sleep(0.01)
 
-    assert any(ts for ts in b.tasks.values() if ts.state == "executing")
-
     await b.close_gracefully()
 
     assert b.status == Status.closed
@@ -1663,16 +1661,22 @@ async def test_close_gracefully(c, s, a, b):
 @pytest.mark.slow
 @gen_cluster(client=True, nthreads=[("", 1)], timeout=10)
 async def test_lifetime(c, s, a):
+    # Note: test was occasionally failing with lifetime="1 seconds"
     async with Worker(s.address, lifetime="2 seconds") as b:
         futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address])
-        await asyncio.sleep(1)
-        assert not a.data
+
         # Note: keys will appear in b.data several milliseconds before they switch to
         # status=memory in s.tasks. It's important to sample the in-memory keys from the
         # scheduler side, because those that the scheduler thinks are still processing
         # won't be replicated by retire_workers().
-        mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
-        assert mem
+        while True:
+            mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
+            if len(mem) >= 8:
+                break
+            await asyncio.sleep(0.01)
+
+        assert b.status == Status.running
+        assert not a.data
 
         while b.status != Status.closed:
             await asyncio.sleep(0.01)