# Using Dask "Locally" on a Virtual Machine
I'm using this document to keep track of some different methods for applying Dask in data analysis.

## 1. Clusters and Clients

* A dask *Cluster* is a collection of workers/cores/threads that do the parallel computations.
* A `worker` is a separate 'machine' in the cluster. It can still be comprised of threads and cores.
* A cluster can either represent a set of remote machines or a partition of your local system (or VM) into workers.
* By default a cluster is set up with the number of available cores on the system.
* Clusters are part of the `dask.distributed` framework (like distributed cores). They don't actually have to be used and dask will automatically use local resources. However, `dask.distributed` offers lots of other functionality such as the dashboard.

* Dask `Client` are used to communicate with the workers in a cluster (via a `Scheduler`). 

The easiest way to make a new `Cluster` on your VM and connect to it with a `Client`:

In [51]:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 35559 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:35559/status,

0,1
Dashboard: http://127.0.0.1:35559/status,Workers: 4
Total threads: 8,Total memory: 31.35 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:43367,Workers: 4
Dashboard: http://127.0.0.1:35559/status,Total threads: 8
Started: Just now,Total memory: 31.35 GiB

0,1
Comm: tcp://127.0.0.1:34223,Total threads: 2
Dashboard: http://127.0.0.1:46557/status,Memory: 7.84 GiB
Nanny: tcp://127.0.0.1:42747,
Local directory: /home/davidbyrne/code/aqml/dask-worker-space/worker-0skm78jj,Local directory: /home/davidbyrne/code/aqml/dask-worker-space/worker-0skm78jj

0,1
Comm: tcp://127.0.0.1:37091,Total threads: 2
Dashboard: http://127.0.0.1:40877/status,Memory: 7.84 GiB
Nanny: tcp://127.0.0.1:42567,
Local directory: /home/davidbyrne/code/aqml/dask-worker-space/worker-btb695j0,Local directory: /home/davidbyrne/code/aqml/dask-worker-space/worker-btb695j0

0,1
Comm: tcp://127.0.0.1:44477,Total threads: 2
Dashboard: http://127.0.0.1:35913/status,Memory: 7.84 GiB
Nanny: tcp://127.0.0.1:37803,
Local directory: /home/davidbyrne/code/aqml/dask-worker-space/worker-9_yd6dl9,Local directory: /home/davidbyrne/code/aqml/dask-worker-space/worker-9_yd6dl9

0,1
Comm: tcp://127.0.0.1:33035,Total threads: 2
Dashboard: http://127.0.0.1:37015/status,Memory: 7.84 GiB
Nanny: tcp://127.0.0.1:38957,
Local directory: /home/davidbyrne/code/aqml/dask-worker-space/worker-fdcarmux,Local directory: /home/davidbyrne/code/aqml/dask-worker-space/worker-fdcarmux


In [96]:
client = Client('tcp://127.0.0.1:37567')

* Make sure the close the cluster using `cluster.close()` before creating a new one.

## 2. Basic Use of Dask Array

* Dask is pretty good at parallelizing and chunking any operations that can be done using typical numpy methods. 
* Most numpy methods that can be called from the main module (e.g. np.sum, np.mean, np.max) also have a dask array equivalent. 
* Any array you apply these methods to will be computed in chunks and lazily until you want to compute

Start by importing dask array and some other things:

In [72]:
import dask.array as da
import dask

Make a big array of random values:

In [53]:
array = da.random.random((1000, 1000, 1000), chunks = [-1, 10, 10])
array

Unnamed: 0,Array,Chunk
Bytes,7.45 GiB,781.25 kiB
Shape,"(1000, 1000, 1000)","(1000, 10, 10)"
Count,10000 Tasks,10000 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.45 GiB 781.25 kiB Shape (1000, 1000, 1000) (1000, 10, 10) Count 10000 Tasks 10000 Chunks Type float64 numpy.ndarray",1000  1000  1000,

Unnamed: 0,Array,Chunk
Bytes,7.45 GiB,781.25 kiB
Shape,"(1000, 1000, 1000)","(1000, 10, 10)"
Count,10000 Tasks,10000 Chunks
Type,float64,numpy.ndarray


* This is a huge 1000 x 1000 x 1000 array of random values, but none of them have been computed yet. 
* The array represents an instruction. 
* The array is chunked into 1000 x 10 x 10 cubes (the -1 takes the whole dimension). 
* By looking at the array in Jupyter, we can see the size of the whole computed array (75Gb ish) and each chunks (~8Mb ish).

While the whole array can't be easily stored in memory without a large computer, we can still apply operations on it and, if done wisely, we can handle their outputs much more easily.

In [54]:
# Calculate the sum over the first axis:
array_sum  = da.sum( array, axis=0 )
array_sum

Unnamed: 0,Array,Chunk
Bytes,7.63 MiB,800 B
Shape,"(1000, 1000)","(10, 10)"
Count,30000 Tasks,10000 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.63 MiB 800 B Shape (1000, 1000) (10, 10) Count 30000 Tasks 10000 Chunks Type float64 numpy.ndarray",1000  1000,

Unnamed: 0,Array,Chunk
Bytes,7.63 MiB,800 B
Shape,"(1000, 1000)","(10, 10)"
Count,30000 Tasks,10000 Chunks
Type,float64,numpy.ndarray


These arrays have still not been computed, so lets do that:

In [55]:
array_computed = array_sum.compute()
type(array_computed)

numpy.ndarray

The computed array is now a numpy array. We can also string operations together:


In [62]:
array_mean_sum = da.mean( array_sum )
array_mean_sum

Unnamed: 0,Array,Chunk
Bytes,8 B,8.0 B
Shape,(),()
Count,43364 Tasks,1 Chunks
Type,float64,numpy.ndarray
Array Chunk Bytes 8 B 8.0 B Shape () () Count 43364 Tasks 1 Chunks Type float64 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,8 B,8.0 B
Shape,(),()
Count,43364 Tasks,1 Chunks
Type,float64,numpy.ndarray


Above, I called `.compute()` directly onto the dask array. There are some other options at this point:

* `dask.compute( array )`. I think this is basically the same as calling .compute() directly onto the array.
* `client.compute( array )`. This will return a `Future` object, having submitted the job to the cluster using the client. A future object will be carrying out the computation in the background. You can interrogate this object to see whether or not the computation has finished yet (it will be *pending* or *finalized*). This returns a single future object for the result, which is stored in its entirety on one worked. Therefore this is the option to use for small results.
* `client.persist( array )`. This is similar to client.compute() in that it submits all of the tasks to the cluster. However, it doesn't gather the resulting future into one place like client.compute() does. Instead, the result will stay distributed over workers. This is a better option for bigger results.

These routines can also be passed lists of array, tasks or delayed objects. 

## 3. Point-by-point analysis

Sometimes you need more control over the analysis that can't be provided by dask.array methods, e.g. doing a time series analysis at every point in a geospatial domain. In these cases, I have found two useful methods: `map_blocks()` and using lists of delayed objects.

### 3.1 Map_blocks()

* This will apply a function to all and each chunk of a dask array in parallel.
* A function will be applied across chunks, and then the results 'stitched' back together into one dask array.
* It has a constraint: the returned 'stitched' array should have the same number of dimensions as the input array.
* Some thought must be put into the function applied to make sure the above constraint is satisfied. In many cases it is easy

In [82]:
# Import things
import dask.array as da
import dask
import numpy as np

# Make a new BIG array of random numbers
array = da.random.random((1000, 1000, 1000), chunks = [-1, 10, 10])

In the above array, let's pretend that the first dimension is time and we want to do some sort of analysis along the time dimension, at every point in the domain. To apply map_blocks() correctly, I have only chunked the "horizontal" axes.

Define a function to apply at each point. The following function will calculate the 10 largest elements of each time series. The eventual array will then be of shape 10 x 1000 x 1000, with the first dimension being the largest elements. In this example, I have applied a sorting method to each time series independently using numpy, although of course this could be done to a whole chunk by specifying an axis.

In [91]:
def apply_me_to_each_chunk(chunk):
    
    # Get number of times, rows and cols in this chunk
    n_time, n_r, n_c = chunk.shape
    n_pts = n_r*n_c
    
    # Flatten the chunk in the horizontal dimension
    chunkF = chunk.reshape((n_time, n_pts))
    
    # Define an empty output array
    top10 = np.zeros((10, n_pts))*np.nan
    
    # Loop over each "horizontal" point of the chunk
    for ii in range(n_pts):
        
        # Pull out time series for this point and sort
        sorted_data = np.sort( chunkF[:, ii] )
        
        # Get top 10 values and put into output array
        top10[:, ii] = sorted_data[::-1][:10]
        
    # Reshape top10 to have the same dimension as the input chunk and return
    return top10.reshape((10, n_r, n_c))

Now we can use `map_blocks()`. This routine takes the function as an argument, and then any arguments taken by the function. In our case, this is only a chunk array. For this arguments, we pass it the full dask array and dask will handle the rest.

In [108]:
# Call map_blocks
top10 = da.map_blocks(apply_me_to_each_chunk, array, dtype=int)

# Compute the result in some way
top10 = top10.compute()

top10.shape

(10, 1000, 1000)

* Of course, this doesn't have to be computed immediately. It could be subjected to further analysis or written to files such as .zarr or .netcdf.
* There are ways of changing the shape and dimension of the output array. See the map_blocks docs for examples (maybe I'll add these here at some point).
* If you only want to process one time series per core you could avoid the loop in the function above. Just chunks along the 2nd and 3rd dimensions with chunk sizes of 1.

### 3.2 Computation of Delayed Chunks

Sometimes map_blocks might not work, for example if the resulting outputs cannot be easily placed into an array. In this case, we can turn the chunks of a dask array into delayed objects and apply a function to each.

Lets define another function. This will return all elements of the chunk over 0.99. As this will be a different sized array for each chunk, we can't use map_blocks as easily. (Of course there are functions such as da.where() or other indexing methods, but this is just an example).

In [126]:
def apply_me_to_each_delayed_chunk(delayed_chunk):
    
    # Pull out all values over 0.98 in the chunk
    over_threshold = delayed_chunk[ delayed_chunk > 0.99 ]
    
    return over_threshold

Now create a random array and turn it into a list of delayed chunks:

In [116]:
# Import things
import dask.array as da
import dask
import numpy as np

# Make a new BIG array of random numbers
array = da.random.random((1000, 1000, 1000), chunks = [-1, 10, 10])

# Turn into delayed objects, with shape 1x100x100
delayed_array = array.to_delayed()
print( delayed_array.shape )

# Stack these delayed arrays
delayed_array = delayed_array.ravel()
print( delayed_array.shape )

(1, 100, 100)
(10000,)


Now loop over these delayed chunks and aplpy the function we wrote above. The result will be a further list of delayed objects that are waiting to be computed.

In [128]:
over_threshold = [ apply_me_to_each_delayed_chunk(dd) for dd in delayed_array ]
over_threshold[0]

Delayed('getitem-1ecda444836f9e8ff7dbebc78308deae')

Now compute. In this case I have used persist for no particular reason.

In [139]:
computed = client.persist(over_threshold)
computed[0].compute()

array([0.9900639 , 0.99609172, 0.9907704 , ..., 0.99338078, 0.99558568,
       0.99130166])

## 4. Input/Output

### 4.1 Input with Xarray

Xarray is a useful library for interaction between netCDF files and dask. Files are read into xarray Datasets and variables are represented by xarray DataArrays. When opening a netCDF file, if you specify the `chunks` argument then the data will be read automatically (and lazily) into a dask array. This means the data stays on the storage disk until needed. When needed, chunks will be read to memory accordingly. This can be a little slower than reading all data at once, but helps combat memory issues for big datasets.

```
import xarray as xr
filename = "<example_filename>"
dataset = xr.open_dataset( filename, chunks={'time':-1, 'lat':100, 'lon':100})
```

The underlying dask array can be accessed from a data array using the `.data` call. e.g.

```
variable = dataset['variable_name']
dask_array = variable.data
```

The the dask methods described in previous sections can be easily applied to this. Note that this dask array comes preset with additional tasks for loading in the data.

Often, you may not need to convert an xarray dataset or array into a dask array first, as xarray comes with many dask-type functions that can be applied straight to the dataset. This actually includes most of the routines in this notebook.

### 4.2 Output with Xarray

If your data is in an xarray dataset, whether computed or not, it can be written to netCDF using the `to_netcdf()` function. If the data inside the dataset is an uncomputed dask array, then data should be written on a chunkwise basis. This is useful if the computed data is also very large and you don't want it read to memory. Call the analysis lazily, then write it to file. Xarray can also be used to write to zarr in a similar way by using the `to_zarr()` function instead.

In the case where you are working with dask arrays and the data is not in an xarray object you can quickly write the data to a .zarr file using Dask's own `dask.array.to_zarr()` function. 

If you want to write to netcdf, you can quickly put the data into an xarray DataArray first and then call `to_netcdf()`. For example, using the top10 array calculated in the previous section:

In [148]:
output_array = xr.DataArray(top10, dims=['top10','r','c'])


In [147]:
test