Reopen and rescope of #2336
When a Future is wrapped in a dask collection and then passed as data through `client.submit` or similar, it is unpickled on the Worker and attached to the first available Client. This is not necessarily the Client the user wants it associated with.
```python
import dask.dataframe as dd
import pandas as pd
from distributed import Client

client = Client('localhost:8786')

df = pd.DataFrame(data=[[1]], columns=['a'])
df = dd.from_pandas(df, npartitions=1)
df = df.persist()  # df now wraps Futures created by `client`

def remote(df):
    # Crashes with 'Inputs contain futures that were created by another client'
    df.compute()

future = client.submit(remote, df)
```
The specific failure is caused by the scheduler being identified by hostname on one side and by IP address on the other, which #3729 has already addressed for `Client.get_dataset()` specifically.
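To make the hostname/IP mismatch concrete, here is a small illustrative sketch. It is not the approach taken in #3729. It shows that two addresses of the same scheduler compare unequal as plain strings, and how a hypothetical `normalize_address` helper could resolve the host with `socket.gethostbyname` before comparing. It assumes `proto://host:port` addresses with IPv4 hosts.

```python
import socket
from urllib.parse import urlsplit


def normalize_address(addr: str) -> str:
    """Hypothetical helper: rewrite 'proto://host:port' with the host resolved to IPv4.

    Sketch only: it ignores IPv6 and does not handle unresolvable hosts.
    """
    parts = urlsplit(addr)
    ip = socket.gethostbyname(parts.hostname)  # IP literals are returned unchanged
    return f"{parts.scheme}://{ip}:{parts.port}"


by_name = "tcp://localhost:8786"
by_ip = "tcp://127.0.0.1:8786"

# A plain string comparison treats these as two different schedulers.
print(by_name == by_ip)
```

On a typical host, where `localhost` resolves to `127.0.0.1`, `normalize_address(by_name)` and `normalize_address(by_ip)` would compare equal.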
There is, however, a broader issue: any given process may legitimately have more than one Client open, or a Client that does not match the default one created by dask. For example, I personally use an asynchronous Client running on uvloop on my workers.
Proposed solution:
- In `Future.__init__`, set `Future._client` to:
  - the explicit `client` parameter, if not None;
  - failing that, the Client from `Client.as_current()`;
  - failing that, the default client;
  - failing that, `None`.

  Currently, it:
  - tries to use the Client from `Client.as_current()`;
  - failing that, sets `Future.client` to the most recently initialised Client instance, even if that Client was created with `set_as_default=False`.
- Change `Future.client` to a property, which returns:
  - `Future._client`, if not None;
  - failing that, the Client from `Client.as_current()`;
  - failing that, the default client;
  - failing that, raise `ValueError`.
- In `Future.__getstate__`, don't pickle the scheduler address; it causes just too much trouble!
- In `Future.__setstate__`, implement the same logic as in `Future.__init__` (except that there will never be an explicit `client` parameter).
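The proposed resolution order can be modelled in plain Python. The sketch below does not import `distributed`: `Client`, `Future`, `_current_client`, `_default_client` and `_resolve_client` are hypothetical stand-ins, with a `contextvars.ContextVar` playing the role of `Client.as_current()`. It only illustrates the proposed precedence (explicit client, then current client, then default client) and the pickling behaviour; it is not the real implementation.

```python
import contextvars
import pickle
from contextlib import contextmanager

# Hypothetical stand-ins for distributed's client bookkeeping (not the real API).
_current_client = contextvars.ContextVar("current_client", default=None)
_default_client = None


class Client:
    def __init__(self, name, set_as_default=True):
        global _default_client
        self.name = name
        if set_as_default:
            _default_client = self

    @contextmanager
    def as_current(self):
        # Make this client the "current" one for the duration of the block.
        token = _current_client.set(self)
        try:
            yield self
        finally:
            _current_client.reset(token)


def _resolve_client():
    """Current client if there is one, else the default client, else None."""
    current = _current_client.get()
    return current if current is not None else _default_client


class Future:
    def __init__(self, key, client=None):
        self.key = key
        # 1. explicit client, 2. current client, 3. default client, 4. None
        self._client = client if client is not None else _resolve_client()

    @property
    def client(self):
        # Fall back to the current/default client at access time.
        client = self._client if self._client is not None else _resolve_client()
        if client is None:
            raise ValueError(f"No client available for Future {self.key!r}")
        return client

    def __getstate__(self):
        # Pickle only the key: no scheduler address and no client reference.
        return {"key": self.key}

    def __setstate__(self, state):
        # Same logic as __init__, minus the explicit client parameter.
        self.key = state["key"]
        self._client = _resolve_client()
```

In this model a Client created with `set_as_default=False` is only used when passed explicitly or made current with `as_current()`, and an unpickled Future attaches to whichever Client is current (or default) in the receiving process rather than to a pickled scheduler address.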
With this change, the `remote` function in the snippet above would have to be rewritten as follows in order to work:

```python
import distributed

def remote(df):
    distributed.get_client()  # Creates a Client with set_as_default=True
    df.compute()
```

Or, better, explicitly:

```python
import distributed

def remote(df):
    with distributed.get_client().as_current():
        df.compute()
```
The notable change, which may break things for some people, is that manually creating a Future will no longer hold a reference to cluster-side data when there is no default client, which in turn may cause that data to be released. I don't think this should be a big problem, as it only affects people who explicitly created their Client with `set_as_default=False`, so they probably know what they're doing already.