Skip to content

Commit

Permalink
Fix null partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Aug 17, 2023
1 parent 8645c77 commit 6243868
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
4 changes: 3 additions & 1 deletion distributed/shuffle/_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ def list_of_buffers_to_table(data: list[bytes]) -> pa.Table:
"""Convert a list of arrow buffers and a schema to an Arrow Table"""
import pyarrow as pa

return pa.concat_tables(deserialize_table(buffer) for buffer in data)
return pa.concat_tables(
(deserialize_table(buffer) for buffer in data), promote=True
)

Check warning on line 84 in distributed/shuffle/_arrow.py

View check run for this annotation

Codecov / codecov/patch

distributed/shuffle/_arrow.py#L84

Added line #L84 was not covered by tests


def serialize_table(table: pa.Table) -> bytes:
Expand Down
23 changes: 23 additions & 0 deletions distributed/shuffle/tests/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from distributed.shuffle._core import ShuffleId, ShuffleRun, barrier_key
from distributed.worker import Status

np = pytest.importorskip("numpy")
pd = pytest.importorskip("pandas")
dd = pytest.importorskip("dask.dataframe")

Expand Down Expand Up @@ -2057,6 +2058,28 @@ async def test_handle_null_partitions_p2p_shuffling(c, s, *workers):
await check_scheduler_cleanup(s)


@gen_cluster(client=True)
async def test_handle_null_partitions_p2p_shuffling_2(c, s, a, b):
    """P2P-shuffle a frame whose column ``b`` is all ``None`` in some partitions.

    Builds 50 partitions of 10 rows each; every odd-numbered partition has
    column ``b`` filled with ``None`` while the even ones hold random floats.
    The shuffled collection must compare equal to the unshuffled input, and
    both workers and the scheduler must pass their cleanup checks afterwards.
    """

    def make_partition(i):
        """Return a 10-row frame; ``b`` is all ``None`` when ``i`` is odd."""
        # Draw "a" first so the random calls happen in the same order
        # regardless of which kind of "b" column is produced.
        a_values = np.random.random(10)
        b_values = [None] * 10 if i % 2 == 1 else np.random.random(10)
        return pd.DataFrame({"a": a_values, "b": b_values})

    source = dd.from_map(make_partition, range(50))
    shuffled = source.shuffle(on="a", shuffle="p2p", ignore_index=True)

    # Compute the input and the shuffled output together.
    before, after = c.compute([source, shuffled])
    del shuffled

    # Rebind the same names to the awaited values so that no reference to
    # the futures is kept around (presumably required by the cleanup checks
    # below -- confirm against their implementation).
    before = await before
    after = await after
    dd.assert_eq(before, after)
    del before

    await check_worker_cleanup(a)
    await check_worker_cleanup(b)
    await check_scheduler_cleanup(s)


@gen_cluster(client=True)
async def test_set_index_p2p(c, s, *workers):
df = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7, 8], "b": 1})
Expand Down

0 comments on commit 6243868

Please sign in to comment.