Skip to content

Commit

Permalink
Merge branch 'main' into spans_code
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jun 21, 2023
2 parents a09566a + 49437c2 commit 7f8c0fd
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 10 deletions.
2 changes: 1 addition & 1 deletion distributed/shuffle/_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def convert_partition(data: bytes, meta: pd.DataFrame) -> pd.DataFrame:
while file.tell() < end:
sr = pa.RecordBatchStreamReader(file)
shards.append(sr.read_all())
table = pa.concat_tables(shards)
table = pa.concat_tables(shards, promote=True)
df = table.to_pandas(self_destruct=True)

def default_types_mapper(pyarrow_dtype: pa.DataType) -> object:
Expand Down
18 changes: 9 additions & 9 deletions distributed/shuffle/tests/test_rechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,9 @@ async def test_rechunk_unknown_from_array(c, s, *ws):
(da.ones(shape=(50, 10), chunks=(25, 10)), (None, 5)),
(da.ones(shape=(50, 10), chunks=(25, 10)), {1: 5}),
(da.ones(shape=(50, 10), chunks=(25, 10)), (None, (5, 5))),
(da.ones(shape=(1000, 10), chunks=(5, 10)), (None, 5)),
(da.ones(shape=(1000, 10), chunks=(5, 10)), {1: 5}),
(da.ones(shape=(1000, 10), chunks=(5, 10)), (None, (5, 5))),
(da.ones(shape=(100, 10), chunks=(5, 10)), (None, 5)),
(da.ones(shape=(100, 10), chunks=(5, 10)), {1: 5}),
(da.ones(shape=(100, 10), chunks=(5, 10)), (None, (5, 5))),
(da.ones(shape=(10, 10), chunks=(10, 10)), (None, 5)),
(da.ones(shape=(10, 10), chunks=(10, 10)), {1: 5}),
(da.ones(shape=(10, 10), chunks=(10, 10)), (None, (5, 5))),
Expand Down Expand Up @@ -633,16 +633,16 @@ async def test_rechunk_with_fully_unknown_dimension(c, s, *ws, x, chunks):
(da.ones(shape=(50, 10), chunks=(25, 10)), (None, 5)),
(da.ones(shape=(50, 10), chunks=(25, 10)), {1: 5}),
(da.ones(shape=(50, 10), chunks=(25, 10)), (None, (5, 5))),
pytest.param(
da.ones(shape=(1000, 10), chunks=(5, 10)),
(
da.ones(shape=(100, 10), chunks=(5, 10)),
(None, 5),
),
pytest.param(
da.ones(shape=(1000, 10), chunks=(5, 10)),
(
da.ones(shape=(100, 10), chunks=(5, 10)),
{1: 5},
),
pytest.param(
da.ones(shape=(1000, 10), chunks=(5, 10)),
(
da.ones(shape=(100, 10), chunks=(5, 10)),
(None, (5, 5)),
),
(da.ones(shape=(10, 10), chunks=(10, 10)), (None, 5)),
Expand Down
19 changes: 19 additions & 0 deletions distributed/shuffle/tests/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -1826,3 +1826,22 @@ async def test_closed_worker_returns_before_barrier(c, s):
await c.close()
await asyncio.gather(*[clean_worker(w) for w in workers])
await clean_scheduler(s)


@gen_cluster(client=True)
async def test_handle_null_partitions_p2p_shuffling(c, s, *workers):
data = [
{"companies": [], "id": "a", "x": None},
{"companies": [{"id": 3}, {"id": 5}], "id": "b", "x": None},
{"companies": [{"id": 3}, {"id": 4}, {"id": 5}], "id": "c", "x": "b"},
{"companies": [{"id": 9}], "id": "a", "x": "a"},
]
df = pd.DataFrame(data)
ddf = dd.from_pandas(df, npartitions=2)
ddf = ddf.shuffle(on="id", shuffle="p2p", ignore_index=True)
result = await c.compute(ddf)
dd.assert_eq(result, df)

await c.close()
await asyncio.gather(*[clean_worker(w) for w in workers])
await clean_scheduler(s)

0 comments on commit 7f8c0fd

Please sign in to comment.