Skip to content

Commit

Permalink
more robust test_broken_worker_during_computation
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Apr 11, 2024
1 parent 083fe8d commit ad0f058
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
18 changes: 9 additions & 9 deletions distributed/tests/test_failed_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,10 @@ def test_worker_doesnt_await_task_completion(loop):

@gen_cluster(Worker=Nanny, timeout=60)
async def test_multiple_clients_restart(s, a, b):
async with Client(s.address, asynchronous=True) as c1, Client(
s.address, asynchronous=True
) as c2:
async with (
Client(s.address, asynchronous=True) as c1,
Client(s.address, asynchronous=True) as c2,
):
x = c1.submit(inc, 1)
y = c2.submit(inc, 2)
xx = await x
Expand Down Expand Up @@ -288,10 +289,7 @@ async def test_forgotten_futures_dont_clean_up_new_futures(c, s, a, b):
async def test_broken_worker_during_computation(c, s, a, b):
s.allowed_failures = 100
async with Nanny(s.address, nthreads=2) as n:
start = time()
while len(s.workers) < 3:
await asyncio.sleep(0.01)
assert time() < start + 5
await c.wait_for_workers(3)

N = 256
expected_result = N * (N + 1) // 2
Expand All @@ -308,10 +306,12 @@ async def test_broken_worker_during_computation(c, s, a, b):
await asyncio.sleep(random.random() / 20)
with suppress(CommClosedError): # comm will be closed abruptly
await c.run(os._exit, 1, workers=[n.worker_address])
assert not n.process.is_alive()
while not n.process.is_alive():
await asyncio.sleep(0.01)
await c.wait_for_workers(3)

await asyncio.sleep(random.random() / 20)
while len(s.workers) < 3:
await asyncio.sleep(0.01)

with suppress(
CommClosedError, EnvironmentError
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ async def test_worker_start_exception(s):
assert not nanny.process.is_alive()
assert "Restarting worker" not in logs.getvalue()
# Avoid excessive spewing. (It's also printed once extra within the subprocess, which is okay.)
assert logs.getvalue().count("ValueError: broken") == 1, logs.getvalue()
assert logs.getvalue().count("ValueError: broken") == 2, logs.getvalue()


@gen_cluster(nthreads=[])
Expand Down

0 comments on commit ad0f058

Please sign in to comment.