42 changes: 27 additions & 15 deletions distributed/scheduler.py
@@ -925,7 +925,7 @@ def __init__(
         self.total_occupancy = 0
         self.host_info = defaultdict(dict)
         self.resources = defaultdict(dict)
-        self.aliases = dict()
+        self.aliases = defaultdict(list)
 
         self._task_state_collections = [self.unrunnable]
 
@@ -1363,7 +1363,22 @@ def add_worker(
 
         ws = self.workers.get(address)
         if ws is not None:
-            raise ValueError("Worker already exists %s" % address)
+            msg = {
+                "status": "error",
+                "message": "worker already exists %s" % address,
+                "time": time(),
+            }
+            yield comm.write(msg)
+            return
+
+        if name in self.aliases and address in self.aliases[name]:
+            msg = {
+                "status": "error",
+                "message": "name taken, %s" % name,
+                "time": time(),
+            }
+            yield comm.write(msg)
+            return
 
         self.workers[address] = ws = WorkerState(
             address=address,
@@ -1375,23 +1390,14 @@
             services=services,
         )
 
-        if name in self.aliases:
-            msg = {
-                "status": "error",
-                "message": "name taken, %s" % name,
-                "time": time(),
-            }
-            yield comm.write(msg)
-            return
-
         if "addresses" not in self.host_info[host]:
             self.host_info[host].update({"addresses": set(), "cores": 0})
 
         self.host_info[host]["addresses"].add(address)
         self.host_info[host]["cores"] += ncores
 
         self.total_ncores += ncores
-        self.aliases[name] = address
+        self.aliases[name].append(address)
 
         response = self.heartbeat_worker(
             address=address,
@@ -1856,7 +1862,12 @@ def remove_worker(self, comm=None, address=None, safe=False, close=True):
 
         self.rpc.remove(address)
         del self.stream_comms[address]
-        del self.aliases[ws.name]
+        try:
+            self.aliases[ws.name].remove(address)
+        except ValueError:
+            pass
+        if not self.aliases[ws.name]:
+            del self.aliases[ws.name]
         self.idle.discard(ws)
         self.saturated.discard(ws)
         del self.workers[address]
@@ -4392,7 +4403,7 @@ def coerce_address(self, addr, resolve=True):
         """
         # XXX how many address-parsing routines do we have?
         if addr in self.aliases:
-            addr = self.aliases[addr]
+            addr = self.aliases[addr][-1]
         if isinstance(addr, tuple):
             addr = unparse_host_port(*addr)
         if not isinstance(addr, six.string_types):
@@ -4410,7 +4421,8 @@ def coerce_hostname(self, host):
         Coerce the hostname of a worker.
         """
         if host in self.aliases:
-            return self.workers[self.aliases[host]].host
+            # get last as it's most recent
+            return self.workers[self.aliases[host][-1]].host
         else:
             return host
 
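To make the new alias bookkeeping concrete: each worker name now maps to a list of addresses rather than a single address. A minimal standalone sketch of the semantics (the name and addresses below are made up, not taken from this PR):

    from collections import defaultdict

    aliases = defaultdict(list)  # name -> [addresses], oldest first

    # Two workers may register under the same name, e.g. across a restart.
    aliases["alice"].append("tcp://10.0.0.1:1234")
    aliases["alice"].append("tcp://10.0.0.2:5678")

    # Lookups resolve to the most recently registered address.
    assert aliases["alice"][-1] == "tcp://10.0.0.2:5678"

    # Removal discards one address and drops the name once its list is empty.
    aliases["alice"].remove("tcp://10.0.0.2:5678")
    aliases["alice"].remove("tcp://10.0.0.1:1234")
    if not aliases["alice"]:
        del aliases["alice"]
    assert "alice" not in aliases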
6 changes: 1 addition & 5 deletions distributed/tests/test_scheduler.py
@@ -536,11 +536,7 @@ def test_worker_name():
     s.start(0)
     w = yield Worker(s.ip, s.port, name="alice")
     assert s.workers[w.address].name == "alice"
-    assert s.aliases["alice"] == w.address
-
-    with pytest.raises(ValueError):
-        w2 = yield Worker(s.ip, s.port, name="alice")
-        yield w2._close()
+    assert w.address in s.aliases["alice"]
 
     yield s.close()
     yield w._close()
1 change: 0 additions & 1 deletion distributed/utils.py
@@ -743,7 +743,6 @@ def silence_logging(level, root="distributed"):
     return old
 
 
-@toolz.memoize
Member

This slightly concerns me. I've seen systems where DNS lookups are surprisingly expensive. Your approach here is probably safer in the long run, though, especially for longer-running processes.

Contributor Author

DNS caching also occurs at the OS level.

I'm not familiar with this decorator, but I don't see any mention of cache flushing to remove stale entries, whereas OS DNS caches usually have a TTL (time to live) that specifies when entries need to be re-queried.

With the current implementation, it appears that @toolz.memoize will never flush the cache.

 def ensure_ip(hostname):
     """ Ensure that address is an IP address

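For illustration only, not part of this PR: a memoizer that expires entries after a TTL, in the spirit of the OS resolver caches described above, might look like the sketch below. The memoize_with_ttl name and the 300-second TTL are hypothetical.

    import socket
    import time
    from functools import wraps

    def memoize_with_ttl(ttl_seconds):
        """Cache results per argument tuple, expiring entries after ttl_seconds."""
        def decorator(func):
            cache = {}  # args -> (value, timestamp)

            @wraps(func)
            def wrapper(*args):
                now = time.time()
                hit = cache.get(args)
                if hit is not None and now - hit[1] < ttl_seconds:
                    return hit[0]  # entry still fresh: reuse the cached result
                value = func(*args)
                cache[args] = (value, now)  # insert or refresh the entry
                return value
            return wrapper
        return decorator

    @memoize_with_ttl(ttl_seconds=300)
    def resolve(hostname):
        # stand-in for the DNS lookup that ensure_ip performs
        return socket.gethostbyname(hostname)

Unlike a plain memoize, stale entries are re-queried once the TTL elapses, so a host whose DNS mapping changes is eventually picked up.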
54 changes: 28 additions & 26 deletions distributed/worker.py
@@ -24,7 +24,7 @@
 from tornado.gen import Return
 from tornado import gen
 from tornado.ioloop import IOLoop
-from tornado.locks import Event
+from tornado.locks import Event, Lock
 
 from . import profile, comm
 from .batched import BatchedSend
@@ -502,6 +502,7 @@ def __init__(
         self.scheduler_delay = 0
         self.stream_comms = dict()
         self.heartbeat_active = False
+        self.heartbeat_lock = Lock()
         self._ipython_kernel = None
 
         if self.local_dir not in sys.path:
@@ -711,7 +712,7 @@ def _register_with_scheduler(self):
             except gen.TimeoutError:
                 logger.info("Timed out when connecting to scheduler")
         if response["status"] != "OK":
-            raise ValueError("Unexpected response from register: %r" % (response,))
+            logger.warning("Unexpected response from register: %r" % (response,))
         else:
             # Retrieve eventual init functions and run them
             for function_bytes in response["worker-setups"]:
@@ -734,30 +735,31 @@
 
     @gen.coroutine
     def heartbeat(self):
-        if not self.heartbeat_active:
-            self.heartbeat_active = True
-            logger.debug("Heartbeat: %s" % self.address)
-            try:
-                start = time()
-                response = yield self.scheduler.heartbeat_worker(
-                    address=self.contact_address, now=time(), metrics=self.get_metrics()
-                )
-                end = time()
-                middle = (start + end) / 2
-
-                if response["status"] == "missing":
-                    yield self._register_with_scheduler()
-                    return
-                self.scheduler_delay = response["time"] - middle
-                self.periodic_callbacks["heartbeat"].callback_time = (
-                    response["heartbeat-interval"] * 1000
-                )
-            except CommClosedError:
-                logger.warning("Heartbeat to scheduler failed")
-            finally:
-                self.heartbeat_active = False
-        else:
-            logger.debug("Heartbeat skipped: channel busy")
+        with (yield self.heartbeat_lock.acquire()):
+            if not self.heartbeat_active:
+                self.heartbeat_active = True
+                logger.debug("Heartbeat: %s" % self.address)
+                try:
+                    start = time()
+                    response = yield self.scheduler.heartbeat_worker(
+                        address=self.contact_address, now=time(), metrics=self.get_metrics()
+                    )
+                    end = time()
+                    middle = (start + end) / 2
+
+                    if response["status"] == "missing":
+                        yield self._register_with_scheduler()
+                        return
+                    self.scheduler_delay = response["time"] - middle
+                    self.periodic_callbacks["heartbeat"].callback_time = (
+                        response["heartbeat-interval"] * 1000
+                    )
+                except CommClosedError:
+                    logger.warning("Heartbeat to scheduler failed")
+                finally:
+                    self.heartbeat_active = False
+            else:
+                logger.debug("Heartbeat skipped: channel busy")
Member

Why this change?

Contributor Author

This method has a flag that it checks to see whether another thread is currently performing a heartbeat, in which case it issues the "channel busy" message and skips the heartbeat.

There is a race condition, though: two threads may both check the if branch and each conclude that a heartbeat is necessary. I discovered this because the scheduler was receiving multiple concurrent heartbeat messages from the same worker.

Ideally, registering with the scheduler should be forced to be a single-threaded operation, with appropriate locking to prevent concurrent execution and checks to verify that registration is still necessary.

Not only could the periodic heartbeat call the registration from multiple threads, but the broken-connection handler would call it as well, so you could end up with multiple threads concurrently trying to register with the scheduler.

Then, to top it off, if one thread successfully registered with the scheduler, the other thread would fail and throw an exception. Since registration disables the periodic heartbeat at its start, the thrown exception would leave the heartbeat disabled and prevent re-registration.
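A contrived sketch of the hazard described above (not the PR's code): if anything can interleave between reading the flag and setting it, two callers both pass the check; holding a tornado Lock across the check-and-set serializes them. The gen.moment yield stands in for whatever interleaving point exists in practice.

    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.locks import Lock

    active = False   # flag meaning "an operation is already in flight"
    lock = Lock()
    entries = []

    @gen.coroutine
    def racy(name):
        global active
        if not active:            # both callers can pass this check...
            yield gen.moment      # ...if the loop interleaves them here
            active = True
            entries.append(name)

    @gen.coroutine
    def locked(name):
        global active
        with (yield lock.acquire()):   # check-and-set is now atomic
            if not active:
                active = True
                entries.append(name)

    @gen.coroutine
    def main():
        global active
        yield [racy("a"), racy("b")]
        print(entries)            # ['a', 'b'] -- both slipped through
        active = False
        del entries[:]
        yield [locked("c"), locked("d")]
        print(entries)            # ['c'] -- only one caller wins

    IOLoop.current().run_sync(main)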


     @gen.coroutine
     def handle_scheduler(self, comm):