## Before we start computing



Configure the Dask scheduler (multiprocessing); a `dask.distributed` client is left as a commented-out alternative



In [1]:
# Run all dask computations in this notebook with dask's multiprocessing
# scheduler, selected by its registered name 'processes' (the same string
# form used for 'single-threaded' later) instead of passing the function.
#
# Alternative, currently disabled: a dask.distributed client
#     from dask.distributed import Client, progress
#     client = Client(processes=False, threads_per_worker=4, n_workers=4)
#     client
import dask.multiprocessing  # also binds `dask`; keeps dask.multiprocessing.get importable for later cells
dask.config.set(scheduler='processes')

## Let's read some data



from the [Gridded Ensemble Precipitation and Temperature Estimates over the Contiguous United States](https://www.earthsystemgrid.org/dataset/gridded_precip_and_temp.html)



In [1]:
import xarray as xr

# Lazily open every file matching nc/*.nc4 and stack them along a new
# 'ensemble' dimension (presumably one ensemble member per file; members are
# ordered by the sorted file names). combine='nested' is required for
# concat_dim to take effect in current xarray. chunks={'time': 366} gives
# dask chunks of up to 366 time steps (presumably one leap year of daily
# data -- confirm against the files).
ds = xr.open_mfdataset('nc/*.nc4', engine='netcdf4', combine='nested',
                       concat_dim='ensemble', chunks={'time': 366})

How big are our data?



In [1]:
# Total in-memory size of every variable (as if fully loaded), in gigabytes,
# followed by a netCDF-style summary of dimensions, variables and attributes.
size_gb = ds.nbytes / 1e9
print('ds size in GB {:0.2f}\n'.format(size_gb))
ds.info()

## What's our domain?



In [1]:
import numpy as np
%matplotlib inline
ds['mask'] = ds['elevation'].isel(ensemble=0, drop=True).astype(np.int)
ds['mask'] = ds['mask'].where(ds['mask'] > 0)
ds['mask'].plot()

## What do our arrays look like?



In [1]:
# Print each data variable's name next to the repr of its underlying
# array object (dask-backed here, since the dataset was opened with chunks).
for var_name, var in ds.data_vars.items():
    print(var_name, var.data)

## Let's calculate some things



In [1]:
# Time-average of t_mean for every ensemble member and grid cell.
# This only builds the dask graph; nothing is computed until .load()/plot.
da_mean = ds.t_mean.mean('time')
da_mean

In [1]:
# Ensemble spread of the time-mean: largest minus smallest member value
# at each grid cell (still lazy).
ens_max = da_mean.max(dim='ensemble')
ens_min = da_mean.min(dim='ensemble')
da_spread = ens_max - ens_min
da_spread

## What's happening?



In [1]:
from dask import visualize

# Render the task graph dask has built for da_mean; binding the returned
# image and ending the cell with it displays the same figure.
task_graph = visualize(da_mean)
task_graph

## Actually computing results



In [None]:
# Trigger the actual computation: .load() fills da_spread's data in place and
# returns that same object, which is also bound to `out`.
da_spread.load()
out = da_spread

## Or plotting results



In [1]:
# Map of the ensemble spread. robust=True keeps extreme outliers from
# dominating the colour scale. The ensemble coordinate has been reduced away,
# so set an explicit title; the trailing ';' hides the artist repr.
spread_mesh = da_spread.plot(robust=True, figsize=(10, 6))
spread_mesh.axes.set_title('Ensemble spread (max - min) of time-mean t_mean');

## Let's do a simple comparison



Configure the scheduler to run everything in a single thread (dask's synchronous scheduler)



In [1]:
# Same setting as scheduler='single-threaded', passed as a mapping: from here
# on, dask runs every task sequentially in the calling thread.
dask.config.set({'scheduler': 'single-threaded'})

In [1]:
lat = 42.37
lon = -72.52
ds.coords['ensemble'] = range(100)
%time ds.sel(lat=lat,lon=lon,method='nearest').pcp.plot(figsize=(10, 6))

Let's try that again, this time with the multiprocessing scheduler



In [1]:
dask.config.set(scheduler=dask.multiprocessing.get)
%time ds.sel(lat=lat,lon=lon,method='nearest').pcp.plot(figsize=(10, 6))

## What if we had simpler data structures?



Python has the `multiprocessing` module



In [1]:
import multiprocessing as mp

# How many CPUs the machine reports, i.e. the default Pool size.
n_cpus = mp.cpu_count()
print("Number of processors: ", n_cpus)

## Types of parallel execution



-   Synchronous
    -   `Pool.map`
    -   `Pool.apply`
-   Asynchronous
    -   `Pool.map_async`
    -   `Pool.apply_async`
-   `Process`



## Let's get some data



on fuel economy from the [DOE](https://catalog.data.gov/dataset/fuel-economy-data)



In [1]:
import pandas as pd

# Load the DOE fuel-economy table from the local examples directory.
df = pd.read_csv(filepath_or_buffer="examples/vehicles.csv")

## Can you create a list that has the combined MPG of each car, multiplied by 1.2 if it's FWD?



-   Write a function that acts on each row
-   The columns needed are `comb08` and `drive`
-   Use a for loop over `items()` after transposing the DataFrame (`iteritems` was removed in pandas 2.0)



In [1]:
import time

## Now let's try to parallelize this



Use `Pool.map`



In [1]:
# TODO(exercise): put the Pool.map version of the row function after %time,
# e.g. `%time results = pool.map(row_fn, rows)` (names illustrative).
# As written, nothing follows %time, so no work is timed.
%time 

`map` blocks until *all* worker processes have finished the job, and only then returns the results



## What about asynchronous?



Use `Pool.map_async`



In [1]:
# TODO(exercise): put the Pool.map_async version after %time, e.g.
# `%time async_result = pool.map_async(row_fn, rows)` (names illustrative);
# collect the values afterwards with async_result.get().
# As written, nothing follows %time, so no work is timed.
%time 