Dask-ML
====================================================================

__(initial notes are courtesy of the Dask project homepage at ml.dask.org)__

Dask-ML provides scalable machine learning in Python using [Dask](https://dask.org/) alongside popular machine learning libraries like [Scikit-Learn](http://scikit-learn.org/).

The idea is to support Pandas + Scikit-Learn style ML in parallel settings, using code patterns you already know:

```python
import dask.dataframe as dd
df = dd.read_parquet('...')
data = df[['age', 'income', 'married']]
labels = df['outcome']

from dask_ml.linear_model import LogisticRegression
lr = LogisticRegression()
lr.fit(data, labels)
```

How does this work?
-------------------------------------------------------------------------------------------

Modern machine learning algorithms employ a wide variety of techniques, and scaling them requires a similarly wide variety of approaches. Generally, solutions fall into the following three categories:

### Parallelize Scikit-Learn Directly

Scikit-Learn already provides parallel computing on a single machine with [Joblib](http://joblib.readthedocs.io/en/latest/). Dask extends this parallelism to many machines in a cluster. This works well when the data is modest in size but the computation is large, as with random forests or hyper-parameter optimization.

```python
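from dask.distributed import Client
import joblib
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

# With no arguments this starts a local cluster; pass a scheduler address to use an existing one
client = Client()

# Small synthetic dataset, used here only so the snippet runs end to end
X, y = make_classification(n_samples=1000, random_state=0)
clf = RandomForestClassifier(n_estimators=100, n_jobs=-1)

with joblib.parallel_backend('dask'):
    # Your normal scikit-learn code here; joblib-parallel work is sent to the Dask workers
    clf.fit(X, y)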
```

See [Dask-ML Joblib documentation](https://ml.dask.org/joblib.html) for more information.

*Note that this is an active collaboration with the Scikit-Learn development team. This functionality is progressing quickly but is in a state of rapid change.*

### Reimplement Scalable Algorithms with Dask Array

Some machine learning algorithms are easy to write down as NumPy algorithms. In these cases we can replace NumPy arrays with Dask arrays to get scalable algorithms with little extra work. This approach is used for [linear models](https://ml.dask.org/glm.html), [pre-processing](https://ml.dask.org/preprocessing.html), and [clustering](https://ml.dask.org/clustering.html).

```python
from dask_ml.preprocessing import Categorizer, DummyEncoder
from dask_ml.linear_model import LogisticRegression

lr = LogisticRegression()
lr.fit(data, labels)
```
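
The chunk-and-combine idea behind this approach can be sketched without Dask at all. The example below is a simplified illustration, not Dask's actual implementation: the helper names `chunked` and `blockwise_mean` and the input values are made up for this sketch. Each chunk produces a small partial result on its own, and only those partial results need to be brought together.

```python
def chunked(values, chunk_size):
    """Yield consecutive slices of `values` with at most `chunk_size` items."""
    for start in range(0, len(values), chunk_size):
        yield values[start:start + chunk_size]


def blockwise_mean(values, chunk_size):
    """Compute the mean by combining per-chunk partial results."""
    # Each (sum, count) pair depends only on its own chunk, so in a
    # distributed setting these could be computed on different workers.
    partials = [(sum(chunk), len(chunk)) for chunk in chunked(values, chunk_size)]
    # Combining the partials is cheap: two numbers per chunk.
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


data = list(range(1, 11))  # made-up values 1..10
chunk_lengths = [len(c) for c in chunked(data, 4)]
result = blockwise_mean(data, chunk_size=4)
print(chunk_lengths, result)
```

Algorithms that can be phrased this way (sums, means, matrix products, gradient computations) tend to scale well, because no single machine ever needs to hold the full dataset.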

### Partner with other distributed libraries

Other machine learning libraries like XGBoost and TensorFlow already have distributed solutions that work quite well. Dask-ML makes no attempt to re-implement these systems. Instead, Dask-ML makes it easy to use normal Dask workflows to prepare and set up data, then it deploys XGBoost or TensorFlow *alongside* Dask, and hands the data over.

```python
from dask_ml.xgboost import XGBRegressor

est = XGBRegressor(...)
est.fit(train, train_labels)
```

See [Dask-ML + XGBoost](https://ml.dask.org/xgboost.html) or [Dask-ML + TensorFlow](https://ml.dask.org/tensorflow.html) documentation for more information.

Scikit-Learn API
--------------------------------------------------------------------------------------

In all cases Dask-ML endeavors to provide a single unified interface around the familiar NumPy, Pandas, and Scikit-Learn APIs. Users familiar with Scikit-Learn should feel at home with Dask-ML.

* * *

# Let's try it:

In [None]:
from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=1, memory_limit='1GB')

client

In [None]:
import dask.dataframe

ddf = dask.dataframe.read_parquet('data/california')

ddf

In [None]:
ddf2 = ddf[['delay', 'distance', 'origin']]
ddf3 = ddf2[ddf2.origin.isin(['SFO', 'OAK', 'SJC'])]

In [None]:
ddf3.head(npartitions=-1) # look at all partitions

In [None]:
ddf4 = ddf3.categorize()
ddf4

In [None]:
prepared = dask.dataframe.get_dummies(ddf4)
prepared

Alternatively, we could use scikit-learn style preprocessing steps, though the relevant APIs are still evolving a bit:

```python
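from sklearn.pipeline import make_pipeline
from dask_ml.preprocessing import Categorizer, DummyEncoder

# Categorizer converts object columns to categoricals;
# DummyEncoder then one-hot encodes the categorical columns
pipe = make_pipeline(
    Categorizer(),
    DummyEncoder()
)

# Use the filtered frame (ddf3) so the result matches `prepared` above
pipe.fit(ddf3)

prepared = pipe.transform(ddf3)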
```
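
To make the result of dummy encoding concrete, here is a minimal pure-Python sketch of the transformation. It is illustrative only: the `one_hot` helper and the three rows are made up, and the `origin_<category>` column naming is assumed to mirror the usual `get_dummies` convention. The first pass over the data collects the categories, which is why the frame is categorized before encoding: the set of output columns has to be known up front.

```python
def one_hot(rows, column):
    """Replace a categorical column with one 0/1 indicator column per category."""
    # First pass: discover every category (the role played by `categorize`).
    categories = sorted({row[column] for row in rows})
    encoded = []
    # Second pass: emit one indicator column per category.
    for row in rows:
        new_row = {key: value for key, value in row.items() if key != column}
        for category in categories:
            new_row[f"{column}_{category}"] = int(row[column] == category)
        encoded.append(new_row)
    return encoded


# Made-up sample rows with the same columns as the notebook's frame
rows = [
    {"delay": 5, "distance": 337, "origin": "SFO"},
    {"delay": -3, "distance": 1111, "origin": "OAK"},
    {"delay": 12, "distance": 308, "origin": "SJC"},
]

encoded = one_hot(rows, "origin")
for row in encoded:
    print(row)
```

Every numeric column passes through unchanged, and each row has exactly one of the indicator columns set to 1.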

In [None]:
y = prepared.delay.to_dask_array(lengths=True)
y

In [None]:
X = prepared.drop('delay', axis=1).to_dask_array(lengths=True)
X

We can see that each chunk is a regular NumPy array:

In [None]:
type(X.blocks[0].compute())

Let's "rechunk" the arrays:

In [None]:
chunksize = 15000 # rows/records
X = X.rechunk(chunks=chunksize)
X

In [None]:
y = y.rechunk(chunks=chunksize)
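
As a rough guide to what rechunking with a fixed row count produces, the sketch below computes the resulting chunk lengths along the row axis. The `row_chunks` helper and the row count of 40,000 are hypothetical, chosen only for illustration; the real layout is shown in the array summaries above.

```python
def row_chunks(n_rows, chunksize):
    """Return the chunk lengths for n_rows split into blocks of chunksize."""
    full, remainder = divmod(n_rows, chunksize)
    # Every chunk is full-sized except possibly the last one.
    return (chunksize,) * full + ((remainder,) if remainder else ())


chunks = row_chunks(40_000, 15_000)  # hypothetical row count
print(chunks)
```

Using the same `chunksize` for `X` and `y` keeps their row blocks aligned, so each block of features lines up with the matching block of targets.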

In [None]:
from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y)

X_train

In [None]:
y_train

In [None]:
from dask_ml.linear_model import LinearRegression

lr = LinearRegression(solver='lbfgs')
lr_model = lr.fit(X_train, y_train)

In [None]:
lr_model.coef_

In [None]:
y_predicted = lr_model.predict(X_test)

y_predicted

In [None]:
y_test

We knew (from our exploratory analysis and plots) that we wouldn't get anything meaningful from a linear regression ... let's confirm that null hypothesis :)

In [None]:
from dask_ml.metrics import mean_squared_error
from math import sqrt

sqrt(mean_squared_error(y_test, y_predicted))

In [None]:
y.std().compute()
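
Why compare the RMSE with the standard deviation of `y`? A model that ignores its inputs and always predicts the mean of `y` has an RMSE equal to the (population) standard deviation of `y`, so a regression whose RMSE is close to `y.std()` has learned very little. The sketch below checks this identity on made-up numbers using only the standard library; the `rmse` helper is written for this example and is not part of Dask-ML.

```python
from math import sqrt
from statistics import mean, pstdev


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    return sqrt(mean((a - p) ** 2 for a, p in zip(actual, predicted)))


y_values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]  # made-up targets

# Baseline "model": always predict the mean of the targets.
baseline_predictions = [mean(y_values)] * len(y_values)

baseline_rmse = rmse(y_values, baseline_predictions)
spread = pstdev(y_values)  # population standard deviation (ddof=0)

print(baseline_rmse, spread)
```

Note that in the cells above the RMSE is computed on the test split while the standard deviation is computed over all of `y`, so the two numbers are comparable only approximately.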

In [None]:
client.close()