Handle null partitions in P2P shuffling #8116

Merged
merged 1 commit into from
Aug 17, 2023
4 changes: 3 additions & 1 deletion distributed/shuffle/_arrow.py
@@ -79,7 +79,9 @@
     """Convert a list of arrow buffers and a schema to an Arrow Table"""
     import pyarrow as pa

-    return pa.concat_tables(deserialize_table(buffer) for buffer in data)
+    return pa.concat_tables(
+        (deserialize_table(buffer) for buffer in data), promote=True
+    )

Codecov (codecov/patch) warning on line 84 in distributed/shuffle/_arrow.py: added line #L84 was not covered by tests.

def serialize_table(table: pa.Table) -> bytes:
23 changes: 23 additions & 0 deletions distributed/shuffle/tests/test_shuffle.py
@@ -18,6 +18,7 @@
 from distributed.shuffle._core import ShuffleId, ShuffleRun, barrier_key
 from distributed.worker import Status

+np = pytest.importorskip("numpy")
 pd = pytest.importorskip("pandas")
 dd = pytest.importorskip("dask.dataframe")

@@ -2057,6 +2058,28 @@ async def test_handle_null_partitions_p2p_shuffling(c, s, *workers):
     await check_scheduler_cleanup(s)


+@gen_cluster(client=True)
+async def test_handle_null_partitions_p2p_shuffling_2(c, s, a, b):
Member Author

This test only fails 2/3 of the time on main, but that's good enough for me.

+    def make_partition(i):
+        """Return null column for one partition"""
+        if i % 2 == 1:
+            return pd.DataFrame({"a": np.random.random(10), "b": [None] * 10})
Collaborator

I would make the None column null explicitly, otherwise lgtm

Member Author

This fails unless I replace b in the other case with a string. That's another issue I plan to handle. As the current test mimics the original reproducer, I will keep this.

Collaborator

ok

+        return pd.DataFrame({"a": np.random.random(10), "b": np.random.random(10)})
+
+    ddf = dd.from_map(make_partition, range(50))
+    out = ddf.shuffle(on="a", shuffle="p2p", ignore_index=True)
+    result, expected = c.compute([ddf, out])
+    del out
+    result = await result
+    expected = await expected
+    dd.assert_eq(result, expected)
+    del result
+
+    await check_worker_cleanup(a)
+    await check_worker_cleanup(b)
+    await check_scheduler_cleanup(s)


 @gen_cluster(client=True)
 async def test_set_index_p2p(c, s, *workers):
     df = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7, 8], "b": 1})