Future.result() on a worker's client unnecessarily pickles and duplicates data #8086

@crusaderky

In this task:

def f():
    c = get_client()
    c.gather(some_future)

If the data for the future is already on the worker, the network stack is sidestepped entirely and the client just gets a reference to the Python object:

In Client.gather:

try:
    local_worker = get_worker()
except ValueError:
    local_worker = None
return self.sync(
    self._gather,
    futures,
    errors=errors,
    direct=direct,
    local_worker=local_worker,
    asynchronous=asynchronous,
)

In Client._gather:

if local_worker:  # look inside local worker
    data.update(
        {k: local_worker.data[k] for k in keys if k in local_worker.data}
    )
    keys = [k for k in keys if k not in data]
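
The short-circuit above amounts to splitting the requested keys into those already held by the local worker and those that still need fetching. A minimal standalone sketch, using a plain dict in place of `local_worker.data` (the helper name `split_local` is hypothetical and not part of distributed):

```python
def split_local(keys, local_data):
    """Return (data, remaining): objects found locally, and keys still to fetch."""
    # Values are taken by reference; nothing is pickled or copied here.
    data = {k: local_data[k] for k in keys if k in local_data}
    remaining = [k for k in keys if k not in data]
    return data, remaining


payload = object()
local_data = {"x": payload}
data, remaining = split_local(["x", "y"], local_data)
```

Here `data["x"]` is the very same object as `payload`, and only `"y"` is left for the regular fetch path.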

However, if you do

def f():
    c = get_client()
    some_future.result()

then this special case is not handled, and the data is pickled and duplicated even though it is already on the worker.

In Future._result:

result = await self.client._gather([self])
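
One possible direction, sketched with toy stand-ins rather than distributed's real classes: have the result path look up the local worker and forward it, the way `gather` does. Everything here (`ToyWorker`, `ToyClient`, `ToyFuture`, the `current_worker` attribute, and the pickle round trip standing in for a network transfer) is an assumption for illustration, not the project's actual fix.

```python
import pickle


class ToyWorker:
    def __init__(self, data):
        self.data = data  # key -> object held in this worker's memory


class ToyClient:
    def __init__(self, remote_store, current_worker=None):
        self.remote_store = remote_store
        self.current_worker = current_worker  # stand-in for get_worker()

    def _gather(self, keys, local_worker=None):
        data = {}
        if local_worker:  # same short-circuit as quoted above
            data.update(
                {k: local_worker.data[k] for k in keys if k in local_worker.data}
            )
            keys = [k for k in keys if k not in data]
        for k in keys:
            # Stand-in for the network path: a pickle round trip makes a copy.
            data[k] = pickle.loads(pickle.dumps(self.remote_store[k]))
        return data


class ToyFuture:
    def __init__(self, key, client):
        self.key = key
        self.client = client

    def result_current(self):
        # Mirrors the quoted Future._result: no local_worker is passed.
        return self.client._gather([self.key])[self.key]

    def result_local_aware(self):
        # Hypothetical variant: forward the local worker so the short-circuit applies.
        worker = self.client.current_worker
        return self.client._gather([self.key], local_worker=worker)[self.key]


value = [1, 2, 3]
worker = ToyWorker({"x": value})
client = ToyClient(remote_store={"x": value}, current_worker=worker)
fut = ToyFuture("x", client)

copied = fut.result_current()      # equal to value, but a separate object
shared = fut.result_local_aware()  # the same object the worker holds
```

In this toy model the current path returns a duplicate, while the local-aware path returns the worker's own object without serializing it.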

Reproducer

import distributed

class C:
    def __reduce__(self):
        assert False

def f():
    c = distributed.get_client()
    f = c.submit(C)
    return str(c.gather(f))

def g():
    c = distributed.get_client()
    f = c.submit(C)
    return str(f.result())

with distributed.Client(n_workers=1) as client:
    print("f", client.submit(f).result())
    print("g", client.submit(g).result())

Output:

f <__main__.C object at 0x7fcc840212e0>

2023-08-09 12:20:47,316 - distributed.protocol.pickle - ERROR - Failed to serialize <__main__.C object at 0x7fcc84038100>.
Traceback (most recent call last):
  File "distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
  File "/tmp/ipykernel_889930/1792512223.py", line 5, in __reduce__
AssertionError
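
The reproducer works because `__reduce__` is the hook that pickle calls when serializing an object, so the assertion fires only if something tries to pickle the instance; passing a reference around does not trigger it. A standalone check of that behavior, with no cluster involved (the `alias` and `raised` names are just for illustration):

```python
import pickle


class C:
    def __reduce__(self):
        # Invoked by pickle during serialization, not by ordinary reference passing.
        assert False


obj = C()
alias = obj  # handing over a reference does not call __reduce__

try:
    pickle.dumps(obj)
    raised = False
except AssertionError:
    raised = True
```

This is why `f` prints the object while `g` logs a serialization failure: only the second path attempts to pickle it.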

Labels: bug (something is broken), p3 (affects a small number of users or is largely cosmetic)
