Add asynchronous: bool flag to get_client #7658

Open
fjetter opened this issue Mar 16, 2023 · 3 comments · May be fixed by #7871

@fjetter
Member

fjetter commented Mar 16, 2023

Part of our API consists of several utility functions that return the current client, the default client, or a new one.

The top-level entry point is worker.py::get_client:

def get_client(address=None, timeout=None, resolve_address=True) -> Client:
    """Get a client while within a task.

    This client connects to the same scheduler to which the worker is connected

    Parameters
    ----------
    address : str, optional
        The address of the scheduler to connect to. Defaults to the scheduler
        the worker is connected to.
    timeout : int or str
        Timeout (in seconds) for getting the Client. Defaults to the
        ``distributed.comm.timeouts.connect`` configuration value.
    resolve_address : bool, default True
        Whether to resolve `address` to its canonical form.

    Returns
    -------
    Client

    Examples
    --------
    >>> def f():
    ...     client = get_client(timeout="10s")
    ...     futures = client.map(lambda x: x + 1, range(10))  # spawn many tasks
    ...     results = client.gather(futures)
    ...     return sum(results)

    >>> future = client.submit(f)  # doctest: +SKIP
    >>> future.result()  # doctest: +SKIP
    55

    See Also
    --------
    get_worker
    worker_client
    secede
    """
    if timeout is None:
        timeout = dask.config.get("distributed.comm.timeouts.connect")

    timeout = parse_timedelta(timeout, "s")

    if address and resolve_address:
        address = comm_resolve_address(address)
    try:
        worker = get_worker()
    except ValueError:  # could not find worker
        pass
    else:
        if not address or worker.scheduler.address == address:
            return worker._get_client(timeout=timeout)

    from distributed.client import Client

    try:
        client = Client.current()  # TODO: assumes the same scheduler
    except ValueError:
        client = None
    if client and (not address or client.scheduler.address == address):
        return client
    elif address:
        return Client(address, timeout=timeout)
    else:
        raise ValueError("No global client found and no address provided")

When this function is called from an asynchronous context, the returned client has asynchronous=False by default and is basically useless there. It would be helpful to add asynchronous as an argument to get_client.
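To make the mismatch concrete, here is a toy model (ToyClient is hypothetical, not distributed.Client) of a client whose blocking mode drives its own event loop with asyncio.run. Called from inside an already-running loop, as async task code would be, the blocking call fails, while the asynchronous mode returns a coroutine the caller can await. distributed's real client manages its event loop differently, so its exact failure mode will differ; this only sketches the general sync/async mismatch.

```python
import asyncio


class ToyClient:
    """Hypothetical client with a sync/async mode switch (not distributed.Client)."""

    def __init__(self, asynchronous: bool = False):
        self.asynchronous = asynchronous

    async def _gather(self, values):
        await asyncio.sleep(0)  # stand-in for talking to a scheduler
        return list(values)

    def gather(self, values):
        coro = self._gather(values)
        if self.asynchronous:
            return coro  # async mode: the caller must await this
        try:
            # Sync mode: block by running a fresh event loop.
            # asyncio.run raises RuntimeError if a loop is already running.
            return asyncio.run(coro)
        except RuntimeError:
            coro.close()  # avoid a "coroutine was never awaited" warning
            raise


async def call_from_task(client, values):
    """Simulate task code that runs inside an event loop."""
    result = client.gather(values)
    if asyncio.iscoroutine(result):
        result = await result
    return result
```

Outside any event loop the blocking mode works; inside one, only the asynchronous mode does, which is why get_client needs to know which kind of client the caller wants.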

My proposal: when get_client would return the default or current client, raise a RuntimeError if that client's asynchronous setting doesn't match the requested one.
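A minimal sketch of the proposed check, under stated assumptions: StubClient and get_client_sketch are hypothetical names, the worker lookup and address resolution from get_client are omitted, and the current client is passed in explicitly instead of being read via Client.current(). As one possible design (an assumption, not part of the proposal above), asynchronous=None skips the check so existing callers keep today's behavior.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StubClient:
    """Hypothetical stand-in for distributed.Client: only the fields the check needs."""

    address: str
    asynchronous: bool = False


def get_client_sketch(
    address: str | None = None,
    asynchronous: bool | None = None,
    current: StubClient | None = None,
) -> StubClient:
    """Hypothetical sketch: reuse a matching current client, else create a new one."""
    if current is not None and (not address or current.address == address):
        # Proposed behavior: never hand back a client in the wrong mode.
        if asynchronous is not None and current.asynchronous != asynchronous:
            raise RuntimeError(
                f"Current client has asynchronous={current.asynchronous}, "
                f"but asynchronous={asynchronous} was requested"
            )
        return current
    if address:
        # A newly created client can simply honor the requested mode.
        return StubClient(address, asynchronous=bool(asynchronous))
    raise ValueError("No global client found and no address provided")
```

Raising instead of silently returning a client in the wrong mode surfaces the mismatch at the get_client call rather than at some later method call.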

This relates to PrefectHQ/prefect#12971

@scharlottej13
Contributor

Thanks again for opening this issue @fjetter! Any idea on when someone might have bandwidth to work on this?

@dchudz
Contributor

dchudz commented May 16, 2023

> When this function is being used from an asynchronous context

In my experience, it's not just that: this function doesn't work at all from an async task.

So unless I'm doing something silly, making it work at all is also part of this issue.

@milesgranger milesgranger self-assigned this May 25, 2023
@milesgranger milesgranger linked a pull request May 31, 2023 that will close this issue
@shughes-uk
Contributor

I got hit by this today; it would be nice to merge the fix.

5 participants