# 3.1. A Cheyenne Example

This example is meant to use one of the RDA datasets available on Cheyenne's GLADE storage system.

In [1]:
import dask
import time

## How to Set Up a Dask Distributed Cluster

The easiest way to start a cluster is directly from your notebook using the `Client` object.

In [None]:
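from dask.distributed import Client
from dask_jobqueue import PBSCluster

# Each PBS job requests one full 36-core node and runs one single-threaded worker.
# Recent dask-jobqueue releases replaced `threads`/`project` with `cores`/`account`
# and require `memory`; the '64GB' value is an assumption, so match it to your node type.
cluster = PBSCluster(queue='premium', account='STDD0006', cores=1, processes=1,
                     memory='64GB', resource_spec='select=1:ncpus=36')
cluster.scale(4)  # launch 4 workers (older releases used cluster.start_workers(4))

client = Client(cluster)
client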

### LocalCluster

If you were running on your laptop, you would use a "local cluster" (a `LocalCluster`, which is what `Client()` creates when it is called with no arguments).  Launching a cluster on any other machine (such as Cheyenne) works the same way: you create the appropriate `*Cluster` object and pass it to the `Client`.  The `dask_jobqueue` package provides additional `*Cluster` objects, such as `PBSCluster` and `SLURMCluster`, for managing clusters with the PBS and SLURM job schedulers, respectively.  It must be installed separately in your `pangeo` conda environment with:

    pip install git+https://github.com/dask/dask-jobqueue.git
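For comparison, the laptop case can be sketched with `LocalCluster`, assuming `dask.distributed` is installed. The worker and thread counts below are arbitrary choices for illustration; `LocalCluster` simply takes the place that `PBSCluster` occupies in the Cheyenne cell.

```python
from dask.distributed import Client, LocalCluster

# A small cluster on the local machine: 2 workers with 1 thread each.
# processes=False runs the workers as threads inside this Python process,
# which starts quickly; omit it to launch separate worker processes.
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)  # same pattern as Client(PBSCluster(...))

client.wait_for_workers(2)  # block until both workers have registered
n_workers = len(client.scheduler_info()["workers"])
print(n_workers)  # 2

client.close()
cluster.close()
```

Only the `*Cluster` line changes between machines; everything done through `client` afterwards is the same.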

In [None]:
client

### Dashboard

Many of the diagnostics that we've already seen in the last section are available from the **Dashboard** link in the information box returned when we displayed `client` in the above output.  If you click that link, it will take you to a page that provides many of those diagnostics in one place!

On the *Status* page of the Dashboard, the *Task Stream* section displays each task as a block, with start and stop times recorded for each of the following:

1. Serialization (gray)
2. Dependency gathering from peers (red)
3. Disk I/O to collect local data (orange)
4. Execution times (additional colors chosen for each task)

### There are many ways to set up a cluster...

For more ways to set up a cluster with Dask, [click here](https://distributed.readthedocs.io/en/latest/setup.html).

## The Client

The `Client` object provides an interface with the main scheduler for your cluster.  It provides a number of methods that you can use to run code directly on the cluster's workers.  Some of these methods and attributes are described below.

### Map

Following the `concurrent.futures` interface, you can `map` a function across an iterable.  The result of `client.map` is a list of `Future` objects, each pointing to a result that is stored on one of the workers.

In [None]:
def dbl(x):
    time.sleep(1)
    return 2*x

Now, let's apply this new `dbl` function to an iterable object (`range`) on the distributed workers...

In [None]:
doubles = client.map(dbl, range(80))
doubles[:4]

**QUICKLY, go check out the Dashboard while this is running...**

In [None]:
doubles[:4]

In [None]:
type(doubles)

At this point, we have a list of `Future` objects in our notebook.  

*Keep in mind* that our notebook is connected to the *scheduler*, but these `Future` objects point to results that are stored in the memory of the *workers*!  So, by applying `map`, we have automatically distributed the results of `dbl` over `range(80)` onto the workers.
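Because `Client` follows the `concurrent.futures` interface, Python's standard `ThreadPoolExecutor` offers a local analogy. In this plain-Python sketch (no Dask involved), one `submit` call per element plays the role of `client.map`: each call returns a `Future` immediately, and the values are retrieved later with `.result()`. Note that the standard-library `Executor.map`, by contrast, yields results rather than futures.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

def dbl(x):
    time.sleep(0.1)  # stand-in for real work
    return 2 * x

with ThreadPoolExecutor(max_workers=4) as pool:
    # Roughly what client.map does: one Future per element, returned right away.
    futures = [pool.submit(dbl, x) for x in range(8)]
    print(type(futures[0]).__name__)  # Future
    values = [f.result() for f in futures]  # wait for each value in turn

print(values)  # [0, 2, 4, 6, 8, 10, 12, 14]
```

The difference with Dask is where the results live: here they sit in the notebook's own memory, whereas `client.map` leaves them on the workers.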

### Submit

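Now that the data is on the workers, we can apply functions to the entire distributed dataset by using the `submit` method of the `Client`.  When you pass `Future` objects as arguments, Dask substitutes their values on the worker that runs the function, so the data never has to travel back to the notebook.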

In [None]:
sum_doubles = client.submit(sum, doubles)
display(sum_doubles)
time.sleep(8)
display(sum_doubles)
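A self-contained sketch of the same map-then-submit pattern, assuming a small in-process cluster (created by passing `processes=False` and worker counts straight to `Client`) stands in for the Cheyenne cluster. Instead of sleeping for a fixed time, it calls `result()`, which waits exactly as long as needed.

```python
from dask.distributed import Client

def dbl(x):
    return 2 * x

# Stand-in for the PBS cluster: 2 in-process workers (an assumption for this sketch).
client = Client(processes=False, n_workers=2, threads_per_worker=1)

doubles = client.map(dbl, range(80))  # list of Futures; values live on the workers
# Dask swaps each Future in the list for its value on the worker that runs `sum`,
# so the 80 intermediate values are never gathered into the notebook.
sum_doubles = client.submit(sum, doubles)
total = sum_doubles.result()  # block until the sum has been computed
print(total)  # 6320

client.close()
```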

### Gather

With the `doubles` data stored on the workers, we can bring that data back to the notebook (the client) with the `gather` method.

In [None]:
print(client.gather(doubles))

#### NOTE:

Do not gather unless you absolutely must!  It is usually much more efficient to keep the data distributed across the cluster and `submit` functions that act on the distributed data than it is to bring the data back to the notebook.

### Scatter

You can do the reverse of `gather` if you want to distribute data that you already have in your notebook.

In [None]:
dist_data = client.scatter(range(4000))
dist_data[:5]

In [None]:
dist_data[4].result()
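Scattered data behaves like any other set of futures, so it can feed `map` and `submit` directly. The sketch below assumes a small in-process cluster and uses arbitrary input values.

```python
from dask.distributed import Client

client = Client(processes=False, n_workers=2, threads_per_worker=1)

# Scattering a list sends each element to a worker and returns one Future per element.
futures = client.scatter([1, 2, 3, 4])
print(futures[1].result())  # 2

# Scattered futures can be used as inputs just like futures produced by map.
squares = client.map(lambda x: x * x, futures)
total = client.submit(sum, squares).result()
print(total)  # 30

client.close()
```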

### Compute

As in the previous sections, you can use the `Client.compute` method to trigger the computation of a task graph, such as one created with Dask Delayed.

In [None]:
@dask.delayed
def dbl(x):
    time.sleep(1)
    return 2*x

In [None]:
@dask.delayed
def inc(x):
    time.sleep(1)
    return x + 1

In [None]:
@dask.delayed
def dsum(x):
    time.sleep(1)
    return sum(x)

In [None]:
data = [2, 5, 7, 3, 1, 8, 6, 9]

In [None]:
%time sum_odd_data = dsum([inc(dbl(x)) for x in data])
sum_odd_data

In [None]:
%time result = client.compute(sum_odd_data)
result

#### NOTE:

The return value of `Client.compute` is a `Future` object!  That means that, unlike in the previous sections, the `compute` (and `persist`) operations run asynchronously: the call returns immediately while the work continues on the cluster.

Once the result has been computed, you can retrieve it with the `Future`'s `result()` method, which blocks until the computation finishes if it is still running.

In [None]:
result.result()
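A minimal sketch of this asynchronous behavior, assuming a small in-process cluster and a hypothetical `slow_inc` task: `client.compute` hands the graph to the scheduler and returns a `Future` straight away, and only `result()` waits.

```python
import time

import dask
from dask.distributed import Client, Future

@dask.delayed
def slow_inc(x):
    time.sleep(0.5)  # hypothetical slow task
    return x + 1

client = Client(processes=False, n_workers=2, threads_per_worker=1)

total = dask.delayed(sum)([slow_inc(x) for x in range(4)])

future = client.compute(total)  # returns immediately; work continues in the background
print(future.status)            # typically still "pending" at this point
value = future.result()         # blocks until the graph has finished
print(value)  # 10

client.close()
```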

### Persist

Similarly, you can `persist` the results of a task graph (just like in the previous sections) by using the `Client.persist` method.

Because `Client.persist` is asynchronous, it returns immediately while the results are computed and kept *in memory* across the cluster.  This makes it a convenient way to set up initial distributed data that later computations can reuse.
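The notebook has no `persist` cell, so here is a minimal sketch, assuming a small in-process cluster and a hypothetical `load_chunk` function standing in for reading one piece of a dataset. `client.persist` accepts a list of collections and returns a list of the same type, now backed by results held on the workers.

```python
import dask
from dask.distributed import Client

@dask.delayed
def load_chunk(i):
    # Hypothetical loader: returns ten consecutive integers per chunk.
    return list(range(i * 10, (i + 1) * 10))

client = Client(processes=False, n_workers=2, threads_per_worker=1)

# persist starts computing right away and returns Delayed objects whose
# results are held in worker memory instead of being sent back here.
chunks = client.persist([load_chunk(i) for i in range(4)])

# Later computations reuse the in-memory chunks instead of reloading them.
chunk_sums = [dask.delayed(sum)(c) for c in chunks]
total = client.compute(dask.delayed(sum)(chunk_sums)).result()
print(total)  # 780

client.close()
```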

### Cancel & Restart & Close

You can cancel a computation on the cluster by using the `Client.cancel` method and passing it the `Future` associated with the computation result.

In [None]:
client.cancel(result)

In [None]:
result

You can cancel all `Future`s and restart every worker in the cluster with `restart`:

In [None]:
client.restart()

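You can disconnect the `Client` with the `close` method.  Because this client was connected to a `PBSCluster` that we created ourselves, the cluster's worker jobs keep running until you also call `cluster.close()`.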

In [None]:
client.close()
cluster.close()  # also cancel the PBS jobs that host the workers

In [None]:
client
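A common alternative to calling `close` by hand is to use the `Client` as a context manager. This sketch assumes a small in-process cluster that the `Client` starts itself; because the client owns that cluster, closing the client shuts the cluster down too.

```python
from dask.distributed import Client

# Leaving the `with` block closes the client (and the local cluster it started).
with Client(processes=False, n_workers=1, threads_per_worker=1) as client:
    answer = client.submit(sum, [1, 2, 3]).result()

print(answer)  # 6
```

For a cluster you created yourself, such as the `PBSCluster` above, the cluster object can be used in its own `with` block in the same way.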

## Dask Delayed and Collections (next lesson)

Once you have initialized a `Client` (i.e., connected to the scheduler), normal Dask features like Dask Delayed, Dask Array, etc. will work using the client's scheduler by default.  So, no special mechanism is needed to get all of the previous examples to work with `distributed`.
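A short sketch of that default, assuming a small in-process cluster: creating the `Client` registers it as the default, so a plain `.compute()` on a Delayed object runs on its workers without any extra arguments.

```python
import dask
from dask.distributed import Client, default_client

@dask.delayed
def inc(x):
    return x + 1

client = Client(processes=False, n_workers=2, threads_per_worker=1)
is_default = default_client() is client  # the new Client became the default

# No scheduler argument: with a Client active, .compute() runs on its workers.
total = dask.delayed(sum)([inc(x) for x in range(5)])
value = total.compute()
print(value)  # 15

client.close()
```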

## Advice for Efficiency

When running with the `distributed` scheduler, there are a few things you should consider for the sake of efficiency:

1. As mentioned above, avoid `gather`ing the data back to the notebook.  Keep the data distributed!
2. Avoid creating too many tasks that need to be distributed across the cluster!  (The `distributed` scheduler adds about *1 ms overhead for each task*, depending upon the network.  Thus, as a rule of thumb, don't bother distributing your tasks if the serial operation takes less than about 100 ms to run.)
3. With the `distributed` scheduler, you can configure workers to use multiple threads *per worker*.  (For example, you can create one worker for each remote node on a cluster, and each worker can run as many threads as cores on that node.)
4. Tasks are assigned to workers using *heuristics*, so keep in mind that Dask might not get it perfect.
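To see why the per-task overhead in point 2 matters, here is a back-of-the-envelope sketch in plain Python. The `overhead_fraction` and `batches` helpers are hypothetical, and the 1 ms default simply reuses the estimate quoted above. Grouping many tiny work items into fewer, larger tasks is one common way to keep that overhead small.

```python
def overhead_fraction(task_seconds, overhead_seconds=0.001):
    """Share of each task's wall time spent on scheduling overhead."""
    return overhead_seconds / (task_seconds + overhead_seconds)

def batches(items, size):
    """Split a list into consecutive batches of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]

# A 0.1 ms task spends most of its time on overhead...
print(round(overhead_fraction(0.0001), 3))  # 0.909
# ...while a 100 ms task loses only about 1%.
print(round(overhead_fraction(0.1), 3))  # 0.01

# 10,000 tiny items become 10 tasks of 1,000 items each, which could then be
# submitted with, e.g., client.map(process_batch, ...) for a hypothetical process_batch.
items = list(range(10_000))
print(len(batches(items, 1000)))  # 10
```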