From acb28095034dcbf546531d7eee7bd39217e19b24 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Mon, 21 Aug 2023 18:25:20 +0200 Subject: [PATCH] Merge with P2P shuffle fails if left_index or right_index is True (#8121) --- distributed/shuffle/_merge.py | 20 ++++++++++++++------ distributed/shuffle/tests/test_merge.py | 24 ++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/distributed/shuffle/_merge.py b/distributed/shuffle/_merge.py index 42ce1ce081..5241e68dd8 100644 --- a/distributed/shuffle/_merge.py +++ b/distributed/shuffle/_merge.py @@ -73,20 +73,22 @@ def hash_join_p2p( npartitions = max(lhs.npartitions, rhs.npartitions) if isinstance(left_on, Index): - left_on = None + _left_on = None left_index = True else: left_index = False + _left_on = left_on if isinstance(right_on, Index): - right_on = None + _right_on = None right_index = True else: right_index = False + _right_on = right_on merge_kwargs = dict( how=how, - left_on=left_on, - right_on=right_on, + left_on=_left_on, + right_on=_right_on, left_index=left_index, right_index=right_index, suffixes=suffixes, @@ -104,11 +106,11 @@ def hash_join_p2p( name=merge_name, name_input_left=lhs._name, meta_input_left=lhs._meta, - left_on=left_on, + left_on=_left_on, n_partitions_left=lhs.npartitions, name_input_right=rhs._name, meta_input_right=rhs._meta, - right_on=right_on, + right_on=_right_on, n_partitions_right=rhs.npartitions, meta_output=meta, how=how, @@ -159,6 +161,8 @@ def merge_unpack( meta_right: pd.DataFrame, result_meta: pd.DataFrame, suffixes: Suffixes, + left_index: bool, + right_index: bool, ): from dask.dataframe.multi import merge_chunk @@ -178,6 +182,8 @@ def merge_unpack( left_on=left_on, right_on=right_on, suffixes=suffixes, + left_index=left_index, + right_index=right_index, ) @@ -383,5 +389,7 @@ def _construct_graph(self) -> dict[tuple | str, tuple]: self.meta_input_right, self.meta_output, self.suffixes, + self.left_index, + self.right_index, ) return dsk diff --git a/distributed/shuffle/tests/test_merge.py b/distributed/shuffle/tests/test_merge.py index c8ef5857bb..e30961efae 100644 --- a/distributed/shuffle/tests/test_merge.py +++ b/distributed/shuffle/tests/test_merge.py @@ -370,3 +370,27 @@ async def test_merge_by_multiple_columns(c, s, a, b, how): ), pd.merge(pdl, pdr, how=how, left_on=["a", "b"], right_on=["d", "e"]), ) + + +@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"]) +@gen_cluster(client=True) +async def test_index_merge_p2p(c, s, a, b, how): + pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a") + pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1}) + + left = dd.from_pandas(pdf_left, npartitions=5, sort=False) + right = dd.from_pandas(pdf_right, npartitions=6) + + assert_eq( + await c.compute( + left.merge(right, how=how, left_index=True, right_on="a", shuffle="p2p") + ), + pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"), + ) + + assert_eq( + await c.compute( + right.merge(left, how=how, right_index=True, left_on="a", shuffle="p2p") + ), + pdf_right.merge(pdf_left, how=how, right_index=True, left_on="a"), + )