# Exploring the use of multiple dask clients and clusters

This notebook addresses and provides examples for the use of dask with multiple clients (and multiple clusters).

The point of departure is that it might be preferable to employ multiple clients for workflows and/or have tasks spawn their own client to distribute subtasks.

First we import the basic dask requirements

In [None]:
from dask.distributed import Client, LocalCluster, default_client
import dask


Next, taking as an example the scaling of sklearn workflows, which provide parallelism via joblib, to dask clusters, we import joblib and its backend contexts

In [None]:
import joblib
from joblib import parallel_backend, Parallel, delayed

## Specifying a client for use with the joblib backend
We will first consider how to specify which client to use in combination with joblib.
Note: using joblib in this fashion only works for workflows whose data fits in RAM, i.e. CPU-bound workflows. Larger-than-memory data sets require a different approach (see also below).

First we define a toy function/task to distribute via joblib

In [None]:
import time
def slow_power(x, p):
    time.sleep(1)
    return x ** p

and subsequently create a local Dask cluster (using the local resources of the computer/workstation) with dask.distributed. The same approach also applies to remote clusters.

In [None]:
cluster = LocalCluster()
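
As a minimal sketch of how the cluster could be sized explicitly and how a client can connect through the scheduler address (the way a remote cluster would be reached): the worker counts, `processes=False` (in-process workers, chosen only to keep the example light) and `dashboard_address=None` are assumptions, not settings used elsewhere in this notebook.

```python
from dask.distributed import Client, LocalCluster

# Assumed settings: two single-threaded, in-process workers and no dashboard.
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)

# A client can be created from the cluster object or from the scheduler
# address; a remote scheduler would be addressed in the same way.
client = Client(cluster.scheduler_address, set_as_default=False)
client.wait_for_workers(2)

n_workers = len(client.scheduler_info()["workers"])
print(cluster.scheduler_address, n_workers)

client.close()
cluster.close()
```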

We can then create a client connected to the cluster for use with joblib

In [None]:
jlclient = Client(cluster)

and inspect the client

In [None]:
jlclient

Note: in this form `jlclient` registers as the default client/scheduler, so any dask call that does not specify a client is handled by it. This behaviour can be avoided by passing `set_as_default=False` when instantiating the client.

In [None]:
jlclient2 = Client(cluster,set_as_default=False)

In [None]:
jlclient2

In [None]:
default_client()
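
The effect of `set_as_default` can be checked programmatically. The following is a small self-contained sketch; the names `first` and `second` and the in-process cluster settings are assumptions made for illustration.

```python
from dask.distributed import Client, LocalCluster, default_client

# Assumed lightweight cluster: one in-process worker, no dashboard.
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)

first = Client(cluster)                         # registers as the default client
second = Client(cluster, set_as_default=False)  # does not register as default

first_is_default = default_client() is first
second_is_default = default_client() is second
# Both clients are connections to the same scheduler.
same_scheduler = first.scheduler.address == second.scheduler.address
print(first_is_default, second_is_default, same_scheduler)

second.close()
first.close()
cluster.close()
```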

joblib can then be set to leverage the dask cluster as below.
Note: configuration of the backend can also be done using joblib's `parallel_config`

In [None]:
with joblib.parallel_backend(backend="dask",client=jlclient):
    parallel = Parallel(verbose=100)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))

For comparison, below is the same call without specifying a client, and with a specified but non-existent client.

client not specified, default client used

In [None]:
with joblib.parallel_backend(backend="dask"):
    parallel = Parallel(verbose=100)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))

non-existent specified client (`dummy`); since the name `dummy` is undefined, this cell raises a `NameError`

In [None]:
with joblib.parallel_backend(backend="dask",client=dummy):
    parallel = Parallel(verbose=100)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
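
A short sketch of why the cell above fails: the undefined name is evaluated by Python before joblib is ever called, so the error is a plain `NameError` rather than a joblib or dask error. The `try`/`except` wrapper and the variable `error_message` are added here for illustration only.

```python
import joblib

error_message = None
try:
    # `dummy` is deliberately undefined, as in the cell above.
    with joblib.parallel_backend(backend="dask", client=dummy):  # noqa: F821
        pass
except NameError as err:
    # Raised while evaluating the arguments, before joblib is invoked.
    error_message = str(err)

print(error_message)
```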

Above we have considered how to specify the client used by the joblib backend and demonstrated how this can be achieved.

It is also relevant to consider this in more detail. The client objects above both represent connections to the SAME scheduler, i.e. the one run by the LocalCluster instance we created. For futures and distributed data/results this means that, as long as a reference to a future or to data exists in ANY of these clients, the cluster/scheduler will keep it alive.
Similarly for higher order dask objects (dask arrays, bags): the local representation is AUTOMATICALLY connected to the default client, so manipulations of the local representation are pushed to the cluster's/scheduler's representation via that client.
Avoiding this is problematic: while one could set all clients as non-default, the default scheduler would then be the local (threaded) scheduler, which would cause issues unless ALL actions are explicitly assigned to a specified scheduler.

This behaviour is basically tied to the local process.

In order to really split two steps of a dask workflow and enable two clients with no cross-talk to be used, each step should be launched in a separate process (notebook). Data can then be shared between the clients by publishing datasets to the cluster.
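
Publishing can be sketched as follows. For brevity both clients live in one process here, whereas the scenario above would have them in separate processes connected to the same scheduler; the names `producer`, `consumer` and `interim_result` are hypothetical.

```python
from dask.distributed import Client, LocalCluster

# Assumed lightweight cluster: one in-process worker, no dashboard.
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)

producer = Client(cluster, set_as_default=False)
consumer = Client(cluster, set_as_default=False)

# The producer computes something and publishes the future under a name.
future = producer.submit(sum, [1, 2, 3])
producer.publish_dataset(interim_result=future)

# The consumer looks the dataset up by name on the scheduler.
published_names = list(consumer.list_datasets())
value = consumer.get_dataset("interim_result").result()
print(published_names, value)

consumer.close()
producer.close()
cluster.close()
```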

The advantage of doing so, however, is not entirely apparent.

## Specifying the client for use with higher order Dask objects

Nevertheless, it makes sense to consider how to specify the client for operations with higher order dask objects. For example, clients may connect to different clusters optimized for different parts of a workflow (this would require some form of serialization and loading of the interim result).

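This can be done when calling the `compute` method of the collection, passing the client through the `scheduler` keyword (`da` stands for an existing dask collection, e.g. a dask array):

In [None]:
da.compute(scheduler=jlclient)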

or via the `compute` method of the specified client, which is asynchronous and returns a future

In [None]:
jlclient2.compute(da)
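
Both routes can be tried on a small example. In this sketch the dask array `x` plays the role of `da`; the array itself, the client names and the cluster settings are assumptions made for illustration.

```python
import dask.array as darr
from dask.distributed import Client, LocalCluster

# Assumed lightweight cluster: two in-process workers, no dashboard.
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)
default = Client(cluster)
other = Client(cluster, set_as_default=False)

# Hypothetical collection standing in for `da` above.
x = darr.arange(100, chunks=10)
total = x.sum()

# Route 1: blocking call, with the client passed as the scheduler.
blocking_result = int(total.compute(scheduler=other))

# Route 2: the client's own compute returns a Future immediately.
future = other.compute(total)
future_result = int(future.result())

print(blocking_result, future_result)

other.close()
default.close()
cluster.close()
```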

In addition, a non-default client can be selected temporarily with the `as_current` context manager:

In [None]:
with jlclient2.as_current():
    with joblib.parallel_backend(backend="dask", client=Client.current()):
        parallel = Parallel(verbose=100)
        print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
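
What `as_current` changes can be made visible with `Client.current()`. The following is a minimal sketch under the same assumed in-process cluster settings, with hypothetical client names.

```python
from dask.distributed import Client, LocalCluster

# Assumed lightweight cluster: one in-process worker, no dashboard.
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)
default = Client(cluster)
other = Client(cluster, set_as_default=False)

# Outside the context manager, Client.current() falls back to the default.
outside_is_default = Client.current() is default

with other.as_current():
    # Inside, Client.current() returns the temporarily selected client.
    inside_is_other = Client.current() is other
    value = Client.current().submit(pow, 2, 5).result()

# The previous state is restored on exit.
after_is_default = Client.current() is default
print(outside_is_default, inside_is_other, after_is_default, value)

other.close()
default.close()
cluster.close()
```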
    