<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

# Advanced distributed

This notebook goes through some advanced distributed techniques including:

 * `persist`, which converts a dask graph into a set of futures so that the cluster points directly to data held in worker RAM
 * asynchronous computation
 * debugging

In [2]:
from dask.distributed import Client

client = Client(n_workers=4)
client

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
Workers: 4
Total threads: 8
Total memory: 39.04 GiB
Status: running
Using processes: True

Scheduler
Comm: tcp://127.0.0.1:33267
Started: Just now

Workers

| Comm | Total threads | Memory | Dashboard | Nanny | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:42199 | 2 | 9.76 GiB | http://127.0.0.1:39463/status | tcp://127.0.0.1:46657 | `/home/julia/dask-tutorial/dask-worker-space/worker-_j_sy23q` |
| tcp://127.0.0.1:41687 | 2 | 9.76 GiB | http://127.0.0.1:42975/status | tcp://127.0.0.1:34687 | `/home/julia/dask-tutorial/dask-worker-space/worker-85tib5k4` |
| tcp://127.0.0.1:37251 | 2 | 9.76 GiB | http://127.0.0.1:34231/status | tcp://127.0.0.1:42449 | `/home/julia/dask-tutorial/dask-worker-space/worker-6wem8q75` |
| tcp://127.0.0.1:40763 | 2 | 9.76 GiB | http://127.0.0.1:39397/status | tcp://127.0.0.1:33687 | `/home/julia/dask-tutorial/dask-worker-space/worker-qewbf_4n` |


### Persist

Deciding which data should be loaded by the workers rather than passed to them, and which intermediate values should be persisted in worker memory, will in many cases determine how efficiently a computation runs.

In the example here, we repeat a calculation from the Array chapter. Notice that each call to `compute()` takes roughly the same time, because loading the data is included every time.

In [None]:
%run prep.py -d random

In [None]:
import h5py
import os
import dask.array as da

f = h5py.File(os.path.join('data', 'random.hdf5'), mode='r')
dset = f['/x']

x = da.from_array(dset, chunks=(1000000,))

In [None]:
%time x.sum().compute()

In [None]:
%time x.sum().compute()

If, instead, we persist the data to RAM up front (this takes a few seconds to complete; we could `wait()` on it), then further computations will be much faster.

In [None]:
# changes x from a set of instructions
# to a set of futures pointing to data in RAM
# See this on the UI dashboard.
x = client.persist(x)

In [None]:
%time x.sum().compute()

In [None]:
%time x.sum().compute()

Naturally, persisting every intermediate along the way is a bad idea, because this will tend to fill up all available RAM and make the whole system slow (or break!). The ideal persist point is often at the end of a set of data cleaning steps, when the data is in a form which will get queried often. 

**Exercise**: how is the memory associated with `x` released, once we know we are done with it?

## Asynchronous computation
<img style="float: right;" src="https://upload.wikimedia.org/wikipedia/commons/thumb/3/32/Rosenbrock_function.svg/450px-Rosenbrock_function.svg.png" height=200 width=200>

One benefit of using the futures API is that you can have dynamic computations that adjust as things progress. Here we implement a simple naive search by looping through results as they come in and submitting new points to compute while others are still running.

Watching the [diagnostics dashboard](../../9002/status) as this runs, you can see computations running concurrently while more are being submitted. This flexibility can be useful for parallel algorithms that require some level of synchronization.

Let's perform a very simple minimization using dynamically submitted tasks. The function of interest is known as the Rosenbrock function:

In [None]:
# a simple function whose minimum lies in a curved valley
import time

def rosenbrock(point):
    """Compute the rosenbrock function and return the point and result"""
    time.sleep(0.1)
    score = (1 - point[0])**2 + 2 * (point[1] - point[0]**2)**2
    return point, score
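
As a quick sanity check of the formula, the sketch below evaluates the same expression without the artificial 0.1 s delay. The helper name `rosenbrock_score` and its parameter `b` are introduced here for illustration only; `b=2` matches the coefficient used above, which is smaller than the commonly quoted value of 100. Both squared terms vanish only at (1, 1), so that is where the minimum of 0 lies.

```python
def rosenbrock_score(point, b=2):
    """Return only the score, using the same formula as rosenbrock above."""
    x, y = point
    return (1 - x) ** 2 + b * (y - x ** 2) ** 2


print(rosenbrock_score((1, 1)))   # 0: the global minimum
print(rosenbrock_score((0, 0)))   # 1: the starting guess used in the search
print(rosenbrock_score((-1, 1)))  # 4: on the parabola y = x**2 only (1 - x)**2 remains
```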

Initial setup, including creating a graphical figure. We use Bokeh for this, which allows for dynamic update of the figure as results come in. 

In [None]:
from bokeh.io import output_notebook, push_notebook
from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure, show
import numpy as np
output_notebook()

# set up plot background
N = 500
x = np.linspace(-5, 5, N)
y = np.linspace(-5, 5, N)
xx, yy = np.meshgrid(x, y)
d = (1 - xx)**2 + 2 * (yy - xx**2)**2
d = np.log(d)

p = figure(x_range=(-5, 5), y_range=(-5, 5))
p.image(image=[d], x=-5, y=-5, dw=10, dh=10, palette="Spectral11");

We start off with a point at (0, 0), and randomly scatter test points around it. Each evaluation takes ~100ms, and as results come in, we test to see if we have a new best point, and choose random points around that new best point, as the search box shrinks.

We print the function value and current best location each time we have a new best value.

In [None]:
from dask.distributed import as_completed
from random import uniform

scale = 5                  # Initial random perturbation scale
best_point = (0, 0)        # Initial guess
best_score = float('inf')  # Best score so far
startx = [uniform(-scale, scale) for _ in range(10)]
starty = [uniform(-scale, scale) for _ in range(10)]

# set up plot
source = ColumnDataSource({'x': startx, 'y': starty, 'c': ['grey'] * 10})
p.circle(source=source, x='x', y='y', color='c')
t = show(p, notebook_handle=True)

# initial 10 random points
futures = [client.submit(rosenbrock, (x, y)) for x, y in zip(startx, starty)]
iterator = as_completed(futures)

for res in iterator:
    # take a completed point, is it an improvement?
    point, score = res.result()
    if score < best_score:
        best_score, best_point = score, point
        print(score, point)

    x, y = best_point
    newx, newy = (x + uniform(-scale, scale), y + uniform(-scale, scale))
    
    # update plot
    source.stream({'x': [newx], 'y': [newy], 'c': ['grey']}, rollover=20)
    push_notebook(document=t)
    
    # add new point, dynamically, to work on the cluster
    new_point = client.submit(rosenbrock, (newx, newy))
    iterator.add(new_point)  # Start tracking new task as well

    # Narrow search and consider stopping
    scale *= 0.99
    if scale < 0.001:
        break
best_point
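
The control flow of this loop does not depend on Dask itself. As a rough sketch only, the version below reproduces it with the standard library's `concurrent.futures`, used here as a stand-in for the Dask client; it is not the Dask API. The names `score_point` and `random_search`, and all parameter names, are assumptions made for the illustration. The standard library has no equivalent of `iterator.add`, so the sketch uses `wait(..., return_when=FIRST_COMPLETED)` to pick up finished tasks instead.

```python
import random
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def score_point(point):
    """Same formula as rosenbrock above, without the artificial delay."""
    x, y = point
    return point, (1 - x) ** 2 + 2 * (y - x ** 2) ** 2


def random_search(n_initial=10, scale=5.0, shrink=0.99, min_scale=0.001):
    best_point, best_score = (0.0, 0.0), float("inf")
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Initial batch of random points around the starting guess
        pending = {
            pool.submit(score_point, (random.uniform(-scale, scale),
                                      random.uniform(-scale, scale)))
            for _ in range(n_initial)
        }
        while pending and scale >= min_scale:
            # Block until at least one evaluation has finished
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                point, score = fut.result()
                if score < best_score:
                    best_point, best_score = point, score
                # Replace each finished task with a new candidate near the best point
                bx, by = best_point
                candidate = (bx + random.uniform(-scale, scale),
                             by + random.uniform(-scale, scale))
                pending.add(pool.submit(score_point, candidate))
                scale *= shrink  # narrow the search box
        # Drop any work that has not started yet; running tasks finish on exit
        for fut in pending:
            fut.cancel()
    return best_point, best_score


best_point, best_score = random_search()
print(best_point, best_score)
```

The result differs from run to run, because both the random draws and the order in which tasks complete vary.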

## Debugging

When something goes wrong in a distributed job, it can be hard to figure out what the problem was and what to do about it. When a task raises an exception, the exception will show up when that result, or another result that depends on it, is gathered.

Consider the following delayed calculation to be computed by the cluster. As usual, we get back a future, which the cluster works on in the background (scheduling overhead makes this slow relative to such a trivial calculation).

In [None]:
from dask import delayed

@delayed
def ratio(a, b):
    return a // b

ina = [5, 25, 30]
inb = [5, 5, 6]
out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])
f = client.compute(out)
f

We only learn what happened when we gather the result (this is also true for `out.compute()`, except that we could not have done other work in the meantime). For the first set of inputs, it works fine.

In [None]:
client.gather(f)

But if we introduce bad input, an exception is raised. The exception happens in `ratio`, but only comes to our attention when calculating the sum.

In [None]:
ina = [5, 25, 30]
inb = [5, 0, 6]
out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])
f = client.compute(out)
client.gather(f)

The display in this case makes the origin of the exception obvious, but this is not always so. How should this be debugged? How would we go about finding the exact conditions that caused the exception?

The first step, of course, is to write well-tested code which makes appropriate assertions about its input and gives clear warnings and error messages when something goes wrong. This applies to all code.
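
For the `ratio` example, such a check might look like the following minimal sketch. The name `checked_ratio` and the wording of the message are hypothetical; the point is only that the error names the offending input rather than surfacing later as a bare `ZeroDivisionError`.

```python
def checked_ratio(a, b):
    """Integer division that rejects a zero divisor with an explicit message."""
    if b == 0:
        # Name the offending input so the failure is easy to trace back
        raise ValueError(f"cannot divide a={a} by b=0")
    return a // b


print(checked_ratio(30, 6))  # 5

try:
    checked_ratio(25, 0)
except ValueError as exc:
    message = str(exc)
    print(message)  # cannot divide a=25 by b=0
```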

The most typical thing to do is to execute some portion of the computation in the local thread, so that we can run the Python debugger and query the state of things at the time the exception happened. Obviously, this cannot be done on the whole dataset when dealing with Big Data on a cluster, but a suitable sample will probably do even then.

In [None]:
import dask
with dask.config.set(scheduler="sync"):
    # do NOT use client.compute(out) here - we specifically do not
    # want the distributed scheduler
    out.compute()

In [None]:
# uncomment to enter post-mortem debugger
# %debug

The trouble with this approach is that Dask is meant for large datasets and computations: you probably can't simply run the whole thing in one local thread, else you wouldn't have used Dask in the first place.
So the code above should only be used on a small part of the data that also exhibits the error.
Furthermore, the method will not work when you are dealing with futures (such as `f`, above, or after persisting) instead of delayed-based computations.
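
One simple way to find a small sample that exhibits the error is to run an undecorated copy of the function over candidate inputs in the local process and record which ones fail. This is a sketch under that assumption; `ratio_plain` and `failing_inputs` are hypothetical helpers, not part of Dask.

```python
def ratio_plain(a, b):
    """Undecorated copy of ratio, so it runs immediately in this process."""
    return a // b


def failing_inputs(func, arg_tuples):
    """Return (args, exception name) for every input on which func raises."""
    failures = []
    for args in arg_tuples:
        try:
            func(*args)
        except Exception as exc:
            failures.append((args, type(exc).__name__))
    return failures


sample = list(zip([5, 25, 30], [5, 0, 6]))
print(failing_inputs(ratio_plain, sample))  # [((25, 0), 'ZeroDivisionError')]
```

The failing tuples can then be fed to the synchronous-scheduler cell above, which keeps the local run small.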

As an alternative, you can ask the scheduler to analyze your calculation and find the specific sub-task responsible for the error, and pull only it and its dependencies locally for execution.

In [None]:
client.recreate_error_locally(f)

In [None]:
# uncomment to enter post-mortem debugger
# %debug

Finally, there are errors other than exceptions, for which we need to look at the state of the scheduler and workers. With the standard `LocalCluster` we started, we have direct access to these.

In [None]:
[(k, v.state) for k, v in client.cluster.scheduler.tasks.items() if v.exception is not None]