Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge with P2P shuffle fails if left_index or right_index is True #8121

Merged
merged 1 commit into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 14 additions & 6 deletions distributed/shuffle/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,22 @@
npartitions = max(lhs.npartitions, rhs.npartitions)

if isinstance(left_on, Index):
left_on = None
_left_on = None

Check warning on line 76 in distributed/shuffle/_merge.py

View check run for this annotation

Codecov / codecov/patch

distributed/shuffle/_merge.py#L76

Added line #L76 was not covered by tests
left_index = True
else:
left_index = False
_left_on = left_on

Check warning on line 80 in distributed/shuffle/_merge.py

View check run for this annotation

Codecov / codecov/patch

distributed/shuffle/_merge.py#L80

Added line #L80 was not covered by tests

if isinstance(right_on, Index):
right_on = None
_right_on = None

Check warning on line 83 in distributed/shuffle/_merge.py

View check run for this annotation

Codecov / codecov/patch

distributed/shuffle/_merge.py#L83

Added line #L83 was not covered by tests
right_index = True
else:
right_index = False
_right_on = right_on

Check warning on line 87 in distributed/shuffle/_merge.py

View check run for this annotation

Codecov / codecov/patch

distributed/shuffle/_merge.py#L87

Added line #L87 was not covered by tests
merge_kwargs = dict(
how=how,
left_on=left_on,
right_on=right_on,
left_on=_left_on,
right_on=_right_on,
left_index=left_index,
right_index=right_index,
suffixes=suffixes,
Expand All @@ -104,11 +106,11 @@
name=merge_name,
name_input_left=lhs._name,
meta_input_left=lhs._meta,
left_on=left_on,
left_on=_left_on,
n_partitions_left=lhs.npartitions,
name_input_right=rhs._name,
meta_input_right=rhs._meta,
right_on=right_on,
right_on=_right_on,
n_partitions_right=rhs.npartitions,
meta_output=meta,
how=how,
Expand Down Expand Up @@ -159,6 +161,8 @@
meta_right: pd.DataFrame,
result_meta: pd.DataFrame,
suffixes: Suffixes,
left_index: bool,
right_index: bool,
):
from dask.dataframe.multi import merge_chunk

Expand All @@ -178,6 +182,8 @@
left_on=left_on,
right_on=right_on,
suffixes=suffixes,
left_index=left_index,
right_index=right_index,
)


Expand Down Expand Up @@ -383,5 +389,7 @@
self.meta_input_right,
self.meta_output,
self.suffixes,
self.left_index,
self.right_index,
)
return dsk
24 changes: 24 additions & 0 deletions distributed/shuffle/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,27 @@ async def test_merge_by_multiple_columns(c, s, a, b, how):
),
pd.merge(pdl, pdr, how=how, left_on=["a", "b"], right_on=["d", "e"]),
)


@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_index_merge_p2p(c, s, a, b, how):
pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})

left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
right = dd.from_pandas(pdf_right, npartitions=6)

assert_eq(
await c.compute(
left.merge(right, how=how, left_index=True, right_on="a", shuffle="p2p")
),
pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
)

assert_eq(
await c.compute(
right.merge(left, how=how, right_index=True, left_on="a", shuffle="p2p")
),
pdf_right.merge(pdf_left, how=how, right_index=True, left_on="a"),
)