<img src="assets/dask_horizontal.svg"
     width="45%"
     alt="Dask logo">
     
# Parallel Computing in Python with Dask

This notebook provides a high-level overview of Dask. We discuss why you might want to use Dask, the high-level and low-level APIs for generating computational graphs, and the schedulers which allow for the parallel execution of these graphs.
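To make "computational graph" concrete: Dask's classic low-level graph specification is a plain dict that maps keys to values or to tasks, where a task is a tuple whose first element is a callable and whose remaining elements are arguments (possibly other keys). The sketch below builds such a graph by hand and evaluates it with `toy_get`, a hypothetical, sequential, recursive evaluator written only for illustration. Dask's real schedulers do much more, most importantly running independent tasks in parallel.

```python
from operator import add, mul


def is_key(graph, obj):
    """True if `obj` names another entry of the graph (unhashable objects never do)."""
    try:
        return obj in graph
    except TypeError:
        return False


def toy_get(graph, key, cache=None):
    """Evaluate `key` in a Dask-style graph dict, computing each key at most once (hypothetical helper)."""
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]
    value = graph[key]
    if isinstance(value, tuple) and value and callable(value[0]):
        func, *args = value
        # Compute dependencies (arguments that are themselves keys) before calling func.
        args = [toy_get(graph, a, cache) if is_key(graph, a) else a for a in args]
        value = func(*args)
    cache[key] = value
    return value


graph = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),  # z = x + y
    "w": (mul, "z", 10),   # w = z * 10
}

print(toy_get(graph, "w"))  # 30
```

Because `z` depends only on `x` and `y`, a parallel scheduler could compute independent branches of a larger graph at the same time; that is the opportunity Dask's schedulers exploit.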

# Overview

### What is Dask?

- Dask is a flexible, open source library for parallel computing in Python

    - GitHub: [https://github.com/dask/dask](https://github.com/dask/dask)
    
    - Documentation: [https://docs.dask.org](https://docs.dask.org)

- Scales the existing Python ecosystem

### Why Dask?

- Enables parallel and larger-than-memory computations
- Uses APIs you already know from projects like NumPy, pandas, and scikit-learn
- Allows you to scale existing workflows with minimal code changes
- Works on your laptop, but also scales out to large clusters
- Offers great built-in diagnostic tools

### Dask vs. Spark

#### Reasons you might choose Spark

- You prefer Scala or the SQL language
- You have mostly JVM infrastructure and legacy systems
- You want an established and trusted solution for business
- You are mostly doing business analytics with some lightweight machine learning
- You want an all-in-one solution

#### Reasons you might choose Dask

- You prefer Python or native code, or have large legacy code bases that you do not want to entirely rewrite
- Your use case is complex or does not cleanly fit the Spark computing model
- You want a lighter-weight transition from local computing to cluster computing
- You want to interoperate with other technologies and don’t mind installing multiple packages

### Components of Dask

<img src="assets/dask-overview.png"
     width="85%"
     alt="Dask components">
     

# Parallel and Distributed Machine Learning

[Dask-ML](https://dask-ml.readthedocs.io) has resources for parallel and distributed machine learning.

### Types of Scaling

There are a couple of distinct scaling problems you might face.
The scaling strategy depends on which problem you're facing.

1. CPU-bound: Data fits in RAM, but training takes too long, for example because there are many hyperparameter combinations or a large ensemble of models.
2. Memory-bound: Data is larger than RAM, and sampling isn't an option.

![](assets/ml-dimensions.png)

* For in-memory problems, just use scikit-learn (or your favorite ML library).
* For CPU-bound problems (large models, many fits), use Joblib's `dask` backend with your favorite scikit-learn estimator
* For large datasets (memory-bound problems), use `dask_ml` estimators

### Machine Learning with sklearn

In [1]:
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV

X, y = make_classification(n_samples=10000, n_features=4, random_state=0)
estimator = SVC(random_state=0)
estimator.fit(X, y)
print(estimator.score(X, y))
estimator.support_vectors_[:4]

0.905


array([[-0.77244139,  0.3607576 , -2.38110133,  0.08757   ],
       [ 1.14946035,  0.62254594,  0.37302939,  0.45965795],
       [-0.77694695,  0.31434299, -2.26231851,  0.06339125],
       [ 0.79010037,  0.68530624, -0.44740487,  0.44692959]])

### Hyperparameter Optimization

There are a few ways to search for the best *hyper*parameters of a model. One is `GridSearchCV`.
As the name implies, this does a brute-force search over a grid of hyperparameter combinations.
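To make the brute-force search concrete, the sketch below enumerates the same `param_grid` used in the next cell with `itertools.product` and counts the fits needed for 2-fold cross-validation, which should match the "4 candidates, totalling 8 fits" line in the output. `expand_grid` is a hypothetical helper for illustration; scikit-learn's own `ParameterGrid` does this job in practice.

```python
from itertools import product

param_grid = {
    "C": [0.001, 10.0],
    "kernel": ["rbf", "poly"],
}


def expand_grid(grid):
    """Yield one dict per combination of hyperparameter values (hypothetical helper)."""
    names = sorted(grid)  # fixed order so the enumeration is deterministic
    for values in product(*(grid[name] for name in names)):
        yield dict(zip(names, values))


candidates = list(expand_grid(param_grid))
cv = 2
n_fits = len(candidates) * cv  # every candidate is trained once per fold

for params in candidates:
    print(params)
print(f"{len(candidates)} candidates x {cv} folds = {n_fits} fits")
```

Every (candidate, fold) pair is an independent fit, so the search parallelizes naturally.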

In [2]:
%%time

estimator = SVC(gamma='auto', random_state=0, probability=True)
param_grid = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)
grid_search.fit(X, y)
grid_search.best_params_, grid_search.best_score_

Fitting 2 folds for each of 4 candidates, totalling 8 fits
[CV] END ................................C=0.001, kernel=rbf; total time=   8.4s
[CV] END ................................C=0.001, kernel=rbf; total time=   9.9s
[CV] END ...............................C=0.001, kernel=poly; total time=   3.7s
[CV] END ...............................C=0.001, kernel=poly; total time=   3.8s
[CV] END .................................C=10.0, kernel=rbf; total time=   2.9s
[CV] END .................................C=10.0, kernel=rbf; total time=   2.8s
[CV] END ................................C=10.0, kernel=poly; total time=   3.5s
[CV] END ................................C=10.0, kernel=poly; total time=   3.3s
CPU times: user 46.5 s, sys: 348 ms, total: 46.9 s
Wall time: 47 s


({'C': 10.0, 'kernel': 'rbf'}, 0.9086000000000001)

### Single-machine parallelism with scikit-learn

![](assets/unmerged_grid_search_graph.svg)

Scikit-learn has nice *single-machine* parallelism via Joblib.
Any scikit-learn estimator that can operate in parallel exposes an `n_jobs` keyword.
This controls how many jobs run in parallel; `n_jobs=-1` uses all available CPU cores.
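The reason `n_jobs` helps is that the fits inside a grid search are independent of each other. The sketch below fans out eight independent "fits" with the standard library's `concurrent.futures`; `slow_fit` is a hypothetical stand-in for training one model on one fold. Threads are used only to keep the example self-contained. Joblib's default `loky` backend uses separate worker processes, which is what lets CPU-bound Python code use several cores.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from itertools import product


def slow_fit(params, fold):
    """Hypothetical stand-in for training one model on one CV fold."""
    time.sleep(0.1)  # pretend that training takes a while
    return params["C"], params["kernel"], fold


candidates = [{"C": c, "kernel": k} for c, k in product([0.001, 10.0], ["rbf", "poly"])]
jobs = [(params, fold) for params in candidates for fold in range(2)]  # 4 x 2 = 8 fits

start = time.perf_counter()
# One worker per job in this toy; n_jobs=-1 in scikit-learn means one worker per CPU core.
with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
    results = list(pool.map(lambda job: slow_fit(*job), jobs))
elapsed = time.perf_counter() - start

print(f"{len(results)} fits in {elapsed:.2f}s; run serially they would take about {0.1 * len(jobs):.1f}s")
```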

In [3]:
%%time

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)
grid_search.fit(X, y)
grid_search.best_params_, grid_search.best_score_

Fitting 2 folds for each of 4 candidates, totalling 8 fits
CPU times: user 9.3 s, sys: 250 ms, total: 9.55 s
Wall time: 28.9 s


({'C': 10.0, 'kernel': 'rbf'}, 0.9086000000000001)

### Multi-machine parallelism with Dask

![](assets/merged_grid_search_graph.svg)

Through Joblib's `dask` backend, scikit-learn can use your *cluster* to train a model. Passing `scatter=[X, y]` to `joblib.parallel_backend` sends the training data to the workers once, up front, rather than with every task.

If you run this on a laptop, it will take quite some time, but the CPU usage will be satisfyingly near 100% for the duration. To run faster, you need a distributed cluster. To use one, pass the scheduler's address to `Client`, for example:

```
from dask.distributed import Client
client = Client('tcp://my.scheduler.address:8786')  # placeholder: use your scheduler's address
```

See the [Dask setup documentation](https://docs.dask.org/en/latest/setup/single-distributed.html) for the many ways to create a cluster.

In [4]:
from dask.distributed import Client
import joblib

client = Client(n_workers=4)
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:49198 | Workers: 4 |
| Dashboard: http://127.0.0.1:8787/status | Cores: 16 |
| | Memory: 16.00 GiB |


In [5]:
%%time

estimator = SVC(gamma='auto', random_state=0, probability=True)
param_grid = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)

with joblib.parallel_backend("dask", scatter=[X, y]):
    grid_search.fit(X, y)
    
grid_search.best_params_, grid_search.best_score_

Fitting 2 folds for each of 4 candidates, totalling 8 fits
CPU times: user 10.3 s, sys: 337 ms, total: 10.7 s
Wall time: 27.1 s


({'C': 10.0, 'kernel': 'rbf'}, 0.9086000000000001)

### Training on Large Datasets

Sometimes you'll want to train on a larger-than-memory dataset. Dask-ML implements estimators that work on Dask arrays and DataFrames, which may be larger than your machine's RAM.
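The core idea behind larger-than-memory computation is to process the data one chunk at a time and combine small partial results. As a minimal sketch, assuming the data arrives as an iterable of row chunks (here produced on the fly by the hypothetical generator `make_chunks`), the column means below are computed without ever holding the full dataset in memory. This is plain Python for illustration, not how Dask-ML is implemented.

```python
import random


def make_chunks(n_chunks, rows_per_chunk, n_features, seed=0):
    """Hypothetical data source: yields chunks (lists of rows) one at a time."""
    rng = random.Random(seed)
    for _ in range(n_chunks):
        yield [[rng.gauss(0.0, 1.0) for _ in range(n_features)] for _ in range(rows_per_chunk)]


def chunked_column_means(chunks, n_features):
    """Compute per-column means from partial sums, with one chunk in memory at a time."""
    totals = [0.0] * n_features
    count = 0
    for chunk in chunks:
        for row in chunk:
            for j, value in enumerate(row):
                totals[j] += value
        count += len(chunk)
    return [t / count for t in totals]


means = chunked_column_means(make_chunks(n_chunks=50, rows_per_chunk=1000, n_features=3), n_features=3)
print([round(m, 3) for m in means])
```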

In [6]:
import dask.array as da
import dask
from sklearn.datasets import make_blobs
import numpy as np

We'll make a small (random) dataset locally using scikit-learn.

In [7]:
n_centers = 12
n_features = 20

X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)

centers = np.zeros((n_centers, n_features))

for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)
    
centers[:4]

array([[ 1.00796679,  4.34582168,  2.15175661,  1.04337835, -1.82115164,
         2.81149666, -1.18757701,  7.74628882,  9.36761449, -2.20570731,
         5.71142324,  0.41084221,  1.34168817,  8.4568751 , -8.59042755,
        -8.35194302, -9.55383028,  6.68605157,  5.34481483,  7.35044606],
       [ 9.49283024,  6.1422784 , -0.97484846,  5.8604399 , -7.61126963,
         2.86555735, -7.25390288,  8.89609285,  0.33510318, -1.79181328,
        -4.66192239,  5.43323887, -0.86162507,  1.3705568 , -9.7904172 ,
         2.3613231 ,  2.20516237,  2.20604823,  8.76464833,  3.47795068],
       [-2.67206588, -1.30103177,  3.98418492, -8.88040428,  3.27735964,
         3.51616445, -5.81395151, -7.42287114, -3.73476887, -2.89520363,
         1.49435043, -1.35811028,  9.91250767, -7.86133474, -5.78975793,
        -6.54897163,  3.08083281, -5.18975209, -0.85563107, -5.06615534],
       [-6.85980599, -7.87144648,  3.33572279, -7.00394241, -5.97224874,
        -2.55638942,  6.36329802, -7.97988653,  

The cluster centers computed from the small dataset will be the template for our large random dataset.
We'll use `dask.delayed` to wrap `sklearn.datasets.make_blobs`, so that each block of the large dataset is generated on the workers rather than locally.
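`dask.delayed` wraps a function so that calling it records a task instead of running it; nothing executes until `.compute()` is called. The toy `Lazy` class and `lazy` wrapper below are hypothetical and heavily simplified (no task graph, no optimization, no parallelism), but they show the same deferred behavior, including deferred indexing like the `[0]` in `dask.delayed(make_blobs)(...)[0]`. `make_block` is a made-up stand-in for `make_blobs`.

```python
class Lazy:
    """Toy deferred call: stores a function and its arguments until compute() (hypothetical)."""

    def __init__(self, func, args=(), kwargs=None):
        self.func = func
        self.args = args
        self.kwargs = kwargs or {}

    def compute(self):
        # Compute any Lazy arguments first, then call the wrapped function.
        args = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        kwargs = {k: v.compute() if isinstance(v, Lazy) else v for k, v in self.kwargs.items()}
        return self.func(*args, **kwargs)

    def __getitem__(self, index):
        # Indexing is deferred too: it becomes another Lazy call.
        return Lazy(lambda obj: obj[index], (self,))


def lazy(func):
    """Wrap `func` so that calling it returns a Lazy instead of running it."""
    def wrapper(*args, **kwargs):
        return Lazy(func, args, kwargs)
    return wrapper


calls = []


def make_block(seed, n):
    """Made-up stand-in for make_blobs: returns (data, labels)."""
    calls.append(seed)  # record when the real work actually happens
    return [seed * 10 + i for i in range(n)], f"labels-{seed}"


blocks = [lazy(make_block)(seed, 3)[0] for seed in range(4)]
print(calls)  # [] because nothing has run yet
data = [x for block in blocks for x in block.compute()]
print(calls)  # [0, 1, 2, 3]
print(data)
```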

In [8]:
n_samples_per_block = 200000
n_blocks = 100

delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X_small.dtype)
          for obj in delayeds]
X = da.concatenate(arrays)
X

| | Array | Chunk |
|---|---|---|
| Bytes | 2.98 GiB | 30.52 MiB |
| Shape | (20000000, 20) | (200000, 20) |
| Count | 400 Tasks | 100 Chunks |
| Type | float64 | numpy.ndarray |
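The sizes in this array summary follow directly from the shapes: a float64 value takes 8 bytes, so each chunk holds 200,000 × 20 × 8 bytes and the full array is 100 such chunks. The short check below reproduces the reported 30.52 MiB and 2.98 GiB, using binary units (1 MiB = 2**20 bytes, 1 GiB = 2**30 bytes) as the MiB/GiB labels indicate.

```python
n_blocks = 100
n_samples_per_block = 200_000
n_features = 20
itemsize = 8  # bytes per float64 value

chunk_bytes = n_samples_per_block * n_features * itemsize  # 32,000,000 bytes
total_bytes = n_blocks * chunk_bytes                       # 3,200,000,000 bytes

print(f"chunk: {chunk_bytes / 2**20:.2f} MiB")  # chunk: 30.52 MiB
print(f"total: {total_bytes / 2**30:.2f} GiB")  # total: 2.98 GiB
```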


In [9]:
# X = X.persist()  # Only run this on the cluster.

The algorithms implemented in Dask-ML are designed to scale: they operate on Dask arrays chunk by chunk, so they can handle larger-than-memory datasets.

They follow the scikit-learn API, so if you're familiar with scikit-learn, you'll feel at home with Dask-ML.
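To see why an algorithm like k-means fits chunked data, note that one Lloyd iteration only needs, per cluster, the sum and count of the points assigned to it, and those can be computed chunk by chunk and added together. The pure-Python sketch below performs one such blockwise iteration on tiny, made-up 2-D data. It illustrates the reduction pattern only and is not Dask-ML's implementation (Dask-ML's `KMeans` additionally uses a scalable k-means|| initialization, tuned by parameters such as `init_max_iter` and `oversampling_factor`).

```python
def nearest(point, centers):
    """Index of the center closest to `point` (squared Euclidean distance)."""
    return min(range(len(centers)),
               key=lambda k: sum((p - c) ** 2 for p, c in zip(point, centers[k])))


def partial_stats(chunk, centers):
    """Per-cluster coordinate sums and point counts for one chunk."""
    dim = len(centers[0])
    sums = [[0.0] * dim for _ in centers]
    counts = [0] * len(centers)
    for point in chunk:
        k = nearest(point, centers)
        counts[k] += 1
        for j, value in enumerate(point):
            sums[k][j] += value
    return sums, counts


def lloyd_step(chunks, centers):
    """One k-means update computed blockwise: per-chunk statistics, then a combine step."""
    dim = len(centers[0])
    total_sums = [[0.0] * dim for _ in centers]
    total_counts = [0] * len(centers)
    for chunk in chunks:  # each chunk could be processed on a different worker
        sums, counts = partial_stats(chunk, centers)
        for k in range(len(centers)):
            total_counts[k] += counts[k]
            for j in range(dim):
                total_sums[k][j] += sums[k][j]
    # Keep the old center if a cluster received no points.
    return [
        [s / total_counts[k] for s in total_sums[k]] if total_counts[k] else list(centers[k])
        for k in range(len(centers))
    ]


chunks = [
    [(0.0, 0.0), (0.0, 1.0), (10.0, 10.0)],
    [(1.0, 0.0), (10.0, 11.0), (11.0, 10.0)],
]
centers = [(0.5, 0.5), (9.0, 9.0)]
new_centers = lloyd_step(chunks, centers)
print(new_centers)
```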

In [10]:
%%time

from dask_ml.cluster import KMeans

clf = KMeans(init_max_iter=3, oversampling_factor=10)
clf.fit(X)
clf.labels_[:10].compute()

CPU times: user 16.4 s, sys: 2.23 s, total: 18.6 s
Wall time: 1min 29s


array([0, 1, 0, 3, 3, 3, 4, 1, 3, 7], dtype=int32)

In [11]:
client.shutdown()

# Dask-SQL

Core features

* SQL parsing, optimization, planning, translation for Dask
* Start with data from...
    * files in the cloud (e.g., S3)
    * any data in Python (e.g., Pandas or Dask Dataframe)
    * modern data catalog/aggregation like Intake (https://github.com/intake/intake)
    * __direct from enterprise data lakes/warehouses: Hive Metastore, Databricks, etc.__
        * Bring the SQL integration power of Spark right into the Python/Dask world
* Query cached datasets to leverage the speed of a large distributed memory pool

Bonus features
* user-defined functions
* a SQL server
* ML in SQL
* a command-line client
* more in the works!

Learn more...
* Homepage: https://nils-braun.github.io/dask-sql/
* Docs: https://dask-sql.readthedocs.io/en/latest/
* Source: https://github.com/nils-braun/dask-sql

In [1]:
from dask.distributed import Client

client = Client()
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:51435 | Workers: 4 |
| Dashboard: http://127.0.0.1:8787/status | Cores: 16 |
| | Memory: 16.00 GiB |


In [2]:
from dask_sql import Context

c = Context()

In [3]:
import dask.dataframe as dd
import os

df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})

df

| npartitions=10 | Date | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | TailNum | ActualElapsedTime | CRSElapsedTime | AirTime | ArrDelay | DepDelay | Origin | Dest | Distance | TaxiIn | TaxiOut | Cancelled | Diverted |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| | datetime64[ns] | int64 | float64 | int64 | float64 | int64 | object | int64 | object | float64 | float64 | float64 | float64 | float64 | object | object | float64 | float64 | float64 | bool | int64 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


In [4]:
c.create_table("flights", df)

result = c.sql('SELECT * FROM flights LIMIT 10')
result

| npartitions=1 | Date | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | TailNum | ActualElapsedTime | CRSElapsedTime | AirTime | ArrDelay | DepDelay | Origin | Dest | Distance | TaxiIn | TaxiOut | Cancelled | Diverted |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| | datetime64[ns] | int64 | float64 | int64 | float64 | int64 | object | int64 | object | float64 | float64 | float64 | float64 | float64 | object | object | float64 | float64 | float64 | bool | int64 |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


In [5]:
type(result)

dask.dataframe.core.DataFrame

In [6]:
result.compute()

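| | Date | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | TailNum | ActualElapsedTime | ... | AirTime | ArrDelay | DepDelay | Origin | Dest | Distance | TaxiIn | TaxiOut | Cancelled | Diverted |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1990-01-01 | 1 | 1621.0 | 1540 | 1747.0 | 1701 | US | 33 | NaN | 86.0 | ... | NaN | 46.0 | 41.0 | EWR | PIT | 319.0 | NaN | NaN | False | 0 |
| 1 | 1990-01-02 | 2 | 1547.0 | 1540 | 1700.0 | 1701 | US | 33 | NaN | 73.0 | ... | NaN | -1.0 | 7.0 | EWR | PIT | 319.0 | NaN | NaN | False | 0 |
| 2 | 1990-01-03 | 3 | 1546.0 | 1540 | 1710.0 | 1701 | US | 33 | NaN | 84.0 | ... | NaN | 9.0 | 6.0 | EWR | PIT | 319.0 | NaN | NaN | False | 0 |
| 3 | 1990-01-04 | 4 | 1542.0 | 1540 | 1710.0 | 1701 | US | 33 | NaN | 88.0 | ... | NaN | 9.0 | 2.0 | EWR | PIT | 319.0 | NaN | NaN | False | 0 |
| 4 | 1990-01-05 | 5 | 1549.0 | 1540 | 1706.0 | 1701 | US | 33 | NaN | 77.0 | ... | NaN | 5.0 | 9.0 | EWR | PIT | 319.0 | NaN | NaN | False | 0 |
| 5 | 1990-01-06 | 6 | 1539.0 | 1540 | 1653.0 | 1701 | US | 33 | NaN | 74.0 | ... | NaN | -8.0 | -1.0 | EWR | PIT | 319.0 | NaN | NaN | False | 0 |
| 6 | 1990-01-07 | 7 | 1553.0 | 1540 | 1713.0 | 1701 | US | 33 | NaN | 80.0 | ... | NaN | 12.0 | 13.0 | EWR | PIT | 319.0 | NaN | NaN | False | 0 |
| 7 | 1990-01-08 | 1 | 1543.0 | 1540 | 1656.0 | 1701 | US | 33 | NaN | 73.0 | ... | NaN | -5.0 | 3.0 | EWR | PIT | 319.0 | NaN | NaN | False | 0 |
| 8 | 1990-01-09 | 2 | 1540.0 | 1540 | 1704.0 | 1701 | US | 33 | NaN | 84.0 | ... | NaN | 3.0 | 0.0 | EWR | PIT | 319.0 | NaN | NaN | False | 0 |
| 9 | 1990-01-10 | 3 | 1608.0 | 1540 | 1740.0 | 1701 | US | 33 | NaN | 92.0 | ... | NaN | 39.0 | 28.0 | EWR | PIT | 319.0 | NaN | NaN | False | 0 |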


In [7]:
query = '''
SELECT
    Origin
    , AVG(DepDelay) AS avg_dep_delay
FROM
    flights
GROUP BY Origin
'''

result = c.sql(query)
result.compute()

| | Origin | avg_dep_delay |
|---|---|---|
| 0 | EWR | 10.295469 |
| 1 | JFK | 10.351299 |
| 2 | LGA | 7.431142 |
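A query like `AVG(DepDelay) ... GROUP BY Origin` can be evaluated over many partitions because an average decomposes into a sum and a count. The sketch below uses made-up rows (not the flights data) split into two hypothetical partitions: it computes a per-partition sum and count for each `Origin`, then combines them. Rows with a missing delay are skipped, since SQL's `AVG` ignores NULLs. This illustrates the general partial-aggregation technique, not Dask-SQL's actual query plan.

```python
from collections import defaultdict

# Made-up (Origin, DepDelay) rows split across two partitions; None plays the role of NULL.
partitions = [
    [("EWR", 10.0), ("JFK", 5.0), ("EWR", None)],
    [("LGA", 3.0), ("JFK", 15.0), ("EWR", 20.0)],
]


def partial_agg(rows):
    """Per-partition [sum, count] for each Origin, ignoring NULL delays."""
    acc = defaultdict(lambda: [0.0, 0])
    for origin, delay in rows:
        if delay is not None:
            acc[origin][0] += delay
            acc[origin][1] += 1
    return acc


def combine(partials):
    """Merge the partial [sum, count] pairs and finish the average."""
    totals = defaultdict(lambda: [0.0, 0])
    for part in partials:
        for origin, (s, n) in part.items():
            totals[origin][0] += s
            totals[origin][1] += n
    return {origin: s / n for origin, (s, n) in sorted(totals.items())}


averages = combine(partial_agg(p) for p in partitions)
print(averages)
```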


### What about "creating the table completely in SQL"?

First, let's go "full SQL": once the `%%sql` magic is registered, we don't even need to wrap our queries in Python.

In [8]:
c.ipython_magic()  # registers the %%sql magic; an error shown here may be safe to ignore

In [9]:
%%sql

CREATE TABLE allsql WITH (
    format = 'csv',
    location = 'data/nycflights/1990.csv' -- any Dask-accessible source or format (cloud/S3/..., parquet/ORC/...)
)

In [11]:
%%sql

SELECT
    Origin
    , AVG(DepDelay) AS avg_dep_delay
FROM
    allsql
GROUP BY Origin

| | Origin | avg_dep_delay |
|---|---|---|
| 0 | EWR | 9.168411 |
| 1 | JFK | 11.857274 |
| 2 | LGA | 8.560045 |
