## Parallel Computing with Dask
### Questions
- How do I start working with larger datasets in parallel? 

### Objectives
- Introduce the concept of Dask, a free and open-source library for parallel computing in Python

In [1]:
import dask
from dask.distributed import Client
# processes=False: the scheduler and worker run as threads inside this Python process
client = Client(processes=False)
client
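With `processes=False`, the scheduler and a single worker live inside the current interpreter and run tasks on threads. Below is a minimal sketch of the same setup that can be run on its own. It assumes only that `dask[distributed]` is installed. Passing `dashboard_address=None`, which is forwarded to `LocalCluster`, is an addition made here to skip the web dashboard. The `square` function is a hypothetical toy task.

```python
from dask.distributed import Client


def square(x):
    """Hypothetical toy task used only to check that the cluster works."""
    return x * x


# processes=False: scheduler and worker run as threads in this interpreter.
# dashboard_address=None (forwarded to LocalCluster) skips the web dashboard.
with Client(processes=False, dashboard_address=None) as client:
    future = client.submit(square, 7)  # schedule the call on the worker
    result = future.result()           # block until it finishes
    print(result)                      # prints 49
```

Using the client as a context manager shuts the cluster down automatically when the block ends.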

Client: Scheduler: inproc://192.168.175.42/1178/1, Dashboard: http://192.168.175.42:8787/status
Cluster: Workers: 1, Cores: 4, Memory: 7.52 GB


In [57]:
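# xarray.open_rasterio is deprecated in favour of rioxarray.open_rasterio
import rioxarray

def calc_stats(url):
    # Open the band lazily, split into 1024 x 1024 Dask chunks
    bf = rioxarray.open_rasterio(url, chunks={'band': 1, 'x': 1024, 'y': 1024})
    # Lazy mean over all pixels: nothing is read until the result is computed
    mean_band = bf.mean()
    return mean_band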
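Passing `chunks=` turns the band into a Dask-backed array that is split into 1024 x 1024 tiles. As a result, `bf.mean()` returns a lazy result rather than a number. The sketch below reproduces that behaviour offline. It uses a synthetic random array in place of the GeoTIFF, which is an assumption, and `dask.array.from_array` stands in for the chunked reader.

```python
import numpy as np
import dask.array as da

# Hypothetical stand-in for one band: 3000 x 3000 random 16-bit values
rng = np.random.default_rng(0)
data = rng.integers(0, 65536, size=(3000, 3000), dtype=np.uint16)

# Same idea as chunks={'x': 1024, 'y': 1024}: split into 1024 x 1024 blocks
lazy = da.from_array(data, chunks=(1024, 1024))
print(lazy.numblocks)  # (3, 3): the edge blocks are smaller than 1024

# .mean() only builds a task graph (per-block partial sums and counts, then a combine)
lazy_mean = lazy.mean()

# .compute() executes the graph and returns a concrete NumPy scalar
value = lazy_mean.compute()
print(value, data.mean())
```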

Test it locally on the red band (band 4) of a single Landsat 8 scene:

In [58]:
url = 'http://landsat-pds.s3.amazonaws.com/c1/L8/227/065/LC08_L1TP_227065_20200608_20200626_01_T1/'
redband = url+'LC08_L1TP_227065_20200608_20200626_01_T1_B{}.TIF'.format(4)

redband

'http://landsat-pds.s3.amazonaws.com/c1/L8/227/065/LC08_L1TP_227065_20200608_20200626_01_T1/LC08_L1TP_227065_20200608_20200626_01_T1_B4.TIF'
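The URL is made of three parts: the scene directory, a Landsat Collection 1 product ID, and a `_B{n}.TIF` band suffix. The ID packs several fields, following the Collection 1 naming convention: sensor, processing level, WRS-2 path/row, acquisition date, processing date, collection number and tier. The helper below is a hypothetical illustration that unpacks those fields. It is not part of any library.

```python
def parse_scene_id(scene_id):
    """Hypothetical helper: split a Landsat Collection 1 product ID into fields."""
    sensor, level, pathrow, acquired, processed, collection, tier = scene_id.split("_")
    return {
        "sensor": sensor,          # LC08: Landsat 8, OLI/TIRS combined
        "level": level,            # L1TP: precision and terrain corrected
        "path": int(pathrow[:3]),  # WRS-2 path
        "row": int(pathrow[3:]),   # WRS-2 row
        "acquired": acquired,      # acquisition date, YYYYMMDD
        "processed": processed,    # processing date, YYYYMMDD
        "collection": collection,  # collection number
        "tier": tier,              # T1 = Tier 1
    }


info = parse_scene_id("LC08_L1TP_227065_20200608_20200626_01_T1")
print(info["path"], info["row"], info["acquired"])  # 227 65 20200608
```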

In [59]:
mean = calc_stats(redband)  # runs here in the notebook; returns a lazy DataArray

Next, use `client.submit` to run the same function on a Dask worker. It returns a `Future` immediately, without waiting for the result:

In [60]:
future = client.submit(calc_stats, redband)

In [61]:
future
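A `Future` is a handle to work that runs on a worker in the background. Its `status` changes from pending to finished, and `future.result()` blocks until the value is available. A minimal sketch of that lifecycle follows. It uses a hypothetical `slow_add` task in place of `calc_stats` so that it runs without network access.

```python
import time

from dask.distributed import Client


def slow_add(a, b):
    """Hypothetical task that takes a moment, standing in for calc_stats."""
    time.sleep(0.5)
    return a + b


with Client(processes=False, dashboard_address=None) as client:
    future = client.submit(slow_add, 1, 2)  # returns immediately
    status_before = future.status           # typically 'pending' at this point
    result = future.result()                # blocks until the worker finishes
    status_after = future.status            # 'finished' once the result exists
    print(status_before, result, status_after)
```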

Let's do three files

Now compute the mean of several files on distributed workers. `client.map` applies a function to every item in a list. Like `submit`, it is non-blocking: it returns a list of futures immediately, so you can keep working in the Python shell or notebook while the computations run.

In [62]:
scene = 'LC08_L1TP_227065_20200608_20200626_01_T1'
# Bands 4 (red), 5 (near infrared) and 6 (shortwave infrared 1)
filenames = [url + scene + '_B{}.TIF'.format(band) for band in (4, 5, 6)]

In [63]:
futures = client.map(calc_stats, filenames)

In [64]:
len(futures)

3

In [65]:
futures[:3]

[<Future: finished, type: xarray.DataArray, key: calc_stats-1911c362c6c16665be9e30a49a447298>,
 <Future: finished, type: xarray.DataArray, key: calc_stats-2e5747e7396d267be307ffa417eaf92f>,
 <Future: finished, type: xarray.DataArray, key: calc_stats-ff6e8b81d5b7433ed09f128fc4eca905>]
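`client.map` returns one `Future` per input, in input order. Each key is the function name plus a hash of the arguments, which is where the `calc_stats-<hash>` keys above come from. `submit` and `map` treat functions as pure by default (`pure=True`), so an identical call gets the same key and reuses the existing task rather than running it again. The sketch below demonstrates this with a hypothetical `inc` task.

```python
from dask.distributed import Client


def inc(x):
    """Hypothetical toy task."""
    return x + 1


with Client(processes=False, dashboard_address=None) as client:
    futures = client.map(inc, [10, 20, 30])  # one Future per input, in order
    first = client.submit(inc, 10)
    second = client.submit(inc, 10)          # identical call
    same_key = first.key == second.key       # True: pure=True is the default
    results = client.gather(futures)         # [11, 21, 31]
    print(futures[0].key, same_key, results)
```

For functions that should run every time, such as ones drawing random numbers, pass `pure=False`.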

In [66]:
from dask.distributed import progress

In [68]:
progress(futures)

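`progress` is useful for watching a batch of futures. When you want to act on each result as soon as it is ready, `dask.distributed.as_completed` yields futures in the order they finish rather than the order they were submitted. A minimal sketch, using a hypothetical `square` task:

```python
from dask.distributed import Client, as_completed


def square(x):
    """Hypothetical toy task."""
    return x * x


with Client(processes=False, dashboard_address=None) as client:
    futures = client.map(square, range(5))
    finished = []
    # Each Future is yielded as soon as it completes (not necessarily input order)
    for future in as_completed(futures):
        finished.append(future.result())
    print(sorted(finished))  # [0, 1, 4, 9, 16]
```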

Each result is an `xarray.DataArray` backed by a lazy Dask array, so the mean has not been evaluated yet. Calling `.compute()` runs the calculation and returns a `DataArray` holding the NumPy result.

In [69]:
# Collect the results on the client: one lazy mean (in DN units) per band, in input order
means = client.gather(futures)
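Each gathered result is still lazy. To evaluate several of them together, `dask.compute` accepts any number of lazy objects, runs them as one combined graph and returns a tuple of results. Dask-backed xarray objects can be passed the same way. The sketch below assumes synthetic arrays in place of the three gathered band means so that it runs offline.

```python
import numpy as np
import dask
import dask.array as da

# Hypothetical stand-ins for three lazy band means (constant-valued test bands)
bands = [da.from_array(np.full((2048, 2048), v, dtype=np.uint16), chunks=1024)
         for v in (100, 200, 300)]
lazy_means = [band.mean() for band in bands]

# One call evaluates all three graphs together and returns a tuple
values = dask.compute(*lazy_means)
print([float(v) for v in values])
```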

In [71]:
# Mean of band 6, the third file (index 2); values are scaled digital numbers, not reflectance
means[2].compute()
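The Level-1 GeoTIFFs store quantized, scaled 16-bit digital numbers (DNs) rather than reflectances. The USGS Landsat 8 procedure for converting them to top-of-atmosphere (TOA) reflectance has two steps. First, a linear rescale uses per-band coefficients from the scene's MTL metadata file. Second, the result is divided by the sine of the sun elevation angle. The sketch below follows that procedure. The default coefficients are assumed typical values, so the real `REFLECTANCE_MULT_BAND_n`, `REFLECTANCE_ADD_BAND_n` and `SUN_ELEVATION` should be read from each scene's MTL file.

```python
import math


def dn_to_toa_reflectance(dn, mult=2.0e-05, add=-0.1, sun_elevation_deg=90.0):
    """Convert a Landsat 8 Level-1 DN to TOA reflectance.

    mult/add correspond to REFLECTANCE_MULT_BAND_n / REFLECTANCE_ADD_BAND_n
    in the MTL file; the defaults here are assumed typical values.
    """
    # Linear rescale: planetary reflectance without sun-angle correction
    reflectance = mult * dn + add
    # Correct for the sun elevation angle (degrees converted to radians)
    return reflectance / math.sin(math.radians(sun_elevation_deg))


print(dn_to_toa_reflectance(10000))                         # about 0.1
print(dn_to_toa_reflectance(10000, sun_elevation_deg=30))   # about 0.2
```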