
Improve type reconciliation for P2P #8332

Merged
hendrikmakait merged 9 commits into dask:main from p2p-dtype-reconciliation on Nov 10, 2023

Conversation

hendrikmakait (Member)

This PR does not address #8310 and dask/dask#10014. Due to its dependency on Arrow, I doubt that the current P2P shuffle implementation will be able to gracefully handle these cases without a significant increase in complexity. To improve the user experience in those cases, we may want to add a better exception. I'd like to leave that to a future PR.

  • Tests added / passed
  • Passes pre-commit run --all-files

@hendrikmakait (Member Author)

cc @fjetter and @phofl

# First version that supports concatenating extension arrays (apache/arrow#14463)
minversion = "12.0.0"
# First version to implement type promotion for pa.concat_tables (apache/arrow#36846)
minversion = "14.0.0"
hendrikmakait (Member Author)

This version came out on Nov 1, 2023. Users are unlikely to have it installed in their existing environments, so this PR will create some user pain by forcing them to either upgrade or manually switch to task-based shuffling.
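For anyone affected, a fallback along these lines should work (the shuffle= keyword matches the snippets later in this thread; the config key is the one used by late-2023 dask releases and is an assumption here):

import dask
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=2)

# Per-call opt-out of P2P:
shuffled = ddf.shuffle("a", shuffle="tasks")

# Or globally via config (key name to be verified locally):
with dask.config.set({"dataframe.shuffle.method": "tasks"}):
    shuffled = ddf.shuffle("a")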

Member

Can we do this on a best-effort basis instead of effectively pinning it? There are many users out there who are not affected by this problem, and it would be nice if this weren't a breaking change for them.

Collaborator

Yeah, I am worried about increasing the minimum to 14 by default as well.

hendrikmakait (Member Author)

Just pushed a more permissive approach (it's actually more permissive than what we had before). I just didn't want to go through the hassle unless others thought that being more backward-compatible was worth it here.
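Roughly, the idea is something like this (a hypothetical sketch, not the exact code in this PR; promote_options only exists in pyarrow >= 14):

import pyarrow as pa
from packaging.version import Version

def concat_tables_best_effort(tables: list[pa.Table]) -> pa.Table:
    # Use Arrow's type promotion where the installed version supports it;
    # otherwise fall back to a plain concat that requires identical schemas.
    if Version(pa.__version__) >= Version("14.0.0"):
        return pa.concat_tables(tables, promote_options="permissive")
    return pa.concat_tables(tables)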

for column, dtype in meta.dtypes.items():
    actual = df[column].dtype
    if actual == dtype:
        continue
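(Purely illustrative: a hypothetical continuation of this loop could cast mismatched columns back to the dtype promised by meta; this is not necessarily the PR's actual logic.)

for column, dtype in meta.dtypes.items():
    actual = df[column].dtype
    if actual == dtype:
        continue
    # Hypothetical: cast the column back to the dtype promised by meta.
    df[column] = df[column].astype(dtype)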
hendrikmakait (Member Author)

Interestingly, the dtypes of the _partitions column don't match up (int64 vs uint64). @phofl, can pandas handle this conversion as a zero-copy operation?

Collaborator

This is a numpy question, but I am pretty sure that this is not possible without a copy
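A quick numpy check confirms it: astype allocates a new buffer even when the bit width matches, and the zero-copy alternative (a view) reinterprets the bits rather than converting them:

import numpy as np

a = np.arange(5, dtype="int64")

b = a.astype("uint64")         # copies, even though the bit width matches
print(np.shares_memory(a, b))  # False

c = a.view("uint64")           # zero-copy, but reinterprets the bits:
print(np.shares_memory(a, c))  # True
print(np.array([-1], dtype="int64").view("uint64"))  # [18446744073709551615]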

def make_partition(i):
    """Return mismatched column types for every other partition"""
    if i % 2 == 1:
        return pd.DataFrame({"a": np.random.random(10), "b": [True] * 10})
hendrikmakait (Member Author)

There's a regression hidden here:

Previously, we could reconcile bool and float partitions. Arrow considers these irreconcilable. (Personally, I agree.)
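For illustration, a minimal reproduction (assuming pyarrow >= 14 for promote_options; the exact exception type may vary):

import pyarrow as pa

bools = pa.table({"b": [True, False]})
floats = pa.table({"b": [0.5, 1.5]})

try:
    # Even permissive promotion does not unify bool with double:
    pa.concat_tables([bools, floats], promote_options="permissive")
except (pa.ArrowInvalid, pa.ArrowTypeError) as exc:
    print(exc)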

Member

Has this problem been reported as such or was this just a dummy unit test?

hendrikmakait (Member Author)

IIRC, this was a dummy example to test reconciliation. I could easily see a scenario where someone applies a "boolean" UDF that fails to cast (some) numerical values to bool, though.
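A contrived illustration (hypothetical UDF): whether the column comes back as bool or object depends on each partition's contents.

import numpy as np
import pandas as pd

def booleanize(pdf: pd.DataFrame) -> pd.DataFrame:
    # Leaves NaN untouched; pandas infers bool only for NaN-free partitions,
    # so different partitions end up with different dtypes for "b".
    pdf["b"] = pdf["b"].map(lambda x: bool(x) if pd.notna(x) else x)
    return pdf

print(booleanize(pd.DataFrame({"b": [1.0, 0.0]}))["b"].dtype)     # bool
print(booleanize(pd.DataFrame({"b": [1.0, np.nan]}))["b"].dtype)  # object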

github-actions bot commented Nov 8, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

27 files (±0)       27 suites (±0)       14h 11m 51s ⏱️ (+27m 22s)
3 928 tests (+2):   3 805 passed (+1)    117 skipped (±0)    5 failed (+1)    1 errored (±0)
49 414 runs (+41):  46 995 passed (+47)  2 413 skipped (−5)  5 failed (−1)    1 errored (±0)

For more details on these failures and errors, see this check.

Results for commit 69376b1, compared against base commit 0dc9e88.

This pull request removes 11 and adds 13 tests. Note that renamed tests count towards both.

Added:
distributed.shuffle.tests.test_shuffle ‑ test_handle_null_partitions_p2p_shuffling
distributed.shuffle.tests.test_shuffle ‑ test_handle_null_partitions_p2p_shuffling_2
distributed.shuffle.tests.test_shuffle ‑ test_handle_object_columns_p2p
distributed.shuffle.tests.test_shuffle ‑ test_p2p_flaky_connect_fails_without_retry
distributed.shuffle.tests.test_shuffle ‑ test_p2p_flaky_connect_recover_with_retry
distributed.shuffle.tests.test_shuffle ‑ test_raise_on_incompatible_partitions_p2p_shuffling
distributed.shuffle.tests.test_shuffle ‑ test_reconcile_mismatching_partitions_p2p_shuffling
distributed.shuffle.tests.test_shuffle ‑ test_set_index_p2p
distributed.shuffle.tests.test_shuffle ‑ test_set_index_p2p_with_existing_index
distributed.shuffle.tests.test_shuffle ‑ test_shuffle_p2p_with_existing_index
…

Removed:
distributed.shuffle.tests.test_shuffle ‑ test_flaky_connect_fails_without_retry
distributed.shuffle.tests.test_shuffle ‑ test_flaky_connect_recover_with_retry
distributed.shuffle.tests.test_shuffle ‑ test_handle_categorical_data
distributed.shuffle.tests.test_shuffle ‑ test_handle_floats_in_int_meta
distributed.shuffle.tests.test_shuffle ‑ test_handle_null_partitions
distributed.shuffle.tests.test_shuffle ‑ test_handle_null_partitions_2
distributed.shuffle.tests.test_shuffle ‑ test_handle_object_columns
distributed.shuffle.tests.test_shuffle ‑ test_raise_on_incompatible_partitions
distributed.shuffle.tests.test_shuffle ‑ test_reconcile_partitions
distributed.shuffle.tests.test_shuffle ‑ test_set_index
…


@@ -306,8 +306,6 @@ def split_by_worker(

from dask.dataframe.dispatch import to_pyarrow_table_dispatch

df = df.astype(meta.dtypes, copy=False)
Member

🎉


df.b = df.b.astype("category")
shuffled = df.shuffle("a", shuffle="p2p")
result, expected = await c.compute([shuffled, df], sync=True)
dd.assert_eq(result, expected, check_categorical=False)
hendrikmakait (Member Author)

There's one caveat here: the expected categories_dtype of column b is string, but after the shuffle we end up with object. There's not much we can do about this at the moment; meta also has object.
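For reference, the same loss shows up in a plain pandas/pyarrow round trip (a sketch; to_pandas would need a types_mapper to restore the string dtype):

import pandas as pd
import pyarrow as pa

s = pd.Series(["x", "y"], dtype="string").astype("category")
print(s.cat.categories.dtype)  # string

roundtripped = pa.Table.from_pandas(s.to_frame("b")).to_pandas()["b"]
print(roundtripped.cat.categories.dtype)  # object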

hendrikmakait (Member Author)

Related: dask/dask#6242

@fjetter (Member) left a comment

Thanks for implementing the backwards compat.

Assuming dask/dask#10622 is accepted, we'll enter a deprecation cycle and can hopefully drop the older pyarrow versions soon.

{
    # Extension types
    f"col{next(counter)}": pd.array(
        [pd.Period("2022-01-01", freq="D") + i for i in range(100)],
Collaborator

Non-blocking: I think you can use period_range and interval_range here instead, which would be better.
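For example (a sketch of the suggested replacement):

import pandas as pd

# Instead of [pd.Period("2022-01-01", freq="D") + i for i in range(100)]:
pd.period_range(start="2022-01-01", periods=100, freq="D")

# And similarly for intervals:
pd.interval_range(start=0, periods=100)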

@hendrikmakait hendrikmakait merged commit 8212868 into dask:main Nov 10, 2023
26 of 34 checks passed
@hendrikmakait hendrikmakait deleted the p2p-dtype-reconciliation branch November 10, 2023 15:36
Successfully merging this pull request may close these issues.

Categorical column turned to NaN after P2P Shuffle
IntCastingNaNError after outer join with int64 column