diff --git a/distributed/worker.py b/distributed/worker.py index 734c2d7e51d..4fdaef5ee9e 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -196,6 +196,7 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, self.memory_monitor_interval, io_loop=self.loop) self.periodic_callbacks['memory'] = pc + self._throttled_gc = ThrottledGC(logger=logger) @property def worker_address(self): @@ -1089,7 +1090,6 @@ def __init__(self, *args, **kwargs): self.log = deque(maxlen=100000) self.validate = kwargs.pop('validate', False) - self.gc = ThrottledGC(logger=logger) self._transitions = { ('waiting', 'ready'): self.transition_waiting_ready, @@ -2180,6 +2180,8 @@ def memory_monitor(self): # Pause worker threads if above 80% memory use if self.memory_pause_fraction and frac > self.memory_pause_fraction: + # Try to free some memory while in paused state + self._throttled_gc.collect() if not self.paused: logger.warn("Worker is at %d%% memory usage. Pausing worker. " "Process memory: %s -- Worker memory limit: %s", @@ -2220,7 +2222,7 @@ def memory_monitor(self): # Issue a GC to ensure that the evicted data is actually # freed from memory and taken into account by the monitor # before trying to evict even more data. - self.gc.collect() + self._throttled_gc.collect() memory = proc.memory_info().rss if count: logger.debug("Moved %d pieces of data data and %s to disk",