Review log-length configuration
crusaderky committed Sep 18, 2023
1 parent c1f4e15 commit 8a5a9d3
Showing 16 changed files with 73 additions and 72 deletions.
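
In short: the per-subsystem knobs (distributed.scheduler.transition-log-length, distributed.scheduler.events-log-length, distributed.comm.recent-messages-log-length, plus several hard-coded deque sizes) are consolidated into a single distributed.admin.log-length setting. A minimal usage sketch, with an illustrative value:

    import dask

    # One knob now bounds the scheduler transition log, the event logs,
    # the worker transfer logs, etc.
    with dask.config.set({"distributed.admin.log-length": 5_000}):
        # Schedulers and workers created in this context pick up the bound.
        assert dask.config.get("distributed.admin.log-length") == 5_000
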
2 changes: 1 addition & 1 deletion distributed/batched.py
@@ -56,7 +56,7 @@ def __init__(self, interval, loop=None, serializers=None):
         self.byte_count = 0
         self.next_deadline = None
         self.recent_message_log = deque(
-            maxlen=dask.config.get("distributed.comm.recent-messages-log-length")
+            maxlen=dask.config.get("distributed.admin.log-length")
         )
         self.serializers = serializers
         self._consecutive_failures = 0
10 changes: 7 additions & 3 deletions distributed/config.py
@@ -26,7 +26,6 @@
     "allowed-failures": "distributed.scheduler.allowed-failures",
     "bandwidth": "distributed.scheduler.bandwidth",
     "default-data-size": "distributed.scheduler.default-data-size",
-    "transition-log-length": "distributed.scheduler.transition-log-length",
     "work-stealing": "distributed.scheduler.work-stealing",
     "worker-ttl": "distributed.scheduler.worker-ttl",
     "multiprocessing-method": "distributed.worker.multiprocessing-method",
@@ -43,16 +42,21 @@
     "tcp-timeout": "distributed.comm.timeouts.tcp",
     "default-scheme": "distributed.comm.default-scheme",
     "socket-backlog": "distributed.comm.socket-backlog",
-    "recent-messages-log-length": "distributed.comm.recent-messages-log-length",
     "diagnostics-link": "distributed.dashboard.link",
     "bokeh-export-tool": "distributed.dashboard.export-tool",
     "tick-time": "distributed.admin.tick.interval",
     "tick-maximum-delay": "distributed.admin.tick.limit",
-    "log-length": "distributed.admin.log-length",
     "log-format": "distributed.admin.log-format",
     "pdb-on-err": "distributed.admin.pdb-on-err",
     "ucx": "distributed.comm.ucx",
     "rmm": "distributed.rmm",
+    # log-length aliases
+    "transition-log-length": "distributed.admin.log-length",
+    "distributed.scheduler.transition-log-length": "distributed.admin.log-length",
+    "distributed.scheduler.events-log-length": "distributed.admin.log-length",
+    "log-length": "distributed.admin.log-length",
+    "recent-messages-log-length": "distributed.admin.log-length",
+    "distributed.comm.recent-messages-log-length": "distributed.admin.log-length",
 }

 # Affects yaml and env variables configs, as well as calls to dask.config.set()
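
Per the comment above, these renames also affect dask.config.set() calls, so user code that still sets a retired key should be redirected to the consolidated one. A hedged sketch of that intended behaviour (assuming the alias table is registered with dask.config's deprecation machinery; a deprecation warning is also expected):

    import dask

    with dask.config.set({"distributed.scheduler.transition-log-length": 500}):
        # The retired key resolves to the consolidated one.
        assert dask.config.get("distributed.admin.log-length") == 500
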
3 changes: 2 additions & 1 deletion distributed/deploy/adaptive_core.py
@@ -10,6 +10,7 @@
 import tlz as toolz
 from tornado.ioloop import IOLoop

+import dask.config
 from dask.utils import parse_timedelta

 from distributed.compatibility import PeriodicCallback
@@ -135,7 +136,7 @@ async def _adapt():
         # internal state
         self.close_counts = defaultdict(int)
         self._adapting = False
-        self.log = deque(maxlen=10000)
+        self.log = deque(maxlen=dask.config.get("distributed.admin.log-length"))

     def stop(self) -> None:
         logger.info("Adaptive stop")
37 changes: 3 additions & 34 deletions distributed/distributed-schema.yaml
@@ -78,31 +78,6 @@ properties:
              This can be helpful to reduce costs and stop zombie processes from roaming the earth.
-          transition-log-length:
-            type: integer
-            minimum: 0
-            description: |
-              How long should we keep the transition log
-
-              Every time a task transitions states (like "waiting", "processing", "memory", "released")
-              we record that transition in a log.
-
-              To make sure that we don't run out of memory
-              we will clear out old entries after a certain length.
-              This is that length.
-
-          events-log-length:
-            type: integer
-            minimum: 0
-            description: |
-              How long should we keep the events log
-
-              All events (e.g. worker heartbeat) are stored in the events log.
-
-              To make sure that we don't run out of memory
-              we will clear out old entries after a certain length.
-              This is that length.
-
           work-stealing:
             type: boolean
             description: |
@@ -860,11 +835,6 @@ properties:
             type: string
             description: The default protocol to use, like tcp or tls

-          recent-messages-log-length:
-            type: integer
-            minimum: 0
-            description: number of messages to keep for debugging
-
           tls:
             type: object
             properties:
@@ -1126,12 +1096,11 @@ properties:
              If the traceback is larger than this size (in bytes) then we truncate it.

           log-length:
-            type: integer
+            type: [integer, 'null']
             minimum: 0
             description: |
-              Default length of logs to keep in memory
-
-              The scheduler and workers keep the last 10000 or so log entries in memory.
+              Default length of logs to keep in memory. This is useful for debugging
+              cluster issues. Set to null for unlimited.

           log-format:
             type: string
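
The relaxed type ([integer, 'null']) leans on deque semantics: deque(maxlen=None) is unbounded, so "set to null for unlimited" needs no special-casing downstream. For example:

    from collections import deque

    import dask

    with dask.config.set({"distributed.admin.log-length": None}):
        log = deque(maxlen=dask.config.get("distributed.admin.log-length"))
    assert log.maxlen is None  # the log grows without bound
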
5 changes: 1 addition & 4 deletions distributed/distributed.yaml
@@ -18,8 +18,6 @@ distributed:
     # after they have been removed from the scheduler
     events-cleanup-delay: 1h
     idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes"
-    transition-log-length: 100000
-    events-log-length: 100000
     work-stealing: True # workers should steal tasks from each other
     work-stealing-interval: 100ms # Callback time for work stealing
     worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers
@@ -224,7 +222,6 @@ distributed:
     offload: 10MiB # Size after which we choose to offload serialization to another thread
     default-scheme: tcp
     socket-backlog: 2048
-    recent-messages-log-length: 0 # number of messages to keep for debugging
     ucx:
       cuda-copy: null # enable cuda-copy
       tcp: null # enable tcp
@@ -310,7 +307,7 @@ distributed:
       cycle: 1s # time between checking event loop speed

     max-error-length: 10000 # Maximum size traceback after error to return
-    log-length: 10000 # default length of logs to keep in memory
+    log-length: 1000 # default length of logs to keep in memory
     log-format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
     pdb-on-err: False # enter debug mode on scheduling error
     system-monitor:
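
Deployments that want a larger bound back can override the new default through any of dask's configuration channels, e.g. an environment variable (double underscores denote nesting; the value shown is illustrative):

    import os

    import dask

    os.environ["DASK_DISTRIBUTED__ADMIN__LOG_LENGTH"] = "2000"
    dask.config.refresh()  # re-collect environment variables
    assert dask.config.get("distributed.admin.log-length") == 2000
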
7 changes: 6 additions & 1 deletion distributed/profile.py
@@ -39,6 +39,8 @@

 import tlz as toolz

+import dask.config
+from dask.typing import NoDefault, no_default
 from dask.utils import format_time, parse_timedelta

 from distributed.metrics import time
@@ -353,7 +355,7 @@ def watch(
     thread_id: int | None = None,
     interval: str = "20ms",
     cycle: str = "2s",
-    maxlen: int = 1000,
+    maxlen: int | None | NoDefault = no_default,
     omit: Collection[str] = (),
     stop: Callable[[], bool] = lambda: False,
 ) -> deque[tuple[float, dict[str, Any]]]:
@@ -386,6 +388,9 @@ def watch(
     - timestamp
     - dict[str, Any] (output of ``create()``)
     """
+    if maxlen is no_default:
+        maxlen = dask.config.get("distributed.admin.log-length")
+    assert isinstance(maxlen, int) or maxlen is None
     log: deque[tuple[float, dict[str, Any]]] = deque(maxlen=maxlen)

     thread = threading.Thread(
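
The no_default sentinel lets watch() distinguish "argument omitted" (fall back to the config value) from an explicit maxlen=None (unbounded). The same pattern in miniature, with illustrative names that are not part of the codebase:

    from collections import deque

    _unset = object()  # stand-in for dask's no_default sentinel

    def make_log(maxlen=_unset):
        if maxlen is _unset:
            maxlen = 1000  # stand-in for dask.config.get("distributed.admin.log-length")
        return deque(maxlen=maxlen)  # None means unbounded

    assert make_log().maxlen == 1000      # omitted: fall back to the default
    assert make_log(None).maxlen is None  # explicit None: unbounded
    assert make_log(10).maxlen == 10      # explicit cap
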
8 changes: 3 additions & 5 deletions distributed/scheduler.py
@@ -1629,7 +1629,7 @@ class SchedulerState:

     #: History of task state transitions.
     #: The length can be tweaked through
-    #: distributed.scheduler.transition-log-length
+    #: distributed.admin.log-length
     transition_log: deque[Transition]

     #: Total number of transitions since the cluster was started
@@ -1721,7 +1721,7 @@ def __init__(
         self.plugins = {} if not plugins else {_get_plugin_name(p): p for p in plugins}

         self.transition_log = deque(
-            maxlen=dask.config.get("distributed.scheduler.transition-log-length")
+            maxlen=dask.config.get("distributed.admin.log-length")
         )
         self.transition_counter = 0
         self._idle_transition_counter = 0
@@ -3657,9 +3657,7 @@ def __init__(
         ]

         self.events = defaultdict(
-            partial(
-                deque, maxlen=dask.config.get("distributed.scheduler.events-log-length")
-            )
+            partial(deque, maxlen=dask.config.get("distributed.admin.log-length"))
         )
         self.event_counts = defaultdict(int)
         self.event_subscriber = defaultdict(set)
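
The defaultdict(partial(deque, maxlen=...)) idiom above gives every new event topic its own bounded deque on first access. A stand-alone illustration:

    from collections import defaultdict, deque
    from functools import partial

    events = defaultdict(partial(deque, maxlen=3))
    for i in range(1, 6):
        events["stealing"].append(f"msg {i}")  # topic deque created on first access

    assert list(events["stealing"]) == ["msg 3", "msg 4", "msg 5"]  # oldest dropped
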
3 changes: 2 additions & 1 deletion distributed/stealing.py
@@ -105,7 +105,8 @@ def __init__(self, scheduler: Scheduler):
         )
         # `callback_time` is in milliseconds
         self.scheduler.add_plugin(self)
-        self.scheduler.events["stealing"] = deque(maxlen=100000)
+        maxlen = dask.config.get("distributed.admin.log-length")
+        self.scheduler.events["stealing"] = deque(maxlen=maxlen)
         self.count = 0
         self.in_flight = {}
         self.in_flight_occupancy = defaultdict(lambda: 0)
16 changes: 13 additions & 3 deletions distributed/system_monitor.py
@@ -6,7 +6,9 @@

 import psutil

-import dask
+import dask.config
+from dask.typing import NoDefault, no_default
+from dask.utils import parse_timedelta

 from distributed.compatibility import WINDOWS
 from distributed.diagnostics import nvml
@@ -38,14 +40,22 @@ class SystemMonitor:
     # distributed.admin.system_monitor.interval = 500ms
     def __init__(
         self,
-        maxlen: int | None = 7200,
+        maxlen: int | None | NoDefault = no_default,
         monitor_disk_io: bool | None = None,
        monitor_host_cpu: bool | None = None,
         monitor_gil_contention: bool | None = None,
     ):
         self.proc = psutil.Process()
         self.count = 0

+        if maxlen is no_default:
+            maxlen = dask.config.get("distributed.admin.log-length")
+        if isinstance(maxlen, int):
+            maxlen = max(1, maxlen)
+        elif maxlen is not None:  # pragma: nocover
+            raise TypeError(f"maxlen must be int or None; got {maxlen!r}")
+        self.maxlen = maxlen
+
         self.last_time = monotonic()

         self.quantities = {
@@ -110,7 +120,7 @@ def __init__(
             raw_interval = dask.config.get(
                 "distributed.admin.system-monitor.gil.interval",
             )
-            interval = dask.utils.parse_timedelta(raw_interval, default="us") * 1e6
+            interval = parse_timedelta(raw_interval, default="us") * 1e6

             self._gilknocker = KnockKnock(polling_interval_micros=int(interval))
             self._gilknocker.start()
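
The max(1, maxlen) floor exists because SystemMonitor.recent() reports the newest sample, and a zero-length deque silently discards every append:

    from collections import deque

    q = deque(maxlen=0)
    q.append(42)
    assert len(q) == 0  # dropped immediately; q[-1] would raise IndexError

    q = deque(maxlen=max(1, 0))  # the floor applied in __init__ above
    q.append(42)
    assert q[-1] == 42  # there is always a latest sample to report
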
2 changes: 1 addition & 1 deletion distributed/tests/test_profile.py
@@ -199,7 +199,7 @@ def stop():
         return time() > start + 0.500

     try:
-        log = watch(interval="10ms", cycle="50ms", stop=stop)
+        log = watch(interval="10ms", cycle="50ms", stop=stop, maxlen=10_000)

         stop_called.wait(2)
         sleep(0.5)
4 changes: 2 additions & 2 deletions distributed/tests/test_scheduler.py
@@ -3012,15 +3012,15 @@ async def test_retire_state_change(c, s, a, b):
     await asyncio.gather(*coros)


-@gen_cluster(client=True, config={"distributed.scheduler.events-log-length": 3})
+@gen_cluster(client=True, config={"distributed.admin.log-length": 3})
 async def test_configurable_events_log_length(c, s, a, b):
     s.log_event("test", "dummy message 1")
     assert len(s.events["test"]) == 1
     s.log_event("test", "dummy message 2")
     s.log_event("test", "dummy message 3")
     assert len(s.events["test"]) == 3

-    # adding a forth message will drop the first one and length stays at 3
+    # adding a fourth message will drop the first one and length stays at 3
     s.log_event("test", "dummy message 4")
     assert len(s.events["test"]) == 3
     assert s.events["test"][0][1] == "dummy message 2"
18 changes: 17 additions & 1 deletion distributed/tests/test_system_monitor.py
@@ -10,7 +10,7 @@


 def test_SystemMonitor():
-    sm = SystemMonitor()
+    sm = SystemMonitor(maxlen=5)

     # __init__ calls update()
     a = sm.recent()
@@ -34,6 +34,22 @@ def test_SystemMonitor():
     assert "cpu" in repr(sm)


+def test_maxlen_zero():
+    """maxlen is floored to 1 otherwise recent() would not work"""
+    sm = SystemMonitor(maxlen=0)
+    sm.update()
+    sm.update()
+    assert len(sm.quantities["memory"]) == 1
+    assert sm.recent()["memory"] == sm.quantities["memory"][-1]
+
+
+def test_maxlen_omit():
+    sm = SystemMonitor()
+    sm.update()
+    sm.update()
+    assert len(sm.quantities["memory"]) > 0
+
+
 def test_count():
     sm = SystemMonitor(maxlen=5)
     assert sm.count == 1
6 changes: 1 addition & 5 deletions distributed/tests/test_worker.py
@@ -3296,11 +3296,7 @@ async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a)
     assert not any("missing-dep" in msg for msg in f2_story)


-@gen_cluster(
-    client=True,
-    nthreads=[("", 1)],
-    config={"distributed.comm.recent-messages-log-length": 1000},
-)
+@gen_cluster(client=True, nthreads=[("", 1)])
 async def test_gather_dep_no_longer_in_flight_tasks(c, s, a):
     async with BlockedGatherDep(s.address) as b:
         fut1 = c.submit(inc, 1, workers=[a.address], key="f1")
10 changes: 6 additions & 4 deletions distributed/utils_test.py
@@ -1796,6 +1796,7 @@ def config_for_cluster_tests(**extra_config):
             {
                 "local_directory": tempfile.gettempdir(),
                 "distributed.admin.tick.interval": "500 ms",
+                "distributed.admin.log-length": 10_000,
                 "distributed.scheduler.validate": True,
                 "distributed.worker.validate": True,
                 "distributed.worker.profile.enabled": False,
@@ -2452,10 +2453,11 @@ async def wait_for_stimulus(
 @pytest.fixture
 def ws():
     """An empty WorkerState"""
-    state = WorkerState(address="127.0.0.1:1", transition_counter_max=50_000)
-    yield state
-    if state.validate:
-        state.validate_state()
+    with dask.config.set({"distributed.admin.log-length": 10_000}):
+        state = WorkerState(address="127.0.0.1:1", transition_counter_max=50_000)
+        yield state
+        if state.validate:
+            state.validate_state()


 @pytest.fixture(params=["executing", "long-running"])
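
Pinning the value inside the fixture keeps WorkerState tests deterministic even if the global default changes again. The same pattern works for any test-local override (the fixture name here is illustrative):

    import pytest

    import dask

    @pytest.fixture
    def short_log_length():
        # Applies for the duration of one test, restored automatically on exit.
        with dask.config.set({"distributed.admin.log-length": 3}):
            yield
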
9 changes: 5 additions & 4 deletions distributed/worker.py
@@ -567,15 +567,16 @@ def __init__(
         self.active_threads = {}
         self.active_keys = set()
         self.profile_keys = defaultdict(profile.create)
-        self.profile_keys_history = deque(maxlen=3600)
+        maxlen = dask.config.get("distributed.admin.log-length")
+        self.profile_keys_history = deque(maxlen=maxlen)
+        self.profile_history = deque(maxlen=maxlen)
         self.profile_recent = profile.create()
-        self.profile_history = deque(maxlen=3600)

         if validate is None:
             validate = dask.config.get("distributed.worker.validate")

-        self.transfer_incoming_log = deque(maxlen=100000)
-        self.transfer_outgoing_log = deque(maxlen=100000)
+        self.transfer_incoming_log = deque(maxlen=maxlen)
+        self.transfer_outgoing_log = deque(maxlen=maxlen)
         self.transfer_outgoing_count_total = 0
         self.transfer_outgoing_bytes_total = 0
         self.transfer_outgoing_bytes = 0
5 changes: 3 additions & 2 deletions distributed/worker_state_machine.py
@@ -1316,8 +1316,9 @@ def __init__(
         self.executed_count = 0
         self.long_running = set()
         self.transfer_message_bytes_limit = transfer_message_bytes_limit
-        self.log = deque(maxlen=100_000)
-        self.stimulus_log = deque(maxlen=10_000)
+        maxlen = dask.config.get("distributed.admin.log-length")
+        self.log = deque(maxlen=maxlen)
+        self.stimulus_log = deque(maxlen=maxlen)
         self.task_counter = TaskCounter()
         self.transition_counter = 0
         self.transition_counter_max = transition_counter_max
