Skip to content

Commit

Permalink
Merge with P2P shuffle fails if left_index or right_index is True (#8121
Browse files Browse the repository at this point in the history
)
  • Loading branch information
phofl committed Aug 21, 2023
1 parent 716d526 commit acb2809
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
20 changes: 14 additions & 6 deletions distributed/shuffle/_merge.py
Expand Up @@ -73,20 +73,22 @@ def hash_join_p2p(
npartitions = max(lhs.npartitions, rhs.npartitions)

if isinstance(left_on, Index):
left_on = None
_left_on = None
left_index = True
else:
left_index = False
_left_on = left_on

if isinstance(right_on, Index):
right_on = None
_right_on = None
right_index = True
else:
right_index = False
_right_on = right_on
merge_kwargs = dict(
how=how,
left_on=left_on,
right_on=right_on,
left_on=_left_on,
right_on=_right_on,
left_index=left_index,
right_index=right_index,
suffixes=suffixes,
Expand All @@ -104,11 +106,11 @@ def hash_join_p2p(
name=merge_name,
name_input_left=lhs._name,
meta_input_left=lhs._meta,
left_on=left_on,
left_on=_left_on,
n_partitions_left=lhs.npartitions,
name_input_right=rhs._name,
meta_input_right=rhs._meta,
right_on=right_on,
right_on=_right_on,
n_partitions_right=rhs.npartitions,
meta_output=meta,
how=how,
Expand Down Expand Up @@ -159,6 +161,8 @@ def merge_unpack(
meta_right: pd.DataFrame,
result_meta: pd.DataFrame,
suffixes: Suffixes,
left_index: bool,
right_index: bool,
):
from dask.dataframe.multi import merge_chunk

Expand All @@ -178,6 +182,8 @@ def merge_unpack(
left_on=left_on,
right_on=right_on,
suffixes=suffixes,
left_index=left_index,
right_index=right_index,
)


Expand Down Expand Up @@ -383,5 +389,7 @@ def _construct_graph(self) -> dict[tuple | str, tuple]:
self.meta_input_right,
self.meta_output,
self.suffixes,
self.left_index,
self.right_index,
)
return dsk
24 changes: 24 additions & 0 deletions distributed/shuffle/tests/test_merge.py
Expand Up @@ -370,3 +370,27 @@ async def test_merge_by_multiple_columns(c, s, a, b, how):
),
pd.merge(pdl, pdr, how=how, left_on=["a", "b"], right_on=["d", "e"]),
)


@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_index_merge_p2p(c, s, a, b, how):
pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})

left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
right = dd.from_pandas(pdf_right, npartitions=6)

assert_eq(
await c.compute(
left.merge(right, how=how, left_index=True, right_on="a", shuffle="p2p")
),
pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
)

assert_eq(
await c.compute(
right.merge(left, how=how, right_index=True, left_on="a", shuffle="p2p")
),
pdf_right.merge(pdf_left, how=how, right_index=True, left_on="a"),
)

0 comments on commit acb2809

Please sign in to comment.