# Using Dask with Dask Gateway <img align="right" src="../resources/csiro_easi_logo.png">
 
Index
- [Background](#Background)
- [Dask Concepts](#Dask-Concepts)
- [Local Dask Cluster](#Local-dask-cluster)
- [Dask Gateway](#Dask-Gateway)
   - [Create cluster](#Create-cluster)
   - [Scale the cluster](#Scale-the-cluster)
   - [Connect to the cluster](#Connect-to-the-cluster)
   - [Cluster status](#Cluster-status)
   - [Shutdown the cluster](#Shutdown-the-cluster)
- [EASI Utility Functions](#EASI-utility-functions)
   - [Initialize a local cluster](#Initialize-a-local-cluster)
   - [Initialize a remote cluster](#Initialize-a-remote-cluster)
   - [Initialize a remote cluster with adaptive scaling](#Initialize-a-remote-cluster-with-adaptive-scaling)
- [Dask strategies](#Dask-strategies)
   - [Change the dask chunk size](#Change-the-dask-chunk-size)
   - [Chunking examples](#Chunking-examples)
   - [GridWorkflow](#GridWorkflow)

<div class="alert alert-warning">
    <p>This notebook is under development</p>
</div>

## Background

Dask is a Python library for distributed (parallel) processing. We encourage users to create their own Dask cluster for their analysis. Dask allows us to work on very large array or datacube objects that would otherwise not fit in the memory of any single compute node.

Why Dask? From https://docs.dask.org/en/latest/why.html:

> Python has grown to become a dominant language in data analytics and general programming. This is fueled both by computational libraries like `Numpy`, `Pandas`, and `Scikit-Learn` and by a wealth of libraries for visualization, interactive notebooks, collaboration, and so forth. However, these packages were not designed to scale beyond a single machine. Dask was developed to scale these packages and the surrounding ecosystem. Dask provides ways to scale Pandas, Scikit-Learn, and Numpy workflows more natively, with minimal rewriting. It integrates well with these tools so that it copies most of their API and uses their data structures internally.

Dask works well with `Xarray`, and `Xarray` objects are what the `datacube` returns. There are many tutorials on using `Xarray` with Dask. See:
- https://examples.dask.org/xarray.html
- https://xarray.pydata.org/en/stable/dask.html

[Holoviews](https://holoviews.org) is a visualisation package that works directly with dask as well. It will efficiently access only the pixels it requires for a visualisation from a dask data object distributed across your dask cluster. See the [Visualisation notebook](05%20-%20Visualising%20Data.ipynb).

<div class="alert alert-info">
    <strong>Note:</strong> In addition to this notebook, please refer to the Dask Tutorials in the <strong>Dask</strong> folder in the root of this repository, which provide more information about how and why to use Dask. 
</div>

## Dask Concepts

We use the `Dask.distributed` library. See [https://distributed.dask.org/en/latest](https://distributed.dask.org/en/latest).

Dask operates in a "lazy" compute manner, which means it will only do processing work when it needs to and otherwise will store a list of operations until they are required. Dask figures out how to break up large computations and distribute the tasks to the dask cluster **workers**.
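
The lazy behaviour described above can be seen in a small sketch. The array shape and chunk sizes are arbitrary values chosen for illustration, and the example uses `dask.array` directly rather than a datacube load.

```python
import dask.array as da

# Describe a 4000 x 4000 array split into 1000 x 1000 chunks.
# Nothing is allocated or computed at this point.
x = da.ones((4000, 4000), chunks=(1000, 1000))

# Chaining operations only adds tasks to the graph.
y = (x + 1).sum()

print(x.numblocks)       # (4, 4), i.e. 16 chunks
print(type(y).__name__)  # Array: still a lazy dask object, not a number

# The work is done only when a result is requested.
result = y.compute()
print(result)            # 32000000.0
```

Each chunk becomes one or more tasks, which is why the chunk layout directly affects the size of the task graph.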

The **dask scheduler** maintains the full list of tasks. For large or complex computations it can take a noticeable amount of time for the dask schedule graph to be created or updated. A key part of learning to use dask effectively is learning how to manage, and troubleshoot, the size of the schedule graph (see [Dask strategies](#Dask-strategies)).

The **dask cluster** is the scheduler plus the group of compute **workers** that process the tasks in a distributed or parallel manner. The number of workers is initialised when the cluster is started. We encourage users to create *adaptive* clusters that add more workers when required (processing tasks) and reduce the number of workers when not required (thinking time).

The size of each dask worker in EASI is set to: `cores = 8, memory = 32 GB` (2x large). The operating system on a worker takes up some of the available memory and it is best to leave some "headroom" for the processing. Therefore estimate that about `20 GB` is available per worker.
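
As a rough illustration of that headroom estimate, the arithmetic below splits the usable memory evenly between cores and asks how many chunks of a given size each core could hold. The even split and the 50 MB chunk size are assumptions made for this sketch, not EASI settings.

```python
worker_cores = 8        # worker size quoted above
usable_memory_gb = 20   # estimated usable memory per worker (from the text)
chunk_size_mb = 50      # assumed chunk size, for illustration only

# Memory share per core if the usable memory is divided evenly (an assumption).
memory_per_core_mb = usable_memory_gb * 1000 / worker_cores
chunks_per_core = int(memory_per_core_mb // chunk_size_mb)

print(f"{memory_per_core_mb:.0f} MB per core")             # 2500 MB per core
print(f"about {chunks_per_core} chunks of {chunk_size_mb} MB per core")  # about 50 chunks
```

Intermediate results also occupy memory during a computation, so the practical figure is likely to be lower than this upper bound.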

The **dask client** is the interface to the dask scheduler. The client can be used to submit tasks to the scheduler, and influence or retrieve information from the scheduler. In practice the submitting of tasks is usually handled by the `Xarray` or `Pandas` libraries. See https://distributed.dask.org/en/latest/client.html.

Useful client functions that are used regularly:
- `client.compute()` - run the tasks in the scheduler and return the result to the Jupyter notebook. The call itself returns a future; use `.result()` on it to retrieve the data. ***Warning:*** if the data result is too large for the Jupyter notebook memory then it will crash.
- `client.persist()` - run the tasks in the scheduler but leave the data on the dask workers.
- `client.restart()` - restart the cluster and reset the scheduler.
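
The difference between `compute` and `persist` can be seen in a small sketch. It uses an in-process cluster (`processes=False`) with the dashboard disabled purely so that it is cheap to run; those settings and the array sizes are assumptions for illustration, not EASI defaults.

```python
import dask.array as da
from dask.distributed import Client

# Small in-process cluster for demonstration only (not an EASI configuration).
client = Client(processes=False, n_workers=1, threads_per_worker=2,
                dashboard_address=None)

x = da.arange(1_000_000, chunks=100_000, dtype="int64")

# persist: run the tasks but keep the resulting chunks on the workers.
# The return value is still a dask array.
persisted = client.persist(x * 2)

# compute: run the tasks and hand back a future; .result() brings the
# value into the notebook's memory.
future = client.compute(x.sum())
value = future.result()
print(value)  # 499999500000

client.close()
```

Persisting is useful when several later steps reuse the same intermediate data, because it avoids recomputing it each time.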

The **dask dashboard** (`client.dashboard_link`) is a helpful resource to explore the activity and state of your dask cluster.

## Local dask cluster

A local dask cluster runs entirely on the Jupyter notebook machine. It is useful for processing small data volumes (less than the Jupyter notebook memory) as it will utilise dask distribution of tasks across the available machine's processors. See https://docs.dask.org/en/latest/setup/single-distributed.html.

>__Tip:__ This notebook starts by showing you how to start a local cluster manually. There are also some utility functions that make this easier. These are demonstrated later in this notebook (see [EASI utility functions](#EASI-utility-functions)).

In [None]:
import sys, os, re
os.environ['USE_PYGEOS'] = '0'
sys.path.append(os.path.expanduser('../scripts'))
from easi_tools import EasiDefaults
import notebook_utils
easi = EasiDefaults()

In [None]:
from dask.distributed import Client, LocalCluster

# Connect to an existing LocalCluster if available
# The default port for the scheduler is 8786
# The default port for the dashboard is 8787, but you will get a different port number if a cluster already exists
try:
    # This creates a new Client connection to an existing Dask scheduler if one exists.
    # There is no practical way to get the LocalCluster object from the existing scheduler,
    # although the scheduler details can be accessed with `client.scheduler`.
    # The LocalCluster object is only available from the notebook that created it.
    # Restart the kernel or `client.close();cluster.close()` in each notebook that
    # created one to remove existing LocalClusters.
    client = Client('localhost:8786', timeout='2s')
    cluster = client.cluster  # None
except Exception:
    # No scheduler answered on port 8786, so start a new LocalCluster
    cluster = LocalCluster(scheduler_port=8786)
    client = Client(cluster)

display(cluster if cluster else client)

The LocalCluster dashboard link needs to be rewritten to use the JupyterLab proxy. We have put this code in a `notebook_utils` function.

In [None]:
print(notebook_utils.localcluster_dashboard(client, server=easi.hub))

## Dask Gateway

**[Dask Gateway](https://gateway.dask.org/)** is a centralised system that helps to manage dask functionality and associated workers. We use Dask Gateway to create clusters and then connect notebooks to the cluster to provide scalable computing.

In [None]:
# Initialize the Gateway client
from dask.distributed import Client
from dask_gateway import Gateway

gateway = Gateway()
gateway.cluster_options()

The cluster options show how each worker will be configured. Only a limited set of options and valid values can be configured.

The [Dask Gateway usage guide](https://gateway.dask.org/usage.html) describes how to edit and set the cluster options. We use the default configuration here.
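
If you do need non-default options, one possible pattern is sketched below. The helper `new_cluster_with_overrides` is hypothetical (it is not part of Dask Gateway or the EASI tools), and the option name `worker_cores` in the usage comment is only an example: the fields that actually exist depend on how the gateway has been deployed, so check the output of `gateway.cluster_options()` first.

```python
def new_cluster_with_overrides(gateway, **overrides):
    """Create a cluster after changing selected cluster options.

    `gateway` is expected to behave like a dask_gateway.Gateway object.
    Option names that the gateway does not expose are rejected early.
    """
    options = gateway.cluster_options()
    for name, value in overrides.items():
        if name not in options:
            raise KeyError(f"'{name}' is not a configurable cluster option")
        options[name] = value
    return gateway.new_cluster(options)


# Usage (commented out because it needs a running gateway, and the
# option name is an assumption about the deployment):
# from dask_gateway import Gateway
# cluster = new_cluster_with_overrides(Gateway(), worker_cores=2)
```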

> __Only__ use a worker size that you require for your work. Excessive worker sizes will increase costs and reduce resources for other users.

### Create cluster

Create the cluster with default options if it doesn't already exist. If a cluster exists in your namespace, the code below will connect to that cluster. List the available clusters with `gateway.list_clusters()`.

***This may take a few minutes if new dask nodes need to be created in AWS (if it's quick then existing nodes are being reused).***

In [None]:
clusters = gateway.list_clusters()
if not clusters:
    print('Creating new cluster. Please wait for this to finish.')
    cluster = gateway.new_cluster()
else:
    print(f'An existing cluster was found. Connecting to: {clusters[0].name}')
    cluster = gateway.connect(clusters[0].name)
display(cluster)

### Scale the cluster

Use the GatewayCluster widget to adjust the cluster size. Alternatively use the cluster API methods.

For many tasks `1 or 2 workers` will be sufficient, although for larger areas or more complex tasks 5 to 10 workers may be used. If you are new to Dask, start with one worker and then scale your cluster if needed.

In [None]:
min_number_of_workers = 1
max_number_of_workers = 4

# Static scaling
cluster.scale(min_number_of_workers)

# Adaptive scaling
cluster.adapt(minimum=min_number_of_workers, maximum=max_number_of_workers)
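
The two scaling modes can also be wrapped in a small helper so that only one of them is applied. This is a sketch only: `apply_scaling` is a hypothetical name, and although it follows the same `workers` convention described for `initialize_dask` later in this notebook (a single number or a `(minimum, maximum)` tuple), it is not that function's implementation.

```python
def apply_scaling(cluster, workers):
    """Scale a cluster statically or adaptively.

    workers: an int for a fixed number of workers, or a
             (minimum, maximum) tuple for adaptive scaling.
    """
    if isinstance(workers, tuple):
        minimum, maximum = workers
        cluster.adapt(minimum=minimum, maximum=maximum)
    else:
        cluster.scale(workers)


# Usage with the `cluster` object created earlier (commented out
# because it needs a running cluster):
# apply_scaling(cluster, 2)        # exactly two workers
# apply_scaling(cluster, (1, 4))   # between one and four workers
```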

### Connect to the cluster
To connect to your cluster and start doing work, use the `get_client()` method. This step will wait until the workers are ready.

>__NOTE:__ It may take a few minutes before your cluster is ready to use.

In [None]:
client = cluster.get_client()
client.wait_for_workers(n_workers=min_number_of_workers)
client

### Cluster status
To check the status of your cluster, display the `cluster` object. Any cluster widget shown on your page will automatically update to show the new workers.

Click the `"Dashboard"` link to view your cluster's workers, tasks and workload.

In [None]:
cluster

### Shutdown the cluster
To shut down the cluster, call the `shutdown()` method.

> **It is good practice to proactively shut down your cluster when you have finished using it.**

In [None]:
cluster.shutdown()

<div class='alert alert-info'>
    <p>When you shut down your cluster, the machines that you started above will no longer be connected to this notebook, but they will continue to exist for another 10 minutes. This means that if you start up another remote cluster within the next 10 minutes, those machines will still be available and it will be much faster to start up. You will do exactly that a few cells below.</p>
</div>

## EASI utility functions
Several utility functions are available to help manage your Dask cluster. The most useful is `initialize_dask()`, which will initialize either a local or remote cluster. 

In addition, the `localcluster_dashboard()` function can be used to return the dashboard address for your local Dask dashboard.

Import these functions from the `notebook_utils` library.

In [None]:
from notebook_utils import initialize_dask, localcluster_dashboard

### Initialize a local cluster
To initialize a local dask cluster, set `use_gateway=False` in the function call. No other variables are needed for a local cluster.

In [None]:
cluster, client = initialize_dask(use_gateway=False)
display(client)
display(cluster)

To get the dashboard link for a local cluster, you need to provide the server address of this Jupyter Hub environment. The `EasiDefaults` object created earlier (`easi`) provides this URL as `easi.hub`.

In [None]:
print(localcluster_dashboard(client, server=easi.hub))

You now have a local cluster running that you could use for data analysis. See the notebooks in the **/Dask** folder of this repository for more information on using local or remote dask clusters.

In [None]:
# Close the client
client.close()

### Initialize a remote cluster

Now start up a remote cluster again. This should run quickly if you shut down your previous remote cluster within the last 10 minutes and no other user has started a cluster in that time.

To initialize a remote dask cluster using the `initialize_dask` utility function, set `use_gateway=True` and specify the number of `workers` as a single number. This value determines the number of dask workers to start. Each default worker will have 8 cores and 28GiB of available RAM. If you set `wait=True`, this cell will only complete once all workers have started up. ___This may take several minutes if no compute nodes are already available.___

The next cell will start a cluster with only one worker (`workers=1`).

In [None]:
cluster, client = initialize_dask(use_gateway=True, workers=1, wait=True)
display(client)
display(cluster)

In [None]:
# Close the client and cluster to re-use the resources for the next example
client.close()
cluster.close()

### Initialize a remote cluster with adaptive scaling
To initialize a remote dask cluster with adaptive scaling, set `use_gateway=True` and specify `workers` as a tuple (two numbers enclosed in parentheses) representing the minimum and maximum number of workers that you want in your cluster. If you set `wait=True`, this cell will only complete once the __minimum number of workers__ (the first value in `workers`) have started up. ___This may take several minutes if no nodes are already available.___

In [None]:
cluster, client = initialize_dask(use_gateway=True, workers=(1,4), wait=True) # Ask for an adaptive cluster of between 1 and 4 workers
display(client)
display(cluster)

In [None]:
# Now that we have finished, fully shut down the cluster.
client.close()
cluster.close()
cluster.shutdown()

<div class="alert alert-info">
    <h4 class="alert-heading">Note</h4>
    <p>The tutorial notebooks in the <strong>/Dask</strong> folder in the root of this repository provide more information on how and why to use Dask, including information regarding chunking and a range of other Dask concepts.</p>
    <p>Some basic information on Dask chunks is provided below.</p>
</div>

## Dask strategies

As a processing job becomes large (large data or complex processing) a more considered approach to using dask may be necessary to achieve efficient and well-scaled processing. Here are some strategies that we've trialled:

1. More workers *do not* necessarily mean faster processing. If adding workers does not increase your processing speed near-linearly, try the next strategy.
1. Restructure the data. Change the dask chunk size (and number of dask tasks) associated with your data objects depending on the type of processing being undertaken.
   - The datacube loads data by spatial layers, so the initial `dask_chunks: {"x": 2048, "y": 2048}` in most `datacube.load()` operations is appropriate.
   - If further processing (valid data, masking, scaling etc.) consists mostly of per-pixel operations then the dask chunks can remain as they are (spatially-orientated).
   - For time series processing, consider rechunking the xarray dask dataset along the time dimension.
1. Consider GridWorkflow to break your large array into "tiles".
   - The tiles are handled by dask independently of each other, thereby reducing the size of each dask dataset and processing request.
   - You may need to consider node size as well including shared memory and CPU limits per node.
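
The tiling idea in the last strategy can be sketched in plain Python. This is not the GridWorkflow API; `tile_slices` is a hypothetical helper, and the array and tile sizes are arbitrary. It only shows how a large spatial extent might be cut into independent pieces that are processed one at a time.

```python
def tile_slices(ny, nx, tile_size):
    """Yield (y_slice, x_slice) pairs that cover an ny x nx array in tiles."""
    for y0 in range(0, ny, tile_size):
        for x0 in range(0, nx, tile_size):
            yield (slice(y0, min(y0 + tile_size, ny)),
                   slice(x0, min(x0 + tile_size, nx)))


# Hypothetical 5000 x 7000 pixel area cut into tiles of up to 2048 x 2048.
tiles = list(tile_slices(ny=5000, nx=7000, tile_size=2048))
print(len(tiles))   # 12 (3 rows x 4 columns)
print(tiles[-1])    # (slice(4096, 5000, None), slice(6144, 7000, None))
```

Each pair of slices could then be used to select one tile from a larger dataset, so that every processing request handled by dask stays small.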
   
### Change the dask chunk size

Use `display(xarray_dask_dataset)` to view the size of the proposed processing task. Select the <html><i class="fa fa-database" style="font-size:18px;color:gray"></i></html> icon and note the dask chunks size and number of tasks as shown below.

<img width="50%" src="../resources/dask-example.jpg">

> __Task limit__: We find that 500,000 tasks is about the limit of the dask scheduler.

> __Chunk size__: Limit each chunk to less than 100 MB; 20-50 MB is better.
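
Both guidelines can be checked with simple arithmetic before loading any data. The dataset shape below is hypothetical and the helper names are illustrative; the chunk shape follows the `2048 x 2048` spatial chunks mentioned earlier, with one time step per chunk.

```python
import math


def chunk_megabytes(chunk_shape, itemsize):
    """Size of one chunk in MB, given its shape and bytes per element."""
    return math.prod(chunk_shape) * itemsize / 1e6


def number_of_chunks(array_shape, chunk_shape):
    """Number of chunks needed to cover the whole array."""
    return math.prod(math.ceil(n / c) for n, c in zip(array_shape, chunk_shape))


# Hypothetical dataset: 100 time steps over 20480 x 20480 pixels, int16 (2 bytes).
array_shape = (100, 20480, 20480)
chunk_shape = (1, 2048, 2048)

size_mb = chunk_megabytes(chunk_shape, itemsize=2)
n_chunks = number_of_chunks(array_shape, chunk_shape)

print(f"{size_mb:.1f} MB per chunk")  # 8.4 MB per chunk
print(f"{n_chunks} chunks")           # 10000 chunks
```

The number of tasks is at least the number of chunks for each variable and grows with every operation applied, so a chunk count in the tens of thousands can approach the task limit sooner than expected.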

### Chunking examples

From the Dask documentation's best practices (https://docs.dask.org/en/latest/array-best-practices.html):

> You want to choose a chunk size that is large in order to reduce the number of chunks that Dask has to think about (which affects overhead) but also small enough so that many of them can fit in memory at once. 

> The "optimal" chunk size can also depend on how the raw data is stored and what operations you intend on performing on the dask Array.

> In general, aiming for a resulting chunk size of a few hundred megabytes should work well. Things to consider include the data type and the dimensions.

See also:
- https://docs.dask.org/
- https://xarray.pydata.org/
- https://xarray.pydata.org/en/stable/dask.html
- https://examples.dask.org/xarray.html

First, create a demonstration dataset with time, x and y dimensions:

In [None]:
import numpy as np
import pandas as pd
import xarray as xr

da = xr.DataArray(
    np.random.rand(50,500,500),
    dims = ("time","x","y"),
    coords={
        "time": pd.date_range("2000-01-01", periods=50),
        "x": np.random.randn(500),
        "y": np.random.randn(500)
    }
)
xarray_dask_dataset = da.chunk()

In [None]:
# Chunking for spatial analysis - each time slice is a single chunk
xarray_dask_rechunked = xarray_dask_dataset.chunk({'time':1})
xarray_dask_rechunked

In [None]:
# Rechunk an xarray dataset for time series processing - each chunk holds the full time series for a 50x50 pixel block
# time:-1 means all time layers in a single chunk
xarray_dask_rechunked = xarray_dask_dataset.chunk({'time':-1, 'x':50, 'y':50})
xarray_dask_rechunked

In [None]:
# Chunked into smaller "cubes"
xarray_dask_rechunked = xarray_dask_dataset.chunk({'time':10, 'x':25, 'y':25})
xarray_dask_rechunked
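
To compare the three layouts above, the number of chunks and the size of each chunk can be tabulated. This sketch uses `dask.array` directly with the same shape as the demonstration dataset (an assumption made to keep it self-contained); the dimension order is `(time, x, y)`.

```python
import math

import dask.array as da

# Same shape and dtype as the demonstration dataset, held as a single chunk.
x = da.zeros((50, 500, 500), chunks=(50, 500, 500))

layouts = {
    "per time slice": x.rechunk({0: 1}),
    "time series": x.rechunk({0: -1, 1: 50, 2: 50}),
    "small cubes": x.rechunk({0: 10, 1: 25, 2: 25}),
}

summary = {}
for name, arr in layouts.items():
    # chunksize is the shape of the largest chunk; itemsize is bytes per element.
    chunk_bytes = math.prod(arr.chunksize) * arr.dtype.itemsize
    summary[name] = (arr.npartitions, chunk_bytes)
    print(f"{name}: {arr.npartitions} chunks of {chunk_bytes / 1e6:.2f} MB")
```

The "small cubes" layout produces many very small chunks, which increases scheduler overhead; this is the trade-off described in the best-practice notes above.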