# Parallel and Distributed Machine Learning

[Dask-ML](https://dask-ml.readthedocs.io) has resources for parallel and distributed machine learning.

## Types of Scaling

There are a couple of distinct scaling problems you might face.
The scaling strategy depends on which problem you're facing.

1. Large Models: Data fits in RAM, but training takes too long. Typical causes are many hyperparameter combinations, a large ensemble of many models, etc.
2. Large Datasets: Data is larger than RAM, and sampling isn't an option.

![](static/ml-dimensions-color.png)

* For in-memory problems, just use scikit-learn (or your favorite ML library).
* For large models, use distributed joblib with your favorite scikit-learn estimator.
* For large datasets, use `dask_ml` estimators, or load the data with Dask and pass it off to a distributed machine learning library (e.g. XGBoost).

## Scikit-Learn in 5 Minutes

Scikit-Learn has a nice, consistent API.

1. You instantiate an `Estimator` (e.g. `LinearRegression`, `RandomForestClassifier`, etc.). All of the model's *hyperparameters* (user-specified parameters, not the ones learned by the estimator) are passed to the estimator when it's created.
2. You call `estimator.fit(X, y)` to train the estimator.
3. You use `estimator` to inspect attributes, make predictions, etc.

Let's generate some random data.

In [None]:
from sklearn.datasets import make_classification
from sklearn.svm import SVC

X, y = make_classification(n_samples=10000, n_features=4, random_state=0)
X[:8]

In [None]:
y[:8]

We'll fit a [Support Vector Classifier](http://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html).

Create the estimator and fit it.

In [None]:
estimator = SVC(random_state=0, gamma='scale')
estimator.fit(X, y)

Inspect the learned attributes.

In [None]:
estimator.support_vectors_[:4]

Check the accuracy.

In [None]:
estimator.score(X, y)

## Hyperparameters

Most models have *hyperparameters*. They affect the fit, but are specified up front instead of learned during training.

In [None]:
estimator = SVC(C=0.00001, shrinking=False, random_state=0, gamma='scale')
estimator.fit(X, y)
estimator.support_vectors_[:4]

In [None]:
estimator.score(X, y)

## Hyperparameter Optimization

There are a few ways to search for the best hyperparameters. One is `GridSearchCV`.
As the name implies, this does a brute-force search over a grid of hyperparameter combinations, scoring each combination with cross-validation.
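To make "a grid of hyperparameter combinations" concrete, here is a minimal standard-library sketch of how a grid expands into candidates. It only mimics the enumeration step and is not scikit-learn's implementation; the variable names are made up for illustration.

```python
from itertools import product

# The same kind of dictionary you would pass to GridSearchCV as `param_grid`.
param_grid = {
    "C": [0.001, 10.0],
    "kernel": ["rbf", "poly"],
}

# Every candidate is one element of the Cartesian product of the value lists.
names = list(param_grid)
candidates = [dict(zip(names, values)) for values in product(*param_grid.values())]

for candidate in candidates:
    print(candidate)
```

Two values of `C` times two kernels gives four candidates. The number of candidates is the product of the list lengths, so it grows quickly as you add hyperparameters.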

## Single-machine parallelism with scikit-learn

![](static/sklearn-parallel.png)

As you may suspect, this brute-force search can be done in parallel. Each of the `(hyperparameter, cv-split)`
combinations in a grid search can be done independently from the rest.

Internally, Scikit-Learn uses a library called [joblib](https://joblib.readthedocs.io) to do things in parallel.
By default, a process pool is typically used.
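The independence of the `(hyperparameter, cv-split)` tasks can be sketched with the standard library alone. This is an illustration of the idea, not what scikit-learn or joblib actually run: it uses a thread pool from `concurrent.futures` as a stand-in, and `evaluate` is a hypothetical placeholder that does no training.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product


def evaluate(task):
    """Placeholder for 'fit on the training folds, score on the held-out fold'."""
    params, split = task
    # A real implementation would train and score a model here. This sketch
    # only records which task ran, because the point is that no task needs
    # the result of any other task.
    return {"params": params, "split": split}


candidates = [{"C": c, "kernel": k} for c, k in product([0.001, 10.0], ["rbf", "poly"])]
n_splits = 2  # corresponds to cv=2

# One task per (candidate, split) pair.
tasks = [(params, split) for params in candidates for split in range(n_splits)]

# Because the tasks are independent, they can be handed to a pool in any order.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(evaluate, tasks))

print(len(results), "independent fits")
```

Four candidates with two splits each give eight tasks, and none of them depends on another, which is why the work can be spread across workers.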

In [None]:
from sklearn.model_selection import GridSearchCV

In [None]:
%%time
estimator = SVC(gamma='auto', random_state=0, probability=True)
param_grid = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)
grid_search.fit(X, y)

## Multi-machine parallelism with Dask

![](static/sklearn-parallel-dask.png)

The Dask and Joblib developers have implemented a [distributed backend](https://joblib.readthedocs.io/en/latest/auto_examples/parallel/distributed_backend_simple.html#sphx-glr-auto-examples-parallel-distributed-backend-simple-py) for joblib so that a Dask cluster can be used to parallelize scikit-learn.
To use it, you need to

1. Connect to a `Client`
2. Make sure your `.fit` call occurs in a `joblib.parallel_backend` context.

In [None]:
from dask.distributed import Client
from dask_kubernetes import KubeCluster

cluster = KubeCluster(n_workers=10)
cluster

In [None]:
client = Client(cluster)
client

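That grid search fit only 8 models (4 hyperparameter combinations × 2 cross-validation splits). It'd be silly to parallelize that on a cluster. Let's try it on a larger problem: the grid below has 36 combinations, which with 5-fold cross-validation means 180 fits.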

In [None]:
param_grid = {
    'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],
    'kernel': ['rbf', 'poly', 'linear'],
    'shrinking': [True, False],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=5, n_jobs=-1)

In [None]:
# `sklearn.externals.joblib` has been removed from scikit-learn; import joblib directly.
import joblib

with joblib.parallel_backend("dask"):
    grid_search.fit(X, y)

## Scalable Estimators

Sometimes you'll want to train on a larger-than-memory dataset. `dask-ml` has implemented estimators that work well on Dask arrays and dataframes that may be larger than your machine's RAM.

Some scikit-learn estimators can be trained *incrementally*, on batches of data at a time. As new data arrives, the learned parameters are updated, taking the previous values into account. Instead of using `estimator.fit`, you use `estimator.partial_fit`. This dovetails with the blocked structure of Dask Array and Dask DataFrame: we can iterate through the blocks of a large Dask array to incrementally train a model on a large dataset.
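The incremental pattern can be illustrated with a toy estimator written in plain Python. `RunningMean` is a hypothetical class invented for this sketch; it is not part of scikit-learn or Dask-ML, and it "learns" only a mean. It does show the essential property: each `partial_fit` call updates the existing state from one block instead of starting over.

```python
class RunningMean:
    """Toy incremental estimator: tracks the mean of all values seen so far."""

    def __init__(self):
        self.n_seen_ = 0
        self.mean_ = 0.0

    def partial_fit(self, batch):
        # Fold one batch into the current state; earlier batches are not revisited.
        for value in batch:
            self.n_seen_ += 1
            self.mean_ += (value - self.mean_) / self.n_seen_
        return self


# Pretend this list is too large to process at once and split it into blocks,
# in the same spirit as the chunks of a Dask array.
data = list(range(1, 101))
blocks = [data[i:i + 25] for i in range(0, len(data), 25)]

model = RunningMean()
for block in blocks:
    model.partial_fit(block)

print(model.n_seen_, model.mean_)
```

Only one block needs to be in memory at a time, and the final state matches what a single pass over all the data would produce.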

[`dask_ml.wrappers.Incremental`](http://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental) is a meta-estimator that provides a nice bridge between a larger-than-memory Dask Array and these incremental estimators.

In [None]:
import dask
import dask.array as da
import dask_ml.datasets
import dask_ml.wrappers
import sklearn.linear_model
from dask.utils import format_bytes  # newer releases provide format_bytes in dask.utils

In [None]:
X, y = dask_ml.datasets.make_classification(n_samples=100_000_000, n_features=30, chunks=1_000_000)
X

In [None]:
format_bytes(X.nbytes)

In [None]:
X, y = dask.persist(X, y)

## Exercise: Large Example

Use `dask_ml.wrappers.Incremental` to wrap an `sklearn.linear_model.SGDClassifier` to fit the model incrementally.
See http://ml.dask.org/incremental.html and http://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental if you need guidance.

Hints:

1. Incremental is a meta-estimator. It should wrap a scikit-learn estimator like `sklearn.linear_model.SGDClassifier`.
2. `Incremental(estimator).fit(X, y, **fit_kwargs)` requires that you pass all the arguments required for `estimator.partial_fit(X, y, **fit_kwargs)` (check the [docs](http://scikit-learn.org/stable/modules/generated/sklearn.linear_model.SGDClassifier.html)).

In [None]:
# Your solution here
inc = dask_ml.wrappers.Incremental(...)

In [None]:
%load solutions/07-machine-learning-incremental.py

**Questions**


1. What pattern do you notice on the task stream? Is this computation distributed? Is it parallel?
2. Check the return types of `Incremental.predict` and `Incremental.score`. Are these operations eager or lazy? Are they serial or parallel?

In [None]:
inc.score(X, y)

In [None]:
preds = inc.predict(X)
preds

The algorithms implemented in Dask-ML are scalable. They handle larger-than-memory datasets just fine.

They follow the scikit-learn API, so if you're familiar with scikit-learn, you'll feel at home with Dask-ML.

In [None]:
from dask_ml.cluster import KMeans

In [None]:
clf = KMeans(init_max_iter=3, oversampling_factor=10)

In [None]:
%time clf.fit(X)

In [None]:
clf.labels_

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt

subset = X[::10_000]
plt.scatter(subset[:, 0], subset[:, 1], c=clf.labels_[::10_000], alpha=0.25, marker='.')

## Learn More

Additional resources are available at http://ml.dask.org.