# Dask Distributed Basics

In [3]:
import time
import numpy as np
from dask.distributed import Client
from dask import delayed, compute, array as da
import zarr

## Initializing the Client
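Before running any dask operations on a set of threads, processes, or workers, we need to connect to (or create) a distributed scheduler. A remote scheduler can be started with the `dask-scheduler` command (`dask scheduler` in recent releases) and reached by passing its address to `Client`. For simplicity, instantiating `Client()` with no arguments instead starts a local cluster (a scheduler plus workers on this machine) and connects to it.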

In [4]:
client = Client()
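
`Client()` with no arguments is shorthand for starting a `LocalCluster` and connecting to it. The sketch below does the same thing explicitly so the number of workers and threads can be chosen. The values `n_workers=2` and `threads_per_worker=1` are illustrative assumptions, and `processes=False` keeps the workers as threads inside the current process, which is convenient for quick experiments. To use a remote scheduler instead, pass its address, e.g. `Client("tcp://scheduler-host:8786")` (the hostname is hypothetical; 8786 is the default scheduler port).

```python
from dask.distributed import Client, LocalCluster

# Start a small local cluster explicitly instead of relying on Client() defaults.
# n_workers / threads_per_worker are illustrative values, not recommendations.
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

# Block until both workers have registered with the scheduler
client.wait_for_workers(2)

# nthreads() maps each worker address to its number of threads
threads = client.nthreads()
print("workers:", len(threads))

# Shut down the client and the cluster when finished
client.close()
cluster.close()
```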

## Running distributed tasks
Once the client is instantiated, it registers itself as the default scheduler, so dask computations are automatically submitted to it and distributed across the set of worker instances. If [bokeh](https://bokeh.pydata.org/en/latest/) is installed on the machine running the scheduler, a web dashboard showing the status of the workers, progress toward completion, and a number of other helpful statistics and graphs is available at http://ip-address-of-scheduler:8787 (8787 is the default port).
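
If port 8787 is already in use, distributed serves the dashboard on a different port, so rather than hard-coding the URL it can be more reliable to ask the client for it. A minimal sketch, assuming a small in-process `LocalCluster` purely so the example is self-contained:

```python
from dask.distributed import Client, LocalCluster

# An in-process local cluster; any connected Client exposes the same attribute
cluster = LocalCluster(n_workers=1, processes=False)
client = Client(cluster)

# dashboard_link is the URL the dashboard is actually served on.
# It should be an empty string if no dashboard is running (e.g. bokeh missing).
link = client.dashboard_link
print(link)

client.close()
cluster.close()
```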


### Running Distributed Tasks using Delayed
The `delayed` decorator is one of the simplest ways to run tasks using dask-distributed. Applying the decorator to a function, or wrapping a particular call to the function with `delayed`, makes the call lazy: instead of running, it returns a `Delayed` object representing the task, which is evaluated using `compute`. Because the client is registered as the default scheduler, `compute` sends these tasks to the distributed scheduler, which hands them out to the workers. In the following example, each task has been slowed down using `time.sleep` so that its progress is easier to follow in real time on the dashboard at http://localhost:8787/status.

In [19]:
@delayed
def add_one(x):
    time.sleep(1)
    return x + 1

tasks = [add_one(x) for x in range(64)]
results = compute(*tasks)
display(np.asarray(results))

array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
       18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34,
       35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51,
       52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64])
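
Delayed calls can also take other `Delayed` objects as arguments, which is how dependencies between tasks are expressed; nothing runs until `compute` is called. The sketch below drops the `time.sleep` for brevity and uses both forms mentioned above: the decorator, and wrapping an existing function (here the built-in `sum`) with `delayed` at call time. The in-process `LocalCluster` is an assumption made so the example is self-contained.

```python
from dask import compute, delayed
from dask.delayed import Delayed
from dask.distributed import Client, LocalCluster

@delayed
def add_one(x):
    return x + 1

cluster = LocalCluster(n_workers=2, processes=False)
client = Client(cluster)  # becomes the default scheduler for compute()

tasks = [add_one(x) for x in range(64)]
# Calling a delayed function only builds a task; nothing has run yet
is_lazy = isinstance(tasks[0], Delayed)
print(is_lazy)

# Wrapping a call with delayed(...) at call time: sum() depends on all 64 tasks
total = delayed(sum)(tasks)
(result,) = compute(total)
print(result)  # 2080, i.e. 1 + 2 + ... + 64

client.close()
cluster.close()
```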

### Running Distributed Tasks using Map / Gather
dask-distributed's `Client` class also provides methods for submitting tasks directly. `map` takes a function and an iterable of arguments, submits one task per element, and immediately returns a list of `Future` objects; `gather` then waits for those futures and collects their results. The single-task counterparts are `submit`, which returns one `Future`, and the future's `result` method. More information on these functions can be found [here](https://distributed.readthedocs.io/en/latest/quickstart.html).

In [24]:
# A plain function (no @delayed): client.map submits it to the workers directly
def add_one(x):
    time.sleep(1)
    return x + 1

tasks = client.map(add_one, range(64))
results = client.gather(tasks)
display(np.asarray(results))

array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
       18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34,
       35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51,
       52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64])
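
The single-task versions work the same way, and a `Future` can be passed as an argument to another `submit` or `map` call, so the new task waits for it on the cluster instead of pulling the value back to the client first. A minimal sketch, again assuming an in-process `LocalCluster` for self-containment. Note that `submit` and `map` treat functions as pure by default, so identical calls share one result; `pure=False` is the option for functions with side effects or randomness.

```python
from dask.distributed import Client, LocalCluster

def add_one(x):
    return x + 1

cluster = LocalCluster(n_workers=2, processes=False)
client = Client(cluster)

# submit schedules a single call and returns a Future immediately
future = client.submit(add_one, 41)
# Passing a Future as an argument makes the new task depend on it;
# the computed value is substituted before add_one runs
chained = client.submit(add_one, future)
first, second = future.result(), chained.result()
print(first, second)  # 42 43

# map accepts futures as elements too
futures = client.map(add_one, [future, chained])
gathered = client.gather(futures)
print(gathered)  # [43, 44]

client.close()
cluster.close()
```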

### Writing Data to Zarr using Dask Distributed
Dask Array's `to_zarr` method automatically distributes the write across the workers when a distributed scheduler is present. However, as used here it writes the entire dask array into an existing zarr array of the same shape, and it cannot change the size of that array.

In [12]:
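# Initialize a dask array of random integers, chunked to match the zarr chunks
# so that each task generates and writes whole zarr chunks
dask_array = da.random.randint(0, 100, size=(128, 128, 128), chunks=(32, 32, 32))

# Create a zarr array in a directory store (zarr v2 API); no dtype is given,
# so zarr uses its float64 default, which is why the output below shows floats
store = zarr.storage.DirectoryStore('./example.zarr')
root = zarr.group(store=store, overwrite=True)
arr = root.create('array', shape=(128, 128, 128), chunks=(32, 32, 32))
# Write data to zarr store (distributed across the workers)
dask_array.to_zarr(arr)

# Read back the first 128x128 slice (values should be random integers in [0, 100))
display(arr[0])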

array([[91., 39., 57., ..., 92., 20., 96.],
       [54., 53., 23., ..., 12., 10., 27.],
       [89.,  3., 10., ..., 62., 53., 31.],
       ...,
       [27., 20., 92., ..., 23.,  3., 87.],
       [34., 55., 48., ..., 52., 55., 11.],
       [34., 33., 49., ..., 36.,  9., 15.]])