I'm running a single-machine cluster and trying to load and process a movie that is larger than memory. However, I seem to have a memory leak at various steps, especially when doing aggregations. So I created the following simple script to track it down. Surprisingly, even the first step in my workflow, dask.array.image.imread, seems to be enough to trigger the problem:
import os
import dask
import numpy as np
from dask.distributed import LocalCluster, Client, fire_and_forget
from dask.array.image import imread
cluster = LocalCluster(diagnostics_port=8989, memory_limit="200MB")
client = Client(cluster)
array = dask.array.zeros((20000, 500, 800), chunks=(1, -1, -1))
array
dask.array<zeros, shape=(20000, 500, 800), dtype=float64, chunksize=(1, 500, 800)>
# this works perfectly fine, even though the array is larger than my real movie
arr_sum = client.compute(array.sum())
arr_sum.result()
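As a sanity check on why this first sum succeeds under a 200MB limit (my own back-of-the-envelope arithmetic, not anything reported by dask): the full array is roughly 64GB, but each chunk is a single ~3.2MB frame, and the sum should reduce each chunk to a scalar almost as soon as it is produced.
total_bytes = 20000 * 500 * 800 * np.dtype("float64").itemsize  # ~64 GB for the whole array
chunk_bytes = 1 * 500 * 800 * np.dtype("float64").itemsize      # ~3.2 MB per (1, 500, 800) chunk
print(total_bytes / 1e9, chunk_bytes / 1e6)                     # 64.0 3.2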
# the folder contains ~18000 tiff files, but to isolate the issue I'm not actually reading them
dpath = "/home/phild/Documents/test_data/"
def dummy_read(im):
    # stand-in reader: return a blank frame instead of decoding the tiff file
    return np.zeros((480, 752))
array = imread(os.path.join(dpath, "*.tiff"), dummy_read)
array
dask.array<imread, shape=(17991, 480, 752), dtype=float64, chunksize=(1, 480, 752)>
arr_sum = client.compute(array.sum())
arr_sum.result()
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 12465 was killed by signal 15
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 12463 was killed by signal 15
distributed.nanny - WARNING - Worker process 12484 was killed by signal 15
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 12474 was killed by signal 15
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 12479 was killed by signal 15
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 12477 was killed by signal 15
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker process 12468 was killed by signal 15
distributed.nanny - WARNING - Worker process 12472 was killed by signal 15
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
---------------------------------------------------------------------------
KilledWorker Traceback (most recent call last)
<ipython-input-5-12b03eff355b> in <module>
1 arr_sum = client.compute(array.sum())
----> 2 arr_sum.result()
/opt/miniconda3/envs/dask/lib/python3.7/site-packages/distributed/client.py in result(self, timeout)
193 raiseit=False)
194 if self.status == 'error':
--> 195 six.reraise(*result)
196 elif self.status == 'cancelled':
197 raise result
/opt/miniconda3/envs/dask/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
KilledWorker: ("('imread-sum-22b1dc8c8dbc99d865bdc52557ca4d52', 5415, 0, 0)", 'tcp://127.0.0.1:44369')
The last sum leaks memory and kills all my workers, even though the array is smaller than the one created by dask.array.zeros, and I have something like the following in my worker logs:
distributed.worker - DEBUG - Calling gc.collect(). 3.435s elapsed since previous call.
distributed.worker - DEBUG - gc.collect() took 0.089s
distributed.worker - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 161.94 MB -- Worker memory limit: 200.00 MB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 161.94 MB -- Worker memory limit: 200.00 MB
distributed.worker - DEBUG - Heartbeat skipped: channel busy
distributed.worker - DEBUG - Heartbeat: tcp://127.0.0.1:32873
distributed.worker - DEBUG - gc.collect() lasts 0.089s but only 0.137s elapsed since last call: throttling.
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 162.17 MB -- Worker memory limit: 200.00 MB
distributed.worker - DEBUG - gc.collect() lasts 0.089s but only 0.144s elapsed since last call: throttling.
I understand that 200MB is probably not a reasonable memory limit for data of this size, but shouldn't dask handle even such an extreme case by spilling everything to disk (in fact, I think that is what happened with the array created by dask.array.zeros), instead of complaining about a memory leak?
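For what it's worth, I haven't touched any of the worker memory thresholds. As far as I understand (the values below are my reading of the distributed defaults, so correct me if they're off), spilling should kick in well before the nanny restarts a worker:
dask.config.set({
    # fractions of the 200MB memory_limit; these are what I believe the defaults are,
    # I have not overridden them in my actual setup
    "distributed.worker.memory.target": 0.60,     # start spilling stored data to disk
    "distributed.worker.memory.spill": 0.70,      # spill more aggressively
    "distributed.worker.memory.pause": 0.80,      # pause accepting new tasks
    "distributed.worker.memory.terminate": 0.95,  # nanny restarts the worker
})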
If this is expected behavior, however, I'd like to understand the rule of thumb for estimating the minimum memory required for such tasks.
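For concreteness, this is the kind of estimate I am trying to sanity-check (the multiplier of a few chunks in flight per thread is just my guess, not something from the docs):
nframes = 17991
chunk_bytes = 480 * 752 * np.dtype("float64").itemsize      # ~2.9 MB per frame/chunk
total_bytes = nframes * chunk_bytes                          # ~52 GB for the whole movie
threads_per_worker = 1                                       # hypothetical value for illustration
rough_minimum = 3 * threads_per_worker * chunk_bytes         # a few chunks in flight per thread?
print(chunk_bytes / 1e6, total_bytes / 1e9, rough_minimum / 1e6)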