# Dask Array

## Notebook Objectives
* **Demonstrate NumPy**, a library for working with multidimensional arrays.
* Using **blocked algorithms** to work on a large dataset in small chunks.
* **Introducing Dask Array**, interface for parallel NumPy.
* **Limitations of Dask Array**.
* **References** for further reading.

## Demostrate NumPy for array operations


NumPy has a `ones()` function to create unit arrays, or arrays of all ones. We use it to create a 10x10 matrix of ones:

In [1]:
import numpy as np

x = np.ones((10, 10), dtype=int)
x

array([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])

The `sum()` method is used to calculate sum.

In [2]:
%%time
x.sum()

CPU times: user 30 µs, sys: 6 µs, total: 36 µs
Wall time: 38.9 µs


100

The `random` module can be used to create arrays of random data. Let's create a larger matrix of dimension 1000x1000:

In [28]:
x = np.random.random(size=(1000, 1000))
x

array([[0.45568271, 0.1431577 , 0.37642501, ..., 0.15796487, 0.45990731,
        0.27984186],
       [0.81437462, 0.62504866, 0.51884817, ..., 0.66213603, 0.76877329,
        0.27941481],
       [0.67832935, 0.47963707, 0.86939886, ..., 0.8534172 , 0.84040989,
        0.56960117],
       ...,
       [0.01973544, 0.35057438, 0.05273421, ..., 0.60723429, 0.2137585 ,
        0.99921152],
       [0.89358418, 0.53268602, 0.69352128, ..., 0.06789243, 0.84053498,
        0.38334184],
       [0.05749119, 0.42748649, 0.72071472, ..., 0.44029739, 0.43499474,
        0.46421326]])

In [3]:
%%time
x.sum()

CPU times: user 33 µs, sys: 7 µs, total: 40 µs
Wall time: 42.9 µs


100

NumPy has many helpful operations, including matrix transpose, matrix addition, and mean as shown below:

In [4]:
%%time
y = x + x.T
y

CPU times: user 36 µs, sys: 0 ns, total: 36 µs
Wall time: 39.1 µs


array([[2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2, 2, 2, 2, 2, 2]])

In [5]:
%%time
np.mean(y)

CPU times: user 76 µs, sys: 8 µs, total: 84 µs
Wall time: 86.1 µs


2.0

Let's now create an even larger matrix of 20,000x20,000 normally distributed random values and compute it's mean.

In [6]:
%%time 
x = np.random.normal(10, 0.1, size=(20000, 20000)) 
y = x.mean(axis=0) 
y

CPU times: user 9.15 s, sys: 771 ms, total: 9.92 s
Wall time: 9.95 s


array([ 9.99981611,  9.99998234, 10.00035018, ..., 10.0003316 ,
       10.00153614,  9.99937434])

Note that this computation takes some time. We will run this same example using Dask in a few minutes!

Now, let's try to create an even larger matrix with a billion values along each axis!

In [33]:
x = np.ones((1_000_000_000, 1_000_000_000), dtype=int)

MemoryError: Unable to allocate 6.94 EiB for an array with shape (1000000000, 1000000000) and data type int64

This throws a `MemoryError`, meaning NumPy isn't able to handle data at this size. We can work around this limitation using blocked algorithms as shown in the following section.

## Blocked Algorithms

A blocked algorithm executes on a large dataset by breaking it up into many small blocks.

For example, in the above example with a billion+ numbers, consider taking the sum of all numbers. We might instead break up the array into 1,000 chunks, each of size 1,000,000, take the sum of each chunk, and then take the sum of the intermediate sums.

Let's do this:

In [34]:
# Load data with h5py
# this creates a pointer to the data, but does not actually load
import h5py
import os
f = h5py.File(os.path.join('data', 'random.hdf5'), mode='r')
dset = f['/x']

In [35]:
# Compute sum of large array, one million numbers at a time
sums = []
for i in range(0, 1_000_000_000, 1_000_000):
    chunk = dset[i: i + 1_000_000]  # pull out numpy array
    sums.append(chunk.sum())

total = sum(sums)
print(total)

5005765.3125


Note that this is a sequential process in the notebook kernel, both the loading and summing.

## Checkpoint

Question: Create a random matrix of size 1000x1000 and compute standard deviation.

In [None]:
# Your answer goes here

In [None]:
# Answer

x = np.random.random(size=(1000, 1000))
y = x.std(axis=0)
y

## Dask Array for parallel NumPy

Dask Array is high-level interface that can be used to scale NumPy code to large datasets by using chuncking techniques as seen in the previous section.

Let's create a new cluster:

In [7]:
from dask.distributed import Client

client = Client(n_workers=4)
client

0,1
Connection method: Cluster object,Cluster type: LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Status: running,Using processes: True
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads:  12,Total memory:  16.00 GiB

0,1
Comm: tcp://127.0.0.1:63861,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads:  12
Started:  Just now,Total memory:  16.00 GiB

0,1
Comm: tcp://127.0.0.1:63868,Total threads: 3
Dashboard: http://127.0.0.1:63872/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:63863,
Local directory: /Users/pavithra-coiled/Developer/talkpython-dask-course/2-dask-fundamentals/dask-worker-space/worker-ec3y923x,Local directory: /Users/pavithra-coiled/Developer/talkpython-dask-course/2-dask-fundamentals/dask-worker-space/worker-ec3y923x

0,1
Comm: tcp://127.0.0.1:63870,Total threads: 3
Dashboard: http://127.0.0.1:63873/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:63864,
Local directory: /Users/pavithra-coiled/Developer/talkpython-dask-course/2-dask-fundamentals/dask-worker-space/worker-asmqcmx8,Local directory: /Users/pavithra-coiled/Developer/talkpython-dask-course/2-dask-fundamentals/dask-worker-space/worker-asmqcmx8

0,1
Comm: tcp://127.0.0.1:63867,Total threads: 3
Dashboard: http://127.0.0.1:63871/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:63866,
Local directory: /Users/pavithra-coiled/Developer/talkpython-dask-course/2-dask-fundamentals/dask-worker-space/worker-j5mi2q4h,Local directory: /Users/pavithra-coiled/Developer/talkpython-dask-course/2-dask-fundamentals/dask-worker-space/worker-j5mi2q4h

0,1
Comm: tcp://127.0.0.1:63869,Total threads: 3
Dashboard: http://127.0.0.1:63874/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:63865,
Local directory: /Users/pavithra-coiled/Developer/talkpython-dask-course/2-dask-fundamentals/dask-worker-space/worker-z8ngf4fe,Local directory: /Users/pavithra-coiled/Developer/talkpython-dask-course/2-dask-fundamentals/dask-worker-space/worker-z8ngf4fe


Don't forget to open the dashboards!

The following Dask Array code creates a 10,000x10,000 array with 100x100 chunks. The visualization below shows the resulting structure created.

In [8]:
import dask.array as da

x = da.ones((10_000, 10_000), chunks=(100, 100))
x

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,78.12 kiB
Shape,"(10000, 10000)","(100, 100)"
Count,10000 Tasks,10000 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 762.94 MiB 78.12 kiB Shape (10000, 10000) (100, 100) Count 10000 Tasks 10000 Chunks Type float64 numpy.ndarray",10000  10000,

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,78.12 kiB
Shape,"(10000, 10000)","(100, 100)"
Count,10000 Tasks,10000 Chunks
Type,float64,numpy.ndarray


Let's compute the sum of this array. Dask Array also evaluates lazily, so we need to call `compute()` to get the result.

In [9]:
%%time
result = x.sum()

CPU times: user 55.8 ms, sys: 3.4 ms, total: 59.2 ms
Wall time: 58.4 ms


In [10]:
result

Unnamed: 0,Array,Chunk
Bytes,8 B,8.0 B
Shape,(),()
Count,23364 Tasks,1 Chunks
Type,float64,numpy.ndarray
Array Chunk Bytes 8 B 8.0 B Shape () () Count 23364 Tasks 1 Chunks Type float64 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,8 B,8.0 B
Shape,(),()
Count,23364 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [11]:
%%time
result.compute()

CPU times: user 14 s, sys: 587 ms, total: 14.6 s
Wall time: 14.8 s


100000000.0

Now, let's do the same NumPy operations as earlier and compare the compute time!

In [12]:
%%time
x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))
y = x.mean(axis=0)[::100] 
y.compute()

CPU times: user 2.09 s, sys: 77.9 ms, total: 2.17 s
Wall time: 2.66 s


array([ 9.99984054,  9.99810732, 10.0011439 , 10.00006243, 10.00067303,
       10.00061939,  9.99967573, 10.00082095, 10.00005874, 10.00077991,
       10.00026624,  9.9997531 ,  9.9998511 , 10.00016311, 10.00117175,
       10.00104788,  9.99950712, 10.00058845, 10.0008487 ,  9.99974336,
        9.99990533, 10.00005898,  9.9997313 ,  9.99963978,  9.99963639,
       10.0004849 , 10.00105947, 10.0013202 , 10.0003486 ,  9.99945365,
        9.9993116 , 10.00008687, 10.00020631, 10.00188321,  9.99979062,
        9.99948907,  9.99998619,  9.99977491,  9.99940876,  9.99954004,
        9.99940587, 10.00030509,  9.99879323, 10.00005036,  9.99899518,
        9.99996586, 10.00165206, 10.00018339,  9.99941188,  9.9996999 ,
       10.00090938,  9.9999311 ,  9.99904588, 10.00009871, 10.00080389,
        9.9996124 ,  9.99973756, 10.0006106 ,  9.99932609,  9.99976075,
       10.00024083,  9.99707116,  9.99981883,  9.99889187,  9.99995958,
        9.99922913, 10.00073525,  9.9999535 , 10.00043148,  9.99

In [13]:
client.close()

## Checkpoint

**Question**: Using Dask Array, create a random matrix of size 1 million x 1 million and compute the standard deviation.

In [None]:
#your answer here

In [None]:
# Answer

x = da.random((1_000_000, 1_000_000), chunks=(10_000, 10_000))
y = x.std(axis=0)
y

## Limitations of Dask Array

* Dask Array does not implement the entire NumPy interface. For example, it does not implement `np.linalg` and `np.sometrue`.
* Dask Array does not support some operations where the resulting shape depends on the values of the array.
* Dask Array does not attempt operations like sort which are difficult to do in parallel.

## References

* [Dask Array documentation](https://docs.dask.org/en/latest/array.html)
* [Dask Array API](https://docs.dask.org/en/latest/array-api.html)
* [dask Array examples](https://examples.dask.org/array.html)
* [Dask Tutorial - Array](https://tutorial.dask.org/03_array.html)