Commit 0cda6e4
Merge branch 'main' into avoid-deadlock-in-p2p-merge
crusaderky committed Dec 15, 2023
2 parents 1867f6f + 3882bc6
Showing 4 changed files with 83 additions and 9 deletions.
16 changes: 10 additions & 6 deletions distributed/shuffle/_merge.py
@@ -361,18 +361,22 @@ def cull(self, keys: Iterable[str], all_keys: Any) -> tuple[HashJoinP2PLayer, di

    def _construct_graph(self) -> dict[tuple | str, tuple]:
        token_left = tokenize(
            "hash-join",
            # Include self.name to ensure that shuffle IDs are unique for individual
            # merge operations. Reusing shuffles between merges is dangerous because of
            # required coordination and complexity introduced through dynamic clusters.
            self.name,
            self.name_input_left,
            self.left_on,
            self.npartitions,
            self.parts_out,
            self.left_index,
        )
        token_right = tokenize(
            "hash-join",
            # Include self.name to ensure that shuffle IDs are unique for individual
            # merge operations. Reusing shuffles between merges is dangerous because of
            # required coordination and complexity introduced through dynamic clusters.
            self.name,
            self.name_input_right,
            self.right_on,
            self.npartitions,
            self.parts_out,
            self.right_index,
        )
        dsk: dict[tuple | str, tuple] = {}
        name_left = "hash-join-transfer-" + token_left
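As an illustration of what including self.name buys, here is a minimal sketch using dask.base.tokenize (the same helper this layer calls); the layer names and parameter values are hypothetical, not code from this commit:

from dask.base import tokenize

# Hypothetical values: two distinct merge layers that shuffle the same input
# frame with identical join parameters. Without self.name in the token, both
# merges would hash to the same shuffle ID and end up sharing a shuffle.
params = ("input-frame", ["a"], 8, None, False)
token_a = tokenize("hash-join", "merge-layer-1", *params)
token_b = tokenize("hash-join", "merge-layer-2", *params)
assert token_a != token_b  # each merge operation gets its own shuffle ID

Two merges of the same frame with identical parameters thus hash to distinct shuffle IDs, so their shuffles are never reused across merge operations.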
27 changes: 27 additions & 0 deletions distributed/shuffle/_scheduler_plugin.py
@@ -145,6 +145,7 @@ def get_or_create(
        self._raise_if_barrier_unknown(spec.id)
        self._raise_if_task_not_processing(key)
        worker_for = self._calculate_worker_for(spec)
        self._ensure_output_tasks_are_non_rootish(spec)
        state = spec.create_new_run(worker_for)
        self.active_shuffles[spec.id] = state
        self._shuffles[spec.id].add(state)
@@ -231,6 +232,32 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]:
            worker_for[partition] = worker
        return worker_for

    def _ensure_output_tasks_are_non_rootish(self, spec: ShuffleSpec) -> None:
        """Output tasks are created without worker restrictions and run once with the
        sole purpose of setting the worker restriction and raising Reschedule; they
        then run again properly on the correct worker. It would be non-trivial to set
        the worker restriction before they first run due to potential task fusion.

        Most of the time, this lack of initial restrictions would cause output tasks
        to be labelled as rootish on their first (very fast) run, which would break
        the design assumption that the worker-side queue of rootish tasks lasts long
        enough to cover the round-trip to the scheduler to receive more tasks, and
        would thus cause a measurable slowdown in the overall runtime of the shuffle
        operation.

        This method ensures that, given M output tasks and N workers, each worker-side
        queue is pre-loaded with M/N output tasks which can be flushed very fast as
        they all raise Reschedule() in quick succession.

        See Also
        --------
        ShuffleRun._ensure_output_worker
        """
        barrier = self.scheduler.tasks[barrier_key(spec.id)]
        for dependent in barrier.dependents:
            dependent._rootish = False

    @log_errors()
    def _set_restriction(self, ts: TaskState, worker: str) -> None:
        if ts.annotations and "shuffle_original_restrictions" in ts.annotations:
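For context, a rough sketch of the two-pass dance the docstring above describes, assuming distributed's public Reschedule exception and get_worker() helper; the function name and the elided scheduler call are hypothetical, not the actual ShuffleRun._ensure_output_worker:

from distributed import Reschedule, get_worker

def ensure_output_worker_sketch(assigned_worker: str) -> None:
    # First run of an output task: if it landed on the wrong worker, ask the
    # scheduler to pin it to the right one, then raise Reschedule so that the
    # second run executes on the correct worker and does the real unpacking.
    if get_worker().address != assigned_worker:
        ...  # hypothetical RPC: the scheduler plugin sets the worker restriction
        raise Reschedule()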
10 changes: 7 additions & 3 deletions distributed/shuffle/tests/test_merge.py
@@ -118,7 +118,9 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s
        # Vary the number of output partitions for the shuffles of dd2
        .repartition(20).merge(ddf2, left_on="b", right_on="x", shuffle="p2p")
    )
-   # Generate unique shuffle IDs if the input frame is the same but parameters differ
+   # Generate unique shuffle IDs if the input frame is the same but
+   # parameters differ. Reusing shuffles in merges is dangerous because of the
+   # required coordination and complexity introduced through dynamic clusters.
    assert sum(id_from_key(k) is not None for k in out.dask) == 4
    result = await c.compute(out)
    expected = pdf1.merge(pdf2, left_on="a", right_on="x").merge(
@@ -153,8 +155,10 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a,
right_on="b",
shuffle="p2p",
)
# Generate the same shuffle IDs if the input frame is the same and all its parameters match
assert sum(id_from_key(k) is not None for k in out.dask) == 3
# Generate unique shuffle IDs if the input frame is the same and all its
# parameters match. Reusing shuffles in merges is dangerous because of the
# required coordination and complexity introduced through dynamic clusters.
assert sum(id_from_key(k) is not None for k in out.dask) == 4
result = await c.compute(out)
expected = pdf2.merge(
pdf1.merge(pdf2, left_on="a", right_on="x"), left_on="x", right_on="b"
39 changes: 39 additions & 0 deletions distributed/shuffle/tests/test_shuffle.py
@@ -2478,6 +2478,45 @@ async def test_unpack_gets_rescheduled_from_non_participating_worker(c, s, a):
    dd.assert_eq(result, expected)


class BlockedBarrierShuffleSchedulerPlugin(ShuffleSchedulerPlugin):
    def __init__(self, scheduler: Scheduler):
        super().__init__(scheduler)
        self.in_barrier = asyncio.Event()
        self.block_barrier = asyncio.Event()

    async def barrier(self, id: ShuffleId, run_id: int, consistent: bool) -> None:
        self.in_barrier.set()
        await self.block_barrier.wait()
        return await super().barrier(id, run_id, consistent)


@gen_cluster(client=True)
async def test_unpack_is_non_rootish(c, s, a, b):
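    # Presumably warns because this registers a second plugin under the same
    # name as the scheduler's default ShuffleSchedulerPlugin.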
    with pytest.warns(UserWarning):
        scheduler_plugin = BlockedBarrierShuffleSchedulerPlugin(s)
    df = dask.datasets.timeseries(
        start="2000-01-01",
        end="2000-01-21",
        dtypes={"x": float, "y": float},
        freq="10 s",
    )
    df = df.shuffle("x")
    result = c.compute(df)

    await scheduler_plugin.in_barrier.wait()

    unpack_tss = [ts for key, ts in s.tasks.items() if key_split(key) == "shuffle_p2p"]
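    # timeseries() partitions by day by default, so 2000-01-01 to 2000-01-21
    # gives 20 input partitions and hence 20 p2p unpack tasks.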
    assert len(unpack_tss) == 20
    assert not any(s.is_rootish(ts) for ts in unpack_tss)
    del unpack_tss
    scheduler_plugin.block_barrier.set()
    result = await result

    await check_worker_cleanup(a)
    await check_worker_cleanup(b)
    await check_scheduler_cleanup(s)


class FlakyConnectionPool(ConnectionPool):
    def __init__(self, *args, failing_connects=0, **kwargs):
        self.attempts = 0
