
Cannot call compute() on a dask dataframe when inside a task #3791

Description

@pborgen

Below is the code to reproduce the problem. I am running the latest version, as specified below.

I have a dedicated VM for the scheduler and separate VMs for the workers. I have seen this pass when only one worker was running, but it seems to always fail when I have two workers.

Related Issue:
#2336

Version:
dask==2.16.0
distributed==2.16.0

How I start the scheduler:
dask-scheduler

How I start my workers:
dask-worker tcp://x.x.x.x:8786 --nanny-port 8001 --worker-port 8002 --no-dashboard

from distributed import Client, get_client
import dask

if __name__ == '__main__':

    def getCompute(id):
        ddf = get_client().get_dataset("foo")

        #filter
        return ddf[ddf.id == id].compute()

    daskClient = Client('x.x.x.x:8786')
    daskClient.restart()

    ddf = dask.datasets.timeseries()
    ddf = ddf.persist()
    daskClient.publish_dataset(foo=ddf)

    #get id's
    id_list = ddf['id'].unique().compute()

    futures = []
    try:
        for id in id_list:
            futures.append(daskClient.submit(getCompute, id))
    finally:
        daskClient.unpublish_dataset("foo")

    results = daskClient.gather(futures)

Traceback (most recent call last):
  File "D:/dev/code/netsense.support/facts_ingester/facts_ingester/test.py", line 29, in <module>
    results = daskClient.gather(futures)
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\distributed\client.py", line 1961, in gather
    return self.sync(
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\distributed\client.py", line 815, in sync
    return sync(
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\distributed\utils.py", line 347, in sync
    raise exc.with_traceback(tb)
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\distributed\utils.py", line 331, in f
    result[0] = yield future
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\tornado\gen.py", line 735, in run
    value = future.result()
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\distributed\client.py", line 1826, in _gather
    raise exception.with_traceback(traceback)
  File "D:/dev/code/netsense.support/facts_ingester/facts_ingester/test.py", line 10, in getCompute
    return ddf[ddf.id == id].compute()
  File "c:\python38\lib\site-packages\dask\base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "c:\python38\lib\site-packages\dask\base.py", line 444, in compute
    results = schedule(dsk, keys, **kwargs)
  File "c:\python38\lib\site-packages\distributed\client.py", line 2646, in get
    used as an optimization to avoid recomputation.
  File "c:\python38\lib\site-packages\distributed\client.py", line 2543, in _graph_to_futures
    
ValueError: Inputs contain futures that were created by another client.
