<img src="https://raw.githubusercontent.com/NCAR/dask-tutorial/main/images/NCAR-contemp-logo-blue.png"
     width="750px"
     alt="NCAR logo"
     style="vertical-align:middle;margin:30px 0px"/>


# Dask Arrays

**ESDS dask tutorial | 06 February, 2023**  

Brian Vanderwende and Negin Sobhani  
Computational & Information Systems Lab (CISL)  
[vanderwb@ucar.edu](mailto:vanderwb@ucar.edu) and [negins@ucar.edu](mailto:negins@ucar.edu)


---------

## Dask Arrays 

Dask arrays provide distributed, larger-than-memory, n-dimensional arrays using blocked algorithms. Blocked algorithms organize a large computation by performing many smaller computations on contiguous chunks of data.
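
As a rough illustration of the blocked idea, the sketch below sums an array one chunk at a time using plain NumPy. The array contents and `chunk_size` are arbitrary values chosen for this example, and Dask's own implementation is more sophisticated than this loop.

```python
import numpy as np

# Illustrative data and chunk size (assumptions made for this sketch).
data = np.arange(12_000, dtype=np.float64)
chunk_size = 1_000

# Blocked sum: reduce each contiguous chunk, then combine the partial results.
partial_sums = [
    data[start:start + chunk_size].sum()
    for start in range(0, data.size, chunk_size)
]
blocked_total = sum(partial_sums)

# The blocked result matches the sum over the whole array.
print(blocked_total == data.sum())
```

Only one chunk needs to be held in working memory at a time, which is what makes the approach suitable for data that does not fit in RAM.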


<img src="https://docs.dask.org/en/stable/_images/dask-array.svg" width="500px" style="vertical-align:middle;margin:30px 0px"/>


Dask arrays are composed of many NumPy (or NumPy-like) arrays. Dask divides an array into many small pieces (called **chunks**), each of which fits into memory, and minimizes the memory footprint of your computation by effectively streaming data from disk.


**Dask Arrays are lazy!**

Unlike NumPy, operations on Dask arrays are not computed until you explicitly request them.

Operations queue up a series of tasks mapped over blocks, and no computation is performed until you explicitly ask for the values. At that point, data is loaded into memory and computation proceeds in a streaming fashion, block by block.

<div class="alert alert-block alert-info" markdown="1">

<b>Lazy Evaluation: objects are evaluated just in time when the results are needed!</b> 
</div>
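
A minimal sketch of this lazy behavior, assuming a small array of ones (the shape and chunk sizes are arbitrary choices for illustration):

```python
import dask.array as da

# Building the expression only records tasks; nothing is computed yet.
x = da.ones((1000, 1000), chunks=(500, 500))
total = (x + 1).sum()
print(type(total))  # still a Dask array, not a number

# The work happens only when we ask for the value.
result = total.compute()
print(result)
```

Until `.compute()` is called, `total` is only a description of the work to be done.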


### In this tutorial, you will learn:

* Basic concepts and features of Dask Arrays
* Working with Dask arrays

**Related Documentation**

* [Array documentation](https://docs.dask.org/en/latest/array.html)
* [Array screencast](https://youtu.be/9h_61hXCDuI)
* [Array API](https://docs.dask.org/en/latest/array-api.html)
* [Array examples](https://examples.dask.org/array.html)


## Setup: Start a Dask Client for Dashboard

Starting the Dask Client is optional. It provides a dashboard that is useful for gaining insight into the computation.

The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. It can take some effort to arrange your windows, but seeing them both at the same time is very useful when learning.

In [7]:
from dask.distributed import Client

client = Client(n_workers=4)
client

| Client / cluster | |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:8787/status |
| Workers | 4 |
| Total threads | 16 |
| Total memory | 16.00 GiB |
| Status | running |
| Using processes | True |

| Scheduler | |
|---|---|
| Comm | tcp://127.0.0.1:57550 |
| Dashboard | http://127.0.0.1:8787/status |
| Started | Just now |

| Worker comm | Dashboard | Nanny | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:57568 | http://127.0.0.1:57573/status | tcp://127.0.0.1:57555 | 4 | 4.00 GiB | /var/folders/99/q6kpb9290c3f6grv0f9mqxzm0000gp/T/dask-worker-space/worker-fckq0tew |
| tcp://127.0.0.1:57569 | http://127.0.0.1:57571/status | tcp://127.0.0.1:57553 | 4 | 4.00 GiB | /var/folders/99/q6kpb9290c3f6grv0f9mqxzm0000gp/T/dask-worker-space/worker-81r3eig0 |
| tcp://127.0.0.1:57567 | http://127.0.0.1:57572/status | tcp://127.0.0.1:57556 | 4 | 4.00 GiB | /var/folders/99/q6kpb9290c3f6grv0f9mqxzm0000gp/T/dask-worker-space/worker-zchsiv3w |
| tcp://127.0.0.1:57566 | http://127.0.0.1:57570/status | tcp://127.0.0.1:57554 | 4 | 4.00 GiB | /var/folders/99/q6kpb9290c3f6grv0f9mqxzm0000gp/T/dask-worker-space/worker-h5w4avk7 |


-----------------

## Blocked Algorithms

Blocked algorithms organize large computations by performing smaller computations on smaller chunks of data.

Let's start by creating a NumPy array of ones:

In [36]:
import numpy as np

shape = (4000,6000)

ones_np = np.ones(shape)
ones_np

array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

Now, let's create the same array using dask:

In [37]:
import dask.array as da
ones_da = da.ones(shape)
ones_da

| | Array | Chunk |
|---|---|---|
| Bytes | 183.11 MiB | 127.99 MiB |
| Shape | (4000, 6000) | (4000, 4194) |
| Dask graph | 2 chunks in 1 graph layer | 2 chunks in 1 graph layer |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |


We see a Dask array representation of the data. This is a symbolic representation: no data has actually been generated yet. This mode of operation is called **'lazy'**. It allows the user to build up a series of computations or tasks before they are passed to the scheduler for execution.

To generate the data, we need to call the `.compute()` method on a Dask array, which triggers the computation and converts the result to a NumPy array.

At this point, our Dask array describes the same data as our NumPy array. Let's call `.compute()` to see how this works:

In [38]:
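ones_da.compute()

array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])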

-------
What are the sizes of these arrays in memory?  

First, let's define a function that returns the size of a variable in MB. 

In [39]:
import sys

# Define function to display variable size in MB
def var_size(in_var):
    result = sys.getsizeof(in_var) / 1e6
    return (result)

In [40]:
print ("\n \n Numpy Array")
print("Size of the array: ",
      ones_np.size)

print("Shape of the array: ",
      ones_np.shape) 

print("Memory size of one array element in bytes: ",
      ones_np.itemsize)
 
# memory size of numpy array in MB
print(f"Memory size of numpy array in MB: {var_size(ones_np):.2f} MB")


print ("\n \n Dask Array")

print("Size of the array: ",
      ones_da.size)

print("Shape of the array: ",
      ones_da.shape) 

print("Memory size of one array element in bytes: ",
      ones_da.itemsize)
 
# memory size of dask array in MB
print(f"Memory size of dask array in MB: {var_size(ones_da):.2f} MB")


 
 Numpy Array
Size of the array:  24000000
Shape of the array:  (4000, 6000)
Memory size of one array element in bytes:  8
Memory size of numpy array in MB: 192.00 MB

 
 Dask Array
Size of the array:  24000000
Shape of the array:  (4000, 6000)
Memory size of one array element in bytes:  8
Memory size of dask array in MB: 0.00 MB


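**Why is the memory size of the Dask array above zero?**

Remember, this variable is only a facsimile of the full array, which will be split across workers. `sys.getsizeof` measures only the small Python object that describes the array, not the data it represents.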

However, Dask does give us ways to see the full size of the data (often much larger than your client machine!)

In [41]:
print("Size of Dask dataset:  {:.2f} MB".format(ones_da.nbytes / 1e6))

Size of Dask dataset:  192.00 MB


## Chunks
When we inspect the Dask array, its symbolic representation illustrates the concept of "chunks". 

Dask arrays can split the data into sub-arrays to optimize computation with large arrays. "Chunks" describes how the array is split into sub-arrays. In this case we did not specify how to chunk the array, so Dask chose the chunking automatically and split the array into 2 chunks. Thinking about and controlling chunking is important for optimizing advanced algorithms.

In [42]:
ones_da

| | Array | Chunk |
|---|---|---|
| Bytes | 183.11 MiB | 127.99 MiB |
| Shape | (4000, 6000) | (4000, 4194) |
| Dask graph | 2 chunks in 1 graph layer | 2 chunks in 1 graph layer |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |


In [43]:
ones_da.visualize()

### Chunking the array
The power of Dask arrays comes from the ability to generate sub-arrays or "chunks". The way that arrays are chunked can significantly affect total performance. 

To specify the chunking of an array, we use the `chunks` argument, which tells `dask.array` how to break up the underlying array into chunks. There are several ways to specify `chunks`. For example:

1. A uniform dimension size like 1000, meaning chunks of size 1000 in each dimension. 

2. A uniform chunk shape like (1000, 2000, 3000), meaning chunks of size 1000 in the first axis, 2000 in the second axis, and 3000 in the third. 

3. Fully explicit sizes of all blocks for all dimensions, like ((1000, 1000, 500), (400, 400), (5, 5, 5, 5, 5))

4. A dictionary specifying chunk size per dimension like {0: 1000, 1: 2000, 2: 3000}.
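
The four forms listed above can be sketched on a 2-D array as follows. The particular chunk sizes here are assumptions chosen so that they divide the (4000, 6000) shape evenly; they are not prescribed values.

```python
import dask.array as da

shape = (4000, 6000)

# 1. One integer: the same chunk length along every dimension.
a = da.ones(shape, chunks=1000)

# 2. A tuple: one chunk length per axis.
b = da.ones(shape, chunks=(2000, 3000))

# 3. Fully explicit block sizes along each axis (each must sum to the axis length).
c = da.ones(shape, chunks=((1000, 3000), (2000, 2000, 2000)))

# 4. A dictionary mapping axis number to chunk length.
d = da.ones(shape, chunks={0: 2000, 1: 3000})

# Inspect the resulting layouts without computing anything.
print(a.numblocks)   # number of blocks along each axis
print(b.chunks)      # explicit block sizes along each axis
print(c.numblocks)
print(d.chunksize)   # shape of the largest chunk
```

The `.chunks`, `.numblocks`, and `.chunksize` attributes are a cheap way to confirm that an array is laid out the way you intended.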

Let's recreate the Dask array above, but this time we will specify the chunk sizes (a.k.a. chunk shapes) using the `chunks` argument. Let's create an array with 6 chunks:

In [44]:
# create dask array with 6 chunks: (4000 / 2000) * (6000 / 2000) = 2 * 3
chunk_shape = (2000, 2000)
ones_da = da.ones(shape, chunks=chunk_shape)
ones_da

| | Array | Chunk |
|---|---|---|
| Bytes | 183.11 MiB | 30.52 MiB |
| Shape | (4000, 6000) | (2000, 2000) |
| Dask graph | 6 chunks in 1 graph layer | 6 chunks in 1 graph layer |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |


You can see in the Dask array representation above that we now have 6 chunks, each of shape (2000, 2000) and size 30.52 MiB (32 MB).
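
The chunk count and chunk size can be checked by hand. The short calculation below uses only the shape, chunk shape, and 8-byte `float64` item size shown above; the variable names are just for illustration.

```python
import math

shape = (4000, 6000)
chunk_shape = (2000, 2000)
itemsize = 8  # bytes per float64 element

# Number of chunks: chunks per axis, multiplied together.
n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunk_shape))

# Bytes per chunk: elements per chunk times bytes per element.
chunk_bytes = math.prod(chunk_shape) * itemsize

print(n_chunks)
print(f"{chunk_bytes / 1e6:.2f} MB")
print(f"{chunk_bytes / 2**20:.2f} MiB")
```

The MB and MiB figures differ only in the unit: 1 MB is 10^6 bytes, while 1 MiB is 2^20 bytes.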

We can visualize this by looking at the dask task graph for this computation:

In [45]:
# visualize task graph 
ones_da.visualize()

RuntimeError: No visualization engine detected, please install graphviz or ipycytoscape

## Performance Comparison

To compare the performance of NumPy and Dask arrays, let's calculate the mean. 

In [46]:
%%time
# The %%time magic measures the execution time of the whole cell
ones_np.mean()

CPU times: user 58.5 ms, sys: 153 ms, total: 211 ms
Wall time: 240 ms


1.0

In [51]:
%%time
# Remember, we are not doing any computation here, just constructing our task graph
mean_of_ones_da = ones_da.mean()

CPU times: user 1.48 ms, sys: 21 µs, total: 1.5 ms
Wall time: 1.49 ms


So far we have just constructed our task graph. Now let's calculate the mean using `compute`. 

In [52]:
%%time
mean_of_ones_da.compute()

CPU times: user 18.2 ms, sys: 3.06 ms, total: 21.3 ms
Wall time: 28.5 ms


1.0

Typically, when working with dask arrays, we do not want to generate the data right away by calling `.compute()` on a large array. We usually want to perform some computations that reduce the data size. For example, we might compute statistics like the mean or standard deviation.
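
As a small sketch of this pattern, the example below builds two reductions lazily and evaluates them together with `dask.compute`, so only two scalars come back instead of the full array. The array shape and chunking are arbitrary choices for illustration.

```python
import dask
import dask.array as da

x = da.ones((2000, 3000), chunks=(1000, 1000))

# Build both reductions lazily; no data is generated yet.
mean_lazy = x.mean()
std_lazy = x.std()

# Evaluate both in one call so that shared work is not repeated.
mean_val, std_val = dask.compute(mean_lazy, std_lazy)
print(mean_val, std_val)
```

Passing several lazy objects to a single `dask.compute` call is generally preferable to calling `.compute()` on each one separately when they depend on the same data.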

Let's look at an example of taking the mean and visualizing the task graph. Remember that no actual computation takes place until we call `.compute()`.

In [53]:
mean_of_ones_da = ones_da.mean()
mean_of_ones_da.visualize()

RuntimeError: No visualization engine detected, please install graphviz or ipycytoscape

## Larger Data
The previous example illustrated how Dask works, but Dask wasn't really necessary for an array of size 192 MB. Let's try an example using bigger data and bigger calculations. 

In [79]:
big_shape = (20000, 20000)

big_ones_np = np.ones(big_shape)
print(f"Memory size of NumPy array in MB: {var_size(big_ones_np):.2f} MB")

Memory size of NumPy array in MB: 3200.00 MB


Now make a Dask array with the same shape, this time specifying the `chunks` size:

In [92]:
big_shape = (20000, 20000)
chunk_shape = 2000
big_ones_da = da.ones(big_shape,chunks=chunk_shape)
big_ones_da

| | Array | Chunk |
|---|---|---|
| Bytes | 2.98 GiB | 30.52 MiB |
| Shape | (20000, 20000) | (2000, 2000) |
| Dask graph | 100 chunks in 1 graph layer | 100 chunks in 1 graph layer |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |


In [85]:
# size of data
print("Memory size of NumPy dataset :  {:.2f} GB".format(big_ones_np.nbytes / 1e9))
print("Memory size of Dask dataset  :  {:.2f} GB".format(big_ones_da.nbytes / 1e9))

Memory size of NumPy dataset :  3.20 GB
Memory size of Dask dataset  :  3.20 GB


This dataset is 3.2 GB, rather than MB! This may be close to the available memory/RAM that you have in your computer.

<div class="alert alert-block alert-warning">

<b>WARNING:</b> Do not try to `.visualize()` this array!

</div>

Let's try bigger calculations on this array:

In [105]:
%%time
# %%time (cell magic) times the whole cell; a bare %time line times nothing
z_np = big_ones_np * big_ones_np[::-1, ::-1].mean()


Bigger computations can take a bit more time, so dask has some built-in tools to help us follow what is happening.

In [106]:
%%time
# perform big computation (%%time times the whole cell)
from dask.diagnostics import ProgressBar

z_da = big_ones_da * big_ones_da[::-1, ::-1].mean()

with ProgressBar():
    result = z_da.compute()


Most of the usual NumPy functions work on Dask arrays, and the computations remain lazy until you call `.compute()` (or `.load()` when working through xarray) or plot the data.
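
For instance, a NumPy ufunc applied to a Dask array returns another lazy Dask array. The sketch below assumes a small array of ones; the shape and chunking are illustrative only.

```python
import numpy as np
import dask.array as da

x = da.ones((1000, 1000), chunks=(500, 500))

# NumPy ufuncs and array methods dispatch to Dask and stay lazy.
y = np.sqrt(x) + x.T
print(type(y))

# Nothing runs until the result is requested.
total = y.sum().compute()
print(total)
```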

As we discussed above, the way that Dask arrays are chunked can significantly affect the performance.
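
One way to get a feel for this before running anything is to count the chunks a given choice produces, since each chunk becomes at least one task for the scheduler. The sketch below uses a deliberately small array so it is cheap to construct; the shape and chunk sizes are assumptions for illustration.

```python
import dask.array as da

shape = (2000, 2000)

# Same data, two very different chunkings.
coarse = da.ones(shape, chunks=1000)
fine = da.ones(shape, chunks=10)

# npartitions is the total number of chunks in the array.
print(coarse.npartitions)
print(fine.npartitions)
```

Very small chunks multiply the number of tasks, and scheduling overhead can then dominate the actual computation. By the same arithmetic, a (20000, 20000) array with chunks of 5 along each axis would have 4000 × 4000 = 16,000,000 chunks.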

For example, let's do a similar calculation using a different `chunks` size:

In [None]:
big_shape = (20000, 20000)
chunk_shape = 5
big_ones_da = da.ones(big_shape,chunks=chunk_shape)
big_ones_da

In [None]:
%%time
# perform big computation (%%time times the whole cell)
from dask.diagnostics import ProgressBar

z_da = big_ones_da * big_ones_da[::-1, ::-1].mean()

with ProgressBar():
    result = z_da.compute()


# Summary: