# Dask Arrays - parallelized NumPy

- Dask Array: a parallel, larger-than-memory, n-dimensional array built on blocked algorithms.

- Parallel: uses all of the cores on your computer.

- Larger-than-memory: lets you work on datasets that are larger than your available memory by breaking the array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of the computation, and effectively streaming data from disk.

- Blocked algorithms: perform a large computation by performing many smaller computations.

<img src = 'https://docs.dask.org/en/stable/_images/dask-array.svg'>

- Dask cuts one large array into many small arrays (chunks).
- This lets us compute on arrays larger than memory using all of our cores.
- In short: split the array up and run the pieces in parallel.
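
The idea behind a blocked algorithm can be sketched in plain NumPy, without Dask. The helper below (`blocked_mean`, a hypothetical name chosen for illustration) walks over an array one fixed-size block at a time and keeps only a running sum and count. It is a minimal sketch of the technique, not how Dask implements it internally.

```python
import numpy as np


def blocked_mean(arr, block_size):
    """Mean of a 1-D array computed one block at a time (illustrative helper)."""
    total = 0.0
    count = 0
    for start in range(0, arr.shape[0], block_size):
        block = arr[start:start + block_size]  # only this slice is processed now
        total += block.sum()
        count += block.shape[0]
    return total / count


data = np.arange(1_000, dtype="float64")
result = blocked_mean(data, block_size=128)
print(result, data.mean())
```

Because each block is reduced independently, the blocks could equally be read from disk one by one or handed to different cores, which is what Dask automates.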

In [None]:
!pip3 install zarr

In [4]:
%run prep.py -d random

- Generating random array data... ** Created random data for array exercise in 10.75s


In [5]:
from dask.distributed import Client

client = Client(n_workers=4)
client

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Dashboard: http://127.0.0.1:8787/status
Workers: 4
Total threads: 12
Total memory: 64.00 GiB
Status: running
Using processes: True

Comm: tcp://127.0.0.1:54712
Workers: 4
Dashboard: http://127.0.0.1:8787/status
Total threads: 12
Started: Just now
Total memory: 64.00 GiB

Comm: tcp://127.0.0.1:54727
Total threads: 3
Dashboard: http://127.0.0.1:54730/status
Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:54716
Local directory: /var/folders/2r/_fr309ls60x8zpqkcdvzvn940000gn/T/dask-worker-space/worker-w33x86l2

Comm: tcp://127.0.0.1:54734
Total threads: 3
Dashboard: http://127.0.0.1:54736/status
Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:54718
Local directory: /var/folders/2r/_fr309ls60x8zpqkcdvzvn940000gn/T/dask-worker-space/worker-_eboey_l

Comm: tcp://127.0.0.1:54728
Total threads: 3
Dashboard: http://127.0.0.1:54729/status
Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:54715
Local directory: /var/folders/2r/_fr309ls60x8zpqkcdvzvn940000gn/T/dask-worker-space/worker-5of4y1jj

Comm: tcp://127.0.0.1:54733
Total threads: 3
Dashboard: http://127.0.0.1:54735/status
Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:54717
Local directory: /var/folders/2r/_fr309ls60x8zpqkcdvzvn940000gn/T/dask-worker-space/worker-e2wmjkov


# Blocked Algorithms

In [6]:
import numpy as np
import dask.array as da

In [7]:
# NumPy array
a_np = np.ones(10)
a_np

array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])

In [8]:
# Blocked sum in NumPy: sum each half separately, then add the partial results
a_np_sum = a_np[:5].sum() + a_np[5:].sum()
a_np_sum

10.0

In [10]:
a_da = da.ones(10, chunks=5)
# we have 5 elements per block 
a_da

| | Array | Chunk |
|---|---|---|
| Bytes | 80 B | 40 B |
| Shape | (10,) | (5,) |
| Dask graph | 2 chunks in 1 graph layer | |
| Data type | float64 numpy.ndarray | |


In [11]:
a_da_sum = a_da.sum()
a_da_sum

| | Array | Chunk |
|---|---|---|
| Bytes | 8 B | 8 B |
| Shape | () | () |
| Dask graph | 1 chunks in 3 graph layers | |
| Data type | float64 numpy.ndarray | |
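
Note that `a_da_sum` is still a lazy Dask array, not a number. A short sketch of how to get the value: calling `.compute()` runs the task graph and returns the result. The `scheduler="synchronous"` argument is an assumption made here so the snippet runs without a cluster; with the `Client` created earlier you would normally omit it.

```python
import dask.array as da

a_da = da.ones(10, chunks=5)       # two chunks of five elements each
a_da_sum = a_da.sum()              # lazy: builds a task graph, computes nothing yet

print(a_da.chunks)                 # chunk sizes along each axis
print(a_da.numblocks)              # number of chunks along each axis

# Run the graph; the single-threaded scheduler keeps the example self-contained.
value = a_da_sum.compute(scheduler="synchronous")
print(value)
```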


# Task Graphs
- Dask schedules work as tasks; the set of tasks and their dependencies is called a task graph.

In [12]:
# visualize the low level Dask graph using cytoscape
a_da_sum.visualize(engine="cytoscape")


CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…
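
At the lowest level, a task graph is a plain dictionary that maps keys to tasks, where a task is a tuple of a function followed by its arguments. The sketch below writes the blocked sum from the previous section by hand and runs it with `dask.get`, the single-threaded scheduler. The key names (`"chunk-0"`, `"total"`, and so on) are made up for illustration; the graph Dask generates for `a_da_sum` uses its own keys and may be structured differently.

```python
from operator import add

import numpy as np
import dask

# Each value is a task: (function, *arguments). A string argument that matches
# another key in the dictionary is treated as a dependency on that key.
dsk = {
    "chunk-0": (np.ones, 5),
    "chunk-1": (np.ones, 5),
    "sum-0": (np.sum, "chunk-0"),
    "sum-1": (np.sum, "chunk-1"),
    "total": (add, "sum-0", "sum-1"),
}

total = dask.get(dsk, "total")  # resolve dependencies, then compute "total"
print(total)
```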

# Performance comparison

## NumPy version

In [14]:
%%time
xn = np.random.normal(10, 0.1, size=(30_000, 30_000))
yn = xn.mean(axis=0)
yn

CPU times: user 14.1 s, sys: 789 ms, total: 14.9 s
Wall time: 14.5 s


array([ 9.99946971, 10.00002035, 10.00032803, ..., 10.00003338,
       10.00058552, 10.00022401])

## Dask array version

In [15]:
xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))
xd

| | Array | Chunk |
|---|---|---|
| Bytes | 6.71 GiB | 68.66 MiB |
| Shape | (30000, 30000) | (3000, 3000) |
| Dask graph | 100 chunks in 1 graph layer | |
| Data type | float64 numpy.ndarray | |


In [16]:
yd = xd.mean(axis=0)
yd

| | Array | Chunk |
|---|---|---|
| Bytes | 234.38 kiB | 23.44 kiB |
| Shape | (30000,) | (3000,) |
| Dask graph | 10 chunks in 4 graph layers | |
| Data type | float64 numpy.ndarray | |


In [17]:
%%time
xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))
yd = xd.mean(axis=0)
yd.compute()

CPU times: user 485 ms, sys: 102 ms, total: 587 ms
Wall time: 2.13 s


array([10.00042821, 10.00047679, 10.00033298, ...,  9.99898234,
        9.99959701, 10.00048158])
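
The timings above depend heavily on the machine and on the cluster configuration. To try the comparison without allocating 6.71 GiB, the same computation can be run at a smaller scale. The sketch below uses a 2,000 x 2,000 array and the threaded scheduler (both are choices made here for illustration, not part of the original benchmark) and simply reports whatever wall time it measures.

```python
import time

import numpy as np
import dask.array as da

shape = (2_000, 2_000)  # scaled-down stand-in for the 30_000 x 30_000 example

start = time.perf_counter()
yn = np.random.normal(10, 0.1, size=shape).mean(axis=0)
numpy_seconds = time.perf_counter() - start

start = time.perf_counter()
xd = da.random.normal(10, 0.1, size=shape, chunks=(500, 500))
yd = xd.mean(axis=0).compute(scheduler="threads")
dask_seconds = time.perf_counter() - start

print(f"NumPy: {numpy_seconds:.3f} s, Dask: {dask_seconds:.3f} s")
print(yn.shape, yd.shape)
```

At this size the scheduling overhead may outweigh any gain from parallelism, so Dask is not guaranteed to be faster.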

# Choosing good chunk sizes

In [19]:
darr = da.random.random((1000, 1000, 1000))
# We did not specify chunks, so Dask used its default, chunks='auto'
darr

| | Array | Chunk |
|---|---|---|
| Bytes | 7.45 GiB | 126.51 MiB |
| Shape | (1000, 1000, 1000) | (255, 255, 255) |
| Dask graph | 64 chunks in 1 graph layer | |
| Data type | float64 numpy.ndarray | |


In [20]:
darr.chunksize


(255, 255, 255)

In [21]:
darr.chunks


((255, 255, 255, 235), (255, 255, 255, 235), (255, 255, 255, 235))

In [23]:
darr = darr.rechunk({0: -1, 1: 100, 2: "auto"})
darr

| | Array | Chunk |
|---|---|---|
| Bytes | 7.45 GiB | 127.41 MiB |
| Shape | (1000, 1000, 1000) | (1000, 100, 167) |
| Dask graph | 60 chunks in 2 graph layers | |
| Data type | float64 numpy.ndarray | |


In [24]:
darr.chunksize


(1000, 100, 167)

In [25]:
darr.chunks


((1000,),
 (100, 100, 100, 100, 100, 100, 100, 100, 100, 100),
 (167, 167, 167, 167, 167, 165))
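
The dictionary passed to `rechunk` maps an axis number to the desired chunk size along that axis: `-1` means a single chunk spanning the whole axis, an integer gives a fixed chunk length, and `"auto"` lets Dask pick a size. A small sketch of the first two cases on a toy array (the shape and chunk sizes are arbitrary choices for illustration); axes left out of the dictionary keep their existing chunks.

```python
import dask.array as da

small = da.ones((100, 100, 100), chunks=(25, 25, 25))
print(small.chunksize)

# Axis 0: one chunk over the full axis; axis 1: chunks of 10; axis 2: unchanged.
rechunked = small.rechunk({0: -1, 1: 10})
print(rechunked.chunksize)
print(rechunked.numblocks)
```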

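# How to choose a chunk size
- Chunks that are too small cause problems (the per-task overhead dominates).
- Chunks that are too large cause problems as well (they may not fit in worker memory).
- A chunk size between 100 MB and 1 GB is generally good.
- Avoid very large task graphs: roughly 10,000 to 100,000 chunks is where performance starts to suffer.
- As a lower bound, aim for at least twice as many chunks as worker cores.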
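
These rules of thumb can be checked with a little arithmetic before creating an array. The sketch below uses a hypothetical helper, `chunk_stats`, that takes an array shape, a uniform chunk shape, and the item size in bytes, and returns the size of one full chunk in MiB together with the total number of chunks. It assumes every chunk has the nominal shape, so smaller edge chunks are counted as full chunks.

```python
import math


def chunk_stats(shape, chunks, itemsize=8):
    """Return (MiB per full chunk, number of chunks) for a uniform chunking."""
    chunk_bytes = math.prod(chunks) * itemsize
    n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
    return chunk_bytes / 2**20, n_chunks


# The two float64 arrays used earlier in this notebook.
mib_2d, n_2d = chunk_stats((30_000, 30_000), (3_000, 3_000))
mib_3d, n_3d = chunk_stats((1_000, 1_000, 1_000), (255, 255, 255))

print(f"{mib_2d:.2f} MiB per chunk, {n_2d} chunks")
print(f"{mib_3d:.2f} MiB per chunk, {n_3d} chunks")
```

Both results can be compared against the `Bytes` and `Dask graph` rows that Dask reports for the corresponding arrays.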

# Example of chunked data with Zarr

- Zarr: a chunked storage format.

- Further reading needed: how the HDF5 and Zarr formats store data.

In [29]:
import zarr


In [31]:
a = da.from_zarr("data/random.zarr")

In [32]:
a

| | Array | Chunk |
|---|---|---|
| Bytes | 1.49 GiB | 47.68 MiB |
| Shape | (200000000,) | (6250000,) |
| Dask graph | 32 chunks in 2 graph layers | |
| Data type | float64 numpy.ndarray | |


In [34]:
# The array is already chunked

In [33]:
%%time
a.mean().compute()

CPU times: user 53.3 ms, sys: 10.3 ms, total: 63.7 ms
Wall time: 263 ms


0.5000284161442478

In [None]:
### See how the result changes with chunk size

In [35]:
b = da.from_zarr("data/random_sc.zarr")
b

| | Array | Chunk |
|---|---|---|
| Bytes | 1.49 GiB | 78.12 kiB |
| Shape | (200000000,) | (10000,) |
| Dask graph | 20000 chunks in 2 graph layers | |
| Data type | float64 numpy.ndarray | |


In [36]:
%%time
b.mean().compute()


CPU times: user 5.12 s, sys: 333 ms, total: 5.45 s
Wall time: 7.33 s


0.500017773261381

# Zarr
- https://zarr.readthedocs.io/en/stable/tutorial.html

# Xarray 
- https://tutorial.xarray.dev/intro.html