Restructure P2P code #8098
Conversation
**Unit Test Results**

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

21 files ±0 · 21 suites ±0 · 11h 11m 52s ⏱️ -59m 23s

For more details on these failures, see this check.

Results for commit f361e71. ± Comparison against base commit f27e9a2.

♻️ This comment has been updated with latest results.
cc @wence-
For minimal code-movement changes, some of the new modules now need `import pandas as pd` and `import numpy as np` gated behind `TYPE_CHECKING`, with the types spelt as strings, I think. That might be the best way to get this in, but one could consider making the `get_output_partition` method generic over `meta`.
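For readers unfamiliar with the gating pattern being described, a minimal sketch might look like this (the function is a simplified stand-in, not the real signature from this PR):

```python
# Sketch: gate heavy imports behind TYPE_CHECKING so the module imports
# cleanly even when pandas/numpy are not installed at runtime.
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by type checkers; never executed at runtime.
    import pandas as pd


def get_output_partition(meta: pd.DataFrame | None = None) -> None:
    # With `from __future__ import annotations`, the annotation above is
    # stored as the string "pd.DataFrame | None" and never evaluated,
    # so the missing runtime import of pandas is harmless.
    return None
```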
`distributed/shuffle/_core.py` (outdated diff)

```python
from enum import Enum
from typing import TYPE_CHECKING, Any, ClassVar, Generic, NewType, TypeVar

import pandas as pd
```
Previously this was guarded by `if TYPE_CHECKING`, which looks to be the cause of most of the test failures.

I note `get_output_partition` advertises that it accepts `meta: pd.DataFrame | None`. Probably that should be `meta: _T_partition_type | None`, and then the pandas import could go from this generic core.
Thanks, I just looked at the test report and missed the complete failure of `mindeps`.
```python
    @abc.abstractmethod
    async def get_output_partition(
        self, partition_id: _T_partition_id, key: str, meta: pd.DataFrame | None = None
```
`pd.DataFrame` seems oddly specific for a generic implementation.
Very true, but I would like to make more involved changes such as adding generic typing in a separate PR (e.g., #8096); otherwise it would be impossible to spot any meaningful changes in this PR.
```python
    if plugin is None:
        raise RuntimeError(
            f"The worker {worker.address} does not have a ShuffleExtension. "
            "Is pandas installed on the worker?"
```
Is that now necessary even if only using shuffle extensions for array rechunking?
This exception is outdated, but the worker plugin is still involved in array rechunking. Note that we have conflicting names between the generic "P2P shuffle" approach, as in sending data all-to-all across the cluster, and the specific `dataframe.shuffle` operation. If you have naming suggestions for resolving this, please let me know. As with other comments, I would rather not address this here.
```diff
@@ -252,3 +264,226 @@ def split_axes(old: ChunkedAxes, new: ChunkedAxes) -> SplitAxes:
             old_chunk.sort(key=lambda split: split.slice.start)
         axes.append(old_axis)
     return axes
+
+
+def convert_chunk(data: bytes) -> np.ndarray:
```
How do these type annotations work for normal execution? AFAICT, `np` is not imported at the top level, so at runtime (rather than type-checking time) this name is not defined?
They are not interpreted at runtime thanks to `from __future__ import annotations`.

> Note: If `from __future__ import annotations` is used, annotations are not evaluated at function definition time. Instead, they are stored as strings in `__annotations__`. This makes it unnecessary to use quotes around the annotation (see PEP 563).

https://docs.python.org/3/library/typing.html#typing.TYPE_CHECKING
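The behaviour quoted above is easy to observe: with the future import, an annotation referencing an undefined name does not raise at definition time, because it is only stored as a string.

```python
from __future__ import annotations


def convert_chunk(data: bytes) -> np.ndarray:  # np is never imported!
    # Defining and calling this function works fine even though `np`
    # does not exist at runtime; the annotation is just a string.
    return data


# Annotations are stored as their source text:
print(convert_chunk.__annotations__)  # {'data': 'bytes', 'return': 'np.ndarray'}
```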
```python
        return self.run_id

    async def get_output_partition(
        self, partition_id: NDIndex, key: str, meta: pd.DataFrame | None = None
```
As above, if this function were generic over meta as `_T_partition_type`, then `meta` could be `meta: np.ndarray | None` here, and rechunking wouldn't require pandas for this type annotation.
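Concretely, the rechunk side could then bind the type variable without touching pandas at all. This is a sketch under the same assumption; `ShuffleRun` and `ArrayRechunkRun` are illustrative names, not the ones in the PR.

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Generic, TypeVar

if TYPE_CHECKING:
    import numpy as np  # needed only by the type checker

_T_partition_type = TypeVar("_T_partition_type")


class ShuffleRun(Generic[_T_partition_type]):
    """Hypothetical generic base class."""

    async def get_output_partition(
        self,
        partition_id: tuple[int, ...],
        key: str,
        meta: _T_partition_type | None = None,
    ) -> _T_partition_type | None:
        return meta


class ArrayRechunkRun(ShuffleRun["np.ndarray"]):
    """Binds the partition type to np.ndarray; pandas never enters the picture."""
```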
Remaining failures do not look related, I think.
- Extracted from #8096
- Code split into the `_shuffle.py` or `_rechunk.py` files, with shared code in `_core.py`.
- Passes `pre-commit run --all-files`