
P2P with null partitions fails #8092

Closed
Tracked by #8043
jrbourbeau opened this issue Aug 9, 2023 · 0 comments · Fixed by #8116


@jrbourbeau
Member

I came across a use case where P2P shuffling fails when a column in a partition has all null values. Here's a minimal reproducer:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

def make_partition(i):
    """Return null column for one partition"""
    if i == 1:
        return pd.DataFrame({"a": np.random.random(10), "b": None})
    return pd.DataFrame({"a": np.random.random(10), "b": np.random.random(10)})


if __name__ == "__main__":
    with Client() as client:
        ddf = dd.from_map(make_partition, range(10))
        result = ddf.set_index("a", shuffle="p2p").compute()
        print(result)
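The failure depends on how each partition's column types are inferred when the partition is converted to Arrow. The sketch below is a simplified, stdlib-only model of that step. The `infer_type` and `infer_schema` helpers are hypothetical and are not pyarrow or dask APIs. A column whose values are all `None` has nothing to infer from and becomes the `null` type, while a float column becomes `double`. As a result, partition 1 ends up with a schema that differs from every other partition's.

```python
import random


def infer_type(values):
    """Hypothetical, simplified Arrow-style type inference for one column."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        # Nothing to infer from: an all-None column becomes the "null" type.
        return "null"
    if all(isinstance(v, float) for v in non_null):
        return "double"
    if all(isinstance(v, int) for v in non_null):
        return "int64"
    return "string"


def infer_schema(partition):
    """Map each column name to its inferred type."""
    return {name: infer_type(values) for name, values in partition.items()}


def make_partition(i):
    """Mirror the reproducer: partition 1 has an all-None column "b"."""
    a = [random.random() for _ in range(10)]
    if i == 1:
        return {"a": a, "b": [None] * 10}
    return {"a": a, "b": [random.random() for _ in range(10)]}


schemas = [infer_schema(make_partition(i)) for i in range(10)]
print(schemas[0])  # {'a': 'double', 'b': 'double'}
print(schemas[1])  # {'a': 'double', 'b': 'null'}
```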

which raises the following error:

Traceback (most recent call last):
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/shuffle/_shuffle.py", line 96, in shuffle_barrier
    return _get_worker_plugin().barrier(id, run_ids)
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/shuffle/_worker_plugin.py", line 925, in barrier
    result = sync(self.worker.loop, self._barrier, shuffle_id, run_ids)
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/utils.py", line 426, in sync
    raise exc.with_traceback(tb)
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/utils.py", line 399, in f
    result = yield future
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/shuffle/_worker_plugin.py", line 689, in _barrier
    await shuffle.barrier()
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/shuffle/_worker_plugin.py", line 116, in barrier
    await self.scheduler.shuffle_barrier(id=self.id, run_id=self.run_id)
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/core.py", line 1374, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/core.py", line 1158, in send_recv
    raise exc.with_traceback(tb)
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/core.py", line 930, in _handle_comm
    result = await result
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/shuffle/_scheduler_plugin.py", line 139, in barrier
    await self.scheduler.broadcast(
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/scheduler.py", line 6169, in broadcast
    results = await All(
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/utils.py", line 252, in All
    result = await tasks.next()
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/scheduler.py", line 6147, in send_message
    resp = await send_recv(
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/core.py", line 1160, in send_recv
    raise Exception(response["exception_text"])
Exception: ArrowInvalid('Schema at index 1 was different: \na: double\nb: null\n_partitions: int64\n__index_level_0__: int64\nvs\na: double\nb: double\n_partitions: int64\n__index_level_0__: int64')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/james/projects/dask/dask/test-p2p-shuffle.py", line 16, in <module>
    result = ddf.set_index("a", shuffle="p2p").compute()
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/distributed/shuffle/_shuffle.py", line 98, in shuffle_barrier
    raise RuntimeError(f"shuffle_barrier failed during shuffle {id}") from e
RuntimeError: shuffle_barrier failed during shuffle 2b30bc4838ba6b632ee7d432b2b31dc8
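The `ArrowInvalid` message points at the same mismatch. The shards being concatenated disagree on the type of `b`, which is `null` in one and `double` in the other. The sketch below models this with plain Python dicts. `concat_schemas` is a hypothetical stand-in for a concatenation that requires identical schemas. `unify_schemas` shows one possible remedy, which is to promote a `null` column to the concrete type seen in other shards before concatenating. This only illustrates the idea; it is not necessarily how the fix in #8116 works.

```python
class SchemaMismatch(Exception):
    """Hypothetical stand-in for pyarrow's ArrowInvalid."""


def concat_schemas(schemas):
    """Require every schema to equal the first one, like a strict concat."""
    first = schemas[0]
    for index, schema in enumerate(schemas[1:], start=1):
        if schema != first:
            raise SchemaMismatch(
                f"Schema at index {index} was different: {first} vs {schema}"
            )
    return first


def unify_schemas(schemas):
    """Promote "null" columns to the non-null type seen in other schemas."""
    unified = {}
    for schema in schemas:
        for name, typ in schema.items():
            current = unified.get(name, "null")
            if current == "null":
                # Missing or still-null column: take whatever this shard has.
                unified[name] = typ
            elif typ not in ("null", current):
                # Two different concrete types cannot be reconciled here.
                raise SchemaMismatch(f"Column {name!r}: {current} vs {typ}")
    return unified


shards = [
    {"a": "double", "b": "null"},    # shard from the all-None partition
    {"a": "double", "b": "double"},  # shard from any other partition
]

try:
    concat_schemas(shards)
except SchemaMismatch as exc:
    print(exc)

target = unify_schemas(shards)
print(target)  # {'a': 'double', 'b': 'double'}
# Once every shard carries the unified schema, the strict concat succeeds.
print(concat_schemas([target for _ in shards]))
```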

Interestingly, this snippet usually fails, but I happened to notice that it sometimes runs successfully, which is itself interesting:

                 b
a
0.015788  0.677673
0.019857  0.481580
0.027898  0.564877
0.031679  0.442530
0.048167  0.990417
...            ...
0.957410  0.651139
0.969251       NaN
0.976877  0.369628
0.984942       NaN
0.999345  0.926310

[100 rows x 1 columns]

cc @hendrikmakait for visibility

@hendrikmakait self-assigned this on Aug 10, 2023
@hendrikmakait changed the title from "P2P will null partitions fails" to "P2P with null partitions fails" on Aug 17, 2023