
Dask dataframe from large pandas dataframe cannot be computed on cluster #10644

Closed
yiliema opened this issue Nov 22, 2023 · 3 comments

yiliema commented Nov 22, 2023

It looks like something goes wrong when computing a Dask dataframe built from a large pandas dataframe on a cluster: the graph cannot be deserialized. On my machine this happens when the pandas dataframe is larger than about 2 GB (here 550000 × 500 float64 values ≈ 2.2 GB, just over msgpack's 2147483647-byte limit shown in the traceback below), regardless of what npartitions is set to. It only happens when computing on a cluster; computing locally without a cluster works fine. From the source code it looks like the entire dataframe is deserialized on the scheduler for some reason, but I could be wrong here.

This starts occurring in 2023.4.0; 2023.3.2 does not have this problem.

from dask.distributed import Client
import numpy as np
import dask.dataframe as dd
import pandas as pd

cl = Client()
df = pd.DataFrame(np.random.random((550000,500)))
ddf = dd.from_pandas(df, npartitions=200)
cl.gather(cl.compute(ddf))

Error message:

~/distributed/client.py:3163: UserWarning: Sending large graph of size 2.05 GiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
2023-11-22 11:26:46,425 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "~/distributed/protocol/core.py", line 160, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
ValueError: 2200046998 exceeds max_bin_len(2147483647)
2023-11-22 11:26:46,428 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "~/distributed/core.py", line 968, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "~/distributed/scheduler.py", line 5532, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "~/distributed/core.py", line 1023, in handle_stream
    msgs = await comm.read()
           ^^^^^^^^^^^^^^^^^
  File "~/distributed/comm/tcp.py", line 248, in read
    msg = await from_frames(
          ^^^^^^^^^^^^^^^^^^
  File "~/distributed/comm/utils.py", line 78, in from_frames
    res = _from_frames()
          ^^^^^^^^^^^^^^
  File "~/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(
           ^^^^^^^^^^^^^^^
  File "~/distributed/protocol/core.py", line 160, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
ValueError: 2200046998 exceeds max_bin_len(2147483647)
Task exception was never retrieved
future: <Task finished name='Task-425190' coro=<Server._handle_comm() done, defined at ~/distributed/core.py:874> exception=ValueError('2200046998 exceeds max_bin_len(2147483647)')>
Traceback (most recent call last):
  File "~/distributed/core.py", line 968, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "~/distributed/scheduler.py", line 5532, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "~/distributed/core.py", line 1023, in handle_stream
    msgs = await comm.read()
           ^^^^^^^^^^^^^^^^^
  File "~/distributed/comm/tcp.py", line 248, in read
    msg = await from_frames(
          ^^^^^^^^^^^^^^^^^^
  File "~/distributed/comm/utils.py", line 78, in from_frames
    res = _from_frames()
          ^^^^^^^^^^^^^^
  File "~/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(
           ^^^^^^^^^^^^^^^
  File "~/distributed/protocol/core.py", line 160, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
ValueError: 2200046998 exceeds max_bin_len(2147483647)
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
Cell In[36], line 9
      7 df = pd.DataFrame(np.random.random((550000,500)))
      8 ddf = dd.from_pandas(df, npartitions=200)
----> 9 cl.gather(cl.compute(ddf))

File ~/distributed/client.py:2384, in Client.gather(self, futures, errors, direct, asynchronous)
   2381     local_worker = None
   2383 with shorten_traceback():
-> 2384     return self.sync(
   2385         self._gather,
   2386         futures,
   2387         errors=errors,
   2388         direct=direct,
   2389         local_worker=local_worker,
   2390         asynchronous=asynchronous,
   2391     )

File ~/distributed/client.py:2245, in Client._gather(self, futures, errors, direct, local_worker)
   2243     else:
   2244         raise exception.with_traceback(traceback)
-> 2245     raise exc
   2246 if errors == "skip":
   2247     bad_keys.add(key)

CancelledError: finalize-bd8a71172471c855bac79a88e14f8a96
  • Dask version: 2023.11.0
  • Python version: 3.11.6
  • Operating System: Linux
  • Install method (conda, pip, source): conda
github-actions bot added the needs triage (Needs a response from a contributor) label on Nov 22, 2023
fjetter (Member) commented Nov 23, 2023

Indeed, we are deserializing the dataframe on the scheduler. This goes back to a change made in 2023.4.0 (dask/distributed#7564), where we started deserializing everything on the scheduler side. This was done mostly to reduce code complexity.

We strongly discourage sending such large dataframes to the scheduler. Instead, use a Dask API to read the data rather than a pandas API (for example read_parquet; see https://docs.dask.org/en/stable/dataframe-api.html#create-dataframes for a list). from_pandas is mostly useful for examples and demos; I recommend using a Dask-native API instead.

The equivalent for your toy example would be:

import dask.array as da
import dask.dataframe as dd

# Generate the data lazily as a dask array so no multi-GiB graph is sent to the scheduler
dd.from_array(da.random.random((550000, 500)))
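
For real data, a minimal sketch of the Parquet route mentioned above (the file path is illustrative, and this assumes a Parquet engine such as pyarrow is installed):

import dask.dataframe as dd
import numpy as np
import pandas as pd

# Write the data out once with pandas (Parquet requires string column names)...
df = pd.DataFrame(np.random.random((550000, 500)))
df.columns = df.columns.astype(str)
df.to_parquet("data.parquet")

# ...then let the workers read the file themselves instead of shipping a >2 GiB
# graph through the scheduler.
ddf = dd.read_parquet("data.parquet")

The warning in the traceback also suggests scattering the data ahead of time and working with futures, which similarly avoids routing the full dataframe through the scheduler.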

mrocklin (Member) commented Nov 23, 2023 via email

yiliema (Author) commented Nov 23, 2023

Thanks, I'll rework this to follow the best practices.

yiliema closed this as completed on Nov 23, 2023