Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up legacy cruft from worker reconnection #7712

Merged
Merged 3 commits on Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 1 addition & 22 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4107,12 +4107,9 @@ async def add_worker(
address: str,
status: str,
server_id: str,
keys=(),
nthreads=None,
name=None,
resolve_address=True,
nbytes=None,
types=None,
now=None,
resources=None,
host_info=None,
Expand All @@ -4134,15 +4131,6 @@ async def add_worker(
if address in self.workers:
raise ValueError("Worker already exists %s" % address)

if nbytes:
err = (
f"Worker {address!r} connected with {len(nbytes)} key(s) in memory! Worker reconnection is not supported. "
f"Keys: {list(nbytes)}"
)
logger.error(err)
await comm.write({"status": "error", "message": err, "time": time()})
return

if name in self.aliases:
logger.warning("Worker tried to connect with a duplicate name: %s", name)
msg = {
Expand Down Expand Up @@ -4202,9 +4190,6 @@ async def add_worker(
# exist before this.
self.check_idle_saturated(ws)

# for key in keys: # TODO
# self.mark_key_in_memory(key, [address])

self.stream_comms[address] = BatchedSend(interval="5ms", loop=self.loop)

for plugin in list(self.plugins.values()):
Expand Down Expand Up @@ -6982,13 +6967,7 @@ def update_data(
nbytes: dict,
client=None,
):
"""
Learn that new data has entered the network from an external source

See Also
--------
Scheduler.mark_key_in_memory
"""
"""Learn that new data has entered the network from an external source"""
who_has = {k: [self.coerce_address(vv) for vv in v] for k, v in who_has.items()}
logger.debug("Update data %s", who_has)

Expand Down
26 changes: 5 additions & 21 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1676,30 +1676,14 @@ def add_blocked(x, y, event):
await wait(z)


@gen_cluster(nthreads=[])
async def test_new_worker_with_data_rejected(s):
w = Worker(s.address, nthreads=1)
w.update_data(data={"x": 0})
assert w.state.tasks["x"].state == "memory"
assert w.data == {"x": 0}

with captured_logger(
"distributed.worker", level=logging.WARNING
) as wlog, captured_logger("distributed.scheduler", level=logging.WARNING) as slog:
with pytest.raises(RuntimeError, match="Worker failed to start"):
await w
assert "connected with 1 key(s) in memory" in slog.getvalue()
assert "Register worker" not in slog.getvalue()
assert "connected with 1 key(s) in memory" in wlog.getvalue()

assert w.status == Status.failed
assert not s.workers
assert not s.stream_comms
assert not s.host_info
@gen_test()
async def test_nonempty_data_is_rejected():
with pytest.raises(ValueError, match="Worker.data must be empty"):
await Worker("localhost:12345", nthreads=1, data={"x": 1})


@gen_cluster(client=True)
async def test_worker_arrives_with_processing_data(c, s, a, b):
async def test_worker_arrives_with_data_is_rejected(c, s, a, b):
# A worker arriving with data we need should still be rejected,
# and not affect other computations
x = delayed(slowinc)(1, delay=0.4)
Expand Down
15 changes: 5 additions & 10 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,11 @@ async def _register_with_scheduler(self) -> None:
if self.contact_address is None:
self.contact_address = self.address
logger.info("-" * 49)

# Worker reconnection is not supported
assert not self.data
assert not self.state.tasks

while True:
try:
_start = time()
Expand All @@ -1169,18 +1174,8 @@ async def _register_with_scheduler(self) -> None:
reply=False,
address=self.contact_address,
status=self.status.name,
keys=list(self.data),
nthreads=self.state.nthreads,
name=self.name,
nbytes={
ts.key: ts.get_nbytes()
for ts in self.state.tasks.values()
# Only if the task is in memory this is a sensible
# result since otherwise it simply submits the
# default value
if ts.state == "memory"
},
types={k: typename(v) for k, v in self.data.items()},
now=time(),
resources=self.state.total_resources,
memory_limit=self.memory_manager.memory_limit,
Expand Down
3 changes: 3 additions & 0 deletions distributed/worker_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ def __init__(
else:
self.data = {}

if self.data:
raise ValueError("Worker.data must be empty at initialization time")

self.memory_monitor_interval = parse_timedelta(
dask.config.get("distributed.worker.memory.monitor-interval"),
default=False,
Expand Down