# Distributed 



## Distributed Cluster

As we have seen so far, Dask lets you construct graphs of tasks with dependencies, and it can also build graphs for you automatically when you use functional or NumPy-style syntax on data collections. None of this would be very useful without a way to execute these graphs in a parallel, memory-aware way. So far we have called `thing.compute()` or `dask.compute(thing)` without worrying about what this entails. Now we will discuss the options available for that execution, in particular the distributed scheduler, which comes with additional functionality.
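As a minimal sketch of what "options for execution" can look like, the `scheduler=` keyword of `.compute()` selects one of the single-machine schedulers for a given call. The array shape and chunk sizes below are arbitrary values picked for illustration.

```python
import dask.array as da

# Arbitrary small array: 16 chunks of 250 x 250 ones.
x = da.ones((1000, 1000), chunks=(250, 250))
total = x.sum()  # lazy: this only builds a task graph

# Run the same graph with two different single-machine schedulers.
threaded = total.compute(scheduler="threads")    # local thread pool
serial = total.compute(scheduler="synchronous")  # single thread, handy for debugging

print(threaded, serial)
```

Both calls execute the same graph; only the machinery that runs the tasks changes.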

## Create and Connect to Dask Distributed Cluster

Let's begin by importing the `Client` and `LocalCluster` classes.

In [None]:
from dask.distributed import Client, LocalCluster

In [None]:
# Set up a local cluster.
# By default, the number of workers and threads per worker
# is chosen based on the cores available on this machine.
cluster = LocalCluster()
cluster

☝️ Don't forget to click the link above to view the scheduler dashboard! (you may wish to have both the notebook and dashboard side-by-side)

In [None]:
client = Client(cluster) # Connect to a Dask cluster in order to submit computation
client
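If you prefer not to rely on the defaults, the cluster can be sized explicitly and shut down automatically. The following is a sketch under stated assumptions: the worker counts are arbitrary, `processes=False` keeps the workers as threads inside the current process, `dashboard_address=None` turns the dashboard off, and `square` is a hypothetical helper defined only for this example.

```python
from dask.distributed import Client, LocalCluster


def square(x):
    """Hypothetical task used only for this illustration."""
    return x * x


# Explicitly sized in-process cluster; both context managers
# close the cluster and the client when the block exits.
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
) as cluster:
    with Client(cluster) as client:
        futures = client.map(square, range(5))  # one task per input
        squares = client.gather(futures)        # collect results in order

print(squares)
```

For interactive work in a notebook you would normally keep the dashboard enabled and leave the cluster running between cells, as in the cells above.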

## Perform computation on a dask array

In [None]:
import dask.array as da
import numpy as np
from matplotlib import pyplot as plt
%matplotlib inline

In [None]:
bigshape = (500, 2400, 3600)
chunk_shape = (10, 1200, 1800)
big_ones = da.ones(bigshape, chunks=chunk_shape)
big_ones
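To get a feel for the sizes involved, the number of chunks and their memory footprint can be worked out with plain arithmetic. This sketch assumes 8 bytes per element, which corresponds to `float64`, the default dtype of `da.ones`.

```python
import math

bigshape = (500, 2400, 3600)
chunk_shape = (10, 1200, 1800)
itemsize = 8  # bytes per float64 element (assumed default dtype)

# Number of chunks along each axis, and in total.
chunks_per_axis = [math.ceil(n / c) for n, c in zip(bigshape, chunk_shape)]
n_chunks = math.prod(chunks_per_axis)

# Memory needed for one chunk and for the whole array.
chunk_bytes = math.prod(chunk_shape) * itemsize
total_bytes = math.prod(bigshape) * itemsize

print(f"{n_chunks} chunks of {chunk_bytes / 1e6:.1f} MB, {total_bytes / 1e9:.2f} GB in total")
```

Under these assumptions the array is far larger than the memory of a typical laptop, while a single chunk fits comfortably, which is why it is processed chunk by chunk.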

In [None]:
big_calc = (big_ones * big_ones[::-1, ::-1]).mean()
big_calc

In [None]:
%time big_calc.compute()
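Once a `Client` exists, it registers itself as the default scheduler, so `.compute()` is sent to the cluster. The client can also start a computation without blocking the notebook. Below is a small-scale sketch of that pattern; the reduced array shape and the in-process cluster settings (`processes=False`, `dashboard_address=None`) are assumptions made so the example runs quickly.

```python
import dask.array as da
from dask.distributed import Client, LocalCluster

# A small stand-in for big_ones so the sketch finishes quickly.
small_ones = da.ones((20, 40, 60), chunks=(5, 20, 30))
small_calc = (small_ones * small_ones[::-1, ::-1]).mean()

with LocalCluster(
    n_workers=1,
    threads_per_worker=2,
    processes=False,
    dashboard_address=None,
) as cluster:
    with Client(cluster) as client:
        future = client.compute(small_calc)  # returns a Future right away
        mean_value = future.result()         # blocks until the workers finish

print(mean_value)
```

Because every element is one, the product with the reversed array is also all ones, so the mean is exactly 1.0 regardless of the array size.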

**Create a histogram**

In [None]:
# Sizes and chunk sizes must be integers, not floats such as 1e8.
random_values = da.random.normal(size=(100_000_000,), chunks=(20_000_000,))
hist, bins = da.histogram(random_values, bins=10, range=[-5, 5])
random_values

In [None]:
hist

In [None]:
hist.visualize()

In [None]:
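%%time
# Bin centers and widths come from the bin edges.
x = 0.5 * (bins[1:] + bins[:-1])
width = np.diff(bins)
# Passing the lazy `hist` to matplotlib triggers the actual computation.
plt.bar(x, hist, width);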
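As a sanity check, the chunked histogram can be compared against NumPy on a sample small enough to hold in memory. This is a sketch only: the sample size, chunk size, and random seed are arbitrary choices for illustration.

```python
import numpy as np
import dask.array as da

# Small in-memory sample (arbitrary size and seed), split into 5 chunks.
rng = np.random.default_rng(0)
sample = rng.normal(size=100_000)
chunked = da.from_array(sample, chunks=20_000)

# Dask computes per-chunk counts and sums them.
hist, bins = da.histogram(chunked, bins=10, range=[-5, 5])
counts = hist.compute()

# Reference result from NumPy on the full sample.
np_counts, np_bins = np.histogram(sample, bins=10, range=(-5, 5))

print(counts)
print(np_counts)
```

The two sets of counts should agree, since each chunk is binned against the same edges before the per-chunk counts are added together.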

## Going Further

- [Dask Tutorial on Distributed](https://github.com/dask/dask-tutorial/blob/master/05_distributed.ipynb)
- [Dask Tutorial on Advanced Distributed](https://github.com/dask/dask-tutorial/blob/master/06_distributed_advanced.ipynb)

<div class="alert alert-block alert-success">
  <p>Previous: <a href="02_dask_arrays.ipynb">Dask Arrays</a></p>
  <p>Next: <a href="04_dask_and_xarray.ipynb">Dask + Xarray</a></p>
</div>