Skip to content

Commit

Permalink
Fix excessive logging on P2P retry (#8511)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Feb 26, 2024
1 parent 5d913b4 commit 390a5b5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
15 changes: 13 additions & 2 deletions distributed/shuffle/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@
import itertools
import pickle
import time
from collections.abc import Callable, Generator, Hashable, Iterable, Iterator, Sequence
from collections.abc import (
Callable,
Coroutine,
Generator,
Hashable,
Iterable,
Iterator,
Sequence,
)
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from enum import Enum
Expand Down Expand Up @@ -212,8 +220,11 @@ async def send(
else:
shards_or_bytes = shards

def _send() -> Coroutine[Any, Any, None]:
return self._send(address, shards_or_bytes)

return await retry(
partial(self._send, address, shards_or_bytes),
_send,
count=self.RETRY_COUNT,
delay_min=self.RETRY_DELAY_MIN,
delay_max=self.RETRY_DELAY_MAX,
Expand Down
13 changes: 9 additions & 4 deletions distributed/shuffle/tests/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from distributed.utils import Deadline
from distributed.utils_test import (
async_poll_for,
captured_logger,
cluster,
gen_cluster,
gen_test,
Expand Down Expand Up @@ -2642,10 +2643,14 @@ async def test_flaky_connect_recover_with_retry(c, s, a, b):
x = dd.shuffle.shuffle(df, "x")

rpc = await FlakyConnectionPool(failing_connects=1)

with mock.patch.object(a, "rpc", rpc):
await c.compute(x)
assert rpc.failed_attempts == 1
with captured_logger("distributed.utils_comm") as caplog:
with mock.patch.object(a, "rpc", rpc):
await c.compute(x)
assert rpc.failed_attempts == 1
# Assert that we do not log the binary payload (or any other excessive amount of data)
logs = caplog.getvalue()
assert len(logs) < 600
assert "Retrying" in logs

await check_worker_cleanup(a)
await check_worker_cleanup(b)
Expand Down

0 comments on commit 390a5b5

Please sign in to comment.