asyncio exception when monitoring distributed compute #8382

Open
data2code opened this issue Dec 1, 2023 · 1 comment

@data2code

Describe the issue:
In the example code below, I try to monitor the progress of a computation on a cluster.
It returns the correct results, but the exception shown below is always logged. It cannot be suppressed by the try/except wrapper I added.

(1, 1, 1)
2023-11-30 21:27:09,396 - distributed.scheduler - ERROR -
Traceback (most recent call last):
  File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
  File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/scheduler.py", line 7246, in feed
    await asyncio.sleep(interval)
  File "/miniconda3/envs/X/lib/python3.9/asyncio/tasks.py", line 652, in sleep
    return await future
asyncio.exceptions.CancelledError
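A stdlib-only sketch may help explain why the try/except has no effect. The traceback contains no frames from the script itself, only a `wrapper` in `distributed/utils.py` and a `feed` coroutine in `distributed/scheduler.py`, so the error appears to be logged from a background task on the event loop rather than raised into the script's call stack. The code below is an analogue under that assumption, not distributed's actual code: `log_errors_like`, `feed_like`, and the `demo.scheduler` logger name are all hypothetical. A decorator logs any exception at ERROR level and re-raises it; when the periodic task is cancelled, the `CancelledError` is logged inside the task itself, so nothing the caller does can prevent the log line.

```python
import asyncio
import functools
import logging

# Same layout as the log line in the issue: time - logger name - level - message
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("demo.scheduler")  # hypothetical logger name


def log_errors_like(func):
    """Hypothetical decorator: log any exception at ERROR level, then re-raise it."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except BaseException as e:  # CancelledError is a BaseException since Python 3.8
            logger.exception(e)  # log the exception and its traceback at ERROR level
            raise

    return wrapper


@log_errors_like
async def feed_like(interval):
    """Hypothetical periodic background task, loosely modeled on `feed` in the traceback."""
    while True:
        await asyncio.sleep(interval)


async def main():
    task = asyncio.create_task(feed_like(0.01))
    await asyncio.sleep(0.05)  # let the task loop a few times
    task.cancel()  # stands in for the monitoring side going away
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"  # the caller does see the cancellation...
    return "finished"


# ...but the ERROR record has already been emitted by the wrapper inside the task,
# so no try/except at this level could have kept it out of the log.
print(asyncio.run(main()))
```

Running it prints `cancelled`, preceded on stderr by an `ERROR` record whose traceback ends in `CancelledError`, which is the same shape as the output above.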

Minimal Complete Verifiable Example:

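from time import sleep
import dask
from dask.distributed import Client, progress
from dask_jobqueue import SGECluster
from dask import compute, persist, delayed

@dask.delayed
def f():
    sleep(1)
    return 1

# Start a one-worker SGE cluster and connect a client to it
cluster = SGECluster(cores=1, memory='1GB')
cluster.scale(1)
c = Client(cluster)
out = [f()] * 3  # three references to the same delayed task

try:
    x = persist(*out)  # start the computation on the cluster
    progress(x)        # monitor progress
    out = compute(x)
except:  # catch-all; the CancelledError is still logged
    print("error")

c.close()
print(out[0])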

Anything else we need to know?:

Environment:

  • Dask version: 2023.11.0
  • Python version: 3.9.18
  • Operating System: Red Hat Enterprise Linux release 8.5 (Ootpa)
  • Install method (conda, pip, source):
    mamba install -c conda-forge dask
    mamba install -c conda-forge dask-jobqueue
data2code (author) commented Dec 1, 2023

I tried this barebones version and still got the exception, which I can neither catch nor suppress. I have also reinstalled dask:
pip3 install "dask[complete]" --upgrade --force-reinstall

I'd really appreciate your insights.


from time import sleep
from dask.distributed import Client, progress
from dask_jobqueue import SGECluster

def f(x):
    sleep(1)
    return 1

# Request three single-core SGE workers and connect a client
cluster = SGECluster(cores=1, memory='1GB')
cluster.scale(3)
c = Client(cluster)

x = c.map(f, range(3))  # one future per input
progress(x)             # monitor the futures
out = c.gather(x)       # collect the results

c.close()
print(out)

Output:

[1, 1, 1]
2023-12-01 14:14:52,476 - distributed.scheduler - ERROR -
Traceback (most recent call last):
  File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
  File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/scheduler.py", line 7246, in feed
    await asyncio.sleep(interval)
  File "/miniconda3/envs/X/lib/python3.9/asyncio/tasks.py", line 652, in sleep
    return await future
asyncio.exceptions.CancelledError
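If the goal is only to silence this message, one option is a standard `logging.Filter`. This is a minimal sketch resting on two assumptions: (1) the message is emitted through Python's `logging` module directly by the `distributed.scheduler` logger, as the `- distributed.scheduler - ERROR -` prefix suggests, and (2) the scheduler runs in the same Python process as the script, which the message appearing in the script's own output suggests but which has not been verified for `SGECluster`. The class name `DropCancelledError` is hypothetical. It drops only records whose attached exception is an `asyncio.CancelledError`, so other scheduler errors still get through.

```python
import asyncio
import logging


class DropCancelledError(logging.Filter):
    """Hypothetical filter: drop records whose attached exception is a CancelledError."""

    def filter(self, record):
        exc_type = record.exc_info[0] if record.exc_info else None
        if exc_type is not None and issubclass(exc_type, asyncio.CancelledError):
            return False  # suppress this record
        return True  # keep every other record, including other scheduler errors


# Assumption: the message is logged directly by the "distributed.scheduler" logger,
# as the "- distributed.scheduler - ERROR -" prefix in the output suggests, and the
# scheduler runs in this same Python process. Add this before creating the cluster.
logging.getLogger("distributed.scheduler").addFilter(DropCancelledError())
```

A filter attached to a logger is consulted only for records logged directly to that logger, not for records propagated from its child loggers, so if the message actually came from a different logger name the filter would need to be attached there instead. This hides the symptom only; it does not change why the scheduler logs the cancellation.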
