Make P2P shuffle extensible #8096
Conversation
The main things happening in this PR are the introduction of dispatches and the replacement of some dictionaries with dataclasses. The rest is moving stuff around, which I will probably extract into a separate PR to make the changes easier to track.
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

21 files ±0, 21 suites ±0, 11h 17m 6s ⏱️ (-21m 29s)

For more details on these failures, see this check. Results for commit b76bd7a. ± Comparison against base commit 8aa04a8.

This pull request removes 2 and adds 7 tests. Note that renamed tests count towards both.

♻️ This comment has been updated with latest results.
Commits: "Refactor and introduce more dispatches", "Generic ToPickle"

force-pushed from 9ec92d5 to e0adc3a
@wence-: An initial iteration on this one is done; I would appreciate your thoughts!
To implement a new shuffle-like operation, you now need to implement...
- a class handling the main logic
- a few dispatches to convert things

Ideally, I would like to reduce the implementation overhead, but I'm not sure yet how best to do that.
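The two-part extension point described above can be sketched with `functools.singledispatch`. All names here (`create_state`, the toy spec classes and their fields) are illustrative stand-ins, not the actual `distributed.shuffle` API:

```python
# Hypothetical sketch of the dispatch-based extension point: one spec class
# per shuffle-like operation, plus a dispatch that converts specs to state.
from dataclasses import dataclass
from functools import singledispatch


@dataclass(frozen=True)
class ShuffleSpec:
    id: str


@dataclass(frozen=True)
class ArrayRechunkSpec(ShuffleSpec):
    old: tuple
    new: tuple


@singledispatch
def create_state(spec: ShuffleSpec) -> dict:
    # Fallback for spec types nobody registered a converter for.
    raise NotImplementedError(type(spec))


@create_state.register
def _(spec: ArrayRechunkSpec) -> dict:
    # Rechunk-specific conversion lives next to the rechunk spec.
    return {"id": spec.id, "kind": "rechunk"}


print(create_state(ArrayRechunkSpec(id="x", old=(2,), new=(1,)))["kind"])  # rechunk
```

A new shuffle-like operation then only adds a spec subclass and registers its converters, without touching the core.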
A few small suggestions in this round. Perhaps handling the dispatch as required methods on the base dataclass objects is cleaner?
distributed/shuffle/_rechunk.py (Outdated)

    return ArrayRechunkState(
        id=spec.id,
        run_id=next(SchedulerShuffleState._run_id_iterator),
Can the SchedulerShuffleState be responsible for setting the run id? This is meant to be a unique sequence number AIUI, but it feels like it's easy to do something wrong here.
Perhaps:

    from functools import partial
    from itertools import count
    from dataclasses import dataclass, field

    @dataclass
    class SchedulerShuffleState:
        ...
        run_id: int = field(init=False, default_factory=partial(next, count()))

    @dataclass
    class ArrayRechunkState(SchedulerShuffleState):
        old: ...
        new: ...
This way run_id is created automagically, and doesn't even appear in the list of arguments to __init__ (so you can't get it wrong).
    @@ -441,47 +447,98 @@ def _() -> dict[str, tuple[NDIndex, bytes]]:
             return self.run_id

         async def get_output_partition(
    -        self, partition_id: NDIndex, key: str, meta: pd.DataFrame | None = None
    +        self, partition_id: NDIndex, key: str, **kwargs: Any
Is this switch to kwargs future-proofing?
Future-proofing and making it completely transparent to the plugin.
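The "transparent to the plugin" idea can be illustrated with a toy example: the generic caller forwards whatever options it was handed, and each backend picks out only what it understands. The class and function names below are illustrative, not the real distributed API:

```python
# Sketch: **kwargs lets the generic call site stay identical across backends,
# even when one backend grows a new, backend-only option (here: meta).
from typing import Any


class DataFrameRun:
    def get_output_partition(self, partition_id: int, key: str, **kwargs: Any) -> str:
        meta = kwargs.get("meta")  # dataframe-specific optional argument
        return f"df partition {partition_id} (meta={meta})"


class ArrayRun:
    def get_output_partition(self, partition_id: int, key: str, **kwargs: Any) -> str:
        # Array rechunking ignores options it doesn't know about.
        return f"array partition {partition_id}"


def fetch(run, partition_id: int, key: str, **options: Any) -> str:
    # The generic caller forwards options blindly; adding a backend-only
    # option never changes this call site.
    return run.get_output_partition(partition_id, key, **options)


print(fetch(DataFrameRun(), 0, "k", meta="empty-frame"))
print(fetch(ArrayRun(), 0, "k"))
```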
distributed/shuffle/_core.py (Outdated)

    @dataclass(frozen=True)
    class ShuffleSpec(abc.ABC, Generic[_T_partition_id]):
        id: ShuffleId
        run_id: int
        output_workers: set[str]

        def create_new_run(
            self,
            plugin: ShuffleSchedulerPlugin,
        ) -> SchedulerShuffleState:
            worker_for = self._pin_output_workers(plugin)
            return SchedulerShuffleState(
                run_spec=ShuffleRunSpec(spec=self, worker_for=worker_for),
                participating_workers=set(worker_for.values()),
            )

        @abc.abstractmethod
        def _pin_output_workers(
            self, plugin: ShuffleSchedulerPlugin
        ) -> dict[_T_partition_id, str]:
            """TODO"""

        @abc.abstractmethod
        def initialize_run_on_worker(
            self,
            run_id: int,
            worker_for: dict[_T_partition_id, str],
            plugin: ShuffleWorkerPlugin,
        ) -> ShuffleRun:
            """TODO"""
All you need to do now is implement concrete subclasses of ShuffleSpec and ShuffleRun.
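A hypothetical skeleton of such a concrete subclass, following the two abstract hooks shown in the diff. The plugin arguments are stubbed out and `MyShuffleSpec` with its fields is invented for illustration; only the shape of the extension point mirrors the PR:

```python
# Illustrative skeleton: a spec subclass supplies (1) the partition-to-worker
# pinning on the scheduler and (2) the worker-side run construction.
import abc
from dataclasses import dataclass


class ShuffleRun:
    """Stand-in for the real worker-side run class."""


@dataclass(frozen=True)
class ShuffleSpec(abc.ABC):
    id: str

    @abc.abstractmethod
    def _pin_output_workers(self, plugin) -> dict[int, str]:
        """Map each output partition to the worker that will hold it."""

    @abc.abstractmethod
    def initialize_run_on_worker(self, run_id, worker_for, plugin) -> ShuffleRun:
        """Build the worker-side state for one run of this shuffle."""


@dataclass(frozen=True)
class MyShuffleSpec(ShuffleSpec):
    npartitions: int

    def _pin_output_workers(self, plugin) -> dict[int, str]:
        workers = ["w0", "w1"]  # a real impl would query the scheduler plugin
        return {i: workers[i % len(workers)] for i in range(self.npartitions)}

    def initialize_run_on_worker(self, run_id, worker_for, plugin) -> ShuffleRun:
        return ShuffleRun()


spec = MyShuffleSpec(id="s", npartitions=3)
print(spec._pin_output_workers(None))  # {0: 'w0', 1: 'w1', 2: 'w0'}
```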
Thanks, I think this looks neater now. Coverage went down in the tests, but I'm not sure if that will fix itself in the latest test run.
    -    _run_id_iterator: ClassVar[itertools.count] = itertools.count(1)

         @dataclass(frozen=True)
         class ShuffleRunSpec(Generic[_T_partition_id]):
    +        run_id: int = field(init=False, default_factory=partial(next, itertools.count(1)))  # type: ignore
FWIW, this is a known mypy bug.
This PR replaces hard-coded logic for handling dataframe shuffles and array rechunking with a dispatch mechanism. The main goal is to improve the separation of concerns and make the P2P mechanism extensible for other kinds of "shuffle-like" operations.

Passes pre-commit run --all-files