# Scale Scikit-Learn with Dask and Joblib

<img src="https://raw.githubusercontent.com/scikit-learn/scikit-learn/master/doc/logos/scikit-learn-logo-notext.png"/> <img src="http://joblib.readthedocs.io/en/latest/_static/joblib_logo.svg" width="20%"/> 

Many Scikit-Learn operations already support single-machine parallelism through [joblib](http://joblib.readthedocs.io/). This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.
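As a minimal sketch of that single-machine case, the snippet below trains a random forest with `n_jobs=-1`, which asks joblib to use every available core. The estimator choice, dataset size, and variable names are assumptions made for illustration only.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

# Small synthetic dataset (sizes chosen only for illustration).
X_demo, y_demo = make_classification(n_samples=500, random_state=0)

# n_jobs=-1 lets joblib build the individual trees on all local cores.
forest = RandomForestClassifier(n_estimators=50, n_jobs=-1, random_state=0)
forest.fit(X_demo, y_demo)

train_accuracy = forest.score(X_demo, y_demo)
print(f"trees: {len(forest.estimators_)}, training accuracy: {train_accuracy:.3f}")
```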

Dask can scale these operations by replacing Joblib's single-machine system with a distributed cluster.  This lets you train those estimators using all the cores of your *cluster*, by changing one line of code. 

This is most useful for training large models on medium-sized datasets. A model can be large because you are searching over many hyper-parameters, or because you are using an ensemble method with many individual estimators. For datasets that are too small, training times are typically short enough that cluster-wide parallelism isn't helpful. For datasets that are too large (larger than a single machine's memory), the scikit-learn estimators may not be able to cope (see below).

In [1]:
from dask.distributed import Client, progress
client = Client()
client.restart()

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.

Client
  Scheduler: tcp://127.0.0.1:58530
  Dashboard: http://127.0.0.1:58533/status
Cluster
  Workers: 12
  Cores: 12
  Memory: 34.08 GB
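Calling `Client()` with no arguments starts a local cluster with default settings, which is why the dashboard had to fall back to a random port when 8787 was taken. If you prefer to control this yourself, a sketch along the following lines may help. The worker counts are assumed values, `processes=False` keeps the workers in-process so the example stays lightweight, and `dashboard_address=":0"` requests any free port.

```python
from dask.distributed import Client

# Assumed values for illustration: two single-threaded, in-process workers,
# with the dashboard bound to whichever port happens to be free.
client = Client(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=":0",
)

# Ask the scheduler how many workers it currently knows about.
n_workers_seen = len(client.scheduler_info()["workers"])
print(n_workers_seen)

# Closing the client also shuts down the local cluster it created.
client.close()
```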


## Create a Dataset, Model, and Grid Search

In [2]:
# import dask_ml.joblib  # register the distributed backend
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
import numpy as np
import pandas as pd

We'll use scikit-learn to create a pair of small random arrays, one for the features `X`, and one for the target `y`.

In [3]:
X, y = make_classification(n_samples=1000, random_state=0)
X[:5]

array([[-1.06377997,  0.67640868,  1.06935647, -0.21758002,  0.46021477,
        -0.39916689, -0.07918751,  1.20938491, -0.78531472, -0.17218611,
        -1.08535744, -0.99311895,  0.30693511,  0.06405769, -1.0542328 ,
        -0.52749607, -0.0741832 , -0.35562842,  1.05721416, -0.90259159],
       [ 0.0708476 , -1.69528125,  2.44944917, -0.5304942 , -0.93296221,
         2.86520354,  2.43572851, -1.61850016,  1.30071691,  0.34840246,
         0.54493439,  0.22532411,  0.60556322, -0.19210097, -0.06802699,
         0.9716812 , -1.79204799,  0.01708348, -0.37566904, -0.62323644],
       [ 0.94028404, -0.49214582,  0.67795602, -0.22775445,  1.40175261,
         1.23165333, -0.77746425,  0.01561602,  1.33171299,  1.08477266,
        -0.97805157, -0.05012039,  0.94838552, -0.17342825, -0.47767184,
         0.76089649,  1.00115812, -0.06946407,  1.35904607, -1.18958963],
       [-0.29951677,  0.75988955,  0.18280267, -1.55023271,  0.33821802,
         0.36324148, -2.10052547, -0.4380675 , -

We'll fit a [Support Vector Classifier](http://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html), using [grid search](http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html) to find the best combination of the `C`, `gamma`, `kernel`, and `shrinking` hyperparameters.

In [4]:
param_grid = {"C": np.logspace(-3, 1, 30),
              "gamma": [0.05, 0.5, 2],
              "kernel": ['rbf', 'poly', 'sigmoid'],
              "shrinking": [True, False]}

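# 3-fold cross-validation over every combination in param_grid;
# n_jobs=-1 uses all workers made available by the active joblib backend.
grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
                           param_grid=param_grid,
                           return_train_score=False,
                           cv=3,
                           n_jobs=-1)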

## Train on a Cluster

To fit that normally, we'd call

```python
grid_search.fit(X, y)
```

To fit it using the cluster, we just need to use a context manager provided by joblib.
We'll pre-scatter the data to each worker, which can help with performance.

In [6]:
%%time

# import dask_ml.joblib
import joblib

with joblib.parallel_backend('dask', scatter=[X, y]):
    grid_search.fit(X, y)

Wall time: 1min 46s
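The `parallel_backend` context manager is not specific to Dask: it switches the backend for any joblib-parallel code run inside the `with` block. The sketch below uses joblib's built-in `'threading'` backend as a stand-in so that it runs without a cluster. The `square` function is a hypothetical placeholder for real work.

```python
import joblib
from joblib import Parallel, delayed


def square(x):
    # Hypothetical stand-in for an expensive task such as fitting one model.
    return x * x


# Every Parallel call inside this block uses the selected backend and n_jobs.
with joblib.parallel_backend("threading", n_jobs=2):
    results = Parallel()(delayed(square)(i) for i in range(5))

print(results)  # [0, 1, 4, 9, 16]
```

Swapping `"threading"` for `"dask"` (with a `Client` already created) is the one-line change that sends the same tasks to the cluster instead.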


We fit 1620 different models: 540 hyper-parameter combinations (30 values of `C` × 3 values of `gamma` × 3 kernels × 2 `shrinking` settings), each trained on 3 CV splits. The training was distributed across the cluster. At this point, we have a regular scikit-learn model, which can be used for prediction, scoring, etc.
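The count of 1620 can be checked with plain Python. This sketch rebuilds the grid with `itertools.product`; the list comprehension for `c_values` is an assumed stand-in that mirrors `np.logspace(-3, 1, 30)` without requiring NumPy.

```python
from itertools import product

# 30 log-spaced values from 1e-3 to 1e1, mirroring np.logspace(-3, 1, 30).
c_values = [10 ** (-3 + 4 * i / 29) for i in range(30)]
gammas = [0.05, 0.5, 2]
kernels = ["rbf", "poly", "sigmoid"]
shrinking = [True, False]
n_splits = 3  # cv=3 in the grid search

# One candidate per combination of hyper-parameter values.
candidates = list(product(c_values, gammas, kernels, shrinking))
n_candidates = len(candidates)

# Each candidate is fit once per cross-validation split.
n_fits = n_candidates * n_splits
print(n_candidates, n_fits)  # 540 1620
```

With the default `refit=True`, `GridSearchCV` additionally refits the best candidate once on the full dataset after the search completes.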

In [7]:
res = pd.DataFrame(grid_search.cv_results_)
res.head()

| | mean_fit_time | std_fit_time | mean_score_time | std_score_time | param_C | param_gamma | param_kernel | param_shrinking | params | split0_test_score | split1_test_score | split2_test_score | mean_test_score | std_test_score | rank_test_score |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.359617 | 0.010162 | 0.025397 | 0.007508 | 0.001 | 0.05 | rbf | True | {'C': 0.001, 'gamma': 0.05, 'kernel': 'rbf', '... | 0.502994 | 0.501502 | 0.501502 | 0.502 | 0.000704 | 395 |
| 1 | 0.377639 | 0.004707 | 0.044737 | 0.024125 | 0.001 | 0.05 | rbf | False | {'C': 0.001, 'gamma': 0.05, 'kernel': 'rbf', '... | 0.502994 | 0.501502 | 0.501502 | 0.502 | 0.000704 | 395 |
| 2 | 0.276921 | 0.020757 | 0.031485 | 0.017138 | 0.001 | 0.05 | poly | True | {'C': 0.001, 'gamma': 0.05, 'kernel': 'poly', ... | 0.502994 | 0.501502 | 0.501502 | 0.502 | 0.000704 | 395 |
| 3 | 0.326826 | 0.026213 | 0.021993 | 0.012968 | 0.001 | 0.05 | poly | False | {'C': 0.001, 'gamma': 0.05, 'kernel': 'poly', ... | 0.502994 | 0.501502 | 0.501502 | 0.502 | 0.000704 | 395 |
| 4 | 0.429173 | 0.000419 | 0.036755 | 0.015399 | 0.001 | 0.05 | sigmoid | True | {'C': 0.001, 'gamma': 0.05, 'kernel': 'sigmoid... | 0.502994 | 0.501502 | 0.501502 | 0.502 | 0.000704 | 395 |


## The result is just a Scikit-Learn object

Dask didn't replace Scikit-Learn, so everything works the way you are accustomed to.

In [8]:
grid_search.score(X, y)

0.986
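Beyond `score`, the fitted search exposes the usual scikit-learn attributes. The sketch below fits a deliberately tiny grid locally (the grid values and the name `small_search` are assumptions chosen so it runs in seconds) and then reads the best parameters and makes predictions.

```python
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

X_small, y_small = make_classification(n_samples=200, random_state=0)

# A tiny grid so the example finishes quickly on one machine.
small_search = GridSearchCV(
    SVC(random_state=0),
    param_grid={"C": [0.1, 1.0, 10.0]},
    cv=3,
)
small_search.fit(X_small, y_small)

best_params = small_search.best_params_      # winning hyper-parameters
best_score = small_search.best_score_        # mean CV score of the winner
predictions = small_search.predict(X_small[:5])  # uses the refit best estimator

print(best_params, round(best_score, 3), predictions)
```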

For more on training scikit-learn models with distributed joblib, see the [dask-ml documentation](http://dask-ml.readthedocs.io/en/latest/joblib.html).