Add nanny logs #2744

Merged: 3 commits, Jun 6, 2019
20 changes: 14 additions & 6 deletions distributed/client.py
@@ -2270,6 +2270,10 @@ def run(self, function, *args, **kwargs):
wait: boolean (optional)
If the function is asynchronous, whether or not to wait until that
function finishes.
+nanny : bool, default False
+Whether to run ``function`` on the nanny. By default, the function
+is run on the worker process. If specified, the addresses in
+``workers`` should still be the worker addresses, not the nanny addresses.

Examples
--------
@@ -3354,7 +3358,7 @@ def get_scheduler_logs(self, n=None):

Parameters
----------
-n: int
+n : int
Number of logs to retrieve. Maxes out at 10000 by default,
configurable in config.yaml::log-length

@@ -3364,23 +3368,27 @@ def get_scheduler_logs(self, n=None):
"""
return self.sync(self.scheduler.logs, n=n)

-def get_worker_logs(self, n=None, workers=None):
+def get_worker_logs(self, n=None, workers=None, nanny=False):
""" Get logs from workers

Parameters
----------
-n: int
+n : int
Number of logs to retrieve. Maxes out at 10000 by default,
configurable in config.yaml::log-length
-workers: iterable
-List of worker addresses to retrive. Gets all workers by default.
+workers : iterable
+List of worker addresses to retrieve. Gets all workers by default.
+nanny : bool, default False
+Whether to get the logs from the workers (False) or the nannies (True). If
+specified, the addresses in ``workers`` should still be the worker addresses,
+not the nanny addresses.

Returns
-------
Dictionary mapping worker address to logs.
Logs are returned in reversed order (newest first)
"""
-return self.sync(self.scheduler.worker_logs, n=n, workers=workers)
+return self.sync(self.scheduler.worker_logs, n=n, workers=workers, nanny=nanny)

def retire_workers(self, workers=None, close_workers=True, **kwargs):
""" Retire certain workers on the scheduler
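A quick usage sketch of the client API above, assuming a running cluster at a hypothetical scheduler address:

```python
from distributed import Client

client = Client("tcp://127.0.0.1:8786")  # hypothetical scheduler address

# Last five log lines from each worker process, keyed by worker address
worker_logs = client.get_worker_logs(n=5)

# Same call against the nannies; results are still keyed by *worker* address
nanny_logs = client.get_worker_logs(n=5, nanny=True)

# Run a function on the nanny processes instead of the worker processes
def hostname():
    import socket

    return socket.gethostname()

client.run(hostname, nanny=True)
```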
2 changes: 2 additions & 0 deletions distributed/nanny.py
@@ -79,6 +79,7 @@ def __init__(
protocol=None,
**worker_kwargs
):
+self._setup_logging(logger)
self.loop = loop or IOLoop.current()
self.security = security or Security()
assert isinstance(self.security, Security)
@@ -130,6 +131,7 @@ def __init__(
"kill": self.kill,
"restart": self.restart,
# cannot call it 'close' on the rpc side for naming conflict
"get_logs": self.get_logs,
"terminate": self.close,
"close_gracefully": self.close_gracefully,
"run": self.run,
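Registering "get_logs" in the handlers dict is what makes the inherited method reachable over RPC; incoming messages carry an "op" field that selects the handler and pass the rest as keyword arguments. A simplified dispatch sketch (illustrative only, not distributed.core's real implementation):

```python
# Hypothetical handlers dict mirroring the Nanny registration above
handlers = {
    "get_logs": lambda comm=None, n=None: [("INFO", "example log line")],
}

# An incoming RPC message; "op" picks the handler, the rest become kwargs
msg = {"op": "get_logs", "n": 5}
op = msg.pop("op")
print(handlers[op](comm=None, **msg))  # [('INFO', 'example log line')]
```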
24 changes: 23 additions & 1 deletion distributed/node.py
@@ -1,12 +1,15 @@
from __future__ import print_function, division, absolute_import

import warnings
+import logging

from tornado.ioloop import IOLoop
+import dask

-from .compatibility import unicode
+from .compatibility import unicode, finalize
from .core import Server, ConnectionPool
from .versions import get_versions
+from .utils import DequeHandler


class Node(object):
@@ -131,6 +134,25 @@ def stop_services(self):
def service_ports(self):
return {k: v.port for k, v in self.services.items()}

+def _setup_logging(self, logger):
+self._deque_handler = DequeHandler(
+n=dask.config.get("distributed.admin.log-length")
+)
+self._deque_handler.setFormatter(
+logging.Formatter(dask.config.get("distributed.admin.log-format"))
+)
+logger.addHandler(self._deque_handler)
+finalize(self, logger.removeHandler, self._deque_handler)
+
+def get_logs(self, comm=None, n=None):
+deque_handler = self._deque_handler
+if n is None:
+L = list(deque_handler.deque)
+else:
+L = deque_handler.deque
+L = [L[-i] for i in range(min(n, len(L)))]
+return [(msg.levelname, deque_handler.format(msg)) for msg in L]

async def __aenter__(self):
await self
return self
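The methods moved onto Node wrap a simple pattern: a logging handler that keeps recent LogRecords in a bounded deque and formats them on demand. A self-contained sketch of the idea (a hand-rolled stand-in, not distributed.utils.DequeHandler itself):

```python
import logging
from collections import deque


class BoundedLogHandler(logging.Handler):
    """Keep the most recent LogRecords in a bounded deque."""

    def __init__(self, n=10000):
        super().__init__()
        self.deque = deque(maxlen=n)

    def emit(self, record):
        self.deque.append(record)


logger = logging.getLogger("example")  # hypothetical logger name
handler = BoundedLogHandler(n=100)
handler.setFormatter(logging.Formatter("%(name)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)

logger.warning("something happened")

# Format the stored records the way Node.get_logs does above
print([(r.levelname, handler.format(r)) for r in handler.deque])
```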
28 changes: 5 additions & 23 deletions distributed/scheduler.py
@@ -52,7 +52,6 @@
key_split,
validate_key,
no_default,
-DequeHandler,
parse_timedelta,
parse_bytes,
PeriodicCallback,
@@ -843,7 +842,7 @@ def __init__(
dashboard_address=None,
**kwargs
):
-self._setup_logging()
+self._setup_logging(logger)

# Attributes
self.allowed_failures = allowed_failures
@@ -1327,16 +1326,6 @@ def close_worker(self, stream=None, worker=None, safe=None):
self.worker_send(worker, {"op": "close", "report": False})
self.remove_worker(address=worker, safe=safe)

-def _setup_logging(self):
-self._deque_handler = DequeHandler(
-n=dask.config.get("distributed.admin.log-length")
-)
-self._deque_handler.setFormatter(
-logging.Formatter(dask.config.get("distributed.admin.log-format"))
-)
-logger.addHandler(self._deque_handler)
-finalize(self, logger.removeHandler, self._deque_handler)

###########
# Stimuli #
###########
@@ -4625,18 +4614,11 @@ def get_profile_metadata(

raise gen.Return({"counts": counts, "keys": keys})

-def get_logs(self, comm=None, n=None):
-deque_handler = self._deque_handler
-if n is None:
-L = list(deque_handler.deque)
-else:
-L = deque_handler.deque
-L = [L[-i] for i in range(min(n, len(L)))]
-return [(msg.levelname, deque_handler.format(msg)) for msg in L]

@gen.coroutine
-def get_worker_logs(self, comm=None, n=None, workers=None):
-results = yield self.broadcast(msg={"op": "get_logs", "n": n}, workers=workers)
+def get_worker_logs(self, comm=None, n=None, workers=None, nanny=False):
+results = yield self.broadcast(
+msg={"op": "get_logs", "n": n}, workers=workers, nanny=nanny
+)
raise gen.Return(results)

###########
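Client.get_worker_logs from the first file wraps exactly this scheduler handler; from an asynchronous client, the equivalent low-level call looks like this (a sketch with a hypothetical scheduler address; per the docstring and test, results come back keyed by worker address even when nanny=True):

```python
import asyncio

from distributed import Client


async def main():
    client = await Client("tcp://127.0.0.1:8786", asynchronous=True)
    # The RPC wrapped by Client.get_worker_logs; with nanny=True the
    # scheduler broadcasts to the nannies instead of the workers
    logs = await client.scheduler.worker_logs(n=5, nanny=True)
    print(sorted(logs))
    await client.close()


asyncio.get_event_loop().run_until_complete(main())
```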
16 changes: 14 additions & 2 deletions distributed/tests/test_client.py
@@ -5111,7 +5111,7 @@ def test_task_metadata(c, s, a, b):
assert result == {"a": {"c": {"d": 1}}, "b": 2}


-@gen_cluster(client=True)
+@gen_cluster(client=True, Worker=Nanny)
def test_logs(c, s, a, b):
yield wait(c.map(inc, range(5)))
logs = yield c.get_scheduler_logs(n=5)
@@ -5121,11 +5121,23 @@ def test_logs(c, s, a, b):
assert "distributed.scheduler" in msg

w_logs = yield c.get_worker_logs(n=5)
-assert set(w_logs.keys()) == {a.address, b.address}
+assert set(w_logs.keys()) == {a.worker_address, b.worker_address}
for log in w_logs.values():
for _, msg in log:
assert "distributed.worker" in msg

+n_logs = yield c.get_worker_logs(nanny=True)
+assert set(n_logs.keys()) == {a.worker_address, b.worker_address}
+for log in n_logs.values():
+for _, msg in log:
+assert "distributed.nanny" in msg
+
+n_logs = yield c.get_worker_logs(nanny=True, workers=[a.worker_address])
+assert set(n_logs.keys()) == {a.worker_address}
+for log in n_logs.values():
+for _, msg in log:
+assert "distributed.nanny" in msg


@gen_cluster(client=True)
def test_avoid_delayed_finalize(c, s, a, b):
24 changes: 2 additions & 22 deletions distributed/worker.py
@@ -33,7 +33,7 @@
from .comm import get_address_host, get_local_address_for, connect
from .comm.utils import offload
from .comm.addressing import address_from_user_args
-from .compatibility import unicode, get_thread_identity, finalize, MutableMapping
+from .compatibility import unicode, get_thread_identity, MutableMapping
from .core import error_message, CommClosedError, send_recv, pingpong, coerce_to_address
from .diskutils import WorkSpace
from .metrics import time
@@ -60,7 +60,6 @@
json_load_robust,
key_split,
format_bytes,
-DequeHandler,
PeriodicCallback,
parse_bytes,
parse_timedelta,
@@ -412,7 +411,7 @@ def __init__(
)
profile_cycle_interval = parse_timedelta(profile_cycle_interval, default="ms")

-self._setup_logging()
+self._setup_logging(logger)

if scheduler_file:
cfg = json_load_robust(scheduler_file)
@@ -666,16 +665,6 @@ def __repr__(self):
)
)

-def _setup_logging(self):
-self._deque_handler = DequeHandler(
-n=dask.config.get("distributed.admin.log-length")
-)
-self._deque_handler.setFormatter(
-logging.Formatter(dask.config.get("distributed.admin.log-format"))
-)
-logger.addHandler(self._deque_handler)
-finalize(self, logger.removeHandler, self._deque_handler)

@property
def worker_address(self):
""" For API compatibility with Nanny """
@@ -888,15 +877,6 @@ def gather(self, comm=None, who_has=None):
self.update_data(data=result, report=False)
raise Return({"status": "OK"})

-def get_logs(self, comm=None, n=None):
-deque_handler = self._deque_handler
-if n is None:
-L = list(deque_handler.deque)
-else:
-L = deque_handler.deque
-L = [L[-i] for i in range(min(n, len(L)))]
-return [(msg.levelname, deque_handler.format(msg)) for msg in L]

#############
# Lifecycle #
#############
2 changes: 1 addition & 1 deletion docs/source/api.rst
@@ -19,8 +19,8 @@ API
Client.get_executor
Client.get_metadata
Client.get_scheduler_logs
-Client.get_task_stream
Client.get_worker_logs
+Client.get_task_stream
Client.has_what
Client.list_datasets
Client.map