Skip to content

Commit

Permalink
Reduce memory usage during culling for shuffling and merge (#8197)
Browse files Browse the repository at this point in the history
  • Loading branch information
phofl committed Sep 20, 2023
1 parent 1650ceb commit 2858930
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
14 changes: 6 additions & 8 deletions distributed/shuffle/_merge.py
@@ -1,7 +1,6 @@
# mypy: ignore-errors
from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -243,15 +242,14 @@ def _cull_dependencies(
all input partitions. This method does not require graph
materialization.
"""
deps = defaultdict(set)
deps = {}
parts_out = parts_out or self._keys_to_parts(keys)
keys = {(self.name_input_left, i) for i in range(self.npartitions)}
keys |= {(self.name_input_right, i) for i in range(self.npartitions)}
# Protect against mutations later on with frozenset
keys = frozenset(keys)
for part in parts_out:
deps[(self.name, part)] |= {
(self.name_input_left, i) for i in range(self.npartitions)
}
deps[(self.name, part)] |= {
(self.name_input_right, i) for i in range(self.npartitions)
}
deps[(self.name, part)] = keys
return deps

def _keys_to_parts(self, keys: Iterable[str]) -> set[str]:
Expand Down
7 changes: 5 additions & 2 deletions distributed/shuffle/_shuffle.py
Expand Up @@ -227,8 +227,11 @@ def cull(
parameter.
"""
parts_out = self._keys_to_parts(keys)
input_parts = {(self.name_input, i) for i in range(self.npartitions_input)}
culled_deps = {(self.name, part): input_parts.copy() for part in parts_out}
# Protect against mutations later on with frozenset
input_parts = frozenset(
{(self.name_input, i) for i in range(self.npartitions_input)}
)
culled_deps = {(self.name, part): input_parts for part in parts_out}

if parts_out != set(self.parts_out):
culled_layer = self._cull(parts_out)
Expand Down

0 comments on commit 2858930

Please sign in to comment.