Skip to content

Commit

Permalink
Code review
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Sep 18, 2023
1 parent 16f9c75 commit 24eb71d
Show file tree
Hide file tree
Showing 15 changed files with 47 additions and 36 deletions.
2 changes: 1 addition & 1 deletion distributed/batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(self, interval, loop=None, serializers=None):
self.byte_count = 0
self.next_deadline = None
self.recent_message_log = deque(
maxlen=dask.config.get("distributed.admin.log-length")
maxlen=dask.config.get("distributed.admin.low-level-log-length")
)
self.serializers = serializers
self._consecutive_failures = 0
Expand Down
14 changes: 7 additions & 7 deletions distributed/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@
"bokeh-export-tool": "distributed.dashboard.export-tool",
"tick-time": "distributed.admin.tick.interval",
"tick-maximum-delay": "distributed.admin.tick.limit",
"log-length": "distributed.admin.log-length",
"log-format": "distributed.admin.log-format",
"pdb-on-err": "distributed.admin.pdb-on-err",
"ucx": "distributed.comm.ucx",
"rmm": "distributed.rmm",
# log-length aliases
"transition-log-length": "distributed.admin.log-length",
"distributed.scheduler.transition-log-length": "distributed.admin.log-length",
"distributed.scheduler.events-log-length": "distributed.admin.log-length",
"log-length": "distributed.admin.log-length",
"recent-messages-log-length": "distributed.admin.log-length",
"distributed.comm.recent-messages-log-length": "distributed.admin.log-length",
# low-level-log-length aliases
"transition-log-length": "distributed.admin.low-level-log-length",
"distributed.scheduler.transition-log-length": "distributed.admin.low-level-log-length",
"distributed.scheduler.events-log-length": "distributed.admin.low-level-log-length",
"recent-messages-log-length": "distributed.admin.low-level-log-length",
"distributed.comm.recent-messages-log-length": "distributed.admin.low-level-log-length",
}

# Affects yaml and env variables configs, as well as calls to dask.config.set()
Expand Down
4 changes: 3 additions & 1 deletion distributed/deploy/adaptive_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ async def _adapt():
# internal state
self.close_counts = defaultdict(int)
self._adapting = False
self.log = deque(maxlen=dask.config.get("distributed.admin.log-length"))
self.log = deque(
maxlen=dask.config.get("distributed.admin.low-level-log-length")
)

def stop(self) -> None:
logger.info("Adaptive stop")
Expand Down
20 changes: 18 additions & 2 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1099,15 +1099,24 @@ properties:
type: [integer, 'null']
minimum: 0
description: |
Default length of logs to keep in memory. This is useful for debugging
cluster issues. Set to null for unlimited.
Maximum length of worker/scheduler logs to keep in memory.
They can be retrieved with get_scheduler_logs() / get_worker_logs().
Set to null for unlimited.
log-format:
type: string
description: |
The log format to emit.
See https://docs.python.org/3/library/logging.html#logrecord-attributes
low-level-log-length:
type: [integer, 'null']
minimum: 0
description: |
Maximum length of various event logs for developers.
Set to null for unlimited.
event-loop:
type: string
description: |
Expand All @@ -1127,6 +1136,13 @@ properties:
interval:
type: string
description: Polling time to query cpu/memory statistics default 500ms
log-length:
type: [ integer, 'null' ]
minimum: 0
description: |
Maximum number of samples to keep in memory.
Multiply by `interval` to obtain log duration.
Set to null for unlimited.
disk:
type: boolean
description: Should we include disk metrics? (they can cause issues in some systems)
Expand Down
4 changes: 3 additions & 1 deletion distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,13 @@ distributed:
cycle: 1s # time between checking event loop speed

max-error-length: 10000 # Maximum size traceback after error to return
log-length: 1000 # default length of logs to keep in memory
log-length: 10000 # Maximum length of worker/scheduler logs to keep in memory
log-format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
low-level-log-length: 1000 # Maximum length of various logs for developers
pdb-on-err: False # enter debug mode on scheduling error
system-monitor:
interval: 500ms
log-length: 7200 # Maximum number of samples to keep in memory
disk: true # Monitor host-wide disk I/O
host-cpu: false # Monitor host-wide CPU usage, with very granular breakdown
gil:
Expand Down
2 changes: 1 addition & 1 deletion distributed/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ def watch(
- dict[str, Any] (output of ``create()``)
"""
if maxlen is no_default:
maxlen = dask.config.get("distributed.admin.log-length")
maxlen = dask.config.get("distributed.admin.low-level-log-length")
assert isinstance(maxlen, int) or maxlen is None
log: deque[tuple[float, dict[str, Any]]] = deque(maxlen=maxlen)

Expand Down
9 changes: 4 additions & 5 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1629,7 +1629,7 @@ class SchedulerState:

#: History of task state transitions.
#: The length can be tweaked through
#: distributed.admin.log-length
#: distributed.admin.low-level-log-length
transition_log: deque[Transition]

#: Total number of transitions since the cluster was started
Expand Down Expand Up @@ -1721,7 +1721,7 @@ def __init__(
self.plugins = {} if not plugins else {_get_plugin_name(p): p for p in plugins}

self.transition_log = deque(
maxlen=dask.config.get("distributed.admin.log-length")
maxlen=dask.config.get("distributed.admin.low-level-log-length")
)
self.transition_counter = 0
self._idle_transition_counter = 0
Expand Down Expand Up @@ -3656,9 +3656,8 @@ def __init__(
aliases,
]

self.events = defaultdict(
partial(deque, maxlen=dask.config.get("distributed.admin.log-length"))
)
maxlen = dask.config.get("distributed.admin.low-level-log-length")
self.events = defaultdict(partial(deque, maxlen=maxlen))
self.event_counts = defaultdict(int)
self.event_subscriber = defaultdict(set)
self.worker_plugins = {}
Expand Down
2 changes: 1 addition & 1 deletion distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __init__(self, scheduler: Scheduler):
)
# `callback_time` is in milliseconds
self.scheduler.add_plugin(self)
maxlen = dask.config.get("distributed.admin.log-length")
maxlen = dask.config.get("distributed.admin.low-level-log-length")
self.scheduler.events["stealing"] = deque(maxlen=maxlen)
self.count = 0
self.in_flight = {}
Expand Down
4 changes: 1 addition & 3 deletions distributed/system_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class SystemMonitor:
gpu_name: str | None
gpu_memory_total: int

# Defaults to 1h capture time assuming the default
# distributed.admin.system_monitor.interval = 500ms
def __init__(
self,
maxlen: int | None | NoDefault = no_default,
Expand All @@ -49,7 +47,7 @@ def __init__(
self.count = 0

if maxlen is no_default:
maxlen = dask.config.get("distributed.admin.log-length")
maxlen = dask.config.get("distributed.admin.system-monitor.log-length")
if isinstance(maxlen, int):
maxlen = max(1, maxlen)
elif maxlen is not None: # pragma: nocover
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def stop():
return time() > start + 0.500

try:
log = watch(interval="10ms", cycle="50ms", stop=stop, maxlen=10_000)
log = watch(interval="10ms", cycle="50ms", stop=stop)

stop_called.wait(2)
sleep(0.5)
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3012,7 +3012,7 @@ async def test_retire_state_change(c, s, a, b):
await asyncio.gather(*coros)


@gen_cluster(client=True, config={"distributed.admin.log-length": 3})
@gen_cluster(client=True, config={"distributed.admin.low-level-log-length": 3})
async def test_configurable_events_log_length(c, s, a, b):
s.log_event("test", "dummy message 1")
assert len(s.events["test"]) == 1
Expand Down
9 changes: 1 addition & 8 deletions distributed/tests/test_system_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


def test_SystemMonitor():
sm = SystemMonitor(maxlen=5)
sm = SystemMonitor()

# __init__ calls update()
a = sm.recent()
Expand Down Expand Up @@ -43,13 +43,6 @@ def test_maxlen_zero():
assert sm.recent()["memory"] == sm.quantities["memory"][-1]


def test_maxlen_omit():
sm = SystemMonitor()
sm.update()
sm.update()
assert len(sm.quantities["memory"]) > 0


def test_count():
sm = SystemMonitor(maxlen=5)
assert sm.count == 1
Expand Down
5 changes: 3 additions & 2 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1796,7 +1796,8 @@ def config_for_cluster_tests(**extra_config):
{
"local_directory": tempfile.gettempdir(),
"distributed.admin.tick.interval": "500 ms",
"distributed.admin.log-length": 10_000,
"distributed.admin.log-length": None,
"distributed.admin.low-level-log-length": None,
"distributed.scheduler.validate": True,
"distributed.worker.validate": True,
"distributed.worker.profile.enabled": False,
Expand Down Expand Up @@ -2453,7 +2454,7 @@ async def wait_for_stimulus(
@pytest.fixture
def ws():
"""An empty WorkerState"""
with dask.config.set({"distributed.admin.log-length": 10_000}):
with dask.config.set({"distributed.admin.low-level-log-length": None}):
state = WorkerState(address="127.0.0.1:1", transition_counter_max=50_000)
yield state
if state.validate:
Expand Down
2 changes: 1 addition & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ def __init__(
self.active_threads = {}
self.active_keys = set()
self.profile_keys = defaultdict(profile.create)
maxlen = dask.config.get("distributed.admin.log-length")
maxlen = dask.config.get("distributed.admin.low-level-log-length")
self.profile_keys_history = deque(maxlen=maxlen)
self.profile_history = deque(maxlen=maxlen)
self.profile_recent = profile.create()
Expand Down
2 changes: 1 addition & 1 deletion distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ def __init__(
self.executed_count = 0
self.long_running = set()
self.transfer_message_bytes_limit = transfer_message_bytes_limit
maxlen = dask.config.get("distributed.admin.log-length")
maxlen = dask.config.get("distributed.admin.low-level-log-length")
self.log = deque(maxlen=maxlen)
self.stimulus_log = deque(maxlen=maxlen)
self.task_counter = TaskCounter()
Expand Down

0 comments on commit 24eb71d

Please sign in to comment.