# Dask Distributed 

`dask.distributed` is a centrally managed, distributed, dynamic task scheduler. The central `dask-scheduler` process coordinates the actions of one or more `dask-worker` processes, which may be spread across multiple machines.

In a nutshell, `dask.distributed` extends `dask` to distributed computing on multiple nodes. 

It can also be used as the scheduler for a single/local node.

One way to start `dask.distributed` locally is with the Client interface. If you create a client without providing an address it will start up a local scheduler and worker for you.

```python
from dask.distributed import Client
client = Client()
client
```

Another way is to start the scheduler and workers by hand. This allows dask.distributed to use multiple machines as workers.

```
$ dask-scheduler
Scheduler started at 127.0.0.1:8786

$ dask-worker 127.0.0.1:8786
$ dask-worker 127.0.0.1:8786
$ dask-worker 127.0.0.1:8786
```
Here the dask-workers could be started on the same machine or different ones.

Launch a Client and point it to the IP/port of the scheduler.

```python
from dask.distributed import Client
client = Client('127.0.0.1:8786')
```

To inspect the details of the current scheduler/worker setup, run:

```python
client.scheduler_info()
```

`dask.distributed` features sophisticated **web-based monitoring** built on the `bokeh` package. This is exposed in JupyterLab as an extension. You can use the extension both to start a distributed cluster on your local node and to examine performance. This is the method we will use to start the scheduler and workers later in this notebook.


## Exercises

The following examples demonstrate the Dask dashboard with `dask.delayed` (which can be used to parallelize generic Python code) and `dask.array` (which extends NumPy arrays to datasets that are larger than memory).

<div class="alert alert-warning alert-block alert-info">
<b>Note:</b> Use the default "Python 3" kernel for these exercises, not the miniconda kernel.
</div>

## Delayed example

As we've seen, `dask.delayed` is a simple and powerful way to parallelize existing code. It allows you to delay function calls into a task graph with dependencies.

### Standard Python code

Our example is similar to what we've seen earlier - we simulate work using the sleep function. 

In [None]:
from time import sleep

def inc(x):
    from time import sleep
    sleep(2)
    return x + 1

def dec(x):
    from time import sleep
    sleep(2)
    return x - 1
    
def add(x, y):
    from time import sleep
    sleep(0.5)
    return x + y

### Run sequentially

Should take 4.5 seconds...

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

### Annotate the Python functions with dask


These now become lazy versions. Rather than computing the result immediately, they record what we want to compute and stick that task into a graph that we'll run later in parallel using the distributed scheduler.

In [None]:
import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)

Calling these lazy functions takes almost no time, because we are only constructing a graph, not running it.

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

In [None]:
z
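
To make the idea of "recording a call instead of running it" concrete, here is a toy model in plain Python. It is only a sketch of the concept and not how Dask is implemented: the `Lazy` class, the `lazy` wrapper, and the `toy_*` functions are all hypothetical names invented for this illustration, and everything runs sequentially in one thread.

```python
class Lazy:
    """Toy stand-in for a delayed value: stores a call instead of running it."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Evaluate dependencies first, then call the stored function.
        args = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*args)


def lazy(func):
    """Wrap func so that calling it records the call and returns a Lazy node."""
    def wrapper(*args):
        return Lazy(func, *args)
    return wrapper


calls = []  # records which functions actually ran, and in what order


@lazy
def toy_inc(x):
    calls.append("inc")
    return x + 1


@lazy
def toy_dec(x):
    calls.append("dec")
    return x - 1


@lazy
def toy_add(x, y):
    calls.append("add")
    return x + y


node = toy_add(toy_inc(1), toy_dec(2))  # builds the graph only
ran_before_compute = list(calls)        # still empty: nothing has run yet
result = node.compute()                 # now the whole graph is evaluated
print(ran_before_compute, result, calls)
```

The real `dask.delayed` objects carry a similar record of function and arguments, but hand the graph to a scheduler that can run independent tasks at the same time.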

### Visualize the graph

In [None]:
z.visualize(rankdir='LR')

### Execute with threads on our local machine
Should take about 2.5 seconds, as `inc` and `dec` can run in parallel (2 seconds) before `add` runs (0.5 seconds).

In [None]:
%%time
z.compute()
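
The expected timings can be checked with a little arithmetic on the task graph. The sketch below is plain Python and does not use Dask: the `durations` and `dependencies` dictionaries are written out by hand from the `sleep` calls and the call structure above, and `finish_time` is a hypothetical helper. It ignores scheduling overhead, so measured times will be slightly longer.

```python
# Task durations in seconds, taken from the sleep() calls in inc, dec and add.
durations = {"inc": 2.0, "dec": 2.0, "add": 0.5}

# Each task lists the tasks whose results it needs.
dependencies = {"inc": [], "dec": [], "add": ["inc", "dec"]}


def finish_time(task):
    """Earliest finish time of a task, assuming a free worker is always available."""
    start = max((finish_time(dep) for dep in dependencies[task]), default=0.0)
    return start + durations[task]


sequential = sum(durations.values())                     # one task after another
parallel = max(finish_time(task) for task in durations)  # length of the critical path
print(f"sequential: {sequential} s, ideal parallel: {parallel} s")
```

The parallel figure is the length of the longest chain of dependent tasks, which is the lower bound no number of workers can beat.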

### Now, create a distributed cluster 

We connect to our cluster. Now rather than running locally, all of our computations will happen on our cluster. In this example, we create a distributed cluster on the node that we are already running on. But the distributed cluster could be a remote HPC system, an Amazon instance, etc.

Create a cluster from the Dask dashboard tab on the left-hand side. To create a cluster, click `+NEW`. Then drag the client code into this notebook and execute it. The code cell will look similar to the following:

```python
from dask.distributed import Client
client = Client("tcp://127.0.0.1:44443")
client
```

Arrange the dask task stream, dask progress, and/or other components in your workspace and monitor performance. 

In [None]:
client.scheduler_info()

In [None]:
%%time
z.compute()

### Parallelize normal Python code

Now we use Dask in "for loop" Python code. This generates graphs instead of doing computations directly, but still looks like the code we had before. 

In [None]:
%%time
zs = []
for i in range(256):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs.append(z)
    
zs = dask.persist(*zs)
total = dask.delayed(sum)(zs)
total.compute()

By looking at the Dask dashboard we can see that Dask spreads this work around our cluster, managing load balancing, dependencies, etc.

### Custom computation: a tree summation

As an example of a non-trivial algorithm, consider the classic tree reduction.  We accomplish this with a nested for loop and a bit of normal Python logic.

```
finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
```

In [None]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        lazy = add(L[i], L[i + 1])  # add neighbors
        new_L.append(lazy)
    L = new_L                       # swap old list for new

In [None]:
dask.visualize(*L)

In [None]:
dask.compute(L)

Note the red bars for inter-worker communication.  Also note how there is lots of parallelism at the beginning but less towards the end as we reach the top of the tree where there is less work to do.
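
The loop above relies on `len(L)` being a power of two (256 here); with an odd number of items, `L[i + 1]` would raise an `IndexError`. A minimal sketch of a more general version follows, demonstrated on plain integers so the result is easy to check. `tree_reduce` is a hypothetical helper, not part of Dask. Passing the delayed `add` and a list of delayed values as arguments should build a similar graph, but that is not exercised here.

```python
from operator import add as plain_add


def tree_reduce(items, combine):
    """Pairwise-combine neighbors until a single value remains."""
    level = list(items)
    if not level:
        raise ValueError("tree_reduce needs at least one item")
    while len(level) > 1:
        next_level = []
        # Combine neighbors (0, 1), (2, 3), ...
        for i in range(0, len(level) - 1, 2):
            next_level.append(combine(level[i], level[i + 1]))
        # With an odd count, carry the unpaired last item up unchanged.
        if len(level) % 2 == 1:
            next_level.append(level[-1])
        level = next_level
    return level[0]


total_even = tree_reduce(range(1, 9), plain_add)  # 8 inputs, as in the diagram
total_odd = tree_reduce(range(1, 8), plain_add)   # 7 inputs, last one carried
print(total_even, total_odd)
```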

## Dask array operations
Let's take a look at some NumPy-style operations and how to use the Dask dashboard to gauge their performance.

Shut down your cluster and create a new one. Drag the client code across and run it. It will look something like the following:

```python
from dask.distributed import Client

client = Client("tcp://127.0.0.1:33111")
client
```



In [None]:
import dask.array as da
x = da.random.random((10000,20000,10), chunks=(1000,1000,5))
y = da.random.random((10000,20000,10), chunks=(1000,1000,5))
z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1,2))

Take a look at the size and shape of the array and the chunking:

In [None]:
x
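
The numbers in the array summary can be reproduced by hand. The sketch below uses only the standard library and assumes 8 bytes per element, since `da.random.random` produces `float64` values. The variable names are chosen for this illustration.

```python
import math

shape = (10000, 20000, 10)
chunks = (1000, 1000, 5)
itemsize = 8  # bytes per float64 element

# Number of chunks along each axis, and in total.
blocks_per_axis = tuple(math.ceil(s / c) for s, c in zip(shape, chunks))
n_chunks = math.prod(blocks_per_axis)

# Memory for one chunk and for the whole array, if it were fully materialized.
chunk_bytes = math.prod(chunks) * itemsize
total_bytes = math.prod(shape) * itemsize

print(blocks_per_axis, n_chunks)
print(chunk_bytes / 1e6, "MB per chunk")
print(total_bytes / 1e9, "GB in total")
```

Each of `x` and `y` is therefore far larger than a single chunk, which is why the computation is carried out chunk by chunk rather than on the whole array at once.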

Now call compute and observe the dask task stream, progress and memory usage.

In [None]:
z.compute()

In the next example, we observe the effect of a poor choice of chunk size on performance:

In [None]:
%%time
x = da.random.random(10_000_000, chunks=1000)  # chunks of size 1000
x.sum().compute()

<div class="alert alert-block alert-warning">
    <b>Note:</b> chunks stands for "chunk shape" rather than "number of chunks", so specifying chunks=1 means that you will have very many chunks, each with exactly one element!
</div>
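
As a rough guide when experimenting, the number of chunks for a one-dimensional array is the array length divided by the chunk size, rounded up. The sketch below tabulates this for the 10,000,000-element array using a few candidate chunk sizes; the sizes are picked arbitrarily for illustration. Every chunk becomes at least one task, so many tiny chunks mean a lot of scheduling work for very little computation each.

```python
import math

n_elements = 10_000_000
candidate_chunk_sizes = (1, 1_000, 1_000_000, 10_000_000)

# Map each chunk size to the number of chunks it produces.
chunk_counts = {
    size: math.ceil(n_elements / size) for size in candidate_chunk_sizes
}

for size, count in chunk_counts.items():
    print(f"chunks={size:>10,} -> {count:>10,} chunks")
```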

<mark>Question</mark> How does the performance look? 

<mark>Question</mark> Try to improve things by using larger chunk sizes. 

<mark>Question</mark> What about using only a single chunk? What is this equivalent to, and how does the performance compare?