Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@
help="A unique name for this worker like 'worker-1'")
@click.option('--memory-limit', default='auto',
help="Bytes of memory that the worker can use. "
"This can be an integer (bytes) "
"float (fraction of total system memory) "
"or 'auto'")
"This can be an integer (bytes), "
"float (fraction of total system memory), "
"'auto', or zero for no memory management")
@click.option('--reconnect/--no-reconnect', default=True,
help="Reconnect to scheduler if disconnected")
@click.option('--nanny/--no-nanny', default=True,
Expand Down
17 changes: 17 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1070,3 +1070,20 @@ def test_deque_handler():
msg = deque_handler.deque[-1]
assert 'distributed.worker' in deque_handler.format(msg)
assert any(msg.msg == 'foo456' for msg in deque_handler.deque)


@gen_cluster(ncores=[], client=True)
def test_avoid_memory_monitor_if_zero_limit(c, s):
worker = Worker(s.address, loop=s.loop, memory_limit=0,
memory_monitor_interval=10)
yield worker._start()
assert type(worker.data) is dict
assert 'memory' not in worker.periodic_callbacks

future = c.submit(inc, 1)
assert (yield future) == 2
yield gen.sleep(worker.memory_monitor_interval / 1000)

yield c.submit(inc, 2) # worker doesn't pause

yield worker._close()
12 changes: 7 additions & 5 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,11 @@ def __init__(self, scheduler_ip=None, scheduler_port=None,
self.periodic_callbacks['heartbeat'] = pc
self._address = contact_address

self._memory_monitoring = False
pc = PeriodicCallback(self.memory_monitor,
self.memory_monitor_interval)
self.periodic_callbacks['memory'] = pc
if self.memory_limit:
self._memory_monitoring = False
pc = PeriodicCallback(self.memory_monitor,
self.memory_monitor_interval)
self.periodic_callbacks['memory'] = pc
self._throttled_gc = ThrottledGC(logger=logger)

@property
Expand Down Expand Up @@ -1019,7 +1020,8 @@ class Worker(WorkerBase):
heartbeat_interval: int
Milliseconds between heartbeats to scheduler
memory_limit: int
Number of bytes of memory that this worker should use
Number of bytes of memory that this worker should use.
Set to zero for no limit
memory_target_fraction: float
Fraction of memory to try to stay beneath
memory_spill_fraction: float
Expand Down