What happened:

Possibly related to #2880.

I'm using `client.persist` to execute delayed tasks (see the rough structure in the MCVE below); I'll note that I get the same error using `client.compute`. Despite using the same code as before, I recently started hitting a `TimeoutError` (see traceback). The error points back to the line where I call either `progress(results)` or `client.gather(results)`. I've found that putting a `time.sleep` between the `persist` and `progress`/`gather` calls prevents the error, but this seems like a poor workaround.
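Roughly, the workaround looks like the sketch below (illustrative names, not my actual code; the length of the pause is arbitrary):

```python
import time

from dask import delayed
from dask.distributed import Client, progress


@delayed
def job(x):
    # Stand-in for the real delayed task.
    return x + 1


def run(client, njobs=1000):
    outputs = [job(i) for i in range(njobs)]
    results = client.persist(outputs)  # the same error also occurs with client.compute(outputs)
    time.sleep(10)  # workaround: without this pause, progress()/gather() raises the TimeoutError below
    progress(results)
    return client.gather(results)
```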
Traceback:

```
Traceback (most recent call last):
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/comm/core.py", line 320, in connect
    handshake = await asyncio.wait_for(comm.read(), time_left())
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/asyncio/tasks.py", line 501, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/athomson/athomson/repos/spiceracs/spiceracs/rmsynth_oncuts.py", line 956, in <module>
    cli()
  File "/home/athomson/athomson/repos/spiceracs/spiceracs/rmsynth_oncuts.py", line 931, in cli
    main(field=args.field,
  File "/home/athomson/athomson/repos/spiceracs/spiceracs/rmsynth_oncuts.py", line 803, in main
    tqdm_dask(futures, desc="Running RMsynth...", disable=(not verbose))
  File "/group/askap/athomson/repos/spiceracs/spiceracs/utils.py", line 61, in tqdm_dask
    TqdmProgressBar(futures, **kwargs)
  File "/group/askap/athomson/repos/spiceracs/spiceracs/utils.py", line 48, in __init__
    loop_runner.run_sync(self.listen)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/utils.py", line 510, in run_sync
    return sync(self.loop, func, *args, **kwargs)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/utils.py", line 354, in sync
    raise exc.with_traceback(tb)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/utils.py", line 337, in f
    result[0] = yield future
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/diagnostics/progressbar.py", line 64, in listen
    self.comm = await connect(
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/comm/core.py", line 325, in connect
    raise IOError(
OSError: Timed out during handshake while connecting to tcp://127.0.0.1:39589 after 10 s
/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/RMutils/util_misc.py:425: RuntimeWarning: invalid value encountered in log10
  np.exp(a*lnx**5+b*lnx**4+c*lnx**3+d*lnx**2+f*lnx+np.log10(g))]
/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/RMutils/util_misc.py:425: RuntimeWarning: invalid value encountered in log10
  np.exp(a*lnx**5+b*lnx**4+c*lnx**3+d*lnx**2+f*lnx+np.log10(g))]
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
```
What you expected to happen:

I'd expect the `progress` and `gather` calls to wait on the execution of the futures, without needing a `time.sleep`.
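For what it's worth, the "after 10 s" in the error message looks like the default comm connect timeout, so raising that setting before creating the client might be another (blunt) mitigation. This is only a guess on my part that the progress bar's handshake honours the generic setting; a minimal sketch:

```python
import dask

# Assumption: the progress bar's connection honours the generic comm connect
# timeout (which appears to default to 10s). Raise it before creating the Client.
dask.config.set({"distributed.comm.timeouts.connect": "30s"})
```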
Minimal Complete Verifiable Example:

Whilst this matches the structure of my main script (the one in the traceback above), this MCVE unfortunately does not reproduce the error.
```python
from dask import delayed
from dask.distributed import Client, progress, LocalCluster
import time


def inner_job(i):
    return i + 1


@delayed
def job(x):
    time.sleep(1)
    y = inner_job(x)
    return y


def main(client):
    njobs = 1000
    outputs = []
    for i in range(njobs):
        output = job(i)
        outputs.append(output)
    results = client.persist(outputs)
    print("Running test...")
    progress(results)


def cli():
    cluster = LocalCluster(n_workers=20, dashboard_address=':9999')
    client = Client(cluster)
    main(client)


if __name__ == "__main__":
    cli()
```
Environment:
- Dask version: 2021.5.0
- Python version: 3.8.10
- Operating System: SUSE Linux Enterprise Server 12 SP3
- Install method (conda, pip, source): conda