Array traps and optimisation
============================

First, let's start a local cluster with 3 workers.

In [1]:
import dask.array as da
from dask.distributed import Client, progress
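client = Client(
    processes=False,       # run the workers as threads inside this Python process
    n_workers=3,           # 3 workers...
    threads_per_worker=2,  # ...with 2 threads each, 6 threads in total
)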
client

Client: LocalCluster (processes=False), status: running
Dashboard: http://10.45.60.116:8787/status
Workers: 3, total threads: 6, total memory: 31.04 GiB
Each worker: 2 threads, 10.35 GiB memory

Chunk size
-----------

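- Chunks that are too large don't split the work efficiently: with fewer chunks than threads, some threads sit idle.
- Chunks that are too small waste too much time on communication, scheduling and other overheads.
- Chunks of roughly 100 MB to 1 GB are usually a good choice. Scheduling a single task takes around 1 ms.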
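Before computing anything, it can help to check how an array compares with these guidelines. The sketch below uses a hypothetical helper, chunk_report, that only reads array metadata (numblocks, chunksize and dtype), so no data is generated; the chunk shapes are the ones timed in the next cell.

```python
import math

import dask.array as da


def chunk_report(arr):
    """Return (number of chunks, size of the largest chunk in MiB).

    Hypothetical helper for illustration: it only inspects metadata,
    so calling it does not trigger any computation.
    """
    # numblocks is the number of chunks along each axis
    n_chunks = math.prod(arr.numblocks)
    # chunksize is the shape of the largest chunk along each axis
    chunk_bytes = math.prod(arr.chunksize) * arr.dtype.itemsize
    return n_chunks, chunk_bytes / 2**20


for chunks in [(6000, 6000), (2000, 6000), (3000, 3000), (1000, 1000), (300, 300)]:
    arr = da.random.random((6000, 6000), chunks=chunks)  # lazy, nothing is allocated
    n, mib = chunk_report(arr)
    print(f"chunks={chunks}: {n} chunks of at most {mib:.1f} MiB")
```

For the 300 x 300 case this gives 400 chunks of well under 1 MiB each, far below the suggested range, which is consistent with it being the slowest run below.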

In [3]:
print("1 chunk")
arr = da.random.random((6000, 6000), chunks=(6000, 6000))
%time x = arr.sum().compute()
print("3 chunks")
arr = da.random.random((6000, 6000), chunks=(2000, 6000))
%time x = arr.sum().compute()
print("4 chunks")
arr = da.random.random((6000, 6000), chunks=(3000, 3000))
%time x = arr.sum().compute()
print("36 chunks")
arr = da.random.random((6000, 6000), chunks=(1000, 1000))
%time x = arr.sum().compute()
print("400 chunks")
arr = da.random.random((6000, 6000), chunks=(300, 300))
%time x = arr.sum().compute()
print("auto")
arr = da.random.random((6000, 6000), chunks="auto")
%time x = arr.sum().compute()

1 chunk
CPU times: user 217 ms, sys: 22.9 ms, total: 240 ms
Wall time: 225 ms
3 chunks
CPU times: user 225 ms, sys: 44.9 ms, total: 270 ms
Wall time: 107 ms
4 chunks
CPU times: user 279 ms, sys: 37.1 ms, total: 316 ms
Wall time: 107 ms
36 chunks
CPU times: user 375 ms, sys: 11.5 ms, total: 386 ms
Wall time: 149 ms
400 chunks
CPU times: user 1.21 s, sys: 58.7 ms, total: 1.27 s
Wall time: 1.05 s
auto
CPU times: user 279 ms, sys: 34 ms, total: 313 ms
Wall time: 138 ms
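The %time magic only works in IPython. As a sketch, the same comparison can be written as a plain loop with time.perf_counter, which also runs as a script; time_sum is a hypothetical name. If a distributed Client is active (as in this notebook), compute() runs on it; otherwise Dask falls back to its default local threaded scheduler, so absolute timings will differ.

```python
import time

import dask.array as da


def time_sum(shape, chunks):
    """Build a random Dask array and return the wall time of arr.sum().compute()."""
    arr = da.random.random(shape, chunks=chunks)
    start = time.perf_counter()
    arr.sum().compute()  # generation and reduction both happen here
    return time.perf_counter() - start


if __name__ == "__main__":
    for chunks in [(6000, 6000), (2000, 6000), (3000, 3000), (1000, 1000), (300, 300), "auto"]:
        elapsed = time_sum((6000, 6000), chunks)
        print(f"chunks={chunks!r}: {elapsed * 1000:.0f} ms wall time")
```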


In [5]:
arr

             Array          Chunk
Bytes        274.66 MiB     128.00 MiB
Shape        (6000, 6000)   (4096, 4096)
Dask graph   4 chunks in 1 graph layer
Data type    float64 numpy.ndarray
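The "auto" chunks are not arbitrary: Dask sizes them from the array.chunk-size configuration value, which ships as "128MiB", and 4096 x 4096 float64 values are exactly 128 MiB. A minimal sketch of inspecting and temporarily changing that target with dask.config follows; the exact chunk shape chosen for a given target may vary between Dask versions, so it only checks that the largest chunk stays under the limit.

```python
import math

import dask
import dask.array as da
from dask.utils import parse_bytes

# Target size currently used by chunks="auto"
print(dask.config.get("array.chunk-size"))

limit = parse_bytes("64MiB")
# Temporarily ask for smaller automatic chunks; the old value is restored on exit
with dask.config.set({"array.chunk-size": "64MiB"}):
    arr_small = da.random.random((6000, 6000), chunks="auto")

# Size of the largest chunk Dask picked under the 64 MiB target
largest_chunk_bytes = math.prod(arr_small.chunksize) * arr_small.dtype.itemsize
print(arr_small.chunksize, f"{largest_chunk_bytes / 2**20:.1f} MiB")
print(largest_chunk_bytes <= limit)
```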


Operation order
---------------

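Dask builds a symbolic graph of operations, but offers few tools to optimise it algebraically.
In particular, it does not reorder operations for faster computation. Computing (A @ B) @ v requires a full matrix-matrix product, while A @ (B @ v) needs only two matrix-vector products: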

In [6]:
A = da.random.random((3000, 3000), chunks=(1000, 1000))
B = da.random.random((3000, 3000), chunks=(1000, 1000))
v = da.random.random((3000, 1), chunks=(1000, 1))
MM1 = (A @ B) @ v
MM2 = A @ (B @ v)
%time MM1.compute()
%time MM2.compute()

CPU times: user 2.47 s, sys: 2.23 s, total: 4.71 s
Wall time: 717 ms
CPU times: user 413 ms, sys: 910 ms, total: 1.32 s
Wall time: 185 ms


array([[1127664.41591508],
       [1115579.59158549],
       [1148852.88888581],
       ...,
       [1138887.10755949],
       [1156290.67193881],
       [1116515.15128465]])
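Counting multiply-adds for the naive algorithm shows why the bracketing matters: an (m, k) @ (k, n) product costs m * k * n of them. The small calculation below is a sketch that ignores blocking, BLAS tricks and communication; matmul_cost is a hypothetical helper.

```python
def matmul_cost(m, k, n):
    """Multiply-add count of an (m, k) @ (k, n) product with the naive algorithm."""
    return m * k * n


n = 3000
# (A @ B) @ v: one full matrix-matrix product, then one matrix-vector product
cost_mm1 = matmul_cost(n, n, n) + matmul_cost(n, n, 1)
# A @ (B @ v): two matrix-vector products
cost_mm2 = matmul_cost(n, n, 1) + matmul_cost(n, n, 1)

print(f"(A @ B) @ v: {cost_mm1:.2e} multiply-adds")
print(f"A @ (B @ v): {cost_mm2:.2e} multiply-adds")
print(f"ratio: {cost_mm1 / cost_mm2:.0f}x")
```

The measured wall-time gap above (about 4x) is much smaller than this ratio of operation counts, likely because the matrix-matrix product runs in optimised BLAS code while fixed costs such as task scheduling weigh relatively more on the cheap version.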

In [8]:
# einsum expresses the whole chain in one call, but with the default optimize=False
# it does not choose a good contraction order, so it is much slower
einMM = da.einsum("ij,jk,kl->il", A, B, v)
%time einMM.compute()

CPU times: user 1min 14s, sys: 351 ms, total: 1min 15s
Wall time: 13.5 s


array([[1127664.41591508],
       [1115579.59158549],
       [1148852.88888581],
       ...,
       [1138887.10755949],
       [1156290.67193881],
       [1116515.15128465]])
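da.einsum also accepts an optimize argument (default False). When it is set, Dask asks np.einsum_path for a contraction order and passes that path to np.einsum for each block, so B and v can be contracted first. The sketch below checks the path NumPy chooses and compares the optimised einsum with A @ (B @ v); it uses smaller, arbitrarily sized arrays (A_small, B_small, v_small) so as not to overwrite the notebook's A, B and v, and no timings are claimed for it.

```python
import numpy as np
import dask.array as da

n, c = 600, 200  # smaller than in the cells above, to keep the check quick
A_small = da.random.random((n, n), chunks=(c, c))
B_small = da.random.random((n, n), chunks=(c, c))
v_small = da.random.random((n, 1), chunks=(c, 1))

# Contraction order NumPy picks for these subscripts (only the shapes matter here)
path, report = np.einsum_path(
    "ij,jk,kl->il", np.ones((n, n)), np.ones((n, n)), np.ones((n, 1)), optimize="greedy"
)
print(path)  # expected: ['einsum_path', (1, 2), (0, 1)], i.e. B and v first

# Same einsum as above, with contraction-path optimisation enabled
ein_opt = da.einsum("ij,jk,kl->il", A_small, B_small, v_small, optimize=True)
reference = A_small @ (B_small @ v_small)
print(np.allclose(ein_opt.compute(), reference.compute()))
```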