Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,52 @@ async def test_prefer_gather_from_local_address(c, s, w1, w2, w3):
assert not any(d["who"] == w2.address for d in w3.outgoing_transfer_log)


@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster(
    nthreads=[("127.0.0.1", 1), ("127.0.0.1", 1), ("127.0.0.2", 1)], client=True
)
async def test_prefer_gather_from_local_address_unless_busy(c, s, w1, w2, w3):
    """A busy local peer is skipped in favour of a non-busy remote one.

    The scattered key lives on w1 (same host as w2) and w3 (different
    host).  With w1 artificially saturated, w2 should record w1 as busy
    and gather the dependency from w3 instead.
    """
    data = await c.scatter(123, workers=[w1.address, w3.address], broadcast=True)

    # Simulate w1 being saturated with outgoing transfers.
    w1.outgoing_current_count = 10000000

    fut = c.submit(inc, data, workers=[w2.address])
    await wait(fut)

    # w2 noticed that w1 was busy...
    assert w1.address in w2.busy_workers_log
    # ...so no transfer from w1 to w2 happened...
    assert all(d["who"] != w2.address for d in w1.outgoing_transfer_log)
    # ...and the data came from the remote worker w3 instead.
    assert any(d["who"] == w2.address for d in w3.outgoing_transfer_log)


@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster(
    nthreads=[("127.0.0.1", 1), ("127.0.0.1", 1), ("127.0.0.2", 1)], client=True
)
async def test_prefer_gather_from_local_address_unless_busy_allows_reset(
    c, s, w1, w2, w3
):
    """A worker's busy flag can be cleared once all holders report busy.

    Both holders of the key (w1 and w3) are saturated, so gathering
    stalls and each is logged as busy.  After unblocking w1 only, the
    transfer should complete from w1 and never touch w3.
    """
    data = await c.scatter(123, workers=[w1.address, w3.address], broadcast=True)

    # Saturate both holders, forcing the gather loop to retry repeatedly.
    w1.outgoing_current_count = 10000000
    w3.outgoing_current_count = 10000000

    fut = c.submit(inc, data, workers=[w2.address])
    with pytest.raises(TimeoutError):
        await wait(fut, timeout=1.0)

    # Both holders were observed busy while the task was stuck.
    assert w1.address in w2.busy_workers_log
    assert w3.address in w2.busy_workers_log

    # Unblock w1 only; the gather must come from it, not from w3.
    w1.outgoing_current_count = 0
    await wait(fut)

    assert any(d["who"] == w2.address for d in w1.outgoing_transfer_log)
    assert all(d["who"] != w2.address for d in w3.outgoing_transfer_log)


@gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1)] * 20,
Expand Down
20 changes: 18 additions & 2 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ class Worker(ServerNode):
* **in_flight_workers**: ``{worker: {task}}``
The workers from which we are currently gathering data and the
dependencies we expect from those connections
* **busy_workers**: ``{worker}``
The workers from which we have tried to gather data and received
a busy response. These will be removed from the list as they are
needed.
* **busy_workers_log**: ``{worker}``
For testing, log of all workers which ever reported busy.
* **comm_bytes**: ``int``
The total number of bytes in flight
* **threads**: ``{key: int}``
Expand Down Expand Up @@ -423,6 +429,8 @@ def __init__(

self.in_flight_tasks = 0
self.in_flight_workers = dict()
self.busy_workers = set()
self.busy_workers_log = set()
self.total_out_connections = dask.config.get(
"distributed.worker.connections.outgoing"
)
Expand Down Expand Up @@ -2164,11 +2172,17 @@ def ensure_communicating(self):
in_flight = True
continue
host = get_address_host(self.address)
local = [w for w in workers if get_address_host(w) == host]
workers_not_busy = [
w for w in workers if w not in self.busy_workers
]
local = [w for w in workers_not_busy if get_address_host(w) == host]
if local:
worker = random.choice(local)
else:
elif not workers_not_busy:
self.busy_workers.difference_update(workers)
Comment on lines +2181 to +2182
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC

  1. as soon as a worker is flagged busy it is added to busy_workers
  2. If a worker is in busy_workers, it will always be ignored
  3. The only way to remove a worker from this "busy" state is when every worker holding an inspected task is flagged busy; only then are they all removed from busy_workers.

Condition 3.) is very strict since it hides a worker completely until we are lucky enough to hit a task with only busy who_has entries.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're following correctly -- I will point out that the 2nd unchecked TODO on this PR is resetting a peer's busy state when fetching a new key belonging to that peer, which will reduce condition 3's strictness. However, I didn't want to go ahead and bother with figuring out how to do that until the larger discussion re: the best approach to dealing with this was resolved.

worker = random.choice(list(workers))
else:
worker = random.choice(list(workers_not_busy))
to_gather, total_nbytes = self.select_keys_for_gather(
worker, to_gather_ts.key
)
Expand Down Expand Up @@ -2361,6 +2375,8 @@ async def gather_dep(

if response["status"] == "busy":
self.log.append(("busy-gather", worker, to_gather_keys))
self.busy_workers.add(worker)
self.busy_workers_log.add(worker)
Comment on lines +2377 to +2379
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer you using the Worker.log and test for the busy-gather event instead of introducing another set which is simply used for testing

for key in to_gather_keys:
ts = self.tasks.get(key)
if ts and ts.state == "flight":
Expand Down