Hi,
I'm not positive that this is an issue with dask, but I've run out of ideas for getting this working, which makes me think it might be.
I'm using dask with dask.distributed on NERSC/Cori, which uses slurm for job management. As a test (to make sure I understood how it worked), I allocated 8 nodes (128GB of memory each) and ran the following:
srun -n 8 dask-mpi --nthreads 1 --scheduler-file dask_tmp/scheduler.json --memory-limit 0.9 &
My understanding is that this produces 8 workers, one per node, and limits each of them to 90% of that node's memory. I then ran my test script, which contains:
from dask.distributed import Client
client = Client(scheduler_file='dask_tmp/scheduler.json')
print('Succeeded in connecting to scheduler.')

import dask
import dask.array as da

# Build a big array
df = da.random.normal(0, 1, size=(100000, 400000), chunks=(4000, 4000))
print(df)
print('Built large array. No computation yet though.')

# Hold this in memory till we're done
df_future = client.persist(df)
print(df_future)
print('Array persisted into memory')

# Do a calculation
future = client.compute(df.sum())
print(future.result())
print('Succeeded in performing a sum over a random array!')
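For what it's worth, here's how I'd double-check whether the persisted chunks actually end up spread across the workers rather than piling up on one node. This is just a sketch using the client introspection methods, assuming the client and df_future objects from the script above:
from dask.distributed import wait
# Block until every chunk of the persisted array is held by some worker.
wait(df_future)
# Map each worker address to the keys (chunks) it currently holds.
for worker, keys in client.has_what().items():
    print(worker, 'holds', len(keys), 'chunks')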
The beginning of the output comes from dask-mpi, of the form
distributed.worker - INFO - Start worker at: (IP here)
distributed.worker - INFO - Listening to: (Port here)
distributed.worker - INFO - bokeh at: (Port here)
distributed.worker - INFO - Waiting to connect to: (IP here)
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 121.56 GB
distributed.worker - INFO - Local Directory: worker-mkn60gzb
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register (IP here)
distributed.worker - INFO - Registered to: (IP here)
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Starting worker compute stream, (IP here)
distributed.worker - INFO - Start worker at: (IP here)
distributed.worker - INFO - Listening to: (Port here)
distributed.worker - INFO - bokeh at: (Port here)
distributed.worker - INFO - Waiting to connect to: (IP here)
distributed.worker - INFO - -------------------------------------------------
Counting up all of the worker messages, it appears that it does in fact spawn 8 workers, each on a different node.
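(As a sanity check, I believe the registered workers can also be counted directly from the client; a minimal sketch, assuming the client object from the script above:)
workers = client.scheduler_info()['workers']  # {address: worker metadata}
print(len(workers), 'workers registered')
for addr, info in workers.items():
    # 'memory_limit' may not be reported by every version, hence .get()
    print(addr, info.get('memory_limit'))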
I've tried this with varying sizes of the random array. For shape (1e5,1e5) this works without issue. When I push the shape up to (4e5,4e5) it fails. I first get messages of the form
distributed.core - WARNING - Event loop was unresponsive for 1.18s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
followed by
distributed.worker - INFO - Failed to put key in memory
Traceback (most recent call last):
File "/usr/common/software/python/3.6-anaconda-4.4/lib/python3.6/site-packages/distributed/worker.py", line 1515, in transition_executing_done
self.put_key_in_memory(key, value, transition=False)
File "/usr/common/software/python/3.6-anaconda-4.4/lib/python3.6/site-packages/distributed/worker.py", line 1683, in put_key_in_memory
self.data[key] = value
File "/usr/common/software/python/3.6-anaconda-4.4/lib/python3.6/site-packages/zict/buffer.py", line 80, in __setitem__
self.fast[key] = value
File "/usr/common/software/python/3.6-anaconda-4.4/lib/python3.6/site-packages/zict/lru.py", line 71, in __setitem__
self.evict()
File "/usr/common/software/python/3.6-anaconda-4.4/lib/python3.6/site-packages/zict/lru.py", line 90, in evict
cb(k, v)
File "/usr/common/software/python/3.6-anaconda-4.4/lib/python3.6/site-packages/zict/buffer.py", line 52, in fast_to_slow
self.slow[key] = value
File "/usr/common/software/python/3.6-anaconda-4.4/lib/python3.6/site-packages/zict/func.py", line 42, in __setitem__
self.d[key] = self.dump(value)
File "/usr/common/software/python/3.6-anaconda-4.4/lib/python3.6/site-packages/zict/file.py", line 83, in __setitem__
f.write(v)
OSError: [Errno 122] Disk quota exceeded
My understanding is that the final line appears because a worker hits its memory limit on one of the nodes, so it tries to spill data out to disk and then runs into my disk quota.
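(In case it matters for diagnosis: I assume the spill files land in each worker's local directory, which, as far as I understand, is created under whatever directory the worker was launched from unless configured otherwise. A quick way to check from the client, as a sketch:)
import os
# Run os.getcwd on every worker; the worker-XXXX spill directories are created
# relative to this unless a different local directory is configured.
print(client.run(os.getcwd))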
What I'm trying to determine is why it exceeds the memory limit on a single node. The shape (1e5, 1e5) corresponds to ~18 GB for a standard float, and (4e5, 4e5) yields ~300 GB; but across 8 nodes I have much more memory than that (~1 TB). I've tried a few other configurations, and the cutoff appears to be around the size of a single node's memory, which suggests that dask isn't actually distributing this array (unless I've misunderstood how dask manages memory across nodes?).
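(For reference, here's the footprint arithmetic I'm assuming, sketched for the exact array in the test script; .nbytes should give the expected in-memory size for float64:)
import dask.array as da
# Same array as in the test script: float64, so 8 bytes per element.
arr = da.random.normal(0, 1, size=(100000, 400000), chunks=(4000, 4000))
print(arr.nbytes / 1e9, 'GB total')         # ~320 GB for the whole array
print(arr.nbytes / 8 / 1e9, 'GB / worker')  # ~40 GB if spread evenly over 8 workers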
Any help would be appreciated (or if this is a known issue, that would be useful to know!).
Many thanks,
Adam