# Scalable Machine Learning with Dask-ML

*https://coiled.io/blog/scalable-machine-learning.html*

Standard disclaimer: Python has great tools for single-node machine learning.
Distributed computing is fundamentally harder, so think twice before reaching for
Dask-ML.

In [None]:
import coiled
cluster = coiled.Cluster(n_workers=10)

from dask.distributed import Client
client = Client(cluster)
client

## When *might* you need scalable / distributed ML?

1. You have a compute-bound problem. Slow training time is affecting your workflow.
2. You have a memory-bound problem *and* more data helps.

## Dimensions of Scale

![](dimensions.png)

## 1. CPU (Compute)-Bound Problems

In this type of problem, your dataset fits in RAM just fine, but you're waiting around for your CPU (or GPU, TPU) to finish it's computations. This commonly occurs when there's many mostly independent components to your estimator

* Hyperparameter Optimization: Many CV-splits / hyperparameter combinations
* Ensemble estimator: Combine predictions from many estimators

These are relatively straightforward to parallelize. Here's a small example.

In [None]:
from sklearn.svm import SVC
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV

X, y = make_classification(n_samples=5_000, random_state=0)
X[:5]

param_grid = {"C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
              "kernel": ['rbf', 'poly', 'sigmoid'],
              "shrinking": [True, False]}

grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
                           param_grid=param_grid,
                           return_train_score=False,
                           cv=5,
                           n_jobs=-1)
grid_search

Fitting a single estimator takes 1-2 seconds on my laptop.

In [None]:
%time grid_search.estimator.fit(X, y)

Now let's fit the whole grid search on the cluster.

In [None]:
%%time
import joblib

with joblib.parallel_backend('dask', scatter=[X, y]):
    grid_search.fit(X, y)

## What Happened?

Internally, scikit-learn parallelizes `for` loops with joblib. Typically that parallel `for` loop uses threads or processes on a single machine. We worked with the scikit-learn / joblib devs to implement a `dask` parallel backend, so you can parallelize things on a cluster.

<img src="joblib.png" width="100%"/>

<img src="joblib-dask.png" width="100%"/>

Pretty much anything that uses joblib internally can use the Dask joblib backend.

* Anything in scikit-learn with `n_jobs` (fitting trees in a Random Forest, voting methods, hyperparamter optimization, ...)
* TPOT

## Real Example

Let's apply the joblib parallelization to a more realistic example.

In [None]:
import dask.dataframe as dd

df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    dtype={
        "payment_type": "UInt8",
        "VendorID": "UInt8",
        "passenger_count": "UInt8",
        "RatecodeID": "UInt8",
    },
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    blocksize="16 MiB",
    storage_options=dict(anon=True),
)
df

In [None]:
sdf = df.sample(frac=0.001, random_state=0).compute()
sdf.head()

In [None]:
sdf = sdf.dropna()
len(sdf)

In [None]:
sdf.dtypes

In [None]:
import pandas as pd

vendor_dtype = pd.CategoricalDtype([1, 2, 4])
ratecode_dtype = pd.CategoricalDtype([1, 2, 3, 4, 5, 6, 99])
store_and_fwd_flag = pd.CategoricalDtype(["N", "Y"])
payment_type = pd.CategoricalDtype([1, 2, 3, 4, 5])

dtypes = {
    "VendorID": vendor_dtype,
    "RatecodeID": ratecode_dtype,
    "store_and_fwd_flag": store_and_fwd_flag,
    "payment_type": payment_type,
    "passenger_count": "int",
}
sdf = sdf.astype(dtypes)
X = sdf.drop(["tip_amount", "total_amount"], axis="columns")
y = sdf["tip_amount"] > 0
y.mean()

In [None]:
sdf.dtypes

In [None]:
import sklearn.pipeline
import sklearn.compose
import sklearn.preprocessing
from sklearn.experimental import enable_hist_gradient_boosting  # noqa
import sklearn.ensemble

In [None]:
def transform_datetime(X):
    out = []
    for k, v in X.items():
        out.append(v.dt.dayofweek.rename(f"{k}_dow"))
        out.append(v.dt.hour.rename(f"{k}_hour"))

    return pd.concat(out, axis="columns")

def transform_location(X):    
    return (X["PULocationID"] == X["DOLocationID"]).to_frame(name="same_location")


# Dummy encode the categorical columns
onehot_columns = list(sdf.select_dtypes(include="category"))
onehot_categories = [sdf[col].dtype.categories for col in onehot_columns]
ohe = sklearn.preprocessing.OneHotEncoder(categories=onehot_categories, sparse=False)

# Datetime & Dummy encode the datetime columns
datetime_columns = list(sdf.select_dtypes(include="datetime"))
dte = sklearn.pipeline.make_pipeline(
    sklearn.preprocessing.FunctionTransformer(transform_datetime),
    sklearn.preprocessing.OneHotEncoder( 
        categories=[
            list(range(7)),  # day of week
            list(range(24))  # hour
        ] * len(datetime_columns),
        sparse=False,
    )
)

# Location encode location IDs
location_columns = ["PULocationID", "DOLocationID"]
le = sklearn.preprocessing.FunctionTransformer(transform_location)

# Compose the pipeline
preprocess = sklearn.compose.make_column_transformer(
    (ohe, onehot_columns),
    (dte, datetime_columns),
    (le, location_columns),
    remainder="passthrough"
)

scale = sklearn.preprocessing.StandardScaler()

pipe = sklearn.pipeline.make_pipeline(
    preprocess,
    scale,
    sklearn.ensemble.GradientBoostingClassifier(),
)

In [None]:
sklearn.set_config(display="diagram")
pipe

In [None]:
%time _ = pipe.fit(X, y)

In [None]:
pipe.score(X, y)

In [None]:
import dask_ml.model_selection

param_grid = {
    "gradientboostingclassifier__learning_rate": [0.001, 0.01, .1, 1],
    "gradientboostingclassifier__max_leaf_nodes": [10, 31, 50],
}
search = dask_ml.model_selection.GridSearchCV(pipe, param_grid)
search

In [None]:
%%time
_ = search.fit(X[:50_000], y[:50_000])

In [None]:
cluster.scale(20)

## 2. Memory-Bound Problems

![](dimensions.png)

Dask-ML also has tools for working with larger-than-memory datasets. We can break this scaling challenge into a couple components

1. Data structures: NumPy & pandas were built for in-memory problems.
2. ML Algorithms: Many algorithms in (e.g.) Scikit-Learn were built for (in-memory) NumPy arrays.

But first, plot your learning rate!

Let's make a very simple / fast to train model.

In [None]:
import sklearn.linear_model

lr = sklearn.linear_model.LogisticRegression(max_iter=500)
pipe = sklearn.pipeline.make_pipeline(preprocess, scale, lr)
pipe

In [None]:
%time _ = pipe.fit(X, y)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

We'll use scikit-learn's built-in [`learning_curve`](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.learning_curve.html) to measure how the model does on various sample sizes (all of which fit in memory). It uses `joblib` internally, so we can use Dask to do the learning curve in parallel on the cluster.

In [None]:
xx = X[:50_000]
yy = y[:50_000]

with joblib.parallel_backend("dask", scatter=[xx, yy]):
    train_sizes, train_scores, test_scores, fit_times, _ = (
        sklearn.model_selection.learning_curve(pipe, xx, yy,
                                               return_times=True)
    )

In [None]:
train_scores_mean = np.mean(train_scores, axis=1)
train_scores_std = np.std(train_scores, axis=1)
test_scores_mean = np.mean(test_scores, axis=1)
test_scores_std = np.std(test_scores, axis=1)
fit_times_mean = np.mean(fit_times, axis=1)
fit_times_std = np.std(fit_times, axis=1)

fig, axes = plt.subplots(nrows=3, figsize=(6, 15))
# Plot learning curve
axes[0].grid()
axes[0].fill_between(train_sizes, train_scores_mean - train_scores_std,
                     train_scores_mean + train_scores_std, alpha=0.1,
                     color="r")
axes[0].fill_between(train_sizes, test_scores_mean - test_scores_std,
                     test_scores_mean + test_scores_std, alpha=0.1,
                     color="g")
axes[0].plot(train_sizes, train_scores_mean, 'o-', color="r",
             label="Training score")
axes[0].plot(train_sizes, test_scores_mean, 'o-', color="g",
             label="Cross-validation score")
axes[0].legend(loc="best")

# Plot n_samples vs fit_times
axes[1].grid()
axes[1].plot(train_sizes, fit_times_mean, 'o-')
axes[1].fill_between(train_sizes, fit_times_mean - fit_times_std,
                     fit_times_mean + fit_times_std, alpha=0.1)
axes[1].set_xlabel("Training examples")
axes[1].set_ylabel("fit_times")
axes[1].set_title("Scalability of the model")

# Plot fit_time vs score
axes[2].grid()
axes[2].plot(fit_times_mean, test_scores_mean, 'o-')
axes[2].fill_between(fit_times_mean, test_scores_mean - test_scores_std,
                     test_scores_mean + test_scores_std, alpha=0.1)
axes[2].set_xlabel("fit_times")
axes[2].set_ylabel("Score")
axes[2].set_title("Performance of the model");

So this model overfits at 5,000 samples, but by 15,000 samples has essentially plateaued. Throwing more data at this model isn't really going to improve your fit.

But assuming it did, how would we scale our earlier pipeline? The first issue is that we can't fit it in a NumPy array / pandas DataFrame. So we (the user and Dask-ML) use Dask Array and Dask DataFrame instead.

```python
>>> df = dd.read_csv(...)    # Instead of pandas
>>> arr = da.from_zarr(...)  # Instead of NumPy
```

The second issue was around ML algorithms. Depending on the algorithm, implementing a parallel / distributed version can be difficult (or impossible). Many preprocessing tasks are doable.

* [`dask_ml.preprocessing`](https://ml.dask.org/preprocessing.html)
  - MinMaxScaler
  - QuantileTransformer
  - LabelEncoder
  - OneHotEncoder
  - ...
* [`dask_ml.feature_extraction`](https://ml.dask.org/modules/api.html#dask-ml-feature-extraction-text-feature-extraction)
  - CountVectorizer
  - HashingVectorizer
  - FeatureHasher
  
On the other hand, things like a parallel, distributed SGD or gradient boosted tree are feasible, but a lot of work to implement and maintain.

We'll often continue to re-use scikit-learn & others by using the `partial_fit` API. We feed blocks of a Dask Array / DataFrame to scikit-learn. Since these are just NumPy arrays or pandas DataFrames, scikit-learn already knows what to do. And it can incrementally learn the parameters. See https://ml.dask.org/incremental.html for more.

### Blockwise Ensemble Method

Dask-ML recently added blockwise ensemble voting estimators. They're an interesting way to combine the best of Dask and existing ML libraries like scikit-learn.

Let's generate a big dataset.

In [None]:
import dask
import dask_ml.datasets

X, y = dask_ml.datasets.make_classification(n_samples=10_000_000,
                                            n_features=1000,
                                            n_informative=10,
                                            shift=2, scale=2,
                                            chunks=50_000)
X

We'll persist it in memory on the cluster, and bring back a small sample to play with.

In [None]:
X, y = dask.persist(X, y)
xx, yy = dask.compute(X[:1000], y[:1000])

Take some estimator like `RidgeClassifier`.

In [None]:
subestimator = sklearn.linear_model.RidgeClassifier()
subestimator

In [None]:
%time subestimator.fit(xx, yy)

We can apply this estimator to the whole dataset.

In [None]:
meta = subestimator.predict(xx)
X.map_blocks(subestimator.predict, drop_axis=1, meta=meta)[::1000].compute()

This estimator has only seen a bit of the data.

In [None]:
import sklearn.linear_model
import dask_ml.ensemble

subestimator = sklearn.linear_model.RidgeClassifier(random_state=0)
clf = dask_ml.ensemble.BlockwiseVotingClassifier(
    subestimator,
    classes=[0, 1]
)
clf

In [None]:
clf.fit(X, y)

In [None]:
yhat = clf.predict(X)
yhat[::100].compute()