Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly handle unknown chunk sizes in P2P rechunking #7856

Merged
merged 13 commits into from
May 25, 2023
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ repos:
- pytest
- tornado
- pyarrow
- git+https://github.com/dask/dask
- git+https://github.com/dask/dask.git
hendrikmakait marked this conversation as resolved.
Show resolved Hide resolved
- git+https://github.com/dask/zict
4 changes: 2 additions & 2 deletions distributed/shuffle/_worker_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@
memory_limiter_disk: ResourceLimiter,
memory_limiter_comms: ResourceLimiter,
):
from dask.array.rechunk import _old_to_new
from dask.array.rechunk import old_to_new

Check warning on line 304 in distributed/shuffle/_worker_extension.py

View check run for this annotation

Codecov / codecov/patch

distributed/shuffle/_worker_extension.py#L304

Added line #L304 was not covered by tests

super().__init__(
id=id,
Expand All @@ -323,7 +323,7 @@
self.partitions_of = dict(partitions_of)
self.worker_for = worker_for
self._slicing = rechunk_slicing(old, new)
self._old_to_new = _old_to_new(old, new)
self._old_to_new = old_to_new(old, new)

Check warning on line 326 in distributed/shuffle/_worker_extension.py

View check run for this annotation

Codecov / codecov/patch

distributed/shuffle/_worker_extension.py#L326

Added line #L326 was not covered by tests

async def _receive(self, data: list[tuple[ArrayRechunkShardID, bytes]]) -> None:
self.raise_if_closed()
Expand Down