From d1d6e0137b6234079f85c08ab22314a8761802f7 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 15 Dec 2023 15:35:03 +0000 Subject: [PATCH] Tweak comment --- distributed/shuffle/_scheduler_plugin.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/distributed/shuffle/_scheduler_plugin.py b/distributed/shuffle/_scheduler_plugin.py index af4a305117..3f2f4dc50b 100644 --- a/distributed/shuffle/_scheduler_plugin.py +++ b/distributed/shuffle/_scheduler_plugin.py @@ -195,11 +195,14 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]: else: workers = list(self.scheduler.workers) - seen = {barrier} - - # Check if this shuffle shares an output task with a different shuffle that has + # Check if this shuffle shares output tasks with a different shuffle that has # already been initialized and needs to be taken into account when - # mapping output partitions to workers + # mapping output partitions to workers. + # Naively, you could delete this whole paragraph and just call + # spec.pick_worker; it would return two identical sets of results on both calls + # of this method... until the set of available workers changes between the two + # calls, which would cause misaligned shuffle outputs and a deadlock. + seen = {barrier} for dependent in barrier.dependents: for possible_barrier in dependent.dependencies: if possible_barrier in seen: @@ -210,10 +213,10 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]: if not (shuffle := self.active_shuffles.get(other_barrier_key)): continue current_worker_for = shuffle.run_spec.worker_for - # This is a fail-safe for future modifications, there should only ever - # be one other shuffle that shares output tasks, so existing is always - # empty. - if existing: + # This is a fail-safe for future three-ways merges. At the moment there + # should only ever be at most one other shuffle that shares output + # tasks, so existing will always be empty. + if existing: # pragma: nocover for shared_key in existing.keys() & current_worker_for.keys(): if existing[shared_key] != current_worker_for[shared_key]: raise RuntimeError( @@ -223,7 +226,7 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]: f"Mismatch encountered for output partition {shared_key!r}: " f"{existing[shared_key]} != {current_worker_for[shared_key]}." ) - existing.update(shuffle.run_spec.worker_for) + existing.update(current_worker_for) worker_for = {} for partition in spec.output_partitions: