Skip to content

Commit

Permalink
Code review
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jan 24, 2022
1 parent 7faab51 commit b0b8e95
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions distributed/tests/test_worker.py
Expand Up @@ -1635,12 +1635,10 @@ async def test_close_gracefully(c, s, a, b):
# be replicated by retire_workers().
while True:
mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
if len(mem) >= 8:
if len(mem) >= 8 and any(ts.state == "executing" for ts in b.tasks.values()):
break
await asyncio.sleep(0.01)

assert any(ts for ts in b.tasks.values() if ts.state == "executing")

await b.close_gracefully()

assert b.status == Status.closed
Expand All @@ -1663,16 +1661,22 @@ async def test_close_gracefully(c, s, a, b):
@pytest.mark.slow
@gen_cluster(client=True, nthreads=[("", 1)], timeout=10)
async def test_lifetime(c, s, a):
# Note: test was occasionally failing with lifetime="1 seconds"
async with Worker(s.address, lifetime="2 seconds") as b:
futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address])
await asyncio.sleep(1)
assert not a.data

# Note: keys will appear in b.data several milliseconds before they switch to
# status=memory in s.tasks. It's important to sample the in-memory keys from the
# scheduler side, because those that the scheduler thinks are still processing
# won't be replicated by retire_workers().
mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
assert mem
while True:
mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
if len(mem) >= 8:
break
await asyncio.sleep(0.01)

assert b.status == Status.running
assert not a.data

while b.status != Status.closed:
await asyncio.sleep(0.01)
Expand Down

0 comments on commit b0b8e95

Please sign in to comment.