Skip to content

Commit

Permalink
Tweak comment
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Dec 15, 2023
1 parent 0cda6e4 commit d1d6e01
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions distributed/shuffle/_scheduler_plugin.py
Expand Up @@ -195,11 +195,14 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]:
else:
workers = list(self.scheduler.workers)

seen = {barrier}

# Check if this shuffle shares an output task with a different shuffle that has
# Check if this shuffle shares output tasks with a different shuffle that has
# already been initialized and needs to be taken into account when
# mapping output partitions to workers
# mapping output partitions to workers.
# Naively, you could delete this whole paragraph and just call
# spec.pick_worker; both calls of this method would then return identical
# results... until the set of available workers changes between the two
# calls, which would cause misaligned shuffle outputs and a deadlock.
seen = {barrier}
for dependent in barrier.dependents:
for possible_barrier in dependent.dependencies:
if possible_barrier in seen:
Expand All @@ -210,10 +213,10 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]:
if not (shuffle := self.active_shuffles.get(other_barrier_key)):
continue
current_worker_for = shuffle.run_spec.worker_for
# This is a fail-safe for future modifications, there should only ever
# be one other shuffle that shares output tasks, so existing is always
# empty.
if existing:
# This is a fail-safe for future three-way merges. At the moment there
# should be at most one other shuffle that shares output tasks, so
# existing will always be empty.
if existing: # pragma: nocover
for shared_key in existing.keys() & current_worker_for.keys():
if existing[shared_key] != current_worker_for[shared_key]:
raise RuntimeError(
Expand All @@ -223,7 +226,7 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]:
f"Mismatch encountered for output partition {shared_key!r}: "
f"{existing[shared_key]} != {current_worker_for[shared_key]}."
)
existing.update(shuffle.run_spec.worker_for)
existing.update(current_worker_for)

worker_for = {}
for partition in spec.output_partitions:
Expand Down

0 comments on commit d1d6e01

Please sign in to comment.