diff --git a/distributed/client.py b/distributed/client.py index 22d89cda4e..ac098d7987 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -2270,6 +2270,10 @@ def run(self, function, *args, **kwargs): wait: boolean (optional) If the function is asynchronous whether or not to wait until that function finishes. + nanny : bool, default False + Whether to run ``function`` on the nanny. By default, the function + is run on the worker process. If specified, the addresses in + ``workers`` should still be the worker addresses, not the nanny addresses. Examples -------- @@ -3354,7 +3358,7 @@ def get_scheduler_logs(self, n=None): Parameters ---------- - n: int + n : int Number of logs to retrive. Maxes out at 10000 by default, confiruable in config.yaml::log-length @@ -3364,23 +3368,27 @@ def get_scheduler_logs(self, n=None): """ return self.sync(self.scheduler.logs, n=n) - def get_worker_logs(self, n=None, workers=None): + def get_worker_logs(self, n=None, workers=None, nanny=False): """ Get logs from workers Parameters ---------- - n: int + n : int Number of logs to retrive. Maxes out at 10000 by default, confiruable in config.yaml::log-length - workers: iterable - List of worker addresses to retrive. Gets all workers by default. + workers : iterable + List of worker addresses to retrieve. Gets all workers by default. + nanny : bool, default False + Whether to get the logs from the workers (False) or the nannies (True). If + specified, the addresses in `workers` should still be the worker addresses, + not the nanny addresses. Returns ------- Dictionary mapping worker address to logs. 
Logs are returned in reversed order (newest first) """ - return self.sync(self.scheduler.worker_logs, n=n, workers=workers) + return self.sync(self.scheduler.worker_logs, n=n, workers=workers, nanny=nanny) def retire_workers(self, workers=None, close_workers=True, **kwargs): """ Retire certain workers on the scheduler diff --git a/distributed/nanny.py b/distributed/nanny.py index 59a8083e83..9cf444fc7c 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -79,6 +79,7 @@ def __init__( protocol=None, **worker_kwargs ): + self._setup_logging(logger) self.loop = loop or IOLoop.current() self.security = security or Security() assert isinstance(self.security, Security) @@ -130,6 +131,7 @@ def __init__( "kill": self.kill, "restart": self.restart, # cannot call it 'close' on the rpc side for naming conflict + "get_logs": self.get_logs, "terminate": self.close, "close_gracefully": self.close_gracefully, "run": self.run, diff --git a/distributed/node.py b/distributed/node.py index 4f0b9813a8..8bd81ffe5a 100644 --- a/distributed/node.py +++ b/distributed/node.py @@ -1,12 +1,15 @@ from __future__ import print_function, division, absolute_import import warnings +import logging from tornado.ioloop import IOLoop +import dask -from .compatibility import unicode +from .compatibility import unicode, finalize from .core import Server, ConnectionPool from .versions import get_versions +from .utils import DequeHandler class Node(object): @@ -131,6 +134,25 @@ def stop_services(self): def service_ports(self): return {k: v.port for k, v in self.services.items()} + def _setup_logging(self, logger): + self._deque_handler = DequeHandler( + n=dask.config.get("distributed.admin.log-length") + ) + self._deque_handler.setFormatter( + logging.Formatter(dask.config.get("distributed.admin.log-format")) + ) + logger.addHandler(self._deque_handler) + finalize(self, logger.removeHandler, self._deque_handler) + + def get_logs(self, comm=None, n=None): + deque_handler = self._deque_handler + 
if n is None: + L = list(deque_handler.deque) + else: + L = deque_handler.deque + L = [L[-i] for i in range(1, min(n, len(L)) + 1)] + return [(msg.levelname, deque_handler.format(msg)) for msg in L] + async def __aenter__(self): await self return self diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ca3c1241ea..7532e902d2 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -52,7 +52,6 @@ key_split, validate_key, no_default, - DequeHandler, parse_timedelta, parse_bytes, PeriodicCallback, @@ -843,7 +842,7 @@ def __init__( dashboard_address=None, **kwargs ): - self._setup_logging() + self._setup_logging(logger) # Attributes self.allowed_failures = allowed_failures @@ -1327,16 +1326,6 @@ def close_worker(self, stream=None, worker=None, safe=None): self.worker_send(worker, {"op": "close", "report": False}) self.remove_worker(address=worker, safe=safe) - def _setup_logging(self): - self._deque_handler = DequeHandler( - n=dask.config.get("distributed.admin.log-length") - ) - self._deque_handler.setFormatter( - logging.Formatter(dask.config.get("distributed.admin.log-format")) - ) - logger.addHandler(self._deque_handler) - finalize(self, logger.removeHandler, self._deque_handler) - ########### # Stimuli # ########### @@ -4625,18 +4614,11 @@ def get_profile_metadata( raise gen.Return({"counts": counts, "keys": keys}) - def get_logs(self, comm=None, n=None): - deque_handler = self._deque_handler - if n is None: - L = list(deque_handler.deque) - else: - L = deque_handler.deque - L = [L[-i] for i in range(min(n, len(L)))] - return [(msg.levelname, deque_handler.format(msg)) for msg in L] - @gen.coroutine - def get_worker_logs(self, comm=None, n=None, workers=None): - results = yield self.broadcast(msg={"op": "get_logs", "n": n}, workers=workers) + def get_worker_logs(self, comm=None, n=None, workers=None, nanny=False): + results = yield self.broadcast( + msg={"op": "get_logs", "n": n}, workers=workers, nanny=nanny + ) raise 
gen.Return(results) ########### diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index c731ae6e5a..dc45b3025e 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -5111,7 +5111,7 @@ def test_task_metadata(c, s, a, b): assert result == {"a": {"c": {"d": 1}}, "b": 2} -@gen_cluster(client=True) +@gen_cluster(client=True, Worker=Nanny) def test_logs(c, s, a, b): yield wait(c.map(inc, range(5))) logs = yield c.get_scheduler_logs(n=5) @@ -5121,11 +5121,23 @@ def test_logs(c, s, a, b): assert "distributed.scheduler" in msg w_logs = yield c.get_worker_logs(n=5) - assert set(w_logs.keys()) == {a.address, b.address} + assert set(w_logs.keys()) == {a.worker_address, b.worker_address} for log in w_logs.values(): for _, msg in log: assert "distributed.worker" in msg + n_logs = yield c.get_worker_logs(nanny=True) + assert set(n_logs.keys()) == {a.worker_address, b.worker_address} + for log in n_logs.values(): + for _, msg in log: + assert "distributed.nanny" in msg + + n_logs = yield c.get_worker_logs(nanny=True, workers=[a.worker_address]) + assert set(n_logs.keys()) == {a.worker_address} + for log in n_logs.values(): + for _, msg in log: + assert "distributed.nanny" in msg + @gen_cluster(client=True) def test_avoid_delayed_finalize(c, s, a, b): diff --git a/distributed/worker.py b/distributed/worker.py index d0bc735ec6..37dcbc2eca 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -33,7 +33,7 @@ from .comm import get_address_host, get_local_address_for, connect from .comm.utils import offload from .comm.addressing import address_from_user_args -from .compatibility import unicode, get_thread_identity, finalize, MutableMapping +from .compatibility import unicode, get_thread_identity, MutableMapping from .core import error_message, CommClosedError, send_recv, pingpong, coerce_to_address from .diskutils import WorkSpace from .metrics import time @@ -60,7 +60,6 @@ json_load_robust, key_split, 
format_bytes, - DequeHandler, PeriodicCallback, parse_bytes, parse_timedelta, @@ -412,7 +411,7 @@ def __init__( ) profile_cycle_interval = parse_timedelta(profile_cycle_interval, default="ms") - self._setup_logging() + self._setup_logging(logger) if scheduler_file: cfg = json_load_robust(scheduler_file) @@ -666,16 +665,6 @@ def __repr__(self): ) ) - def _setup_logging(self): - self._deque_handler = DequeHandler( - n=dask.config.get("distributed.admin.log-length") - ) - self._deque_handler.setFormatter( - logging.Formatter(dask.config.get("distributed.admin.log-format")) - ) - logger.addHandler(self._deque_handler) - finalize(self, logger.removeHandler, self._deque_handler) - @property def worker_address(self): """ For API compatibility with Nanny """ @@ -888,15 +877,6 @@ def gather(self, comm=None, who_has=None): self.update_data(data=result, report=False) raise Return({"status": "OK"}) - def get_logs(self, comm=None, n=None): - deque_handler = self._deque_handler - if n is None: - L = list(deque_handler.deque) - else: - L = deque_handler.deque - L = [L[-i] for i in range(min(n, len(L)))] - return [(msg.levelname, deque_handler.format(msg)) for msg in L] - ############# # Lifecycle # ############# diff --git a/docs/source/api.rst b/docs/source/api.rst index e91c4ee6ac..574a70d34b 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -19,8 +19,8 @@ API Client.get_executor Client.get_metadata Client.get_scheduler_logs - Client.get_task_stream Client.get_worker_logs + Client.get_task_stream Client.has_what Client.list_datasets Client.map